[fix](group_commit) GroupCommitBlockSink shoud not use load_block_queue when creating load task fail (#31416)

This commit is contained in:
huanghaibin
2024-02-28 21:31:17 +08:00
committed by yiguolei
parent 60de835d48
commit 770cdabda3
2 changed files with 16 additions and 4 deletions

View File

@ -212,8 +212,11 @@ Status GroupCommitTable::get_first_block_load_queue(
if (!_need_plan_fragment) {
_need_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
[[maybe_unused]] auto st =
_create_group_commit_load(load_block_queue, be_exe_version);
auto st = _create_group_commit_load(load_block_queue, be_exe_version);
if (!st.ok()) {
LOG(WARNING) << "fail to create block queue,st=" << st.to_string();
load_block_queue.reset();
}
}));
}
_cv.wait_for(l, std::chrono::seconds(4));
@ -311,8 +314,6 @@ Status GroupCommitTable::_create_group_commit_load(
result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
result.group_commit_data_bytes);
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
//create wal
if (!is_pipeline) {
RETURN_IF_ERROR(load_block_queue->create_wal(
@ -324,6 +325,8 @@ Status GroupCommitTable::_create_group_commit_load(
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
be_exe_version));
}
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
_cv.notify_all();
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params,