diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 4c30994733..eb03e00450 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1084,6 +1084,9 @@ DEFINE_Int32(grace_shutdown_wait_seconds, "120"); DEFINE_Int16(bitmap_serialize_version, "1"); +// the number of threads used for group commit insert +DEFINE_Int32(group_commit_insert_threads, "10"); + #ifdef BE_TEST // test s3 DEFINE_String(test_s3_resource, "resource"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 61c9cc7d08..c37c58ea98 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1153,6 +1153,9 @@ DECLARE_Int32(grace_shutdown_wait_seconds); // BitmapValue serialize version. DECLARE_Int16(bitmap_serialize_version); +// This config limits the number of threads in the group commit insert thread pool. +DECLARE_Int32(group_commit_insert_threads); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 5c40475eeb..d5af0cff62 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -31,6 +31,7 @@ #include "common/config.h" #include "vec/sink/async_writer_sink.h" +#include "vec/sink/group_commit_vtablet_sink.h" #include "vec/sink/multi_cast_data_stream_sink.h" #include "vec/sink/vdata_stream_sender.h" #include "vec/sink/vmemory_scratch_sink.h" @@ -155,6 +156,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink RETURN_IF_ERROR(status); break; } + case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: { + Status status; + DCHECK(thrift_sink.__isset.olap_table_sink); + sink->reset( + new stream_load::GroupCommitVOlapTableSink(pool, row_desc, output_exprs, &status)); + RETURN_IF_ERROR(status); + break; + } case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine"); } @@ -303,7 +312,14 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink sink->reset(new vectorized::MultiCastDataStreamSink(multi_cast_data_streamer)); break; } - + case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: { + Status status; + DCHECK(thrift_sink.__isset.olap_table_sink); + sink->reset( + new stream_load::GroupCommitVOlapTableSink(pool, row_desc, output_exprs, &status)); + RETURN_IF_ERROR(status); + break; + } default: { std::stringstream error_msg; std::map<int, const char*>::const_iterator i = diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 56902e3823..21f4ce9121 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -47,6 +47,7 @@ #include "vec/exec/distinct_vaggregation_node.h" #include "vec/exec/join/vhash_join_node.h" #include "vec/exec/join/vnested_loop_join_node.h" +#include "vec/exec/scan/group_commit_scan_node.h" #include "vec/exec/scan/new_es_scan_node.h" #include "vec/exec/scan/new_file_scan_node.h" #include "vec/exec/scan/new_jdbc_scan_node.h" @@ -324,6 +325,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::JDBC_SCAN_NODE: case TPlanNodeType::META_SCAN_NODE: case TPlanNodeType::PARTITION_SORT_NODE: + case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -451,6 +453,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::PARTITION_SORT_NODE: *node = pool->add(new vectorized::VPartitionSortNode(pool, tnode, descs)); return Status::OK(); + + case 
TPlanNodeType::GROUP_COMMIT_SCAN_NODE: + *node = pool->add(new vectorized::GroupCommitScanNode(pool, tnode, descs)); + return Status::OK(); + default: std::map::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 9e6479be1b..f138d2fe4b 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -117,7 +117,7 @@ PipelineFragmentContext::PipelineFragmentContext( const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id, int backend_num, std::shared_ptr query_ctx, ExecEnv* exec_env, const std::function& call_back, - const report_status_callback& report_status_cb) + const report_status_callback& report_status_cb, bool group_commit) : _query_id(query_id), _fragment_instance_id(instance_id), _fragment_id(fragment_id), @@ -127,7 +127,8 @@ PipelineFragmentContext::PipelineFragmentContext( _call_back(call_back), _report_thread_active(false), _report_status_cb(report_status_cb), - _is_report_on_cancel(true) { + _is_report_on_cancel(true), + _group_commit(group_commit) { if (_query_ctx->get_task_group()) { _task_group_entity = _query_ctx->get_task_group()->task_entity(); } @@ -421,6 +422,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur case TPlanNodeType::ODBC_SCAN_NODE: case TPlanNodeType::FILE_SCAN_NODE: case TPlanNodeType::META_SCAN_NODE: + case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: case TPlanNodeType::ES_HTTP_SCAN_NODE: case TPlanNodeType::ES_SCAN_NODE: { OperatorBuilderPtr operator_t = std::make_shared(node->id(), node); @@ -733,6 +735,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr _sink.get()); break; } + case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: case TDataSinkType::OLAP_TABLE_SINK: { if (state->query_options().enable_memtable_on_sink_node) { sink_ = std::make_shared(next_operator_builder_id(), diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 8147eddd1c..47dad12d7c 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -63,7 +63,8 @@ public: const int fragment_id, int backend_num, std::shared_ptr query_ctx, ExecEnv* exec_env, const std::function& call_back, - const report_status_callback& report_status_cb); + const report_status_callback& report_status_cb, + bool group_commit = false); virtual ~PipelineFragmentContext(); @@ -133,6 +134,8 @@ public: return _task_group_entity; } + bool is_group_commit() { return _group_commit; } + protected: Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state); Status _build_pipelines(ExecNode*, PipelinePtr); @@ -211,6 +214,7 @@ protected: private: std::vector> _tasks; + bool _group_commit; }; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 035db53a27..261235d672 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -33,6 +33,7 @@ #include "task_queue.h" #include "util/defer_op.h" #include "util/runtime_profile.h" +#include "vec/core/future_block.h" namespace doris { class RuntimeState; @@ -160,7 +161,8 @@ Status PipelineTask::prepare(RuntimeState* state) { fmt::format_to(operator_ids_str, "]"); _task_profile->add_info_string("OperatorIds(source2root)", 
fmt::to_string(operator_ids_str)); - _block = doris::vectorized::Block::create_unique(); + _block = _fragment_context->is_group_commit() ? doris::vectorized::FutureBlock::create_unique() + : doris::vectorized::Block::create_unique(); // We should make sure initial state for task are runnable so that we can do some preparation jobs (e.g. initialize runtime filters). set_state(PipelineTaskState::RUNNABLE); @@ -277,6 +279,16 @@ Status PipelineTask::execute(bool* eos) { if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); auto status = _sink->sink(_state, block, _data_state); + if (UNLIKELY(!status.ok() || block->rows() == 0)) { + if (_fragment_context->is_group_commit()) { + auto* future_block = dynamic_cast<doris::vectorized::FutureBlock*>(block); + std::unique_lock<doris::Mutex> l(*(future_block->lock)); + if (!future_block->is_handled()) { + future_block->set_result(status, 0, 0); + future_block->cv->notify_all(); + } + } + } if (!status.is<ErrorCode::END_OF_FILE>()) { RETURN_IF_ERROR(status); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index f240e4731a..18d69a5240 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -100,9 +100,9 @@ namespace doris::pipeline { PipelineXFragmentContext::PipelineXFragmentContext( const TUniqueId& query_id, const int fragment_id, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back, - const report_status_callback& report_status_cb) + const report_status_callback& report_status_cb, bool group_commit) : PipelineFragmentContext(query_id, TUniqueId(), fragment_id, -1, query_ctx, exec_env, - call_back, report_status_cb) {} + call_back, report_status_cb, group_commit) {} PipelineXFragmentContext::~PipelineXFragmentContext() { auto st = _query_ctx->exec_status(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 3a72adac11..e21a004c7a 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -64,7 +64,8 @@ public: PipelineXFragmentContext(const TUniqueId& query_id, const int fragment_id, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back, - const report_status_callback& report_status_cb); + const report_status_callback& report_status_cb, + bool group_commit = false); ~PipelineXFragmentContext() override; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 212456b69b..f6c61d26f3 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -79,6 +79,7 @@ class ClientCache; class HeartbeatFlags; class FrontendServiceClient; class FileMetaCache; +class GroupCommitMgr; inline bool k_doris_exit = false; @@ -173,6 +174,7 @@ public: std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; } + GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; } const std::vector<StorePath>& store_paths() const { return _store_paths; } @@ -286,6 +288,7 @@ private: std::mutex _frontends_lock; std::map<TNetworkAddress, FrontendInfo> _frontends; + GroupCommitMgr* _group_commit_mgr = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 5cf17d9e37..f53e5affbd 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -53,6 +53,7 @@ #include 
"runtime/exec_env.h" #include "runtime/external_scan_context_mgr.h" #include "runtime/fragment_mgr.h" +#include "runtime/group_commit_mgr.h" #include "runtime/heartbeat_flags.h" #include "runtime/load_channel_mgr.h" #include "runtime/load_path_mgr.h" @@ -194,6 +195,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { _routine_load_task_executor = new RoutineLoadTaskExecutor(this); _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); _block_spill_mgr = new BlockSpillManager(_store_paths); + _group_commit_mgr = new GroupCommitMgr(this); _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); _memtable_memory_limiter = std::make_unique(); @@ -433,6 +435,7 @@ void ExecEnv::_destroy() { SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_heartbeat_flags); SAFE_DELETE(_scanner_scheduler); + SAFE_DELETE(_group_commit_mgr); SAFE_DELETE(_file_meta_cache); // Master Info is a thrift object, it could be the last one to deconstruct. // Master info should be deconstruct later than fragment manager, because fragment will diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 96fe688297..fdb807c43d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -700,7 +700,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::make_shared( query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb, std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this, - std::placeholders::_1)); + std::placeholders::_1), + params.group_commit); { SCOPED_RAW_TIMER(&duration_ns); auto prepare_st = context->prepare(params); diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp new file mode 100644 index 0000000000..5c9b3c1637 --- /dev/null +++ b/be/src/runtime/group_commit_mgr.cpp @@ -0,0 +1,515 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/group_commit_mgr.h" + +#include +#include +#include +#include +#include + +#include "client_cache.h" +#include "common/object_pool.h" +#include "exec/data_sink.h" +#include "io/fs/stream_load_pipe.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" +#include "util/thrift_rpc_helper.h" +#include "vec/exec/scan/new_file_scan_node.h" + +namespace doris { + +class TPlan; +class FragmentExecState; + +Status LoadBlockQueue::add_block(std::shared_ptr block) { + DCHECK(block->get_schema_version() == schema_version); + std::unique_lock l(*_mutex); + RETURN_IF_ERROR(_status); + if (block->rows() > 0) { + _block_queue.push_back(block); + } + if (block->is_eos()) { + _load_ids.erase(block->get_load_id()); + } else if (block->is_first()) { + _load_ids.emplace(block->get_load_id()); + } + _cv->notify_one(); + return Status::OK(); +} + +Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, bool* eos) { + *find_block = false; + *eos = false; + std::unique_lock l(*_mutex); + if (!need_commit) { + auto left_seconds = 10 - std::chrono::duration_cast( + std::chrono::steady_clock::now() - _start_time) + .count(); + if (left_seconds <= 0) { + need_commit = true; + } + } + while (_status.ok() && _block_queue.empty() && + (!need_commit || (need_commit && !_load_ids.empty()))) { + // TODO make 10s as a config + auto left_seconds = 10; + if (!need_commit) { + left_seconds = 10 - std::chrono::duration_cast( + std::chrono::steady_clock::now() - _start_time) + .count(); + if (left_seconds <= 0) { + need_commit = true; + break; + } + } +#if !defined(USE_BTHREAD_SCANNER) + _cv->wait_for(l, std::chrono::seconds(left_seconds)); +#else + _cv->wait_for(l, left_seconds * 1000000); +#endif + } + if (!_block_queue.empty()) { + auto& future_block = _block_queue.front(); + auto* fblock = static_cast(block); + fblock->swap_future_block(future_block); + *find_block = true; + _block_queue.pop_front(); + } + if (_block_queue.empty()) { + if (need_commit && _load_ids.empty()) { + *eos = true; + } else { + *eos = false; + } + } + return Status::OK(); +} + +void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { + std::unique_lock l(*_mutex); + if (_load_ids.find(load_id) != _load_ids.end()) { + _load_ids.erase(load_id); + _cv->notify_one(); + } +} + +void LoadBlockQueue::cancel(const Status& st) { + DCHECK(!st.ok()); + std::unique_lock l(*_mutex); + _status = st; + while (!_block_queue.empty()) { + { + auto& future_block = _block_queue.front(); + std::unique_lock l0(*(future_block->lock)); + future_block->set_result(st, future_block->rows(), 0); + future_block->cv->notify_all(); + } + _block_queue.pop_front(); + } +} + +Status GroupCommitTable::get_first_block_load_queue( + int64_t table_id, std::shared_ptr block, + std::shared_ptr& load_block_queue) { + DCHECK(table_id == _table_id); + DCHECK(block->is_first() == true); + { + std::unique_lock l(_lock); + for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) { + // TODO if block schema version is less than fragment schema version, return error + if (!it->second->need_commit && + it->second->schema_version == block->get_schema_version()) { + if (block->get_schema_version() == it->second->schema_version) { + load_block_queue = it->second; + break; + } else if (block->get_schema_version() < it->second->schema_version) { + return 
Status::DataQualityError("schema version not match"); + } + } + } + } + if (load_block_queue == nullptr) { + Status st = Status::OK(); + for (int i = 0; i < 3; ++i) { + std::unique_lock l(_request_fragment_mutex); + // check if there is a re-usefully fragment + { + std::unique_lock l1(_lock); + for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) { + // TODO if block schema version is less than fragment schema version, return error + if (!it->second->need_commit) { + if (block->get_schema_version() == it->second->schema_version) { + load_block_queue = it->second; + break; + } else if (block->get_schema_version() < it->second->schema_version) { + return Status::DataQualityError("schema version not match"); + } + } + } + } + if (load_block_queue == nullptr) { + st = _create_group_commit_load(table_id, load_block_queue); + if (LIKELY(st.ok())) { + break; + } + } + } + RETURN_IF_ERROR(st); + if (load_block_queue->schema_version != block->get_schema_version()) { + // TODO check this is the first block + return Status::DataQualityError("schema version not match"); + } + } + return Status::OK(); +} + +Status GroupCommitTable::_create_group_commit_load( + int64_t table_id, std::shared_ptr& load_block_queue) { + TStreamLoadPutRequest request; + std::stringstream ss; + ss << "insert into " << table_id << " select * from group_commit(\"table_id\"=\"" << table_id + << "\")"; + request.__set_load_sql(ss.str()); + UniqueId load_id = UniqueId::gen_uid(); + TUniqueId tload_id; + tload_id.__set_hi(load_id.hi); + tload_id.__set_lo(load_id.lo); + request.__set_loadId(tload_id); + std::string label = "group_commit_" + load_id.to_string(); + request.__set_label(label); + request.__set_token("group_commit"); // this is a fake, fe not check it now + request.__set_max_filter_ratio(1.0); + request.__set_strictMode(false); + if (_exec_env->master_info()->__isset.backend_id) { + request.__set_backend_id(_exec_env->master_info()->backend_id); + } else { + LOG(WARNING) << "_exec_env->master_info not set backend_id"; + } + TStreamLoadPutResult result; + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&result, &request](FrontendServiceConnection& client) { + client->streamLoadPut(result, request); + }, + 10000L)); + Status st = Status::create(result.status); + if (!st.ok()) { + LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + } + RETURN_IF_ERROR(st); + auto schema_version = result.base_schema_version; + auto is_pipeline = result.__isset.pipeline_params; + auto& params = result.params; + auto& pipeline_params = result.pipeline_params; + int64_t txn_id; + TUniqueId instance_id; + if (!is_pipeline) { + DCHECK(params.fragment.output_sink.olap_table_sink.db_id == _db_id); + txn_id = params.txn_conf.txn_id; + instance_id = params.params.fragment_instance_id; + } else { + DCHECK(pipeline_params.fragment.output_sink.olap_table_sink.db_id == _db_id); + txn_id = pipeline_params.txn_conf.txn_id; + DCHECK(pipeline_params.local_params.size() == 1); + instance_id = pipeline_params.local_params[0].fragment_instance_id; + } + VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << table_id + << ", schema version=" << schema_version << ", label=" << label + << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id) + << ", is_pipeline=" << is_pipeline; + { + load_block_queue = + std::make_shared(instance_id, label, txn_id, schema_version); + 
std::unique_lock<doris::Mutex> l(_lock); + _load_block_queues.emplace(instance_id, load_block_queue); + } + st = _exec_plan_fragment(_db_id, table_id, label, txn_id, is_pipeline, params, pipeline_params); + if (!st.ok()) { + _finish_group_commit_load(_db_id, table_id, label, txn_id, instance_id, st, true, nullptr); + } + return st; +} + +Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_id, + const std::string& label, int64_t txn_id, + const TUniqueId& instance_id, Status& status, + bool prepare_failed, RuntimeState* state) { + { + std::lock_guard<doris::Mutex> l(_lock); + if (prepare_failed) { + auto it = _load_block_queues.find(instance_id); + if (it != _load_block_queues.end()) { + it->second->cancel(status); + } + } + _load_block_queues.erase(instance_id); + } + Status st; + Status result_status; + if (status.ok()) { + // commit txn + TLoadTxnCommitRequest request; + request.__set_auth_code(0); // this is a fake auth code; FE does not check it for now + request.__set_db_id(db_id); + request.__set_table_id(table_id); + request.__set_txnId(txn_id); + if (state) { + request.__set_commitInfos(state->tablet_commit_infos()); + } + TLoadTxnCommitResult result; + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + st = ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->loadTxnCommit(result, request); + }, + 10000L); + result_status = Status::create(result.status); + } else { + // abort txn + TLoadTxnRollbackRequest request; + request.__set_auth_code(0); // this is a fake auth code; FE does not check it for now + request.__set_db_id(db_id); + request.__set_txnId(txn_id); + request.__set_reason(status.to_string()); + TLoadTxnRollbackResult result; + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + st = ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->loadTxnRollback(result, request); + }, + 10000L); + result_status = Status::create(result.status); + } + if (!st.ok()) { + LOG(WARNING) << "request finish error, db_id=" << db_id << ", table_id=" << table_id + << ", label=" << label << ", txn_id=" << txn_id + << ", instance_id=" << print_id(instance_id) + << ", executor status=" << status.to_string() + << ", request commit status=" << st.to_string(); + return st; + } + // TODO: handle execute and commit errors + std::stringstream ss; + ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label + << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id); + if (prepare_failed) { + ss << ", prepare status=" << status.to_string(); + } else { + ss << ", execute status=" << status.to_string(); + } + ss << ", commit status=" << result_status.to_string(); + if (state && !(state->get_error_log_file_path().empty())) { + ss << ", error_url=" << state->get_error_log_file_path(); + } + LOG(INFO) << ss.str(); + return st; +} + +Status GroupCommitTable::_exec_plan_fragment(int64_t db_id, int64_t table_id, + const std::string& label, int64_t txn_id, + bool is_pipeline, + const TExecPlanFragmentParams& params, + const TPipelineFragmentParams& pipeline_params) { + auto finish_cb = [db_id, table_id, label, txn_id, this](RuntimeState* state, Status* status) { + _finish_group_commit_load(db_id, table_id, label, txn_id, state->fragment_instance_id(), + *status, false, state); + }; + if (is_pipeline) { + return _exec_env->fragment_mgr()->exec_plan_fragment(pipeline_params, finish_cb); + } else { 
return _exec_env->fragment_mgr()->exec_plan_fragment(params, finish_cb); + } +} + +Status GroupCommitTable::get_load_block_queue(const TUniqueId& instance_id, + std::shared_ptr<LoadBlockQueue>& load_block_queue) { + std::unique_lock<doris::Mutex> l(_lock); + auto it = _load_block_queues.find(instance_id); + if (it == _load_block_queues.end()) { + return Status::InternalError("group commit load instance " + print_id(instance_id) + + " not found"); + } + load_block_queue = it->second; + return Status::OK(); +} + +GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { + ThreadPoolBuilder("InsertIntoGroupCommitThreadPool") + .set_min_threads(config::group_commit_insert_threads) + .set_max_threads(config::group_commit_insert_threads) + .build(&_insert_into_thread_pool); +} + +GroupCommitMgr::~GroupCommitMgr() {} + +Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan, + const TDescriptorTable& tdesc_tbl, + const TScanRangeParams& scan_range_params, + const PGroupCommitInsertRequest* request, + PGroupCommitInsertResponse* response) { + auto& nodes = plan.nodes; + DCHECK(nodes.size() > 0); + auto& plan_node = nodes.at(0); + + TUniqueId load_id; + load_id.__set_hi(request->load_id().hi()); + load_id.__set_lo(request->load_id().lo()); + + std::vector<std::shared_ptr<vectorized::FutureBlock>> future_blocks; + { + // 1. Prepare a pipe, then append rows to pipe, + // then scan node scans from the pipe, like stream load. + std::shared_ptr<LoadBlockQueue> load_block_queue; + auto pipe = std::make_shared<io::StreamLoadPipe>( + io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, + -1 /* total_length */, true /* use_proto */); + std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); + ctx->pipe = pipe; + RETURN_IF_ERROR(_exec_env->new_load_stream_mgr()->put(load_id, ctx)); + std::unique_ptr<int, std::function<void(int*)>> remove_pipe_func((int*)0x01, [&](int*) { + if (load_block_queue != nullptr) { + load_block_queue->remove_load_id(load_id); + } + _exec_env->new_load_stream_mgr()->remove(load_id); + }); + _insert_into_thread_pool->submit_func( + std::bind(&GroupCommitMgr::_append_row, this, pipe, request)); + + // 2. FileScanNode consumes data from the pipe. + std::unique_ptr<RuntimeState> runtime_state = RuntimeState::create_unique(); + TQueryOptions query_options; + query_options.query_type = TQueryType::LOAD; + TQueryGlobals query_globals; + runtime_state->init(load_id, query_options, query_globals, _exec_env); + runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>( + MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(load_id)), -1)); + DescriptorTbl* desc_tbl = nullptr; + RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl)); + runtime_state->set_desc_tbl(desc_tbl); + auto file_scan_node = + vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl); + std::unique_ptr<int, std::function<void(int*)>> close_scan_node_func( + (int*)0x01, [&](int*) { file_scan_node.close(runtime_state.get()); }); + // TFileFormatType::FORMAT_PROTO, TFileType::FILE_STREAM, set _range.load_id + RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get())); + RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get())); + std::vector<TScanRangeParams> params_vector; + params_vector.emplace_back(scan_range_params); + file_scan_node.set_scan_ranges(params_vector); + RETURN_IF_ERROR(file_scan_node.open(runtime_state.get())); + + // 3. Put the block into block queue. 
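+ // Each block read from the scan node is wrapped in a FutureBlock; its lock and condition variable let this RPC thread wait in step 4 until the internal load fragment reports the block's result.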
+ std::unique_ptr<doris::vectorized::Block> _block = + doris::vectorized::Block::create_unique(); + bool eof = false; + bool first = true; + while (!eof) { + // TODO: handle the error when reading a block fails + RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof)); + std::shared_ptr<vectorized::FutureBlock> future_block = + std::make_shared<vectorized::FutureBlock>(); + future_block->swap(*(_block.get())); + future_block->set_info(request->base_schema_version(), load_id, first, eof); + if (load_block_queue == nullptr) { + RETURN_IF_ERROR(_get_first_block_load_queue(request->db_id(), table_id, + future_block, load_block_queue)); + response->set_label(load_block_queue->label); + response->set_txn_id(load_block_queue->txn_id); + } + // TODO: handle the error when adding a block fails + RETURN_IF_ERROR(load_block_queue->add_block(future_block)); + if (future_block->rows() > 0) { + future_blocks.emplace_back(future_block); + } + first = false; + } + if (!runtime_state->get_error_log_file_path().empty()) { + LOG(INFO) << "id=" << print_id(load_id) + << ", url=" << runtime_state->get_error_log_file_path() + << ", load rows=" << runtime_state->num_rows_load_total() + << ", filter rows=" << runtime_state->num_rows_load_filtered() + << ", unselect rows=" << runtime_state->num_rows_load_unselected() + << ", success rows=" << runtime_state->num_rows_load_success(); + } + } + int64_t total_rows = 0; + int64_t loaded_rows = 0; + // 4. wait for all blocks to be handled + for (const auto& future_block : future_blocks) { + std::unique_lock<doris::Mutex> l(*(future_block->lock)); + if (!future_block->is_handled()) { + future_block->cv->wait(l); + } + // future_block->get_status() + total_rows += future_block->get_total_rows(); + loaded_rows += future_block->get_loaded_rows(); + } + response->set_loaded_rows(loaded_rows); + response->set_filtered_rows(total_rows - loaded_rows); + return Status::OK(); +} + +Status GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe, + const PGroupCommitInsertRequest* request) { + for (int i = 0; i < request->data().size(); ++i) { + std::unique_ptr<PDataRow> row(new PDataRow()); + row->CopyFrom(request->data(i)); + // TODO: append may fail if the pipe is cancelled + RETURN_IF_ERROR(pipe->append(std::move(row))); + } + pipe->finish(); + return Status::OK(); +} + +Status GroupCommitMgr::_get_first_block_load_queue( + int64_t db_id, int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block, + std::shared_ptr<LoadBlockQueue>& load_block_queue) { + std::shared_ptr<GroupCommitTable> group_commit_table; + { + std::lock_guard<doris::Mutex> wlock(_lock); + if (_table_map.find(table_id) == _table_map.end()) { + _table_map.emplace(table_id, + std::make_shared<GroupCommitTable>(_exec_env, db_id, table_id)); + } + group_commit_table = _table_map[table_id]; + } + return group_commit_table->get_first_block_load_queue(table_id, block, load_block_queue); +} + +Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, + std::shared_ptr<LoadBlockQueue>& load_block_queue) { + std::shared_ptr<GroupCommitTable> group_commit_table; + { + std::lock_guard<doris::Mutex> l(_lock); + auto it = _table_map.find(table_id); + if (it == _table_map.end()) { + return Status::NotFound("table_id: " + std::to_string(table_id) + + ", instance_id: " + print_id(instance_id) + " does not exist"); + } + group_commit_table = it->second; + } + return group_commit_table->get_load_block_queue(instance_id, load_block_queue); +} +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h new file mode 100644 index 0000000000..0d5797ed96 --- /dev/null +++ b/be/src/runtime/group_commit_mgr.h @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) 
under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "io/fs/stream_load_pipe.h" +#include "util/lock.h" +#include "util/threadpool.h" +#include "util/thrift_util.h" +#include "vec/core/block.h" +#include "vec/core/future_block.h" + +namespace doris { +class ExecEnv; +class TPlan; +class TDescriptorTable; +class TUniqueId; +class TExecPlanFragmentParams; +class ObjectPool; +class RuntimeState; +class StreamLoadPipe; + +class LoadBlockQueue { +public: + LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id, + int64_t schema_version) + : load_instance_id(load_instance_id), + label(label), + txn_id(txn_id), + schema_version(schema_version), + _start_time(std::chrono::steady_clock::now()) { + _mutex = std::make_shared(); + _cv = std::make_shared(); + }; + + Status add_block(std::shared_ptr block); + Status get_block(vectorized::Block* block, bool* find_block, bool* eos); + void remove_load_id(const UniqueId& load_id); + void cancel(const Status& st); + + UniqueId load_instance_id; + std::string label; + int64_t txn_id; + int64_t schema_version; + bool need_commit = false; + +private: + std::chrono::steady_clock::time_point _start_time; + + std::shared_ptr _mutex; + std::shared_ptr _cv; + // the set of load ids of all blocks in this queue + std::set _load_ids; + std::list> _block_queue; + + Status _status = Status::OK(); +}; + +class GroupCommitTable { +public: + GroupCommitTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {}; + Status get_first_block_load_queue(int64_t table_id, + std::shared_ptr block, + std::shared_ptr& load_block_queue); + Status get_load_block_queue(const TUniqueId& instance_id, + std::shared_ptr& load_block_queue); + +private: + Status _create_group_commit_load(int64_t table_id, + std::shared_ptr& load_block_queue); + Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, + int64_t txn_id, bool is_pipeline, + const TExecPlanFragmentParams& params, + const TPipelineFragmentParams& pipeline_params); + Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const std::string& label, + int64_t txn_id, const TUniqueId& instance_id, Status& status, + bool prepare_failed, RuntimeState* state); + + ExecEnv* _exec_env; + int64_t _db_id; + int64_t _table_id; + doris::Mutex _lock; + // fragment_instance_id to load_block_queue + std::unordered_map> _load_block_queues; + + doris::Mutex _request_fragment_mutex; +}; + +class GroupCommitMgr { +public: + GroupCommitMgr(ExecEnv* exec_env); + virtual ~GroupCommitMgr(); + + // insert into + Status group_commit_insert(int64_t table_id, const TPlan& plan, + const TDescriptorTable& desc_tbl, + const TScanRangeParams& scan_range_params, + const 
PGroupCommitInsertRequest* request, + PGroupCommitInsertResponse* response); + + // used when init group_commit_scan_node + Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, + std::shared_ptr& load_block_queue); + +private: + // used by insert into + Status _append_row(std::shared_ptr pipe, + const PGroupCommitInsertRequest* request); + Status _get_first_block_load_queue(int64_t db_id, int64_t table_id, + std::shared_ptr block, + std::shared_ptr& load_block_queue); + + ExecEnv* _exec_env; + + doris::Mutex _lock; + // TODO remove table when unused + std::unordered_map> _table_map; + + // thread pool to handle insert into: append data to pipe + std::unique_ptr _insert_into_thread_pool; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 750ff70833..d351ff9ca0 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -62,6 +62,7 @@ #include "util/time.h" #include "util/uid_util.h" #include "vec/core/block.h" +#include "vec/core/future_block.h" #include "vec/exec/scan/new_es_scan_node.h" #include "vec/exec/scan/new_file_scan_node.h" #include "vec/exec/scan/new_jdbc_scan_node.h" @@ -119,12 +120,12 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { } _span = tracer->StartSpan("Plan_fragment_executor"); OpentelemetryScope scope {_span}; - if (request.__isset.query_options) { _timeout_second = request.query_options.execution_timeout; } const TPlanFragmentExecParams& params = request.params; + _group_commit = params.group_commit; LOG_INFO("PlanFragmentExecutor::prepare") .tag("query_id", print_id(_query_ctx->query_id())) .tag("instance_id", print_id(params.fragment_instance_id)) @@ -316,20 +317,33 @@ Status PlanFragmentExecutor::open_vectorized_internal() { return Status::OK(); } RETURN_IF_ERROR(_sink->open(runtime_state())); - doris::vectorized::Block block; + std::unique_ptr block = + _group_commit ? doris::vectorized::FutureBlock::create_unique() + : doris::vectorized::Block::create_unique(); bool eos = false; while (!eos) { RETURN_IF_CANCELLED(_runtime_state); - RETURN_IF_ERROR(get_vectorized_internal(&block, &eos)); + RETURN_IF_ERROR(get_vectorized_internal(block.get(), &eos)); // Collect this plan and sub plan statistics, and send to parent plan. 
if (_collect_query_statistics_with_every_batch) { + _collect_query_statistics(); + } - if (!eos || block.rows() > 0) { - auto st = _sink->send(runtime_state(), &block); + if (!eos || block->rows() > 0) { + auto st = _sink->send(runtime_state(), block.get()); + if (UNLIKELY(!st.ok() || block->rows() == 0)) { + // Used for group commit insert + if (_group_commit) { + auto* future_block = dynamic_cast<doris::vectorized::FutureBlock*>(block.get()); + std::unique_lock<doris::Mutex> l(*(future_block->lock)); + if (!future_block->is_handled()) { + future_block->set_result(st, 0, 0); + future_block->cv->notify_all(); + } + } + } if (st.is<ErrorCode::END_OF_FILE>()) { break; } diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 47cd1ff9a8..3bfcbe005a 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -240,6 +240,8 @@ private: OpentelemetrySpan _span; + bool _group_commit = false; + ObjectPool* obj_pool() { return _runtime_state->obj_pool(); } // typedef for TPlanFragmentExecParams.per_node_scan_ranges diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 1b534a0261..9b923aec7d 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -86,6 +86,7 @@ #include "runtime/exec_env.h" #include "runtime/fold_constant_executor.h" #include "runtime/fragment_mgr.h" +#include "runtime/group_commit_mgr.h" #include "runtime/load_channel_mgr.h" #include "runtime/load_stream_mgr.h" #include "runtime/result_buffer_mgr.h" @@ -1744,4 +1745,61 @@ void PInternalServiceImpl::glob(google::protobuf::RpcController* controller, } } +void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController* controller, + const PGroupCommitInsertRequest* request, + PGroupCommitInsertResponse* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto table_id = request->table_id(); + Status st = Status::OK(); + TPlan plan; + { + auto& plan_node = request->plan_node(); + const uint8_t* buf = (const uint8_t*)plan_node.data(); + uint32_t len = plan_node.size(); + st = deserialize_thrift_msg(buf, &len, false, &plan); + if (UNLIKELY(!st.ok())) { + LOG(WARNING) << "deserialize plan failed, msg=" << st; + response->mutable_status()->set_status_code(st.code()); + response->mutable_status()->add_error_msgs(st.to_string()); + return; + } + } + TDescriptorTable tdesc_tbl; + { + auto& desc_tbl = request->desc_tbl(); + const uint8_t* buf = (const uint8_t*)desc_tbl.data(); + uint32_t len = desc_tbl.size(); + st = deserialize_thrift_msg(buf, &len, false, &tdesc_tbl); + if (UNLIKELY(!st.ok())) { + LOG(WARNING) << "deserialize desc tbl failed, msg=" << st; + response->mutable_status()->set_status_code(st.code()); + response->mutable_status()->add_error_msgs(st.to_string()); + return; + } + } + TScanRangeParams tscan_range_params; + { + auto& bytes = request->scan_range_params(); + const uint8_t* buf = (const uint8_t*)bytes.data(); + uint32_t len = bytes.size(); + st = deserialize_thrift_msg(buf, &len, false, &tscan_range_params); + if (UNLIKELY(!st.ok())) { + LOG(WARNING) << "deserialize scan range failed, msg=" << st; + response->mutable_status()->set_status_code(st.code()); + response->mutable_status()->add_error_msgs(st.to_string()); + return; + } + } + st = _exec_env->group_commit_mgr()->group_commit_insert( + table_id, plan, tdesc_tbl, tscan_range_params, request, response); + 
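// group_commit_insert blocks until every appended block has been handled; attach its final status to the response. +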
st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + return; + } +}; + } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 7d28d23761..e7a5914274 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -194,6 +194,11 @@ public: void glob(google::protobuf::RpcController* controller, const PGlobRequest* request, PGlobResponse* response, google::protobuf::Closure* done) override; + void group_commit_insert(google::protobuf::RpcController* controller, + const PGroupCommitInsertRequest* request, + PGroupCommitInsertResponse* response, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index cad45ac237..9bc455f054 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -89,6 +89,12 @@ public: Block(const std::vector& slots, size_t block_size, bool ignore_trivial_slot = false); + virtual ~Block() = default; + Block(const Block& block) = default; + Block& operator=(const Block& p) = default; + Block(Block&& block) = default; + Block& operator=(Block&& other) = default; + void reserve(size_t count); // Make sure the nammes is useless when use block void clear_names(); diff --git a/be/src/vec/core/future_block.cpp b/be/src/vec/core/future_block.cpp new file mode 100644 index 0000000000..3f3f59c344 --- /dev/null +++ b/be/src/vec/core/future_block.cpp @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/core/future_block.h" + +#include + +namespace doris::vectorized { + +void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id, bool first, bool eos) { + this->_schema_version = schema_version; + this->_load_id = load_id; + this->_first = first; + this->_eos = eos; +} + +void FutureBlock::set_result(Status status, int64_t total_rows, int64_t loaded_rows) { + auto result = std::make_tuple(true, status, total_rows, loaded_rows); + result.swap(*_result); +} + +void FutureBlock::swap_future_block(std::shared_ptr other) { + Block::swap(*other.get()); + set_info(other->_schema_version, other->_load_id, other->_first, other->_eos); + lock = other->lock; + cv = other->cv; + _result = other->_result; +} + +} // namespace doris::vectorized diff --git a/be/src/vec/core/future_block.h b/be/src/vec/core/future_block.h new file mode 100644 index 0000000000..ca63fa3799 --- /dev/null +++ b/be/src/vec/core/future_block.h @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "block.h" +#include "util/lock.h" + +namespace doris { + +namespace vectorized { + +class FutureBlock : public Block { + ENABLE_FACTORY_CREATOR(FutureBlock); + +public: + FutureBlock() : Block() {} + void swap_future_block(std::shared_ptr<FutureBlock> other); + void set_info(int64_t block_schema_version, const TUniqueId& load_id, bool first, + bool block_eos); + int64_t get_schema_version() { return _schema_version; } + TUniqueId get_load_id() { return _load_id; } + bool is_first() { return _first; } + bool is_eos() { return _eos; } + + // hold `lock` before calling this function + void set_result(Status status, int64_t total_rows = 0, int64_t loaded_rows = 0); + bool is_handled() { return std::get<0>(*(_result)); } + Status get_status() { return std::get<1>(*(_result)); } + int64_t get_total_rows() { return std::get<2>(*(_result)); } + int64_t get_loaded_rows() { return std::get<3>(*(_result)); } + + std::shared_ptr<doris::Mutex> lock = std::make_shared<doris::Mutex>(); + std::shared_ptr<doris::ConditionVariable> cv = std::make_shared<doris::ConditionVariable>(); + +private: + int64_t _schema_version; + TUniqueId _load_id; + bool _first = false; + bool _eos = false; + + std::shared_ptr<std::tuple<bool, Status, int64_t, int64_t>> _result = + std::make_shared<std::tuple<bool, Status, int64_t, int64_t>>(false, Status::OK(), 0, 0); +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp b/be/src/vec/exec/scan/group_commit_scan_node.cpp new file mode 100644 index 0000000000..d5bc456577 --- /dev/null +++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
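+ +// GroupCommitScanNode does not scan storage; it pulls the blocks that concurrent inserts +// appended to this fragment's LoadBlockQueue and feeds them to the table sink.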
+ +#include "vec/exec/scan/group_commit_scan_node.h" + +#include "runtime/group_commit_mgr.h" +#include "vec/columns/column_const.h" +#include "vec/exec/scan/new_olap_scanner.h" +#include "vec/exec/scan/vfile_scanner.h" +#include "vec/functions/in.h" + +namespace doris::vectorized { + +GroupCommitScanNode::GroupCommitScanNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : VScanNode(pool, tnode, descs) { + _output_tuple_id = tnode.file_scan_node.tuple_id; + _table_id = tnode.group_commit_scan_node.table_id; +} + +Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + bool find_node = false; + while (!find_node && !*eos) { + RETURN_IF_ERROR(load_block_queue->get_block(block, &find_node, eos)); + } + return Status::OK(); +} + +Status GroupCommitScanNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(VScanNode::init(tnode, state)); + return state->exec_env()->group_commit_mgr()->get_load_block_queue( + _table_id, state->fragment_instance_id(), load_block_queue); +} + +Status GroupCommitScanNode::prepare(RuntimeState* state) { + return VScanNode::prepare(state); +} + +void GroupCommitScanNode::set_scan_ranges(const std::vector& scan_ranges) {} + +Status GroupCommitScanNode::_init_profile() { + return VScanNode::_init_profile(); +} + +Status GroupCommitScanNode::_process_conjuncts() { + RETURN_IF_ERROR(VScanNode::_process_conjuncts()); + if (_eos) { + return Status::OK(); + } + // TODO: Push conjuncts down to reader. + return Status::OK(); +} + +std::string GroupCommitScanNode::get_name() { + return fmt::format("GROUP_COMMIT_SCAN_NODE({0})", _table_id); +} + +}; // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/group_commit_scan_node.h b/be/src/vec/exec/scan/group_commit_scan_node.h new file mode 100644 index 0000000000..86c504d581 --- /dev/null +++ b/be/src/vec/exec/scan/group_commit_scan_node.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "runtime/group_commit_mgr.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/scan/vscan_node.h" + +namespace doris::vectorized { + +class GroupCommitScanNode : public VScanNode { +public: + GroupCommitScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + + Status init(const TPlanNode& tnode, RuntimeState* state) override; + + Status prepare(RuntimeState* state) override; + + void set_scan_ranges(const std::vector& scan_ranges) override; + + std::string get_name() override; + + Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; + +protected: + Status _init_profile() override; + Status _process_conjuncts() override; + +private: + int64_t _table_id; + std::shared_ptr load_block_queue; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 626b7eb5c2..399a07cb77 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -123,7 +123,9 @@ Status ScannerContext::init() { #ifndef BE_TEST // 3. get thread token - thread_token = _state->get_query_ctx()->get_token(); + if (_state->get_query_ctx()) { + thread_token = _state->get_query_ctx()->get_token(); + } #endif // 4. This ctx will be submitted to the scanner scheduler right after init. diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index af7e8334ab..471e5bc349 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -164,7 +164,7 @@ public: // the unique id of this context std::string ctx_id; int32_t queue_idx = -1; - ThreadPoolToken* thread_token; + ThreadPoolToken* thread_token = nullptr; std::vector _btids; private: diff --git a/be/src/vec/sink/group_commit_vtablet_sink.cpp b/be/src/vec/sink/group_commit_vtablet_sink.cpp new file mode 100644 index 0000000000..3bf51993fd --- /dev/null +++ b/be/src/vec/sink/group_commit_vtablet_sink.cpp @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
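+ +// A VOlapTableSink that reports per-block results through FutureBlock, waking the insert +// threads that wait on each block once its rows have been handled.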
+ +#include "vec/sink/group_commit_vtablet_sink.h" + +#include "vec/core/future_block.h" + +namespace doris { + +namespace stream_load { + +GroupCommitVOlapTableSink::GroupCommitVOlapTableSink(ObjectPool* pool, + const RowDescriptor& row_desc, + const std::vector& texprs, + Status* status) + : VOlapTableSink(pool, row_desc, texprs, status) {} + +void GroupCommitVOlapTableSink::handle_block(vectorized::Block* input_block, int64_t rows, + int64_t filter_rows) { + auto* future_block = dynamic_cast(input_block); + std::unique_lock l(*(future_block->lock)); + future_block->set_result(Status::OK(), rows, rows - filter_rows); + future_block->cv->notify_all(); +} +} // namespace stream_load +} // namespace doris diff --git a/be/src/vec/sink/group_commit_vtablet_sink.h b/be/src/vec/sink/group_commit_vtablet_sink.h new file mode 100644 index 0000000000..1c9e64355b --- /dev/null +++ b/be/src/vec/sink/group_commit_vtablet_sink.h @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include "vtablet_sink.h" + +namespace doris { + +namespace stream_load { + +class GroupCommitVOlapTableSink : public VOlapTableSink { +public: + GroupCommitVOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, + const std::vector& texprs, Status* status); + + void handle_block(vectorized::Block* input_block, int64_t rows, int64_t filter_rows) override; +}; + +} // namespace stream_load +} // namespace doris diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 09bcdc6e25..6c4799fad1 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -88,6 +88,7 @@ #include "vec/common/string_ref.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" +#include "vec/core/future_block.h" #include "vec/core/types.h" #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" @@ -1317,6 +1318,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, std::shared_ptr block; bool has_filtered_rows = false; + int64_t filtered_rows = + _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows(); RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( state, input_block, block, _output_vexpr_ctxs, rows, eos, has_filtered_rows)); @@ -1372,6 +1375,9 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, block.get(), filter_col, block->columns())); } } + handle_block(input_block, rows, + _block_convertor->num_filtered_rows() + _tablet_finder->num_filtered_rows() - + filtered_rows); // Add block to node channel for (size_t i = 0; i < _channels.size(); i++) { for (const auto& entry : channel_to_payload[i]) { diff --git a/be/src/vec/sink/vtablet_sink.h 
b/be/src/vec/sink/vtablet_sink.h index dc408d392b..3f0ad7ed89 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -386,7 +386,7 @@ protected: // When OlapTableSink::open() called, there will be a consumer thread running in the background. // When you call VOlapTableSink::send(), you will be the producer who produces pending batches. // Join the consumer thread in close(). -class VOlapTableSink final : public DataSink { +class VOlapTableSink : public DataSink { public: // Construct from thrift struct which is generated by FE. VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, @@ -413,6 +413,9 @@ public: // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer void _send_batch_process(); + // handle block after data is filtered, only useful for GroupCommitVOlapTableSink + virtual void handle_block(vectorized::Block* input_block, int64_t rows, int64_t filter_rows) {} + private: friend class VNodeChannel; friend class IndexChannel; diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 78573f6e15..ede6dd7ca6 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4786,6 +4786,11 @@ insert_stmt ::= :} // TODO(zc) add default value for SQL-2003 // | KW_INSERT KW_INTO insert_target:target KW_DEFAULT KW_VALUES + | /* used for group commit */ + KW_INSERT KW_INTO INTEGER_LITERAL:table_id opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source + {: + RESULT = new NativeInsertStmt(table_id, label, cols, source, hints); + :} ; insert_target ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index e8f4a6692d..8c24c09819 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -47,12 +47,25 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.ExportSink; +import org.apache.doris.planner.GroupCommitOlapTableSink; import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.planner.external.jdbc.JdbcTableSink; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.tablefunction.GroupCommitTableValuedFunction; +import org.apache.doris.task.StreamLoadTask; +import org.apache.doris.thrift.TExecPlanFragmentParams; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TMergeType; +import org.apache.doris.thrift.TPlan; +import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TQueryOptions; +import org.apache.doris.thrift.TScanRangeParams; +import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; @@ -64,11 +77,15 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import 
com.google.protobuf.ByteString; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -138,6 +155,17 @@ public class NativeInsertStmt extends InsertStmt { private HashSet partialUpdateCols = new HashSet(); + // Used for group commit insert + private boolean isGroupCommit = false; + private int baseSchemaVersion = -1; + private TUniqueId loadId = null; + private ByteString planBytes = null; + private ByteString tableBytes = null; + private ByteString rangeBytes = null; + private long tableId = -1; + // true if be generates an insert from group commit tvf stmt and executes to load data + public boolean isInnerGroupCommit = false; + public NativeInsertStmt(InsertTarget target, String label, List cols, InsertSource source, List hints) { super(new LabelName(null, label), null, null); @@ -151,6 +179,12 @@ public class NativeInsertStmt extends InsertStmt { && ((SelectStmt) queryStmt).getTableRefs().isEmpty()); } + public NativeInsertStmt(long tableId, String label, List cols, InsertSource source, + List hints) { + this(new InsertTarget(new TableName(null, null, null), null), label, cols, source, hints); + this.tableId = tableId; + } + // Ctor for CreateTableAsSelectStmt and InsertOverwriteTableStmt public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, LabelName label, QueryStmt queryStmt, List planHints, List targetColumnNames) { @@ -201,6 +235,19 @@ public class NativeInsertStmt extends InsertStmt { public void getTables(Analyzer analyzer, Map tableMap, Set parentViewNameSet) throws AnalysisException { + if (tableId != -1) { + TableIf table = Env.getCurrentInternalCatalog().getTableByTableId(tableId); + Preconditions.checkState(table instanceof OlapTable); + OlapTable olapTable = (OlapTable) table; + tblName.setDb(olapTable.getDatabase().getFullName()); + tblName.setTbl(olapTable.getName()); + if (olapTable.getDeleteSignColumn() != null) { + List columns = olapTable.getBaseSchema(false); + columns.add(olapTable.getDeleteSignColumn()); + targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList()); + } + } + // get dbs of statement queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet); tblName.analyze(analyzer); @@ -295,8 +342,14 @@ public class NativeInsertStmt extends InsertStmt { // set target table and analyzeTargetTable(analyzer); + db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); - analyzeSubquery(analyzer); + analyzeGroupCommit(); + if (isGroupCommit()) { + return; + } + + analyzeSubquery(analyzer, false); analyzePlanHints(); @@ -307,7 +360,6 @@ public class NativeInsertStmt extends InsertStmt { // create data sink createDataSink(); - db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); // create label and begin transaction long timeoutSecond = ConnectContext.get().getExecTimeout(); if (label == null || Strings.isNullOrEmpty(label.getLabelName())) { @@ -431,7 +483,7 @@ public class NativeInsertStmt extends InsertStmt { } } - private void analyzeSubquery(Analyzer analyzer) throws UserException { + private void analyzeSubquery(Analyzer analyzer, boolean skipCheck) throws UserException { // Analyze columns mentioned in the 
statement. Set mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); List realTargetColumnNames; @@ -573,7 +625,7 @@ public class NativeInsertStmt extends InsertStmt { // INSERT INTO VALUES(...) List> rows = selectStmt.getValueList().getRows(); for (int rowIdx = 0; rowIdx < rows.size(); ++rowIdx) { - analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex); + analyzeRow(analyzer, targetColumns, rows, rowIdx, origColIdxsForExtendCols, slotToIndex, skipCheck); } // clear these 2 structures, rebuild them using VALUES exprs @@ -591,7 +643,7 @@ public class NativeInsertStmt extends InsertStmt { // `selectStmt.getResultExprs().clear();` will clear the `rows` too, causing // error. rows.add(Lists.newArrayList(selectStmt.getResultExprs())); - analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex); + analyzeRow(analyzer, targetColumns, rows, 0, origColIdxsForExtendCols, slotToIndex, skipCheck); // rows may be changed in analyzeRow(), so rebuild the result exprs selectStmt.getResultExprs().clear(); for (Expr expr : rows.get(0)) { @@ -670,14 +722,17 @@ public class NativeInsertStmt extends InsertStmt { } private void analyzeRow(Analyzer analyzer, List targetColumns, List> rows, - int rowIdx, List> origColIdxsForExtendCols, Map slotToIndex) - throws AnalysisException { + int rowIdx, List> origColIdxsForExtendCols, Map slotToIndex, + boolean skipCheck) throws AnalysisException { // 1. check number of fields if equal with first row // targetColumns contains some shadow columns, which is added by system, // so we should minus this if (rows.get(rowIdx).size() != targetColumns.size() - origColIdxsForExtendCols.size()) { throw new AnalysisException("Column count doesn't match value count at row " + (rowIdx + 1)); } + if (skipCheck) { + return; + } ArrayList row = rows.get(rowIdx); if (!origColIdxsForExtendCols.isEmpty()) { @@ -814,9 +869,12 @@ public class NativeInsertStmt extends InsertStmt { return dataSink; } if (targetTable instanceof OlapTable) { - dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, - analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); - OlapTableSink sink = (OlapTableSink) dataSink; + checkInnerGroupCommit(); + OlapTableSink sink = isInnerGroupCommit ? 
new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
+                    targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert())
+                    : new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
+                            analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
+            dataSink = sink;
             sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
             dataPartition = dataSink.getOutputPartition();
         } else if (targetTable instanceof BrokerTable) {
@@ -847,6 +905,16 @@
         return dataSink;
     }

+    private void checkInnerGroupCommit() {
+        List<TableRef> tableRefs = new ArrayList<>();
+        queryStmt.collectTableRefs(tableRefs);
+        if (tableRefs.size() == 1 && tableRefs.get(0) instanceof TableValuedFunctionRef
+                && ((TableValuedFunctionRef) tableRefs.get(
+                        0)).getTableFunction() instanceof GroupCommitTableValuedFunction) {
+            isInnerGroupCommit = true;
+        }
+    }
+
     public void complete() throws UserException {
         if (!isExplain() && targetTable instanceof OlapTable) {
             ((OlapTableSink) dataSink).complete(analyzer);
@@ -928,4 +996,82 @@
             return RedirectStatus.FORWARD_WITH_SYNC;
         }
     }
+
+    private void analyzeGroupCommit() {
+        if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit && targetTable instanceof OlapTable
+                && !ConnectContext.get().isTxnModel()
+                && getQueryStmt() instanceof SelectStmt
+                && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null) {
+            isGroupCommit = true;
+        }
+    }
+
+    public boolean isGroupCommit() {
+        return isGroupCommit;
+    }
+
+    public void planForGroupCommit(TUniqueId queryId) throws UserException, TException {
+        OlapTable olapTable = (OlapTable) getTargetTable();
+        if (planBytes != null && olapTable.getBaseSchemaVersion() == baseSchemaVersion) {
+            return;
+        }
+        if (!targetColumns.isEmpty()) {
+            Analyzer analyzerTmp = analyzer;
+            reset();
+            this.analyzer = analyzerTmp;
+        }
+        analyzeSubquery(analyzer, true);
+        TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest();
+        if (targetColumnNames != null) {
+            streamLoadPutRequest.setColumns(String.join(",", targetColumnNames));
+        }
+        streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
+                .setTbl(getTbl())
+                .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
+                .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId);
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
+        StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask);
+        // Will use the load id as the query id in the fragment
+        TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+        DescriptorTable descTable = planner.getDescTable();
+        TPlanFragment fragment = tRequest.getFragment();
+        TPlan plan = fragment.getPlan();
+        for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
+            for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                        TFileFormatType.FORMAT_PROTO);
+                scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                        TFileCompressType.PLAIN);
+            }
+        }
+        List<TScanRangeParams> scanRangeParams = tRequest.params.per_node_scan_ranges.values().stream()
+                .flatMap(Collection::stream).collect(Collectors.toList());
+        Preconditions.checkState(scanRangeParams.size() == 1);
+        // save plan message to be reused for
prepare stmt + loadId = queryId; + planBytes = ByteString.copyFrom(new TSerializer().serialize(plan)); + tableBytes = ByteString.copyFrom(new TSerializer().serialize(descTable.toThrift())); + rangeBytes = ByteString.copyFrom(new TSerializer().serialize(scanRangeParams.get(0))); + baseSchemaVersion = olapTable.getBaseSchemaVersion(); + } + + public TUniqueId getLoadId() { + return loadId; + } + + public ByteString getPlanBytes() { + return planBytes; + } + + public ByteString getTableBytes() { + return tableBytes; + } + + public int getBaseSchemaVersion() { + return baseSchemaVersion; + } + + public ByteString getRangeBytes() { + return rangeBytes; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 082542fb34..9a17070c15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -318,6 +318,16 @@ public class InternalCatalog implements CatalogIf { return null; } + public Table getTableByTableId(Long tableId) { + for (Database db : fullNameToDb.values()) { + Table table = db.getTableNullable(tableId); + if (table != null) { + return table; + } + } + return null; + } + // Use tryLock to avoid potential dead lock private boolean tryLock(boolean mustLock) { while (true) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitOlapTableSink.java new file mode 100644 index 0000000000..5f3455b33a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitOlapTableSink.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
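For context on the caching above: planForGroupCommit() serializes the stream-load plan, descriptor table, and scan range once, then reuses those bytes for later group commit inserts into the same table until the table's base schema version changes. In this patch the cached bytes live as fields on the NativeInsertStmt itself; the standalone sketch below generalizes the same version-guarded reuse into a table-keyed cache. All names here are illustrative, not the actual Doris API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class GroupCommitPlanCache {
    static final class CachedPlan {
        final int baseSchemaVersion;
        final byte[] planBytes;      // serialized TPlan
        final byte[] descTblBytes;   // serialized TDescriptorTable
        final byte[] scanRangeBytes; // serialized TScanRangeParams
        CachedPlan(int version, byte[] plan, byte[] desc, byte[] range) {
            this.baseSchemaVersion = version;
            this.planBytes = plan;
            this.descTblBytes = desc;
            this.scanRangeBytes = range;
        }
    }

    interface Planner {
        CachedPlan plan(long tableId, int schemaVersion);
    }

    private final Map<Long, CachedPlan> cache = new ConcurrentHashMap<>();

    // Reuses the serialized plan until the table's base schema version changes.
    CachedPlan getOrPlan(long tableId, int currentSchemaVersion, Planner planner) {
        CachedPlan cached = cache.get(tableId);
        if (cached != null && cached.baseSchemaVersion == currentSchemaVersion) {
            return cached; // fast path: schema unchanged, skip analyze/plan/serialize
        }
        CachedPlan fresh = planner.plan(tableId, currentSchemaVersion);
        cache.put(tableId, fresh);
        return fresh;
    }
}

The schema-version guard is what lets prepared statements skip re-planning entirely on the hot path while still reacting to light schema changes.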
+ +package org.apache.doris.planner; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.thrift.TDataSinkType; + +import java.util.List; + +public class GroupCommitOlapTableSink extends OlapTableSink { + + public GroupCommitOlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, + boolean singleReplicaLoad) { + super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad); + } + + protected TDataSinkType getDataSinkType() { + return TDataSinkType.GROUP_COMMIT_OLAP_TABLE_SINK; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java new file mode 100644 index 0000000000..a8db418eaf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.external.FileScanNode; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TGroupCommitScanNode; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.collect.Lists; + +import java.util.List; + +public class GroupCommitScanNode extends FileScanNode { + + long tableId; + + public GroupCommitScanNode(PlanNodeId id, TupleDescriptor desc, long tableId) { + super(id, desc, "GROUP_COMMIT_SCAN_NODE", + StatisticalType.GROUP_COMMIT_SCAN_NODE, false); + this.tableId = tableId; + } + + @Override + protected void createScanRangeLocations() throws UserException { + } + + @Override + public void finalize(Analyzer analyzer) throws UserException { + } + + @Override + public List getScanRangeLocations(long maxScanRangeLength) { + return Lists.newArrayList(); + } + + @Override + public int getNumInstances() { + return 1; + } + + @Override + protected void toThrift(TPlanNode planNode) { + planNode.setNodeType(TPlanNodeType.GROUP_COMMIT_SCAN_NODE); + TGroupCommitScanNode scanNode = new TGroupCommitScanNode(); + scanNode.setTableId(tableId); + planNode.setGroupCommitScanNode(scanNode); + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + return "GroupCommitScanNode"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 24e3cf2029..57fc65dba5 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -126,7 +126,7 @@ public class OlapTableSink extends DataSink { "if load_to_single_tablet set to true," + " the olap table must be with random distribution"); } tSink.setLoadToSingleTablet(loadToSingleTablet); - tDataSink = new TDataSink(TDataSinkType.OLAP_TABLE_SINK); + tDataSink = new TDataSink(getDataSinkType()); tDataSink.setOlapTableSink(tSink); if (partitionIds == null) { @@ -458,4 +458,7 @@ public class OlapTableSink extends DataSink { return nodesInfo; } + protected TDataSinkType getDataSinkType() { + return TDataSinkType.OLAP_TABLE_SINK; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 63c068e057..50a5f68b8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -577,5 +577,9 @@ public class StreamLoadPlanner { } return null; } + + public DescriptorTable getDescTable() { + return descTable; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index f0d1e16a0d..7c7aa7e2da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -40,6 +40,7 @@ import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; +import org.apache.doris.system.Backend; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionEntry; @@ -173,6 +174,7 @@ public class ConnectContext { private Map resultAttachedInfo; private String workloadGroupName = ""; + private Map insertGroupCommitTableToBeMap = new HashMap<>(); public void setUserQueryTimeout(int queryTimeout) { if (queryTimeout > 0) { @@ -820,5 +822,12 @@ public class ConnectContext { return this.workloadGroupName; } + public void setInsertGroupCommit(long tableId, Backend backend) { + insertGroupCommitTableToBeMap.put(tableId, backend); + } + + public Backend getInsertGroupCommit(long tableId) { + return insertGroupCommitTableToBeMap.get(tableId); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 88c9d2fd72..2d9303beb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -374,6 +374,7 @@ public class SessionVariable implements Serializable, Writable { public static final String EXTERNAL_TABLE_ANALYZE_PART_NUM = "external_table_analyze_part_num"; public static final String ENABLE_STRONG_CONSISTENCY = "enable_strong_consistency_read"; + public static final String ENABLE_INSERT_GROUP_COMMIT = "enable_insert_group_commit"; public static final String PARALLEL_SYNC_ANALYZE_TASK_NUM = "parallel_sync_analyze_task_num"; @@ -1144,6 +1145,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = true) public boolean enableMemtableOnSinkNode = false; + @VariableMgr.VarAttr(name = 
ENABLE_INSERT_GROUP_COMMIT) + public boolean enableInsertGroupCommit = false; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index be1b8f9bc5..73e1525480 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -58,6 +58,7 @@ import org.apache.doris.analysis.SetOperationStmt; import org.apache.doris.analysis.SetStmt; import org.apache.doris.analysis.SetVar; import org.apache.doris.analysis.ShowStmt; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.StatementBase; @@ -129,6 +130,8 @@ import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; +import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData; import org.apache.doris.qe.QueryState.MysqlStateType; @@ -139,19 +142,23 @@ import org.apache.doris.resource.workloadgroup.QueryQueue; import org.apache.doris.resource.workloadgroup.QueueOfferToken; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; +import org.apache.doris.system.Backend; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TLoadTxnBeginRequest; import org.apache.doris.thrift.TLoadTxnBeginResult; import org.apache.doris.thrift.TMergeType; +import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TResultBatch; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TTxnParams; import org.apache.doris.thrift.TUniqueId; @@ -187,6 +194,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -1058,6 +1066,10 @@ public class StmtExecutor { } parsedStmt.analyze(analyzer); if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) { + if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).isGroupCommit()) { + LOG.debug("skip generate query plan for group commit insert"); + return; + } ExprRewriter rewriter = analyzer.getExprRewriter(); rewriter.reset(); if (context.getSessionVariable().isEnableFoldConstantByBe()) { @@ -1745,6 +1757,87 @@ public class StmtExecutor { loadedRows = executeForTxn(insertStmt); label = context.getTxnEntry().getLabel(); txnId = 
context.getTxnEntry().getTxnConf().getTxnId();
+        } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) {
+            NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt;
+            Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId());
+            if (backend == null || !backend.isAlive()) {
+                List<Long> allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
+                if (allBackendIds.isEmpty()) {
+                    throw new DdlException("No alive backend");
+                }
+                Collections.shuffle(allBackendIds);
+                backend = Env.getCurrentSystemInfo().getBackend(allBackendIds.get(0));
+                context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend);
+            }
+            int maxRetry = 3;
+            for (int i = 0; i < maxRetry; i++) {
+                nativeInsertStmt.planForGroupCommit(context.queryId);
+                // handle rows
+                List<InternalService.PDataRow> rows = new ArrayList<>();
+                SelectStmt selectStmt = (SelectStmt) insertStmt.getQueryStmt();
+                if (selectStmt.getValueList() != null) {
+                    for (List<Expr> row : selectStmt.getValueList().getRows()) {
+                        InternalService.PDataRow data = getRowStringValue(row);
+                        rows.add(data);
+                    }
+                } else {
+                    List<Expr> exprList = new ArrayList<>();
+                    for (Expr resultExpr : selectStmt.getResultExprs()) {
+                        if (resultExpr instanceof SlotRef) {
+                            exprList.add(((SlotRef) resultExpr).getDesc().getSourceExprs().get(0));
+                        } else {
+                            exprList.add(resultExpr);
+                        }
+                    }
+                    InternalService.PDataRow data = getRowStringValue(exprList);
+                    rows.add(data);
+                }
+                TUniqueId loadId = nativeInsertStmt.getLoadId();
+                PGroupCommitInsertRequest request = PGroupCommitInsertRequest.newBuilder()
+                        .setDbId(insertStmt.getTargetTable().getDatabase().getId())
+                        .setTableId(insertStmt.getTargetTable().getId())
+                        .setDescTbl(nativeInsertStmt.getTableBytes())
+                        .setBaseSchemaVersion(nativeInsertStmt.getBaseSchemaVersion())
+                        .setPlanNode(nativeInsertStmt.getPlanBytes())
+                        .setScanRangeParams(nativeInsertStmt.getRangeBytes())
+                        .setLoadId(Types.PUniqueId.newBuilder().setHi(loadId.hi).setLo(loadId.lo)
+                                .build()).addAllData(rows)
+                        .build();
+                Future<PGroupCommitInsertResponse> future = BackendServiceProxy.getInstance()
+                        .groupCommitInsert(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request);
+                PGroupCommitInsertResponse response = future.get();
+                TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
+                if (code == TStatusCode.DATA_QUALITY_ERROR) {
+                    LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "
+                                    + "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt,
+                            backend.getId(), response.getStatus(), nativeInsertStmt.getBaseSchemaVersion(), i);
+                    // re-analyze and retry; on the last attempt fall through and surface the error
+                    if (i < maxRetry - 1) {
+                        List<Table> tables = Lists.newArrayList(insertStmt.getTargetTable());
+                        MetaLockUtils.readLockTables(tables);
+                        try {
+                            insertStmt.reset();
+                            analyzer = new Analyzer(context.getEnv(), context);
+                            analyzeAndGenerateQueryPlan(context.getSessionVariable().toThrift());
+                        } finally {
+                            MetaLockUtils.readUnlockTables(tables);
+                        }
+                        continue;
+                    } else {
+                        errMsg = "group commit insert failed. backend id: " + backend.getId() + ", status: "
+                                + response.getStatus();
+                    }
+                } else if (code != TStatusCode.OK) {
+                    errMsg = "group commit insert failed. 
backend id: " + backend.getId() + ", status: " + + response.getStatus(); + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } + label = response.getLabel(); + txnStatus = TransactionStatus.PREPARE; + txnId = response.getTxnId(); + loadedRows = response.getLoadedRows(); + filteredRows = (int) response.getFilteredRows(); + break; + } } else { label = insertStmt.getLabel(); LOG.info("Do insert [{}] with query id: {}", label, DebugUtil.printId(context.queryId())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index d012e757ef..e1f5a2c95b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -151,6 +151,11 @@ public class BackendServiceClient { return stub.glob(request); } + public Future groupCommitInsert( + InternalService.PGroupCommitInsertRequest request) { + return stub.groupCommitInsert(request); + } + public void shutdown() { if (!channel.isShutdown()) { channel.shutdown(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index d9fae1daf9..b30b8e0c5a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -23,6 +23,8 @@ import org.apache.doris.common.util.NetUtils; import org.apache.doris.metric.MetricRepo; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; +import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; +import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TFoldConstantParams; @@ -411,4 +413,15 @@ public class BackendServiceProxy { } } + public Future groupCommitInsert(TNetworkAddress address, + PGroupCommitInsertRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.groupCommitInsert(request); + } catch (Throwable e) { + LOG.warn("failed to group commit insert from address={}:{}", address.getHostname(), + address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index af7a04b45f..c19e299960 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1313,11 +1313,15 @@ public class FrontendServiceImpl implements FrontendService.Iface { } private List queryLoadCommitTables(TLoadTxnCommitRequest request, Database db) throws UserException { + if (request.isSetTableId() && request.getTableId() > 0) { + Table table = Env.getCurrentEnv().getInternalCatalog().getTableByTableId(request.getTableId()); + return Collections.singletonList(table); + } + List tbNames; //check has multi table if (CollectionUtils.isNotEmpty(request.getTbls())) { tbNames = request.getTbls(); - } else { tbNames = Collections.singletonList(request.getTbl()); } @@ -2003,6 +2007,10 @@ public class FrontendServiceImpl implements FrontendService.Iface 
{
             result.getParams().setTableName(parsedStmt.getTbl());
             // The txn_id here is obtained from the NativeInsertStmt
             result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
+            if (parsedStmt.isInnerGroupCommit) {
+                result.setBaseSchemaVersion(((OlapTable) parsedStmt.getTargetTable()).getBaseSchemaVersion());
+                result.getParams().params.setGroupCommit(true);
+            }
         } catch (UserException e) {
             LOG.warn("exec sql error", e);
             throw new UserException("exec sql error" + e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index d1527df0ec..7fe9b03cbc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -55,4 +55,5 @@ public enum StatisticalType {
     METADATA_SCAN_NODE,
     JDBC_SCAN_NODE,
     TEST_EXTERNAL_TABLE,
+    GROUP_COMMIT_SCAN_NODE
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
new file mode 100644
index 0000000000..3f7902d736
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.planner.GroupCommitScanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The implementation of the table valued function
+ * group_commit().
+ */ +public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunction { + private static final Logger LOG = LogManager.getLogger(GroupCommitTableValuedFunction.class); + public static final String NAME = "group_commit"; + private long tableId = -1; + + public GroupCommitTableValuedFunction(Map params) throws AnalysisException { + tableId = Long.parseLong(params.get("table_id")); + } + + // =========== implement abstract methods of ExternalFileTableValuedFunction ================= + + @Override + public List getTableColumns() throws AnalysisException { + List fileColumns = new ArrayList<>(); + Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId); + List tableColumns = table.getBaseSchema(false); + for (int i = 1; i <= tableColumns.size(); i++) { + fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true)); + } + Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn(); + if (deleteSignColumn != null) { + fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true)); + } + return fileColumns; + } + + @Override + public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) { + return new GroupCommitScanNode(id, desc, tableId); + } + + @Override + public TFileType getTFileType() { + return TFileType.FILE_STREAM; + } + + @Override + public String getFilePath() { + return null; + } + + @Override + public BrokerDesc getBrokerDesc() { + return new BrokerDesc("GroupCommitTvfBroker", StorageType.STREAM, locationProperties); + } + + // =========== implement abstract methods of TableValuedFunctionIf ================= + @Override + public String getTableName() { + return "GroupCommitTableValuedFunction"; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java index 2a7b8d3f4f..ab37053c45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java @@ -66,6 +66,8 @@ public abstract class TableValuedFunctionIf { return new WorkloadGroupsTableValuedFunction(params); case CatalogsTableValuedFunction.NAME: return new CatalogsTableValuedFunction(params); + case GroupCommitTableValuedFunction.NAME: + return new GroupCommitTableValuedFunction(params); default: throw new AnalysisException("Could not find table function " + funcName); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java index bfb34d7a3b..64c3494ba7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java @@ -204,7 +204,7 @@ public class InsertStmtTest { stmt.setTargetTable(targetTable); stmt.setQueryStmt(queryStmt); - Deencapsulation.invoke(stmt, "analyzeSubquery", analyzer); + Deencapsulation.invoke(stmt, "analyzeSubquery", analyzer, false); System.out.println(stmt.getQueryStmt().toSql()); QueryStmt queryStmtSubstitute = stmt.getQueryStmt(); @@ -276,7 +276,7 @@ public class InsertStmtTest { stmt.setTargetTable(targetTable); stmt.setQueryStmt(queryStmt); - Deencapsulation.invoke(stmt, "analyzeSubquery", analyzer); + Deencapsulation.invoke(stmt, "analyzeSubquery", analyzer, false); System.out.println(stmt.getQueryStmt()); QueryStmt queryStmtSubstitue = stmt.getQueryStmt(); 
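The group_commit() TVF above deliberately exposes the target table's schema positionally: base columns become c1..cN, and the hidden delete-sign column, when present, is appended last, mirroring how stream load addresses CSV fields by position rather than by name. A minimal self-contained sketch of that mapping follows; the simplified column type and the delete-sign name are illustrative, not the Doris catalog API.

import java.util.ArrayList;
import java.util.List;

class PositionalColumnMapper {
    record Col(String name, String type, boolean nullable) {}

    // Base columns become c1..cN; a delete-sign column, if present, is
    // appended as c(N+1). All columns are exposed as nullable, matching
    // getTableColumns() above.
    static List<Col> mapPositional(List<Col> baseSchema, Col deleteSign) {
        List<Col> out = new ArrayList<>();
        for (int i = 1; i <= baseSchema.size(); i++) {
            out.add(new Col("c" + i, baseSchema.get(i - 1).type(), true));
        }
        if (deleteSign != null) {
            out.add(new Col("c" + (baseSchema.size() + 1), deleteSign.type(), true));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Col> schema = List.of(new Col("id", "INT", false),
                new Col("name", "VARCHAR(50)", true));
        // the delete-sign column name is a placeholder here
        System.out.println(mapPositional(schema, new Col("delete_sign", "TINYINT", false)));
    }
}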
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 5a462987d2..0a816b4096 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -691,12 +691,36 @@ message PGlobRequest { } message PGlobResponse { - message PFileInfo { - optional string file = 1; - optional int64 size = 2; - }; + message PFileInfo { + optional string file = 1; + optional int64 size = 2; + }; + required PStatus status = 1; + repeated PFileInfo files = 2; +} + +message PGroupCommitInsertRequest { + optional int64 db_id = 1; + optional int64 table_id = 2; + // Descriptors.TDescriptorTable + optional bytes desc_tbl = 3; + optional int64 base_schema_version = 4; + + // TExecPlanFragmentParams -> TPlanFragment -> PlanNodes.TPlan + optional bytes plan_node = 5; + // TScanRangeParams + optional bytes scan_range_params = 6; + + optional PUniqueId load_id = 7; + repeated PDataRow data = 8; +} + +message PGroupCommitInsertResponse { required PStatus status = 1; - repeated PFileInfo files = 2; + optional string label = 2; + optional int64 txn_id = 3; + optional int64 loaded_rows = 4; + optional int64 filtered_rows = 5; } message POpenStreamSinkRequest { @@ -784,5 +808,6 @@ service PBackendService { rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns (PGetTabletVersionsResponse); rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse); rpc glob(PGlobRequest) returns (PGlobResponse); + rpc group_commit_insert(PGroupCommitInsertRequest) returns (PGroupCommitInsertResponse); }; diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index df23dfa984..bfa36527f2 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -36,6 +36,7 @@ enum TDataSinkType { RESULT_FILE_SINK, JDBC_TABLE_SINK, MULTI_CAST_DATA_STREAM_SINK, + GROUP_COMMIT_OLAP_TABLE_SINK, } enum TResultSinkType { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index c0ef12dadb..4d2ef60e43 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -635,6 +635,8 @@ struct TStreamLoadPutResult { // valid when status is OK 2: optional PaloInternalService.TExecPlanFragmentParams params 3: optional PaloInternalService.TPipelineFragmentParams pipeline_params + // used for group commit + 4: optional i64 base_schema_version } struct TStreamLoadMultiTablePutResult { @@ -693,6 +695,7 @@ struct TLoadTxnCommitRequest { 13: optional string token 14: optional i64 db_id 15: optional list tbls + 16: optional i64 table_id } struct TLoadTxnCommitResult { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 2f20057840..9c75e46f44 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -312,7 +312,7 @@ struct TPlanFragmentExecParams { 11: optional bool send_query_statistics_with_every_batch // Used to merge and send runtime filter 12: optional TRuntimeFilterParams runtime_filter_params - + 13: optional bool group_commit } // Global query parameters assigned by the coordinator. 
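The proto contract above is what makes the FE-side retry loop in StmtExecutor workable: PGroupCommitInsertRequest pins a base_schema_version alongside the serialized plan, and when the BE rejects the batch (for example after a light schema change surfaces as DATA_QUALITY_ERROR) the FE re-plans and resends. A compact sketch of that guarded retry, with illustrative interfaces standing in for the BRPC client and planner (not the actual Doris classes):

class GroupCommitRetry {
    enum Code { OK, DATA_QUALITY_ERROR, OTHER }

    interface Client {
        Code insertOnce(int schemaVersion) throws Exception;
    }

    interface Replanner {
        int replan() throws Exception; // re-analyzes the stmt, returns the new schema version
    }

    // Sends the batch, re-planning on DATA_QUALITY_ERROR; the final attempt
    // surfaces the failure instead of silently exhausting the loop.
    static void runWithRetry(Client client, Replanner replanner,
                             int schemaVersion, int maxRetry) throws Exception {
        for (int i = 0; i < maxRetry; i++) {
            Code code = client.insertOnce(schemaVersion);
            if (code == Code.OK) {
                return; // rows joined the current group commit batch
            }
            if (code == Code.DATA_QUALITY_ERROR && i < maxRetry - 1) {
                // schema may have moved; rebuild the cached plan and retry
                schemaVersion = replanner.replan();
                continue;
            }
            throw new Exception("group commit insert failed, status=" + code);
        }
    }
}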
@@ -652,6 +652,7 @@ struct TPipelineFragmentParams { 28: optional string table_name // scan node id -> scan range params, only for external file scan 29: optional map file_scan_params + 30: optional bool group_commit = false; } struct TPipelineFragmentParamsList { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 5341f4983c..de940e902f 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -58,6 +58,7 @@ enum TPlanNodeType { JDBC_SCAN_NODE, TEST_EXTERNAL_SCAN_NODE, PARTITION_SORT_NODE, + GROUP_COMMIT_SCAN_NODE } // phases of an execution node @@ -1095,6 +1096,10 @@ struct TDataGenScanNode { 2: optional TDataGenFunctionName func_name } +struct TGroupCommitScanNode { + 1: optional i64 table_id; +} + // This is essentially a union of all messages corresponding to subclasses // of PlanNode. struct TPlanNode { @@ -1138,6 +1143,7 @@ struct TPlanNode { 35: optional TOdbcScanNode odbc_scan_node // Runtime filters assigned to this plan node, exist in HashJoinNode and ScanNode 36: optional list runtime_filters + 37: optional TGroupCommitScanNode group_commit_scan_node // Use in vec exec engine 40: optional Exprs.TExpr vconjunct diff --git a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out new file mode 100644 index 0000000000..5ee9ddd0be --- /dev/null +++ b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out @@ -0,0 +1,93 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 a 10 +2 b -1 +3 c -1 +4 \N -1 +5 q 50 +6 \N -1 + +-- !sql -- +1 a 10 +1 a 10 +2 b -1 +2 b -1 +3 c -1 +3 c -1 +4 e1 -1 +5 q 50 +5 q 50 +6 \N -1 +6 \N -1 + +-- !sql -- +1 a \N 10 +1 a \N 10 +1 a \N 10 +2 b \N -1 +2 b \N -1 +2 b \N -1 +3 c \N -1 +3 c \N -1 +3 c \N -1 +4 \N \N -1 +4 e1 \N -1 +5 q \N 50 +5 q \N 50 +5 q \N 50 +6 \N \N -1 +6 \N \N -1 +6 \N \N -1 + +-- !sql -- +2 b \N -1 +6 \N \N -1 + +-- !sql -- +1 a 10 5 +2 b -1 \N +2 b -1 \N +3 c -1 \N +4 \N -1 \N +5 q 50 6 +6 \N -1 \N +6 \N -1 \N + +-- !sql -- +1 a 10 +1 a 10 +2 b -1 +2 b -1 +2 b -1 +3 c -1 +3 c -1 +4 \N -1 +4 \N -1 +5 q 50 +5 q 50 +6 \N -1 +6 \N -1 +6 \N -1 + +-- !sql -- +\N -1 +\N -1 +\N -1 +\N -1 +\N -1 +\N -1 +\N -1 +a 10 +a 10 +a 10 +b -1 +b -1 +b -1 +b -1 +c -1 +c -1 +c -1 +q 50 +q 50 +q 50 + diff --git a/regression-test/data/insert_p0/insert_group_commit_with_prepare_stmt.out b/regression-test/data/insert_p0/insert_group_commit_with_prepare_stmt.out new file mode 100644 index 0000000000..ec1db28572 --- /dev/null +++ b/regression-test/data/insert_p0/insert_group_commit_with_prepare_stmt.out @@ -0,0 +1,37 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this
+-- !sql --
+1 a 10
+2 NULL 20
+3 c \N
+4 d 40
+5 e \N
+6 f 40
+
+-- !sql --
+2 NULL 20
+3 c \N
+4 d 40
+5 e \N
+6 f 40
+7 e -1
+8 NULL -1
+
+-- !sql --
+1 a 10
+2 NULL 20
+3 c \N
+4 d 40
+5 e \N
+6 f 40
+
+-- !sql --
+1 a -1
+1 a 10
+2 NULL 20
+3 c \N
+4 d 40
+5 e \N
+6 f 40
+7 e -1
+8 NULL -1
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 464849cb22..9beb8e6213 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -512,6 +512,7 @@ class Suite implements GroovyInterceptable {
     }

     PreparedStatement prepareStatement(String sql) {
+        logger.info("Execute sql: ${sql}".toString())
         return JdbcUtils.prepareStatement(context.getConnection(), sql)
     }

@@ -627,5 +628,20 @@
         return (row[4] as String) == "FINISHED"
     }
+
+    String getServerPrepareJdbcUrl(String jdbcUrl, String database) {
+        String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
+        def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
+        def sql_port
+        if (urlWithoutSchema.indexOf("/") >= 0) {
+            // e.g. jdbc:mysql://localhost:8080/?a=b
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
+        } else {
+            // e.g. jdbc:mysql://localhost:8080
+            sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
+        }
+        // set server side prepared statement url
+        return "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + database + "?&useServerPrepStmts=true"
+    }
 }
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
new file mode 100644
index 0000000000..7c6c8e3b5b
--- /dev/null
+++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
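A note on how the regression suites below are structured: a group commit insert returns once the rows are handed to the batch (txnStatus is PREPARE), so the data only becomes visible after the background commit. The tests therefore poll the row count via a getRowCount helper instead of asserting immediately. The same pattern in standalone form, with an illustrative helper that is not part of the test framework:

import java.util.concurrent.Callable;

class PollUntil {
    // Polls rowCount until it reaches expected, since group commit makes
    // inserted rows visible asynchronously. Returns false on timeout.
    static boolean reachesCount(Callable<Long> rowCount, long expected,
                                int maxAttempts, long sleepMillis) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (rowCount.call() >= expected) {
                return true;
            }
            Thread.sleep(sleepMillis);
        }
        return false;
    }
}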
+ +suite("insert_group_commit_into_duplicate") { + def table = "insert_group_commit_into_duplicate" + + def getRowCount = { expectedRowCount -> + def retry = 0 + while (retry < 10) { + sleep(2000) + def rowCount = sql "select count(*) from ${table}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + retry++ + } + } + + def getAlterTableState = { + def retry = 0 + while (true) { + sleep(2000) + def state = sql "show alter table column where tablename = '${table}' order by CreateTime desc " + logger.info("alter table state: ${state}") + if (state.size()> 0 && state[0][9] == "FINISHED") { + return true + } + retry++ + if (retry >= 10) { + return false + } + } + return false + } + + try { + // create table + sql """ drop table if exists ${table}; """ + + sql """ + CREATE TABLE `${table}` ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + PARTITION BY RANGE(id) + ( + FROM (1) TO (100) INTERVAL 10 + ) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ set enable_insert_group_commit = true; """ + + // 1. insert into + def result = sql """ insert into ${table}(name, id) values('c', 3); """ + logger.info("insert result: " + result) + assertEquals(1, result.size()) + assertEquals(1, result[0].size()) + assertEquals(1, result[0][0]) + result = sql """ insert into ${table}(id) values(4); """ + logger.info("insert result: " + result) + result = sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """ + logger.info("insert result: " + result) + assertEquals(1, result.size()) + assertEquals(1, result[0].size()) + assertEquals(2, result[0][0]) + result = sql """ insert into ${table}(id, name) values(2, 'b'); """ + logger.info("insert result: " + result) + result = sql """ insert into ${table}(id) select 6; """ + logger.info("insert result: " + result) + + getRowCount(6) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 2. insert into and delete + sql """ delete from ${table} where id = 4; """ + sql """ insert into ${table}(name, id) values('c', 3); """ + /*sql """ insert into ${table}(id, name) values(4, 'd1'); """ + sql """ insert into ${table}(id, name) values(4, 'd1'); """ + sql """ delete from ${table} where id = 4; """*/ + sql """ insert into ${table}(id, name) values(4, 'e1'); """ + sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """ + sql """ insert into ${table}(id, name) values(2, 'b'); """ + sql """ insert into ${table}(id) select 6; """ + + getRowCount(11) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 3. insert into and light schema change: add column + sql """ insert into ${table}(name, id) values('c', 3); """ + sql """ insert into ${table}(id) values(4); """ + sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50); """ + sql """ alter table ${table} ADD column age int after name; """ + sql """ insert into ${table}(id, name) values(2, 'b'); """ + sql """ insert into ${table}(id) select 6; """ + + assertTrue(getAlterTableState(), "add column should success") + getRowCount(17) + qt_sql """ select * from ${table} order by id, name,score asc; """ + + // 4. 
insert into and truncate table + /*sql """ insert into ${table}(name, id) values('c', 3); """ + sql """ insert into ${table}(id) values(4); """ + sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """*/ + sql """ truncate table ${table}; """ + sql """ insert into ${table}(id, name) values(2, 'b'); """ + sql """ insert into ${table}(id) select 6; """ + + getRowCount(2) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 5. insert into and schema change: modify column order + sql """ insert into ${table}(name, id) values('c', 3); """ + sql """ insert into ${table}(id) values(4); """ + sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """ + // sql """ alter table ${table} order by (id, name, score, age); """ + sql """ insert into ${table}(id, name) values(2, 'b'); """ + sql """ insert into ${table}(id) select 6; """ + + // assertTrue(getAlterTableState(), "modify column order should success") + getRowCount(8) + qt_sql """ select id, name, score, age from ${table} order by id, name, score asc; """ + + // 6. insert into and light schema change: drop column + sql """ insert into ${table}(name, id) values('c', 3); """ + sql """ insert into ${table}(id) values(4); """ + sql """ insert into ${table} values (1, 'a', 5, 10),(5, 'q', 6, 50); """ + sql """ alter table ${table} DROP column age; """ + sql """ insert into ${table}(id, name) values(2, 'b'); """ + sql """ insert into ${table}(id) select 6; """ + + assertTrue(getAlterTableState(), "drop column should success") + getRowCount(14) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + // 7. insert into and add rollup + sql """ insert into ${table}(name, id) values('c', 3); """ + sql """ insert into ${table}(id) values(4); """ + result = sql """ insert into ${table} values (1, 'a', 10),(5, 'q', 50),(101, 'a', 100); """ + logger.info("insert result: " + result) + assertEquals(1, result.size()) + assertEquals(1, result[0].size()) + assertEquals(2, result[0][0]) + // sql """ alter table ${table} ADD ROLLUP r1(name, score); """ + sql """ insert into ${table}(id, name) values(2, 'b'); """ + sql """ insert into ${table}(id) select 6; """ + + getRowCount(20) + qt_sql """ select name, score from ${table} order by name asc; """ + } finally { + // try_sql("DROP TABLE ${table}") + } +} diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy new file mode 100644 index 0000000000..0e148cd37a --- /dev/null +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
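Before the exception cases below, it may help to see the happy path the suites exercise: enable the session variable, connect with useServerPrepStmts=true (which is what getServerPrepareJdbcUrl builds), and batch rows through a server-side prepared statement. A minimal sketch using only the standard JDBC API; the host, port, database, table, and credentials are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class GroupCommitJdbcExample {
    public static void main(String[] args) throws Exception {
        // useServerPrepStmts=true makes the MySQL driver use server-side
        // prepared statements, which the group commit path can plan once and reuse.
        String url = "jdbc:mysql://127.0.0.1:9030/test_db?useServerPrepStmts=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "")) {
            try (Statement st = conn.createStatement()) {
                st.execute("set enable_insert_group_commit = true;");
            }
            try (PreparedStatement ps =
                         conn.prepareStatement("insert into t(id, name, score) values(?, ?, ?)")) {
                ps.setInt(1, 1);
                ps.setString(2, "a");
                ps.setInt(3, 10);
                ps.addBatch();
                ps.executeBatch(); // returns once the row joins a group commit batch
            }
        }
    }
}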
+ +import java.sql.Connection +import java.sql.DriverManager +import java.sql.Statement +import java.sql.PreparedStatement + +suite("insert_group_commit_with_exception") { + def table = "insert_group_commit_with_exception" + + def getRowCount = { expectedRowCount -> + def retry = 0 + while (retry < 10) { + sleep(2000) + def rowCount = sql "select count(*) from ${table}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + retry++ + } + } + + def getAlterTableState = { + def retry = 0 + while (true) { + sleep(2000) + def state = sql "show alter table column where tablename = '${table}' order by CreateTime desc " + logger.info("alter table state: ${state}") + if (state.size()> 0 && state[0][9] == "FINISHED") { + return true + } + retry++ + if (retry >= 10) { + return false + } + } + return false + } + + try { + // create table + sql """ drop table if exists ${table}; """ + + sql """ + CREATE TABLE `${table}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ set enable_insert_group_commit = true; """ + + // insert into without column + try { + def result = sql """ insert into ${table} values(1, 'a', 10, 100) """ + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try { + def result = sql """ insert into ${table} values(2, 'b') """ + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + result = sql """ insert into ${table} values(3, 'c', 30) """ + logger.info("insert result: " + result) + + // insert into with column + result = sql """ insert into ${table}(id, name) values(4, 'd') """ + logger.info("insert result: " + result) + + getRowCount(2) + + try { + result = sql """ insert into ${table}(id, name) values(5, 'd', 50) """ + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try { + result = sql """ insert into ${table}(id, name) values(6) """ + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try { + result = sql """ insert into ${table}(id, names) values(7, 'd') """ + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Unknown column 'names'")) + } + + + // prepare insert + def db = context.config.defaultDb + "_insert_p0" + String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, db) + + try (Connection connection = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword)) { + Statement statement = connection.createStatement(); + statement.execute("use ${db}"); + statement.execute("set enable_insert_group_commit = true;"); + // without column + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?, ?)")) { + ps.setObject(1, 8); + ps.setObject(2, "f"); + ps.setObject(3, 70); + ps.setObject(4, "a"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?)")) { + ps.setObject(1, 9); + ps.setObject(2, 
"f"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { + ps.setObject(1, 10); + ps.setObject(2, "f"); + ps.setObject(3, 90); + ps.addBatch(); + int[] result = ps.executeBatch(); + logger.info("prepare insert result: " + result) + } + + // with columns + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?)")) { + ps.setObject(1, 11); + ps.setObject(2, "f"); + ps.addBatch(); + int[] result = ps.executeBatch(); + logger.info("prepare insert result: " + result) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?, ?, ?)")) { + ps.setObject(1, 12); + ps.setObject(2, "f"); + ps.setObject(3, "f"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name) values(?)")) { + ps.setObject(1, 13); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, names) values(?, ?)")) { + ps.setObject(1, 12); + ps.setObject(2, "f"); + ps.addBatch(); + int[] result = ps.executeBatch(); + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Unknown column 'names'")) + } + + getRowCount(4) + + // prepare insert with multi rows + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { + for (int i = 0; i < 5; i++) { + ps.setObject(1, 13 + i); + ps.setObject(2, "f"); + ps.setObject(3, 90); + ps.addBatch(); + int[] result = ps.executeBatch(); + logger.info("prepare insert result: " + result) + } + } + getRowCount(9) + + // prepare insert with multi rows + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?),(?, ?, ?)")) { + for (int i = 0; i < 2; i++) { + ps.setObject(1, 18 + i); + ps.setObject(2, "f"); + ps.setObject(3, 90); + ps.setObject(4, 18 + i + 1); + ps.setObject(5, "f"); + ps.setObject(6, 90); + ps.addBatch(); + int[] result = ps.executeBatch(); + logger.info("prepare insert result: " + result) + } + } + getRowCount(13) + + // prepare insert without column names, and do schema change + try (PreparedStatement ps = connection.prepareStatement("insert into ${table} values(?, ?, ?)")) { + ps.setObject(1, 22) + ps.setObject(2, "f") + ps.setObject(3, 90) + ps.addBatch() + int[] result = ps.executeBatch() + logger.info("prepare insert result: " + result) + + sql """ alter table ${table} ADD column age int after name; """ + assertTrue(getAlterTableState(), "add column should success") + + try { + ps.setObject(1, 23) + ps.setObject(2, "f") + ps.setObject(3, 90) + ps.addBatch() + result = ps.executeBatch() + assertTrue(false) + } catch (Exception e) { + assertTrue(e.getMessage().contains("Column count doesn't match value count")) + } + } + getRowCount(14) + + // prepare insert with column names, and do schema change + try (PreparedStatement ps = connection.prepareStatement("insert into ${table}(id, name, score) values(?, ?, ?)")) { + ps.setObject(1, 24) + 
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
new file mode 100644
index 0000000000..e9eb6f29b5
--- /dev/null
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("insert_group_commit_with_large_data") {
+    def table = "insert_group_commit_with_large_data"
+
+    def getRowCount = { expectedRowCount ->
+        def retry = 0
+        while (retry < 10) {
+            sleep(2000)
+            def rowCount = sql "select count(*) from ${table}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    try {
+        // create table
+        sql """ drop table if exists ${table}; """
+
+        sql """
+        CREATE TABLE `${table}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(1100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`, `name`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+        """
+
+        sql """ set enable_insert_group_commit = true; """
+
+        // insert 5000 rows
+        def insert_sql = """ insert into ${table} values(1, 'a', 10) """
+        for (def i in 2..5000) {
+            insert_sql += """, (${i}, 'a', 10) """
+        }
+        def result = sql """ ${insert_sql} """
+        logger.info("insert result: " + result)
+        assertEquals(1, result.size())
+        assertEquals(1, result[0].size())
+        assertEquals(5000, result[0][0])
+        getRowCount(5000)
+
+        // if the data size is larger than 4MB, "set global max_allowed_packet = 5508950" is needed first
+        /*def name_value = ""
+        for (def i in 0..1024) {
+            name_value += 'a'
+        }
+        insert_sql = """ insert into ${table} values(1, '${name_value}', 10) """
+        for (def i in 2..5000) {
+            insert_sql += """, (${i}, '${name_value}', 10) """
+        }
+        result = sql """ ${insert_sql} """
+        logger.info("insert result: " + result)
+        assertEquals(1, result.size())
+        assertEquals(1, result[0].size())
+        assertEquals(5000, result[0][0])
+        getRowCount(10000)*/
+    } finally {
+        // try_sql("DROP TABLE ${table}")
+    }
+}
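The commented-out half of this suite is gated on the MySQL protocol packet limit: with roughly 1KB name values, the 5000-row VALUES list exceeds the 4MB default max_allowed_packet and the server rejects the statement. A rough sketch of the size arithmetic behind that comment, with an illustrative table name:

// Build the multi-row statement the same way the test does and measure it.
def nameValue = 'a' * 1025                  // roughly 1KB per name column
def insertSql = new StringBuilder("insert into test_table values(1, '${nameValue}', 10)")
for (def i in 2..5000) {
    insertSql << ", (${i}, '${nameValue}', 10)"
}
def sizeInBytes = insertSql.toString().getBytes("UTF-8").length
// ~5000 rows * ~1KB of SQL text lands near 5MB, past the 4MB default,
// hence the test's note to run: set global max_allowed_packet = 5508950
assert sizeInBytes > 4 * 1024 * 1024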
diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
new file mode 100644
index 0000000000..441342855b
--- /dev/null
+++ b/regression-test/suites/insert_p0/insert_group_commit_with_prepare_stmt.groovy
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("insert_group_commit_with_prepare_stmt") {
+    def user = context.config.jdbcUser
+    def password = context.config.jdbcPassword
+    def realDb = "regression_test_insert_p0"
+    def table = realDb + ".insert_group_commit_with_prepare_stmt"
+
+    sql "CREATE DATABASE IF NOT EXISTS ${realDb}"
+
+    def getRowCount = { expectedRowCount ->
+        def retry = 0
+        while (retry < 10) {
+            sleep(4000)
+            def rowCount = sql "select count(*) from ${table}"
+            logger.info("rowCount: " + rowCount + ", retry: " + retry)
+            if (rowCount[0][0] >= expectedRowCount) {
+                break
+            }
+            retry++
+        }
+    }
+
+    // id, name, score
+    def insert_prepared = { stmt, k1, k2, k3 ->
+        stmt.setInt(1, k1)
+        stmt.setString(2, k2)
+        if (k3 == null) {
+            stmt.setNull(3, java.sql.Types.INTEGER)
+        } else {
+            stmt.setInt(3, k3)
+        }
+        stmt.addBatch()
+    }
+
+    // name, id, delete_sign
+    def insert_prepared_partial = { stmt, k1, k2, k3 ->
+        stmt.setObject(1, k1)
+        stmt.setObject(2, k2)
+        stmt.setObject(3, k3)
+        stmt.addBatch()
+    }
+
+    // name, id
+    def insert_prepared_partial_dup = { stmt, k1, k2 ->
+        stmt.setString(1, k1)
+        stmt.setInt(2, k2)
+        stmt.addBatch()
+    }
+
+    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb)
+    logger.info("url: " + url)
+
+    def result1 = connect(user=user, password=password, url=url) {
+        try {
+            // create table
+            sql """ drop table if exists ${table}; """
+
+            sql """
+            CREATE TABLE ${table} (
+                `id` int(11) NOT NULL,
+                `name` varchar(50) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            UNIQUE KEY(`id`, `name`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+            """
+
+            sql """ set enable_insert_group_commit = true; """
+
+            // 1. insert into
+            def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """
+            assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class)
+
+            insert_prepared insert_stmt, 1, "a", 10
+            def result = insert_stmt.executeBatch()
+            logger.info("execute result: ${result}")
+
+            insert_prepared insert_stmt, 2, null, 20
+            insert_prepared insert_stmt, 3, "c", null
+            insert_prepared insert_stmt, 4, "d", 40
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: ${result}")
+
+            insert_prepared insert_stmt, 5, "e", null
+            insert_prepared insert_stmt, 6, "f", 40
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: ${result}")
+
+            getRowCount(6)
+            qt_sql """ select * from ${table} order by id asc; """
+
+            // 2. insert into partial columns
+            insert_stmt = prepareStatement """ INSERT INTO ${table}(name, id, __DORIS_DELETE_SIGN__) VALUES(?, ?, ?) """
+            assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class)
+
+            insert_prepared_partial insert_stmt, 'a', 1, 1
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: ${result}")
+
+            insert_prepared_partial insert_stmt, 'e', 7, 0
+            insert_prepared_partial insert_stmt, null, 8, 0
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: ${result}")
+
+            getRowCount(7)
+            qt_sql """ select * from ${table} order by id, name, score asc; """
+
+        } finally {
+            // try_sql("DROP TABLE ${table}")
+        }
+    }
+
+    table = "test_prepared_stmt_duplicate"
+    result1 = connect(user=user, password=password, url=url) {
+        try {
+            // create table
+            sql """ drop table if exists ${table}; """
+
+            sql """
+            CREATE TABLE ${table} (
+                `id` int(11) NOT NULL,
+                `name` varchar(50) NULL,
+                `score` int(11) NULL default "-1"
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`, `name`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+            """
+
+            sql """ set enable_insert_group_commit = true; """
+
+            // 1. insert into
+            def insert_stmt = prepareStatement """ INSERT INTO ${table} VALUES(?, ?, ?) """
+            assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class)
+
+            insert_prepared insert_stmt, 1, "a", 10
+            def result = insert_stmt.executeBatch()
+            logger.info("execute result: " + result)
+
+            insert_prepared insert_stmt, 2, null, 20
+            insert_prepared insert_stmt, 3, "c", null
+            insert_prepared insert_stmt, 4, "d", 40
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: " + result)
+
+            insert_prepared insert_stmt, 5, "e", null
+            insert_prepared insert_stmt, 6, "f", 40
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: " + result)
+
+            getRowCount(6)
+            qt_sql """ select * from ${table} order by id asc; """
+
+            // 2. insert into partial columns
+            insert_stmt = prepareStatement """ INSERT INTO ${table}(name, id) VALUES(?, ?) """
+            assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class)
+
+            insert_prepared_partial_dup insert_stmt, 'a', 1
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: " + result)
+
+            insert_prepared_partial_dup insert_stmt, 'e', 7
+            insert_prepared_partial_dup insert_stmt, null, 8
+            result = insert_stmt.executeBatch()
+            logger.info("execute result: " + result)
+
+            getRowCount(9)
+            qt_sql """ select * from ${table} order by id, name, score asc; """
+
+        } finally {
+            // try_sql("DROP TABLE ${table}")
+        }
+    }
+}
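Both connect blocks above rely on getServerPrepareJdbcUrl to produce a URL with server-side prepared statements enabled; the assertEquals checks on com.mysql.cj.jdbc.ServerPreparedStatement only hold when that flag is present. The helper itself lives in the regression framework, so the following is only a sketch of the effect it is assumed to have, not its actual implementation:

// Hypothetical stand-in for getServerPrepareJdbcUrl: keep the host:port from
// the framework URL, point at the target database, and force server prep stmts.
def serverPrepareJdbcUrl(String jdbcUrl, String db) {
    def hostPort = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3).split("[/?]")[0]
    return "jdbc:mysql://${hostPort}/${db}?useServerPrepStmts=true"
}

assert serverPrepareJdbcUrl("jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true", "regression_test_insert_p0") ==
        "jdbc:mysql://127.0.0.1:9030/regression_test_insert_p0?useServerPrepStmts=true"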
""" + assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, insert_stmt.class) + + insert_prepared_partial_dup insert_stmt, 'a', 1 + result = insert_stmt.executeBatch() + logger.info("execute result: " + result) + + insert_prepared_partial_dup insert_stmt, 'e', 7 + insert_prepared_partial_dup insert_stmt, null, 8 + result = insert_stmt.executeBatch() + logger.info("execute result: " + result) + + getRowCount(9) + qt_sql """ select * from ${table} order by id, name, score asc; """ + + } finally { + // try_sql("DROP TABLE ${table}") + } + } +} diff --git a/regression-test/suites/insert_p0/prepare_insert.groovy b/regression-test/suites/insert_p0/prepare_insert.groovy index 300e245c88..45ac086a5f 100644 --- a/regression-test/suites/insert_p0/prepare_insert.groovy +++ b/regression-test/suites/insert_p0/prepare_insert.groovy @@ -42,23 +42,11 @@ suite("prepare_insert") { """ // Parse url - String jdbcUrl = context.config.jdbcUrl - String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) - def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) - def sql_port - if (urlWithoutSchema.indexOf("/") >= 0) { - // e.g: jdbc:mysql://locahost:8080/?a=b - sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) - } else { - // e.g: jdbc:mysql://locahost:8080 - sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) - } - // set server side prepared statement url - def url = "jdbc:mysql://" + sql_ip + ":" + sql_port + "/" + realDb + "?&useServerPrepStmts=true" + String url = getServerPrepareJdbcUrl(context.config.jdbcUrl, realDb) def result1 = connect(user = user, password = password, url = url) { def stmt = prepareStatement "insert into ${tableName} values(?, ?, ?)" - assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement) + assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, stmt.class) stmt.setInt(1, 1) stmt.setString(2, "a") stmt.setInt(3, 90) @@ -91,7 +79,7 @@ suite("prepare_insert") { def label = "insert_" + System.currentTimeMillis() result1 = connect(user = user, password = password, url = url) { def stmt = prepareStatement "insert into ${tableName} with label ${label} values(?, ?, ?)" - assertEquals(stmt.class, com.mysql.cj.jdbc.ClientPreparedStatement) + assertEquals(com.mysql.cj.jdbc.ClientPreparedStatement, stmt.class) stmt.setInt(1, 5) stmt.setString(2, "a5") stmt.setInt(3, 94) @@ -104,7 +92,7 @@ suite("prepare_insert") { url += "&rewriteBatchedStatements=true" result1 = connect(user = user, password = password, url = url) { def stmt = prepareStatement "insert into ${tableName} values(?, ?, ?)" - assertEquals(stmt.class, com.mysql.cj.jdbc.ServerPreparedStatement) + assertEquals(com.mysql.cj.jdbc.ServerPreparedStatement, stmt.class) stmt.setInt(1, 10) stmt.setString(2, "a") stmt.setInt(3, 90)