[branch-2.1](memory) pick reserve memory and workload group (#40543)

1. pick #38494
2. pick #39862
3. remove vdata_stream_test, master has been removed
This commit is contained in:
Xinyi Zou
2024-09-09 21:16:06 +08:00
committed by GitHub
parent 68d1064aa9
commit 354967c09f
12 changed files with 150 additions and 120 deletions

View File

@ -400,7 +400,7 @@ void Daemon::wg_weighted_memory_ratio_refresh_thread() {
// Refresh weighted memory ratio of workload groups
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) {
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_ratio();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
}
}

View File

@ -368,19 +368,18 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
}
return false;
} else if (is_wg_mem_low_water_mark) {
int64_t query_weighted_limit = 0;
int64_t query_weighted_consumption = 0;
query_ctx->get_weighted_memory(query_weighted_limit, query_weighted_consumption);
if (query_weighted_consumption < query_weighted_limit) {
int64_t spill_threshold = query_ctx->spill_threshold();
int64_t memory_usage = query_ctx->query_mem_tracker->consumption();
if (spill_threshold == 0 || memory_usage < spill_threshold) {
return false;
}
auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num();
DCHECK(big_memory_operator_num >= 0);
int64_t mem_limit_of_op;
if (0 == big_memory_operator_num) {
mem_limit_of_op = int64_t(query_weighted_limit * 0.8);
mem_limit_of_op = int64_t(spill_threshold * 0.8);
} else {
mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
mem_limit_of_op = spill_threshold / big_memory_operator_num;
}
VLOG_DEBUG << "revoke memory, low water mark, revocable_mem_bytes: "

View File

@ -40,7 +40,7 @@ std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;
bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) {
return false;
}
int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
@ -50,7 +50,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (UNLIKELY(vm_rss_sub_allocator_cache() +
refresh_interval_memory_growth.load(std::memory_order_relaxed) +
new_reserved_mem >=
MemInfo::mem_limit())) {
MemInfo::soft_mem_limit())) {
return false;
}
} while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem,

View File

@ -95,7 +95,8 @@ public:
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();
bool try_reserve(int64_t size);
doris::Status try_reserve(int64_t size);
void release_reserved();
bool is_attach_query() { return _query_id != TUniqueId(); }
@ -295,7 +296,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
_stop_consume = false;
}
inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
DCHECK(_limiter_tracker_raw);
DCHECK(size >= 0);
CHECK(init());
@ -303,19 +304,29 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
// _untracked_mem store bytes that not synchronized to process reserved memory.
flush_untracked_mem();
if (!_limiter_tracker_raw->try_consume(size)) {
return false;
auto err_msg = fmt::format(
"reserve memory failed, size: {}, because memory tracker consumption: {}, limit: "
"{}",
size, _limiter_tracker_raw->consumption(), _limiter_tracker_raw->limit());
return doris::Status::MemoryLimitExceeded(err_msg);
}
auto wg_ptr = _wg_wptr.lock();
if (!wg_ptr) {
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
wg_ptr->memory_debug_string());
_limiter_tracker_raw->release(size); // rollback
return false;
return doris::Status::MemoryLimitExceeded(err_msg);
}
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
_limiter_tracker_raw->release(size); // rollback
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
return false;
auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
GlobalMemoryArbitrator::process_mem_log_str());
_limiter_tracker_raw->release(size); // rollback
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
}
return doris::Status::MemoryLimitExceeded(err_msg);
}
if (_count_scope_mem) {
_scope_mem += size;
@ -324,7 +335,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
tracker->consume(size);
}
_reserved_mem += size;
return true;
return doris::Status::OK();
}
inline void ThreadMemTrackerMgr::release_reserved() {
@ -333,7 +344,7 @@ inline void ThreadMemTrackerMgr::release_reserved() {
_untracked_mem);
_limiter_tracker_raw->release(_reserved_mem);
auto wg_ptr = _wg_wptr.lock();
if (!wg_ptr) {
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
}
if (_count_scope_mem) {

View File

@ -269,18 +269,8 @@ public:
return _running_big_mem_op_num.load(std::memory_order_relaxed);
}
void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) {
std::lock_guard<std::mutex> l(_weighted_mem_lock);
_weighted_limit = weighted_limit;
_weighted_ratio = weighted_ratio;
}
void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) {
std::lock_guard<std::mutex> l(_weighted_mem_lock);
weighted_limit = _weighted_limit;
weighted_consumption = int64_t(query_mem_tracker->consumption() * _weighted_ratio);
}
void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
int64_t spill_threshold() { return _spill_threshold; }
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
std::string user;
@ -366,9 +356,7 @@ private:
std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
std::mutex _pipeline_map_write_lock;
std::mutex _weighted_mem_lock;
double _weighted_ratio = 0;
int64_t _weighted_limit = 0;
std::atomic<int64_t> _spill_threshold {0};
timespec _query_arrival_timestamp;
// Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to
// help us manage the query.

