[bugfix] fix tcmalooc hook cancel deadlock (#10514)
This commit is contained in:
@ -203,6 +203,9 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) {
|
||||
void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_number,
|
||||
int64_t packet_seq,
|
||||
::google::protobuf::Closure** done) {
|
||||
// Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook,
|
||||
// limit memory via DataStreamRecvr::exceeds_limit.
|
||||
STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
|
||||
lock_guard<mutex> l(_lock);
|
||||
if (_is_cancelled) {
|
||||
return;
|
||||
@ -272,6 +275,9 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n
|
||||
}
|
||||
|
||||
void DataStreamRecvr::SenderQueue::add_batch(RowBatch* batch, bool use_move) {
|
||||
// Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook,
|
||||
// limit memory via DataStreamRecvr::exceeds_limit.
|
||||
STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
|
||||
unique_lock<mutex> l(_lock);
|
||||
if (_is_cancelled) {
|
||||
return;
|
||||
|
||||
@ -182,14 +182,12 @@ Status ExecEnv::_init_mem_tracker() {
|
||||
global_memory_limit_bytes = MemInfo::physical_mem();
|
||||
}
|
||||
MemTracker::get_process_tracker()->set_limit(global_memory_limit_bytes);
|
||||
_query_pool_mem_tracker = MemTracker::create_tracker(global_memory_limit_bytes, "QueryPool",
|
||||
MemTracker::get_process_tracker(),
|
||||
MemTrackerLevel::OVERVIEW);
|
||||
_query_pool_mem_tracker = MemTracker::create_tracker(
|
||||
-1, "QueryPool", MemTracker::get_process_tracker(), MemTrackerLevel::OVERVIEW);
|
||||
REGISTER_HOOK_METRIC(query_mem_consumption,
|
||||
[this]() { return _query_pool_mem_tracker->consumption(); });
|
||||
_load_pool_mem_tracker = MemTracker::create_tracker(global_memory_limit_bytes, "LoadPool",
|
||||
MemTracker::get_process_tracker(),
|
||||
MemTrackerLevel::OVERVIEW);
|
||||
_load_pool_mem_tracker = MemTracker::create_tracker(
|
||||
-1, "LoadPool", MemTracker::get_process_tracker(), MemTrackerLevel::OVERVIEW);
|
||||
REGISTER_HOOK_METRIC(load_mem_consumption,
|
||||
[this]() { return _load_pool_mem_tracker->consumption(); });
|
||||
LOG(INFO) << "Using global memory limit: "
|
||||
|
||||
@ -90,6 +90,9 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
|
||||
void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
|
||||
int64_t packet_seq,
|
||||
::google::protobuf::Closure** done) {
|
||||
// Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook,
|
||||
// limit memory via DataStreamRecvr::exceeds_limit.
|
||||
STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
|
||||
std::lock_guard<std::mutex> l(_lock);
|
||||
if (_is_cancelled) {
|
||||
return;
|
||||
@ -140,6 +143,9 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
|
||||
}
|
||||
|
||||
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
|
||||
// Avoid deadlock when calling SenderQueue::cancel() in tcmalloc hook,
|
||||
// limit memory via DataStreamRecvr::exceeds_limit.
|
||||
STOP_CHECK_LIMIT_THREAD_LOCAL_MEM_TRACKER();
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
if (_is_cancelled) {
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user