From 45b2dbab6afa093f3402ac919929f97ff28f5c9a Mon Sep 17 00:00:00 2001 From: meiyi Date: Tue, 12 Dec 2023 16:33:36 +0800 Subject: [PATCH] [improve](group commit) Group commit support max filter ratio when rows is less than value in config (#28139) --- be/src/common/config.cpp | 3 +- be/src/common/config.h | 3 +- be/src/runtime/group_commit_mgr.cpp | 6 +- be/src/runtime/group_commit_mgr.h | 2 - .../stream_load/stream_load_executor.cpp | 9 + be/src/vec/sink/group_commit_block_sink.cpp | 65 +++- be/src/vec/sink/group_commit_block_sink.h | 5 + .../doris/analysis/NativeInsertStmt.java | 2 +- .../doris/planner/GroupCommitBlockSink.java | 5 +- .../doris/planner/GroupCommitPlanner.java | 5 +- .../doris/planner/StreamLoadPlanner.java | 6 +- .../org/apache/doris/qe/StmtExecutor.java | 5 +- gensrc/thrift/DataSinks.thrift | 1 + ...ert_group_commit_into_max_filter_ratio.out | 34 ++ .../data/insert_p0/test_group_commit_10.csv | 4 + .../insert_p0/test_group_commit_11.csv.gz | Bin 0 -> 35223 bytes .../test_group_commit_http_stream.out | 3 - ..._group_commit_into_max_filter_ratio.groovy | 339 ++++++++++++++++++ ...sert_group_commit_with_prepare_stmt.groovy | 2 + .../test_group_commit_http_stream.groovy | 14 +- .../test_group_commit_stream_load.groovy | 2 +- 21 files changed, 477 insertions(+), 38 deletions(-) create mode 100644 regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out create mode 100644 regression-test/data/insert_p0/test_group_commit_10.csv create mode 100644 regression-test/data/insert_p0/test_group_commit_11.csv.gz create mode 100644 regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 549f6a0db0..c97959f2e4 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1100,10 +1100,11 @@ DEFINE_Int16(bitmap_serialize_version, "1"); DEFINE_String(group_commit_replay_wal_dir, "./wal"); DEFINE_Int32(group_commit_replay_wal_retry_num, 
"10"); DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); -DEFINE_Bool(wait_internal_group_commit_finish, "false"); // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); +DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000"); +DEFINE_Bool(wait_internal_group_commit_finish, "false"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400"); diff --git a/be/src/common/config.h b/be/src/common/config.h index bad4724ac9..e3dbe3234c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1173,10 +1173,11 @@ DECLARE_Int16(bitmap_serialize_version); DECLARE_String(group_commit_replay_wal_dir); DECLARE_Int32(group_commit_replay_wal_retry_num); DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds); -DECLARE_Bool(wait_internal_group_commit_finish); // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); +DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio); +DECLARE_Bool(wait_internal_group_commit_finish); // The configuration item is used to lower the priority of the scanner thread, // typically employed to ensure CPU scheduling for write operations. 
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 224d446b66..b97d5de8a5 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -133,8 +133,7 @@ void LoadBlockQueue::cancel(const Status& st) { Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, - std::shared_ptr block, std::shared_ptr& load_block_queue, - int be_exe_version) { + std::shared_ptr& load_block_queue, int be_exe_version) { DCHECK(table_id == _table_id); { std::unique_lock l(_lock); @@ -425,7 +424,6 @@ void GroupCommitMgr::stop() { Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, - std::shared_ptr block, std::shared_ptr& load_block_queue, int be_exe_version) { std::shared_ptr group_commit_table; @@ -439,7 +437,7 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i group_commit_table = _table_map[table_id]; } return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id, - block, load_block_queue, be_exe_version); + load_block_queue, be_exe_version); } Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 90b0e7a040..44826cd946 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -100,7 +100,6 @@ public: _all_block_queues_bytes(all_block_queue_bytes) {}; Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version, const UniqueId& load_id, - std::shared_ptr block, std::shared_ptr& load_block_queue, int be_exe_version); Status get_load_block_queue(const TUniqueId& instance_id, @@ -142,7 +141,6 @@ public: std::shared_ptr& load_block_queue); Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t 
base_schema_version, const UniqueId& load_id, - std::shared_ptr block, std::shared_ptr& load_block_queue, int be_exe_version); diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 50bb240756..1fc8eb8120 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -101,6 +101,15 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrnumber_loaded_rows); } } else { + if (ctx->group_commit && status->is()) { + ctx->number_total_rows = state->num_rows_load_total(); + ctx->number_loaded_rows = state->num_rows_load_success(); + ctx->number_filtered_rows = state->num_rows_load_filtered(); + ctx->number_unselected_rows = state->num_rows_load_unselected(); + if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) { + ctx->error_url = to_load_error_http_path(state->get_error_log_file_path()); + } + } LOG(WARNING) << "fragment execute failed" << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id) << ", err_msg=" << status->to_string() << ", " << ctx->brief(); diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 2d40f94a54..bb5c5c70d0 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -49,6 +49,7 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { _base_schema_version = table_sink.base_schema_version; _group_commit_mode = table_sink.group_commit_mode; _load_id = table_sink.load_id; + _max_filter_ratio = table_sink.max_filter_ratio; return Status::OK(); } @@ -84,18 +85,28 @@ Status GroupCommitBlockSink::open(RuntimeState* state) { } Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { + RETURN_IF_ERROR(DataSink::close(state, close_status)); + RETURN_IF_ERROR(close_status); + int64_t total_rows = state->num_rows_load_total(); + int64_t 
loaded_rows = state->num_rows_load_total(); + state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() + + state->num_rows_load_filtered()); + state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows - + loaded_rows); + if (!_is_block_appended) { + // if not meet the max_filter_ratio, we should return error status directly + int64_t num_selected_rows = + state->num_rows_load_total() - state->num_rows_load_unselected(); + if (num_selected_rows > 0 && + (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) { + return Status::DataQualityError("too many filtered rows"); + } + RETURN_IF_ERROR(_add_blocks()); + } if (_load_block_queue) { _load_block_queue->remove_load_id(_load_id); } - RETURN_IF_ERROR(DataSink::close(state, close_status)); - RETURN_IF_ERROR(close_status); // wait to wal - int64_t total_rows = state->num_rows_load_total(); - int64_t loaded_rows = state->num_rows_load_total(); - state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows - - loaded_rows); - state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() + - state->num_rows_load_filtered()); auto st = Status::OK(); if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish || _group_commit_mode == TGroupCommitMode::SYNC_MODE)) { @@ -148,6 +159,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, if (block->rows() == 0) { return Status::OK(); } + // the insert group commit tvf always accept nullable columns, so we should convert + // the non-nullable columns to nullable columns for (int i = 0; i < block->columns(); ++i) { if (block->get_by_position(i).type->is_nullable()) { continue; @@ -166,22 +179,42 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, } std::shared_ptr output_block = vectorized::Block::create_shared(); output_block->swap(cur_mutable_block->to_block()); + if (!_is_block_appended && 
state->num_rows_load_total() + state->num_rows_load_unselected() + + state->num_rows_load_filtered() <= + config::group_commit_memory_rows_for_max_filter_ratio) { + _blocks.emplace_back(output_block); + } else { + if (!_is_block_appended) { + RETURN_IF_ERROR(_add_blocks()); + } + RETURN_IF_ERROR(_load_block_queue->add_block( + output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE)); + } + return Status::OK(); +} + +Status GroupCommitBlockSink::_add_blocks() { + DCHECK(_is_block_appended == false); TUniqueId load_id; load_id.__set_hi(_load_id.hi); load_id.__set_lo(_load_id.lo); if (_load_block_queue == nullptr) { - if (state->exec_env()->wal_mgr()->is_running()) { - RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( - _db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue, - state->be_exec_version())); - state->set_import_label(_load_block_queue->label); - state->set_wal_id(_load_block_queue->txn_id); + if (_state->exec_env()->wal_mgr()->is_running()) { + RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue( + _db_id, _table_id, _base_schema_version, load_id, _load_block_queue, + _state->be_exec_version())); + _state->set_import_label(_load_block_queue->label); + _state->set_wal_id(_load_block_queue->txn_id); } else { return Status::InternalError("be is stopping"); } } - RETURN_IF_ERROR(_load_block_queue->add_block( - output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE)); + for (auto it = _blocks.begin(); it != _blocks.end(); ++it) { + RETURN_IF_ERROR(_load_block_queue->add_block( + *it, _group_commit_mode != TGroupCommitMode::SYNC_MODE)); + } + _is_block_appended = true; + _blocks.clear(); return Status::OK(); } diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h index 4be5a5514c..2ae37be368 100644 --- a/be/src/vec/sink/group_commit_block_sink.h +++ b/be/src/vec/sink/group_commit_block_sink.h @@ -47,6 +47,7 @@ 
public: private: Status _add_block(RuntimeState* state, std::shared_ptr block); + Status _add_blocks(); vectorized::VExprContextSPtrs _output_vexpr_ctxs; @@ -65,6 +66,10 @@ private: TGroupCommitMode::type _group_commit_mode; UniqueId _load_id; std::shared_ptr _load_block_queue; + // used to calculate if meet the max filter ratio + std::vector> _blocks; + bool _is_block_appended = false; + double _max_filter_ratio = 0.0; }; } // namespace vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index dec20ab480..f523e3e594 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -970,7 +970,7 @@ public class NativeInsertStmt extends InsertStmt { if (isGroupCommitStreamLoadSql) { sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple, targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(), - ConnectContext.get().getSessionVariable().getGroupCommit()); + ConnectContext.get().getSessionVariable().getGroupCommit(), 0); } else { sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java index 14ecda1a5b..8030dcf4e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java @@ -29,11 +29,13 @@ import java.util.List; public class GroupCommitBlockSink extends OlapTableSink { private String groupCommit; + private double maxFilterRatio; public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, - 
boolean singleReplicaLoad, String groupCommit) { + boolean singleReplicaLoad, String groupCommit, double maxFilterRatio) { super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad); this.groupCommit = groupCommit; + this.maxFilterRatio = maxFilterRatio; } protected TDataSinkType getDataSinkType() { @@ -45,6 +47,7 @@ public class GroupCommitBlockSink extends OlapTableSink { TGroupCommitMode groupCommitMode = parseGroupCommit(groupCommit); Preconditions.checkNotNull(groupCommitMode, "Group commit is: " + groupCommit); tDataSink.olap_table_sink.setGroupCommitMode(groupCommitMode); + tDataSink.olap_table_sink.setMaxFilterRatio(maxFilterRatio); return tDataSink; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java index b3557bd563..bc99d771d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java @@ -98,11 +98,12 @@ public class GroupCommitPlanner { } streamLoadPutRequest .setDb(db.getFullName()) - .setMaxFilterRatio(1) + .setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 
0 : 1) .setTbl(table.getName()) .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId) - .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit); + .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit) + .setStrictMode(ConnectContext.get().getSessionVariable().enableInsertStrict); StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest); StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask); // Will using load id as query id in fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 65fe216369..bf3261d394 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -261,7 +261,8 @@ public class StreamLoadPlanner { OlapTableSink olapTableSink; if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) { olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds, - Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit()); + Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(), + taskInfo.getMaxFilterRatio()); } else { olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); } @@ -481,7 +482,8 @@ public class StreamLoadPlanner { OlapTableSink olapTableSink; if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) { olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds, - Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit()); + Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(), + 
taskInfo.getMaxFilterRatio()); } else { olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 463b54771b..0aabb8d7f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -176,6 +176,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; +import com.google.protobuf.ProtocolStringList; import lombok.Setter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -1891,7 +1892,9 @@ public class StmtExecutor { List rows = groupCommitPlanner.getRows(nativeInsertStmt); PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows); TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); - if (code == TStatusCode.DATA_QUALITY_ERROR) { + ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList(); + if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0) + .contains("schema version not match")) { LOG.info("group commit insert failed. 
stmt: {}, backend id: {}, status: {}, " + "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt, groupCommitPlanner.getBackend().getId(), diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index 40c0e760e8..daf3bcc06a 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -266,6 +266,7 @@ struct TOlapTableSink { // used by GroupCommitBlockSink 21: optional i64 base_schema_version 22: optional TGroupCommitMode group_commit_mode + 23: optional double max_filter_ratio } struct TDataSink { diff --git a/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out new file mode 100644 index 0000000000..62743feeb6 --- /dev/null +++ b/regression-test/data/insert_p0/insert_group_commit_into_max_filter_ratio.out @@ -0,0 +1,34 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 a 10 +2 \N -1 +3 a 10 +9 a \N + +-- !sql -- +1 a 10 +2 \N -1 +3 a 10 +6 a \N +7 a \N +9 a \N + +-- !sql -- +1 a 21 +1 a 21 +2 b 22 +2 b 22 +3 c 23 +3 c 23 +4 d \N + +-- !sql -- +1 a 21 +1 a 21 +2 b 22 +2 b 22 +3 c 23 +3 c 23 +4 d \N +4 d \N + diff --git a/regression-test/data/insert_p0/test_group_commit_10.csv b/regression-test/data/insert_p0/test_group_commit_10.csv new file mode 100644 index 0000000000..ef677f0693 --- /dev/null +++ b/regression-test/data/insert_p0/test_group_commit_10.csv @@ -0,0 +1,4 @@ +1,a,21 +2,b,22 +3,c,23 +4,d,a \ No newline at end of file diff --git a/regression-test/data/insert_p0/test_group_commit_11.csv.gz b/regression-test/data/insert_p0/test_group_commit_11.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..59c57ffdbf01b5fd032fb89f57ba92a869169b9d GIT binary patch literal 35223 zcmb2|=HS>np)i$!xg@o?BtE?;zqBAeIX^cyvn1ZoP%pW-j6vtFchh5!2`U$B=ctOS zFn%bWv+KX=sd=1c0XIuB&X?>=t7+*wzW@Kfzu&*ldH(sm>G|vT4bMM*Kj-<+^0S|R 
zRiFR-_xsG}&)-|0e}6xJ-v0g15C6aM`2EiRkI(O4uK%ItuWfDT^N-)ZxXyci|MK+8 zxO)Hi7eD`1{|Y*P{{Ch0>bn2)_TBmX{rjh{asB(Jum3*lf9<#Wh1c(&zFsX`uOI)u z^xxi^z1P2jL`Tz6Z{$D-^VuJbi z>zA)Dulwg0|NhUvx%=+@{Jpzo@9U4{KdY{Pe*fv~y6g9!zK%T(B0kr*{$~gf^W)$D{Zak9>izTkx35?4|95ZSz29K|=l5@4?|uG%-S7Gv-$7FK zV5#5VzkR*8e}Dh_@0EXQYxbW1{QmLl+PHf8`1i$N{?G3pzuH~DfBgFIGxeIk=bw5G zlHUiC|NQ&+ud4gU?_agqJ}`e?eSG};>c6$Md*Ah0s-mxK8~zkmK(cm4kP>%aH@uiJO;{bvyW zbNuIj&)@8S{;zr7|3&h@=YL85H@ogl{gwWo>wj(hA8G$=|Lf!b+W*e@ze)c0{BOzs zZr8o3ztjJ7{qK$cEA5}{e}DWR`@b3g56SCy?(x$%-xT;Ams<{E&sp(YZY7wp>bcx%Fk{Vg zxwVz&B=7fG#-6v0`z&*-?=X`IO*IrE%v8$N%tlRX#2NPoMt9n5(Cb|-rtSmfQ# zb`YcR*37mvt8XvP7H3#L`(pe32bjTP_nqTh@wQCsXCijrCC-85RP4TMfEaq&*XH;h zmpcKr{^WDHQ((sF=W=JjjI+<>&Vd={pUYhUF>a@s&APntcnQc-^{+eKOXqD~dE0RL z*PX{p!9w45x|e|%%fIbB4q|-Vc*gg0$(x6BpI<0D^RxCH<2(>UvhJNCm|&JoOr ztb3OTV${5?|IgraT<&G%x&5rS4VVAgdAtf_p8D^d?$sd1^4~j;gBj|7cDmQhbH4rY z^Ln5EXMU6~tIv2AKg;&{XPH~|InUh#e7)`S&vWVzZ+rYQe)B)e*?nMreGqY(TbK;K|8d{=m(?5oRxkd4_Kp16moMh~ z{?Gfyzr5b^_wvj8=l*^CGXDI(pI_dm|8MvuzNOyjIn)K#P^YY@pS3(0Wc{{es5|g6 z9)EdXf1knU_`J`cK%evZIx^$jcb?mZ&;M+F4)Q*nQTS7DalY{~66|(cIQF5&V%yV9Qo>V&n>se zE`NXe_pusqIAmBqgNYWO>j$O7_fNMcL%oG?;f|W`_qYGY?=TcA^6itqI3Fv4cnj?A z^Tl;+H~S!ggDo^Zew^%Me+rSxz`=4*V0Z|eB%o1n-aqwk z#Ixs{>I`PjKLbhz8TA&ki|fGg`me$4`JX%BcvZ|j_j4yWUYl>t`CNSdr_J;9eQzs% zy<9K$tjOZ`#kigvT+O^s_nhLD6BqI`25dlJ;$n zKTOWCKZQszAgkL!VF0n~;qxbT0pBJU*=zld+$lfp@0L6Lr+SDdbEo{Q zzgO<`pRLP#$DVF)`F-(cn7PSN|G-^ar!ya(Z=S=lGn#FN(BODm@%!d}`G1H41LT-` zLb~dX8pwjYw?(%2+<91mhD+1Ouhl(u6AeKcewJeoCQNg_z82cA|59A7f8pP$C+B>I z6sOnKiqCDn{aV|vN z{y%+d;wSyojoh#Hxq4Gw;4|~ozpKu?KlLm1=D%%g;-~w%?>G1?y}2&+S$W#uOK0-W z{@QvI>Pt}4L9@Ez_uQEMQxJ6w$cO)8ad{kS&abQY33wiy%RpxF%fW3Yz% zXtvKNu0!)UG}N!GH~cHNIX=a%?OXWF;&b=G4%&ZiPVv8c;4C@y*__YcK?(Kkdei4~ z_JcE6{Z`B8fA)g%>EB%?@X`z-2de47im))^>eO%Rry|mGHLNB?Vf?=bc7rU`F;~{t z5z+z9sSq+W zw`}EpmG9A?>U`gQfAV+Ho%~b3Lx28TwB;#cK zn)(MEI~U5%Sk}LXCPbW!n!oq<$N%vM6`}w3;tJDe&^Unw_X>OcAHSXI6DoM#&X;&L zryiU&{+}^;{%12N!hSDrvwS`uTrAj|pRxQ8t|$JTOg#599TXYAj1g4_M6L{`2n%D+ 
zpS4Tury**N^PqGBs&7#k|7yW*u!cHjh5boFI>5CQ)Qp$!?fyPYhbJB& zsBlGP{H@Hyt^-^dqnJ^gVSVpk+g16Eh%zJn{Lka}kNtW)V}IY*=Z5w2ulZB{IbEB7 z;!p1y`$<2wBkR>Zmv8zPu&rM6wSVg6i}I)cY+YkN{VX&vv4j&m2y6b@uCqVk3yx9L zPyyAR=Q5gSf!EOM>0*JA~ekvgy;24FP z@$&t~zm8>y_yHLK_Wt)d=Rdy(`w0}IsEofi%dzW##3-^E;27O@M}9h@*Z_sZ|9!Ax zWj}VtmH8W>Iu74I`up&X{XO587uHL^?|=Ny>fZe0e{1*HPx#GUS+Dfn{L?>=ck7@0 zomunW@1Fdr<%{j7o`(iJmM}eoCKXWM_x<~ie+vI4ZrPtaZ}}fwF8_-=C>0(_Y>v{`dHZ%Fzu; zmy#+BI}OjwxFV6~Q^aGfX4@t|S>f;m&xsFDsGMA*I$3S=yfeC|3{vw#(_&{vPm7Fx zb}jYV^qs9UP1EvH)8@{Mo)ww>>{{Bj**lNUG)>PdZL|Koqj&RjDXZ;^7MnLdJGh{7 z$psq=lYW(1?A^~Mx_y3d(MBj!&S#dp*4bw7&mPMvH)P7aG~u^9tGw{p!4;KDuGqxA zX1iN7;og>6?CYORjQE^z%_b;Y?#?XtqO;AhpFP%Be#n;NG~;hOt92;NSOCena%$E*~F6133qKS<;m@t z?f&R&bLD4`y_E&|awpCCznxY7`>b#se<^ohRd9iv%p7;2bIq-vJq}lXD3I&4;8#1R zEc*Q5(MpxaHa9H#OXjf4Kc6_^bHWpwpdz_BbKHZ@J&qE79k%g{+B2IOmi<@eu$w=h zIOlW1bDK-Wa)0KyH=S!<_}SxPWkHFYrxpLSbIQKY4_>ZRd1W)hsy}4z@oL!((M*?T zulxMqwT)1jT+dwhQ|FpDfA+XlxuHxh)0+R@IpxCV2k%rad1tf3y8p>s_Uh*o_kDiw z-bScGZvNKud4GNKh0Zr0{p|6$@!A7xkd3al6Ehv!CSM}gm<93A|Z4cIRtW&P=Zp!AoH|s%F({_y=%O0%dTBlucx+$Cc-mV9~nzkG4 zIQC#I&pP7@@8)dYd$%4`HE*}r@$A7`zIE0Wr<=3+@BMo4t9iS_4z`DD1=cxNc(-H= z-jjM*)w11VhuXunLhHOMPPb$W-?Mu7t7Ut@4!4JEMb-sZc(-PY-ivxz)w(@mN7}=+ zV(X$SPPb-@->Z7~t95(Aj<$zuCDtWZc(-Ls-kbHXs%?A5j%5$mO0CPTINg>leQ(#p zUv1k9b{u=SR%Tssg?D?l?7dqLtJ=3$?0EKYt=zimiqq}c^7no{{MEj_VF%lzwF>K+ zE4(|h74Jzss_NL@u|w_ATBUW}6{kD0mG46R=pSXsH$`O zj2&r@)~c@lKsb-3p5J2*6;NS60zvL9;d3sHRn|J`bna!e$Tgtb9OT= z|8i^f64%E3(unuD-)^tI;@bFnYxv7o7UC92UTxFcP)Mp3o=^{Y_=^|P4}&;+uH9Q8~+4LGc7sr_10`>?dc8G##0w;yIRzvgqzN`Cn5%~SXVqz`)kXcowGZV2a`&h&jFL-o`W z%>sGe4dI>BnZAEysGeG*`Qfi2%igzFH_mt6W#5~!ev$#mGSwumnDu+Tj*0AutDZVR zvtaM~zkd$@W#riVR#gA<%o$+MS_XrB)3ofs*LUf9@AzZiEZ2M2AN%IH-aG!dFDJWy zFBY$Q&maG0x&FKU_&3k>-|;8BQ8#$kpYSH#;2nSBo8<=Y`V()gKH>}V=*ihk*Q?*9 z+r8tjeUZD~V$OLLKagdxJL#3kkAFKl?|tjK`P1hu*m?IfYwVWBRZo2c zcGX$1t5z*P@SQ#O%eL^E^XZ3{gMw*^KiI(eU<2*J1~zcj)E{xnuRh9DyR4CG{@#_> zmJE@Ao}s9ZhOMeD=giE4Y_icbDA 
z(+h0RT(H?*D?w&&33_N8{PouAD?yF#`Ga5PuK%(0JWDXhKF?(!`-FVK9+(YwtOeMy zC1687f(`k!^1#=3&yT(1?|Z9${9S+F+w|k_`1{{3KmM-2|Lf0%MQEe{J>kIMvNyBQn57Tmc)g zF%%T2MXL{dt@~cw1WNni4}+V(-db%E4l-ipI~Wdu_}Z_CWhv<DzLHdZw6#|4Fxbitu$3a(pg1)Q z16#QUor?+$Gv2G!Kn3nz2<=p zasV6D2{uS+Ey$phsK(c}-#4%LdTaHRD3HrDw}Oq!ZuQ@EZTI}Tz4rfh&9C2U&#-%b z!(Mx)-SZpw+OynVp78qa`Gfc5TkiTFx+mXy_xz!I@@;qh58sn--x|)j9c03!JdlG_ z?{USvuePs`Ucc9CKiCOP+BII=p;G_65IIt@7v{bU;lmkeD3z? z2Y>rK6#XIk`$}>3(Z{uFpSWs{+Wo#0x3O~m%RNl>$@3fX)?4g7za$Q9=+}ziLCc=01?frJC0BMW7P47UYD`de>|z!`%UX__=Bqcza^D> zziDwsfPAI20pz5V1d!>R2SH{{YGPgY{m%KVAmf_B#<6pQN-RM!JJI;{d%c9kK;F~R zDcF0xZqdHD>Zt}gpoq_mcu@8I^Y8EMKfVk9`R@MXyYZjz?LWQ;|M`CW$M@nt-`RhD zpZw>$`_JDx^?QG3{`>v-&+p2AznTC3?)>*#`R_NanPBHQZ3H=INfO98okzjW=?1w$ zRRZh=Be2~WU<+?XJgC}VUGKQ}o7U5a9cLHBRZlGgOMC^J^b2f~ND3&d&vJhK_w8NF z+f*g{;EAktNAm(Q*8ezhz9pq`{S8~Lx1!oVXHL`kU|u%csrsmIotiq>t^lxIOTb13 zMuNg>(oS(t<_snLGE068syGP(?RZBYQVic;7a!TOEXy4eZMhV>($oq zlV?EELbE{9s-|G0bHGO5(J9#b{NBXC^?SVv!2-{~c3q9!k+?Xndg?nc?`huK&JQ?f7bI_{$3*yMh+A-gbUZ{(2_I#J^`6hG`*w~azkg=SX zL1s-_#=7qN?(&6KveySKZ=D?ovIHyOuG$vuWSD!vFfL5&O|~_vEFxjjQLsyvkIcY`%F*xXkPHNn1f)aLESQb@Cd>E}?ax zbj`dwpJj7+=Ji9e#-MTTP@w||8ieIW`4-7^&z*cr@r%^`Ymkf z@2I8kLRXdNhwWM)wkh260ocxiAfr{A!A5h4gR-88Ucuh|doO2J&wP0PN~xdi7Pxh|KuaEImf`}^n%Ti0h?o^2MV0Vm! 
z$o`~V2fpt6zHd|2+t8|AAg73wJ`CGj^)_@Cm={tCviapJrt80N&c5_&Yq;lYup_R6 z>{MM3HZ%cj=n=hwz4!MnUK>|E^_1S78)5J7v6aWOZ~uPquK1z1y#?Q=7F4U>snxu5 zkE=YMd;9l8cf}9CJzMboYC*O6om$J7?T4@4{##JRT6gSS$8P=3+tH8SWk32hZ^!RN zJN9b6+ozTHzH7IB_wCz{-o1YGZQqXHhj#2We7Dc&+}y&SZ<(&YzPIB~S>x-{@|e2a z2fpsI*V`Zapz5yT^!s`Rd#m5Ss{Oob_LEPyS=U|9Edncd+X+^_8lpT>Z-4BL%=K~G zPe0ubPTwD|zV3slI1f>g4KdIhY~Z^&r}M>k+$(<1xaRuVVz7DNAgY;T>vk7(C+y8z z2vQMeei3ZmR&#-CukCh$ZB&m1J8(M0JYR@;%R#E|eZN;5u|E10*qOeiKi?kW?fklH zEkt1iSYh5;u)_2Ac15h;{Ti(BZzt(;(7IJuCFZv zD@=nZ+znCq5UjAi`f|$N{d=>Es(-#+|EH`mextPdZSA_>+WTW4XqUZDsXm%t|8zSj ziQCf7hMadhb{3eP4I__omZ(-+tQnuIT;lJ^Fj@NbA4X z*8hI)^zTcj_kR7f@7tv~a9Cu&zrN-=n7MxMseEw40macbt^Kh(wr-BAo(kd}zxO5* zLmHG=L1A}mHOQ;B-~BFDy$wYOYz_bU_E7JVuMlYvkLh~#+uUU+0@>@qfre!Nz3=xH zMOZ_2UGYA<;=A0v-%9uPc7EU2RsR0bUj4^+Z&$p3UGd%S-fyRS zdnbP1H>v#nlfC*+??zX?&#wF)ckg%7y}dKP@0(Tr{@Gsr=XY;czJFc$z3$%crh9uA ze&4sK{QZl)`Y-QBSG~`!+P(byz7^&9ulBBg^>*F8s_)zG{oZwN@A~iiHk9YT*}MMD zUF+)i-qqi)-TQs(-rnus_w6Xpf46u2yRG5hpYE-$eqUSr{@(rS_y@bqf7}cI@lJN% zch!B>-S2CA-`{&&9sgvv`OkadKi|df`<}Y5diML;x$o~iua1AQ+x*wP@L%ud?)$!U zU-jzuwQJwsdtDv>X1Dq8d*Q#|-QD;7>AvdS?`!wIzxTd6{=;tbKlj4_yp!GkU3Gu; z>G!o~-{1RO9sgyw`QLltf8WLK|DL+P`u6+UyYKIPua5t*+x*|X@PF^-?*G1YfA#D4 zwQt|w`&}LXXSezPd*T1z-QEBF>Hg~9?`!|QzxTg7o?(wU!~JlE_p%4Rs~)K4{!q*N z;U05MJj)()#{1!n?_&>qPd!jA{h?O&!#(zzc#b{hO!vc?-p@Voed&Q}?GLrOAMSD2 z#PjSiXTBfK{QmBN?@td@TYspv{cw-JCSG8VIm`WUmiMv;zpEas_Wn@o`{AB&O}xk+ zbJqLetnXtFeosAE9sQv;_QO5#ns|vl=4|)F+1}4R_g*4-xgYLH*Tl>0F=xLY z&i?-H!S7EGR#$(ht^IIMz9wE_k2%NvaE|w~hrX*Gs_ywuJ52FN zJeb$)t2fsW&Zf7ngUoN0eupocE{lJk|Nip*I7lfN{wD12x32sDPUVaL=(@Z1e%!~~ z@ju^!8dq<&hyVE&j#>I*NEi06X5)=2Q7wk7k zyg#F0zft1-nFaez67SC{*l)J*yV{lA`F=a%=WclKw!r~(C?2!wC10N&@otI ztp0aeUse6@yuP-*5i?W(=uR&_f;6t^rkvNS;M?;q!gXpwSrA82i9Bz3Tf~{qNcF-}dT%?~ea= zR{#6z_-}Xhzn_k;`t7~<_rK}CSq*-(&G;>9@SAVOZ&iceVl#f58erO9`@TB;6(rC> zPH5eJ89CgcPPun}-^XRpK*ux!qWkyV^0#2Et>*8+G&Jgv~w|=D#^>|5g-S&frcZAP2Lq`@Z$P*(xll1LV+(91W4 zV`3&Zs6ltWTV91Qk&L^?77B?*s1L61^IQdGf^2wD^?h&X_wLo-&W3(pz53hT(C?>L 
ze|sDH{q^c^e?z}>uldFs_Fa2TReR{(-(stOn}+?CTm9QL?6=zL-=Sd;{kWWi5z<-+dwGxpv$f>=mEFG_>x-R_4-X%3oD@~5pvHsfCQ zeXnqokjD(J+V{UdAQ{xU{n^#s`S*9n@8ACZ{_gk#+w&jnjz75l{e#`{hqmWG+#P>- z`}>Exe6J^&UY8jV&$yeS4ez_20L@ z*|$%J6wRX5+=%Gw4c(q;m!h7S@ zZ~b<8eb>C}-g*(_RE(S;px)SDJN+f1@G!}RhUoem)xOYH$AkPgZ-%&;BaDc=jA`8O zd*8nx8V~yx&^0pQrME zTg`sH%KPm#`vofRchu|`s=VJ>vtMNAcBSjLt0#Wt`qQ&l`e&Wxr+uuf*mlmA)g z_-UWw&pOvn`;>pyd4Afb`m@gW(?0c|b%CGuY5uGW-Bhmnd^>l{`tJ~`Tn)m5Q03|n z9+-NceV;XU{dNtokhvz9+76-2wVrS1&Y!XFdi2!Kw?Tty+XJq@zIzIkO<(_oP_n1N zJX;7A3#O{y{a*k2>#?Vx!tm=g2=(mgpR$KKd4uUwgY^@4cH~ z_PyH>>Mn%351}4{sr$9(EBD@eGUvYDp09i2@{7-bx}W(a5ULbHm4T>x_xC04y?1|K za&7g`x9)Yj53Ij2`x+>TPrR@9LA3gQRrS$)`%kx7{~f!x>t3Ag_q*@DpT1kYyL#{M zy6?YP-~Vp={#*3@@4oN9Rp0-f_Wif%```1v|8{-ssqgGFtNgA^_f#zk1%>%6APtW`xqWeDx z%Kzx`uQ@7ScRc@3r~MD<`j7h8ZpVGR-49Akh_FEM_+d~fa{?5B*c*3-cZRgJ`rgq&SEO8Zep+2?i12WcUnkrTlx%_m$rx-v__nd~f-^^u7Lj z&sWW_n*Y`AtKI9`SGCuEUx6!FeQ)}`@V)wb*H^_?#ed!RW#6mXm$g@aU;4f3`_k{B z@4er{6u9qgzqk0F^ZUT}3%{@UexdqG^$XiqwqNAF%2oAO@$ZF6N4{VAea-hP)z_+D z*}k^@I`8Yes`FL;d(-zWzqk2b@Ozl_)#{gfU+w)8_cg9+eU<*+@V(RTt-j~|KJ@+4 z@2kFFs)kDMdtLjw_WJK@zt??V`#tu3^!wfSw%;q?o4@yc)%~jbU-x~5D_Hq`@%P~O z?(di1>%Zr}SAOsOD*Gz?uXSJQUj2RX_sZ{!zhMft-&=n#e{cNW`&Ij^_J6JWTKD?z ztH0NNU;VxI`|9t}@5A526zs36ud4t0@5{eee_#H+`up`5!yi|2XviN3Z=y_WH;A|BhCH`b0B99Ww0QiKHESA8nrx z>MY{uhqMiU10KCUlP~^5(*C1%-9u%&50dkLbm{-;yZ=MD{)73yNAYX6j|LIJWb*Ia zpR&f$WJpO)8cl|J6c;w5$&g@9B^1V^$#66ozNI)7QEb3yG8|2YgCZG%)|23F8xq&a z0ri52>g0fWLCC!sJe?in-VC134pMIhPiKd?P7bISL{ujS)C(f2lLP7n5z)y3^@6~& z4EB%J>yGNz9AUS6EFAx__5Mfc`bYf#j>Z4!{<sAmcK=89e-HBi9Niz0KN>^?lgXdh zpKqB)lOaVpX*3y9T-b~zLxMS#U>J`kLxMp>sL~uwhNHnIr8U9zjwX#vg zk>QAg-L;(huRq@32%Qtq)v0>v;XkkY|E_MAo^w4O8#(*%->LiluFf~NSigT2HnRP< zcJ1%*`*{U%^`Y2EzVG3`zOCPP_racjtFV!O+-i3p46k`;z3-#2{g2-GKS$-|J{~Zh zS8fh6Tf3*7P-iq~m#NSW-{%G%~xIN+bx?z&m@6T30-&^^9kNNw# z-=FRMd~Z+rz1iQ-Re!!$nST!|Ie!oT`^n!Q)_$_v?-B&slPw``$=sD zO!D)-J^A;he?R;C)4t01d*<&?|9)0md4G@o``O>0)ey#ph*1Z15q4Ju?>UAHa z_y6d&w|UfyuRY0 z?B{RczrF=xD<&Yt2e_~p!SZ#B_uYMYk 
z-gd9ueX#OxC^pja&d8Sk**jX`kzJsV7I@?r=%WQ5`33rDfrqUqhZOOI3iQze4_n(6 zQUnrg!J5Cu)51gQT;S|^AayQq_B@a~7dU$!NSzCuJrAVL1+IKq6s;*qHS zp5%M~yzbk*zRi2?_4-#>$g>YexP1P#?b4$K9-gs5JO%n_fk%FUK3d?BU!ac`coY=q zqXi!ME!h2`cv^U9(;C=&9;nkA*n1vm(;8TN9%$1VSbH95(;8TN9{6TCAQK>D8e?U3YlvK z&#cytVEOy22uu`Voli!ZC>p`?b*l+X6pi2l4Loj&$N!@R9t8#ZXn{vyq6klcK3d=r zm?*+ippO=K~`{^RE*aH(f`dn>~W% z_s8NHMgUC|?Itj;ID+N>lo6OHdQV_naRkeMzD;1FXe1YCpouVi{vR#y$S=@G3q0}* i^w9#3`~rQnz@wlA`~H`9!T4r*$N&I#^30zA literal 0 HcmV?d00001 diff --git a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out index abe3210dd8..57c2525815 100644 --- a/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out +++ b/regression-test/data/load_p0/http_stream/test_group_commit_http_stream.out @@ -19,9 +19,6 @@ 6 f 60 7 e 70 8 f 80 -10 a 10 -11 a 11 -12 a \N -- !sql -- 2402288 diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy new file mode 100644 index 0000000000..02f3ca1e7a --- /dev/null +++ b/regression-test/suites/insert_p0/insert_group_commit_into_max_filter_ratio.groovy @@ -0,0 +1,339 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.mysql.cj.jdbc.StatementImpl
+
+// Regression suite covering group commit together with strict mode and
+// max_filter_ratio, driven through INSERT statements, stream load and http_stream.
+suite("insert_group_commit_into_max_filter_ratio") {
+    def dbName = "regression_test_insert_p0"
+    def tableName = "insert_group_commit_into_max_filter_ratio"
+    def dbTableName = dbName + "." + tableName
+
+    // Asserts that the table currently holds exactly expectedRowCount rows.
+    def get_row_count = { expectedRowCount ->
+        def rowCount = sql "select count(*) from ${dbTableName}"
+        logger.info("rowCount: " + rowCount + ", expecedRowCount: " + expectedRowCount)
+        assertEquals(expectedRowCount, rowCount[0][0])
+    }
+
+    // Polls the row count every 2 seconds, at most 30 times, and stops as soon as
+    // it reaches expectedRowCount. This helper only waits; it never asserts.
+    def get_row_count_with_retry = { expectedRowCount ->
+        def retry = 0
+        while (retry < 30) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${dbTableName}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    // Executes an INSERT that is expected to go through group commit: the server
+    // info must report status PREPARE and a label starting with "group_commit_".
+    // A mismatch against expected_row_count is only logged, not asserted.
+    def group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'PREPARE'"))
+        assertTrue(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    // Executes an INSERT that is expected NOT to use group commit: the server info
+    // must report status VISIBLE and must not carry a "group_commit_" label.
+    // A mismatch against expected_row_count is only logged, not asserted.
+    def off_mode_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'VISIBLE'"))
+        assertFalse(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    // Executes an INSERT that is expected to fail. Either executeUpdate() throws an
+    // Exception (which is logged and tolerated), or it returns and the server info
+    // must then report status ABORTED.
+    def fail_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        try {
+            def result = stmt.executeUpdate()
+            logger.info("insert result: " + result)
+            def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+            logger.info("result server info: " + serverInfo)
+            if (result != expected_row_count) {
+                logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+            }
+            // assertEquals(result, expected_row_count)
+            assertTrue(serverInfo.contains("'status':'ABORTED'"))
+            // assertFalse(serverInfo.contains("'label':'group_commit_"))
+        } catch (Exception e) {
+            logger.info("exception: " + e)
+        }
+    }
+
+    // Verifies a successful group-commit stream load response: status "success",
+    // GroupCommit flag set, "group_commit_" label, the given row counters, and an
+    // ErrorURL that is present exactly when rows were filtered.
+    def check_stream_load_result = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    // Verifies a group-commit stream load response that failed on the filter
+    // ratio: status "fail", GroupCommit flag set, the given row counters, and a
+    // message containing "too many filtered rows".
+    def check_stream_load_result_with_exception = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("fail", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        // assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+        assertTrue(json.Message.contains("too many filtered rows"))
+    }
+
+    // Verifies a successful load response that did NOT use group commit: status
+    // "success", no "group_commit_" label, and the given row counters.
+    def check_off_mode_stream_load_result = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertFalse(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    // create table
+    sql """ drop table if exists ${tableName}; """
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NOT NULL,
+            `type` varchar(1) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "group_commit_interval_ms" = "1000"
+        );
+    """
+
+    // insert
+    // legacy, nereids
+    // if enable strict mode
+    // 100 rows(success, fail), 10000 rows(success, fail), 15000 rows(success, fail)
+    // async mode, sync mode, off mode
+    for (item in ["legacy", "nereids"]) {
+        sql """ truncate table ${tableName} """
+        connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
+            if (item == "nereids") {
+                sql """ set enable_nereids_dml = true; """
+                sql """ set enable_nereids_planner=true; """
+                // sql """ set enable_fallback_to_original_planner=false; """
+            } else {
+                sql """ set enable_nereids_dml = false; """
+            }
+
+            sql """ set group_commit = sync_mode; """
+            group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10); """, 1
+            sql """ set group_commit = async_mode; """
+            group_commit_insert """ insert into ${dbTableName}(id) select 2; """, 1
+            sql """ set group_commit = off_mode; """
+            off_mode_group_commit_insert """ insert into ${dbTableName} values (3, 'a', 10); """, 1
+            sql """ set group_commit = async_mode; """
+            fail_group_commit_insert """ insert into ${dbTableName} values (4, 'abc', 10); """, 0
+            sql """ set enable_insert_strict = false; """
+            group_commit_insert """ insert into ${dbTableName} values (5, 'abc', 10); """, 0
+
+            // Rows 6 and 7 behave differently between legacy and nereids
+            try {
+                sql """ set group_commit = off_mode; """
+                sql """ set enable_insert_strict = true; """
+                sql """ insert into ${dbTableName} values (6, 'a', 'a'); """
+            } catch (Exception e) {
+                logger.info("exception: " + e)
+                assertTrue(e.toString().contains("Invalid number format"))
+            }
+
+            try {
+                sql """ set group_commit = off_mode; """
+                sql """ set enable_insert_strict = false; """
+                sql """ insert into ${dbTableName} values (7, 'a', 'a'); """
+            } catch (Exception e) {
+                logger.info("exception: " + e)
+                assertTrue(e.toString().contains("Invalid number format"))
+            }
+
+            // TODO should throw exception?
+            sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = true; """
+            fail_group_commit_insert """ insert into ${dbTableName} values (8, 'a', 'a'); """, 0
+
+            sql """ set group_commit = async_mode; """
+            sql """ set enable_insert_strict = false; """
+            group_commit_insert """ insert into ${dbTableName} values (9, 'a', 'a'); """, 0
+        }
+        // The ternary must be parenthesized: '+' binds tighter than '==', so
+        // `4 + item == "nereids" ? 2 : 0` compared the string (4 + item) with
+        // "nereids", always produced 0, and the wait returned without waiting
+        // for the group-committed rows.
+        get_row_count_with_retry(4 + (item == "nereids" ? 2 : 0))
+        order_qt_sql """ select * from ${dbTableName} """
+    }
+    sql """ truncate table ${tableName} """
+
+    // 2. stream load(async or sync mode, strict mode, max_filter_ratio, 10000 rows)
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'async_mode'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count_with_retry(4)
+
+    // sync_mode, strict_mode = true, max_filter_ratio = 0
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result_with_exception(exception, result, 4, 3, 1, 0)
+        }
+    }
+    get_row_count(4)
+
+    // sync_mode, strict_mode = true, max_filter_ratio = 0.3
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_10.csv"
+        unset 'label'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+        set 'max_filter_ratio', '0.3'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 3, 1, 0)
+        }
+    }
+    get_row_count(7)
+
+    order_qt_sql """ select * from ${tableName} """
+
+    // 10001 rows
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        file "test_group_commit_11.csv.gz"
+        unset 'label'
+        set 'compress_type', 'gz'
+        set 'group_commit', 'sync_mode'
+        set 'strict_mode', 'true'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 10001, 10000, 1, 0)
+        }
+    }
+    get_row_count(10007)
+    sql """ truncate table ${tableName} """
+
+    // 3. http stream(async or sync mode, strict mode, max_filter_ratio, 10000 rows)
+    streamLoad {
+        set 'version', '1'
+        set 'sql', """
+            insert into ${dbTableName} select * from http_stream
+            ("format"="csv", "column_separator"=",")
+        """
+        set 'group_commit', 'sync_mode'
+        file "test_group_commit_10.csv"
+        unset 'label'
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count_with_retry(4)
+
+    // not use group commit
+    streamLoad {
+        set 'version', '1'
+        set 'sql', """
+            insert into ${dbTableName} select * from http_stream
+            ("format"="csv", "column_separator"=",")
+        """
+        file "test_group_commit_10.csv"
+
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            check_off_mode_stream_load_result(exception, result, 4, 4, 0, 0)
+        }
+    }
+    get_row_count(8)
+
+    order_qt_sql """ select * from ${tableName} """
+}
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy index 144cc30c09..72d5222981 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy @@ -100,6 +100,7 @@ suite("insert_group_commit_with_prepare_stmt") { """ sql """ set group_commit = async_mode; """ + sql """ set enable_insert_strict = false; """ // 1. insert into def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """ @@ -159,6 +160,7 @@ suite("insert_group_commit_with_prepare_stmt") { """ sql """ set group_commit = async_mode; """ + sql """ set enable_insert_strict = false; """ // 1. insert into def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) 
""" diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy index de2581e023..6909a919c6 100644 --- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy @@ -212,14 +212,22 @@ suite("test_group_commit_http_stream") { set 'group_commit', 'async_mode' file "test_stream_load3.csv" - set 'max_filter_ratio', '0.7' + // TODO max_filter_ratio is not supported http_stream + // set 'max_filter_ratio', '0.7' unset 'label' time 10000 // limit inflight 10s check { result, exception, startTime, endTime -> // TODO different with stream load: 6, 2, 3, 1 - checkStreamLoadResult(exception, result, 6, 4, 2, 0) + // checkStreamLoadResult(exception, result, 5, 4, 1, 0) + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("too many filtered rows")) } } @@ -246,7 +254,7 @@ suite("test_group_commit_http_stream") { } } - getRowCount(22) + getRowCount(19) qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; " } finally { // try_sql("DROP TABLE ${tableName}") diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index 6ae9fb13b4..b60b6dc555 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -194,7 +194,7 @@ suite("test_group_commit_stream_load") { time 10000 // limit inflight 10s check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 6, 2, 3, 1) + checkStreamLoadResult(exception, result, 6, 3, 2, 1) } }