From 9be0f0450662835d00674fc2c090ecc0af63cbed Mon Sep 17 00:00:00 2001
From: meiyi
Date: Fri, 29 Dec 2023 00:26:10 +0800
Subject: [PATCH] (improve)[group commit] refactor some group commit code
 (#29180)

---
 be/src/common/config.cpp                           |  17 ++-
 be/src/common/config.h                             |  17 ++-
 be/src/http/action/http_stream.cpp                 |  91 +++++++++-------
 be/src/http/action/http_stream.h                   |   5 +-
 be/src/http/action/stream_load.cpp                 | 100 ++++++++++--------
 be/src/http/action/stream_load.h                   |   2 +-
 be/src/runtime/group_commit_mgr.h                  |   1 -
 be/src/vec/sink/group_commit_block_sink.cpp        |  30 +++---
 be/src/vec/sink/group_commit_block_sink.h          |   4 +-
 .../doris/analysis/NativeInsertStmt.java           |   7 ++
 .../doris/planner/GroupCommitBlockSink.java        |   2 +
 .../org/apache/doris/qe/SessionVariable.java       |   5 +-
 .../org/apache/doris/qe/StmtExecutor.java          |   5 +
 .../insert_p0/insert_group_commit_into.groovy      |   5 +-
 ...sert_group_commit_with_prepare_stmt.groovy      |  51 ++++++---
 15 files changed, 192 insertions(+), 150 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6292b191f6..6ebfafce53 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1103,16 +1103,20 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120");
 DEFINE_Int16(bitmap_serialize_version, "1");
 
-// group commit insert config
+// group commit config
 DEFINE_String(group_commit_wal_path, "");
 DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
 DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
 DEFINE_Int32(group_commit_relay_wal_threads, "10");
-
-// the count of thread to group commit insert
+// This config can be set to limit thread number in group commit request fragment thread pool.
 DEFINE_Int32(group_commit_insert_threads, "10");
 DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
 DEFINE_Bool(wait_internal_group_commit_finish, "false");
+// Max size(bytes) of group commit queues, used for mem back pressure, default 64M.
+DEFINE_Int32(group_commit_max_queue_size, "67108864");
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
+// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
+DEFINE_String(group_commit_wal_max_disk_limit, "10%");
 
 DEFINE_mInt32(scan_thread_nice_value, "0");
 DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");
@@ -1130,13 +1134,6 @@ DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
 // Dir of default timezone files
 DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
 
-// Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
-DEFINE_Int32(group_commit_max_queue_size, "67108864");
-
-// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
-// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
-DEFINE_String(group_commit_wal_max_disk_limit, "10%");
-
 // Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
 DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
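For reference, both back-pressure knobs end up in be.conf next to the existing group commit settings. A minimal sketch using the defaults shipped in this patch (values are illustrative, not tuning advice):

    # thread pool for group commit request fragments
    group_commit_insert_threads = 10
    # memory back pressure: cap each group commit queue at 64 MB
    group_commit_max_queue_size = 67108864
    # disk back pressure: absolute bytes (e.g. 1024) or a percentage of the
    # available WAL disk space (e.g. 10%) are both accepted
    group_commit_wal_max_disk_limit = 10%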
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e580b38e48..4e09bb2b08 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1168,16 +1168,20 @@ DECLARE_Int32(grace_shutdown_wait_seconds);
 // BitmapValue serialize version.
 DECLARE_Int16(bitmap_serialize_version);
 
-// group commit insert config
+// group commit config
 DECLARE_String(group_commit_wal_path);
 DECLARE_Int32(group_commit_replay_wal_retry_num);
 DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
 DECLARE_mInt32(group_commit_relay_wal_threads);
-
-// This config can be set to limit thread number in group commit insert thread pool.
+// This config can be set to limit thread number in group commit request fragment thread pool.
 DECLARE_mInt32(group_commit_insert_threads);
 DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
 DECLARE_Bool(wait_internal_group_commit_finish);
+// Max size(bytes) of group commit queues, used for mem back pressure.
+DECLARE_Int32(group_commit_max_queue_size);
+// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
+// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
+DECLARE_mString(group_commit_wal_max_disk_limit);
 
 // The configuration item is used to lower the priority of the scanner thread,
 // typically employed to ensure CPU scheduling for write operations.
@@ -1203,13 +1207,6 @@ DECLARE_Bool(ignore_always_true_predicate_for_segment);
 // Dir of default timezone files
 DECLARE_String(default_tzfiles_path);
 
-// Max size(bytes) of group commit queues, used for mem back pressure.
-DECLARE_Int32(group_commit_max_queue_size);
-
-// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
-// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
-DECLARE_mString(group_commit_wal_max_disk_limit);
-
 // Ingest binlog work pool size
 DECLARE_Int32(ingest_binlog_work_pool_size);
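The "1024 or 10%" convention means the WAL limit is resolved at runtime against the disk's available space. A self-contained C++ sketch of that resolution (this illustrates the documented semantics only; the BE's actual parsing lives in the WAL manager and may differ):

    #include <cstdint>
    #include <stdexcept>
    #include <string>

    // Resolve a "bytes or percent" limit such as group_commit_wal_max_disk_limit
    // against the bytes currently available on the WAL disk.
    int64_t resolve_wal_disk_limit(const std::string& conf, int64_t available_bytes) {
        if (conf.empty()) {
            throw std::invalid_argument("empty group_commit_wal_max_disk_limit");
        }
        if (conf.back() == '%') {
            // "10%" -> ten percent of the currently available disk space
            int64_t pct = std::stoll(conf.substr(0, conf.size() - 1));
            return available_bytes * pct / 100;
        }
        // "1024" -> absolute byte count
        return std::stoll(conf);
    }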
"async_mode" : "sync_mode"; - if (iequal(group_commit_mode, "sync_mode")) { - size_t max_available_size = - ExecEnv::GetInstance()->wal_mgr()->get_max_available_size(); - LOG(INFO) << "When enable group commit, the data size can't be too large. The data " - "size " - "for this http load(" - << (req->header(HttpHeaders::CONTENT_LENGTH).empty() - ? 0 - : std::stol(req->header(HttpHeaders::CONTENT_LENGTH))) - << " Bytes) exceeds the WAL (Write-Ahead Log) limit (" - << max_available_size - << " Bytes). So we set this load to \"group commit\"=sync_mode " - "automatically."; - st = Status::Error("Http load size too large."); - } - } - } + Status st = _handle_group_commit(req, ctx); LOG(INFO) << "new income streaming load request." << ctx->brief() << " sql : " << req->header(HTTP_SQL) << ", group_commit=" << ctx->group_commit; @@ -338,6 +301,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req, if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); } else { + // used for wait_internal_group_commit_finish request.__set_group_commit_mode("sync_mode"); } } @@ -366,10 +330,9 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req, ctx->txn_id = ctx->put_result.params.txn_conf.txn_id; ctx->label = ctx->put_result.params.import_label; ctx->put_result.params.__set_wal_id(ctx->wal_id); - if (http_req->header(HTTP_GROUP_COMMIT) == "async mode") { + if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { - size_t content_length = 0; - content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); + size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH)); if (ctx->format == TFileFormatType::FORMAT_CSV_GZ || ctx->format == TFileFormatType::FORMAT_CSV_LZO || ctx->format == TFileFormatType::FORMAT_CSV_BZ2 || @@ -403,4 +366,50 @@ void HttpStreamAction::_save_stream_load_record(std::shared_ptr ctx) { + std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); + if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && + !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { + return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]"); + } + if (config::wait_internal_group_commit_finish) { + group_commit_mode = "sync_mode"; + } + if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) { + // off_mode and empty + ctx->group_commit = false; + return Status::OK(); + } + + auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() && + iequal(req->header(HTTP_PARTIAL_COLUMNS), "true"); + auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty(); + auto partitions = !req->header(HTTP_PARTITIONS).empty(); + if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) { + if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) { + return Status::InternalError("label and group_commit can't be set at the same time"); + } + ctx->group_commit = true; + if (iequal(group_commit_mode, "async_mode")) { + group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode"; + if (iequal(group_commit_mode, "sync_mode")) { + size_t max_available_size = + ExecEnv::GetInstance()->wal_mgr()->get_max_available_size(); + LOG(INFO) << "When enable group commit, the data size can't be too large or " + "unknown. 
diff --git a/be/src/http/action/http_stream.h b/be/src/http/action/http_stream.h
index 192e68257e..d4140a118d 100644
--- a/be/src/http/action/http_stream.h
+++ b/be/src/http/action/http_stream.h
@@ -47,12 +47,9 @@ public:
 private:
     Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
     Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
-    Status _data_saved_path(HttpRequest* req, std::string* file_path);
     Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
-    void _parse_format(const std::string& format_str, const std::string& compress_type_str,
-                       TFileFormatType::type* format_type, TFileCompressType::type* compress_type);
-    bool _is_format_support_streaming(TFileFormatType::type format);
+    Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index eddf31856a..a702be65d0 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -188,50 +188,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
     url_decode(req->param(HTTP_DB_KEY), &ctx->db);
     url_decode(req->param(HTTP_TABLE_KEY), &ctx->table);
     ctx->label = req->header(HTTP_LABEL_KEY);
-    Status st = Status::OK();
-    std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
-    if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
-        !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
-        st = Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
-    } else if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
-        // off_mode and empty
-        group_commit_mode = "off_mode";
-        ctx->group_commit = false;
-    } else {
-        // sync_mode and async_mode
-        ctx->group_commit = true;
-    }
-    auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
-                           iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
     ctx->two_phase_commit = req->header(HTTP_TWO_PHASE_COMMIT) == "true";
-    auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
-    auto partitions = !req->header(HTTP_PARTITIONS).empty();
-    if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
-        (!group_commit_mode.empty() || config::wait_internal_group_commit_finish)) {
-        if (!config::wait_internal_group_commit_finish && ctx->group_commit &&
-            !ctx->label.empty()) {
-            st = Status::InternalError("label and group_commit can't be set at the same time");
-        }
-        if (iequal(group_commit_mode, "async_mode") || config::wait_internal_group_commit_finish) {
-            ctx->group_commit = true;
-            group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
-            if (iequal(group_commit_mode, "sync_mode")) {
-                size_t max_available_size =
-                        ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
-                LOG(INFO) << "When enable group commit, the data size can't be too large. The data "
-                             "size "
-                             "for this stream load("
-                          << (req->header(HttpHeaders::CONTENT_LENGTH).empty()
-                                      ? 0
-                                      : std::stol(req->header(HttpHeaders::CONTENT_LENGTH)))
-                          << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
-                          << max_available_size
-                          << " Bytes). So we set this load to \"group commit\"=sync_mode "
-                             "automatically.";
-                st = Status::Error("Stream load size too large.");
-            }
-        }
-    }
+    Status st = _handle_group_commit(req, ctx);
     if (!ctx->group_commit && ctx->label.empty()) {
         ctx->label = generate_uuid_string();
     }
@@ -649,7 +607,12 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
         request.__set_memtable_on_sink_node(value);
     }
     if (ctx->group_commit) {
-        request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
+        if (!http_req->header(HTTP_GROUP_COMMIT).empty()) {
+            request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT));
+        } else {
+            // used for wait_internal_group_commit_finish
+            request.__set_group_commit_mode("sync_mode");
+        }
     }
 
 #ifndef BE_TEST
@@ -672,8 +635,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
     }
     if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
         if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
-            size_t content_length = 0;
-            content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
+            size_t content_length = std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
             if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
                 ctx->format == TFileFormatType::FORMAT_CSV_LZO ||
                 ctx->format == TFileFormatType::FORMAT_CSV_BZ2 ||
@@ -730,4 +692,50 @@ void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
     }
 }
 
+Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
+                                              std::shared_ptr<StreamLoadContext> ctx) {
+    std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT);
+    if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") &&
+        !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) {
+        return Status::InternalError("group_commit can only be [async_mode, sync_mode, off_mode]");
+    }
+    if (config::wait_internal_group_commit_finish) {
+        group_commit_mode = "sync_mode";
+    }
+    if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode")) {
+        // off_mode and empty
+        ctx->group_commit = false;
+        return Status::OK();
+    }
+
+    auto partial_columns = !req->header(HTTP_PARTIAL_COLUMNS).empty() &&
+                           iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
+    auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
+    auto partitions = !req->header(HTTP_PARTITIONS).empty();
+    if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
+        if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
+            return Status::InternalError("label and group_commit can't be set at the same time");
+        }
+        ctx->group_commit = true;
+        if (iequal(group_commit_mode, "async_mode")) {
+            group_commit_mode = load_size_smaller_than_wal_limit(req) ? "async_mode" : "sync_mode";
+            if (iequal(group_commit_mode, "sync_mode")) {
+                size_t max_available_size =
+                        ExecEnv::GetInstance()->wal_mgr()->get_max_available_size();
+                LOG(INFO) << "When enable group commit, the data size can't be too large or "
+                             "unknown. The data size for this stream load("
+                          << (req->header(HttpHeaders::CONTENT_LENGTH).empty()
+                                      ? "0"
+                                      : req->header(HttpHeaders::CONTENT_LENGTH))
+                          << " Bytes) exceeds the WAL (Write-Ahead Log) limit ("
+                          << max_available_size
+                          << " Bytes). So we set this load to \"group commit\"=sync_mode "
+                             "automatically.";
+                return Status::Error("Stream load size too large.");
+            }
+        }
+    }
+    return Status::OK();
+}
+
 } // namespace doris
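Stream load resolves the mode identically, plus one rule carried over from the old on_header path: an explicit label is incompatible with group commit. A sketch with the standard stream load URL (host, port, database, table, and file are placeholders):

    # accepted: async_mode requested, may silently fall back to sync_mode
    curl --location-trusted -u root: \
        -H "group_commit:async_mode" -H "column_separator:," \
        -T data.csv \
        http://127.0.0.1:8030/api/db1/t1/_stream_load

    # rejected with "label and group_commit can't be set at the same time"
    curl --location-trusted -u root: \
        -H "label:my_label" -H "group_commit:async_mode" \
        -T data.csv \
        http://127.0.0.1:8030/api/db1/t1/_stream_load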
So we set this load to \"group commit\"=sync_mode\" " + "automatically."; + return Status::Error("Stream load size too large."); + } + } + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index eac3825f53..d1de89c939 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -50,7 +50,7 @@ private: Status _data_saved_path(HttpRequest* req, std::string* file_path); Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); - bool _load_size_smaller_than_wal_limit(HttpRequest* req); + Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr ctx); private: ExecEnv* _exec_env; diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 35afcc4624..b0553b4487 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -156,7 +156,6 @@ public: Status update_load_info(TUniqueId load_id, size_t content_length); Status get_load_info(TUniqueId load_id, size_t* content_length); Status remove_load_info(TUniqueId load_id); - std::condition_variable cv; private: ExecEnv* _exec_env = nullptr; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 3e38f9c42f..5686966587 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -19,8 +19,6 @@ #include -#include -#include #include #include "common/exception.h" @@ -29,7 +27,6 @@ #include "runtime/runtime_state.h" #include "util/doris_metrics.h" #include "vec/exprs/vexpr.h" -#include "vec/sink/volap_table_sink.h" #include "vec/sink/vtablet_finder.h" namespace doris { @@ -74,10 +71,10 @@ Status GroupCommitBlockSink::prepare(RuntimeState* state) { _state = state; // profile must add to state's object pool - _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink")); + _profile = state->obj_pool()->add(new RuntimeProfile("GroupCommitBlockSink")); init_sink_common_profile(); - _mem_tracker = - std::make_shared("OlapTableSink:" + std::to_string(state->load_job_id())); + _mem_tracker = std::make_shared("GroupCommitBlockSink:" + + std::to_string(state->load_job_id())); SCOPED_TIMER(_profile->total_time_counter()); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -117,7 +114,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) { return Status::DataQualityError("too many filtered rows"); } - RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, true)); + RETURN_IF_ERROR(_add_blocks(true)); } if (_load_block_queue) { _load_block_queue->remove_load_id(_load_id); @@ -223,15 +220,15 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, _blocks.emplace_back(output_block); } else { if (!_is_block_appended) { - RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, false)); + RETURN_IF_ERROR(_add_blocks(false)); } RETURN_IF_ERROR(_load_block_queue->add_block( - output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE)); + output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE)); } return Status::OK(); } -Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_all_load_data) { +Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) { DCHECK(_is_block_appended == 
diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp
index 3e38f9c42f..5686966587 100644
--- a/be/src/vec/sink/group_commit_block_sink.cpp
+++ b/be/src/vec/sink/group_commit_block_sink.cpp
@@ -19,8 +19,6 @@
 
 #include
 
-#include
-#include
 #include
 
 #include "common/exception.h"
@@ -29,7 +27,6 @@
 #include "runtime/runtime_state.h"
 #include "util/doris_metrics.h"
 #include "vec/exprs/vexpr.h"
-#include "vec/sink/volap_table_sink.h"
 #include "vec/sink/vtablet_finder.h"
 
 namespace doris {
@@ -74,10 +71,10 @@ Status GroupCommitBlockSink::prepare(RuntimeState* state) {
     _state = state;
 
     // profile must add to state's object pool
-    _profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
+    _profile = state->obj_pool()->add(new RuntimeProfile("GroupCommitBlockSink"));
     init_sink_common_profile();
-    _mem_tracker =
-            std::make_shared<MemTracker>("OlapTableSink:" + std::to_string(state->load_job_id()));
+    _mem_tracker = std::make_shared<MemTracker>("GroupCommitBlockSink:" +
+                                                std::to_string(state->load_job_id()));
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
@@ -117,7 +114,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
             (double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
             return Status::DataQualityError("too many filtered rows");
         }
-        RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, true));
+        RETURN_IF_ERROR(_add_blocks(true));
     }
     if (_load_block_queue) {
         _load_block_queue->remove_load_id(_load_id);
@@ -223,15 +220,15 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
         _blocks.emplace_back(output_block);
     } else {
         if (!_is_block_appended) {
-            RETURN_IF_ERROR(_add_blocks(_group_commit_mode != TGroupCommitMode::SYNC_MODE, false));
+            RETURN_IF_ERROR(_add_blocks(false));
         }
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                output_block, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+                output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
     }
     return Status::OK();
 }
 
-Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_all_load_data) {
+Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
     DCHECK(_is_block_appended == false);
     TUniqueId load_id;
     load_id.__set_hi(_load_id.hi);
@@ -241,18 +238,15 @@ Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_
         RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
                 _db_id, _table_id, _base_schema_version, load_id, _load_block_queue,
                 _state->be_exec_version()));
-        if (write_wal) {
+        if (_group_commit_mode == TGroupCommitMode::ASYNC_MODE) {
             _group_commit_mode = _load_block_queue->has_enough_wal_disk_space(
                                          _blocks, load_id, is_blocks_contain_all_load_data)
                                          ? TGroupCommitMode::ASYNC_MODE
                                          : TGroupCommitMode::SYNC_MODE;
             if (_group_commit_mode == TGroupCommitMode::SYNC_MODE) {
-                LOG(INFO)
-                        << "Load label " << _load_block_queue->label
-                        << " will not write wal because wal disk space usage reachs max limit.";
-            } else {
-                LOG(INFO) << "Load label " << _load_block_queue->label << " will write wal to "
-                          << _load_block_queue->wal_base_path << ".";
+                LOG(INFO) << "Load id=" << print_id(_state->query_id())
+                          << ", use group commit label=" << _load_block_queue->label
+                          << " will not write wal because wal disk space usage reaches max limit";
             }
         }
         _state->set_import_label(_load_block_queue->label);
@@ -263,7 +257,7 @@ Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_
     }
     for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
         RETURN_IF_ERROR(_load_block_queue->add_block(
-                *it, _group_commit_mode != TGroupCommitMode::SYNC_MODE));
+                *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
     }
     _is_block_appended = true;
     _blocks.clear();
diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h
index c0971f4801..84ffebf8fe 100644
--- a/be/src/vec/sink/group_commit_block_sink.h
+++ b/be/src/vec/sink/group_commit_block_sink.h
@@ -47,7 +47,7 @@ public:
 
 private:
     Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
-    Status _add_blocks(bool write_wal, bool is_blocks_contain_all_load_data);
+    Status _add_blocks(bool is_blocks_contain_all_load_data);
 
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
@@ -70,6 +70,8 @@ private:
     std::vector<std::shared_ptr<vectorized::Block>> _blocks;
     bool _is_block_appended = false;
     double _max_filter_ratio = 0.0;
+
+    // used for find_partition
     VOlapTablePartitionParam* _vpartition = nullptr;
     // reuse for find_tablet.
     std::vector<VOlapTablePartition*> _partitions;
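On the sink side the boolean write_wal parameter is gone: _add_blocks now derives WAL behavior from _group_commit_mode alone, and only an async load that still fits the WAL disk budget keeps writing a WAL. The rule, reduced to a sketch (TGroupCommitMode is the generated thrift enum; has_enough_wal_disk_space is the queue API queried for the second argument):

    // Illustrative reduction of the downgrade in _add_blocks: only ASYNC_MODE
    // writes a WAL, and only while the WAL disk budget holds out.
    TGroupCommitMode::type decide_mode(TGroupCommitMode::type requested,
                                       bool enough_wal_disk_space) {
        if (requested == TGroupCommitMode::ASYNC_MODE && !enough_wal_disk_space) {
            return TGroupCommitMode::SYNC_MODE; // fall back: no WAL is written
        }
        return requested;
    }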
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 8106aed1ac..1aff92b4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -154,6 +154,7 @@ public class NativeInsertStmt extends InsertStmt {
     private long tableId = -1;
     public boolean isGroupCommitStreamLoadSql = false;
     private GroupCommitPlanner groupCommitPlanner;
+    private boolean reuseGroupCommitPlan = false;
 
     private boolean isFromDeleteOrUpdateStmt = false;
 
@@ -1162,12 +1163,18 @@ public class NativeInsertStmt extends InsertStmt {
         return isGroupCommit;
     }
 
+    public boolean isReuseGroupCommitPlan() {
+        return reuseGroupCommitPlan;
+    }
+
     public GroupCommitPlanner planForGroupCommit(TUniqueId queryId) throws UserException, TException {
         OlapTable olapTable = (OlapTable) getTargetTable();
         if (groupCommitPlanner != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
             LOG.debug("reuse group commit plan, table={}", olapTable);
+            reuseGroupCommitPlan = true;
             return groupCommitPlanner;
         }
+        reuseGroupCommitPlan = false;
         if (!targetColumns.isEmpty()) {
             Analyzer analyzerTmp = analyzer;
             reset();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
index 8030dcf4e5..86f588e525 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java
@@ -58,6 +58,8 @@ public class GroupCommitBlockSink extends OlapTableSink {
             return TGroupCommitMode.ASYNC_MODE;
         } else if (groupCommit.equalsIgnoreCase("sync_mode")) {
             return TGroupCommitMode.SYNC_MODE;
+        } else if (groupCommit.equalsIgnoreCase("off_mode")) {
+            return TGroupCommitMode.OFF_MODE;
         } else {
             return null;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 02402e69c7..d594fd395e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.parser.ParseDialect.Dialect;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.planner.GroupCommitBlockSink;
 import org.apache.doris.qe.VariableMgr.VarAttr;
+import org.apache.doris.thrift.TGroupCommitMode;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TResourceLimit;
 import org.apache.doris.thrift.TRuntimeFilterType;
@@ -3176,7 +3177,9 @@ public class SessionVariable implements Serializable, Writable {
     }
 
     public boolean isEnableInsertGroupCommit() {
-        return Config.wait_internal_group_commit_finish || GroupCommitBlockSink.parseGroupCommit(groupCommit) != null;
+        return Config.wait_internal_group_commit_finish
+                || GroupCommitBlockSink.parseGroupCommit(groupCommit) == TGroupCommitMode.ASYNC_MODE
+                || GroupCommitBlockSink.parseGroupCommit(groupCommit) == TGroupCommitMode.SYNC_MODE;
     }
 
     public String getGroupCommit() {
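Because off_mode now parses to TGroupCommitMode.OFF_MODE instead of null, isEnableInsertGroupCommit() has to name the enabling modes explicitly. From a client session the switch looks like this (database and table are illustrative):

    SET group_commit = async_mode;  -- enables group commit for INSERT
    INSERT INTO db1.t1 (name, id) VALUES ('c', 3);
    SET group_commit = off_mode;    -- parses to OFF_MODE and disables it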
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index a897c6e868..2851304dc3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1919,6 +1919,7 @@ public class StmtExecutor {
         String errMsg = "";
         TableType tblType = insertStmt.getTargetTable().getType();
         boolean isGroupCommit = false;
+        boolean reuseGroupCommitPlan = false;
         if (context.isTxnModel()) {
             if (insertStmt.getQueryStmt() instanceof SelectStmt) {
                 if (((SelectStmt) insertStmt.getQueryStmt()).getTableRefs().size() > 0) {
@@ -1935,6 +1936,7 @@ public class StmtExecutor {
             int maxRetry = 3;
             for (int i = 0; i < maxRetry; i++) {
                 GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId);
+                reuseGroupCommitPlan = nativeInsertStmt.isReuseGroupCommitPlan();
                 List rows = groupCommitPlanner.getRows(nativeInsertStmt);
                 PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
                 TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
@@ -2106,6 +2108,9 @@ public class StmtExecutor {
             }
             if (isGroupCommit) {
                 sb.append(", 'query_id':'").append(DebugUtil.printId(context.queryId)).append("'");
+                if (reuseGroupCommitPlan) {
+                    sb.append(", 'reuse_group_commit_plan':'").append(true).append("'");
+                }
             }
             sb.append("}");
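The new flag rides along in the OK packet's info string that the regression suites grep. A sketch of its shape on a plan-reuse hit (the label, txnId, and query_id values are illustrative):

    {'label':'group_commit_6b1a...', 'status':'PREPARE', 'txnId':'1001',
     'query_id':'...', 'reuse_group_commit_plan':'true'}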
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index 4b4fa430de..391e106aa2 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -215,7 +215,7 @@ suite("insert_group_commit_into") {
         }
 
         // test connect to observer fe
-        /*try {
+        try {
             def fes = sql_return_maparray "show frontends"
             logger.info("frontends: ${fes}")
             if (fes.size() > 1) {
@@ -233,6 +233,7 @@ suite("insert_group_commit_into") {
                     sql """ set group_commit = async_mode; """
                     sql """ set enable_nereids_dml = false; """
                     sql """ set enable_profile= true; """
+                    sql """ set enable_nereids_planner = false; """
 
                     // 1. insert into
                    def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
@@ -261,7 +262,7 @@ suite("insert_group_commit_into") {
                 logger.info("only one fe, skip test connect to observer fe")
             }
         } finally {
-        }*/
+        }
 
         // table with array type
         tableName = "insert_group_commit_into_duplicate_array"
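The prepare-stmt suite below exercises this through MySQL server-side prepared statements. A standalone Java sketch of the same flow (the Connector/J URL flags are the usual server-prepare switches; host, port, schema, and table are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class GroupCommitPrepareDemo {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:mysql://127.0.0.1:9030/db1"
                    + "?useServerPrepStmts=true&cachePrepStmts=true";
            try (Connection conn = DriverManager.getConnection(url, "root", "")) {
                try (Statement st = conn.createStatement()) {
                    st.execute("set group_commit = async_mode");
                }
                try (PreparedStatement ps = conn.prepareStatement(
                        "insert into t1(id, name, score) values(?, ?, ?)")) {
                    for (int i = 0; i < 3; i++) {
                        ps.setInt(1, i);
                        ps.setString(2, "name" + i);
                        ps.setInt(3, i * 10);
                        ps.addBatch();
                    }
                    ps.executeBatch();
                    // a second batch on the same statement can hit the cached plan
                    // and report 'reuse_group_commit_plan' in its server info
                    ps.setInt(1, 3);
                    ps.setString(2, "name3");
                    ps.setInt(3, 30);
                    ps.addBatch();
                    ps.executeBatch();
                }
            }
        }
    }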
4, "d", 40 - group_commit_insert insert_stmt, 3 + group_commit_insert insert_stmt, 3, true assertEquals(stmtId, getStmtId(insert_stmt)) insert_prepared insert_stmt, 5, "e", null insert_prepared insert_stmt, 6, "f", 40 - group_commit_insert insert_stmt, 2 + group_commit_insert insert_stmt, 2, true assertEquals(stmtId, getStmtId(insert_stmt)) getRowCount(6) @@ -161,7 +182,7 @@ suite("insert_group_commit_with_prepare_stmt") { insert_prepared_partial insert_stmt, 'e', 7, 0 insert_prepared_partial insert_stmt, null, 8, 0 - group_commit_insert insert_stmt, 2 + group_commit_insert insert_stmt, 2, true assertEquals(stmtId2, getStmtId(insert_stmt)) getRowCount(7)