diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 399728956e..8457296885 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -400,7 +400,7 @@ void Daemon::wg_weighted_memory_ratio_refresh_thread() {
     // Refresh weighted memory ratio of workload groups
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) {
-        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_ratio();
+        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();
     }
 }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 580c425884..00464e59f2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -368,19 +368,18 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
         }
         return false;
     } else if (is_wg_mem_low_water_mark) {
-        int64_t query_weighted_limit = 0;
-        int64_t query_weighted_consumption = 0;
-        query_ctx->get_weighted_memory(query_weighted_limit, query_weighted_consumption);
-        if (query_weighted_consumption < query_weighted_limit) {
+        int64_t spill_threshold = query_ctx->spill_threshold();
+        int64_t memory_usage = query_ctx->query_mem_tracker->consumption();
+        if (spill_threshold == 0 || memory_usage < spill_threshold) {
             return false;
         }
         auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num();
         DCHECK(big_memory_operator_num >= 0);
         int64_t mem_limit_of_op;
         if (0 == big_memory_operator_num) {
-            mem_limit_of_op = int64_t(query_weighted_limit * 0.8);
+            mem_limit_of_op = int64_t(spill_threshold * 0.8);
         } else {
-            mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
+            mem_limit_of_op = spill_threshold / big_memory_operator_num;
         }
 
         VLOG_DEBUG << "revoke memory, low water mark, revocable_mem_bytes: "
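An aside on the new low-water-mark branch: a task is only asked to revoke memory once its query is past the spill threshold, and the threshold is then divided among the operators currently holding large amounts of memory. A minimal standalone sketch of that arithmetic follows; the 0.8 headroom factor and the division come from the hunk above, while the final revocable-bytes comparison and the example numbers are assumptions for illustration only.

```cpp
#include <cstdint>
#include <iostream>

// Sketch of the low-water-mark branch of should_revoke_memory: revoke only when
// the query is past its spill threshold and this task's revocable bytes exceed
// the per-operator share of that threshold.
bool should_revoke(int64_t revocable_bytes, int64_t query_mem_usage,
                   int64_t spill_threshold, int64_t big_mem_op_num) {
    if (spill_threshold == 0 || query_mem_usage < spill_threshold) {
        return false; // query is still under its spill threshold
    }
    // With no big-memory operator registered, leave 20% headroom; otherwise
    // split the threshold evenly across the big-memory operators.
    int64_t mem_limit_of_op = (big_mem_op_num == 0)
                                      ? int64_t(spill_threshold * 0.8)
                                      : spill_threshold / big_mem_op_num;
    return revocable_bytes >= mem_limit_of_op; // assumed final comparison
}

int main() {
    // Example: a 1 GB spill threshold shared by 4 big-memory operators gives
    // each operator roughly 256 MB before it is asked to revoke.
    const int64_t GB = 1LL << 30;
    std::cout << should_revoke(/*revocable_bytes=*/300LL << 20,
                               /*query_mem_usage=*/2 * GB,
                               /*spill_threshold=*/GB,
                               /*big_mem_op_num=*/4)
              << std::endl; // prints 1: 300 MB > 1 GB / 4
}
```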
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 35fa350987..344bcbc598 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -40,7 +40,7 @@ std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
 std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth = 0;
 
 bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
-    if (sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark()) {
+    if (sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark()) {
         return false;
     }
     int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
@@ -50,7 +50,7 @@ bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
         if (UNLIKELY(vm_rss_sub_allocator_cache() +
                              refresh_interval_memory_growth.load(std::memory_order_relaxed) +
                              new_reserved_mem >=
-                     MemInfo::mem_limit())) {
+                     MemInfo::soft_mem_limit())) {
             return false;
         }
     } while (!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem, new_reserved_mem,
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 767cfdae74..50a8a5e0f7 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -95,7 +95,8 @@ public:
     void consume(int64_t size, int skip_large_memory_check = 0);
     void flush_untracked_mem();
 
-    bool try_reserve(int64_t size);
+    doris::Status try_reserve(int64_t size);
+
     void release_reserved();
 
     bool is_attach_query() { return _query_id != TUniqueId(); }
@@ -295,7 +296,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     _stop_consume = false;
 }
 
-inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
+inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
     DCHECK(_limiter_tracker_raw);
     DCHECK(size >= 0);
     CHECK(init());
@@ -303,19 +304,29 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
     // _untracked_mem store bytes that not synchronized to process reserved memory.
     flush_untracked_mem();
     if (!_limiter_tracker_raw->try_consume(size)) {
-        return false;
+        auto err_msg = fmt::format(
+                "reserve memory failed, size: {}, because memory tracker consumption: {}, limit: "
+                "{}",
+                size, _limiter_tracker_raw->consumption(), _limiter_tracker_raw->limit());
+        return doris::Status::MemoryLimitExceeded(err_msg);
     }
     auto wg_ptr = _wg_wptr.lock();
-    if (!wg_ptr) {
+    if (wg_ptr) {
         if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
+            auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
+                                       wg_ptr->memory_debug_string());
             _limiter_tracker_raw->release(size); // rollback
-            return false;
+            return doris::Status::MemoryLimitExceeded(err_msg);
         }
     }
     if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
-        _limiter_tracker_raw->release(size);                 // rollback
-        wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
-        return false;
+        auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
+                                   GlobalMemoryArbitrator::process_mem_log_str());
+        _limiter_tracker_raw->release(size); // rollback
+        if (wg_ptr) {
+            wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
+        }
+        return doris::Status::MemoryLimitExceeded(err_msg);
     }
     if (_count_scope_mem) {
         _scope_mem += size;
@@ -324,7 +335,7 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
         tracker->consume(size);
     }
     _reserved_mem += size;
-    return true;
+    return doris::Status::OK();
 }
 
 inline void ThreadMemTrackerMgr::release_reserved() {
@@ -333,7 +344,7 @@ inline void ThreadMemTrackerMgr::release_reserved() {
                 _untracked_mem);
     _limiter_tracker_raw->release(_reserved_mem);
     auto wg_ptr = _wg_wptr.lock();
-    if (!wg_ptr) {
+    if (wg_ptr) {
         wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
     }
     if (_count_scope_mem) {
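The reworked try_reserve reports why a reservation failed instead of returning a bare bool, and it now rolls back only the levels it actually acquired (note the corrected `if (wg_ptr)` checks). Below is a self-contained sketch of that acquire-in-order, roll-back-on-failure shape, using stand-in types rather than the real Doris trackers:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Stand-in for one reservation level (query tracker, workload group, process).
struct Layer {
    std::string name;
    std::function<bool(int64_t)> acquire;  // returns false when the layer refuses
    std::function<void(int64_t)> release;  // rollback
};

// Acquire each layer in order; on failure, release what was already acquired
// and return an error message naming the layer that refused.
std::optional<std::string> try_reserve(std::vector<Layer>& layers, int64_t bytes) {
    for (size_t i = 0; i < layers.size(); ++i) {
        if (!layers[i].acquire(bytes)) {
            for (size_t j = i; j-- > 0;) layers[j].release(bytes); // rollback
            return "reserve memory failed at layer: " + layers[i].name;
        }
    }
    return std::nullopt;
}

int main() {
    int64_t query_used = 0, wg_used = 0, process_used = 0;
    const int64_t wg_limit = 100;
    std::vector<Layer> layers = {
            {"query tracker", [&](int64_t b) { query_used += b; return true; },
             [&](int64_t b) { query_used -= b; }},
            {"workload group",
             [&](int64_t b) {
                 if (wg_used + b > wg_limit) return false;
                 wg_used += b;
                 return true;
             },
             [&](int64_t b) { wg_used -= b; }},
            {"process", [&](int64_t b) { process_used += b; return true; },
             [&](int64_t b) { process_used -= b; }},
    };
    auto err = try_reserve(layers, 150); // the workload group layer refuses
    // Prints the error and shows the query layer was rolled back to 0.
    std::cout << (err ? *err : "ok") << ", query_used=" << query_used << "\n";
}
```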
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8c892bf9f7..83b934df58 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -269,18 +269,8 @@ public:
         return _running_big_mem_op_num.load(std::memory_order_relaxed);
     }
 
-    void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) {
-        std::lock_guard l(_weighted_mem_lock);
-        _weighted_limit = weighted_limit;
-        _weighted_ratio = weighted_ratio;
-    }
-
-    void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) {
-        std::lock_guard l(_weighted_mem_lock);
-        weighted_limit = _weighted_limit;
-        weighted_consumption = int64_t(query_mem_tracker->consumption() * _weighted_ratio);
-    }
-
+    void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
+    int64_t spill_threshold() { return _spill_threshold; }
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
     std::string user;
@@ -366,9 +356,7 @@ private:
     std::map> _fragment_id_to_pipeline_ctx;
     std::mutex _pipeline_map_write_lock;
 
-    std::mutex _weighted_mem_lock;
-    double _weighted_ratio = 0;
-    int64_t _weighted_limit = 0;
+    std::atomic<int64_t> _spill_threshold {0};
     timespec _query_arrival_timestamp;
     // Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to
     // help us manage the query.
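Because the limit/ratio pair guarded by `_weighted_mem_lock` collapses into one pre-computed value, a single atomic is enough: the refresh thread publishes an int64_t and pipeline tasks read it lock-free, treating 0 as "not set yet". A minimal sketch of that publication pattern (the class and thread names here are illustrative, not the real QueryContext):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

// One writer (the workload-group refresh thread) publishes a single value;
// many readers (pipeline tasks) poll it. No lock is needed because the two
// previously coupled fields were collapsed into one pre-computed threshold.
class QueryContextSketch {
public:
    void set_spill_threshold(int64_t v) { _spill_threshold.store(v, std::memory_order_relaxed); }
    int64_t spill_threshold() const { return _spill_threshold.load(std::memory_order_relaxed); }

private:
    std::atomic<int64_t> _spill_threshold {0};
};

int main() {
    QueryContextSketch ctx;
    std::thread refresher([&] { ctx.set_spill_threshold(512LL << 20); }); // pretend refresh pass
    std::thread task([&] {
        // A task treats 0 as "threshold not initialized yet" and skips the spill check.
        int64_t threshold = ctx.spill_threshold();
        std::cout << (threshold == 0 ? "no threshold yet" : "threshold set") << "\n";
    });
    refresher.join();
    task.join();
}
```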
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 84d0ccfe24..d1aede848a 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -241,7 +241,7 @@ public:
         thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
     }
 
-    bool try_reserve_memory(const int64_t size) const {
+    doris::Status try_reserve_memory(const int64_t size) const {
 #ifdef USE_MEM_TRACKER
         DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check ||
                thread_mem_tracker()->label() != "Orphan")
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 57a855c9bf..6329556369 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -98,6 +98,21 @@ std::string WorkloadGroup::debug_string() const {
             _scan_bytes_per_second, _remote_scan_bytes_per_second);
 }
 
+std::string WorkloadGroup::memory_debug_string() const {
+    return fmt::format(
+            "TG[id = {}, name = {}, memory_limit = {}, enable_memory_overcommit = "
+            "{}, weighted_memory_limit = {}, total_mem_used = {}, "
+            "wg_refresh_interval_memory_growth = {}, spill_low_watermark = {}, "
+            "spill_high_watermark = {}, version = {}, is_shutdown = {}, query_num = {}]",
+            _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+            _enable_memory_overcommit ? "true" : "false",
+            PrettyPrinter::print(_weighted_memory_limit, TUnit::BYTES),
+            PrettyPrinter::print(_total_mem_used, TUnit::BYTES),
+            PrettyPrinter::print(_wg_refresh_interval_memory_growth, TUnit::BYTES),
+            _spill_low_watermark, _spill_high_watermark, _version, _is_shutdown,
+            _query_ctxs.size());
+}
+
 void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
     if (UNLIKELY(tg_info.id != _id)) {
         return;
@@ -162,10 +177,6 @@ void WorkloadGroup::refresh_memory(int64_t used_memory) {
     _wg_refresh_interval_memory_growth.store(0.0);
 }
 
-void WorkloadGroup::set_weighted_memory_ratio(double ratio) {
-    _weighted_mem_ratio = ratio;
-}
-
 void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
     std::unique_lock wlock(_mutex);
     auto group_num = mem_tracker_ptr->group_num();
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 9ad3d6e62a..5d6b201eaa 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -79,6 +79,12 @@ public:
         return _memory_limit;
     };
 
+    int64_t weighted_memory_limit() const { return _weighted_memory_limit; };
+
+    void set_weighted_memory_limit(int64_t weighted_memory_limit) {
+        _weighted_memory_limit = weighted_memory_limit;
+    }
+
     // make memory snapshots and refresh total memory used at the same time.
     int64_t make_memory_tracker_snapshots(
             std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
@@ -95,13 +101,11 @@ public:
     void set_weighted_memory_ratio(double ratio);
     bool add_wg_refresh_interval_memory_growth(int64_t size) {
-        // `weighted_mem_used` is a rough memory usage in this group,
-        // because we can only get a precise memory usage by MemTracker which is not include page cache.
-        auto weighted_mem_used =
-                int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load() + size) *
-                        _weighted_mem_ratio);
-        if ((weighted_mem_used > ((double)_memory_limit *
-                                  _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
+        auto realtime_total_mem_used =
+                _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
+        if ((realtime_total_mem_used >
+             ((double)_weighted_memory_limit *
+              _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
             return false;
         } else {
             _wg_refresh_interval_memory_growth.fetch_add(size);
@@ -113,20 +117,17 @@
     }
 
     void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
-        // `weighted_mem_used` is a rough memory usage in this group,
-        // because we can only get a precise memory usage by MemTracker which is not include page cache.
-        auto weighted_mem_used =
-                int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load()) *
-                        _weighted_mem_ratio);
-        *is_low_wartermark =
-                (weighted_mem_used > ((double)_memory_limit *
-                                      _spill_low_watermark.load(std::memory_order_relaxed) / 100));
-        *is_high_wartermark =
-                (weighted_mem_used > ((double)_memory_limit *
-                                      _spill_high_watermark.load(std::memory_order_relaxed) / 100));
+        auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
+        *is_low_wartermark = (realtime_total_mem_used >
+                              ((double)_weighted_memory_limit *
+                               _spill_low_watermark.load(std::memory_order_relaxed) / 100));
+        *is_high_wartermark = (realtime_total_mem_used >
+                               ((double)_weighted_memory_limit *
+                                _spill_high_watermark.load(std::memory_order_relaxed) / 100));
     }
 
     std::string debug_string() const;
+    std::string memory_debug_string() const;
 
     void check_and_update(const WorkloadGroupInfo& tg_info);
@@ -215,10 +216,11 @@ private:
     std::string _name;
     int64_t _version;
     int64_t _memory_limit; // bytes
+    // `_weighted_memory_limit` is less than or equal to _memory_limit; it is calculated after excluding
+    // public memory. See `refresh_wg_weighted_memory_limit` for a more detailed description.
+    std::atomic<int64_t> _weighted_memory_limit {0};
     // last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called.
     std::atomic_int64_t _total_mem_used = 0; // bytes
-    // last value of refresh_wg_weighted_memory_ratio.
-    std::atomic<double> _weighted_mem_ratio = 0.0;
     std::atomic_int64_t _wg_refresh_interval_memory_growth;
     bool _enable_memory_overcommit;
     std::atomic<uint64_t> _cpu_share;
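The watermark checks now compare the group's real-time usage directly against percentages of the pre-shrunk `_weighted_memory_limit`, rather than scaling usage by a ratio. A standalone sketch of that comparison with invented numbers (the 50/80 watermarks below are examples, not Doris defaults):

```cpp
#include <cstdint>
#include <iostream>

// Mirror of check_mem_used: usage is compared against
// weighted_memory_limit * watermark / 100 for each watermark.
void check_mem_used(int64_t total_mem_used, int64_t refresh_interval_growth,
                    int64_t weighted_memory_limit, int64_t spill_low_watermark,
                    int64_t spill_high_watermark, bool* is_low, bool* is_high) {
    auto realtime_total_mem_used = total_mem_used + refresh_interval_growth;
    *is_low = realtime_total_mem_used > (double)weighted_memory_limit * spill_low_watermark / 100;
    *is_high = realtime_total_mem_used > (double)weighted_memory_limit * spill_high_watermark / 100;
}

int main() {
    const int64_t GB = 1LL << 30;
    bool low = false, high = false;
    // 10 GB weighted limit, watermarks at 50% / 80%: 6 GB of usage trips only
    // the low watermark (6 GB > 5 GB, but 6 GB <= 8 GB).
    check_mem_used(/*total_mem_used=*/6 * GB, /*refresh_interval_growth=*/0,
                   /*weighted_memory_limit=*/10 * GB, /*spill_low_watermark=*/50,
                   /*spill_high_watermark=*/80, &low, &high);
    std::cout << "low=" << low << " high=" << high << "\n"; // low=1 high=0
}
```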
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index dc121895ae..1df0dcc3a4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -153,86 +153,101 @@ struct WorkloadGroupMemInfo {
             std::list<std::shared_ptr<MemTrackerLimiter>>();
 };
 
-void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() {
+void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
     std::shared_lock r_lock(_group_mutex);
 
     // 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time)
     // and calculate total memory used of all queries.
-    int64_t all_queries_mem_used = 0;
+    int64_t all_workload_groups_mem_usage = 0;
     std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
     for (auto& [wg_id, wg] : _workload_groups) {
         wgs_mem_info[wg_id].total_mem_used =
                 wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
-        all_queries_mem_used += wgs_mem_info[wg_id].total_mem_used;
+        all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
     }
-    if (all_queries_mem_used <= 0) {
+    if (all_workload_groups_mem_usage <= 0) {
         return;
     }
 
-    // 2. calculate weighted ratio.
-    // process memory used is actually bigger than all_queries_mem_used,
-    // because memory of page cache, allocator cache, segment cache etc. are included
-    // in proc_vm_rss.
-    // we count these cache memories equally on workload groups.
+    // 2. calculate the weighted memory limit ratio.
+    // when a workload group is constructed, mem_limit is equal to (process_memory_limit * group_limit_percent),
+    // which assumes that the memory available to workload groups equals process_memory_limit.
+    //
+    // but process_memory_usage is actually bigger than all_workload_groups_mem_usage, because public memory
+    // such as the page cache, allocator cache, segment cache etc. is included in process_memory_usage.
+    // so the memory actually available to workload groups is (process_memory_limit - public_memory).
+    //
+    // this public memory is excluded by computing a ratio and multiplying it into the mem_limit set at construction.
     auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage();
-    all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used);
-    double ratio = (double)process_memory_usage / (double)all_queries_mem_used;
-    if (ratio <= 1.25) {
-        std::string debug_msg =
-                fmt::format("\nProcess Memory Summary: {}, {}, all quries mem: {}",
-                            doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
-                            doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
-                            PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
-        LOG_EVERY_T(INFO, 10) << debug_msg;
+    auto process_memory_limit = MemInfo::mem_limit();
+    double weighted_memory_limit_ratio = 1;
+    // if all_workload_groups_mem_usage is greater than process_memory_usage, the workload groups' memory
+    // statistics are inaccurate: what query/load/etc. track is virtual memory, and that virtual memory
+    // may not have been touched yet.
+    //
+    // in that case weighted_memory_limit_ratio stays 1, and the workload group mem_limit remains
+    // (process_memory_limit * group_limit_percent), which may cause queries to spill earlier.
+    // there is no good solution at present, because we cannot predict when that virtual memory will be used.
+    if (all_workload_groups_mem_usage < process_memory_usage) {
+        int64_t public_memory = process_memory_usage - all_workload_groups_mem_usage;
+        weighted_memory_limit_ratio = 1 - (double)public_memory / (double)process_memory_limit;
     }
 
-    for (auto& wg : _workload_groups) {
-        // 3.1 calculate query weighted memory limit of task group
-        auto wg_mem_limit = wg.second->memory_limit();
-        auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
-        int64_t query_weighted_mem_limit =
-                wg_query_count ? (wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit;
+    std::string debug_msg = fmt::format(
+            "\nProcess Memory Summary: {}, {}, all workload groups memory usage: {}, "
+            "weighted_memory_limit_ratio: {}",
+            doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
+            doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
+            PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
+            weighted_memory_limit_ratio);
+    LOG_EVERY_T(INFO, 10) << debug_msg;
 
-        // 3.2 set all workload groups weighted memory ratio and all query weighted memory limit and ratio.
-        wg.second->set_weighted_memory_ratio(ratio);
+    for (auto& wg : _workload_groups) {
+        // 3.1 calculate and set the weighted memory limit of the workload group
+        auto wg_weighted_mem_limit =
+                int64_t(wg.second->memory_limit() * weighted_memory_limit_ratio);
+        wg.second->set_weighted_memory_limit(wg_weighted_mem_limit);
+
+        // 3.2 calculate each query's spill threshold and set it on the query context
+        auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
+        int64_t query_spill_threshold =
+                wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count
+                               : wg_weighted_mem_limit;
         for (const auto& query : wg.second->queries()) {
             auto query_ctx = query.second.lock();
             if (!query_ctx) {
                 continue;
             }
-            query_ctx->set_weighted_memory(query_weighted_mem_limit, ratio);
+            query_ctx->set_spill_threshold(query_spill_threshold);
         }
 
         // 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark.
-        auto weighted_mem_used = int64_t(wgs_mem_info[wg.first].total_mem_used * ratio);
-        bool is_high_wartermark =
-                (weighted_mem_used >
-                 ((double)wg_mem_limit * wg.second->spill_threashold_high_water_mark() / 100));
-        bool is_low_wartermark =
-                (weighted_mem_used >
-                 ((double)wg_mem_limit * wg.second->spill_threshold_low_water_mark() / 100));
+        bool is_low_wartermark = false;
+        bool is_high_wartermark = false;
+        wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
         std::string debug_msg;
         if (is_high_wartermark || is_low_wartermark) {
             debug_msg = fmt::format(
-                    "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used "
-                    "ratio: {}, query "
-                    "count: {}, query_weighted_mem_limit: {}",
-                    wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES),
+                    "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem limit: {}, "
+                    "used "
+                    "ratio: {}, query count: {}, query spill threshold: {}",
+                    wg.second->name(),
+                    PrettyPrinter::print(wg.second->memory_limit(), TUnit::BYTES),
                     PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
-                    PrettyPrinter::print(weighted_mem_used, TUnit::BYTES),
-                    (double)weighted_mem_used / wg_mem_limit, wg_query_count,
-                    PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES));
+                    PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
+                    (double)wgs_mem_info[wg.first].total_mem_used / wg_weighted_mem_limit,
+                    wg_query_count, PrettyPrinter::print(query_spill_threshold, TUnit::BYTES));
 
             debug_msg += "\n  Query Memory Summary:";
             // check whether queries need to revoke memory for task group
             for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) {
                 debug_msg += fmt::format(
-                        "\n    MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, "
+                        "\n    MemTracker Label={}, Parent Label={}, Used={}, SpillThreshold={}, "
                        "Peak={}",
                        query_mem_tracker->label(), query_mem_tracker->parent_label(),
                        PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
-                        PrettyPrinter::print(int64_t(query_mem_tracker->consumption() * ratio),
-                                             TUnit::BYTES),
+                        PrettyPrinter::print(query_spill_threshold, TUnit::BYTES),
                        PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
            }
            LOG_EVERY_T(INFO, 1) << debug_msg;
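Putting the pieces of `refresh_wg_weighted_memory_limit` together, here is a worked example with invented inputs (the process limit, usages, group percentage and query count are all illustrative, not defaults):

```cpp
#include <cstdint>
#include <iostream>

int main() {
    const int64_t GB = 1LL << 30;

    // Invented inputs: a 100 GB process limit, 60 GB of tracked workload-group
    // memory, 70 GB of process memory usage, and one group configured at 30%.
    int64_t process_memory_limit = 100 * GB;
    int64_t process_memory_usage = 70 * GB;
    int64_t all_workload_groups_mem_usage = 60 * GB;
    int64_t wg_memory_limit = 30 * GB; // set at construction: limit * group percent
    int64_t wg_query_count = 3;

    // Step 2: shrink group limits by the share of "public" memory (caches etc.)
    // that the groups cannot use. If tracked usage exceeds process usage, the
    // ratio stays 1.
    double weighted_memory_limit_ratio = 1;
    if (all_workload_groups_mem_usage < process_memory_usage) {
        int64_t public_memory = process_memory_usage - all_workload_groups_mem_usage; // 10 GB
        weighted_memory_limit_ratio = 1 - (double)public_memory / (double)process_memory_limit;
    }

    // Step 3.1: the group's effective (weighted) memory limit.
    int64_t wg_weighted_mem_limit = int64_t(wg_memory_limit * weighted_memory_limit_ratio);

    // Step 3.2: split it evenly across the group's queries as a spill threshold.
    int64_t query_spill_threshold =
            wg_query_count ? (wg_weighted_mem_limit + wg_query_count) / wg_query_count
                           : wg_weighted_mem_limit;

    std::cout << "ratio=" << weighted_memory_limit_ratio                   // 0.9
              << " wg_weighted_mem_limit=" << wg_weighted_mem_limit / GB   // 27 (GB)
              << "GB query_spill_threshold=" << query_spill_threshold / GB // 9 (GB)
              << "GB\n";
}
```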
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 4c4e82409d..15740d061a 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -58,7 +58,7 @@ public:
 
     bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
 
-    void refresh_wg_weighted_memory_ratio();
+    void refresh_wg_weighted_memory_limit();
 
     void get_wg_resource_usage(vectorized::Block* block);
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 5a37267a93..745ef4dea0 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -56,6 +56,7 @@ list(REMOVE_ITEM UT_FILES
     ${CMAKE_CURRENT_SOURCE_DIR}/olap/remote_rowset_gc_test.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/runtime/jsonb_value_test.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/runtime/large_int_value_test.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/vec/runtime/vdata_stream_test.cpp
 )
 
 list(APPEND UT_FILES
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
index ab15fce05a..c1feb43fe9 100644
--- a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -87,7 +87,7 @@ TEST_F(ThreadMemTrackerMgrTest, ConsumeMemory) {
     EXPECT_EQ(t->consumption(), 0); // detach automatic call flush_untracked_mem.
 }
 
-TEST(ThreadMemTrackerMgrTest, Boundary) {
+TEST_F(ThreadMemTrackerMgrTest, Boundary) {
     // TODO, Boundary check may not be necessary, add some `IF` maybe increase cost time.
 }
 
@@ -264,7 +264,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
     thread_context->consume_memory(size2);
     EXPECT_EQ(t->consumption(), size1 + size2);
 
-    thread_context->try_reserve_memory(size3);
+    auto st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
 
@@ -284,14 +285,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 + size1);
 
-    std::cout << "11111 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
-              << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
     thread_context->consume_memory(size1);
     thread_context->consume_memory(size1);
-    std::cout << "2222 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
-              << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
-    std::cout << "3333 " << thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
-              << thread_context->thread_mem_tracker_mgr->reserved_mem() << std::endl;
     // reserved memory used done
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
@@ -308,7 +303,8 @@ TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
     EXPECT_EQ(t->consumption(), size1 + size2);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
 
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t->consumption(), size1 + size2 + size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
 
@@ -358,7 +354,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
     int64_t size3 = size2 * 1024;
 
     thread_context->attach_task(TUniqueId(), t, workload_group);
-    thread_context->try_reserve_memory(size3);
+    auto st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
 
@@ -369,15 +366,18 @@ TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
     EXPECT_EQ(t->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
 
-    thread_context->try_reserve_memory(size2);
+    st = thread_context->try_reserve_memory(size2);
+    EXPECT_TRUE(st.ok());
     // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2
     // ThreadMemTrackerMgr _untracked_mem = 0
     EXPECT_EQ(t->consumption(), size3 + size2);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3); // size3 - size2 + size2
 
-    thread_context->try_reserve_memory(size3);
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     thread_context->consume_memory(size3);
     thread_context->consume_memory(size2);
     thread_context->consume_memory(size3);
@@ -411,13 +411,15 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
     int64_t size3 = size2 * 1024;
 
     thread_context->attach_task(TUniqueId(), t1, workload_group);
-    thread_context->try_reserve_memory(size3);
+    auto st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     thread_context->consume_memory(size2);
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
 
     thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(t2->consumption(), size3);
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2 + size3);
@@ -428,7 +430,8 @@ TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
     EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 - size2);
 
     thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
-    thread_context->try_reserve_memory(size3);
+    st = thread_context->try_reserve_memory(size3);
+    EXPECT_TRUE(st.ok());
     EXPECT_EQ(t1->consumption(), size3);
     EXPECT_EQ(t2->consumption(), size3 + size2);
     EXPECT_EQ(t3->consumption(), size3);
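The expectations in these tests follow a simple accounting model: a successful reserve immediately adds to the tracker and to the process-wide reserved counter, and later consumption is paid out of the reservation before it adds new tracker consumption. Below is a toy model of that accounting, an assumption that mirrors the asserts above rather than the real ThreadMemTrackerMgr bookkeeping (which also batches updates through _untracked_mem):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Toy accounting model for reserve/consume: reserving bumps the tracker and the
// process reserved counter right away; consuming is charged against the
// reservation first, and only the overrun adds new tracker consumption.
struct Accounting {
    int64_t tracker_consumption = 0;
    int64_t process_reserved = 0;
    int64_t thread_reserved = 0;

    void reserve(int64_t bytes) {
        tracker_consumption += bytes;
        process_reserved += bytes;
        thread_reserved += bytes;
    }
    void consume(int64_t bytes) {
        int64_t from_reservation = std::min(bytes, thread_reserved);
        thread_reserved -= from_reservation;
        process_reserved -= from_reservation;
        tracker_consumption += bytes - from_reservation; // overrun only
    }
};

int main() {
    const int64_t MB = 1LL << 20;
    Accounting a;
    a.reserve(1024 * MB);
    std::cout << a.tracker_consumption / MB << " " << a.process_reserved / MB << "\n"; // 1024 1024
    a.consume(600 * MB);
    std::cout << a.tracker_consumption / MB << " " << a.process_reserved / MB << "\n"; // 1024 424
    a.consume(600 * MB); // overruns the remaining reservation by 176 MB
    std::cout << a.tracker_consumption / MB << " " << a.process_reserved / MB << "\n"; // 1200 0
}
```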