From b19f275714f270c598589fbb039e0f8a682b21f0 Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 3 Nov 2023 12:02:40 +0800 Subject: [PATCH] [improvement](insert) refactor group commit insert into (#25795) --- be/src/pipeline/pipeline_task.cpp | 31 ++-- be/src/runtime/group_commit_mgr.cpp | 142 +----------------- be/src/runtime/group_commit_mgr.h | 22 --- be/src/runtime/plan_fragment_executor.cpp | 32 ++-- be/src/service/internal_service.cpp | 121 +++++++++------ be/src/service/internal_service.h | 5 +- be/src/vec/sink/group_commit_block_sink.cpp | 5 +- .../doris/analysis/NativeInsertStmt.java | 66 ++++---- .../org/apache/doris/qe/StmtExecutor.java | 4 +- gensrc/proto/internal_service.proto | 8 +- .../insert_group_commit_into_duplicate.groovy | 18 +++ 11 files changed, 182 insertions(+), 272 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 111259062c..72552b4846 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -256,6 +256,18 @@ Status PipelineTask::execute(bool* eos) { } } + auto status = Status::OK(); + auto handle_group_commit = [&]() { + if (UNLIKELY(_fragment_context->is_group_commit() && !status.ok() && _block != nullptr)) { + auto* future_block = dynamic_cast(_block.get()); + std::unique_lock l(*(future_block->lock)); + if (!future_block->is_handled()) { + future_block->set_result(status, 0, 0); + future_block->cv->notify_all(); + } + } + }; + this->set_begin_execute_time(); while (!_fragment_context->is_canceled()) { if (_data_state != SourceState::MORE_DATA && !source_can_read()) { @@ -279,7 +291,11 @@ Status PipelineTask::execute(bool* eos) { { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); - RETURN_IF_ERROR(_root->get_block(_state, block, _data_state)); + status = _root->get_block(_state, block, _data_state); + if (UNLIKELY(!status.ok())) { + handle_group_commit(); + return status; + } } *eos = _data_state == SourceState::FINISHED; @@ -289,17 +305,8 @@ Status PipelineTask::execute(bool* eos) { _collect_query_statistics_with_every_batch) { RETURN_IF_ERROR(_collect_query_statistics()); } - auto status = _sink->sink(_state, block, _data_state); - if (UNLIKELY(!status.ok() || block->rows() == 0)) { - if (_fragment_context->is_group_commit()) { - auto* future_block = dynamic_cast(block); - std::unique_lock l(*(future_block->lock)); - if (!future_block->is_handled()) { - future_block->set_result(status, 0, 0); - future_block->cv->notify_all(); - } - } - } + status = _sink->sink(_state, block, _data_state); + handle_group_commit(); if (!status.is()) { RETURN_IF_ERROR(status); } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 718d7f19aa..02e484fd77 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -17,35 +17,18 @@ #include "runtime/group_commit_mgr.h" -#include -#include -#include -#include #include -#include -#include - #include "client_cache.h" #include "common/config.h" -#include "common/object_pool.h" -#include "exec/data_sink.h" -#include "io/fs/stream_load_pipe.h" #include "olap/wal_manager.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/runtime_state.h" -#include "runtime/stream_load/new_load_stream_mgr.h" -#include "runtime/stream_load/stream_load_context.h" #include "util/thrift_rpc_helper.h" -#include "vec/core/future_block.h" -#include "vec/exec/scan/new_file_scan_node.h" -#include "vec/sink/group_commit_block_sink.h" namespace doris { -class TPlan; - Status LoadBlockQueue::add_block(std::shared_ptr block) { DCHECK(block->get_schema_version() == schema_version); std::unique_lock l(*_mutex); @@ -203,7 +186,7 @@ Status GroupCommitTable::get_first_block_load_queue( Status GroupCommitTable::_create_group_commit_load( std::shared_ptr& load_block_queue) { Status st = Status::OK(); - std::unique_ptr> remove_pipe_func((int*)0x01, [&](int*) { + std::unique_ptr> finish_plan_func((int*)0x01, [&](int*) { if (!st.ok()) { std::unique_lock l(_lock); _need_plan_fragment = false; @@ -408,10 +391,6 @@ Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, } GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { - static_cast(ThreadPoolBuilder("InsertIntoGroupCommitThreadPool") - .set_min_threads(config::group_commit_insert_threads) - .set_max_threads(config::group_commit_insert_threads) - .build(&_insert_into_thread_pool)); static_cast(ThreadPoolBuilder("GroupCommitThreadPool") .set_min_threads(1) .set_max_threads(config::group_commit_insert_threads) @@ -424,129 +403,10 @@ GroupCommitMgr::~GroupCommitMgr() { } void GroupCommitMgr::stop() { - _insert_into_thread_pool->shutdown(); _thread_pool->shutdown(); LOG(INFO) << "GroupCommitMgr is stopped"; } -Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan, - const TDescriptorTable& tdesc_tbl, - const TScanRangeParams& scan_range_params, - const PGroupCommitInsertRequest* request, - PGroupCommitInsertResponse* response) { - auto& nodes = plan.nodes; - DCHECK(nodes.size() > 0); - auto& plan_node = nodes.at(0); - - TUniqueId load_id; - load_id.__set_hi(request->load_id().hi()); - load_id.__set_lo(request->load_id().lo()); - - std::vector> future_blocks; - { - // 1. Prepare a pipe, then append rows to pipe, - // then scan node scans from the pipe, like stream load. - std::shared_ptr load_block_queue; - auto pipe = std::make_shared( - io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, - -1 /* total_length */, true /* use_proto */); - std::shared_ptr ctx = std::make_shared(_exec_env); - ctx->pipe = pipe; - RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(load_id, ctx)); - std::unique_ptr> remove_pipe_func((int*)0x01, [&](int*) { - if (load_block_queue != nullptr) { - load_block_queue->remove_load_id(load_id); - } - _exec_env->new_load_stream_mgr()->remove(load_id); - }); - static_cast(_insert_into_thread_pool->submit_func( - std::bind(&GroupCommitMgr::_append_row, this, pipe, request))); - - // 2. FileScanNode consumes data from the pipe. - std::unique_ptr runtime_state = RuntimeState::create_unique(); - TQueryOptions query_options; - query_options.query_type = TQueryType::LOAD; - TQueryGlobals query_globals; - static_cast(runtime_state->init(load_id, query_options, query_globals, _exec_env)); - runtime_state->set_query_mem_tracker(std::make_shared( - MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(load_id)), -1)); - DescriptorTbl* desc_tbl = nullptr; - RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl)); - runtime_state->set_desc_tbl(desc_tbl); - auto file_scan_node = - vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl); - std::unique_ptr> close_scan_node_func((int*)0x01, [&](int*) { - static_cast(file_scan_node.close(runtime_state.get())); - }); - // TFileFormatType::FORMAT_PROTO, TFileType::FILE_STREAM, set _range.load_id - RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get())); - RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get())); - std::vector params_vector; - params_vector.emplace_back(scan_range_params); - file_scan_node.set_scan_ranges(runtime_state.get(), params_vector); - RETURN_IF_ERROR(file_scan_node.open(runtime_state.get())); - - // 3. Put the block into block queue. - std::unique_ptr _block = - doris::vectorized::Block::create_unique(); - bool eof = false; - while (!eof) { - // TODO what to do if read one block error - RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof)); - std::shared_ptr future_block = - std::make_shared(); - future_block->swap(*(_block.get())); - future_block->set_info(request->base_schema_version(), load_id); - if (load_block_queue == nullptr) { - RETURN_IF_ERROR(get_first_block_load_queue(request->db_id(), table_id, future_block, - load_block_queue)); - response->set_label(load_block_queue->label); - response->set_txn_id(load_block_queue->txn_id); - } - // TODO what to do if add one block error - if (future_block->rows() > 0) { - future_blocks.emplace_back(future_block); - } - RETURN_IF_ERROR(load_block_queue->add_block(future_block)); - } - if (!runtime_state->get_error_log_file_path().empty()) { - LOG(INFO) << "id=" << print_id(load_id) - << ", url=" << runtime_state->get_error_log_file_path() - << ", load rows=" << runtime_state->num_rows_load_total() - << ", filter rows=" << runtime_state->num_rows_load_filtered() - << ", unselect rows=" << runtime_state->num_rows_load_unselected() - << ", success rows=" << runtime_state->num_rows_load_success(); - } - } - int64_t total_rows = 0; - int64_t loaded_rows = 0; - // 4. wait to wal - for (const auto& future_block : future_blocks) { - std::unique_lock l(*(future_block->lock)); - if (!future_block->is_handled()) { - future_block->cv->wait(l); - } - // future_block->get_status() - total_rows += future_block->get_total_rows(); - loaded_rows += future_block->get_loaded_rows(); - } - response->set_loaded_rows(loaded_rows); - response->set_filtered_rows(total_rows - loaded_rows); - return Status::OK(); -} - -Status GroupCommitMgr::_append_row(std::shared_ptr pipe, - const PGroupCommitInsertRequest* request) { - for (int i = 0; i < request->data().size(); ++i) { - std::unique_ptr row(new PDataRow()); - row->CopyFrom(request->data(i)); - // TODO append may error when pipe is cancelled - RETURN_IF_ERROR(pipe->append(std::move(row))); - } - static_cast(pipe->finish()); - return Status::OK(); -} - Status GroupCommitMgr::get_first_block_load_queue( int64_t db_id, int64_t table_id, std::shared_ptr block, std::shared_ptr& load_block_queue) { diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 9bbbc5da61..4983811233 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -23,23 +23,15 @@ #include #include "common/status.h" -#include "io/fs/stream_load_pipe.h" #include "util/lock.h" #include "util/threadpool.h" -#include "util/thrift_util.h" #include "vec/core/block.h" #include "vec/core/future_block.h" namespace doris { class ExecEnv; -class TPlan; -class TDescriptorTable; class TUniqueId; -class TExecPlanFragmentParams; -class ObjectPool; class RuntimeState; -class StreamLoadContext; -class StreamLoadPipe; class LoadBlockQueue { public: @@ -131,13 +123,6 @@ public: void stop(); - // insert into - Status group_commit_insert(int64_t table_id, const TPlan& plan, - const TDescriptorTable& desc_tbl, - const TScanRangeParams& scan_range_params, - const PGroupCommitInsertRequest* request, - PGroupCommitInsertResponse* response); - // used when init group_commit_scan_node Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, std::shared_ptr& load_block_queue); @@ -146,18 +131,11 @@ public: std::shared_ptr& load_block_queue); private: - // used by insert into - Status _append_row(std::shared_ptr pipe, - const PGroupCommitInsertRequest* request); - ExecEnv* _exec_env; doris::Mutex _lock; // TODO remove table when unused std::unordered_map> _table_map; - - // thread pool to handle insert into: append data to pipe - std::unique_ptr _insert_into_thread_pool; std::unique_ptr _thread_pool; // memory consumption of all tables' load block queues, used for back pressure. std::shared_ptr _all_block_queues_bytes; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index a58744a569..9344378e91 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -332,9 +332,25 @@ Status PlanFragmentExecutor::open_vectorized_internal() { : doris::vectorized::Block::create_unique(); bool eos = false; + auto st = Status::OK(); + auto handle_group_commit = [&]() { + if (UNLIKELY(_group_commit && !st.ok() && block != nullptr)) { + auto* future_block = dynamic_cast(block.get()); + std::unique_lock l(*(future_block->lock)); + if (!future_block->is_handled()) { + future_block->set_result(st, 0, 0); + future_block->cv->notify_all(); + } + } + }; + while (!eos) { RETURN_IF_CANCELLED(_runtime_state); - RETURN_IF_ERROR(get_vectorized_internal(block.get(), &eos)); + st = get_vectorized_internal(block.get(), &eos); + if (UNLIKELY(!st.ok())) { + handle_group_commit(); + return st; + } // Collect this plan and sub plan statistics, and send to parent plan. if (_collect_query_statistics_with_every_batch) { @@ -342,23 +358,13 @@ Status PlanFragmentExecutor::open_vectorized_internal() { } if (!eos || block->rows() > 0) { - auto st = _sink->send(runtime_state(), block.get()); + st = _sink->send(runtime_state(), block.get()); //TODO: Asynchronisation need refactor this if (st.is()) { // created partition, do it again. st = _sink->send(runtime_state(), block.get()); DCHECK(!st.is()); } - if (UNLIKELY(!st.ok() || block->rows() == 0)) { - // Used for group commit insert - if (_group_commit) { - auto* future_block = dynamic_cast(block.get()); - std::unique_lock l(*(future_block->lock)); - if (!future_block->is_handled()) { - future_block->set_result(st, 0, 0); - future_block->cv->notify_all(); - } - } - } + handle_group_commit(); if (st.is()) { break; } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index f80f4ddb5e..cfaef4895b 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -86,7 +86,6 @@ #include "runtime/exec_env.h" #include "runtime/fold_constant_executor.h" #include "runtime/fragment_mgr.h" -#include "runtime/group_commit_mgr.h" #include "runtime/load_channel_mgr.h" #include "runtime/load_stream_mgr.h" #include "runtime/result_buffer_mgr.h" @@ -498,9 +497,9 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* } } -Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_request, - PFragmentRequestVersion version, - bool compact) { +Status PInternalServiceImpl::_exec_plan_fragment_impl( + const std::string& ser_request, PFragmentRequestVersion version, bool compact, + const std::function& cb) { // Sometimes the BE do not receive the first heartbeat message and it receives request from FE // If BE execute this fragment, it will core when it wants to get some property from master info. if (ExecEnv::GetInstance()->master_info() == nullptr) { @@ -516,7 +515,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req uint32_t len = ser_request.size(); RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact, &t_request)); } - return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); + if (cb) { + return _exec_env->fragment_mgr()->exec_plan_fragment(t_request, cb); + } else { + return _exec_env->fragment_mgr()->exec_plan_fragment(t_request); + } } else if (version == PFragmentRequestVersion::VERSION_2) { TExecPlanFragmentParamsList t_request; { @@ -526,7 +529,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req } for (const TExecPlanFragmentParams& params : t_request.paramsList) { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + if (cb) { + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb)); + } else { + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + } } return Status::OK(); } else if (version == PFragmentRequestVersion::VERSION_3) { @@ -538,7 +545,11 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req } for (const TPipelineFragmentParams& params : t_request.params_list) { - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + if (cb) { + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params, cb)); + } else { + RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params)); + } } return Status::OK(); } else { @@ -1785,53 +1796,65 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController* const PGroupCommitInsertRequest* request, PGroupCommitInsertResponse* response, google::protobuf::Closure* done) { - bool ret = _light_work_pool.try_offer([this, request, response, done]() { + TUniqueId load_id; + load_id.__set_hi(request->load_id().hi()); + load_id.__set_lo(request->load_id().lo()); + bool ret = _light_work_pool.try_offer([this, request, response, done, load_id]() { brpc::ClosureGuard closure_guard(done); - auto table_id = request->table_id(); - Status st = Status::OK(); - TPlan plan; - { - auto& plan_node = request->plan_node(); - const uint8_t* buf = (const uint8_t*)plan_node.data(); - uint32_t len = plan_node.size(); - st = deserialize_thrift_msg(buf, &len, false, &plan); - if (UNLIKELY(!st.ok())) { - LOG(WARNING) << "deserialize plan failed, msg=" << st; - response->mutable_status()->set_status_code(st.code()); - response->mutable_status()->set_error_msgs(0, st.to_string()); - return; + std::shared_ptr ctx = std::make_shared(_exec_env); + auto pipe = std::make_shared( + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + -1 /* total_length */, true /* use_proto */); + ctx->pipe = pipe; + Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx); + if (st.ok()) { + doris::Mutex mutex; + doris::ConditionVariable cv; + bool handled = false; + try { + st = _exec_plan_fragment_impl( + request->exec_plan_fragment_request().request(), + request->exec_plan_fragment_request().version(), + request->exec_plan_fragment_request().compact(), + [&](RuntimeState* state, Status* status) { + response->set_label(state->import_label()); + response->set_txn_id(state->wal_id()); + response->set_loaded_rows(state->num_rows_load_success()); + response->set_filtered_rows(state->num_rows_load_filtered()); + st = *status; + std::unique_lock l(mutex); + handled = true; + cv.notify_one(); + }); + } catch (const Exception& e) { + st = e.to_status(); + } catch (...) { + st = Status::Error(ErrorCode::INTERNAL_ERROR, + "_exec_plan_fragment_impl meet unknown error"); + } + if (!st.ok()) { + LOG(WARNING) << "exec plan fragment failed, errmsg=" << st; + } else { + for (int i = 0; i < request->data().size(); ++i) { + std::unique_ptr row(new PDataRow()); + row->CopyFrom(request->data(i)); + st = pipe->append(std::move(row)); + if (!st.ok()) { + break; + } + } + if (st.ok()) { + static_cast(pipe->finish()); + std::unique_lock l(mutex); + if (!handled) { + cv.wait(l); + } + } } } - TDescriptorTable tdesc_tbl; - { - auto& desc_tbl = request->desc_tbl(); - const uint8_t* buf = (const uint8_t*)desc_tbl.data(); - uint32_t len = desc_tbl.size(); - st = deserialize_thrift_msg(buf, &len, false, &tdesc_tbl); - if (UNLIKELY(!st.ok())) { - LOG(WARNING) << "deserialize desc tbl failed, msg=" << st; - response->mutable_status()->set_status_code(st.code()); - response->mutable_status()->set_error_msgs(0, st.to_string()); - return; - } - } - TScanRangeParams tscan_range_params; - { - auto& bytes = request->scan_range_params(); - const uint8_t* buf = (const uint8_t*)bytes.data(); - uint32_t len = bytes.size(); - st = deserialize_thrift_msg(buf, &len, false, &tscan_range_params); - if (UNLIKELY(!st.ok())) { - LOG(WARNING) << "deserialize scan range failed, msg=" << st; - response->mutable_status()->set_status_code(st.code()); - response->mutable_status()->set_error_msgs(0, st.to_string()); - return; - } - } - st = _exec_env->group_commit_mgr()->group_commit_insert( - table_id, plan, tdesc_tbl, tscan_range_params, request, response); st.to_protobuf(response->mutable_status()); }); + _exec_env->new_load_stream_mgr()->remove(load_id); if (!ret) { offer_failed(response, done, _light_work_pool); return; diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index d5712e654c..3ef14b6c26 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -38,6 +38,7 @@ class ExecEnv; class PHandShakeRequest; class PHandShakeResponse; class LoadStreamMgr; +class RuntimeState; class PInternalServiceImpl : public PBackendService { public: @@ -211,7 +212,9 @@ private: google::protobuf::Closure* done); Status _exec_plan_fragment_impl(const std::string& s_request, PFragmentRequestVersion version, - bool compact); + bool compact, + const std::function& cb = + std::function()); Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response); diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index d7df3a2e69..dcef9dccec 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -94,7 +94,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { if (!future_block->is_handled()) { future_block->cv->wait(l); } - // future_block->get_status() + RETURN_IF_ERROR(future_block->get_status()); loaded_rows += future_block->get_loaded_rows(); total_rows += future_block->get_total_rows(); } @@ -159,8 +159,9 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, state->set_import_label(_load_block_queue->label); state->set_wal_id(_load_block_queue->txn_id); } + RETURN_IF_ERROR(_load_block_queue->add_block(future_block)); _future_blocks.emplace_back(future_block); - return _load_block_queue->add_block(future_block); + return Status::OK(); } } // namespace vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 9f371fbd03..68bec33a5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -53,18 +53,18 @@ import org.apache.doris.planner.GroupCommitOlapTableSink; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.planner.external.jdbc.JdbcTableSink; +import org.apache.doris.proto.InternalService; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.service.FrontendOptions; import org.apache.doris.tablefunction.GroupCommitTableValuedFunction; import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TMergeType; -import org.apache.doris.thrift.TPlan; -import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TScanRangeParams; import org.apache.doris.thrift.TStreamLoadPutRequest; @@ -162,9 +162,7 @@ public class NativeInsertStmt extends InsertStmt { private boolean isGroupCommit = false; private int baseSchemaVersion = -1; private TUniqueId loadId = null; - private ByteString planBytes = null; - private ByteString tableBytes = null; - private ByteString rangeBytes = null; + private ByteString execPlanFragmentParamsBytes = null; private long tableId = -1; // true if be generates an insert from group commit tvf stmt and executes to load data public boolean isGroupCommitTvf = false; @@ -1074,10 +1072,35 @@ public class NativeInsertStmt extends InsertStmt { } private void analyzeGroupCommit() { - if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit && targetTable instanceof OlapTable + if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit + && targetTable instanceof OlapTable && !ConnectContext.get().isTxnModel() && getQueryStmt() instanceof SelectStmt - && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null) { + && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null + && (label == null || Strings.isNullOrEmpty(label.getLabelName())) + && (analyzer == null || analyzer != null && !analyzer.isReAnalyze())) { + SelectStmt selectStmt = (SelectStmt) queryStmt; + if (selectStmt.getValueList() != null) { + for (List row : selectStmt.getValueList().getRows()) { + for (Expr expr : row) { + if (!expr.isLiteralOrCastExpr()) { + return; + } + } + } + } else { + SelectList selectList = selectStmt.getSelectList(); + if (selectList != null) { + List items = selectList.getItems(); + if (items != null) { + for (SelectListItem item : items) { + if (item.getExpr() != null && !item.getExpr().isLiteralOrCastExpr()) { + return; + } + } + } + } + } isGroupCommit = true; } } @@ -1088,7 +1111,7 @@ public class NativeInsertStmt extends InsertStmt { public void planForGroupCommit(TUniqueId queryId) throws UserException, TException { OlapTable olapTable = (OlapTable) getTargetTable(); - if (planBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) { + if (execPlanFragmentParamsBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) { return; } if (!targetColumns.isEmpty()) { @@ -1110,9 +1133,6 @@ public class NativeInsertStmt extends InsertStmt { StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask); // Will using load id as query id in fragment TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId()); - DescriptorTable descTable = planner.getDescTable(); - TPlanFragment fragment = tRequest.getFragment(); - TPlan plan = fragment.getPlan(); for (Map.Entry> entry : tRequest.params.per_node_scan_ranges.entrySet()) { for (TScanRangeParams scanRangeParams : entry.getValue()) { scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType( @@ -1126,32 +1146,26 @@ public class NativeInsertStmt extends InsertStmt { Preconditions.checkState(scanRangeParams.size() == 1); // save plan message to be reused for prepare stmt loadId = queryId; - planBytes = ByteString.copyFrom(new TSerializer().serialize(plan)); - tableBytes = ByteString.copyFrom(new TSerializer().serialize(descTable.toThrift())); - rangeBytes = ByteString.copyFrom(new TSerializer().serialize(scanRangeParams.get(0))); baseSchemaVersion = olapTable.getBaseSchemaVersion(); + // see BackendServiceProxy#execPlanFragmentsAsync + TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList(); + paramsList.addToParamsList(tRequest); + execPlanFragmentParamsBytes = ByteString.copyFrom(new TSerializer().serialize(paramsList)); + } + + public InternalService.PExecPlanFragmentRequest getExecPlanFragmentRequest() { + return InternalService.PExecPlanFragmentRequest.newBuilder().setRequest(execPlanFragmentParamsBytes) + .setCompact(false).setVersion(InternalService.PFragmentRequestVersion.VERSION_2).build(); } public TUniqueId getLoadId() { return loadId; } - public ByteString getPlanBytes() { - return planBytes; - } - - public ByteString getTableBytes() { - return tableBytes; - } - public int getBaseSchemaVersion() { return baseSchemaVersion; } - public ByteString getRangeBytes() { - return rangeBytes; - } - public void setIsFromDeleteOrUpdateStmt(boolean isFromDeleteOrUpdateStmt) { this.isFromDeleteOrUpdateStmt = isFromDeleteOrUpdateStmt; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 09afe49943..82d6188bfc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1845,10 +1845,8 @@ public class StmtExecutor { PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder() .setDbId(insertStmt.getTargetTable().getDatabase().getId()) .setTableId(insertStmt.getTargetTable().getId()) - .setDescTbl(nativeInsertStmt.getTableBytes()) .setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion()) - .setPlanNode(nativeInsertStmt.getPlanBytes()) - .setScanRangeParams(nativeInsertStmt.getRangeBytes()) + .setExecPlanFragmentRequest(nativeInsertStmt.getExecPlanFragmentRequest()) .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo) .build()).addAllData(rows) .build(); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 5881e5bf2a..99f4464fd5 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -720,16 +720,18 @@ message PGroupCommitInsertRequest { optional int64 db_id = 1; optional int64 table_id = 2; // Descriptors.TDescriptorTable - optional bytes desc_tbl = 3; + // optional bytes desc_tbl = 3; optional int64 base_schema_version = 4; // TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan - optional bytes plan_node = 5; + // optional bytes plan_node = 5; // TScanRangeParams - optional bytes scan_range_params = 6; + // optional bytes scan_range_params = 6; optional PUniqueId load_id = 7; repeated PDataRow data = 8; + // TExecPlanFragmentParams + optional PExecPlanFragmentRequest exec_plan_fragment_request = 9; } message PGroupCommitInsertResponse { diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy index c5ee05ac1e..9fbc6aaf36 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy @@ -67,6 +67,17 @@ suite("insert_group_commit_into_duplicate") { assertTrue(serverInfo.contains("'label':'group_commit_")) } + def none_group_commit_insert = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'VISIBLE'")) + assertTrue(!serverInfo.contains("'label':'group_commit_")) + } + try { // create table sql """ drop table if exists ${table}; """ @@ -174,6 +185,13 @@ suite("insert_group_commit_into_duplicate") { getRowCount(20) qt_sql """ select name, score from ${table} order by name asc; """ + + none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 + none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 + none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1 + def rowCount = sql "select count(*) from ${table}" + logger.info("row count: " + rowCount) + assertEquals(rowCount[0][0], 23) } } finally { // try_sql("DROP TABLE ${table}")