@ -186,7 +186,8 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
|
||||
|
||||
Status GroupCommitTable::get_first_block_load_queue(
|
||||
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version) {
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
|
||||
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
|
||||
DCHECK(table_id == _table_id);
|
||||
{
|
||||
std::unique_lock l(_lock);
|
||||
@ -212,7 +213,7 @@ Status GroupCommitTable::get_first_block_load_queue(
|
||||
if (!_is_creating_plan_fragment) {
|
||||
_is_creating_plan_fragment = true;
|
||||
RETURN_IF_ERROR(_thread_pool->submit_func([&] {
|
||||
auto st = _create_group_commit_load(be_exe_version);
|
||||
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
|
||||
if (!st.ok()) {
|
||||
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
|
||||
std::unique_lock l(_lock);
|
||||
@ -228,7 +229,9 @@ Status GroupCommitTable::get_first_block_load_queue(
|
||||
std::to_string(_table_id));
|
||||
}
|
||||
|
||||
Status GroupCommitTable::_create_group_commit_load(int be_exe_version) {
|
||||
Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
|
||||
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
|
||||
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker);
|
||||
Status st = Status::OK();
|
||||
TStreamLoadPutRequest request;
|
||||
UniqueId load_id = UniqueId::gen_uid();
|
||||
@ -483,7 +486,8 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
|
||||
int64_t base_schema_version,
|
||||
const UniqueId& load_id,
|
||||
std::shared_ptr<LoadBlockQueue>& load_block_queue,
|
||||
int be_exe_version) {
|
||||
int be_exe_version,
|
||||
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
|
||||
std::shared_ptr<GroupCommitTable> group_commit_table;
|
||||
{
|
||||
std::lock_guard wlock(_lock);
|
||||
@ -495,7 +499,7 @@ Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t table_i
|
||||
group_commit_table = _table_map[table_id];
|
||||
}
|
||||
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
|
||||
table_id, base_schema_version, load_id, load_block_queue, be_exe_version));
|
||||
table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user