diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 4f2697c0f5..b9108998f2 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -581,7 +581,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) { _add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST); _add_batch_closure->cntl.http_request().set_content_type("application/json"); { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _brpc_http_stub->tablet_writer_add_batch_by_http(&_add_batch_closure->cntl, NULL, &_add_batch_closure->result, _add_batch_closure); @@ -589,7 +589,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) { } else { _add_batch_closure->cntl.http_request().Clear(); { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result, _add_batch_closure); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 5b0cfb1840..3e8d66fdbd 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -96,7 +96,7 @@ public: ~ReusableClosure() override { // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. join(); - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); cntl.Reset(); } @@ -124,7 +124,7 @@ public: // plz follow this order: reset() -> set_in_flight() -> send brpc batch void reset() { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); cntl.Reset(); cid = cntl.call_id(); } diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp index 5f4910eec6..79496b449b 100644 --- a/be/src/runtime/buffer_control_block.cpp +++ b/be/src/runtime/buffer_control_block.cpp @@ -28,7 +28,11 @@ namespace doris { void GetResultBatchCtx::on_failure(const Status& status) { DCHECK(!status.ok()) << "status is ok, errmsg=" << status.get_error_msg(); status.to_protobuf(result->mutable_status()); - done->Run(); + { + // call by result sink + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + done->Run(); + } delete this; } @@ -40,7 +44,10 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics } result->set_packet_seq(packet_seq); result->set_eos(true); - done->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + done->Run(); + } delete this; } @@ -65,7 +72,10 @@ void GetResultBatchCtx::on_data(const std::unique_ptr& t_resul result->set_eos(eos); } st.to_protobuf(result->mutable_status()); - done->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + done->Run(); + } delete this; } diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index 26bb917a5d..c9533959f8 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -186,10 +186,10 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) { if (!_pending_closures.empty()) { auto closure_pair = _pending_closures.front(); - // TODO(zxy) There may be a problem here, pay attention later - // When the batch queue reaches the upper limit of memory, calling run to let - // brpc send data packets may cause additional memory to be released - closure_pair.first->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + closure_pair.first->Run(); + } _pending_closures.pop_front(); closure_pair.second.stop(); @@ -339,8 +339,11 @@ void DataStreamRecvr::SenderQueue::cancel() { { std::lock_guard l(_lock); - for (auto closure_pair : _pending_closures) { - closure_pair.first->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + for (auto closure_pair : _pending_closures) { + closure_pair.first->Run(); + } } _pending_closures.clear(); } @@ -354,8 +357,11 @@ void DataStreamRecvr::SenderQueue::close() { std::lock_guard l(_lock); _is_cancelled = true; - for (auto closure_pair : _pending_closures) { - closure_pair.first->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + for (auto closure_pair : _pending_closures) { + closure_pair.first->Run(); + } } _pending_closures.clear(); } diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 99dc28972d..cb02491701 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -138,6 +138,7 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { _closure->ref(); } else { RETURN_IF_ERROR(_wait_last_brpc()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _closure->cntl.Reset(); } VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id @@ -159,6 +160,7 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_row_batch() && _brpc_request.row_batch().has_tuple_data() && _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); Status st = request_embed_attachment_contain_tuple>( &_brpc_request, _closure); @@ -174,6 +176,7 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { _closure->cntl.http_request().set_content_type("application/json"); _brpc_http_stub->transmit_data_by_http(&_closure->cntl, NULL, &_closure->result, _closure); } else { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _closure->cntl.http_request().Clear(); _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure); } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 506e7b6684..81af4a18f3 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -120,10 +120,12 @@ public: std::shared_ptr process_mem_tracker() { return _process_mem_tracker; } void set_global_mem_tracker(const std::shared_ptr& process_tracker, const std::shared_ptr& orphan_tracker, + const std::shared_ptr& nursery_mem_tracker, const std::shared_ptr& bthread_mem_tracker) { _process_mem_tracker = process_tracker; _orphan_mem_tracker = orphan_tracker; _orphan_mem_tracker_raw = orphan_tracker.get(); + _nursery_mem_tracker = nursery_mem_tracker; _bthread_mem_tracker = bthread_mem_tracker; } std::shared_ptr allocator_cache_mem_tracker() { @@ -131,6 +133,7 @@ public: } std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } + std::shared_ptr nursery_mem_tracker() { return _nursery_mem_tracker; } std::shared_ptr bthread_mem_tracker() { return _bthread_mem_tracker; } std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; } @@ -226,7 +229,9 @@ private: // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; MemTrackerLimiter* _orphan_mem_tracker_raw; - // Bthread default mem tracker + // Parent is orphan, Nursery of orphan memory after manually switching thread mem tracker + std::shared_ptr _nursery_mem_tracker; + // Parent is orphan, bthread default mem tracker std::shared_ptr _bthread_mem_tracker; // The ancestor for all querys tracker. std::shared_ptr _query_pool_mem_tracker; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bf0f66d344..dfdc868e67 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -202,6 +202,7 @@ Status ExecEnv::_init_mem_tracker() { std::make_shared(global_memory_limit_bytes, "Process"); _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker); _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); + _nursery_mem_tracker = std::make_shared(-1, "Nursery", _orphan_mem_tracker); _bthread_mem_tracker = std::make_shared(-1, "Bthread", _orphan_mem_tracker); thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 4521a24881..b19f9181b0 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -56,13 +56,13 @@ void ThreadMemTrackerMgr::exceeded(const std::string& failed_msg) { if (_cb_func != nullptr) { _cb_func(); } + auto cancel_msg = _limiter_tracker_raw->mem_limit_exceeded( + fmt::format("exec node:<{}>", last_consumer_tracker()), + _limiter_tracker_raw->parent().get(), failed_msg); if (is_attach_query()) { - auto cancel_msg = _limiter_tracker_raw->mem_limit_exceeded( - fmt::format("exec node:<{}>", last_consumer_tracker()), - _limiter_tracker_raw->parent().get(), failed_msg); exceeded_cancel_task(cancel_msg); - _check_limit = false; // Make sure it will only be canceled once } + _check_limit = false; // Make sure it will only be canceled once } } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 547115f398..5289db99e3 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -84,6 +84,17 @@ public: return _consumer_tracker_stack.empty() ? "" : _consumer_tracker_stack.back()->label(); } + void start_count_scope_mem() { + _scope_mem = 0; + _count_scope_mem = true; + } + + int64_t stop_count_scope_mem() { + flush_untracked_mem(); + _count_scope_mem = false; + return _scope_mem; + } + void set_exceed_call_back(ExceedCallBack cb_func) { _cb_func = cb_func; } // Note that, If call the memory allocation operation in TCMalloc new/delete Hook, @@ -135,6 +146,10 @@ private: // Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance. int64_t _untracked_mem = 0; int64_t old_untracked_mem = 0; + + bool _count_scope_mem = false; + int64_t _scope_mem = 0; + std::string failed_msg = std::string(); // _limiter_tracker_stack[0] = orphan_mem_tracker @@ -216,6 +231,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { if (!_init) init(); DCHECK(_limiter_tracker_raw); old_untracked_mem = _untracked_mem; + if (_count_scope_mem) _scope_mem += _untracked_mem; if (CheckLimit) { #ifndef BE_TEST // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker. diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 764bf1471f..3766431f50 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -247,7 +247,6 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { _query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker(); } _query_mem_tracker->enable_reset_zero(); - _scanner_mem_tracker->enable_reset_zero(); _instance_mem_tracker = std::make_shared( -1, "RuntimeState:instance:" + print_id(_fragment_instance_id), _query_mem_tracker, diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h index a0689033e7..106426653e 100644 --- a/be/src/runtime/stream_load/stream_load_pipe.h +++ b/be/src/runtime/stream_load/stream_load_pipe.h @@ -47,7 +47,7 @@ public: _use_proto(use_proto) {} virtual ~StreamLoadPipe() { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->nursery_mem_tracker()); while (!_buf_queue.empty()) _buf_queue.pop_front(); } @@ -119,7 +119,7 @@ public: } Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* eof) override { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->nursery_mem_tracker()); *bytes_read = 0; while (*bytes_read < data_size) { std::unique_lock l(_lock); diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 7631abaa7b..1e6be4a0d9 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -29,6 +29,15 @@ ThreadContextPtr::ThreadContextPtr() { init = true; } +ScopeMemCount::ScopeMemCount(int64_t* scope_mem) { + _scope_mem = scope_mem; + thread_context()->_thread_mem_tracker_mgr->start_count_scope_mem(); +} + +ScopeMemCount::~ScopeMemCount() { + *_scope_mem = thread_context()->_thread_mem_tracker_mgr->stop_count_scope_mem(); +} + AttachTask::AttachTask(const std::shared_ptr& mem_tracker, const ThreadContext::TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id) { @@ -55,6 +64,17 @@ AttachTask::~AttachTask() { #endif // NDEBUG } +SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter( + const std::shared_ptr& mem_tracker_limiter) { + DCHECK(mem_tracker_limiter); + thread_context()->_thread_mem_tracker_mgr->attach_limiter_tracker("", TUniqueId(), + mem_tracker_limiter); +} + +SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() { + thread_context()->_thread_mem_tracker_mgr->detach_limiter_tracker(); +} + AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { if (config::memory_verbose_track) { thread_context()->_thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker); diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 042acdd473..20a0b074c1 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -29,16 +29,58 @@ #include "runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" +// Used to observe the memory usage of the specified code segment #ifdef USE_MEM_TRACKER -// Add thread mem tracker consumer during query execution. +// Count a code segment memory (memory malloc - memory free) to int64_t +// Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; } +#define SCOPED_MEM_COUNT(scope_mem) \ + auto VARNAME_LINENUM(scope_mem_count) = doris::ScopeMemCount(scope_mem) +// Count a code segment memory (memory malloc - memory free) to MemTracker. +// Compared to count `scope_mem`, MemTracker is easier to observe from the outside and is thread-safe. +// Usage example: std::unique_ptr tracker = std::make_unique("first_tracker"); +// { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); xxx; xxx; } #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) -// Attach to task when thread starts +#else +#define SCOPED_MEM_COUNT(scope_mem) (void)0 +#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 +#endif + +// Used to observe query/load/compaction/e.g. execution thread memory usage and respond when memory exceeds the limit. +#ifdef USE_MEM_TRACKER +// Attach to query/load/compaction/e.g. when thread starts. +// This will save some info about a working thread in the thread context. +// And count the memory during thread execution (is actually also the code segment that executes the function) +// to specify MemTrackerLimiter, and expect to handle when the memory exceeds the limit, for example cancel query. +// Usage is similar to SCOPED_CONSUME_MEM_TRACKER. #define SCOPED_ATTACH_TASK(arg1, ...) \ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) +// Switch MemTrackerLimiter for count memory during thread execution. +// Usually used after SCOPED_ATTACH_TASK, in order to count the memory of the specified code segment into another +// MemTrackerLimiter instead of the MemTrackerLimiter added by the attach task. +#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) \ + auto VARNAME_LINENUM(switch_mem_tracker) = SwitchThreadMemTrackerLimiter(mem_tracker_limiter) +// If you don't want to cancel query after thread MemTrackerLimiter exceed limit in a code segment, then use it. +// Usually used after SCOPED_ATTACH_TASK. +#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \ + auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() +// If the thread MemTrackerLimiter exceeds the limit, an error status is returned. +// Usually used after SCOPED_ATTACH_TASK, during query execution. +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ + return doris::thread_context() \ + ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() \ + ->mem_limit_exceeded( \ + state, \ + fmt::format("exec node:<{}>, {}", \ + doris::thread_context() \ + ->_thread_mem_tracker_mgr->last_consumer_tracker(), \ + msg), \ + ##__VA_ARGS__); #else -#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 #define SCOPED_ATTACH_TASK(arg1, ...) (void)0 +#define SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_limiter) (void)0 +#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0 +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) (void)0 #endif namespace doris { @@ -136,6 +178,13 @@ public: void attach_task(const TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { +#ifndef BE_TEST + // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. + DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && + type != TaskType::UNKNOWN && _task_id == "" && mem_tracker != nullptr) + << ",new tracker label: " << mem_tracker->label() << ",old tracker label: " + << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label(); +#endif _type = type; _task_id = task_id; _fragment_instance_id = fragment_instance_id; @@ -230,6 +279,16 @@ static ThreadContext* thread_context() { } } +class ScopeMemCount { +public: + explicit ScopeMemCount(int64_t* scope_mem); + + ~ScopeMemCount(); + +private: + int64_t* _scope_mem; +}; + class AttachTask { public: explicit AttachTask(const std::shared_ptr& mem_tracker, @@ -242,6 +301,14 @@ public: ~AttachTask(); }; +class SwitchThreadMemTrackerLimiter { +public: + explicit SwitchThreadMemTrackerLimiter( + const std::shared_ptr& mem_tracker_limiter); + + ~SwitchThreadMemTrackerLimiter(); +}; + class AddThreadMemTrackerConsumer { public: explicit AddThreadMemTrackerConsumer(MemTracker* mem_tracker); @@ -264,30 +331,22 @@ private: bool _pre; }; -// The following macros are used to fix the tracking accuracy of caches etc. +// Basic macros for mem tracker, usually do not need to be modified and used. #ifdef USE_MEM_TRACKER -#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \ - auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() +// For the memory that cannot be counted by mem hook, manually count it into the mem tracker, such as mmap. #define CONSUME_THREAD_MEM_TRACKER(size) \ doris::thread_context()->_thread_mem_tracker_mgr->consume(size) #define RELEASE_THREAD_MEM_TRACKER(size) \ doris::thread_context()->_thread_mem_tracker_mgr->consume(-size) + +// used to fix the tracking accuracy of caches. #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->transfer_to( \ size, tracker) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ tracker->transfer_to( \ size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()) -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ - return doris::thread_context() \ - ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() \ - ->mem_limit_exceeded( \ - state, \ - fmt::format("exec node:<{}>, {}", \ - doris::thread_context() \ - ->_thread_mem_tracker_mgr->last_consumer_tracker(), \ - msg), \ - ##__VA_ARGS__); + // Mem Hook to consume thread mem tracker #define MEM_MALLOC_HOOK(size) \ do { \ @@ -306,12 +365,10 @@ private: } \ } while (0) #else -#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0 #define CONSUME_THREAD_MEM_TRACKER(size) (void)0 #define RELEASE_THREAD_MEM_TRACKER(size) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) (void)0 #define MEM_MALLOC_HOOK(size) (void)0 #define MEM_FREE_HOOK(size) (void)0 #endif diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 462abf2e97..356627b43c 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -139,7 +139,6 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_ query_id = "unkown_transmit_data"; transmit_tracker = std::make_shared(-1, "unkown_transmit_data"); } - SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " query_id=" << query_id << " node=" << request->node_id(); // The response is accessed when done->Run is called in transmit_data(), @@ -147,6 +146,7 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_ Status st; st.to_protobuf(response->mutable_status()); if (extract_st.ok()) { + SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); st = _exec_env->stream_mgr()->transmit_data(request, &done); if (!st.ok()) { LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg() @@ -649,7 +649,6 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl query_id = "unkown_transmit_block"; transmit_tracker = std::make_shared(-1, "unkown_transmit_block"); } - SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) << " query_id=" << query_id << " node=" << request->node_id(); // The response is accessed when done->Run is called in transmit_block(), @@ -657,6 +656,7 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl Status st; st.to_protobuf(response->mutable_status()); if (extract_st.ok()) { + SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); st = _exec_env->vstream_mgr()->transmit_block(request, &done); if (!st.ok()) { LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg() diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index e80354af25..71e2d36ab3 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -53,10 +53,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) { } // _cur_batch must be replaced with the returned batch. - { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); - _current_block.reset(); - } + _current_block.reset(); *next_block = nullptr; if (_is_cancelled) { return Status::Cancelled("Cancelled"); @@ -80,7 +77,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) { if (!_pending_closures.empty()) { auto closure_pair = _pending_closures.front(); - closure_pair.first->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + closure_pair.first->Run(); + } _pending_closures.pop_front(); closure_pair.second.stop(); @@ -222,8 +222,11 @@ void VDataStreamRecvr::SenderQueue::cancel() { { std::lock_guard l(_lock); - for (auto closure_pair : _pending_closures) { - closure_pair.first->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + for (auto closure_pair : _pending_closures) { + closure_pair.first->Run(); + } } _pending_closures.clear(); } @@ -237,8 +240,11 @@ void VDataStreamRecvr::SenderQueue::close() { std::lock_guard l(_lock); _is_cancelled = true; - for (auto closure_pair : _pending_closures) { - closure_pair.first->Run(); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); + for (auto closure_pair : _pending_closures) { + closure_pair.first->Run(); + } } _pending_closures.clear(); } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 383f2ba51d..e92929d0d9 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -139,6 +139,7 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) { _closure->ref(); } else { RETURN_IF_ERROR(_wait_last_brpc()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _closure->cntl.Reset(); } VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id @@ -161,6 +162,7 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) { if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_block() && _brpc_request.block().has_column_values() && _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); Status st = request_embed_attachment_contain_block>( &_brpc_request, _closure); @@ -177,6 +179,7 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) { _brpc_http_stub->transmit_block_by_http(&_closure->cntl, nullptr, &_closure->result, _closure); } else { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _closure->cntl.http_request().Clear(); _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure); } diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index ffeb6525ec..3f4dfb5e72 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -354,7 +354,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) { _add_block_closure->cntl.http_request().set_content_type("application/json"); { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _brpc_http_stub->tablet_writer_add_block_by_http(&_add_block_closure->cntl, NULL, &_add_block_closure->result, _add_block_closure); @@ -362,7 +362,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) { } else { _add_block_closure->cntl.http_request().Clear(); { - SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker()); _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request, &_add_block_closure->result, _add_block_closure); } diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index ffe239cc0f..a1e53f7ed0 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -32,10 +32,12 @@ int main(int argc, char** argv) { std::make_shared(-1, "Process"); std::shared_ptr orphan_mem_tracker = std::make_shared(-1, "Orphan", process_mem_tracker); + std::shared_ptr nursery_mem_tracker = + std::make_shared(-1, "Nursery", orphan_mem_tracker); std::shared_ptr bthread_mem_tracker = std::make_shared(-1, "Bthread", orphan_mem_tracker); doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, - bthread_mem_tracker); + nursery_mem_tracker, bthread_mem_tracker); doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10);