[fix] (mem tracker) Fix runtime instance tracker null pointer (#11272)

This commit is contained in:
Xinyi Zou
2022-07-28 14:58:13 +08:00
committed by GitHub
parent 75451ab0ed
commit 19b34c09b1
11 changed files with 29 additions and 27 deletions

View File

@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
DCHECK(profile != nullptr);
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
<< ", node=" << dest_node_id;
shared_ptr<DataStreamRecvr> recvr(
new DataStreamRecvr(this, row_desc, fragment_instance_id, dest_node_id, num_senders,
is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(
this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
lock_guard<mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));

View File

@ -447,8 +447,9 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
DataStreamRecvr::DataStreamRecvr(
DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
: _mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
@ -459,7 +460,8 @@ DataStreamRecvr::DataStreamRecvr(
_num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
_mem_tracker = std::make_unique<MemTracker>("DataStreamRecvr", nullptr, _profile);
_mem_tracker = std::make_unique<MemTracker>(
"DataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;

View File

@ -116,8 +116,9 @@ private:
class SenderQueue;
DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging,
int total_buffer_limit, RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
// If receive queue is full, done is enqueue pending, and return with *done is nullptr

View File

@ -111,6 +111,7 @@ public:
// Note that 'f' must be valid for the lifetime of this tracker limiter.
void add_gc_function(GcFunction f) { _gc_functions.push_back(f); }
// TODO Should be managed in a separate process_mem_mgr, not in MemTracker
// If consumption is higher than max_consumption, attempts to free memory by calling
// any added GC functions. Returns true if max_consumption is still exceeded. Takes gc_lock.
// Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment,

View File

@ -34,7 +34,7 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
bool new_emplace = _task_mem_trackers.lazy_emplace_l(
task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
[&](const auto& ctor) {
ctor(task_id, std::make_unique<MemTrackerLimiter>(mem_limit, label, parent));
ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit, label, parent));
});
if (new_emplace) {
LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id

View File

@ -37,14 +37,8 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
}
void ThreadMemTrackerMgr::detach_limiter_tracker() {
#ifndef BE_TEST
// Unexpectedly, the runtime state is destructed before the end of the query sub-thread,
// (_hash_table_build_thread has appeared) which is not a graceful exit.
// consider replacing CHECK with a conditional statement and checking for runtime state survival.
CHECK(_task_id == "" ||
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(_task_id));
#endif
flush_untracked_mem<false>();
// Do not flush untracked mem, instance executor thread may exit after instance fragment executor thread,
// `instance_mem_tracker` will be null pointer, which is not a graceful exit.
_task_id = "";
_fragment_instance_id = TUniqueId();
_exceed_cb.cancel_msg = "";

View File

@ -210,8 +210,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// If you do not want this check, set_check_attach=true
// TODO(zxy) The current p0 test cannot guarantee that all threads are checked,
// so disable it and try to open it when memory tracking is not on time.
DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
_limiter_tracker->label() != "Process");
// DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
// _limiter_tracker->label() != "Process");
#endif
Status st = _limiter_tracker->try_consume(_untracked_mem);
if (!st) {

View File

@ -265,6 +265,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
}
Status RuntimeState::init_instance_mem_tracker() {
_query_mem_tracker = nullptr;
_instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1, "RuntimeState:instance");
return Status::OK();
}

View File

@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
<< ", node=" << dest_node_id;
std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
this, row_desc, fragment_instance_id, dest_node_id, num_senders, is_merging,
buffer_size, profile, sub_plan_query_statistics_recvr));
this, row_desc, state->query_mem_tracker(), fragment_instance_id, dest_node_id,
num_senders, is_merging, buffer_size, profile, sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
std::lock_guard<std::mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));

View File

@ -249,8 +249,9 @@ void VDataStreamRecvr::SenderQueue::close() {
VDataStreamRecvr::VDataStreamRecvr(
VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
: _mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
@ -262,8 +263,10 @@ VDataStreamRecvr::VDataStreamRecvr(
_num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
// DataStreamRecvr may be destructed after the instance execution thread ends, `instance_mem_tracker`
// will be a null pointer, and remove_child fails when _mem_tracker is destructed.
_mem_tracker = std::make_unique<MemTracker>(
"VDataStreamRecvr:" + print_id(_fragment_instance_id), nullptr, _profile);
"VDataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Create one queue per sender if is_merging is true.

View File

@ -51,9 +51,9 @@ class VExprContext;
class VDataStreamRecvr {
public:
VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders, bool is_merging, int total_buffer_limit,
RuntimeProfile* profile,
MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int num_senders, bool is_merging,
int total_buffer_limit, RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
~VDataStreamRecvr();