[fix] (mem tracker) Fix core dump during transmit_block (#10133)
In some cases, query mem tracker does not exist in BE when transmit block. This will result in a null pointer for get query mem tracker in brpc transmit_block
This commit is contained in:
@ -123,8 +123,9 @@ public:
|
||||
void attach(const TaskType& type, const std::string& task_id,
|
||||
const TUniqueId& fragment_instance_id,
|
||||
const std::shared_ptr<doris::MemTracker>& mem_tracker) {
|
||||
std::string new_tracker_label = mem_tracker == nullptr ? "null" : mem_tracker->label();
|
||||
DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "")
|
||||
<< ",new tracker label: " << mem_tracker->label()
|
||||
<< ",new tracker label: " << new_tracker_label
|
||||
<< ",old tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label();
|
||||
DCHECK(type != TaskType::UNKNOWN);
|
||||
_type = type;
|
||||
|
||||
@ -114,15 +114,21 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
|
||||
google::protobuf::Closure* done,
|
||||
const Status& extract_st) {
|
||||
std::string query_id;
|
||||
TUniqueId finst_id;
|
||||
std::shared_ptr<MemTracker> query_tracker;
|
||||
if (request->has_query_id()) {
|
||||
query_id = print_id(request->query_id());
|
||||
TUniqueId finst_id;
|
||||
finst_id.__set_hi(request->finst_id().hi());
|
||||
finst_id.__set_lo(request->finst_id().lo());
|
||||
SCOPED_ATTACH_TASK_THREAD(
|
||||
ThreadContext::TaskType::QUERY, query_id, finst_id,
|
||||
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
|
||||
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
|
||||
query_tracker = _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
|
||||
} else {
|
||||
query_id = "default_transmit_data";
|
||||
}
|
||||
if (!query_tracker) {
|
||||
query_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
|
||||
}
|
||||
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::QUERY, query_id, finst_id, query_tracker);
|
||||
VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
|
||||
<< " query_id=" << query_id << " node=" << request->node_id();
|
||||
// The response is accessed when done->Run is called in transmit_data(),
|
||||
@ -633,15 +639,21 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
|
||||
google::protobuf::Closure* done,
|
||||
const Status& extract_st) {
|
||||
std::string query_id;
|
||||
TUniqueId finst_id;
|
||||
std::shared_ptr<MemTracker> query_tracker;
|
||||
if (request->has_query_id()) {
|
||||
query_id = print_id(request->query_id());
|
||||
TUniqueId finst_id;
|
||||
finst_id.__set_hi(request->finst_id().hi());
|
||||
finst_id.__set_lo(request->finst_id().lo());
|
||||
SCOPED_ATTACH_TASK_THREAD(
|
||||
ThreadContext::TaskType::QUERY, query_id, finst_id,
|
||||
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
|
||||
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
|
||||
query_tracker = _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
|
||||
} else {
|
||||
query_id = "default_transmit_block";
|
||||
}
|
||||
if (!query_tracker) {
|
||||
query_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
|
||||
}
|
||||
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::QUERY, query_id, finst_id, query_tracker);
|
||||
VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
|
||||
<< " query_id=" << query_id << " node=" << request->node_id();
|
||||
// The response is accessed when done->Run is called in transmit_block(),
|
||||
|
||||
Reference in New Issue
Block a user