View File

@ -241,7 +241,7 @@ public:
thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
}
bool try_reserve_memory(const int64_t size) const {
doris::Status try_reserve_memory(const int64_t size) const {
#ifdef USE_MEM_TRACKER
DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
thread_mem_tracker()->label() != "Orphan")

View File

@ -98,6 +98,21 @@ std::string WorkloadGroup::debug_string() const {
_scan_bytes_per_second, _remote_scan_bytes_per_second);
}
std::string WorkloadGroup::memory_debug_string() const {
return fmt::format(
"TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
"{}, weighted_memory_limit = {}, total_mem_used = {}, "
"wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, "
"spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
_id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
_enable_memory_overcommit ? "true" : "false",
PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES),
_spill_low_watermark, _spill_high_watermark, _version, _is_shutdown,
_query_ctxs.size());
}
void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
if (UNLIKELY(tg_info.id != _id)) {
return;
@ -162,10 +177,6 @@ void WorkloadGroup::refresh_memory(int64_t used_memory) {
_wg_refresh_interval_memory_growth.store(0.0);
}
void WorkloadGroup::set_weighted_memory_ratio(double ratio) {
_weighted_mem_ratio = ratio;
}
void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
auto group_num = mem_tracker_ptr->group_num();

View File

@ -79,6 +79,12 @@ public:
return _memory_limit;
};
int64_t weighted_memory_limit() const { return _weighted_memory_limit; };
void set_weighted_memory_limit(int64_t weighted_memory_limit) {
_weighted_memory_limit = weighted_memory_limit;
}
// make memory snapshots and refresh total memory used at the same time.
int64_t make_memory_tracker_snapshots(
std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
@ -95,13 +101,11 @@ public:
void set_weighted_memory_ratio(double ratio);
bool add_wg_refresh_interval_memory_growth(int64_t size) {
// `weighted_mem_used` is a rough memory usage in this group,
// because we can only get a precise memory usage by MemTracker which is not include page cache.
auto weighted_mem_used =
int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load() + size) *
_weighted_mem_ratio);
if ((weighted_mem_used > ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
auto realtime_total_mem_used =
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
return false;
} else {
_wg_refresh_interval_memory_growth.fetch_add(size);
@ -113,20 +117,17 @@ public:
}
void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
// `weighted_mem_used` is a rough memory usage in this group,
// because we can only get a precise memory usage by MemTracker which is not include page cache.
auto weighted_mem_used =
int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load()) *
_weighted_mem_ratio);
*is_low_wartermark =
(weighted_mem_used > ((double)_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark =
(weighted_mem_used > ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
((double)_weighted_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
}
std::string debug_string() const;
std::string memory_debug_string() const;
void check_and_update(const WorkloadGroupInfo& tg_info);
@ -215,10 +216,11 @@ private:
std::string _name;
int64_t _version;
int64_t _memory_limit; // bytes
// `weighted_memory_limit` less than or equal to _memory_limit, calculate after exclude public memory.
// more detailed description in `refresh_wg_weighted_memory_limit`.
std::atomic<int64_t> _weighted_memory_limit {0}; //
// last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called.
std::atomic_int64_t _total_mem_used = 0; // bytes
// last value of refresh_wg_weighted_memory_ratio.
std::atomic<double> _weighted_mem_ratio = 0.0;
std::atomic_int64_t _wg_refresh_interval_memory_growth;
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;

View File

@ -153,86 +153,101 @@ struct WorkloadGroupMemInfo {
std::list<std::shared_ptr<MemTrackerLimiter>>();
};
void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() {
void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
// 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time)
// and calculate total memory used of all queries.
int64_t all_queries_mem_used = 0;
int64_t all_workload_groups_mem_usage = 0;
std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
for (auto& [wg_id, wg] : _workload_groups) {
wgs_mem_info[wg_id].total_mem_used =
wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
all_queries_mem_used += wgs_mem_info[wg_id].total_mem_used;
all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
}
if (all_queries_mem_used <= 0) {
if (all_workload_groups_mem_usage <= 0) {
return;
}
// 2. calculate weighted ratio.
// process memory used is actually bigger than all_queries_mem_used,
// because memory of page cache, allocator cache, segment cache etc. are included
// in proc_vm_rss.
// we count these cache memories equally on workload groups.
// 2. calculate weighted memory limit ratio.
// when construct workload group, mem_limit is equal to (process_memory_limit * group_limit_percent),
// here, it is assumed that the available memory of workload groups is equal to process_memory_limit.
//
// but process_memory_usage is actually bigger than all_workload_groups_mem_usage,
// because public_memory of page cache, allocator cache, segment cache etc. are included in process_memory_usage.
// so actual available memory of the workload groups is equal to (process_memory_limit - public_memory)
//
// we will exclude this public_memory when calculate workload group mem_limit.
// so a ratio is calculated to multiply the workload group mem_limit from the previous construction.
auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage();
all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used);
double ratio = (double)process_memory_usage / (double)all_queries_mem_used;
if (ratio <= 1.25) {
std::string debug_msg =
fmt::format("\nProcess Memory Summary: {}, {}, all quries mem: {}",
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
LOG_EVERY_T(INFO, 10) << debug_msg;
auto process_memory_limit = MemInfo::mem_limit();
double weighted_memory_limit_ratio = 1;
// if all_workload_groups_mem_usage is greater than process_memory_usage, it means that the memory statistics
// of the workload group are inaccurate.
// the reason is that query/load/etc. tracked is virtual memory, and virtual memory is not used in time.
//
// At this time, weighted_memory_limit_ratio is equal to 1, and workload group mem_limit is still equal to
// (process_memory_limit * group_limit_percent), this may cause query spill to occur earlier,
// However, there is no good solution at present, but we cannot predict when these virtual memory will be used.
if (all_workload_groups_mem_usage < process_memory_usage) {
int64_t public_memory = process_memory_usage - all_workload_groups_mem_usage;
weighted_memory_limit_ratio = 1 - (double)public_memory / (double)process_memory_limit;
}
for (auto& wg : _workload_groups) {
// 3.1 calculate query weighted memory limit of task group
auto wg_mem_limit = wg.second->memory_limit();
auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
int64_t query_weighted_mem_limit =
wg_query_count ? (wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit;
std::string debug_msg = fmt::format(
"\nProcess Memory Summary: {}, {}, all workload groups memory usage: {}, "
"weighted_memory_limit_ratio: {}",
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
weighted_memory_limit_ratio);
LOG_EVERY_T(INFO, 10) << debug_msg;
// 3.2 set all workload groups weighted memory ratio and all query weighted memory limit and ratio.
wg.second->set_weighted_memory_ratio(ratio);
for (auto& wg : _workload_groups) {
// 3.1 calculate query spill threshold of task group
auto wg_weighted_mem_limit =
int64_t(wg.second->memory_limit() * weighted_memory_limit_ratio);
wg.second->set_weighted_memory_limit(wg_weighted_mem_limit);
// 3.2 set workload groups weighted memory limit and all query spill threshold.
auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
int64_t query_spill_threshold =
wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count
: wg_weighted_mem_limit;
for (const auto& query : wg.second->queries()) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
continue;
}
query_ctx->set_weighted_memory(query_weighted_mem_limit, ratio);
query_ctx->set_spill_threshold(query_spill_threshold);
}
// 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark.
auto weighted_mem_used = int64_t(wgs_mem_info[wg.first].total_mem_used * ratio);
bool is_high_wartermark =
(weighted_mem_used >
((double)wg_mem_limit * wg.second->spill_threashold_high_water_mark() / 100));
bool is_low_wartermark =
(weighted_mem_used >
((double)wg_mem_limit * wg.second->spill_threshold_low_water_mark() / 100));
bool is_low_wartermark = false;
bool is_high_wartermark = false;
wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
std::string debug_msg;
if (is_high_wartermark || is_low_wartermark) {
debug_msg = fmt::format(
"\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used "
"ratio: {}, query "
"count: {}, query_weighted_mem_limit: {}",
wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES),
"\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, "
"used "
"ratio: {}, query count: {}, query spill threshold: {}",
wg.second->name(),
PrettyPrinter::print(wg.second->memory_limit(), TUnit::BYTES),
PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
PrettyPrinter::print(weighted_mem_used, TUnit::BYTES),
(double)weighted_mem_used / wg_mem_limit, wg_query_count,
PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES));
PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
(double)wgs_mem_info[wg.first].total_mem_used / wg_weighted_mem_limit,
wg_query_count, PrettyPrinter::print(query_spill_threshold, TUnit::BYTES));
debug_msg += "\n Query Memory Summary:";
// check whether queries need to revoke memory for task group
for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) {
debug_msg += fmt::format(
"\n MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, "
"\n MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, "
"Peak={}",
query_mem_tracker->label(), query_mem_tracker->parent_label(),
PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
PrettyPrinter::print(int64_t(query_mem_tracker->consumption() * ratio),
TUnit::BYTES),
PrettyPrinter::print(query_spill_threshold, TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
}
LOG_EVERY_T(INFO, 1) << debug_msg;

View File

@ -58,7 +58,7 @@ public:
bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
void refresh_wg_weighted_memory_ratio();
void refresh_wg_weighted_memory_limit();
void get_wg_resource_usage(vectorized::Block* block);

View File

@ -56,6 +56,7 @@ list(REMOVE_ITEM UT_FILES
${CMAKE_CURRENT_SOURCE_DIR}/olap/remote_rowset_gc_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/runtime/jsonb_value_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/runtime/large_int_value_test.cpp
${CMAKE_CURRENT_SOURCE_DIR}/vec/runtime/vdata_stream_test.cpp
)
list(APPEND UT_FILES

View File

@ -87,7 +87,7 @@ TEST_F(ThreadMemTrackerMgrTest, ConsumeMemory) {
EXPECT_EQ(t->consumption(), 0); // detach automatic call flush_untracked_mem.
}
TEST(ThreadMemTrackerMgrTest, Boundary) {
TEST_F(ThreadMemTrackerMgrTest, Boundary) {
// TODO, Boundary check may not be necessary, add some `IF` maybe increase cost time.
}
@ -264,7 +264,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
thread_context->consume_memory(size2);
EXPECT_EQ(t->consumption(), size1 + size2);
thread_context->try_reserve_memory(size3);
auto st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
@ -284,14 +285,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size1);
std::cout << "11111 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
<< thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
thread_context->consume_memory(size1);
thread_context->consume_memory(size1);
std::cout << "2222 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
<< thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
std::cout << "3333 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
<< thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
// reserved memory used done
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
@ -308,7 +303,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
EXPECT_EQ(t->consumption(), size1 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
thread_context->try_reserve_memory(size3);
st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
EXPECT_EQ(t->consumption(), size1 + size2 + size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
@ -358,7 +354,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
int64_t size3 = size2 * 1024;
thread_context->attach_task(TUniqueId(), t, workload_group);
thread_context->try_reserve_memory(size3);
auto st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
EXPECT_EQ(t->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
@ -369,15 +366,18 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
EXPECT_EQ(t->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->try_reserve_memory(size2);
st = thread_context->try_reserve_memory(size2);
EXPECT_TRUE(st.ok());
// ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2
// ThreadMemTrackerMgr _untracked_mem = 0
EXPECT_EQ(t->consumption(), size3 + size2);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
size3); // size3 - size2 + size2
thread_context->try_reserve_memory(size3);
thread_context->try_reserve_memory(size3);
st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
thread_context->consume_memory(size3);
thread_context->consume_memory(size2);
thread_context->consume_memory(size3);
@ -411,13 +411,15 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
int64_t size3 = size2 * 1024;
thread_context->attach_task(TUniqueId(), t1, workload_group);
thread_context->try_reserve_memory(size3);
auto st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
thread_context->consume_memory(size2);
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
thread_context->try_reserve_memory(size3);
st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3);
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3);
@ -428,7 +430,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
thread_context->try_reserve_memory(size3);
st = thread_context->try_reserve_memory(size3);
EXPECT_TRUE(st.ok());
EXPECT_EQ(t1->consumption(), size3);
EXPECT_EQ(t2->consumption(), size3 + size2);
EXPECT_EQ(t3->consumption(), size3);