[improvement](insert) refactor group commit stream load (#25560)

Author: meiyi
Date: 2023-10-20 13:27:30 +08:00 (committed by GitHub)
Parent: 9a675fcdfc
Commit: d0cd535cb9
26 changed files with 518 additions and 358 deletions

View File

@ -1099,7 +1099,7 @@ DEFINE_Int32(group_commit_sync_wal_batch, "10");
// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_mInt32(group_commit_interval_seconds, "10");
DEFINE_mInt32(group_commit_interval_ms, "10000");
DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
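The interval config is renamed from seconds (`group_commit_interval_seconds`, default 10) to milliseconds (`group_commit_interval_ms`, default 10000), so the effective default stays at 10 s while allowing sub-second intervals. Below is a minimal, self-contained sketch of the millisecond deadline computation this enables; the variable names are illustrative stand-ins, not the actual Doris config API.

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

// Illustrative stand-in for config::group_commit_interval_ms (assumed default 10000).
static int64_t group_commit_interval_ms = 10000;

int main() {
    auto start = std::chrono::steady_clock::now();
    // ... blocks would be appended to the queue here ...
    auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                              std::chrono::steady_clock::now() - start)
                              .count();
    int64_t left_ms = group_commit_interval_ms - elapsed_ms;
    bool need_commit = left_ms <= 0;
    std::cout << "left_ms=" << left_ms << ", need_commit=" << need_commit << std::endl;
    return 0;
}
```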

View File

@ -1162,7 +1162,7 @@ DECLARE_Int32(group_commit_sync_wal_batch);
// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_interval_seconds);
DECLARE_mInt32(group_commit_interval_ms);
// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.

View File

@ -31,6 +31,7 @@
#include "common/config.h"
#include "vec/sink/async_writer_sink.h"
#include "vec/sink/group_commit_block_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmemory_scratch_sink.h"
@ -163,6 +164,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status));
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
return Status::NotSupported("MULTI_CAST_DATA_STREAM_SINK only support in pipeline engine");
}
@ -319,6 +327,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
sink->reset(new vectorized::GroupCommitBlockSink(pool, row_desc, output_exprs, &status));
RETURN_IF_ERROR(status);
break;
}
default: {
std::stringstream error_msg;
std::map<int, const char*>::const_iterator i =

View File

@ -324,12 +324,6 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
if (ctx->group_commit) {
ctx->db_id = ctx->put_result.db_id;
ctx->table_id = ctx->put_result.table_id;
ctx->schema_version = ctx->put_result.base_schema_version;
return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
}
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}

View File

@ -601,13 +601,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return Status::OK();
}
if (ctx->group_commit) {
ctx->db_id = ctx->put_result.db_id;
ctx->table_id = ctx->put_result.table_id;
ctx->schema_version = ctx->put_result.base_schema_version;
return _exec_env->group_commit_mgr()->group_commit_stream_load(ctx);
}
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "operator.h"
#include "vec/sink/group_commit_block_sink.h"
namespace doris {
namespace pipeline {
class GroupCommitBlockSinkOperatorBuilder final
: public DataSinkOperatorBuilder<vectorized::GroupCommitBlockSink> {
public:
GroupCommitBlockSinkOperatorBuilder(int32_t id, DataSink* sink)
: DataSinkOperatorBuilder(id, "GroupCommitBlockSinkOperator", sink) {}
OperatorPtr build_operator() override;
};
class GroupCommitBlockSinkOperator final
: public DataSinkOperator<GroupCommitBlockSinkOperatorBuilder> {
public:
GroupCommitBlockSinkOperator(OperatorBuilderBase* operator_builder, DataSink* sink)
: DataSinkOperator(operator_builder, sink) {}
bool can_write() override { return true; } // TODO: need use mem_limit
};
OperatorPtr GroupCommitBlockSinkOperatorBuilder::build_operator() {
return std::make_shared<GroupCommitBlockSinkOperator>(this, _sink);
}
} // namespace pipeline
} // namespace doris

View File

@ -57,6 +57,7 @@
#include "pipeline/exec/empty_source_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/group_commit_block_sink_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
@ -770,6 +771,11 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
}
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
sink_ = std::make_shared<GroupCommitBlockSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
break;
}
case TDataSinkType::MYSQL_TABLE_SINK:
case TDataSinkType::JDBC_TABLE_SINK:
case TDataSinkType::ODBC_TABLE_SINK: {

View File

@ -48,11 +48,6 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::FutureBlock> block)
if (block->rows() > 0) {
_block_queue.push_back(block);
}
if (block->is_eos()) {
_load_ids.erase(block->get_load_id());
} else if (block->is_first()) {
_load_ids.emplace(block->get_load_id());
}
_cv->notify_one();
return Status::OK();
}
@ -62,31 +57,31 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
*eos = false;
std::unique_lock l(*_mutex);
if (!need_commit) {
auto left_seconds = config::group_commit_interval_seconds -
std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_seconds <= 0) {
auto left_milliseconds = config::group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_milliseconds <= 0) {
need_commit = true;
}
}
while (_status.ok() && _block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
auto left_seconds = config::group_commit_interval_seconds;
auto left_milliseconds = config::group_commit_interval_ms;
if (!need_commit) {
left_seconds = config::group_commit_interval_seconds -
std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_seconds <= 0) {
left_milliseconds = config::group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_milliseconds <= 0) {
need_commit = true;
break;
}
}
#if !defined(USE_BTHREAD_SCANNER)
_cv->wait_for(l, std::chrono::seconds(left_seconds));
_cv->wait_for(l, std::chrono::milliseconds(left_milliseconds));
#else
_cv->wait_for(l, left_seconds * 1000000);
_cv->wait_for(l, left_milliseconds * 1000);
#endif
}
if (!_block_queue.empty()) {
@ -96,12 +91,10 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
*find_block = true;
_block_queue.pop_front();
}
if (_block_queue.empty()) {
if (need_commit && _load_ids.empty()) {
*eos = true;
} else {
*eos = false;
}
if (_block_queue.empty() && need_commit && _load_ids.empty()) {
*eos = true;
} else {
*eos = false;
}
return Status::OK();
}
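For orientation, here is a simplified, self-contained sketch of the commit condition implemented in get_block above: the consumer waits with a millisecond deadline, and eos is only reported once the interval has expired, the queue is drained, and no registered load ids remain. The class below illustrates the pattern under those assumptions; it is not the Doris LoadBlockQueue API.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <set>
#include <string>

// Simplified stand-in for the block queue: strings play the role of blocks.
class SimpleLoadQueue {
public:
    explicit SimpleLoadQueue(int64_t interval_ms)
            : _interval_ms(interval_ms), _start(std::chrono::steady_clock::now()) {}

    // Producer side: register a load, push its blocks, then unregister.
    void add_load_id(int id) {
        std::lock_guard<std::mutex> l(_mutex);
        _load_ids.insert(id);
    }
    void remove_load_id(int id) {
        std::lock_guard<std::mutex> l(_mutex);
        _load_ids.erase(id);
        _cv.notify_one();
    }
    void add_block(std::string block) {
        std::lock_guard<std::mutex> l(_mutex);
        _queue.push_back(std::move(block));
        _cv.notify_one();
    }

    // Consumer side: returns true and fills `block` when data is available;
    // sets `eos` once the interval expired, the queue drained and no loads remain.
    bool get_block(std::string* block, bool* eos) {
        std::unique_lock<std::mutex> l(_mutex);
        auto left = [&] {
            auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                                   std::chrono::steady_clock::now() - _start)
                                   .count();
            return _interval_ms - elapsed;
        };
        while (_queue.empty() && !(left() <= 0 && _load_ids.empty())) {
            auto wait_ms = left();
            if (wait_ms <= 0) wait_ms = 10; // re-check while loads are still registered
            _cv.wait_for(l, std::chrono::milliseconds(wait_ms));
        }
        bool found = false;
        if (!_queue.empty()) {
            *block = std::move(_queue.front());
            _queue.pop_front();
            found = true;
        }
        *eos = _queue.empty() && left() <= 0 && _load_ids.empty();
        return found;
    }

private:
    int64_t _interval_ms;
    std::chrono::steady_clock::time_point _start;
    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<std::string> _queue;
    std::set<int> _load_ids;
};

int main() {
    SimpleLoadQueue q(/*interval_ms=*/50);
    q.add_load_id(1);
    q.add_block("block-1");
    q.remove_load_id(1);
    std::string block;
    bool eos = false;
    while (q.get_block(&block, &eos) || !eos) {
        if (!block.empty()) std::cout << "got " << block << std::endl;
        block.clear();
    }
    std::cout << "eos" << std::endl;
    return 0;
}
```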
@ -114,6 +107,16 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
}
}
Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
std::unique_lock l(*_mutex);
if (need_commit) {
return Status::InternalError("block queue is set need commit, id=" +
load_instance_id.to_string());
}
_load_ids.emplace(load_id);
return Status::OK();
}
void LoadBlockQueue::cancel(const Status& st) {
DCHECK(!st.ok());
std::unique_lock l(*_mutex);
@ -133,59 +136,62 @@ Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
DCHECK(table_id == _table_id);
DCHECK(block->is_first() == true);
auto base_schema_version = block->get_schema_version();
{
std::unique_lock l(_lock);
for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
// TODO if block schema version is less than fragment schema version, return error
if (!it->second->need_commit &&
it->second->schema_version == block->get_schema_version()) {
if (block->get_schema_version() == it->second->schema_version) {
load_block_queue = it->second;
break;
} else if (block->get_schema_version() < it->second->schema_version) {
return Status::DataQualityError("schema version not match");
}
}
}
}
if (load_block_queue == nullptr) {
Status st = Status::OK();
for (int i = 0; i < 3; ++i) {
std::unique_lock l(_request_fragment_mutex);
// check if there is a reusable fragment
{
std::unique_lock l1(_lock);
for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
// TODO if block schema version is less than fragment schema version, return error
if (!it->second->need_commit) {
if (block->get_schema_version() == it->second->schema_version) {
for (int i = 0; i < 3; i++) {
bool is_schema_version_match = true;
for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
if (!it->second->need_commit) {
if (base_schema_version == it->second->schema_version) {
if (it->second->add_load_id(block->get_load_id()).ok()) {
load_block_queue = it->second;
break;
} else if (block->get_schema_version() < it->second->schema_version) {
return Status::DataQualityError("schema version not match");
return Status::OK();
}
} else if (base_schema_version < it->second->schema_version) {
is_schema_version_match = false;
}
}
}
if (load_block_queue == nullptr) {
st = _create_group_commit_load(table_id, load_block_queue);
if (LIKELY(st.ok())) {
break;
if (!is_schema_version_match) {
return Status::DataQualityError("schema version not match");
}
if (!_need_plan_fragment) {
_need_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
[[maybe_unused]] auto st = _create_group_commit_load(load_block_queue);
}));
}
#if !defined(USE_BTHREAD_SCANNER)
_cv.wait_for(l, std::chrono::seconds(4));
#else
_cv.wait_for(l, 4 * 1000000);
#endif
if (load_block_queue != nullptr) {
if (load_block_queue->schema_version == base_schema_version) {
if (load_block_queue->add_load_id(block->get_load_id()).ok()) {
return Status::OK();
}
} else if (base_schema_version < load_block_queue->schema_version) {
return Status::DataQualityError("schema version not match");
}
load_block_queue.reset();
}
}
RETURN_IF_ERROR(st);
if (load_block_queue->schema_version != block->get_schema_version()) {
// TODO check this is the first block
return Status::DataQualityError("schema version not match");
}
}
return Status::OK();
return Status::InternalError("can not get a block queue");
}
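The new get_first_block_load_queue above replaces the per-caller retry loop with a shared handshake: the first caller flips _need_plan_fragment and submits fragment creation to a thread pool, every caller waits on the condition variable for up to 4 seconds, and all of them retry up to 3 times. Below is a self-contained sketch of that handshake, with std::async standing in for the thread pool and toy types replacing the Doris classes.

```cpp
#include <chrono>
#include <condition_variable>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

// Toy stand-in for LoadBlockQueue.
struct Queue {
    int schema_version = 1;
};

class Table {
public:
    std::shared_ptr<Queue> get_queue() {
        std::unique_lock<std::mutex> l(_lock);
        for (int i = 0; i < 3; ++i) {
            if (_queue) return _queue;
            if (!_need_plan_fragment) {
                _need_plan_fragment = true;
                // In the real code this is submitted to a shared ThreadPool.
                _pending = std::async(std::launch::async, [this] { _create_queue(); });
            }
            _cv.wait_for(l, std::chrono::seconds(4));
        }
        return nullptr; // "can not get a block queue"
    }

private:
    void _create_queue() {
        // Pretend to plan and start a group commit load fragment.
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        std::lock_guard<std::mutex> g(_lock);
        _queue = std::make_shared<Queue>();
        _need_plan_fragment = false;
        _cv.notify_all();
    }

    std::mutex _lock;
    std::condition_variable _cv;
    bool _need_plan_fragment = false;
    std::shared_ptr<Queue> _queue;
    std::future<void> _pending;
};

int main() {
    Table t;
    auto q = t.get_queue();
    std::cout << (q ? "got queue" : "no queue") << std::endl;
    return 0;
}
```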
Status GroupCommitTable::_create_group_commit_load(
int64_t table_id, std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
Status st = Status::OK();
std::unique_ptr<int, std::function<void(int*)>> remove_pipe_func((int*)0x01, [&](int*) {
if (!st.ok()) {
std::unique_lock l(_lock);
_need_plan_fragment = false;
_cv.notify_all();
}
});
TStreamLoadPutRequest request;
UniqueId load_id = UniqueId::gen_uid();
TUniqueId tload_id;
@ -194,8 +200,8 @@ Status GroupCommitTable::_create_group_commit_load(
std::regex reg("-");
std::string label = "group_commit_" + std::regex_replace(load_id.to_string(), reg, "_");
std::stringstream ss;
ss << "insert into table_id(" << table_id << ") WITH LABEL " << label
<< " select * from group_commit(\"table_id\"=\"" << table_id << "\")";
ss << "insert into table_id(" << _table_id << ") WITH LABEL " << label
<< " select * from group_commit(\"table_id\"=\"" << _table_id << "\")";
request.__set_load_sql(ss.str());
request.__set_loadId(tload_id);
request.__set_label(label);
@ -209,13 +215,14 @@ Status GroupCommitTable::_create_group_commit_load(
}
TStreamLoadPutResult result;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&result, &request](FrontendServiceConnection& client) {
client->streamLoadPut(result, request);
},
10000L));
Status st = Status::create(result.status);
10000L);
RETURN_IF_ERROR(st);
st = Status::create(result.status);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
}
@ -236,7 +243,7 @@ Status GroupCommitTable::_create_group_commit_load(
DCHECK(pipeline_params.local_params.size() == 1);
instance_id = pipeline_params.local_params[0].fragment_instance_id;
}
VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << table_id
VLOG_DEBUG << "create plan fragment, db_id=" << _db_id << ", table=" << _table_id
<< ", schema version=" << schema_version << ", label=" << label
<< ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id)
<< ", is_pipeline=" << is_pipeline;
@ -245,11 +252,13 @@ Status GroupCommitTable::_create_group_commit_load(
std::make_shared<LoadBlockQueue>(instance_id, label, txn_id, schema_version);
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
_cv.notify_all();
}
params.__set_import_label(label);
st = _exec_plan_fragment(_db_id, table_id, label, txn_id, is_pipeline, params, pipeline_params);
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,
pipeline_params);
if (!st.ok()) {
static_cast<void>(_finish_group_commit_load(_db_id, table_id, label, txn_id, instance_id,
static_cast<void>(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id,
st, true, nullptr));
}
return st;
@ -346,6 +355,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
if (state && !(state->get_error_log_file_path().empty())) {
ss << ", error_url=" << state->get_error_log_file_path();
}
ss << ", rows=" << state->num_rows_load_success();
LOG(INFO) << ss.str();
return st;
}
@ -384,6 +394,10 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : _exec_env(exec_env) {
.set_min_threads(config::group_commit_insert_threads)
.set_max_threads(config::group_commit_insert_threads)
.build(&_insert_into_thread_pool));
static_cast<void>(ThreadPoolBuilder("GroupCommitThreadPool")
.set_min_threads(1)
.set_max_threads(config::group_commit_insert_threads)
.build(&_thread_pool));
}
GroupCommitMgr::~GroupCommitMgr() {
@ -392,6 +406,7 @@ GroupCommitMgr::~GroupCommitMgr() {
void GroupCommitMgr::stop() {
_insert_into_thread_pool->shutdown();
_thread_pool->shutdown();
LOG(INFO) << "GroupCommitMgr is stopped";
}
@ -456,17 +471,16 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
std::unique_ptr<doris::vectorized::Block> _block =
doris::vectorized::Block::create_unique();
bool eof = false;
bool first = true;
while (!eof) {
// TODO what to do if read one block error
RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof));
std::shared_ptr<doris::vectorized::FutureBlock> future_block =
std::make_shared<doris::vectorized::FutureBlock>();
future_block->swap(*(_block.get()));
future_block->set_info(request->base_schema_version(), load_id, first, eof);
future_block->set_info(request->base_schema_version(), load_id);
if (load_block_queue == nullptr) {
RETURN_IF_ERROR(_get_first_block_load_queue(request->db_id(), table_id,
future_block, load_block_queue));
RETURN_IF_ERROR(get_first_block_load_queue(request->db_id(), table_id, future_block,
load_block_queue));
response->set_label(load_block_queue->label);
response->set_txn_id(load_block_queue->txn_id);
}
@ -475,7 +489,6 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan,
future_blocks.emplace_back(future_block);
}
RETURN_IF_ERROR(load_block_queue->add_block(future_block));
first = false;
}
if (!runtime_state->get_error_log_file_path().empty()) {
LOG(INFO) << "id=" << print_id(load_id)
@ -515,139 +528,15 @@ Status GroupCommitMgr::_append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
return Status::OK();
}
Status GroupCommitMgr::group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
return _insert_into_thread_pool->submit_func([ctx, this] {
Status st = _group_commit_stream_load(ctx);
if (!st.ok()) {
ctx->promise.set_value(st);
}
});
}
Status GroupCommitMgr::_group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx) {
auto& fragment_params = ctx->put_result.params;
auto& tdesc_tbl = fragment_params.desc_tbl;
DCHECK(fragment_params.params.per_node_scan_ranges.size() == 1);
DCHECK(fragment_params.params.per_node_scan_ranges.begin()->second.size() == 1);
auto& tscan_range_params = fragment_params.params.per_node_scan_ranges.begin()->second.at(0);
auto& nodes = fragment_params.fragment.plan.nodes;
DCHECK(nodes.size() > 0);
auto& plan_node = nodes.at(0);
std::vector<std::shared_ptr<doris::vectorized::FutureBlock>> future_blocks;
{
std::shared_ptr<LoadBlockQueue> load_block_queue;
// 1. FileScanNode consumes data from the pipe.
std::unique_ptr<RuntimeState> runtime_state = RuntimeState::create_unique();
TUniqueId load_id;
load_id.hi = ctx->id.hi;
load_id.lo = ctx->id.lo;
TQueryOptions query_options;
query_options.query_type = TQueryType::LOAD;
TQueryGlobals query_globals;
static_cast<void>(runtime_state->init(load_id, query_options, query_globals, _exec_env));
runtime_state->set_query_mem_tracker(std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", ctx->id.to_string()), -1));
DescriptorTbl* desc_tbl = nullptr;
RETURN_IF_ERROR(DescriptorTbl::create(runtime_state->obj_pool(), tdesc_tbl, &desc_tbl));
runtime_state->set_desc_tbl(desc_tbl);
auto file_scan_node =
vectorized::NewFileScanNode(runtime_state->obj_pool(), plan_node, *desc_tbl);
Status status = Status::OK();
auto sink = stream_load::GroupCommitBlockSink(
runtime_state->obj_pool(), file_scan_node.row_desc(),
fragment_params.fragment.output_exprs, &status);
std::unique_ptr<int, std::function<void(int*)>> close_scan_node_func((int*)0x01, [&](int*) {
if (load_block_queue != nullptr) {
load_block_queue->remove_load_id(load_id);
}
static_cast<void>(file_scan_node.close(runtime_state.get()));
static_cast<void>(sink.close(runtime_state.get(), status));
});
RETURN_IF_ERROR(file_scan_node.init(plan_node, runtime_state.get()));
RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
std::vector<TScanRangeParams> params_vector;
params_vector.emplace_back(tscan_range_params);
file_scan_node.set_scan_ranges(params_vector);
RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
RETURN_IF_ERROR(status);
RETURN_IF_ERROR(sink.init(fragment_params.fragment.output_sink));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink.prepare(runtime_state.get()));
RETURN_IF_ERROR(sink.open(runtime_state.get()));
// 2. Put the block into block queue.
std::unique_ptr<doris::vectorized::Block> _block =
doris::vectorized::Block::create_unique();
bool first = true;
bool eof = false;
while (!eof) {
// TODO what to do if scan one block error
RETURN_IF_ERROR(file_scan_node.get_next(runtime_state.get(), _block.get(), &eof));
RETURN_IF_ERROR(sink.send(runtime_state.get(), _block.get()));
std::shared_ptr<doris::vectorized::FutureBlock> future_block =
std::make_shared<doris::vectorized::FutureBlock>();
future_block->swap(*(_block.get()));
future_block->set_info(ctx->schema_version, load_id, first, eof);
// TODO what to do if add one block error
if (load_block_queue == nullptr) {
RETURN_IF_ERROR(_get_first_block_load_queue(ctx->db_id, ctx->table_id, future_block,
load_block_queue));
ctx->label = load_block_queue->label;
ctx->txn_id = load_block_queue->txn_id;
}
if (future_block->rows() > 0) {
future_blocks.emplace_back(future_block);
}
RETURN_IF_ERROR(load_block_queue->add_block(future_block));
first = false;
}
ctx->number_unselected_rows = runtime_state->num_rows_load_unselected();
ctx->number_filtered_rows = runtime_state->num_rows_load_filtered();
ctx->error_url = runtime_state->get_error_log_file_path();
if (!runtime_state->get_error_log_file_path().empty()) {
LOG(INFO) << "id=" << print_id(load_id)
<< ", url=" << runtime_state->get_error_log_file_path()
<< ", load rows=" << runtime_state->num_rows_load_total()
<< ", filter rows=" << runtime_state->num_rows_load_filtered()
<< ", unselect rows=" << runtime_state->num_rows_load_unselected()
<< ", success rows=" << runtime_state->num_rows_load_success();
}
}
int64_t total_rows = 0;
int64_t loaded_rows = 0;
// 3. wait to wal
for (const auto& future_block : future_blocks) {
std::unique_lock<doris::Mutex> l(*(future_block->lock));
if (!future_block->is_handled()) {
future_block->cv->wait(l);
}
// future_block->get_status()
total_rows += future_block->get_total_rows();
loaded_rows += future_block->get_loaded_rows();
}
ctx->number_total_rows = total_rows + ctx->number_unselected_rows + ctx->number_filtered_rows;
ctx->number_loaded_rows = loaded_rows;
ctx->number_filtered_rows += total_rows - ctx->number_loaded_rows;
ctx->promise.set_value(Status::OK());
VLOG_DEBUG << "finish read all block of pipe=" << ctx->id.to_string()
<< ", total rows=" << ctx->number_total_rows
<< ", loaded rows=" << ctx->number_loaded_rows
<< ", filtered rows=" << ctx->number_filtered_rows
<< ", unselected rows=" << ctx->number_unselected_rows;
return Status::OK();
}
Status GroupCommitMgr::_get_first_block_load_queue(
Status GroupCommitMgr::get_first_block_load_queue(
int64_t db_id, int64_t table_id, std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
if (_table_map.find(table_id) == _table_map.end()) {
_table_map.emplace(table_id,
std::make_shared<GroupCommitTable>(_exec_env, db_id, table_id));
_table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
_exec_env, _thread_pool.get(), db_id, table_id));
}
group_commit_table = _table_map[table_id];
}

View File

@ -53,6 +53,7 @@ public:
Status add_block(std::shared_ptr<vectorized::FutureBlock> block);
Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
@ -76,8 +77,9 @@ private:
class GroupCommitTable {
public:
GroupCommitTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
: _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {};
GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id,
int64_t table_id)
: _exec_env(exec_env), _thread_pool(thread_pool), _db_id(db_id), _table_id(table_id) {};
Status get_first_block_load_queue(int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
@ -85,8 +87,7 @@ public:
std::shared_ptr<LoadBlockQueue>& load_block_queue);
private:
Status _create_group_commit_load(int64_t table_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status _create_group_commit_load(std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label,
int64_t txn_id, bool is_pipeline,
const TExecPlanFragmentParams& params,
@ -96,13 +97,14 @@ private:
bool prepare_failed, RuntimeState* state);
ExecEnv* _exec_env;
ThreadPool* _thread_pool;
int64_t _db_id;
int64_t _table_id;
doris::Mutex _lock;
doris::ConditionVariable _cv;
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
doris::Mutex _request_fragment_mutex;
bool _need_plan_fragment = false;
};
class GroupCommitMgr {
@ -119,22 +121,17 @@ public:
const PGroupCommitInsertRequest* request,
PGroupCommitInsertResponse* response);
// stream load
Status group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
// used when init group_commit_scan_node
Status get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
Status get_first_block_load_queue(int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
private:
// used by insert into
Status _append_row(std::shared_ptr<io::StreamLoadPipe> pipe,
const PGroupCommitInsertRequest* request);
// used by stream load
Status _group_commit_stream_load(std::shared_ptr<StreamLoadContext> ctx);
Status _get_first_block_load_queue(int64_t db_id, int64_t table_id,
std::shared_ptr<vectorized::FutureBlock> block,
std::shared_ptr<LoadBlockQueue>& load_block_queue);
ExecEnv* _exec_env;
@ -144,6 +141,7 @@ private:
// thread pool to handle insert into: append data to pipe
std::unique_ptr<doris::ThreadPool> _insert_into_thread_pool;
std::unique_ptr<doris::ThreadPool> _thread_pool;
};
} // namespace doris

View File

@ -74,6 +74,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
if (ctx->put_result.__isset.params) {
st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.params, [ctx, this](RuntimeState* state, Status* status) {
if (ctx->group_commit) {
ctx->label = state->import_label();
ctx->txn_id = state->wal_id();
}
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos = std::move(state->tablet_commit_infos());
if (status->ok()) {
@ -84,7 +88,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
int64_t num_selected_rows =
ctx->number_total_rows - ctx->number_unselected_rows;
if (num_selected_rows > 0 &&
if (!ctx->group_commit && num_selected_rows > 0 &&
(double)ctx->number_filtered_rows / num_selected_rows >
ctx->max_filter_ratio) {
// NOTE: Do not modify the error message here, for historical reasons,
@ -147,6 +151,10 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
} else {
st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.pipeline_params, [ctx, this](RuntimeState* state, Status* status) {
if (ctx->group_commit) {
ctx->label = state->import_label();
ctx->txn_id = state->wal_id();
}
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos = std::move(state->tablet_commit_infos());
if (status->ok()) {
@ -157,7 +165,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
int64_t num_selected_rows =
ctx->number_total_rows - ctx->number_unselected_rows;
if (num_selected_rows > 0 &&
if (!ctx->group_commit && num_selected_rows > 0 &&
(double)ctx->number_filtered_rows / num_selected_rows >
ctx->max_filter_ratio) {
// NOTE: Do not modify the error message here, for historical reasons,

View File

@ -21,11 +21,9 @@
namespace doris::vectorized {
void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id, bool first, bool eos) {
void FutureBlock::set_info(int64_t schema_version, const TUniqueId& load_id) {
this->_schema_version = schema_version;
this->_load_id = load_id;
this->_first = first;
this->_eos = eos;
}
void FutureBlock::set_result(Status status, int64_t total_rows, int64_t loaded_rows) {
@ -35,7 +33,7 @@ void FutureBlock::set_result(Status status, int64_t total_rows, int64_t loaded_r
void FutureBlock::swap_future_block(std::shared_ptr<FutureBlock> other) {
Block::swap(*other.get());
set_info(other->_schema_version, other->_load_id, other->_first, other->_eos);
set_info(other->_schema_version, other->_load_id);
lock = other->lock;
cv = other->cv;
_result = other->_result;

View File

@ -30,12 +30,9 @@ class FutureBlock : public Block {
public:
FutureBlock() : Block() {};
void swap_future_block(std::shared_ptr<FutureBlock> other);
void set_info(int64_t block_schema_version, const TUniqueId& load_id, bool first,
bool block_eos);
void set_info(int64_t block_schema_version, const TUniqueId& load_id);
int64_t get_schema_version() { return _schema_version; }
TUniqueId get_load_id() { return _load_id; }
bool is_first() { return _first; }
bool is_eos() { return _eos; }
// hold lock before call this function
void set_result(Status status, int64_t total_rows = 0, int64_t loaded_rows = 0);
@ -50,8 +47,6 @@ public:
private:
int64_t _schema_version;
TUniqueId _load_id;
bool _first = false;
bool _eos = false;
std::shared_ptr<std::tuple<bool, Status, int64_t, int64_t>> _result =
std::make_shared<std::tuple<bool, Status, int64_t, int64_t>>(false, Status::OK(), 0, 0);

View File

@ -26,7 +26,7 @@
namespace doris {
namespace stream_load {
namespace vectorized {
GroupCommitBlockSink::GroupCommitBlockSink(ObjectPool* pool, const RowDescriptor& row_desc,
const std::vector<TExpr>& texprs, Status* status)
@ -44,6 +44,9 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
_tuple_desc_id = table_sink.tuple_id;
_schema.reset(new OlapTableSchemaParam());
RETURN_IF_ERROR(_schema->init(table_sink.schema));
_db_id = table_sink.db_id;
_table_id = table_sink.table_id;
_base_schema_version = table_sink.base_schema_version;
return Status::OK();
}
@ -77,6 +80,31 @@ Status GroupCommitBlockSink::open(RuntimeState* state) {
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
if (_load_block_queue) {
_load_block_queue->remove_load_id(_load_id);
}
RETURN_IF_ERROR(DataSink::close(state, close_status));
RETURN_IF_ERROR(close_status);
// wait to wal
int64_t total_rows = 0;
int64_t loaded_rows = 0;
for (const auto& future_block : _future_blocks) {
std::unique_lock<doris::Mutex> l(*(future_block->lock));
if (!future_block->is_handled()) {
future_block->cv->wait(l);
}
// future_block->get_status()
loaded_rows += future_block->get_loaded_rows();
total_rows += future_block->get_total_rows();
}
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
loaded_rows);
state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
state->num_rows_load_filtered());
return Status::OK();
}
Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
@ -97,9 +125,43 @@ Status GroupCommitBlockSink::send(RuntimeState* state, vectorized::Block* input_
bool has_filtered_rows = false;
RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
state, input_block, block, _output_vexpr_ctxs, rows, has_filtered_rows));
block->swap(*input_block);
return Status::OK();
// add block into block queue
return _add_block(state, block);
}
} // namespace stream_load
Status GroupCommitBlockSink::_add_block(RuntimeState* state,
std::shared_ptr<vectorized::Block> block) {
if (block->rows() == 0) {
return Status::OK();
}
// add block to queue
auto _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty());
{
vectorized::IColumn::Selector selector;
for (auto i = 0; i < block->rows(); i++) {
selector.emplace_back(i);
}
block->append_to_block_by_selector(_cur_mutable_block.get(), selector);
}
std::shared_ptr<vectorized::Block> output_block =
std::make_shared<vectorized::Block>(_cur_mutable_block->to_block());
std::shared_ptr<doris::vectorized::FutureBlock> future_block =
std::make_shared<doris::vectorized::FutureBlock>();
future_block->swap(*(output_block.get()));
TUniqueId load_id;
load_id.__set_hi(_load_id.hi);
load_id.__set_lo(_load_id.lo);
future_block->set_info(_base_schema_version, load_id);
if (_load_block_queue == nullptr) {
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
_db_id, _table_id, future_block, _load_block_queue));
state->set_import_label(_load_block_queue->label);
state->set_wal_id(_load_block_queue->txn_id);
}
_future_blocks.emplace_back(future_block);
return _load_block_queue->add_block(future_block);
}
} // namespace vectorized
} // namespace doris
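GroupCommitBlockSink::close above waits on each FutureBlock until the group commit fragment marks it handled and publishes its row counts, then derives the filtered and total row numbers from those counts. Below is a minimal sketch of that producer/consumer handshake, assuming a plain mutex and condition variable in place of the Doris FutureBlock class.

```cpp
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

// Simplified stand-in for vectorized::FutureBlock's result handshake: the sink
// hands blocks to the shared queue and later, in close(), waits until the
// group commit fragment has consumed each block and published its row counts.
struct FutureResult {
    std::mutex lock;
    std::condition_variable cv;
    bool handled = false;
    int64_t total_rows = 0;
    int64_t loaded_rows = 0;

    // Consumer side: publish the outcome and wake up the waiting sink.
    void set_result(int64_t total, int64_t loaded) {
        std::lock_guard<std::mutex> l(lock);
        total_rows = total;
        loaded_rows = loaded;
        handled = true;
        cv.notify_all();
    }

    // Producer side: block until the result has been published.
    void wait_handled() {
        std::unique_lock<std::mutex> l(lock);
        cv.wait(l, [this] { return handled; });
    }
};

int main() {
    auto block = std::make_shared<FutureResult>();
    // The consumer side (the group commit fragment) reports the outcome later.
    std::thread consumer([block] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        block->set_result(/*total=*/4, /*loaded=*/3);
    });
    // The producer side (GroupCommitBlockSink::close) waits for every block it sent.
    block->wait_handled();
    std::cout << "filtered=" << (block->total_rows - block->loaded_rows) << std::endl;
    consumer.join();
    return 0;
}
```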

View File

@ -24,8 +24,11 @@ namespace doris {
class OlapTableSchemaParam;
class MemTracker;
class LoadBlockQueue;
namespace stream_load {
namespace vectorized {
class FutureBlock;
class GroupCommitBlockSink : public DataSink {
public:
@ -42,7 +45,11 @@ public:
Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
Status close(RuntimeState* state, Status close_status) override;
private:
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
int _tuple_desc_id = -1;
@ -53,7 +60,14 @@ private:
// this is tuple descriptor of destination OLAP table
TupleDescriptor* _output_tuple_desc = nullptr;
std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor;
int64_t _db_id;
int64_t _table_id;
int64_t _base_schema_version = 0;
UniqueId _load_id;
std::shared_ptr<LoadBlockQueue> _load_block_queue;
std::vector<std::shared_ptr<vectorized::FutureBlock>> _future_blocks;
};
} // namespace stream_load
} // namespace vectorized
} // namespace doris

View File

@ -48,6 +48,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.ExportSink;
import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.planner.GroupCommitOlapTableSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.StreamLoadPlanner;
@ -167,6 +168,7 @@ public class NativeInsertStmt extends InsertStmt {
private long tableId = -1;
// true if be generates an insert from group commit tvf stmt and executes to load data
public boolean isGroupCommitTvf = false;
public boolean isGroupCommitStreamLoadSql = false;
private boolean isFromDeleteOrUpdateStmt = false;
@ -933,10 +935,17 @@ public class NativeInsertStmt extends InsertStmt {
}
if (targetTable instanceof OlapTable) {
checkInnerGroupCommit();
OlapTableSink sink = isGroupCommitTvf ? new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert())
: new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
OlapTableSink sink;
if (isGroupCommitTvf) {
sink = new GroupCommitOlapTableSink((OlapTable) targetTable, olapTuple,
targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
} else if (isGroupCommitStreamLoadSql) {
sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
} else {
sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
}
dataSink = sink;
sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
dataPartition = dataSink.getOutputPartition();
@ -1092,7 +1101,8 @@ public class NativeInsertStmt extends InsertStmt {
streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1)
.setTbl(getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId);
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
.setGroupCommit(true);
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
StreamLoadPlanner planner = new StreamLoadPlanner((Database) getDbObj(), olapTable, streamLoadTask);
// Will using load id as query id in fragment

View File

@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.thrift.TDataSinkType;
import java.util.List;
public class GroupCommitBlockSink extends OlapTableSink {
public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
boolean singleReplicaLoad) {
super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
}
protected TDataSinkType getDataSinkType() {
return TDataSinkType.GROUP_COMMIT_BLOCK_SINK;
}
}

View File

@ -125,6 +125,7 @@ public class OlapTableSink extends DataSink {
tSink.setLoadId(loadId);
tSink.setTxnId(txnId);
tSink.setDbId(dbId);
tSink.setBaseSchemaVersion(dstTable.getBaseSchemaVersion());
tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
tSink.setSendBatchParallelism(sendBatchParallelism);
this.isStrictMode = isStrictMode;

View File

@ -48,6 +48,7 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@ -254,10 +255,15 @@ public class StreamLoadPlanner {
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load);
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
OlapTableSink olapTableSink;
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) {
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load);
} else {
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
}
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
olapTableSink.complete(analyzer);
@ -463,8 +469,13 @@ public class StreamLoadPlanner {
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load);
OlapTableSink olapTableSink;
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).isGroupCommit()) {
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load);
} else {
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
}
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);

View File

@ -2090,8 +2090,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser);
parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
if (request.isGroupCommit() && parsedStmt.getLabel() != null) {
throw new AnalysisException("label and group_commit can't be set at the same time");
if (request.isGroupCommit()) {
if (parsedStmt.getLabel() != null) {
throw new AnalysisException("label and group_commit can't be set at the same time");
}
parsedStmt.isGroupCommitStreamLoadSql = true;
}
StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
ctx.setExecutor(executor);
@ -2235,13 +2238,15 @@ public class FrontendServiceImpl implements FrontendService.Iface {
StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId(),
multiTableFragmentInstanceIdIndex);
// add table indexes to transaction state
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), request.getTxnId());
if (txnState == null) {
throw new UserException("txn does not exist: " + request.getTxnId());
if (!request.isGroupCommit()) {
// add table indexes to transaction state
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), request.getTxnId());
if (txnState == null) {
throw new UserException("txn does not exist: " + request.getTxnId());
}
txnState.addTableIndexes(table);
}
txnState.addTableIndexes(table);
return plan;
} finally {
table.readUnlock();

View File

@ -69,10 +69,10 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct
Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn();
List<Column> tableColumns = table.getBaseSchema(false);
for (int i = 1; i <= tableColumns.size(); i++) {
fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getDataType(), true));
fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
}
if (deleteSignColumn != null) {
fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getDataType(), true));
fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getType(), true));
}
return fileColumns;
}

View File

@ -94,6 +94,8 @@ public class StreamLoadTask implements LoadTaskInfo {
private byte escape = 0;
private boolean groupCommit = false;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType,
TFileCompressType compressType) {
this.id = id;
@ -312,6 +314,7 @@ public class StreamLoadTask implements LoadTaskInfo {
request.getFileType(), request.getFormatType(),
request.getCompressType());
streamLoadTask.setOptionalFromTSLPutRequest(request);
streamLoadTask.setGroupCommit(request.isGroupCommit());
if (request.isSetFileSize()) {
streamLoadTask.fileSize = request.getFileSize();
}
@ -519,5 +522,13 @@ public class StreamLoadTask implements LoadTaskInfo {
public double getMaxFilterRatio() {
return maxFilterRatio;
}
public void setGroupCommit(boolean groupCommit) {
this.groupCommit = groupCommit;
}
public boolean isGroupCommit() {
return groupCommit;
}
}

View File

@ -37,6 +37,7 @@ enum TDataSinkType {
JDBC_TABLE_SINK,
MULTI_CAST_DATA_STREAM_SINK,
GROUP_COMMIT_OLAP_TABLE_SINK,
GROUP_COMMIT_BLOCK_SINK,
}
enum TResultSinkType {
@ -255,6 +256,7 @@ struct TOlapTableSink {
18: optional Descriptors.TOlapTableLocationParam slave_location
19: optional i64 txn_timeout_s // timeout of load txn in second
20: optional bool write_file_cache
21: optional i64 base_schema_version
}
struct TDataSink {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -49,6 +49,26 @@ suite("test_group_commit_http_stream") {
return false
}
def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertTrue(json.GroupCommit)
assertTrue(json.Label.startsWith("group_commit_"))
assertEquals(total_rows, json.NumberTotalRows)
assertEquals(loaded_rows, json.NumberLoadedRows)
assertEquals(filtered_rows, json.NumberFilteredRows)
assertEquals(unselected_rows, json.NumberUnselectedRows)
if (filtered_rows > 0) {
assertFalse(json.ErrorURL.isEmpty())
} else {
assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
}
}
try {
// create table
sql """ drop table if exists ${tableName}; """
@ -85,6 +105,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 4, 4, 0, 0)
}
}
}
@ -101,6 +125,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 2, 2, 0, 0)
}
}
// stream load with different column order
@ -116,6 +144,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 2, 2, 0, 0)
}
}
// stream load with where condition
@ -133,17 +165,8 @@ suite("test_group_commit_http_stream") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertTrue(json.GroupCommit)
// assertEquals(2, json.NumberTotalRows)
assertEquals(1, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
// assertEquals(1, json.NumberUnselectedRows)
// TODO different with stream load: 2, 1, 0, 1
checkStreamLoadResult(exception, result, 1, 1, 0, 0)
}
}
@ -160,6 +183,10 @@ suite("test_group_commit_http_stream") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 2, 2, 0, 0)
}
}
// stream load with filtered rows
@ -179,18 +206,8 @@ suite("test_group_commit_http_stream") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertTrue(json.GroupCommit)
// assertEquals(6, json.NumberTotalRows)
// assertEquals(2, json.NumberLoadedRows)
// assertEquals(3, json.NumberFilteredRows)
// assertEquals(1, json.NumberUnselectedRows)
// assertFalse(json.ErrorURL.isEmpty())
// TODO different with stream load: 6, 2, 3, 1
checkStreamLoadResult(exception, result, 6, 4, 2, 0)
}
}
@ -217,7 +234,7 @@ suite("test_group_commit_http_stream") {
}
}
getRowCount(7)
getRowCount(23)
qt_sql " SELECT * FROM ${tableName} order by id, name, score asc; "
} finally {
// try_sql("DROP TABLE ${tableName}")
@ -301,18 +318,7 @@ suite("test_group_commit_http_stream") {
// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load ${i}, result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
if (json.NumberLoadedRows != 600572) {
logger.warn("Stream load ${i}, loaded rows: ${json.NumberLoadedRows}")
}
assertTrue(json.LoadBytes > 0)
assertTrue(json.GroupCommit)
checkStreamLoadResult(exception, result, 600572, 600572, 0, 0)
}
}
}

View File

@ -48,6 +48,26 @@ suite("test_group_commit_stream_load") {
return false
}
def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertTrue(json.GroupCommit)
assertTrue(json.Label.startsWith("group_commit_"))
assertEquals(total_rows, json.NumberTotalRows)
assertEquals(loaded_rows, json.NumberLoadedRows)
assertEquals(filtered_rows, json.NumberFilteredRows)
assertEquals(unselected_rows, json.NumberUnselectedRows)
if (filtered_rows > 0) {
assertFalse(json.ErrorURL.isEmpty())
} else {
assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty())
}
}
try {
// create table
sql """ drop table if exists ${tableName}; """
@ -84,6 +104,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 4, 4, 0, 0)
}
}
}
@ -98,6 +122,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 2, 2, 0, 0)
}
}
// stream load with different column order
@ -111,6 +139,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 2, 2, 0, 0)
}
}
// stream load with where condition
@ -127,17 +159,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertTrue(json.GroupCommit)
assertEquals(2, json.NumberTotalRows)
assertEquals(1, json.NumberLoadedRows)
assertEquals(0, json.NumberFilteredRows)
assertEquals(1, json.NumberUnselectedRows)
checkStreamLoadResult(exception, result, 2, 1, 0, 1)
}
}
@ -152,6 +174,10 @@ suite("test_group_commit_stream_load") {
unset 'label'
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
checkStreamLoadResult(exception, result, 2, 2, 0, 0)
}
}
// stream load with filtered rows
@ -168,18 +194,7 @@ suite("test_group_commit_stream_load") {
time 10000 // limit inflight 10s
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertTrue(json.GroupCommit)
assertEquals(6, json.NumberTotalRows)
assertEquals(2, json.NumberLoadedRows)
assertEquals(3, json.NumberFilteredRows)
assertEquals(1, json.NumberUnselectedRows)
assertFalse(json.ErrorURL.isEmpty())
checkStreamLoadResult(exception, result, 6, 2, 3, 1)
}
}
@ -286,19 +301,7 @@ suite("test_group_commit_stream_load") {
// if declared a check callback, the default check condition will ignore.
// So you must check all condition
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load ${i}, result: ${result}")
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
if (json.NumberLoadedRows != 600572) {
logger.warn("Stream load ${i}, loaded rows: ${json.NumberLoadedRows}")
}
// assertEquals(json.NumberLoadedRows, 600572)
assertTrue(json.LoadBytes > 0)
assertTrue(json.GroupCommit)
checkStreamLoadResult(exception, result, 600572, 600572, 0, 0)
}
}
}