diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index d51bac6b18..a9b0848984 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1074,6 +1074,7 @@ DEFINE_String(group_commit_replay_wal_dir, "./wal"); DEFINE_Int32(group_commit_replay_wal_retry_num, "10"); DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); DEFINE_Int32(group_commit_sync_wal_batch, "10"); +DEFINE_Bool(wait_internal_group_commit_finish, "false"); // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index e43370cc34..b6e911438e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1143,6 +1143,7 @@ DECLARE_String(group_commit_replay_wal_dir); DECLARE_Int32(group_commit_replay_wal_retry_num); DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds); DECLARE_Int32(group_commit_sync_wal_batch); +DECLARE_Bool(wait_internal_group_commit_finish); // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 067f8c5d28..d81f42824c 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -167,7 +167,8 @@ int HttpStreamAction::on_header(HttpRequest* req) { ctx->load_type = TLoadType::MANUL_LOAD; ctx->load_src_type = TLoadSourceType::RAW; - ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true"); + ctx->group_commit = iequal(req->header(HTTP_GROUP_COMMIT), "true") || + config::wait_internal_group_commit_finish; ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true" ? true : false; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index d16d41795a..896a1f2775 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -188,8 +188,9 @@ int StreamLoadAction::on_header(HttpRequest* req) { url_decode(req->param(HTTP_TABLE_KEY), &ctx->table); ctx->label = req->header(HTTP_LABEL_KEY); Status st = Status::OK(); - if (iequal(req->header(HTTP_GROUP_COMMIT), "true")) { - if (!ctx->label.empty()) { + if (iequal(req->header(HTTP_GROUP_COMMIT), "true") || + config::wait_internal_group_commit_finish) { + if (iequal(req->header(HTTP_GROUP_COMMIT), "true") && !ctx->label.empty()) { st = Status::InternalError("label and group_commit can't be set at the same time"); } ctx->group_commit = true; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 02e484fd77..77995da8d4 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -31,7 +31,7 @@ namespace doris { Status LoadBlockQueue::add_block(std::shared_ptr block) { DCHECK(block->get_schema_version() == schema_version); - std::unique_lock l(*_mutex); + std::unique_lock l(mutex); RETURN_IF_ERROR(_status); while (*_all_block_queues_bytes > config::group_commit_max_queue_size) { _put_cond.wait_for( @@ -49,7 +49,7 @@ Status LoadBlockQueue::add_block(std::shared_ptr block) Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, bool* eos) { *find_block = false; *eos = false; - std::unique_lock l(*_mutex); + std::unique_lock l(mutex); if (!need_commit) { auto left_milliseconds = config::group_commit_interval_ms - std::chrono::duration_cast( @@ -99,7 +99,7 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo } void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { - std::unique_lock l(*_mutex); + std::unique_lock l(mutex); if (_load_ids.find(load_id) != _load_ids.end()) { _load_ids.erase(load_id); _get_cond.notify_all(); @@ -107,7 +107,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { } Status LoadBlockQueue::add_load_id(const UniqueId& load_id) { - std::unique_lock l(*_mutex); + std::unique_lock l(mutex); if (need_commit) { return Status::InternalError("block queue is set need commit, id=" + load_instance_id.to_string()); @@ -118,7 +118,7 @@ Status LoadBlockQueue::add_load_id(const UniqueId& load_id) { void LoadBlockQueue::cancel(const Status& st) { DCHECK(!st.ok()); - std::unique_lock l(*_mutex); + std::unique_lock l(mutex); _status = st; while (!_block_queue.empty()) { { @@ -250,7 +250,8 @@ Status GroupCommitTable::_create_group_commit_load( << ", is_pipeline=" << is_pipeline; { load_block_queue = std::make_shared( - instance_id, label, txn_id, schema_version, _all_block_queues_bytes); + instance_id, label, txn_id, schema_version, _all_block_queues_bytes, + result.wait_internal_group_commit_finish); std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); _need_plan_fragment = false; @@ -269,16 +270,6 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ const std::string& label, int64_t txn_id, const TUniqueId& instance_id, Status& status, bool prepare_failed, RuntimeState* state) { - { - std::lock_guard l(_lock); - if (prepare_failed || !status.ok()) { - auto it = _load_block_queues.find(instance_id); - if (it != _load_block_queues.end()) { - it->second->cancel(status); - } - } - _load_block_queues.erase(instance_id); - } Status st; Status result_status; if (status.ok()) { @@ -317,6 +308,21 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ 10000L); result_status = Status::create(result.status); } + { + std::lock_guard l(_lock); + auto it = _load_block_queues.find(instance_id); + if (it != _load_block_queues.end()) { + auto& load_block_queue = it->second; + if (prepare_failed || !status.ok()) { + load_block_queue->cancel(status); + } + if (load_block_queue->wait_internal_group_commit_finish) { + std::unique_lock l2(load_block_queue->mutex); + load_block_queue->internal_group_commit_finish_cv.notify_all(); + } + } + _load_block_queues.erase(instance_id); + } if (!st.ok()) { LOG(WARNING) << "request finish error, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label << ", txn_id=" << txn_id diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 4983811233..9c0a8a047f 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -37,14 +37,15 @@ class LoadBlockQueue { public: LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id, int64_t schema_version, - std::shared_ptr all_block_queues_bytes) + std::shared_ptr all_block_queues_bytes, + bool wait_internal_group_commit_finish) : load_instance_id(load_instance_id), label(label), txn_id(txn_id), schema_version(schema_version), + wait_internal_group_commit_finish(wait_internal_group_commit_finish), _start_time(std::chrono::steady_clock::now()), _all_block_queues_bytes(all_block_queues_bytes) { - _mutex = std::make_shared(); _single_block_queue_bytes = std::make_shared(0); }; @@ -60,11 +61,13 @@ public: int64_t txn_id; int64_t schema_version; bool need_commit = false; + bool wait_internal_group_commit_finish = false; + doris::Mutex mutex; + doris::ConditionVariable internal_group_commit_finish_cv; private: std::chrono::steady_clock::time_point _start_time; - std::shared_ptr _mutex; doris::ConditionVariable _put_cond; doris::ConditionVariable _get_cond; // the set of load ids of all blocks in this queue diff --git a/be/src/runtime/stream_load/new_load_stream_mgr.h b/be/src/runtime/stream_load/new_load_stream_mgr.h index 45be8bb306..61e4010a2c 100644 --- a/be/src/runtime/stream_load/new_load_stream_mgr.h +++ b/be/src/runtime/stream_load/new_load_stream_mgr.h @@ -52,7 +52,7 @@ public: _stream_map.emplace(id, stream); } - LOG(INFO) << "put stream load pipe: " << id; + VLOG_NOTICE << "put stream load pipe: " << id; return Status::OK(); } @@ -63,7 +63,7 @@ public: return iter->second; } } - LOG(INFO) << "stream load pipe does not exist: " << id; + VLOG_NOTICE << "stream load pipe does not exist: " << id; return nullptr; } @@ -71,7 +71,7 @@ public: std::lock_guard l(_lock); if (auto iter = _stream_map.find(id); iter != _stream_map.end()) { _stream_map.erase(iter); - LOG(INFO) << "remove stream load pipe: " << id; + VLOG_NOTICE << "remove stream load pipe: " << id; } } diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index dcef9dccec..ddff0cf330 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -102,6 +102,10 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { loaded_rows); state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() + state->num_rows_load_filtered()); + if (_load_block_queue && _load_block_queue->wait_internal_group_commit_finish) { + std::unique_lock l(_load_block_queue->mutex); + _load_block_queue->internal_group_commit_finish_cv.wait(l); + } return Status::OK(); } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index ef1cfddab9..50c31755a0 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -484,6 +484,12 @@ public class Config extends ConfigBase { "Default timeout for insert load job, in seconds."}) public static int insert_load_default_timeout_second = 14400; // 4 hour + @ConfField(mutable = true, masterOnly = true, description = { + "等内部攒批真正写入完成才返回;insert into和stream load默认开启攒批", + "Wait for the internal batch to be written before returning; " + + "insert into and stream load use group commit by default."}) + public static boolean wait_internal_group_commit_finish = false; + @ConfField(mutable = true, masterOnly = true, description = {"Stream load 的默认超时时间,单位是秒。", "Default timeout for stream load job, in seconds."}) public static int stream_load_default_timeout_second = 86400 * 3; // 3days diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 27e4c6bf36..e99a539b14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; @@ -248,21 +249,8 @@ public class NativeInsertStmt extends InsertStmt { OlapTable olapTable = (OlapTable) table; tblName.setDb(olapTable.getDatabase().getFullName()); tblName.setTbl(olapTable.getName()); - if (olapTable.getDeleteSignColumn() != null) { - List columns = Lists.newArrayList(olapTable.getBaseSchema(false)); - // The same order as GroupCommitTableValuedFunction#getTableColumns - // delete sign col - columns.add(olapTable.getDeleteSignColumn()); - // version col - Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst() - .orElse(null); - if (versionColumn != null) { - columns.add(versionColumn); - } - // sequence col - if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) { - columns.add(olapTable.getSequenceCol()); - } + if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) { + List columns = Lists.newArrayList(olapTable.getBaseSchema(true)); targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList()); } } @@ -1086,7 +1074,7 @@ public class NativeInsertStmt extends InsertStmt { LOG.warn("analyze group commit failed", e); return; } - if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit + if (ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit() && targetTable instanceof OlapTable && !ConnectContext.get().isTxnModel() && getQueryStmt() instanceof SelectStmt diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index 79dc9485fc..4541120452 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -165,7 +165,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, } OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink()); - if (ctx.getSessionVariable().enableInsertGroupCommit) { + if (ctx.getSessionVariable().isEnableInsertGroupCommit()) { // group commit if (analyzeGroupCommit(sink, physicalOlapTableSink)) { handleGroupCommit(ctx, sink, physicalOlapTableSink); @@ -421,7 +421,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, } private boolean analyzeGroupCommit(OlapTableSink sink, PhysicalOlapTableSink physicalOlapTableSink) { - return ConnectContext.get().getSessionVariable().enableInsertGroupCommit + return ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit() && physicalOlapTableSink.getTargetTable() instanceof OlapTable && !ConnectContext.get().isTxnModel() && sink.getFragment().getPlanRoot() instanceof UnionNode diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b4c117874f..711c5a2f01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2918,4 +2918,8 @@ public class SessionVariable implements Serializable, Writable { throw new UnsupportedOperationException("sqlDialect value is invalid, the invalid value is " + sqlDialect); } } + + public boolean isEnableInsertGroupCommit() { + return enableInsertGroupCommit || Config.wait_internal_group_commit_finish; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 0c3902372d..4ec2110156 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1084,7 +1084,7 @@ public class StmtExecutor { analyzeVariablesInStmt(); } - if (context.getSessionVariable().enableInsertGroupCommit && parsedStmt instanceof NativeInsertStmt) { + if (context.getSessionVariable().isEnableInsertGroupCommit() && parsedStmt instanceof NativeInsertStmt) { NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt; nativeInsertStmt.analyzeGroupCommit(new Analyzer(context.getEnv(), context)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 4cea18424f..c19be000e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2169,6 +2169,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { result.setDbId(parsedStmt.getTargetTable().getDatabase().getId()); result.setTableId(parsedStmt.getTargetTable().getId()); result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion()); + result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish); } catch (UserException e) { LOG.warn("exec sql error", e); throw new UserException("exec sql error" + e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java index 4029d612f1..db3622c11a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java @@ -66,27 +66,10 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct throw new AnalysisException("Only support OLAP table, but table type of table_id " + tableId + " is " + table.getType()); } - List tableColumns = table.getBaseSchema(false); + List tableColumns = table.getBaseSchema(true); for (int i = 1; i <= tableColumns.size(); i++) { fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true)); } - OlapTable olapTable = (OlapTable) table; - // delete sign column - Column deleteSignColumn = olapTable.getDeleteSignColumn(); - if (deleteSignColumn != null) { - fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true)); - } - // version column - Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst() - .orElse(null); - if (versionColumn != null) { - fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true)); - } - // sequence column - if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) { - Column sequenceCol = olapTable.getSequenceCol(); - fileColumns.add(new Column("c" + (fileColumns.size() + 1), sequenceCol.getType(), true)); - } return fileColumns; } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 04eaa57824..42f4016d10 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -653,6 +653,7 @@ struct TStreamLoadPutResult { 4: optional i64 base_schema_version 5: optional i64 db_id 6: optional i64 table_id + 7: optional bool wait_internal_group_commit_finish = false } struct TStreamLoadMultiTablePutResult {