From d0cd535cb9a615d837d41dc6c09d02ba4db1565e Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 20 Oct 2023 13:27:30 +0800 Subject: [PATCH] [improvement](insert) refactor group commit stream load (#25560) --- be/src/common/config.cpp | 2 +- be/src/common/config.h | 2 +- be/src/exec/data_sink.cpp | 15 + be/src/http/action/http_stream.cpp | 6 - be/src/http/action/stream_load.cpp | 7 - .../exec/group_commit_block_sink_operator.h | 50 +++ be/src/pipeline/pipeline_fragment_context.cpp | 6 + be/src/runtime/group_commit_mgr.cpp | 297 ++++++------------ be/src/runtime/group_commit_mgr.h | 26 +- .../stream_load/stream_load_executor.cpp | 12 +- be/src/vec/core/future_block.cpp | 6 +- be/src/vec/core/future_block.h | 7 +- be/src/vec/sink/group_commit_block_sink.cpp | 70 ++++- be/src/vec/sink/group_commit_block_sink.h | 18 +- .../doris/analysis/NativeInsertStmt.java | 20 +- .../doris/planner/GroupCommitBlockSink.java | 36 +++ .../apache/doris/planner/OlapTableSink.java | 1 + .../doris/planner/StreamLoadPlanner.java | 23 +- .../doris/service/FrontendServiceImpl.java | 21 +- .../GroupCommitTableValuedFunction.java | 4 +- .../org/apache/doris/task/StreamLoadTask.java | 11 + gensrc/thrift/DataSinks.thrift | 2 + .../insert_group_commit_into_duplicate.out | 3 + .../insert_group_commit_into_duplicate.groovy | 78 ++++- .../test_group_commit_http_stream.groovy | 78 ++--- .../test_group_commit_stream_load.groovy | 75 ++--- 26 files changed, 518 insertions(+), 358 deletions(-) create mode 100644 be/src/pipeline/exec/group_commit_block_sink_operator.h create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1634ea575f..5812176872 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1099,7 +1099,7 @@ DEFINE_Int32(group_commit_sync_wal_batch, "10"); // the count of thread to group commit insert DEFINE_Int32(group_commit_insert_threads, "10"); -DEFINE_mInt32(group_commit_interval_seconds, "10"); +DEFINE_mInt32(group_commit_interval_ms, "10000"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400"); diff --git a/be/src/common/config.h b/be/src/common/config.h index e17e4b5f67..f8350d8731 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1162,7 +1162,7 @@ DECLARE_Int32(group_commit_sync_wal_batch); // This config can be set to limit thread number in group commit insert thread pool. DECLARE_mInt32(group_commit_insert_threads); -DECLARE_mInt32(group_commit_interval_seconds); +DECLARE_mInt32(group_commit_interval_ms); // The configuration item is used to lower the priority of the scanner thread, // typically employed to ensure CPU scheduling for write operations. diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index e160e9ebee..1b9653e8b8 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -31,6 +31,7 @@ #include "common/config.h" #include "vec/sink/async_writer_sink.h" +#include "vec/sink/group_commit_block_sink.h" #include "vec/sink/multi_cast_data_stream_sink.h" #include "vec/sink/vdata_stream_sender.h" #include "vec/sink/vmemory_scratch_sink.h" @@ -163,6 +164,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink RETURN_IF_ERROR(status); break; } + case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { + Status status = Status::OK(); + DCHECK(thrift_sink.__isset.olap_table_sink); + sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status)); + RETURN_IF_ERROR(status); + break; + } case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine"); } @@ -319,6 +327,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink RETURN_IF_ERROR(status); break; } + case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { + Status status = Status::OK(); + DCHECK(thrift_sink.__isset.olap_table_sink); + sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status)); + RETURN_IF_ERROR(status); + break; + } default: { std::stringstream error_msg; std::map::const_iterator i = diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 2a2ddd8ad4..067f8c5d28 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -324,12 +324,6 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req, ctx->label = ctx->put_result.params.import_label; ctx->put_result.params.__set_wal_id(ctx->wal_id); - if (ctx->group_commit) { - ctx->db_id = ctx->put_result.db_id; - ctx->table_id = ctx->put_result.table_id; - ctx->schema_version = ctx->put_result.base_schema_version; - return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx); - } return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index be4d870323..5554184324 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -601,13 +601,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return Status::OK(); } - if (ctx->group_commit) { - ctx->db_id = ctx->put_result.db_id; - ctx->table_id = ctx->put_result.table_id; - ctx->schema_version = ctx->put_result.base_schema_version; - return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx); - } - return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); } diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h new file mode 100644 index 0000000000..0cf36818db --- /dev/null +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "operator.h" +#include "vec/sink/group_commit_block_sink.h" + +namespace doris { + +namespace pipeline { + +class GroupCommitBlockSinkOperatorBuilder final + : public DataSinkOperatorBuilder { +public: + GroupCommitBlockSinkOperatorBuilder(int32_t id, DataSink* sink) + : DataSinkOperatorBuilder(id, "GroupCommitBlockSinkOperator", sink) {} + + OperatorPtr build_operator() override; +}; + +class GroupCommitBlockSinkOperator final + : public DataSinkOperator { +public: + GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink) + : DataSinkOperator(operator_builder, sink) {} + + bool can_write() override { return true; } // TODO: need use mem_limit +}; + +OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() { + return std::make_shared(this, _sink); +} + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 22a533c233..6ba3ba941a 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -57,6 +57,7 @@ #include "pipeline/exec/empty_source_operator.h" #include "pipeline/exec/exchange_sink_operator.h" #include "pipeline/exec/exchange_source_operator.h" +#include "pipeline/exec/group_commit_block_sink_operator.h" #include "pipeline/exec/hashjoin_build_sink.h" #include "pipeline/exec/hashjoin_probe_operator.h" #include "pipeline/exec/multi_cast_data_stream_sink.h" @@ -770,6 +771,11 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr } break; } + case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { + sink_ = std::make_shared(next_operator_builder_id(), + _sink.get()); + break; + } case TDataSinkType::MYSQL_TABLE_SINK: case TDataSinkType::JDBC_TABLE_SINK: case TDataSinkType::ODBC_TABLE_SINK: { diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 9c08ccaf8d..3c91915887 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -48,11 +48,6 @@ Status LoadBlockQueue::add_block(std::shared_ptr block) if (block->rows() > 0) { _block_queue.push_back(block); } - if (block->is_eos()) { - _load_ids.erase(block->get_load_id()); - } else if (block->is_first()) { - _load_ids.emplace(block->get_load_id()); - } _cv->notify_one(); return Status::OK(); } @@ -62,31 +57,31 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo *eos = false; std::unique_lock l(*_mutex); if (!need_commit) { - auto left_seconds = config::group_commit_interval_seconds - - std::chrono::duration_cast( - std::chrono::steady_clock::now() - _start_time) - .count(); - if (left_seconds <= 0) { + auto left_milliseconds = config::group_commit_interval_ms - + std::chrono::duration_cast( + std::chrono::steady_clock::now() - _start_time) + .count(); + if (left_milliseconds <= 0) { need_commit = true; } } while (_status.ok() && _block_queue.empty() && (!need_commit || (need_commit && !_load_ids.empty()))) { - auto left_seconds = config::group_commit_interval_seconds; + auto left_milliseconds = config::group_commit_interval_ms; if (!need_commit) { - left_seconds = config::group_commit_interval_seconds - - std::chrono::duration_cast( - std::chrono::steady_clock::now() - _start_time) - .count(); - if (left_seconds <= 0) { + left_milliseconds = config::group_commit_interval_ms - + std::chrono::duration_cast( + std::chrono::steady_clock::now() - _start_time) + .count(); + if (left_milliseconds <= 0) { need_commit = true; break; } } #if !defined(USE_BTHREAD_SCANNER) - _cv->wait_for(l, std::chrono::seconds(left_seconds)); + _cv->wait_for(l, std::chrono::milliseconds(left_milliseconds)); #else - _cv->wait_for(l, left_seconds * 1000000); + _cv->wait_for(l, left_milliseconds * 1000); #endif } if (!_block_queue.empty()) { @@ -96,12 +91,10 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo *find_block = true; _block_queue.pop_front(); } - if (_block_queue.empty()) { - if (need_commit && _load_ids.empty()) { - *eos = true; - } else { - *eos = false; - } + if (_block_queue.empty() && need_commit && _load_ids.empty()) { + *eos = true; + } else { + *eos = false; } return Status::OK(); } @@ -114,6 +107,16 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { } } +Status LoadBlockQueue::add_load_id(const UniqueId& load_id) { + std::unique_lock l(*_mutex); + if (need_commit) { + return Status::InternalError("block queue is set need commit, id=" + + load_instance_id.to_string()); + } + _load_ids.emplace(load_id); + return Status::OK(); +} + void LoadBlockQueue::cancel(const Status& st) { DCHECK(!st.ok()); std::unique_lock l(*_mutex); @@ -133,59 +136,62 @@ Status GroupCommitTable::get_first_block_load_queue( int64_t table_id, std::shared_ptr block, std::shared_ptr& load_block_queue) { DCHECK(table_id == _table_id); - DCHECK(block->is_first() == true); + auto base_schema_version = block->get_schema_version(); { std::unique_lock l(_lock); - for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) { - // TODO if block schema version is less than fragment schema version, return error - if (!it->second->need_commit && - it->second->schema_version == block->get_schema_version()) { - if (block->get_schema_version() == it->second->schema_version) { - load_block_queue = it->second; - break; - } else if (block->get_schema_version() < it->second->schema_version) { - return Status::DataQualityError("schema version not match"); - } - } - } - } - if (load_block_queue == nullptr) { - Status st = Status::OK(); - for (int i = 0; i < 3; ++i) { - std::unique_lock l(_request_fragment_mutex); - // check if there is a re-usefully fragment - { - std::unique_lock l1(_lock); - for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) { - // TODO if block schema version is less than fragment schema version, return error - if (!it->second->need_commit) { - if (block->get_schema_version() == it->second->schema_version) { + for (int i = 0; i < 3; i++) { + bool is_schema_version_match = true; + for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) { + if (!it->second->need_commit) { + if (base_schema_version == it->second->schema_version) { + if (it->second->add_load_id(block->get_load_id()).ok()) { load_block_queue = it->second; - break; - } else if (block->get_schema_version() < it->second->schema_version) { - return Status::DataQualityError("schema version not match"); + return Status::OK(); } + } else if (base_schema_version < it->second->schema_version) { + is_schema_version_match = false; } } } - if (load_block_queue == nullptr) { - st = _create_group_commit_load(table_id, load_block_queue); - if (LIKELY(st.ok())) { - break; + if (!is_schema_version_match) { + return Status::DataQualityError("schema version not match"); + } + if (!_need_plan_fragment) { + _need_plan_fragment = true; + RETURN_IF_ERROR(_thread_pool->submit_func([&] { + [[maybe_unused]] auto st = _create_group_commit_load(load_block_queue); + })); + } +#if !defined(USE_BTHREAD_SCANNER) + _cv.wait_for(l, std::chrono::seconds(4)); +#else + _cv.wait_for(l, 4 * 1000000); +#endif + if (load_block_queue != nullptr) { + if (load_block_queue->schema_version == base_schema_version) { + if (load_block_queue->add_load_id(block->get_load_id()).ok()) { + return Status::OK(); + } + } else if (base_schema_version < load_block_queue->schema_version) { + return Status::DataQualityError("schema version not match"); } + load_block_queue.reset(); } } - RETURN_IF_ERROR(st); - if (load_block_queue->schema_version != block->get_schema_version()) { - // TODO check this is the first block - return Status::DataQualityError("schema version not match"); - } } - return Status::OK(); + return Status::InternalError("can not get a block queue"); } Status GroupCommitTable::_create_group_commit_load( - int64_t table_id, std::shared_ptr& load_block_queue) { + std::shared_ptr& load_block_queue) { + Status st = Status::OK(); + std::unique_ptr> remove_pipe_func((int*)0x01, [&](int*) { + if (!st.ok()) { + std::unique_lock l(_lock); + _need_plan_fragment = false; + _cv.notify_all(); + } + }); TStreamLoadPutRequest request; UniqueId load_id = UniqueId::gen_uid(); TUniqueId tload_id; @@ -194,8 +200,8 @@ Status GroupCommitTable::_create_group_commit_load( std::regex reg("-"); std::string label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_"); std::stringstream ss; - ss << "insert into table_id(" << table_id << ") WITH LABEL " << label - << " select * from group_commit(\"table_id\"=\"" << table_id << "\")"; + ss << "insert into table_id(" << _table_id << ") WITH LABEL " << label + << " select * from group_commit(\"table_id\"=\"" << _table_id << "\")"; request.__set_load_sql(ss.str()); request.__set_loadId(tload_id); request.__set_label(label); @@ -209,13 +215,14 @@ Status GroupCommitTable::_create_group_commit_load( } TStreamLoadPutResult result; TNetworkAddress master_addr = _exec_env->master_info()->network_address; - RETURN_IF_ERROR(ThriftRpcHelper::rpc( + st = ThriftRpcHelper::rpc( master_addr.hostname, master_addr.port, [&result, &request](FrontendServiceConnection& client) { client->streamLoadPut(result, request); }, - 10000L)); - Status st = Status::create(result.status); + 10000L); + RETURN_IF_ERROR(st); + st = Status::create(result.status); if (!st.ok()) { LOG(WARNING) << "create group commit load error, st=" << st.to_string(); } @@ -236,7 +243,7 @@ Status GroupCommitTable::_create_group_commit_load( DCHECK(pipeline_params.local_params.size() == 1); instance_id = pipeline_params.local_params[0].fragment_instance_id; } - VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << table_id + VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id << ", schema version=" << schema_version << ", label=" << label << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id) << ", is_pipeline=" << is_pipeline; @@ -245,11 +252,13 @@ Status GroupCommitTable::_create_group_commit_load( std::make_shared(instance_id, label, txn_id, schema_version); std::unique_lock l(_lock); _load_block_queues.emplace(instance_id, load_block_queue); + _need_plan_fragment = false; + _cv.notify_all(); } - params.__set_import_label(label); - st = _exec_plan_fragment(_db_id, table_id, label, txn_id, is_pipeline, params, pipeline_params); + st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, + pipeline_params); if (!st.ok()) { - static_cast(_finish_group_commit_load(_db_id, table_id, label, txn_id, instance_id, + static_cast(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, st, true, nullptr)); } return st; @@ -346,6 +355,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ if (state && !(state->get_error_log_file_path().empty())) { ss << ", error_url=" << state->get_error_log_file_path(); } + ss << ", rows=" << state->num_rows_load_success(); LOG(INFO) << ss.str(); return st; } @@ -384,6 +394,10 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) { .set_min_threads(config::group_commit_insert_threads) .set_max_threads(config::group_commit_insert_threads) .build(&_insert_into_thread_pool)); + static_cast(ThreadPoolBuilder("GroupCommitThreadPool") + .set_min_threads(1) + .set_max_threads(config::group_commit_insert_threads) + .build(&_thread_pool)); } GroupCommitMgr::~GroupCommitMgr() { @@ -392,6 +406,7 @@ GroupCommitMgr::~GroupCommitMgr() { void GroupCommitMgr::stop() { _insert_into_thread_pool->shutdown(); + _thread_pool->shutdown(); LOG(INFO) << "GroupCommitMgr is stopped"; } @@ -456,17 +471,16 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan, std::unique_ptr _block = doris::vectorized::Block::create_unique(); bool eof = false; - bool first = true; while (!eof) { // TODO what to do if read one block error RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof)); std::shared_ptr future_block = std::make_shared(); future_block->swap(*(_block.get())); - future_block->set_info(request->base_schema_version(), load_id, first, eof); + future_block->set_info(request->base_schema_version(), load_id); if (load_block_queue == nullptr) { - RETURN_IF_ERROR(_get_first_block_load_queue(request->db_id(), table_id, - future_block, load_block_queue)); + RETURN_IF_ERROR(get_first_block_load_queue(request->db_id(), table_id, future_block, + load_block_queue)); response->set_label(load_block_queue->label); response->set_txn_id(load_block_queue->txn_id); } @@ -475,7 +489,6 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan, future_blocks.emplace_back(future_block); } RETURN_IF_ERROR(load_block_queue->add_block(future_block)); - first = false; } if (!runtime_state->get_error_log_file_path().empty()) { LOG(INFO) << "id=" << print_id(load_id) @@ -515,139 +528,15 @@ Status GroupCommitMgr::_append_row(std::shared_ptr pipe, return Status::OK(); } -Status GroupCommitMgr::group_commit_stream_load(std::shared_ptr ctx) { - return _insert_into_thread_pool->submit_func([ctx, this] { - Status st = _group_commit_stream_load(ctx); - if (!st.ok()) { - ctx->promise.set_value(st); - } - }); -} - -Status GroupCommitMgr::_group_commit_stream_load(std::shared_ptr ctx) { - auto& fragment_params = ctx->put_result.params; - auto& tdesc_tbl = fragment_params.desc_tbl; - DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1); - DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size() == 1); - auto& tscan_range_params = fragment_params.params.per_node_scan_ranges.begin()->second.at(0); - auto& nodes = fragment_params.fragment.plan.nodes; - DCHECK(nodes.size() > 0); - auto& plan_node = nodes.at(0); - - std::vector> future_blocks; - { - std::shared_ptr load_block_queue; - // 1. FileScanNode consumes data from the pipe. - std::unique_ptr runtime_state = RuntimeState::create_unique(); - TUniqueId load_id; - load_id.hi = ctx->id.hi; - load_id.lo = ctx->id.lo; - TQueryOptions query_options; - query_options.query_type = TQueryType::LOAD; - TQueryGlobals query_globals; - static_cast(runtime_state->init(load_id, query_options, query_globals, _exec_env)); - runtime_state->set_query_mem_tracker(std::make_shared( - MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", ctx->id.to_string()), -1)); - DescriptorTbl* desc_tbl = nullptr; - RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl)); - runtime_state->set_desc_tbl(desc_tbl); - auto file_scan_node = - vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl); - Status status = Status::OK(); - auto sink = stream_load::GroupCommitBlockSink( - runtime_state->obj_pool(), file_scan_node.row_desc(), - fragment_params.fragment.output_exprs, &status); - std::unique_ptr> close_scan_node_func((int*)0x01, [&](int*) { - if (load_block_queue != nullptr) { - load_block_queue->remove_load_id(load_id); - } - static_cast(file_scan_node.close(runtime_state.get())); - static_cast(sink.close(runtime_state.get(), status)); - }); - RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get())); - RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get())); - std::vector params_vector; - params_vector.emplace_back(tscan_range_params); - file_scan_node.set_scan_ranges(params_vector); - RETURN_IF_ERROR(file_scan_node.open(runtime_state.get())); - - RETURN_IF_ERROR(status); - RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink)); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get())); - RETURN_IF_ERROR(sink.open(runtime_state.get())); - - // 2. Put the block into block queue. - std::unique_ptr _block = - doris::vectorized::Block::create_unique(); - bool first = true; - bool eof = false; - while (!eof) { - // TODO what to do if scan one block error - RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof)); - RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get())); - std::shared_ptr future_block = - std::make_shared(); - future_block->swap(*(_block.get())); - future_block->set_info(ctx->schema_version, load_id, first, eof); - // TODO what to do if add one block error - if (load_block_queue == nullptr) { - RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id, ctx->table_id, future_block, - load_block_queue)); - ctx->label = load_block_queue->label; - ctx->txn_id = load_block_queue->txn_id; - } - if (future_block->rows() > 0) { - future_blocks.emplace_back(future_block); - } - RETURN_IF_ERROR(load_block_queue->add_block(future_block)); - first = false; - } - ctx->number_unselected_rows = runtime_state->num_rows_load_unselected(); - ctx->number_filtered_rows = runtime_state->num_rows_load_filtered(); - ctx->error_url = runtime_state->get_error_log_file_path(); - if (!runtime_state->get_error_log_file_path().empty()) { - LOG(INFO) << "id=" << print_id(load_id) - << ", url=" << runtime_state->get_error_log_file_path() - << ", load rows=" << runtime_state->num_rows_load_total() - << ", filter rows=" << runtime_state->num_rows_load_filtered() - << ", unselect rows=" << runtime_state->num_rows_load_unselected() - << ", success rows=" << runtime_state->num_rows_load_success(); - } - } - - int64_t total_rows = 0; - int64_t loaded_rows = 0; - // 3. wait to wal - for (const auto& future_block : future_blocks) { - std::unique_lock l(*(future_block->lock)); - if (!future_block->is_handled()) { - future_block->cv->wait(l); - } - // future_block->get_status() - total_rows += future_block->get_total_rows(); - loaded_rows += future_block->get_loaded_rows(); - } - ctx->number_total_rows = total_rows + ctx->number_unselected_rows + ctx->number_filtered_rows; - ctx->number_loaded_rows = loaded_rows; - ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows; - ctx->promise.set_value(Status::OK()); - VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string() - << ", total rows=" << ctx->number_total_rows - << ", loaded rows=" << ctx->number_loaded_rows - << ", filtered rows=" << ctx->number_filtered_rows - << ", unselected rows=" << ctx->number_unselected_rows; - return Status::OK(); -} - -Status GroupCommitMgr::_get_first_block_load_queue( +Status GroupCommitMgr::get_first_block_load_queue( int64_t db_id, int64_t table_id, std::shared_ptr block, std::shared_ptr& load_block_queue) { std::shared_ptr group_commit_table; { std::lock_guard wlock(_lock); if (_table_map.find(table_id) == _table_map.end()) { - _table_map.emplace(table_id, - std::make_shared(_exec_env, db_id, table_id)); + _table_map.emplace(table_id, std::make_shared( + _exec_env, _thread_pool.get(), db_id, table_id)); } group_commit_table = _table_map[table_id]; } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 01a0905c40..aa8d05534c 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -53,6 +53,7 @@ public: Status add_block(std::shared_ptr block); Status get_block(vectorized::Block* block, bool* find_block, bool* eos); + Status add_load_id(const UniqueId& load_id); void remove_load_id(const UniqueId& load_id); void cancel(const Status& st); @@ -76,8 +77,9 @@ private: class GroupCommitTable { public: - GroupCommitTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) - : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {}; + GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id, + int64_t table_id) + : _exec_env(exec_env), _thread_pool(thread_pool), _db_id(db_id), _table_id(table_id) {}; Status get_first_block_load_queue(int64_t table_id, std::shared_ptr block, std::shared_ptr& load_block_queue); @@ -85,8 +87,7 @@ public: std::shared_ptr& load_block_queue); private: - Status _create_group_commit_load(int64_t table_id, - std::shared_ptr& load_block_queue); + Status _create_group_commit_load(std::shared_ptr& load_block_queue); Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label, int64_t txn_id, bool is_pipeline, const TExecPlanFragmentParams& params, @@ -96,13 +97,14 @@ private: bool prepare_failed, RuntimeState* state); ExecEnv* _exec_env; + ThreadPool* _thread_pool; int64_t _db_id; int64_t _table_id; doris::Mutex _lock; + doris::ConditionVariable _cv; // fragment_instance_id to load_block_queue std::unordered_map> _load_block_queues; - - doris::Mutex _request_fragment_mutex; + bool _need_plan_fragment = false; }; class GroupCommitMgr { @@ -119,22 +121,17 @@ public: const PGroupCommitInsertRequest* request, PGroupCommitInsertResponse* response); - // stream load - Status group_commit_stream_load(std::shared_ptr ctx); - // used when init group_commit_scan_node Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id, std::shared_ptr& load_block_queue); + Status get_first_block_load_queue(int64_t db_id, int64_t table_id, + std::shared_ptr block, + std::shared_ptr& load_block_queue); private: // used by insert into Status _append_row(std::shared_ptr pipe, const PGroupCommitInsertRequest* request); - // used by stream load - Status _group_commit_stream_load(std::shared_ptr ctx); - Status _get_first_block_load_queue(int64_t db_id, int64_t table_id, - std::shared_ptr block, - std::shared_ptr& load_block_queue); ExecEnv* _exec_env; @@ -144,6 +141,7 @@ private: // thread pool to handle insert into: append data to pipe std::unique_ptr _insert_into_thread_pool; + std::unique_ptr _thread_pool; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index b85d72b3b2..32e4d76dc7 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -74,6 +74,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrput_result.__isset.params) { st = _exec_env->fragment_mgr()->exec_plan_fragment( ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) { + if (ctx->group_commit) { + ctx->label = state->import_label(); + ctx->txn_id = state->wal_id(); + } ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); ctx->commit_infos = std::move(state->tablet_commit_infos()); if (status->ok()) { @@ -84,7 +88,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrnumber_total_rows - ctx->number_unselected_rows; - if (num_selected_rows > 0 && + if (!ctx->group_commit && num_selected_rows > 0 && (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) { // NOTE: Do not modify the error message here, for historical reasons, @@ -147,6 +151,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrfragment_mgr()->exec_plan_fragment( ctx->put_result.pipeline_params, [ctx, this](RuntimeState* state, Status* status) { + if (ctx->group_commit) { + ctx->label = state->import_label(); + ctx->txn_id = state->wal_id(); + } ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); ctx->commit_infos = std::move(state->tablet_commit_infos()); if (status->ok()) { @@ -157,7 +165,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrnumber_total_rows - ctx->number_unselected_rows; - if (num_selected_rows > 0 && + if (!ctx->group_commit && num_selected_rows > 0 && (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) { // NOTE: Do not modify the error message here, for historical reasons, diff --git a/be/src/vec/core/future_block.cpp b/be/src/vec/core/future_block.cpp index 3f3f59c344..19cb09163a 100644 --- a/be/src/vec/core/future_block.cpp +++ b/be/src/vec/core/future_block.cpp @@ -21,11 +21,9 @@ namespace doris::vectorized { -void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id, bool first, bool eos) { +void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id) { this->_schema_version = schema_version; this->_load_id = load_id; - this->_first = first; - this->_eos = eos; } void FutureBlock::set_result(Status status, int64_t total_rows, int64_t loaded_rows) { @@ -35,7 +33,7 @@ void FutureBlock::set_result(Status status, int64_t total_rows, int64_t loaded_r void FutureBlock::swap_future_block(std::shared_ptr other) { Block::swap(*other.get()); - set_info(other->_schema_version, other->_load_id, other->_first, other->_eos); + set_info(other->_schema_version, other->_load_id); lock = other->lock; cv = other->cv; _result = other->_result; diff --git a/be/src/vec/core/future_block.h b/be/src/vec/core/future_block.h index ca63fa3799..ee943b3f79 100644 --- a/be/src/vec/core/future_block.h +++ b/be/src/vec/core/future_block.h @@ -30,12 +30,9 @@ class FutureBlock : public Block { public: FutureBlock() : Block() {}; void swap_future_block(std::shared_ptr other); - void set_info(int64_t block_schema_version, const TUniqueId& load_id, bool first, - bool block_eos); + void set_info(int64_t block_schema_version, const TUniqueId& load_id); int64_t get_schema_version() { return _schema_version; } TUniqueId get_load_id() { return _load_id; } - bool is_first() { return _first; } - bool is_eos() { return _eos; } // hold lock before call this function void set_result(Status status, int64_t total_rows = 0, int64_t loaded_rows = 0); @@ -50,8 +47,6 @@ public: private: int64_t _schema_version; TUniqueId _load_id; - bool _first = false; - bool _eos = false; std::shared_ptr> _result = std::make_shared>(false, Status::OK(), 0, 0); diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 08f6d87ade..d7df3a2e69 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -26,7 +26,7 @@ namespace doris { -namespace stream_load { +namespace vectorized { GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector& texprs, Status* status) @@ -44,6 +44,9 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) { _tuple_desc_id = table_sink.tuple_id; _schema.reset(new OlapTableSchemaParam()); RETURN_IF_ERROR(_schema->init(table_sink.schema)); + _db_id = table_sink.db_id; + _table_id = table_sink.table_id; + _base_schema_version = table_sink.base_schema_version; return Status::OK(); } @@ -77,6 +80,31 @@ Status GroupCommitBlockSink::open(RuntimeState* state) { return vectorized::VExpr::open(_output_vexpr_ctxs, state); } +Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) { + if (_load_block_queue) { + _load_block_queue->remove_load_id(_load_id); + } + RETURN_IF_ERROR(DataSink::close(state, close_status)); + RETURN_IF_ERROR(close_status); + // wait to wal + int64_t total_rows = 0; + int64_t loaded_rows = 0; + for (const auto& future_block : _future_blocks) { + std::unique_lock l(*(future_block->lock)); + if (!future_block->is_handled()) { + future_block->cv->wait(l); + } + // future_block->get_status() + loaded_rows += future_block->get_loaded_rows(); + total_rows += future_block->get_total_rows(); + } + state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows - + loaded_rows); + state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() + + state->num_rows_load_filtered()); + return Status::OK(); +} + Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -97,9 +125,43 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_ bool has_filtered_rows = false; RETURN_IF_ERROR(_block_convertor->validate_and_convert_block( state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows)); - block->swap(*input_block); - return Status::OK(); + // add block into block queue + return _add_block(state, block); } -} // namespace stream_load +Status GroupCommitBlockSink::_add_block(RuntimeState* state, + std::shared_ptr block) { + if (block->rows() == 0) { + return Status::OK(); + } + // add block to queue + auto _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); + { + vectorized::IColumn::Selector selector; + for (auto i = 0; i < block->rows(); i++) { + selector.emplace_back(i); + } + block->append_to_block_by_selector(_cur_mutable_block.get(), selector); + } + std::shared_ptr output_block = + std::make_shared(_cur_mutable_block->to_block()); + + std::shared_ptr future_block = + std::make_shared(); + future_block->swap(*(output_block.get())); + TUniqueId load_id; + load_id.__set_hi(load_id.hi); + load_id.__set_lo(load_id.lo); + future_block->set_info(_base_schema_version, load_id); + if (_load_block_queue == nullptr) { + RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue( + _db_id, _table_id, future_block, _load_block_queue)); + state->set_import_label(_load_block_queue->label); + state->set_wal_id(_load_block_queue->txn_id); + } + _future_blocks.emplace_back(future_block); + return _load_block_queue->add_block(future_block); +} + +} // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/group_commit_block_sink.h b/be/src/vec/sink/group_commit_block_sink.h index a309413f5a..ff798ffb00 100644 --- a/be/src/vec/sink/group_commit_block_sink.h +++ b/be/src/vec/sink/group_commit_block_sink.h @@ -24,8 +24,11 @@ namespace doris { class OlapTableSchemaParam; class MemTracker; +class LoadBlockQueue; -namespace stream_load { +namespace vectorized { + +class FutureBlock; class GroupCommitBlockSink : public DataSink { public: @@ -42,7 +45,11 @@ public: Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; + Status close(RuntimeState* state, Status close_status) override; + private: + Status _add_block(RuntimeState* state, std::shared_ptr block); + vectorized::VExprContextSPtrs _output_vexpr_ctxs; int _tuple_desc_id = -1; @@ -53,7 +60,14 @@ private: // this is tuple descriptor of destination OLAP table TupleDescriptor* _output_tuple_desc = nullptr; std::unique_ptr _block_convertor; + + int64_t _db_id; + int64_t _table_id; + int64_t _base_schema_version = 0; + UniqueId _load_id; + std::shared_ptr _load_block_queue; + std::vector> _future_blocks; }; -} // namespace stream_load +} // namespace vectorized } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 637462931b..dad7dc4601 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -48,6 +48,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.ExportSink; +import org.apache.doris.planner.GroupCommitBlockSink; import org.apache.doris.planner.GroupCommitOlapTableSink; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.StreamLoadPlanner; @@ -167,6 +168,7 @@ public class NativeInsertStmt extends InsertStmt { private long tableId = -1; // true if be generates an insert from group commit tvf stmt and executes to load data public boolean isGroupCommitTvf = false; + public boolean isGroupCommitStreamLoadSql = false; private boolean isFromDeleteOrUpdateStmt = false; @@ -933,10 +935,17 @@ public class NativeInsertStmt extends InsertStmt { } if (targetTable instanceof OlapTable) { checkInnerGroupCommit(); - OlapTableSink sink = isGroupCommitTvf ? new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple, - targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()) - : new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, - analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); + OlapTableSink sink; + if (isGroupCommitTvf) { + sink = new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple, + targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); + } else if (isGroupCommitStreamLoadSql) { + sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple, + targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); + } else { + sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds, + analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert()); + } dataSink = sink; sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols); dataPartition = dataSink.getOutputPartition(); @@ -1092,7 +1101,8 @@ public class NativeInsertStmt extends InsertStmt { streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1) .setTbl(getTbl()) .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN) - .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId); + .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId) + .setGroupCommit(true); StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest); StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask); // Will using load id as query id in fragment diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java new file mode 100644 index 0000000000..63ab187335 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.thrift.TDataSinkType; + +import java.util.List; + +public class GroupCommitBlockSink extends OlapTableSink { + + public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, + boolean singleReplicaLoad) { + super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad); + } + + protected TDataSinkType getDataSinkType() { + return TDataSinkType.GROUP_COMMIT_BLOCK_SINK; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index bd1f6c8bff..23bec446f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -125,6 +125,7 @@ public class OlapTableSink extends DataSink { tSink.setLoadId(loadId); tSink.setTxnId(txnId); tSink.setDbId(dbId); + tSink.setBaseSchemaVersion(dstTable.getBaseSchemaVersion()); tSink.setLoadChannelTimeoutS(loadChannelTimeoutS); tSink.setSendBatchParallelism(sendBatchParallelism); this.isStrictMode = isStrictMode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index a6e47f33f2..934bca7ac0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -48,6 +48,7 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.LoadTaskInfo; +import org.apache.doris.task.StreamLoadTask; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -254,10 +255,15 @@ public class StreamLoadPlanner { // create dest sink List partitionIds = getAllPartitionIds(); - OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, - Config.enable_single_replica_load); - olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, - taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); + OlapTableSink olapTableSink; + if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) { + olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds, + Config.enable_single_replica_load); + } else { + olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); + } + olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), + taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); olapTableSink.complete(analyzer); @@ -463,8 +469,13 @@ public class StreamLoadPlanner { // create dest sink List partitionIds = getAllPartitionIds(); - OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, - Config.enable_single_replica_load); + OlapTableSink olapTableSink; + if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) { + olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds, + Config.enable_single_replica_load); + } else { + olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load); + } olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode()); olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 7d8626c1ea..d9d0a03f3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2090,8 +2090,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser); parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0)); parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); - if (request.isGroupCommit() && parsedStmt.getLabel() != null) { - throw new AnalysisException("label and group_commit can't be set at the same time"); + if (request.isGroupCommit()) { + if (parsedStmt.getLabel() != null) { + throw new AnalysisException("label and group_commit can't be set at the same time"); + } + parsedStmt.isGroupCommitStreamLoadSql = true; } StmtExecutor executor = new StmtExecutor(ctx, parsedStmt); ctx.setExecutor(executor); @@ -2235,13 +2238,15 @@ public class FrontendServiceImpl implements FrontendService.Iface { StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask); TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId(), multiTableFragmentInstanceIdIndex); - // add table indexes to transaction state - TransactionState txnState = Env.getCurrentGlobalTransactionMgr() - .getTransactionState(db.getId(), request.getTxnId()); - if (txnState == null) { - throw new UserException("txn does not exist: " + request.getTxnId()); + if (!request.isGroupCommit()) { + // add table indexes to transaction state + TransactionState txnState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(db.getId(), request.getTxnId()); + if (txnState == null) { + throw new UserException("txn does not exist: " + request.getTxnId()); + } + txnState.addTableIndexes(table); } - txnState.addTableIndexes(table); return plan; } finally { table.readUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java index 3abcbee06e..0b08f8c988 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java @@ -69,10 +69,10 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn(); List tableColumns = table.getBaseSchema(false); for (int i = 1; i <= tableColumns.size(); i++) { - fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true)); + fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true)); } if (deleteSignColumn != null) { - fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true)); + fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getType(), true)); } return fileColumns; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 4bcf28da15..485a3599b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -94,6 +94,8 @@ public class StreamLoadTask implements LoadTaskInfo { private byte escape = 0; + private boolean groupCommit = false; + public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType, TFileCompressType compressType) { this.id = id; @@ -312,6 +314,7 @@ public class StreamLoadTask implements LoadTaskInfo { request.getFileType(), request.getFormatType(), request.getCompressType()); streamLoadTask.setOptionalFromTSLPutRequest(request); + streamLoadTask.setGroupCommit(request.isGroupCommit()); if (request.isSetFileSize()) { streamLoadTask.fileSize = request.getFileSize(); } @@ -519,5 +522,13 @@ public class StreamLoadTask implements LoadTaskInfo { public double getMaxFilterRatio() { return maxFilterRatio; } + + public void setGroupCommit(boolean groupCommit) { + this.groupCommit = groupCommit; + } + + public boolean isGroupCommit() { + return groupCommit; + } } diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index f1cad4cf26..91458072d3 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -37,6 +37,7 @@ enum TDataSinkType { JDBC_TABLE_SINK, MULTI_CAST_DATA_STREAM_SINK, GROUP_COMMIT_OLAP_TABLE_SINK, + GROUP_COMMIT_BLOCK_SINK, } enum TResultSinkType { @@ -255,6 +256,7 @@ struct TOlapTableSink { 18: optional Descriptors.TOlapTableLocationParam slave_location 19: optional i64 txn_timeout_s // timeout of load txn in second 20: optional bool write_file_cache + 21: optional i64 base_schema_version } struct TDataSink { diff --git a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out index 5ee9ddd0be..97fc189755 100644 --- a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out +++ b/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out @@ -91,3 +91,6 @@ q 50 q 50 q 50 +-- !sql -- +0 service_46da0dab-e27d-4820-aea2-9bfc15741615 1697032066304 0 3229b7cd-f3a2-4359-aa24-946388c9cc54 0 CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAEY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAIY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAMY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAUY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAYY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAcY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAgY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAkY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAoY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAsY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAwY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECA0Y/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECA4Y/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECA8Y/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBAY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBEY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBIY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBMY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5IixzZXJ2aWNlXzQ2ZGEwZGFiLWUyN2QtNDgyMC1hZWEyLTliZmMxNTc0MTYxNSo0c2VydmljZV9pbnN0YW5jZWFjODlhNGI3LTgxZjctNDNlOC04NWVkLWQyYjU3OGQ5ODA1MA== 1697032066304 36b2d9ff-4c25-49f3-a726-eea812564411 355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a false 3 service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050 statement: b9903670-3821-4f4c-a587-bbcf02c04b77 ["[tagKey_5=tagValue_5, tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=tagValue_11, tagKey_10=tagValue_10, tagKey_4=tagValue_4, tagKey_13=tagValue_13, tagKey_14=tagValue_14, tagKey_2=tagValue_2, tagKey_17=tagValue_17, tagKey_19=tagValue_19, tagKey_0=tagValue_0, tagKey_18=tagValue_18, tagKey_9=tagValue_9, tagKey_7=tagValue_7, tagKey_12=tagValue_12]"] + diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy index eed1a26f14..c5ee05ac1e 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy @@ -53,6 +53,20 @@ suite("insert_group_commit_into_duplicate") { return false } + def group_commit_insert = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + if (result != expected_row_count) { + logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql) + } + // assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'PREPARE'")) + assertTrue(serverInfo.contains("'label':'group_commit_")) + } + try { // create table sql """ drop table if exists ${table}; """ @@ -74,20 +88,6 @@ suite("insert_group_commit_into_duplicate") { ); """ - def group_commit_insert = { sql, expected_row_count -> - def stmt = prepareStatement """ ${sql} """ - def result = stmt.executeUpdate() - logger.info("insert result: " + result) - def serverInfo = (((StatementImpl) stmt).results).getServerInfo() - logger.info("result server info: " + serverInfo) - if (result != expected_row_count) { - logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql) - } - // assertEquals(result, expected_row_count) - assertTrue(serverInfo.contains("'status':'PREPARE'")) - assertTrue(serverInfo.contains("'label':'group_commit_")) - } - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { sql """ set enable_insert_group_commit = true; """ // TODO @@ -178,4 +178,54 @@ suite("insert_group_commit_into_duplicate") { } finally { // try_sql("DROP TABLE ${table}") } + + // table with array type + tableName = "insert_group_commit_into_duplicate_array" + table = dbName + "." + tableName + try { + // create table + sql """ drop table if exists ${table}; """ + + sql """ + CREATE table ${table} ( + teamID varchar(255), + service_id varchar(255), + start_time BigInt, + time_bucket BigInt , + segment_id String , + trace_id String , + data_binary String , + end_time BigInt , + endpoint_id String , + endpoint_name String , + is_error Boolean , + latency Int , + service_instance_id String , + statement String , + tags Array + ) UNIQUE key (`teamID`,`service_id`, `start_time`) + DISTRIBUTED BY hash(`start_time`) + BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + // TODO + sql """ set enable_nereids_dml = false; """ + + // 1. insert into + group_commit_insert """ + INSERT INTO ${table} (`data_binary`, `end_time`, `endpoint_id`, `endpoint_name`, `is_error`, `latency`, `segment_id`, `service_id`, `service_instance_id`, `start_time`, `statement`, `tags`, `teamID`, `time_bucket`, `trace_id`) + VALUES + ('CgEwEiQzMjI5YjdjZC1mM2EyLTQzNTktYWEyNC05NDYzODhjOWNjNTQaggQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAEY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAIY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAMY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAQY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAUY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAYY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAcY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAgY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAkY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAoY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAsY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECAwY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECA0Y/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECA4Y/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECA8Y/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBAY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBEY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBIY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5GoQECBMY/6n597ExIP+p+fexMWIWCgh0YWdLZXlfMBIKdGFnVmFsdWVfMGIWCgh0YWdLZXlfMRIKdGFnVmFsdWVfMWIWCgh0YWdLZXlfMhIKdGFnVmFsdWVfMmIWCgh0YWdLZXlfMxIKdGFnVmFsdWVfM2IWCgh0YWdLZXlfNBIKdGFnVmFsdWVfNGIWCgh0YWdLZXlfNRIKdGFnVmFsdWVfNWIWCgh0YWdLZXlfNhIKdGFnVmFsdWVfNmIWCgh0YWdLZXlfNxIKdGFnVmFsdWVfN2IWCgh0YWdLZXlfOBIKdGFnVmFsdWVfOGIWCgh0YWdLZXlfORIKdGFnVmFsdWVfOWIYCgl0YWdLZXlfMTASC3RhZ1ZhbHVlXzEwYhgKCXRhZ0tleV8xMRILdGFnVmFsdWVfMTFiGAoJdGFnS2V5XzEyEgt0YWdWYWx1ZV8xMmIYCgl0YWdLZXlfMTMSC3RhZ1ZhbHVlXzEzYhgKCXRhZ0tleV8xNBILdGFnVmFsdWVfMTRiGAoJdGFnS2V5XzE1Egt0YWdWYWx1ZV8xNWIYCgl0YWdLZXlfMTYSC3RhZ1ZhbHVlXzE2YhgKCXRhZ0tleV8xNxILdGFnVmFsdWVfMTdiGAoJdGFnS2V5XzE4Egt0YWdWYWx1ZV8xOGIYCgl0YWdLZXlfMTkSC3RhZ1ZhbHVlXzE5IixzZXJ2aWNlXzQ2ZGEwZGFiLWUyN2QtNDgyMC1hZWEyLTliZmMxNTc0MTYxNSo0c2VydmljZV9pbnN0YW5jZWFjODlhNGI3LTgxZjctNDNlOC04NWVkLWQyYjU3OGQ5ODA1MA==', + 1697032066304, '36b2d9ff-4c25-49f3-a726-eea812564411', '355f96cd-b1b1-4688-a5f6-a8e3f3a55c9a', false, 3, '3229b7cd-f3a2-4359-aa24-946388c9cc54', 'service_46da0dab-e27d-4820-aea2-9bfc15741615', 'service_instanceac89a4b7-81f7-43e8-85ed-d2b578d98050', 1697032066304, 'statement: b9903670-3821-4f4c-a587-bbcf02c04b77', ['[tagKey_5=tagValue_5, tagKey_3=tagValue_3, tagKey_1=tagValue_1, tagKey_16=tagValue_16, tagKey_8=tagValue_8, tagKey_15=tagValue_15, tagKey_6=tagValue_6, tagKey_11=tagValue_11, tagKey_10=tagValue_10, tagKey_4=tagValue_4, tagKey_13=tagValue_13, tagKey_14=tagValue_14, tagKey_2=tagValue_2, tagKey_17=tagValue_17, tagKey_19=tagValue_19, tagKey_0=tagValue_0, tagKey_18=tagValue_18, tagKey_9=tagValue_9, tagKey_7=tagValue_7, tagKey_12=tagValue_12]'], '0', 0, '0'); + """, 1 + + getRowCount(1) + qt_sql """ select * from ${table}; """ + } + } finally { + // try_sql("DROP TABLE ${table}") + } } diff --git a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy index b1c51286e0..818749f2ff 100644 --- a/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_group_commit_http_stream.groovy @@ -49,6 +49,26 @@ suite("test_group_commit_http_stream") { return false } + def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.GroupCommit) + assertTrue(json.Label.startsWith("group_commit_")) + assertEquals(total_rows, json.NumberTotalRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(unselected_rows, json.NumberUnselectedRows) + if (filtered_rows > 0) { + assertFalse(json.ErrorURL.isEmpty()) + } else { + assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty()) + } + } + try { // create table sql """ drop table if exists ${tableName}; """ @@ -85,6 +105,10 @@ suite("test_group_commit_http_stream") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } } } @@ -101,6 +125,10 @@ suite("test_group_commit_http_stream") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } } // stream load with different column order @@ -116,6 +144,10 @@ suite("test_group_commit_http_stream") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } } // stream load with where condition @@ -133,17 +165,8 @@ suite("test_group_commit_http_stream") { time 10000 // limit inflight 10s check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertTrue(json.GroupCommit) - // assertEquals(2, json.NumberTotalRows) - assertEquals(1, json.NumberLoadedRows) - assertEquals(0, json.NumberFilteredRows) - // assertEquals(1, json.NumberUnselectedRows) + // TODO different with stream load: 2, 1, 0, 1 + checkStreamLoadResult(exception, result, 1, 1, 0, 0) } } @@ -160,6 +183,10 @@ suite("test_group_commit_http_stream") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } } // stream load with filtered rows @@ -179,18 +206,8 @@ suite("test_group_commit_http_stream") { time 10000 // limit inflight 10s check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertTrue(json.GroupCommit) - // assertEquals(6, json.NumberTotalRows) - // assertEquals(2, json.NumberLoadedRows) - // assertEquals(3, json.NumberFilteredRows) - // assertEquals(1, json.NumberUnselectedRows) - // assertFalse(json.ErrorURL.isEmpty()) + // TODO different with stream load: 6, 2, 3, 1 + checkStreamLoadResult(exception, result, 6, 4, 2, 0) } } @@ -217,7 +234,7 @@ suite("test_group_commit_http_stream") { } } - getRowCount(7) + getRowCount(23) qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; " } finally { // try_sql("DROP TABLE ${tableName}") @@ -301,18 +318,7 @@ suite("test_group_commit_http_stream") { // if declared a check callback, the default check condition will ignore. // So you must check all condition check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load ${i}, result: ${result}") - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - if (json.NumberLoadedRows != 600572) { - logger.warn("Stream load ${i}, loaded rows: ${json.NumberLoadedRows}") - } - assertTrue(json.LoadBytes > 0) - assertTrue(json.GroupCommit) + checkStreamLoadResult(exception, result, 600572, 600572, 0, 0) } } } diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index 6034e35a10..b5f46f2922 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -48,6 +48,26 @@ suite("test_group_commit_stream_load") { return false } + def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertTrue(json.GroupCommit) + assertTrue(json.Label.startsWith("group_commit_")) + assertEquals(total_rows, json.NumberTotalRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(unselected_rows, json.NumberUnselectedRows) + if (filtered_rows > 0) { + assertFalse(json.ErrorURL.isEmpty()) + } else { + assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty()) + } + } + try { // create table sql """ drop table if exists ${tableName}; """ @@ -84,6 +104,10 @@ suite("test_group_commit_stream_load") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } } } @@ -98,6 +122,10 @@ suite("test_group_commit_stream_load") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } } // stream load with different column order @@ -111,6 +139,10 @@ suite("test_group_commit_stream_load") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } } // stream load with where condition @@ -127,17 +159,7 @@ suite("test_group_commit_stream_load") { time 10000 // limit inflight 10s check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertTrue(json.GroupCommit) - assertEquals(2, json.NumberTotalRows) - assertEquals(1, json.NumberLoadedRows) - assertEquals(0, json.NumberFilteredRows) - assertEquals(1, json.NumberUnselectedRows) + checkStreamLoadResult(exception, result, 2, 1, 0, 1) } } @@ -152,6 +174,10 @@ suite("test_group_commit_stream_load") { unset 'label' time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } } // stream load with filtered rows @@ -168,18 +194,7 @@ suite("test_group_commit_stream_load") { time 10000 // limit inflight 10s check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertTrue(json.GroupCommit) - assertEquals(6, json.NumberTotalRows) - assertEquals(2, json.NumberLoadedRows) - assertEquals(3, json.NumberFilteredRows) - assertEquals(1, json.NumberUnselectedRows) - assertFalse(json.ErrorURL.isEmpty()) + checkStreamLoadResult(exception, result, 6, 2, 3, 1) } } @@ -286,19 +301,7 @@ suite("test_group_commit_stream_load") { // if declared a check callback, the default check condition will ignore. // So you must check all condition check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load ${i}, result: ${result}") - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - if (json.NumberLoadedRows != 600572) { - logger.warn("Stream load ${i}, loaded rows: ${json.NumberLoadedRows}") - } - // assertEquals(json.NumberLoadedRows, 600572) - assertTrue(json.LoadBytes > 0) - assertTrue(json.GroupCommit) + checkStreamLoadResult(exception, result, 600572, 600572, 0, 0) } } }