diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp
index 1026433bed..a151519471 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -168,8 +168,18 @@ int HttpStreamAction::on_header(HttpRequest* req) {
     ctx->load_src_type = TLoadSourceType::RAW;
 
     Status st = Status::OK();
-    if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
-        config::wait_internal_group_commit_finish) {
+    std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
+    if (iequal(group_commit_mode, "off_mode")) {
+        group_commit_mode = "";
+    }
+    if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
+        !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
+        st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
+        if (iequal(group_commit_mode, "off_mode")) {
+            group_commit_mode = "";
+        }
+    }
+    if (!group_commit_mode.empty() || config::wait_internal_group_commit_finish) {
         ctx->group_commit = load_size_smaller_than_wal_limit(req);
         if (!ctx->group_commit) {
             LOG(WARNING) << "The data size for this http load("
@@ -185,6 +195,7 @@ int HttpStreamAction::on_header(HttpRequest* req) {
 
     LOG(INFO) << "new income streaming load request." << ctx->brief()
               << " sql : " << req->header(HTTP_SQL);
+
     if (st.ok()) {
         st = _on_header(req, ctx);
     }
@@ -310,7 +321,13 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
     request.__set_load_sql(http_req->header(HTTP_SQL));
     request.__set_loadId(ctx->id.to_thrift());
     request.__set_label(ctx->label);
-    request.__set_group_commit(ctx->group_commit);
+    if (ctx->group_commit) {
+        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
+            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
+        } else {
+            request.__set_group_commit_mode("sync_mode");
+        }
+    }
     if (_exec_env->master_info()->__isset.backend_id) {
         request.__set_backend_id(_exec_env->master_info()->backend_id);
     } else {
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index cac484a3cf..ad476ee08e 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -188,9 +188,19 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
     ctx->label = req->header(HTTP_LABEL_KEY);
     Status st = Status::OK();
-    if (iequal(req->header(HTTP_GROUP_COMMIT), "true") ||
-        config::wait_internal_group_commit_finish) {
-        if (iequal(req->header(HTTP_GROUP_COMMIT), "true") && !ctx->label.empty()) {
+    std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
+    if (iequal(group_commit_mode, "off_mode")) {
+        group_commit_mode = "";
+    }
+    if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
+        !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
+        st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
+        if (iequal(group_commit_mode, "off_mode")) {
+            group_commit_mode = "";
+        }
+    }
+    if (!group_commit_mode.empty() || config::wait_internal_group_commit_finish) {
+        if (!group_commit_mode.empty() && !ctx->label.empty()) {
             st = Status::InternalError("label and group_commit can't be set at the same time");
         }
         ctx->group_commit = load_size_smaller_than_wal_limit(req);
@@ -609,7 +619,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         bool value = iequal(http_req->header(HTTP_MEMTABLE_ON_SINKNODE), "true");
         request.__set_memtable_on_sink_node(value);
     }
-    request.__set_group_commit(ctx->group_commit);
+    if (ctx->group_commit) {
+        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
+            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
+        } else {
+            request.__set_group_commit_mode("sync_mode");
+        }
+    }
 
 #ifndef BE_TEST
     // plan this load
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 2cc4926661..224d446b66 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -32,9 +32,9 @@
 
 namespace doris {
 
-Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block) {
+Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool write_wal) {
     std::unique_lock l(mutex);
-    RETURN_IF_ERROR(_status);
+    RETURN_IF_ERROR(status);
     while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
            config::group_commit_max_queue_size) {
         _put_cond.wait_for(
@@ -42,8 +42,9 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block) {
     }
     if (block->rows() > 0) {
         _block_queue.push_back(block);
-        //write wal
-        RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
+        if (write_wal) {
+            RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
+        }
         _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
         _single_block_queue_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
     }
@@ -64,9 +65,9 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
             need_commit = true;
         }
     }
-    while (_status.ok() && _block_queue.empty() &&
+    while (status.ok() && _block_queue.empty() &&
            (!need_commit || (need_commit && !_load_ids.empty()))) {
-        CHECK(*_single_block_queue_bytes == 0);
+        CHECK_EQ(_single_block_queue_bytes->load(), 0);
         auto left_milliseconds = _group_commit_interval_ms;
         if (!need_commit) {
             left_milliseconds = _group_commit_interval_ms -
@@ -119,7 +120,7 @@ Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
 void LoadBlockQueue::cancel(const Status& st) {
     DCHECK(!st.ok());
     std::unique_lock l(mutex);
-    _status = st;
+    status = st;
     while (!_block_queue.empty()) {
         {
             auto& future_block = _block_queue.front();
@@ -318,10 +319,11 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
         if (prepare_failed || !status.ok()) {
             load_block_queue->cancel(status);
         }
-        if (load_block_queue->wait_internal_group_commit_finish) {
+        {
             std::unique_lock l2(load_block_queue->mutex);
-            load_block_queue->internal_group_commit_finish_cv.notify_all();
+            load_block_queue->process_finish = true;
         }
+        load_block_queue->internal_group_commit_finish_cv.notify_all();
     }
     _load_block_queues.erase(instance_id);
 }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index be129d5457..90b0e7a040 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -49,7 +49,7 @@ public:
         _single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
     };
 
-    Status add_block(std::shared_ptr<vectorized::Block> block);
+    Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
     Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
     Status add_load_id(const UniqueId& load_id);
     void remove_load_id(const UniqueId& load_id);
@@ -67,7 +67,9 @@ public:
     bool need_commit = false;
     bool wait_internal_group_commit_finish = false;
     std::mutex mutex;
+    bool process_finish = false;
     std::condition_variable internal_group_commit_finish_cv;
+    Status status = Status::OK();
 
 private:
     std::chrono::steady_clock::time_point _start_time;
@@ -78,7 +80,6 @@ private:
 
     std::set<UniqueId> _load_ids;
     std::list<std::shared_ptr<vectorized::Block>> _block_queue;
-    Status _status = Status::OK();
     // memory consumption of all tables' load block queues, used for back pressure.
     std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
     // memory consumption of one load block queue, used for correctness check.
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 0104235024..2d40f94a54 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -47,6 +47,7 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
     _db_id = table_sink.db_id;
     _table_id = table_sink.table_id;
     _base_schema_version = table_sink.base_schema_version;
+    _group_commit_mode = table_sink.group_commit_mode;
     _load_id = table_sink.load_id;
     return Status::OK();
 }
@@ -95,11 +96,16 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
                                            loaded_rows);
     state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
                                    state->num_rows_load_filtered());
-    if (_load_block_queue && _load_block_queue->wait_internal_group_commit_finish) {
+    auto st = Status::OK();
+    if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish ||
+                              _group_commit_mode == TGroupCommitMode::SYNC_MODE)) {
         std::unique_lock l(_load_block_queue->mutex);
-        _load_block_queue->internal_group_commit_finish_cv.wait(l);
+        if (!_load_block_queue->process_finish) {
+            _load_block_queue->internal_group_commit_finish_cv.wait(l);
+        }
+        st = _load_block_queue->status;
     }
-    return Status::OK();
+    return st;
 }
 
 Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
@@ -174,7 +180,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
             return Status::InternalError("be is stopping");
         }
     }
-    RETURN_IF_ERROR(_load_block_queue->add_block(output_block));
+    RETURN_IF_ERROR(_load_block_queue->add_block(
+            output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index 02737a6c8e..4be5a5514c 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -62,6 +62,7 @@ private:
     int64_t _db_id;
     int64_t _table_id;
     int64_t _base_schema_version = 0;
+    TGroupCommitMode::type _group_commit_mode;
     UniqueId _load_id;
     std::shared_ptr<LoadBlockQueue> _load_block_queue;
 };
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 6bd8187e8e..dec20ab480 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -969,7 +969,8 @@ public class NativeInsertStmt extends InsertStmt {
         OlapTableSink sink;
         if (isGroupCommitStreamLoadSql) {
             sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
-                    targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+                    targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
+                    ConnectContext.get().getSessionVariable().getGroupCommit());
         } else {
             sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
                     analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
@@ -1155,7 +1156,8 @@ public class NativeInsertStmt extends InsertStmt {
             this.analyzer = analyzerTmp;
         }
         analyzeSubquery(analyzer, true);
-        groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId);
+        groupCommitPlanner = new GroupCommitPlanner((Database) db, olapTable, targetColumnNames, queryId,
+                ConnectContext.get().getSessionVariable().getGroupCommit());
         // save plan message to be reused for prepare stmt
         loadId = queryId;
         baseSchemaVersion = olapTable.getBaseSchemaVersion();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 213ec4df3c..a14f0f4285 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -186,7 +186,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
             rows.add(GroupCommitPlanner.getRowStringValue(list, filterSize));
         }
         GroupCommitPlanner groupCommitPlanner = new GroupCommitPlanner(physicalOlapTableSink.getDatabase(),
-                physicalOlapTableSink.getTargetTable(), null, ctx.queryId());
+                physicalOlapTableSink.getTargetTable(), null, ctx.queryId(),
+                ConnectContext.get().getSessionVariable().getGroupCommit());
         PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(ctx, rows);
         TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
         if (code == TStatusCode.DATA_QUALITY_ERROR) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index 63ab187335..14ecda1a5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -19,18 +19,44 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
+import org.apache.doris.thrift.TGroupCommitMode;
+
+import com.google.common.base.Preconditions;
 
 import java.util.List;
 
 public class GroupCommitBlockSink extends OlapTableSink {
+    private String groupCommit;
     public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
-            boolean singleReplicaLoad) {
+            boolean singleReplicaLoad, String groupCommit) {
         super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
+        this.groupCommit = groupCommit;
     }
 
     protected TDataSinkType getDataSinkType() {
         return TDataSinkType.GROUP_COMMIT_BLOCK_SINK;
     }
+
+    @Override
+    protected TDataSink toThrift() {
+        TGroupCommitMode groupCommitMode = parseGroupCommit(groupCommit);
+        Preconditions.checkNotNull(groupCommitMode, "Group commit is: " + groupCommit);
+        tDataSink.olap_table_sink.setGroupCommitMode(groupCommitMode);
+        return tDataSink;
+    }
+
+    public static TGroupCommitMode parseGroupCommit(String groupCommit) {
+        if (groupCommit == null) {
+            return null;
+        } else if (groupCommit.equalsIgnoreCase("async_mode")) {
+            return TGroupCommitMode.ASYNC_MODE;
+        } else if (groupCommit.equalsIgnoreCase("sync_mode")) {
+            return TGroupCommitMode.SYNC_MODE;
+        } else {
+            return null;
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
index 5dcfdc1ba4..b3557bd563 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java
@@ -79,7 +79,8 @@ public class GroupCommitPlanner {
     private TExecPlanFragmentParamsList paramsList;
     private ByteString execPlanFragmentParamsBytes;
 
-    public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId)
+    public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColumnNames, TUniqueId queryId,
+            String groupCommit)
             throws UserException, TException {
         this.db = db;
         this.table = table;
@@ -101,7 +102,7 @@ public class GroupCommitPlanner {
                 .setTbl(table.getName())
                 .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                 .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
-                .setGroupCommit(true).setTrimDoubleQuotes(true);
+                .setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit);
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         // Will using load id as query id in fragment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 351f7aa92f..357876c81c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -105,7 +105,7 @@ public class OlapTableSink extends DataSink {
     private HashSet<String> partialUpdateInputColumns;
 
     // set after init called
-    private TDataSink tDataSink;
+    protected TDataSink tDataSink;
 
     private boolean singleReplicaLoad;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 54791e8fbe..65fe216369 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -259,9 +259,9 @@ public class StreamLoadPlanner {
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink;
-        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) {
+        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load);
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }
@@ -479,9 +479,9 @@ public class StreamLoadPlanner {
         // create dest sink
         List<Long> partitionIds = getAllPartitionIds();
         OlapTableSink olapTableSink;
-        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) {
+        if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
             olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
-                    Config.enable_single_replica_load);
+                    Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
         } else {
             olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 33603dd78a..813d2381c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.metrics.EventSwitchParser;
 import org.apache.doris.nereids.parser.ParseDialect;
 import org.apache.doris.nereids.parser.ParseDialect.Dialect;
 import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.planner.GroupCommitBlockSink;
 import org.apache.doris.qe.VariableMgr.VarAttr;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TResourceLimit;
@@ -398,7 +399,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String EXTERNAL_TABLE_ANALYZE_PART_NUM = "external_table_analyze_part_num";
 
     public static final String ENABLE_STRONG_CONSISTENCY = "enable_strong_consistency_read";
 
-    public static final String ENABLE_INSERT_GROUP_COMMIT = "enable_insert_group_commit";
+    public static final String GROUP_COMMIT = "group_commit";
 
     public static final String PARALLEL_SYNC_ANALYZE_TASK_NUM = "parallel_sync_analyze_task_num";
@@ -1323,8 +1324,8 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = LOAD_STREAM_PER_NODE)
     public int loadStreamPerNode = 20;
 
-    @VariableMgr.VarAttr(name = ENABLE_INSERT_GROUP_COMMIT)
-    public boolean enableInsertGroupCommit = false;
+    @VariableMgr.VarAttr(name = GROUP_COMMIT)
+    public String groupCommit = "off_mode";
 
     @VariableMgr.VarAttr(name = INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD,
             description = {"在match_all中求取多个倒排索引的交集时,如果最大的倒排索引中的总数是最小倒排索引中的总数的整数倍,"
@@ -3095,7 +3096,14 @@ public class SessionVariable implements Serializable, Writable {
     }
 
     public boolean isEnableInsertGroupCommit() {
-        return enableInsertGroupCommit || Config.wait_internal_group_commit_finish;
+        return Config.wait_internal_group_commit_finish || GroupCommitBlockSink.parseGroupCommit(groupCommit) != null;
+    }
+
+    public String getGroupCommit() {
+        if (Config.wait_internal_group_commit_finish) {
+            return "sync_mode";
+        }
+        return groupCommit;
     }
 
     public boolean isEnableMaterializedViewRewrite() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 09064146d1..a8fdbf60cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -227,6 +227,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -1992,10 +1993,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser);
             parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
             parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
-            if (request.isGroupCommit()) {
+            if (!StringUtils.isEmpty(request.getGroupCommitMode())) {
                 if (parsedStmt.getLabel() != null) {
                     throw new AnalysisException("label and group_commit can't be set at the same time");
                 }
+                ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
                 parsedStmt.isGroupCommitStreamLoadSql = true;
             }
             StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
@@ -2083,7 +2085,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         TExecPlanFragmentParams plan = planner.plan(streamLoadTask.getId(), multiTableFragmentInstanceIdIndex);
-        if (!request.isGroupCommit()) {
+        if (StringUtils.isEmpty(streamLoadTask.getGroupCommit())) {
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(db.getId(), request.getTxnId());
@@ -2149,7 +2151,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
         TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId(),
                 multiTableFragmentInstanceIdIndex);
-        if (!request.isGroupCommit()) {
+        if (StringUtils.isEmpty(streamLoadTask.getGroupCommit())) {
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(db.getId(), request.getTxnId());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index e0f2954367..68123b497c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -95,7 +95,7 @@ public class StreamLoadTask implements LoadTaskInfo {
 
     private byte escape = 0;
 
-    private boolean groupCommit = false;
+    private String groupCommit;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
             TFileCompressType compressType) {
@@ -324,7 +324,7 @@ public class StreamLoadTask implements LoadTaskInfo {
                 request.getFileType(), request.getFormatType(), request.getCompressType());
         streamLoadTask.setOptionalFromTSLPutRequest(request);
-        streamLoadTask.setGroupCommit(request.isGroupCommit());
+        streamLoadTask.setGroupCommit(request.getGroupCommitMode());
         if (request.isSetFileSize()) {
             streamLoadTask.fileSize = request.getFileSize();
         }
@@ -538,11 +538,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return maxFilterRatio;
     }
 
-    public void setGroupCommit(boolean groupCommit) {
+    public void setGroupCommit(String groupCommit) {
         this.groupCommit = groupCommit;
     }
 
-    public boolean isGroupCommit() {
+    public String getGroupCommit() {
         return groupCommit;
     }
 }
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 9e96897f70..40c0e760e8 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -235,6 +235,12 @@ struct TExportSink {
     7: optional string header
 }
 
+enum TGroupCommitMode {
+    SYNC_MODE,
+    ASYNC_MODE,
+    OFF_MODE
+}
+
 struct TOlapTableSink {
     1: required Types.TUniqueId load_id
     2: required i64 txn_id
@@ -256,7 +262,10 @@ struct TOlapTableSink {
     18: optional Descriptors.TOlapTableLocationParam slave_location
     19: optional i64 txn_timeout_s // timeout of load txn in second
     20: optional bool write_file_cache
+
+    // used by GroupCommitBlockSink
     21: optional i64 base_schema_version
+    22: optional TGroupCommitMode group_commit_mode
 }
 
 struct TDataSink {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index c49b5438f7..4c62de1138 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -652,8 +652,9 @@ struct TStreamLoadPutRequest {
     // only valid when file type is CSV
     52: optional i8 escape
     53: optional bool
memtable_on_sink_node; - 54: optional bool group_commit + 54: optional bool group_commit // deprecated 55: optional i32 stream_per_node; + 56: optional string group_commit_mode } struct TStreamLoadPutResult { diff --git a/regression-test/data/insert_p0/insert_group_commit_into_unique_sync_mode.out b/regression-test/data/insert_p0/insert_group_commit_into_unique_sync_mode.out new file mode 100644 index 0000000000..2946a07897 --- /dev/null +++ b/regression-test/data/insert_p0/insert_group_commit_into_unique_sync_mode.out @@ -0,0 +1,161 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 a 10 1 +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +12 b 22 1 +13 c 23 0 +14 d 24 0 +15 c 23 0 +16 d 24 1 +27 e 25 0 + +-- !sql -- +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +13 c 23 0 +14 d 24 0 +15 c 23 0 +27 e 25 0 + +-- !sql -- +1 a 10 10 1 +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +12 b 22 22 1 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 +16 d 24 24 1 +27 e 25 25 0 + +-- !sql -- +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 +27 e 25 25 0 + +-- !sql -- +1 a 200 200 1 +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +11 a 11 10 1 +12 a 12 10 0 +13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 + +-- !sql -- +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +12 a 12 10 0 +13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 + +-- !sql -- +1 a 10 1 +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +12 b 22 1 +13 c 23 0 +14 d 24 0 +15 c 23 0 +16 d 24 1 +27 e 25 0 + +-- !sql -- +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +13 c 23 0 +14 d 24 0 +15 c 23 0 +27 e 25 0 + +-- !sql -- +1 a 10 10 1 +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +12 b 22 22 1 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 +16 d 24 24 1 +27 e 25 25 0 + +-- !sql -- +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 +27 e 25 25 0 + +-- !sql -- +1 a 200 200 1 +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +11 a 11 10 1 +12 a 12 10 0 +13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 + +-- !sql -- +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +12 a 12 10 0 +13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 + diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index c87f6cc0bf..4b4fa430de 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -103,7 +103,7 @@ suite("insert_group_commit_into") { """ connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ @@ -214,54 +214,54 @@ suite("insert_group_commit_into") { // try_sql("DROP TABLE ${table}") } -// // test connect to observer fe -// try { -// def fes = sql_return_maparray "show frontends" -// logger.info("frontends: ${fes}") -// if (fes.size() > 1) { -// def observer_fe = null -// for (def fe : 
fes) { -// if (fe.IsMaster == "false") { -// observer_fe = fe -// break -// } -// } -// if (observer_fe != null) { -// def url = "jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/" -// logger.info("observer url: " + url) -// connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) { -// sql """ set enable_insert_group_commit = true; """ -// sql """ set enable_nereids_dml = false; """ -// sql """ set enable_profile= true; """ -// -// // 1. insert into -// def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 -// assertTrue(server_info.contains('query_id')) -// // get query_id, such as 43f87963586a482a-b0496bcf9e2b5555 -// def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length() -// def query_id = server_info.substring(query_id_index, query_id_index + 33) -// logger.info("query_id: " + query_id) -// // 2. check profile -// StringBuilder sb = new StringBuilder(); -// sb.append("curl -X GET -u ${context.config.jdbcUser}:${context.config.jdbcPassword} http://${observer_fe.Host}:${observer_fe.HttpPort}") -// sb.append("/api/profile?query_id=").append(query_id) -// String command = sb.toString() -// logger.info(command) -// def process = command.execute() -// def code = process.waitFor() -// def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); -// def out = process.getText() -// logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err) -// assertEquals(code, 0) -// def json = parseJson(out) -// assertEquals("success", json.msg.toLowerCase()) -// } -// } -// } else { -// logger.info("only one fe, skip test connect to observer fe") -// } -// } finally { -// } + // test connect to observer fe + /*try { + def fes = sql_return_maparray "show frontends" + logger.info("frontends: ${fes}") + if (fes.size() > 1) { + def observer_fe = null + for (def fe : fes) { + if (fe.IsMaster == "false") { + observer_fe = fe + break + } + } + if (observer_fe != null) { + def url = "jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/" + logger.info("observer url: " + url) + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) { + sql """ set group_commit = async_mode; """ + sql """ set enable_nereids_dml = false; """ + sql """ set enable_profile= true; """ + + // 1. insert into + def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1 + assertTrue(server_info.contains('query_id')) + // get query_id, such as 43f87963586a482a-b0496bcf9e2b5555 + def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length() + def query_id = server_info.substring(query_id_index, query_id_index + 33) + logger.info("query_id: " + query_id) + // 2. 
check profile + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET -u ${context.config.jdbcUser}:${context.config.jdbcPassword} http://${observer_fe.Host}:${observer_fe.HttpPort}") + sb.append("/api/profile?query_id=").append(query_id) + String command = sb.toString() + logger.info(command) + def process = command.execute() + def code = process.waitFor() + def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream()))); + def out = process.getText() + logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def json = parseJson(out) + assertEquals("success", json.msg.toLowerCase()) + } + } + } else { + logger.info("only one fe, skip test connect to observer fe") + } + } finally { + }*/ // table with array type tableName = "insert_group_commit_into_duplicate_array" @@ -294,7 +294,7 @@ suite("insert_group_commit_into") { """ connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy index 9fae43ede1..fba2473dfe 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy @@ -92,7 +92,7 @@ suite("insert_group_commit_into_unique") { // 1. insert into connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ @@ -117,7 +117,7 @@ suite("insert_group_commit_into_unique") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name, score' file "test_group_commit_1.csv" unset 'label' @@ -135,7 +135,7 @@ suite("insert_group_commit_into_unique") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' file "test_group_commit_2.csv" unset 'label' @@ -178,7 +178,7 @@ suite("insert_group_commit_into_unique") { // 1. insert into connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ @@ -203,7 +203,7 @@ suite("insert_group_commit_into_unique") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name, score' file "test_group_commit_1.csv" unset 'label' @@ -221,7 +221,7 @@ suite("insert_group_commit_into_unique") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' file "test_group_commit_2.csv" unset 'label' @@ -265,7 +265,7 @@ suite("insert_group_commit_into_unique") { // 1. 
insert into connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ @@ -291,7 +291,7 @@ suite("insert_group_commit_into_unique") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__' set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' file "test_group_commit_3.csv" @@ -310,7 +310,7 @@ suite("insert_group_commit_into_unique") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__' set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' file "test_group_commit_4.csv" diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy new file mode 100644 index 0000000000..d278653ac1 --- /dev/null +++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique_sync_mode.groovy @@ -0,0 +1,368 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import com.mysql.cj.jdbc.StatementImpl + +suite("insert_group_commit_into_unique_sync_mode") { + def dbName = "regression_test_insert_p0" + def tableName = "insert_group_commit_into_unique_sync" + def dbTableName = dbName + "." 
+ tableName
+
+    def getRowCount = { expectedRowCount ->
+        def rowCount = sql "select count(*) from ${dbTableName}"
+        logger.info("rowCount: " + rowCount + ", expectedRowCount: " + expectedRowCount)
+        assertEquals(expectedRowCount, rowCount[0][0])
+    }
+
+    def group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'PREPARE'"))
+        assertTrue(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    def off_mode_group_commit_insert = { sql, expected_row_count ->
+        def stmt = prepareStatement """ ${sql} """
+        def result = stmt.executeUpdate()
+        logger.info("insert result: " + result)
+        def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
+        logger.info("result server info: " + serverInfo)
+        if (result != expected_row_count) {
+            logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql)
+        }
+        // assertEquals(result, expected_row_count)
+        assertTrue(serverInfo.contains("'status':'VISIBLE'"))
+        assertFalse(serverInfo.contains("'label':'group_commit_"))
+    }
+
+    def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertTrue(json.GroupCommit)
+        assertTrue(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    def checkOffModeStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
+        if (exception != null) {
+            throw exception
+        }
+        log.info("Stream load result: ${result}".toString())
+        def json = parseJson(result)
+        assertEquals("success", json.Status.toLowerCase())
+        assertFalse(json.Label.startsWith("group_commit_"))
+        assertEquals(total_rows, json.NumberTotalRows)
+        assertEquals(loaded_rows, json.NumberLoadedRows)
+        assertEquals(filtered_rows, json.NumberFilteredRows)
+        assertEquals(unselected_rows, json.NumberUnselectedRows)
+        if (filtered_rows > 0) {
+            assertFalse(json.ErrorURL.isEmpty())
+        } else {
+            assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
+        }
+    }
+
+    for (item in ["legacy", "nereids"]) {
+        // 1. table without sequence column
+        try {
+            tableName = "insert_group_commit_into_unique_s_" + "1_" + item
+            dbTableName = dbName + "." + tableName
+            // create table
+            sql """ drop table if exists ${dbTableName}; """
+
+            sql """
+            CREATE TABLE ${dbTableName} (
+                `id` int(11) NOT NULL,
+                `name` varchar(50) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`, `name`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "group_commit_interval_ms" = "1000"
+            );
+            """
+
+            // 1.
insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set group_commit = sync_mode; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + // sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${dbTableName}(id) select 6; """, 1 + group_commit_insert """ insert into ${dbTableName}(id) values(4); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 + + getRowCount(5) + // qt_sql """ select * from ${dbTableName} order by id, name, score asc; """ + } + + // 2. stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'sync_mode' + set 'columns', 'id, name, score' + file "test_group_commit_1.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + getRowCount(9) + // qt_sql """ select * from ${dbTableName} order by id, name, score asc; """ + + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) select * from http_stream + ("format"="csv", "column_separator"=",") + """ + set 'group_commit', 'sync_mode' + file "test_group_commit_2.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 5, 5, 0, 0) + } + } + getRowCount(10) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + } + + // 2. table with "function_column.sequence_col" + try { + tableName = "insert_group_commit_into_unique_s_" + "2_" + item + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "function_column.sequence_col" = "score", + "group_commit_interval_ms" = "1000" + ); + """ + + // 1. 
insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set group_commit = sync_mode; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + // sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${dbTableName}(id, score) select 6, 60; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score) values(4, 70); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id, score) values('c', 3, 30); """, 1 + sql """ set group_commit = OFF_MODE; """ + off_mode_group_commit_insert """ insert into ${dbTableName}(score, id, name) values(30, 2, 'b'); """, 1 + sql """ set group_commit = sync_mode; """ + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 + + getRowCount(5) + // qt_sql """ select * from ${dbTableName} order by id, name, score asc; """ + }; + + // 2. stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'SYNC_mode' + set 'columns', 'id, name, score' + file "test_group_commit_1.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + getRowCount(9) + // qt_sql """ select * from ${dbTableName} order by id, name, score asc; """ + + streamLoad { + set 'version', '1' + set 'sql', """ + insert into ${dbTableName} (id, name, score, __DORIS_DELETE_SIGN__) + select * from http_stream ("format"="csv", "column_separator"=",") + """ + set 'group_commit', 'off_mode' + file "test_group_commit_2.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkOffModeStreamLoadResult(exception, result, 5, 5, 0, 0) + } + } + getRowCount(10) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + sql """ set show_hidden_columns = false """ + } + + // 3. table with "function_column.sequence_type" + try { + tableName = "insert_group_commit_into_unique_s_" + "3_" + item + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "function_column.sequence_type" = "int", + "group_commit_interval_ms" = "1000" + ); + """ + + // 1. 
insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set group_commit = sync_mode; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + // sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """, 2 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id, score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__, __DORIS_SEQUENCE_COL__) values(1, 'a', 200, 1, 200) """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__) values(30, 2, 'b', 100, 1); """, 1 + + getRowCount(4) + // qt_sql """ select * from ${dbTableName} order by id, name, score asc; """ + }; + + // 2. stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'SYNC_MODE' + set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__' + set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' + file "test_group_commit_3.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + getRowCount(8) + // qt_sql """ select * from ${dbTableName} order by id, name, score asc; """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'OFF_mode' + set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__' + set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' + file "test_group_commit_4.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkOffModeStreamLoadResult(exception, result, 7, 7, 0, 0) + } + } + getRowCount(10) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + sql """ set show_hidden_columns = false """ + } + } +} diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy index 96910c0e1b..31ed7680c9 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -70,7 +70,7 @@ suite("insert_group_commit_with_exception") { ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql 
""" set enable_nereids_planner=true; """ @@ -157,7 +157,7 @@ suite("insert_group_commit_with_exception") { try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) { Statement statement = connection.createStatement(); statement.execute("use ${db}"); - statement.execute("set enable_insert_group_commit = true;"); + statement.execute("set group_commit = eventual_consistency;"); if (item == "nereids") { statement.execute("set enable_nereids_dml = true;"); statement.execute("set enable_nereids_planner=true;"); diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy index 888b294d19..6a708eaf5e 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy @@ -67,7 +67,7 @@ suite("insert_group_commit_with_large_data") { """ connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy index b9748dc8d1..144cc30c09 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy @@ -99,7 +99,7 @@ suite("insert_group_commit_with_prepare_stmt") { ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ // 1. insert into def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """ @@ -158,7 +158,7 @@ suite("insert_group_commit_with_prepare_stmt") { ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ // 1. insert into def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) 
""" diff --git a/regression-test/suites/insert_p0/insert_with_null.groovy b/regression-test/suites/insert_p0/insert_with_null.groovy index 66096ed22a..5a7a19f82e 100644 --- a/regression-test/suites/insert_p0/insert_with_null.groovy +++ b/regression-test/suites/insert_p0/insert_with_null.groovy @@ -67,10 +67,10 @@ suite("insert_with_null") { if (write_mode == "txn_insert") { sql "begin" } else if (write_mode == "group_commit_legacy") { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = false; """ } else if (write_mode == "group_commit_nereids") { - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ //sql """ set enable_fallback_to_original_planner=false; """ diff --git a/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy index e3704e54de..c4bf3bd000 100644 --- a/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy +++ b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy @@ -60,7 +60,7 @@ suite("test_group_commit_interval_ms_property") { connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql "set enable_insert_group_commit = true;" + sql """ set group_commit = async_mode; """ if (item == "nereids") { sql """ set enable_nereids_dml = true; """ diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy index 2a01a42645..a6b05dcede 100644 --- a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_client.groovy @@ -103,7 +103,7 @@ l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, ("format"="csv", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i unset 'label' diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy index cfaf802783..b73954aca5 100644 --- a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_multiple_table.groovy @@ -97,7 +97,7 @@ l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, ("format"="csv", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i unset 'label' diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy index 2b21385c8b..d1d4e476b4 100644 --- a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy @@ -95,7 +95,7 @@ l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, 
("format"="csv", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i unset 'label' diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy index b9c15b03a0..8a979bf6f3 100644 --- a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy @@ -160,7 +160,7 @@ l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, ("format"="csv", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i unset 'label' @@ -181,7 +181,7 @@ l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c14, c15, c ("format"="csv", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i unset 'label' diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy index 05f6c074ac..3a094c17ce 100644 --- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy @@ -103,13 +103,13 @@ PROPERTIES ( "replication_num" = "1" ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = false; """ } def do_insert_into = { file_name -> logger.info("file:" + file_name) - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = false; """ //read and insert BufferedReader reader; diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy index eaf963ffe9..eef2d2bc3e 100644 --- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy @@ -103,12 +103,12 @@ PROPERTIES ( "replication_num" = "1" ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = false; """ } def do_insert_into = { file_name, table_name -> - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = false; """ logger.info("file:" + file_name) //read and insert diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy index ca745a39d8..9fee850899 100644 --- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy @@ -98,7 +98,7 @@ PROPERTIES ( "replication_num" = "1" ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; 
""" sql """ set enable_nereids_dml = false; """ } diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy index 76e6670e6d..917cadf31b 100644 --- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy @@ -130,7 +130,7 @@ PROPERTIES ( "replication_num" = "1" ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = false; """ } @@ -163,7 +163,7 @@ PROPERTIES ( "replication_num" = "1" ); """ - sql """ set enable_insert_group_commit = true; """ + sql """ set group_commit = async_mode; """ sql """ set enable_nereids_dml = false; """ } diff --git a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy index 90efd819fe..5560689694 100644 --- a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_client.groovy @@ -100,7 +100,7 @@ PROPERTIES ( set 'column_separator', '|' set 'columns', columns + ",lo_dummy" - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i diff --git a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy index 5225c17873..7e7289b61f 100644 --- a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_multiple_table.groovy @@ -94,7 +94,7 @@ PROPERTIES ( set 'column_separator', '|' set 'columns', columns + ",lo_dummy" - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i diff --git a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy index fc00bb0831..5f3d86116e 100644 --- a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy @@ -92,7 +92,7 @@ l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipin set 'column_separator', '|' set 'columns', columns + ",lo_dummy" - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i diff --git a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy index 8d5171fc58..d7cf67d542 100644 --- a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy +++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy @@ -157,7 +157,7 @@ PROPERTIES ( set 'column_separator', '|' set 'columns', columns + ",lo_dummy" - set 'group_commit', 'true' + set 'group_commit', 
'async_mode' unset 'label' file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy index 56b37c248e..de2581e023 100644 --- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy @@ -111,7 +111,7 @@ suite("test_group_commit_http_stream") { insert into ${db}.${tableName} select * from http_stream ("format"="csv", "compress_type"="${compressionType}", "column_separator"=",") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "${fileName}" unset 'label' @@ -131,7 +131,7 @@ suite("test_group_commit_http_stream") { ("format"="csv", "column_separator"=",") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "test_stream_load1.csv" unset 'label' @@ -150,7 +150,7 @@ suite("test_group_commit_http_stream") { ("format"="csv", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "test_stream_load2.csv" unset 'label' @@ -169,7 +169,7 @@ suite("test_group_commit_http_stream") { ("format"="csv", "column_separator"=",") where c1 > 5 """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "test_stream_load1.csv" unset 'label' @@ -190,7 +190,7 @@ suite("test_group_commit_http_stream") { ("format"="csv", "column_separator"=",") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "test_stream_load1.csv" unset 'label' @@ -210,7 +210,7 @@ suite("test_group_commit_http_stream") { select c1, c2, c3 from http_stream ("format"="csv", "column_separator"=",") where c2 = 'a' """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "test_stream_load3.csv" set 'max_filter_ratio', '0.7' unset 'label' @@ -232,7 +232,7 @@ suite("test_group_commit_http_stream") { ("format"="csv", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "test_stream_load2.csv" time 10000 // limit inflight 10s @@ -318,7 +318,7 @@ suite("test_group_commit_http_stream") { ("format"="csv", "compress_type"="GZ", "column_separator"="|") """ - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file """${getS3Url()}/regression/ssb/sf0.1/lineorder.tbl.gz""" diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy index 910589df11..f5cb97fbae 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_and_wal_back_pressure.groovy @@ -66,7 +66,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") { set 'column_separator', ',' set 'compress_type', 'GZ' set 'format', 'csv' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file 'test_group_commit_and_wal_back_pressure.csv.gz' @@ -84,7 +84,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") { set 'column_separator', ',' set 'compress_type', 'GZ' set 'format', 'csv' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file 'test_group_commit_and_wal_back_pressure.csv.gz' @@ -102,7 +102,7 @@ suite("test_group_commit_and_wal_back_pressure", "p2") { set 'column_separator', ',' set 'compress_type', 'GZ' set 
'format', 'csv' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file 'test_group_commit_and_wal_back_pressure.csv.gz' diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index d478480f2d..6ae9fb13b4 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -97,7 +97,7 @@ suite("test_group_commit_stream_load") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'compress_type', "${compressionType}" // set 'columns', 'id, name, score' file "${fileName}" @@ -116,7 +116,7 @@ suite("test_group_commit_stream_load") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name' file "test_stream_load1.csv" unset 'label' @@ -133,7 +133,7 @@ suite("test_group_commit_stream_load") { table "${tableName}" set 'column_separator', '|' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'score, id, name' file "test_stream_load2.csv" unset 'label' @@ -150,7 +150,7 @@ suite("test_group_commit_stream_load") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name' file "test_stream_load1.csv" set 'where', 'id > 5' @@ -168,7 +168,7 @@ suite("test_group_commit_stream_load") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' set 'columns', 'id, name, score = id * 10' file "test_stream_load1.csv" unset 'label' @@ -185,7 +185,7 @@ suite("test_group_commit_stream_load") { table "${tableName}" set 'column_separator', ',' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' file "test_stream_load3.csv" set 'where', "name = 'a'" set 'max_filter_ratio', '0.7' @@ -204,7 +204,7 @@ suite("test_group_commit_stream_load") { // set 'label', 'test_stream_load' set 'column_separator', '|' - set 'group_commit', 'true' + set 'group_commit', 'async_mode' // set 'label', 'l_' + System.currentTimeMillis() file "test_stream_load2.csv" @@ -289,7 +289,7 @@ suite("test_group_commit_stream_load") { set 'column_separator', '|' set 'compress_type', 'GZ' set 'columns', columns + ",lo_dummy" - set 'group_commit', 'true' + set 'group_commit', 'async_mode' unset 'label' file """${getS3Url()}/regression/ssb/sf0.1/lineorder.tbl.gz"""
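
Note: as a quick illustration of the setting these suites now exercise, below is a minimal sketch in the same regression-test DSL. The suite name, table, and csv file are hypothetical placeholders, not part of this patch; `async_mode` is the value the suites above adopt, and `sync_mode`/`off_mode` are assumed to be the other accepted values of the new tri-state option.

// Hypothetical sketch only; example_tbl and example.csv are placeholders.
suite("example_group_commit_modes") {
    // Session-variable form (replaces the old `set enable_insert_group_commit = true`).
    sql """ set group_commit = async_mode; """
    sql """ insert into example_tbl values (1, 'a', 10) """

    // Stream-load header form (replaces the old `set 'group_commit', 'true'`).
    streamLoad {
        table "example_tbl"
        set 'column_separator', ','
        set 'group_commit', 'async_mode' // sync_mode / off_mode assumed as alternatives
        unset 'label'                    // the suites above run group-commit loads without an explicit label
        file "example.csv"
    }
}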