[fix](group commit) group commit may heap-use-after-free if execute plan failed (#31839)
This commit is contained in:
@ -212,10 +212,9 @@ Status GroupCommitTable::get_first_block_load_queue(
|
||||
if (!_is_creating_plan_fragment) {
|
||||
_is_creating_plan_fragment = true;
|
||||
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
|
||||
auto st = _create_group_commit_load(load_block_queue, be_exe_version);
|
||||
auto st = _create_group_commit_load(be_exe_version);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
|
||||
load_block_queue.reset();
|
||||
std::unique_lock l(_lock);
|
||||
_is_creating_plan_fragment = false;
|
||||
_cv.notify_all();
|
||||
@ -223,26 +222,13 @@ Status GroupCommitTable::get_first_block_load_queue(
|
||||
}));
|
||||
}
|
||||
_cv.wait_for(l, std::chrono::seconds(4));
|
||||
if (load_block_queue != nullptr) {
|
||||
if (load_block_queue->schema_version == base_schema_version) {
|
||||
if (load_block_queue->add_load_id(load_id).ok()) {
|
||||
return Status::OK();
|
||||
}
|
||||
} else if (base_schema_version < load_block_queue->schema_version) {
|
||||
return Status::DataQualityError<false>(
|
||||
"schema version not match, maybe a schema change is in process. Please "
|
||||
"retry this load manually.");
|
||||
}
|
||||
load_block_queue.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
return Status::InternalError<false>("can not get a block queue for table_id: " +
|
||||
std::to_string(_table_id));
|
||||
}
|
||||
|
||||
Status GroupCommitTable::_create_group_commit_load(
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
|
||||
Status GroupCommitTable::_create_group_commit_load(int be_exe_version) {
|
||||
Status st = Status::OK();
|
||||
TStreamLoadPutRequest request;
|
||||
UniqueId load_id = UniqueId::gen_uid();
|
||||
@ -305,7 +291,7 @@ Status GroupCommitTable::_create_group_commit_load(
|
||||
<< ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id)
|
||||
<< ", is_pipeline=" << is_pipeline;
|
||||
{
|
||||
load_block_queue = std::make_shared<LoadBlockQueue>(
|
||||
auto load_block_queue = std::make_shared<LoadBlockQueue>(
|
||||
instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
|
||||
result.wait_internal_group_commit_finish, result.group_commit_interval_ms,
|
||||
result.group_commit_data_bytes);
|
||||
|
||||
Reference in New Issue
Block a user