[fix](memory) Fix Allocator cancel pipelinex query #32048

Author:    Xinyi Zou
Date:      2024-03-11 22:09:08 +08:00
Committer: yiguolei
Parent:    ccd21a6ea4
Commit:    4268634115

6 changed files with 23 additions and 26 deletions
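This change moves memory-limit cancellation from the fragment-instance level to the query level: ThreadMemTrackerMgr stores a _query_id instead of a _fragment_instance_id, cancel_instance() becomes cancel_query(), and AttachTask / SCOPED_ATTACH_TASK_WITH_ID no longer take a fragment instance id. Per the commit title, the per-instance cancel path was not stopping pipelineX queries when the Allocator hit a memory limit; the Allocator now cancels the whole query through FragmentMgr::cancel_query().

A minimal sketch of how the pieces fit together after this patch; only the identifiers that appear in the diff are taken from it, the surrounding variables (query_mem_tracker, query_id) and the enclosing block are assumed:

    // Assumed setup: a BE worker thread executing part of a query.
    // query_mem_tracker is the query-level MemTrackerLimiter, query_id its TUniqueId.
    {
        // Attach the query-level tracker together with the query id
        // (previously this macro also took the fragment instance id).
        SCOPED_ATTACH_TASK_WITH_ID(query_mem_tracker, query_id);

        // Allocations on this thread are charged to query_mem_tracker. When a limit is
        // exceeded and bad_alloc is not being caught, the Allocator calls
        //     doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg);
        // which forwards to
        //     ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
        //             _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, err_msg);
        // so the whole query is cancelled, not just one fragment instance.
    }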

View File

@@ -25,12 +25,11 @@
 namespace doris {

 void ThreadMemTrackerMgr::attach_limiter_tracker(
-        const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-        const TUniqueId& fragment_instance_id) {
+        const std::shared_ptr<MemTrackerLimiter>& mem_tracker, const TUniqueId& query_id) {
     DCHECK(mem_tracker);
     CHECK(init());
     flush_untracked_mem();
-    _fragment_instance_id = fragment_instance_id;
+    _query_id = query_id;
     _limiter_tracker = mem_tracker;
     _limiter_tracker_raw = mem_tracker.get();
     _wait_gc = true;
@@ -40,15 +39,15 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
         const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
     CHECK(init());
     flush_untracked_mem();
-    _fragment_instance_id = TUniqueId();
+    _query_id = TUniqueId();
     _limiter_tracker = old_mem_tracker;
     _limiter_tracker_raw = old_mem_tracker.get();
     _wait_gc = false;
 }

-void ThreadMemTrackerMgr::cancel_instance(const std::string& exceed_msg) {
-    ExecEnv::GetInstance()->fragment_mgr()->cancel_instance(
-            _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
+void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
+    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+            _query_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
 }

 } // namespace doris
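For reference, the attach/detach sequence as a caller would now use it (a sketch only; query_mem_tracker and query_id are placeholder variables, and real code normally goes through AttachTask / SCOPED_ATTACH_TASK_WITH_ID shown later in this commit):

    // Attach: allocations on this thread are charged to query_mem_tracker, and the
    // query id is remembered for a possible later cancel_query().
    doris::thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(query_mem_tracker,
                                                                            query_id);

    // ... run the task ...

    // Detach: falls back to the orphan mem tracker (the default argument in the header
    // below) and resets _query_id, so is_attach_query() becomes false again.
    doris::thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker();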

View File

@@ -53,7 +53,7 @@ public:

     // After attach, the current thread Memory Hook starts to consume/release task mem_tracker
     void attach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                                const TUniqueId& fragment_instance_id);
+                                const TUniqueId& query_id);

     void detach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker =
                                         ExecEnv::GetInstance()->orphan_mem_tracker());
@@ -82,7 +82,7 @@ public:
     void consume(int64_t size, int skip_large_memory_check = 0);
     void flush_untracked_mem();

-    bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
+    bool is_attach_query() { return _query_id != TUniqueId(); }

     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
         CHECK(init());
@@ -95,7 +95,7 @@ public:

     void disable_wait_gc() { _wait_gc = false; }
     [[nodiscard]] bool wait_gc() const { return _wait_gc; }
-    void cancel_instance(const std::string& exceed_msg);
+    void cancel_query(const std::string& exceed_msg);

     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
@@ -130,7 +130,7 @@ private:
     // If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
     bool _stop_consume = false;

-    TUniqueId _fragment_instance_id = TUniqueId();
+    TUniqueId _query_id = TUniqueId();
 };

 inline bool ThreadMemTrackerMgr::init() {
@@ -185,7 +185,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
             "malloc or new large memory: {}, {}, this is just a warning, not prevent memory "
             "alloc, stacktrace:\n{}",
             size,
-            is_attach_query() ? "in query or load: " + print_id(_fragment_instance_id)
+            is_attach_query() ? "in query or load: " + print_id(_query_id)
                               : "not in query or load",
             get_stack_trace());
     _stop_consume = false;
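Since is_attach_query() now keys off _query_id, a guard before a query-level action looks like this (illustrative only; the message string is a placeholder, and this exact guard is not part of the patch):

    // Only meaningful when this thread is attached to a query/load, i.e. _query_id
    // was set by attach_limiter_tracker() and not yet cleared by detach.
    if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) {
        doris::thread_context()->thread_mem_tracker_mgr->cancel_query(
                "placeholder: memory limit exceeded");
    }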

View File

@@ -24,18 +24,17 @@ namespace doris {
 class MemTracker;

 AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                       const TUniqueId& task_id, const TUniqueId& fragment_instance_id) {
+                       const TUniqueId& task_id) {
     ThreadLocalHandle::create_thread_local_if_not_exits();
     signal::set_signal_task_id(task_id);
-    thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
+    thread_context()->attach_task(task_id, mem_tracker);
 }

 AttachTask::AttachTask(RuntimeState* runtime_state) {
     ThreadLocalHandle::create_thread_local_if_not_exits();
     signal::set_signal_task_id(runtime_state->query_id());
     signal::set_signal_is_nereids(runtime_state->is_nereids());
-    thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(),
-                                  runtime_state->query_mem_tracker());
+    thread_context()->attach_task(runtime_state->query_id(), runtime_state->query_mem_tracker());
 }

 AttachTask::~AttachTask() {
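Both constructors now resolve to a query id. A hedged illustration of each; mem_tracker, query_id and state are placeholders, and the detach in ~AttachTask() is assumed from the scoped-macro usage:

    {
        // Explicit form: tracker plus query id (the fragment instance id argument is gone).
        doris::AttachTask attach(mem_tracker, query_id);
        // ... allocations here are charged to mem_tracker ...
    } // ~AttachTask() detaches the thread context again

    {
        // RuntimeState form: uses state->query_id() and state->query_mem_tracker(),
        // as shown in the constructor above.
        doris::AttachTask attach(state);
    }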

View File

@@ -45,8 +45,8 @@
 // This will save some info about a working thread in the thread context.
 // Looking forward to tracking memory during thread execution into MemTrackerLimiter.
 #define SCOPED_ATTACH_TASK(arg1) auto VARNAME_LINENUM(attach_task) = AttachTask(arg1)
-#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) \
-    auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2, arg3)
+#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) \
+    auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, arg2)

 // Switch MemTrackerLimiter for count memory during thread execution.
 // Used after SCOPED_ATTACH_TASK, in order to count the memory into another
@@ -60,7 +60,7 @@
     auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker)
 #else
 #define SCOPED_ATTACH_TASK(arg1, ...) (void)0
-#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2, arg3) (void)0
+#define SCOPED_ATTACH_TASK_WITH_ID(arg1, arg2) (void)0
 #define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0
 #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0
 #endif
@@ -134,7 +134,7 @@ public:
     ~ThreadContext() = default;

-    void attach_task(const TUniqueId& task_id, const TUniqueId& fragment_instance_id,
+    void attach_task(const TUniqueId& task_id,
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
 #ifndef BE_TEST
         // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task.
@@ -144,7 +144,7 @@ public:
                 << ", attach mem tracker label: " << mem_tracker->label();
 #endif
         _task_id = task_id;
-        thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, fragment_instance_id);
+        thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, _task_id);
     }

     void detach_task() {
@@ -292,8 +292,7 @@ private:
 class AttachTask {
 public:
     explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                        const TUniqueId& task_id = TUniqueId(),
-                        const TUniqueId& fragment_instance_id = TUniqueId());
+                        const TUniqueId& task_id = TUniqueId());

     explicit AttachTask(RuntimeState* runtime_state);
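The macro keeps its RAII shape, only the arity changes. Roughly (a sketch; VARNAME_LINENUM just makes the variable name unique per source line, the numeric suffix below is arbitrary):

    // SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id); expands to something like:
    auto attach_task_425 = AttachTask(_query_mem_tracker, _query_id);
    // The named guard lives until the end of the enclosing block, so the thread stays
    // attached to the query for the whole scope and detaches in ~AttachTask().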

View File

@@ -110,7 +110,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
                         "Query:{} canceled asyn, after waiting for memory {}ms, {}.",
                         print_id(doris::thread_context()->task_id()), wait_milliseconds,
                         err_msg);
-                doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg);
+                doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg);
             } else {
                 LOG(INFO) << fmt::format(
                         "Query:{} throw exception, after waiting for memory {}ms, {}.",
@@ -148,7 +148,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz
         if (!doris::enable_thread_catch_bad_alloc) {
             LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.",
                                      print_id(doris::thread_context()->task_id()), err_msg);
-            doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg);
+            doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg);
         } else {
             LOG(INFO) << fmt::format("query/load:{} throw exception, {}.",
                                      print_id(doris::thread_context()->task_id()), err_msg);
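Both hunks implement the same split, condensed below (identifiers are taken from the second hunk; the first hunk additionally waits for memory before deciding, and the throw in the else branch sits outside the shown lines):

    if (!doris::enable_thread_catch_bad_alloc) {
        // Nothing on this thread catches bad_alloc: cancel the whole query
        // asynchronously via FragmentMgr, reason MEMORY_LIMIT_EXCEED (first file above).
        doris::thread_context()->thread_mem_tracker_mgr->cancel_query(err_msg);
    } else {
        // The caller handles bad_alloc itself: log and throw instead of cancelling.
    }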

View File

@@ -425,7 +425,7 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,

 Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
                                    int64_t packet_seq, ::google::protobuf::Closure** done) {
-    SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, _fragment_instance_id);
+    SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id);
     int use_sender_id = _is_merging ? sender_id : 0;
     return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
 }