[branch-2.1](memory) Refactor refresh workload groups weighted memory ratio and record refresh interval memory growth (#39760)
Note: picking #38168 overwrites the changes from #37221 in workload_group_manager.cpp. If #37221 needs to be picked, it can be ignored.
@@ -588,8 +588,7 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
// Sleep time in milliseconds between memtbale flush mgr refresh iterations
DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DEFINE_mInt64(wg_mem_refresh_interval_ms, "50");
DEFINE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms, "50");

// percent of (active memtables size / all memtables size) when reach hard limit
DEFINE_mInt32(memtable_hard_limit_active_percent, "50");

@@ -645,8 +645,8 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
// Sleep time in milliseconds between memtbale flush mgr memory refresh iterations
DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);

// Sleep time in milliseconds between refresh iterations of workload group memory statistics
DECLARE_mInt64(wg_mem_refresh_interval_ms);
// Sleep time in milliseconds between refresh iterations of workload group weighted memory ratio
DECLARE_mInt64(wg_weighted_memory_ratio_refresh_interval_ms);

// percent of (active memtables size / all memtables size) when reach hard limit
DECLARE_mInt32(memtable_hard_limit_active_percent);
@@ -377,11 +377,11 @@ void Daemon::je_purge_dirty_pages_thread() const {
    } while (true);
}

void Daemon::wg_mem_used_refresh_thread() {
    // Refresh memory usage and limit of workload groups
void Daemon::wg_weighted_memory_ratio_refresh_thread() {
    // Refresh weighted memory ratio of workload groups
    while (!_stop_background_threads_latch.wait_for(
            std::chrono::milliseconds(config::wg_mem_refresh_interval_ms))) {
        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info();
            std::chrono::milliseconds(config::wg_weighted_memory_ratio_refresh_interval_ms))) {
        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_ratio();
    }
}

@@ -420,7 +420,8 @@ void Daemon::start() {
    CHECK(st.ok()) << st;

    st = Thread::create(
            "Daemon", "wg_mem_refresh_thread", [this]() { this->wg_mem_used_refresh_thread(); },
            "Daemon", "wg_weighted_memory_ratio_refresh_thread",
            [this]() { this->wg_weighted_memory_ratio_refresh_thread(); },
            &_threads.emplace_back());
    CHECK(st.ok()) << st;
}
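For context, the renamed daemon thread keeps the same latch-driven loop shape as the other background threads: wait on the stop latch with a timeout, and treat a timeout as "do one refresh". A minimal standalone sketch of that pattern follows; StopLatch and refresh_weighted_memory_ratio are illustrative stand-ins, not the actual Doris CountDownLatch or WorkloadGroupMgr APIs.

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    class StopLatch {
    public:
        // Returns true once stop() has been called; otherwise returns false
        // after waiting up to `timeout` (the "do one refresh" signal).
        bool wait_for(std::chrono::milliseconds timeout) {
            std::unique_lock<std::mutex> lock(_mutex);
            return _cv.wait_for(lock, timeout, [this] { return _stopped; });
        }
        void stop() {
            std::lock_guard<std::mutex> lock(_mutex);
            _stopped = true;
            _cv.notify_all();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        bool _stopped = false;
    };

    void refresh_weighted_memory_ratio() { /* recompute ratios, as in the diff above */ }

    void wg_weighted_memory_ratio_refresh_thread(StopLatch& latch,
                                                 std::chrono::milliseconds interval) {
        // Refresh once per interval until the latch is released at shutdown.
        while (!latch.wait_for(interval)) {
            refresh_weighted_memory_ratio();
        }
    }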
@@ -44,7 +44,7 @@ private:
    void calculate_metrics_thread();
    void je_purge_dirty_pages_thread() const;
    void report_runtime_query_statistics_thread();
    void wg_mem_used_refresh_thread();
    void wg_weighted_memory_ratio_refresh_thread();

    CountDownLatch _stop_background_threads_latch;
    std::vector<scoped_refptr<Thread>> _threads;
@@ -263,7 +263,8 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
    status = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
            [this, &parent, state, query_id, mem_tracker, shared_state_holder, execution_context,
             submit_timer] {
                SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
                QueryThreadContext query_thread_context {query_id, mem_tracker};
                SCOPED_ATTACH_TASK(query_thread_context);
                std::shared_ptr<TaskExecutionContext> execution_context_lock;
                auto shared_state_sptr = shared_state_holder.lock();
                if (shared_state_sptr) {

@@ -269,7 +269,8 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime

    auto exception_catch_func = [spill_func, query_id, mem_tracker, shared_state_holder,
                                 execution_context, this]() {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {

@@ -210,7 +210,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat

    auto exception_catch_func = [query_id, mem_tracker, shared_state_holder, execution_context,
                                 spill_func, this]() {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {
@@ -338,7 +339,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti

    auto exception_catch_func = [read_func, query_id, mem_tracker, shared_state_holder,
                                 execution_context, state, this]() {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {
@@ -426,7 +428,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti

    auto exception_catch_func = [read_func, mem_tracker, shared_state_holder, execution_context,
                                 query_id, this]() {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {

@@ -127,7 +127,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
    auto spill_func = [build_blocks = std::move(build_blocks), state, num_slots, this]() mutable {
        Defer defer {[&]() {
            // need to reset build_block here, or else build_block will be destructed
            // after SCOPED_ATTACH_TASK_WITH_ID and will trigger memory_orphan_check failure
            // after SCOPED_ATTACH_TASK and will trigger memory_orphan_check failure
            build_blocks.clear();
        }};

@@ -216,7 +216,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta

    auto exception_catch_func = [spill_func, shared_state_holder, execution_context, state,
                                 query_id, mem_tracker, this]() mutable {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {
@@ -289,7 +290,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {

    auto st = spill_io_pool->submit_func([this, query_id, mem_tracker, shared_state_holder,
                                          execution_context, spilling_stream, i, submit_timer] {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {

@@ -296,7 +296,8 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {

    auto exception_catch_func = [this, query_id, mem_tracker, shared_state_holder,
                                 execution_context, spill_func]() {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {

@@ -175,7 +175,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat

    auto exception_catch_func = [this, query_id, mem_tracker, shared_state_holder,
                                 execution_context, spill_func]() {
        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
        QueryThreadContext query_thread_context {query_id, mem_tracker};
        SCOPED_ATTACH_TASK(query_thread_context);
        std::shared_ptr<TaskExecutionContext> execution_context_lock;
        auto shared_state_sptr = shared_state_holder.lock();
        if (shared_state_sptr) {
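The same mechanical substitution repeats across all of the spill operators above: SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id) becomes a QueryThreadContext plus SCOPED_ATTACH_TASK, so the attach point can also carry the workload group weak_ptr. A hedged RAII sketch of that pattern, with stub types standing in for Doris's QueryThreadContext and the AttachTask object behind the macro:

    #include <memory>

    struct WorkloadGroupStub {};
    struct MemTrackerStub {};

    struct QueryThreadContextSketch {
        int query_id = 0;                            // stands in for TUniqueId
        std::shared_ptr<MemTrackerStub> mem_tracker; // query mem tracker
        std::weak_ptr<WorkloadGroupStub> wg;         // carried along since this commit
    };

    class AttachGuard {
    public:
        explicit AttachGuard(QueryThreadContextSketch ctx) : _ctx(std::move(ctx)) {
            // attach tracker + workload group to the current thread here
        }
        ~AttachGuard() {
            // detach on scope exit, mirroring the macro's RAII behavior
        }

    private:
        QueryThreadContextSketch _ctx;
    };

    void spill_task(const QueryThreadContextSketch& ctx) {
        AttachGuard guard(ctx); // one object now carries id, tracker, and group
        // ... spill work, with memory attributed to the query and its group ...
    }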
@@ -136,12 +136,11 @@ PipelineFragmentContext::PipelineFragmentContext(
          _create_time(MonotonicNanos()) {
    _fragment_watcher.start();
    _start_time = VecDateTimeValue::local_time();
    _query_thread_context = {query_id, _query_ctx->query_mem_tracker};
}

PipelineFragmentContext::~PipelineFragmentContext() {
    // The memory released by the query end is recorded in the query mem tracker.
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker);
    auto st = _query_ctx->exec_status();
    _query_ctx.reset();
    _tasks.clear();

@@ -187,8 +187,6 @@ protected:

    std::shared_ptr<QueryContext> _query_ctx;

    QueryThreadContext _query_thread_context;

    MonotonicStopWatch _fragment_watcher;
    RuntimeProfile::Counter* _start_timer = nullptr;
    RuntimeProfile::Counter* _prepare_timer = nullptr;

@@ -112,7 +112,7 @@ PipelineXFragmentContext::PipelineXFragmentContext(

PipelineXFragmentContext::~PipelineXFragmentContext() {
    // The memory released by the query end is recorded in the query mem tracker.
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker);
    auto st = _query_ctx->exec_status();
    _tasks.clear();
    if (!_task_runtime_states.empty()) {
@@ -370,7 +370,7 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
    } else if (is_wg_mem_low_water_mark) {
        int64_t query_weighted_limit = 0;
        int64_t query_weighted_consumption = 0;
        query_ctx->get_weighted_mem_info(query_weighted_limit, query_weighted_consumption);
        query_ctx->get_weighted_memory(query_weighted_limit, query_weighted_consumption);
        if (query_weighted_consumption < query_weighted_limit) {
            return false;
        }
@@ -947,7 +947,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,

    std::shared_ptr<QueryContext> query_ctx;
    RETURN_IF_ERROR(_get_query_ctx(params, params.query_id, true, query_source, query_ctx));
    SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, params.query_id);
    SCOPED_ATTACH_TASK(query_ctx.get());
    const bool enable_pipeline_x = params.query_options.__isset.enable_pipeline_x_engine &&
                                   params.query_options.enable_pipeline_x_engine;
    if (enable_pipeline_x) {
@@ -1093,7 +1093,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,

    for (size_t i = 0; i < target_size; i++) {
        RETURN_IF_ERROR(_thread_pool->submit_func([&, i]() {
            SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, query_ctx->query_id());
            SCOPED_ATTACH_TASK(query_ctx.get());
            prepare_status[i] = pre_and_submit(i);
            std::unique_lock<std::mutex> lock(m);
            prepare_done++;
@@ -1611,7 +1611,8 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
        runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
        pool = &pip_context->get_query_ctx()->obj_pool;
        query_thread_context = {pip_context->get_query_ctx()->query_id(),
                                pip_context->get_query_ctx()->query_mem_tracker};
                                pip_context->get_query_ctx()->query_mem_tracker,
                                pip_context->get_query_ctx()->workload_group()};
    } else {
        auto iter = _fragment_instance_map.find(tfragment_instance_id);
        if (iter == _fragment_instance_map.end()) {
@@ -1717,7 +1718,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
        // when filter_controller->merge is still in progress
        query_ctx = iter->second;
    }
    SCOPED_ATTACH_TASK_WITH_ID(query_ctx->query_mem_tracker, query_ctx->query_id());
    SCOPED_ATTACH_TASK(query_ctx.get());
    auto merge_status = filter_controller->merge(request, attach_data, opt_remote_rf);
    return merge_status;
}
@@ -27,6 +27,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/tablets_channel.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"

namespace doris {

@@ -43,7 +44,8 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_hig
    std::shared_ptr<QueryContext> query_context =
            ExecEnv::GetInstance()->fragment_mgr()->get_query_context(_load_id.to_thrift());
    if (query_context != nullptr) {
        _query_thread_context = {_load_id.to_thrift(), query_context->query_mem_tracker};
        _query_thread_context = {_load_id.to_thrift(), query_context->query_mem_tracker,
                                 query_context->workload_group()};
    } else {
        _query_thread_context = {
                _load_id.to_thrift(),
@@ -40,6 +40,7 @@
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/thrift_util.h"
@@ -352,7 +353,8 @@ LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool e
    std::shared_ptr<QueryContext> query_context =
            ExecEnv::GetInstance()->fragment_mgr()->get_query_context(load_tid);
    if (query_context != nullptr) {
        _query_thread_context = {load_tid, query_context->query_mem_tracker};
        _query_thread_context = {load_tid, query_context->query_mem_tracker,
                                 query_context->workload_group()};
    } else {
        _query_thread_context = {load_tid, MemTrackerLimiter::create_shared(
                                                   MemTrackerLimiter::Type::LOAD,
@@ -215,7 +215,7 @@ public:
    }

    // Iterator into mem_tracker_limiter_pool for this object. Stored to have O(1) remove.
    std::list<std::weak_ptr<MemTrackerLimiter>>::iterator tg_tracker_limiter_group_it;
    std::list<std::weak_ptr<MemTrackerLimiter>>::iterator wg_tracker_limiter_group_it;

private:
    friend class ThreadMemTrackerMgr;
@@ -33,6 +33,7 @@
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
#include "util/stack_util.h"
#include "util/uid_util.h"

@@ -71,6 +72,10 @@ public:

    TUniqueId query_id() { return _query_id; }

    void set_wg_wptr(const std::weak_ptr<WorkloadGroup>& wg_wptr) { _wg_wptr = wg_wptr; }

    void reset_wg_wptr() { _wg_wptr.reset(); }

    void start_count_scope_mem() {
        CHECK(init());
        _scope_mem = _reserved_mem; // consume in advance
@@ -151,6 +156,7 @@ private:
    std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
    MemTrackerLimiter* _limiter_tracker_raw = nullptr;
    std::vector<MemTracker*> _consumer_tracker_stack;
    std::weak_ptr<WorkloadGroup> _wg_wptr;

    // If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
    bool _stop_consume = false;
@@ -287,8 +293,16 @@ inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
    if (!_limiter_tracker_raw->try_consume(size)) {
        return false;
    }
    auto wg_ptr = _wg_wptr.lock();
    if (wg_ptr) {
        if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
            _limiter_tracker_raw->release(size); // rollback
            return false;
        }
    }
    if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
        _limiter_tracker_raw->release(size); // rollback
        _limiter_tracker_raw->release(size);                 // rollback
        wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
        return false;
    }
    if (_count_scope_mem) {
@@ -306,6 +320,10 @@ inline void ThreadMemTrackerMgr::release_reserved() {
        doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
                                                                       _untracked_mem);
        _limiter_tracker_raw->release(_reserved_mem);
        auto wg_ptr = _wg_wptr.lock();
        if (wg_ptr) {
            wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
        }
        if (_count_scope_mem) {
            _scope_mem -= _reserved_mem;
        }
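The reservation path above now has three stages, query tracker, workload group growth, and process-wide reserve, and each failure must unwind the stages already taken, in reverse. A self-contained sketch of that rollback discipline (stub Tracker/Group types and a function-pointer stand-in for the process reserve; only the ordering mirrors the diff):

    #include <atomic>
    #include <cstdint>
    #include <memory>

    struct Tracker {
        std::atomic<int64_t> used{0};
        int64_t limit = 1 << 20;
        bool try_consume(int64_t n) {
            if (used.fetch_add(n) + n > limit) {
                used.fetch_sub(n);
                return false;
            }
            return true;
        }
        void release(int64_t n) { used.fetch_sub(n); }
    };

    struct Group {
        std::atomic<int64_t> growth{0};
        int64_t high_watermark = 1 << 19;
        bool add_growth(int64_t n) {
            if (growth.load() + n > high_watermark) return false; // admission check
            growth.fetch_add(n);
            return true;
        }
        void sub_growth(int64_t n) { growth.fetch_sub(n); }
    };

    // Stage order mirrors try_reserve: tracker -> group -> process.
    bool try_reserve(Tracker& tracker, const std::weak_ptr<Group>& wg,
                     bool (*reserve_process)(int64_t), int64_t size) {
        if (!tracker.try_consume(size)) {
            return false;
        }
        auto group = wg.lock(); // threads without a workload group skip stage 2
        if (group && !group->add_growth(size)) {
            tracker.release(size); // rollback stage 1
            return false;
        }
        if (!reserve_process(size)) {
            tracker.release(size);              // rollback stage 1
            if (group) group->sub_growth(size); // rollback stage 2
            return false;
        }
        return true;
    }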
@@ -269,15 +269,16 @@ public:
        return _running_big_mem_op_num.load(std::memory_order_relaxed);
    }

    void set_weighted_mem(int64_t weighted_limit, int64_t weighted_consumption) {
    void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) {
        std::lock_guard<std::mutex> l(_weighted_mem_lock);
        _weighted_consumption = weighted_consumption;
        _weighted_limit = weighted_limit;
        _weighted_ratio = weighted_ratio;
    }
    void get_weighted_mem_info(int64_t& weighted_limit, int64_t& weighted_consumption) {

    void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) {
        std::lock_guard<std::mutex> l(_weighted_mem_lock);
        weighted_limit = _weighted_limit;
        weighted_consumption = _weighted_consumption;
        weighted_consumption = int64_t(query_mem_tracker->consumption() * _weighted_ratio);
    }

    DescriptorTbl* desc_tbl = nullptr;
@@ -360,7 +361,7 @@ private:
    std::mutex _pipeline_map_write_lock;

    std::mutex _weighted_mem_lock;
    int64_t _weighted_consumption = 0;
    double _weighted_ratio = 0;
    int64_t _weighted_limit = 0;
    timespec _query_arrival_timestamp;
    // Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to
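Worth noting in the hunk above: QueryContext stops caching a weighted consumption value, which went stale between refresh ticks, and stores only the ratio, deriving the weighted consumption from the live tracker on every read. A minimal sketch of that derive-on-read shape, with a plain field standing in for query_mem_tracker->consumption():

    #include <cstdint>
    #include <mutex>

    struct QueryMemState {
        int64_t tracker_consumption = 0; // stands in for the live mem tracker value

        void set_weighted_memory(int64_t weighted_limit, double weighted_ratio) {
            std::lock_guard<std::mutex> l(_lock);
            _weighted_limit = weighted_limit;
            _weighted_ratio = weighted_ratio;
        }

        void get_weighted_memory(int64_t& weighted_limit, int64_t& weighted_consumption) {
            std::lock_guard<std::mutex> l(_lock);
            weighted_limit = _weighted_limit;
            // derived on read, so it tracks live consumption between refreshes
            weighted_consumption =
                    static_cast<int64_t>(tracker_consumption * _weighted_ratio);
        }

    private:
        std::mutex _lock;
        double _weighted_ratio = 0;
        int64_t _weighted_limit = 0;
    };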
@@ -18,7 +18,9 @@
#include "runtime/thread_context.h"

#include "common/signal_handler.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "runtime/workload_group/workload_group_manager.h"

namespace doris {
class MemTracker;
@@ -26,34 +28,38 @@ class MemTracker;
QueryThreadContext ThreadContext::query_thread_context() {
    DCHECK(doris::pthread_context_ptr_init);
    ORPHAN_TRACKER_CHECK();
    return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker()};
    return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker(), _wg_wptr};
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                       const TUniqueId& task_id) {
    ThreadLocalHandle::create_thread_local_if_not_exits();
    signal::set_signal_task_id(task_id);
    thread_context()->attach_task(task_id, mem_tracker);
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
    ThreadLocalHandle::create_thread_local_if_not_exits();
    signal::set_signal_task_id(TUniqueId());
    thread_context()->attach_task(TUniqueId(), mem_tracker);
}

AttachTask::AttachTask(RuntimeState* runtime_state) {
    ThreadLocalHandle::create_thread_local_if_not_exits();
    signal::set_signal_task_id(runtime_state->query_id());
    signal::set_signal_is_nereids(runtime_state->is_nereids());
    thread_context()->attach_task(runtime_state->query_id(), runtime_state->query_mem_tracker());
}

AttachTask::AttachTask(const QueryThreadContext& query_thread_context) {
void AttachTask::init(const QueryThreadContext& query_thread_context) {
    ThreadLocalHandle::create_thread_local_if_not_exits();
    signal::set_signal_task_id(query_thread_context.query_id);
    thread_context()->attach_task(query_thread_context.query_id,
                                  query_thread_context.query_mem_tracker);
                                  query_thread_context.query_mem_tracker,
                                  query_thread_context.wg_wptr);
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
    QueryThreadContext query_thread_context = {TUniqueId(), mem_tracker};
    init(query_thread_context);
}

AttachTask::AttachTask(RuntimeState* runtime_state) {
    signal::set_signal_is_nereids(runtime_state->is_nereids());
    QueryThreadContext query_thread_context = {runtime_state->query_id(),
                                               runtime_state->query_mem_tracker(),
                                               runtime_state->get_query_ctx()->workload_group()};
    init(query_thread_context);
}

AttachTask::AttachTask(const QueryThreadContext& query_thread_context) {
    init(query_thread_context);
}

AttachTask::AttachTask(QueryContext* query_ctx) {
    QueryThreadContext query_thread_context = {query_ctx->query_id(), query_ctx->query_mem_tracker,
                                               query_ctx->workload_group()};
    init(query_thread_context);
}

AttachTask::~AttachTask() {
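The net effect of the hunk above is constructor funneling: every AttachTask overload now builds a QueryThreadContext and delegates to a single init(), so the thread-local bookkeeping (signal task id, tracker attach, workload group attach) lives in one place. A compact sketch of that shape with stub types, not the Doris classes:

    #include <memory>
    #include <utility>

    struct TrackerStub {};
    struct ContextStub {
        int query_id = 0; // stands in for TUniqueId
        std::shared_ptr<TrackerStub> tracker;
    };

    class AttachTaskSketch {
    public:
        explicit AttachTaskSketch(const std::shared_ptr<TrackerStub>& tracker) {
            init(ContextStub{0, tracker}); // empty query id for non-query work
        }
        AttachTaskSketch(int query_id, const std::shared_ptr<TrackerStub>& tracker) {
            init(ContextStub{query_id, tracker});
        }
        explicit AttachTaskSketch(const ContextStub& ctx) { init(ctx); }

    private:
        void init(ContextStub ctx) {
            // the single place that touches thread-local state: signal id,
            // tracker attach, and workload-group attach would all happen here
            _ctx = std::move(ctx);
        }
        ContextStub _ctx;
    };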
@@ -45,8 +45,6 @@
// This will save some info about a working thread in the thread context.
// Looking forward to tracking memory during thread execution into MemTrackerLimiter.
#define SCOPED_ATTACH_TASK(arg1) auto VARNAME_LINENUM(attach_task) = AttachTask(arg1)
#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \
    auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2)

// Switch MemTrackerLimiter for count memory during thread execution.
// Used after SCOPED_ATTACH_TASK, in order to count the memory into another
@@ -86,8 +84,6 @@
// thread context need to be initialized, required by Allocator and elsewhere.
#define SCOPED_ATTACH_TASK(arg1, ...) \
    auto VARNAME_LINENUM(scoped_tls_at) = doris::ScopedInitThreadContext()
#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \
    auto VARNAME_LINENUM(scoped_tls_atwi) = doris::ScopedInitThreadContext()
#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(arg1) \
    auto VARNAME_LINENUM(scoped_tls_stmtl) = doris::ScopedInitThreadContext()
#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
@@ -121,6 +117,7 @@ class ThreadContext;
class MemTracker;
class RuntimeState;
class QueryThreadContext;
class WorkloadGroup;

extern bthread_key_t btls_key;
@@ -155,7 +152,8 @@ public:
    ~ThreadContext() = default;

    void attach_task(const TUniqueId& task_id,
                     const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
                     const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                     const std::weak_ptr<WorkloadGroup>& wg_wptr) {
        // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task.
        DCHECK(mem_tracker);
        // Orphan is thread default tracker.
@@ -163,16 +161,20 @@ public:
                << ", thread mem tracker label: " << thread_mem_tracker()->label()
                << ", attach mem tracker label: " << mem_tracker->label();
        _task_id = task_id;
        _wg_wptr = wg_wptr;
        thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
        thread_mem_tracker_mgr->set_query_id(_task_id);
        thread_mem_tracker_mgr->set_wg_wptr(_wg_wptr);
        thread_mem_tracker_mgr->enable_wait_gc();
        thread_mem_tracker_mgr->reset_query_cancelled_flag(false);
    }

    void detach_task() {
        _task_id = TUniqueId();
        _wg_wptr.reset();
        thread_mem_tracker_mgr->detach_limiter_tracker();
        thread_mem_tracker_mgr->set_query_id(TUniqueId());
        thread_mem_tracker_mgr->reset_wg_wptr();
        thread_mem_tracker_mgr->disable_wait_gc();
    }

@@ -223,12 +225,15 @@ public:
        thread_mem_tracker_mgr->release_reserved();
    }

    std::weak_ptr<WorkloadGroup> workload_group() { return _wg_wptr; }

    int thread_local_handle_count = 0;
    int skip_memory_check = 0;
    int skip_large_memory_check = 0;

private:
    TUniqueId _task_id;
    std::weak_ptr<WorkloadGroup> _wg_wptr;
};

class ThreadLocalHandle {
@@ -309,6 +314,11 @@ static ThreadContext* thread_context() {
class QueryThreadContext {
public:
    QueryThreadContext() = default;
    QueryThreadContext(const TUniqueId& query_id,
                       const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                       const std::weak_ptr<WorkloadGroup>& wg_wptr)
            : query_id(query_id), query_mem_tracker(mem_tracker), wg_wptr(wg_wptr) {}
    // If use WorkloadGroup and can get WorkloadGroup ptr, must as a parameter.
    QueryThreadContext(const TUniqueId& query_id,
                       const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
            : query_id(query_id), query_mem_tracker(mem_tracker) {}
@@ -318,6 +328,7 @@ public:
        ORPHAN_TRACKER_CHECK();
        query_id = doris::thread_context()->task_id();
        query_mem_tracker = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
        wg_wptr = doris::thread_context()->workload_group();
#else
        query_id = TUniqueId();
        query_mem_tracker = doris::ExecEnv::GetInstance()->orphan_mem_tracker();
@@ -326,6 +337,7 @@ public:

    TUniqueId query_id;
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
    std::weak_ptr<WorkloadGroup> wg_wptr;
};

class ScopeMemCountByHook {
@@ -357,15 +369,18 @@ public:

class AttachTask {
public:
    explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                        const TUniqueId& task_id);

    // not query or load, initialize with memory tracker, empty query id and default normal workload group.
    explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker);

    // is query or load, initialize with memory tracker, query id and workload group wptr.
    explicit AttachTask(RuntimeState* runtime_state);

    explicit AttachTask(QueryContext* query_ctx);

    explicit AttachTask(const QueryThreadContext& query_thread_context);

    void init(const QueryThreadContext& query_thread_context);

    ~AttachTask();
};

@@ -380,7 +395,8 @@ public:

    explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) {
        ThreadLocalHandle::create_thread_local_if_not_exits();
        DCHECK(thread_context()->task_id() == query_thread_context.query_id);
        DCHECK(thread_context()->task_id() ==
               query_thread_context.query_id); // workload group also not change
        DCHECK(query_thread_context.query_mem_tracker);
        _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
        thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
@@ -107,39 +107,59 @@ void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
    }
}

int64_t WorkloadGroup::memory_used() {
int64_t WorkloadGroup::make_memory_tracker_snapshots(
        std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots) {
    int64_t used_memory = 0;
    for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
        std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
        for (const auto& trackerWptr : mem_tracker_group.trackers) {
            auto tracker = trackerWptr.lock();
            CHECK(tracker != nullptr);
            if (tracker_snapshots != nullptr) {
                tracker_snapshots->insert(tracker_snapshots->end(), tracker);
            }
            used_memory += tracker->consumption();
        }
    }
    refresh_memory(used_memory);
    return used_memory;
}

void WorkloadGroup::set_weighted_memory_used(int64_t wg_total_mem_used, double ratio) {
    _weighted_mem_used.store(int64_t(wg_total_mem_used * ratio), std::memory_order_relaxed);
int64_t WorkloadGroup::memory_used() {
    return make_memory_tracker_snapshots(nullptr);
}

void WorkloadGroup::refresh_memory(int64_t used_memory) {
    // refresh total memory used.
    _total_mem_used = used_memory;
    // reserve memory is recorded in the query mem tracker
    // and _total_mem_used already contains all the current reserve memory.
    // so after refreshing _total_mem_used, reset _wg_refresh_interval_memory_growth.
    _wg_refresh_interval_memory_growth.store(0.0);
}

void WorkloadGroup::set_weighted_memory_ratio(double ratio) {
    _weighted_mem_ratio = ratio;
}

void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
    std::unique_lock<std::shared_mutex> wlock(_mutex);
    auto group_num = mem_tracker_ptr->group_num();
    std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
    mem_tracker_ptr->tg_tracker_limiter_group_it =
    mem_tracker_ptr->wg_tracker_limiter_group_it =
            _mem_tracker_limiter_pool[group_num].trackers.insert(
                    _mem_tracker_limiter_pool[group_num].trackers.end(), mem_tracker_ptr);
}

void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
    std::unique_lock<std::shared_mutex> wlock(_mutex);
    auto group_num = mem_tracker_ptr->group_num();
    std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
    if (mem_tracker_ptr->tg_tracker_limiter_group_it !=
    if (mem_tracker_ptr->wg_tracker_limiter_group_it !=
        _mem_tracker_limiter_pool[group_num].trackers.end()) {
        _mem_tracker_limiter_pool[group_num].trackers.erase(
                mem_tracker_ptr->tg_tracker_limiter_group_it);
        mem_tracker_ptr->tg_tracker_limiter_group_it =
                mem_tracker_ptr->wg_tracker_limiter_group_it);
        mem_tracker_ptr->wg_tracker_limiter_group_it =
                _mem_tracker_limiter_pool[group_num].trackers.end();
    }
}
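make_memory_tracker_snapshots above does double duty: one pass over the tracker pool both pins shared_ptr snapshots for the caller and totals consumption, which then feeds refresh_memory(). A stub-typed sketch of that single-pass shape; note it locks-and-skips dead trackers where the real code CHECKs them non-null:

    #include <cstdint>
    #include <list>
    #include <memory>
    #include <vector>

    struct TrackerStub {
        int64_t consumption = 0;
    };

    int64_t snapshot_and_sum(const std::vector<std::weak_ptr<TrackerStub>>& pool,
                             std::list<std::shared_ptr<TrackerStub>>* snapshots) {
        int64_t used = 0;
        for (const auto& weak : pool) {
            if (auto tracker = weak.lock()) { // skip trackers already destroyed
                if (snapshots != nullptr) {
                    snapshots->push_back(tracker); // pin for the caller's later use
                }
                used += tracker->consumption;
            }
        }
        return used; // the caller also treats this as the group's refreshed total
    }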
@@ -77,7 +77,12 @@ public:
        return _memory_limit;
    };

    // make memory snapshots and refresh total memory used at the same time.
    int64_t make_memory_tracker_snapshots(
            std::list<std::shared_ptr<MemTrackerLimiter>>* tracker_snapshots);
    // call make_memory_tracker_snapshots, so also refresh total memory used.
    int64_t memory_used();
    void refresh_memory(int64_t used_memory);

    int spill_threshold_low_water_mark() const {
        return _spill_low_watermark.load(std::memory_order_relaxed);
@@ -86,10 +91,31 @@ public:
        return _spill_high_watermark.load(std::memory_order_relaxed);
    }

    void set_weighted_memory_used(int64_t wg_total_mem_used, double ratio);
    void set_weighted_memory_ratio(double ratio);
    bool add_wg_refresh_interval_memory_growth(int64_t size) {
        // `weighted_mem_used` is a rough memory usage in this group,
        // because we can only get a precise memory usage by MemTracker which is not include page cache.
        auto weighted_mem_used =
                int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load() + size) *
                        _weighted_mem_ratio);
        if ((weighted_mem_used > ((double)_memory_limit *
                                  _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
            return false;
        } else {
            _wg_refresh_interval_memory_growth.fetch_add(size);
            return true;
        }
    }
    void sub_wg_refresh_interval_memory_growth(int64_t size) {
        _wg_refresh_interval_memory_growth.fetch_sub(size);
    }

    void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
        auto weighted_mem_used = _weighted_mem_used.load(std::memory_order_relaxed);
        // `weighted_mem_used` is a rough memory usage in this group,
        // because we can only get a precise memory usage by MemTracker which is not include page cache.
        auto weighted_mem_used =
                int64_t((_total_mem_used + _wg_refresh_interval_memory_growth.load()) *
                        _weighted_mem_ratio);
        *is_low_wartermark =
                (weighted_mem_used > ((double)_memory_limit *
                                      _spill_low_watermark.load(std::memory_order_relaxed) / 100));
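The admission test in add_wg_refresh_interval_memory_growth above weights the candidate total by the group's ratio and compares it against the spill high watermark. A plain-value sketch plus a worked example; atomics are dropped for clarity and the function name is illustrative:

    #include <cstdint>

    bool admit_reservation(int64_t total_mem_used, int64_t interval_growth,
                           int64_t size, double weighted_ratio,
                           int64_t memory_limit, int spill_high_watermark_pct) {
        double weighted = (total_mem_used + interval_growth + size) * weighted_ratio;
        double threshold =
                static_cast<double>(memory_limit) * spill_high_watermark_pct / 100;
        // reject when the weighted usage would cross the high watermark
        return weighted <= threshold;
    }

    // Example: 10 GiB limit, 80% watermark (8 GiB threshold), ratio 1.2,
    // 5 GiB of used memory plus growth: a 1 GiB reservation weighs
    // (5 + 1) * 1.2 = 7.2 GiB < 8 GiB, so it is admitted.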
@@ -138,7 +164,7 @@ public:

    bool can_be_dropped() {
        std::shared_lock<std::shared_mutex> r_lock(_mutex);
        return _is_shutdown && _query_ctxs.size() == 0;
        return _is_shutdown && _query_ctxs.empty();
    }

    int query_num() {
@@ -169,8 +195,12 @@ private:
    const uint64_t _id;
    std::string _name;
    int64_t _version;
    int64_t _memory_limit; // bytes
    std::atomic_int64_t _weighted_mem_used = 0; // bytes
    int64_t _memory_limit; // bytes
    // last value of make_memory_tracker_snapshots, refresh every time make_memory_tracker_snapshots is called.
    std::atomic_int64_t _total_mem_used = 0; // bytes
    // last value of refresh_wg_weighted_memory_ratio.
    std::atomic<double> _weighted_mem_ratio = 0.0;
    std::atomic_int64_t _wg_refresh_interval_memory_growth;
    bool _enable_memory_overcommit;
    std::atomic<uint64_t> _cpu_share;
    std::vector<TrackerLimiterGroup> _mem_tracker_limiter_pool;
@@ -148,50 +148,34 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i

struct WorkloadGroupMemInfo {
    int64_t total_mem_used = 0;
    int64_t weighted_mem_used = 0;
    bool is_low_wartermark = false;
    bool is_high_wartermark = false;
    double mem_used_ratio = 0;
    std::list<std::shared_ptr<MemTrackerLimiter>> tracker_snapshots =
            std::list<std::shared_ptr<MemTrackerLimiter>>();
};
void WorkloadGroupMgr::refresh_wg_memory_info() {

void WorkloadGroupMgr::refresh_wg_weighted_memory_ratio() {
    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
    // workload group id -> workload group queries
    std::unordered_map<uint64_t, std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>>>
            all_wg_queries;
    for (auto& [wg_id, wg] : _workload_groups) {
        all_wg_queries.insert({wg_id, wg->queries()});
    }

    // 1. make all workload groups memory snapshots(refresh workload groups total memory used at the same time)
    // and calculate total memory used of all queries.
    int64_t all_queries_mem_used = 0;

    // calculate total memory used of each workload group and total memory used of all queries
    std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
    for (auto& [wg_id, wg_queries] : all_wg_queries) {
        int64_t wg_total_mem_used = 0;
        for (const auto& [query_id, query_ctx_ptr] : wg_queries) {
            if (auto query_ctx = query_ctx_ptr.lock()) {
                wg_total_mem_used += query_ctx->query_mem_tracker->consumption();
            }
        }
        all_queries_mem_used += wg_total_mem_used;
        wgs_mem_info[wg_id] = {wg_total_mem_used};
    for (auto& [wg_id, wg] : _workload_groups) {
        wgs_mem_info[wg_id].total_mem_used =
                wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
        all_queries_mem_used += wgs_mem_info[wg_id].total_mem_used;
    }

    // *TODO*, modify to use doris::GlobalMemoryArbitrator::process_memory_usage().
    auto proc_vm_rss = PerfCounters::get_vm_rss();
    if (all_queries_mem_used <= 0) {
        return;
    }

    if (proc_vm_rss < all_queries_mem_used) {
        all_queries_mem_used = proc_vm_rss;
    }

    // 2. calculate weighted ratio.
    // process memory used is actually bigger than all_queries_mem_used,
    // because memory of page cache, allocator cache, segment cache etc. are included
    // in proc_vm_rss.
    // we count these cache memories equally on workload groups.
    double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
    auto process_memory_usage = GlobalMemoryArbitrator::process_memory_usage();
    all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used);
    double ratio = (double)process_memory_usage / (double)all_queries_mem_used;
    if (ratio <= 1.25) {
        std::string debug_msg =
                fmt::format("\nProcess Memory Summary: {}, {}, all quries mem: {}",
@@ -202,66 +186,57 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
    }

    for (auto& wg : _workload_groups) {
        // 3.1 calculate query weighted memory limit of task group
        auto wg_mem_limit = wg.second->memory_limit();
        auto& wg_mem_info = wgs_mem_info[wg.first];
        wg_mem_info.weighted_mem_used = int64_t(wg_mem_info.total_mem_used * ratio);
        wg_mem_info.mem_used_ratio = (double)wg_mem_info.weighted_mem_used / wg_mem_limit;

        wg.second->set_weighted_memory_used(wg_mem_info.total_mem_used, ratio);

        auto spill_low_water_mark = wg.second->spill_threshold_low_water_mark();
        auto spill_high_water_mark = wg.second->spill_threashold_high_water_mark();
        wg_mem_info.is_high_wartermark = (wg_mem_info.weighted_mem_used >
                                          ((double)wg_mem_limit * spill_high_water_mark / 100));
        wg_mem_info.is_low_wartermark = (wg_mem_info.weighted_mem_used >
                                         ((double)wg_mem_limit * spill_low_water_mark / 100));

        // calculate query weighted memory limit of task group
        const auto& wg_queries = all_wg_queries[wg.first];
        auto wg_query_count = wg_queries.size();
        auto wg_query_count = wgs_mem_info[wg.first].tracker_snapshots.size();
        int64_t query_weighted_mem_limit =
                wg_query_count ? (wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit;

        // 3.2 set all workload groups weighted memory ratio and all query weighted memory limit and ratio.
        wg.second->set_weighted_memory_ratio(ratio);
        for (const auto& query : wg.second->queries()) {
            auto query_ctx = query.second.lock();
            if (!query_ctx) {
                continue;
            }
            query_ctx->set_weighted_memory(query_weighted_mem_limit, ratio);
        }

        // 3.3 only print debug logs, if workload groups is_high_wartermark or is_low_wartermark.
        auto weighted_mem_used = int64_t(wgs_mem_info[wg.first].total_mem_used * ratio);
        bool is_high_wartermark =
                (weighted_mem_used >
                 ((double)wg_mem_limit * wg.second->spill_threashold_high_water_mark() / 100));
        bool is_low_wartermark =
                (weighted_mem_used >
                 ((double)wg_mem_limit * wg.second->spill_threshold_low_water_mark() / 100));
        std::string debug_msg;
        if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
        if (is_high_wartermark || is_low_wartermark) {
            debug_msg = fmt::format(
                    "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used "
                    "ratio: {}, query "
                    "count: {}, query_weighted_mem_limit: {}",
                    wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES),
                    PrettyPrinter::print(wg_mem_info.total_mem_used, TUnit::BYTES),
                    PrettyPrinter::print(wg_mem_info.weighted_mem_used, TUnit::BYTES),
                    wg_mem_info.mem_used_ratio, wg_query_count,
                    PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
                    PrettyPrinter::print(weighted_mem_used, TUnit::BYTES),
                    (double)weighted_mem_used / wg_mem_limit, wg_query_count,
                    PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES));

            debug_msg += "\n Query Memory Summary:";
        } else {
            continue;
        }
        // check whether queries need to revoke memory for task group
        for (const auto& query : wg_queries) {
            auto query_ctx = query.second.lock();
            if (!query_ctx) {
                continue;
            }
            auto query_consumption = query_ctx->query_mem_tracker->consumption();
            auto query_weighted_consumption = int64_t(query_consumption * ratio);
            query_ctx->set_weighted_mem(query_weighted_mem_limit, query_weighted_consumption);

        if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
            // check whether queries need to revoke memory for task group
            for (const auto& query_mem_tracker : wgs_mem_info[wg.first].tracker_snapshots) {
                debug_msg += fmt::format(
                        "\n MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, "
                        "Peak={}",
                        query_ctx->query_mem_tracker->label(),
                        query_ctx->query_mem_tracker->parent_label(),
                        PrettyPrinter::print(query_consumption, TUnit::BYTES),
                        PrettyPrinter::print(query_weighted_consumption, TUnit::BYTES),
                        PrettyPrinter::print(query_ctx->query_mem_tracker->peak_consumption(),
                                             TUnit::BYTES));
                        query_mem_tracker->label(), query_mem_tracker->parent_label(),
                        PrettyPrinter::print(query_mem_tracker->consumption(), TUnit::BYTES),
                        PrettyPrinter::print(int64_t(query_mem_tracker->consumption() * ratio),
                                             TUnit::BYTES),
                        PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
            }
        }
        if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
            LOG_EVERY_T(INFO, 10) << debug_msg;
            LOG_EVERY_T(INFO, 1) << debug_msg;
        } else {
            continue;
        }
    }
}
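The ratio arithmetic in refresh_wg_weighted_memory_ratio above: process memory exceeds the sum of query trackers because of page cache, allocator caches, and similar, so the overshoot is spread evenly across queries as a multiplier, and the sum is clamped so the ratio never drops below 1. A self-contained sketch of the calculation (hypothetical compute_refresh helper, not a Doris API):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    struct RefreshResult {
        double ratio;                     // multiplier applied to every tracker value
        int64_t query_weighted_mem_limit; // per-query share of the group limit
    };

    RefreshResult compute_refresh(int64_t process_memory_usage,
                                  const std::vector<int64_t>& query_consumptions,
                                  int64_t wg_mem_limit) {
        int64_t all_queries_mem_used = 0;
        for (int64_t c : query_consumptions) {
            all_queries_mem_used += c;
        }
        if (all_queries_mem_used <= 0) {
            return {1.0, wg_mem_limit}; // mirrors the early return in the diff
        }
        // Clamp so the ratio is at least 1 even if the trackers momentarily
        // exceed the process-level measurement.
        all_queries_mem_used = std::min(process_memory_usage, all_queries_mem_used);
        double ratio = (double)process_memory_usage / (double)all_queries_mem_used;
        auto n = (int64_t)query_consumptions.size();
        // Adding n before dividing rounds the per-query share up, as in the diff.
        int64_t per_query = n ? (wg_mem_limit + n) / n : wg_mem_limit;
        return {ratio, per_query};
    }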
@@ -54,7 +54,7 @@ public:

    bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }

    void refresh_wg_memory_info();
    void refresh_wg_weighted_memory_ratio();

private:
    std::shared_mutex _group_mutex;
@@ -112,7 +112,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
        (_local_state && _local_state->should_run_serial())) {
        _max_thread_num = 1;
    }
    _query_thread_context = {_query_id, _state->query_mem_tracker()};
    _query_thread_context = {_query_id, _state->query_mem_tracker(),
                             _state->get_query_ctx()->workload_group()};
}

ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
@@ -338,10 +338,8 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta
                                   int num_senders, bool is_merging, RuntimeProfile* profile)
        : HasTaskExecutionCtx(state),
          _mgr(stream_mgr),
#ifdef USE_MEM_TRACKER
          _query_mem_tracker(state->query_mem_tracker()),
          _query_id(state->query_id()),
#endif
          _query_thread_context(state->query_id(), state->query_mem_tracker(),
                                state->get_query_ctx()->workload_group()),
          _fragment_instance_id(fragment_instance_id),
          _dest_node_id(dest_node_id),
          _row_desc(row_desc),
@@ -424,7 +422,7 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,

Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
                                   int64_t packet_seq, ::google::protobuf::Closure** done) {
    SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id);
    SCOPED_ATTACH_TASK(_query_thread_context);
    int use_sender_id = _is_merging ? sender_id : 0;
    return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
}

@@ -43,6 +43,7 @@
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/task_execution_context.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
@@ -128,10 +129,7 @@ private:
    // DataStreamMgr instance used to create this recvr. (Not owned)
    VDataStreamMgr* _mgr = nullptr;

#ifdef USE_MEM_TRACKER
    std::shared_ptr<MemTrackerLimiter> _query_mem_tracker = nullptr;
    TUniqueId _query_id;
#endif
    QueryThreadContext _query_thread_context;

    // Fragment and node id of the destination exchange node this receiver is used by.
    TUniqueId _fragment_instance_id;
@@ -26,7 +26,18 @@

namespace doris {

TEST(ThreadMemTrackerMgrTest, ConsumeMemory) {
class ThreadMemTrackerMgrTest : public testing::Test {
public:
    ThreadMemTrackerMgrTest() = default;
    ~ThreadMemTrackerMgrTest() override = default;

    void SetUp() override {}

protected:
    std::shared_ptr<WorkloadGroup> workload_group;
};

TEST_F(ThreadMemTrackerMgrTest, ConsumeMemory) {
    std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
    std::shared_ptr<MemTrackerLimiter> t =
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ConsumeMemory");
@@ -34,7 +45,7 @@ TEST(ThreadMemTrackerMgrTest, ConsumeMemory) {
    int64_t size1 = 4 * 1024;
    int64_t size2 = 4 * 1024 * 1024;

    thread_context->attach_task(TUniqueId(), t);
    thread_context->attach_task(TUniqueId(), t, workload_group);
    thread_context->consume_memory(size1);
    // size1 < config::mem_tracker_consume_min_size_bytes, not consume mem tracker.
    EXPECT_EQ(t->consumption(), 0);
@@ -80,7 +91,7 @@ TEST(ThreadMemTrackerMgrTest, Boundary) {
    // TODO, Boundary check may not be necessary, add some `IF` maybe increase cost time.
}

TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
    std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
    std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker1");
@@ -92,7 +103,7 @@ TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
    int64_t size1 = 4 * 1024;
    int64_t size2 = 4 * 1024 * 1024;

    thread_context->attach_task(TUniqueId(), t1);
    thread_context->attach_task(TUniqueId(), t1, workload_group);
    thread_context->consume_memory(size1);
    thread_context->consume_memory(size2);
    EXPECT_EQ(t1->consumption(), size1 + size2);
@@ -152,7 +163,7 @@ TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
    EXPECT_EQ(t1->consumption(), 0);
}

TEST(ThreadMemTrackerMgrTest, MultiMemTracker) {
TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) {
    std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
    std::shared_ptr<MemTrackerLimiter> t1 =
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-MultiMemTracker1");
@@ -162,7 +173,7 @@ TEST(ThreadMemTrackerMgrTest, MultiMemTracker) {
    int64_t size1 = 4 * 1024;
    int64_t size2 = 4 * 1024 * 1024;

    thread_context->attach_task(TUniqueId(), t1);
    thread_context->attach_task(TUniqueId(), t1, workload_group);
    thread_context->consume_memory(size1);
    thread_context->consume_memory(size2);
    thread_context->consume_memory(size1);
@@ -213,7 +224,7 @@ TEST(ThreadMemTrackerMgrTest, MultiMemTracker) {
    EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
}

TEST(ThreadMemTrackerMgrTest, ScopedCount) {
TEST_F(ThreadMemTrackerMgrTest, ScopedCount) {
    std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
    std::shared_ptr<MemTrackerLimiter> t1 =
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ScopedCount");
@@ -221,7 +232,7 @@ TEST(ThreadMemTrackerMgrTest, ScopedCount) {
    int64_t size1 = 4 * 1024;
    int64_t size2 = 4 * 1024 * 1024;

    thread_context->attach_task(TUniqueId(), t1);
    thread_context->attach_task(TUniqueId(), t1, workload_group);
    thread_context->thread_mem_tracker_mgr->start_count_scope_mem();
    thread_context->consume_memory(size1);
    thread_context->consume_memory(size2);
@@ -239,7 +250,7 @@ TEST(ThreadMemTrackerMgrTest, ScopedCount) {
    EXPECT_EQ(scope_mem, size1 + size2 + size1 + size2 + size1);
}

TEST(ThreadMemTrackerMgrTest, ReserveMemory) {
TEST_F(ThreadMemTrackerMgrTest, ReserveMemory) {
    std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
    std::shared_ptr<MemTrackerLimiter> t =
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-ReserveMemory");
@@ -248,7 +259,7 @@ TEST(ThreadMemTrackerMgrTest, ReserveMemory) {
    int64_t size2 = 4 * 1024 * 1024;
    int64_t size3 = size2 * 1024;

    thread_context->attach_task(TUniqueId(), t);
    thread_context->attach_task(TUniqueId(), t, workload_group);
    thread_context->consume_memory(size1);
    thread_context->consume_memory(size2);
    EXPECT_EQ(t->consumption(), size1 + size2);
@@ -338,7 +349,7 @@ TEST(ThreadMemTrackerMgrTest, ReserveMemory) {
    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
}

TEST(ThreadMemTrackerMgrTest, NestedReserveMemory) {
TEST_F(ThreadMemTrackerMgrTest, NestedReserveMemory) {
    std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
    std::shared_ptr<MemTrackerLimiter> t = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory");
@@ -346,7 +357,7 @@ TEST(ThreadMemTrackerMgrTest, NestedReserveMemory) {
    int64_t size2 = 4 * 1024 * 1024;
    int64_t size3 = size2 * 1024;

    thread_context->attach_task(TUniqueId(), t);
    thread_context->attach_task(TUniqueId(), t, workload_group);
    thread_context->try_reserve_memory(size3);
    EXPECT_EQ(t->consumption(), size3);
    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
@@ -386,7 +397,7 @@ TEST(ThreadMemTrackerMgrTest, NestedReserveMemory) {
    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
}

TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
TEST_F(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
    std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
    std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTrackerReserveMemory1");
@@ -399,7 +410,7 @@ TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
    int64_t size2 = 4 * 1024 * 1024;
    int64_t size3 = size2 * 1024;

    thread_context->attach_task(TUniqueId(), t1);
    thread_context->attach_task(TUniqueId(), t1, workload_group);
    thread_context->try_reserve_memory(size3);
    thread_context->consume_memory(size2);
    EXPECT_EQ(t1->consumption(), size3);