diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.cpp b/src/sql/dtl/ob_dtl_interm_result_manager.cpp index c0f5ca649..6bd58838c 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.cpp +++ b/src/sql/dtl/ob_dtl_interm_result_manager.cpp @@ -61,6 +61,19 @@ void ObAtomicGetIntermResultInfoCall::operator() (common::hash::HashMapPairis_read_), K(entry.first)); } +void ObAtomicGetIntermMemProfileCall::operator() (common::hash::HashMapPair &entry) +{ + int &ret = ret_; + if (OB_NOT_NULL(entry.second)) { + mem_profile_info_ = entry.second; + ATOMIC_INC(&(mem_profile_info_->ref_count_)); + } else { + ret_ = OB_ERR_UNEXPECTED; + LOG_WARN("mem_profile_info is null", K(ret_), K(entry.first)); + } +} + void ObAtomicAppendBlockCall::operator() (common::hash::HashMapPair &entry) { @@ -347,7 +360,9 @@ int ObDTLIntermResultManager::erase_interm_result_info(const ObDTLIntermResultKe if (need_unregister_check_item_from_dm) { ObDetectManagerUtils::intern_result_unregister_check_item_from_dm(result_info); } - dec_interm_result_ref_count(result_info); + if (OB_FAIL(dec_interm_result_ref_count(result_info))) { + LOG_WARN("Fail to dec interm_result ref_count", K(ret)); + } } return ret; } @@ -766,18 +781,22 @@ int ObDTLIntermResultManager::access_mem_profile(const ObDTLMemProfileKey &mem_p ObDtlLinkedBuffer &buffer) { int ret = OB_SUCCESS; - if (OB_FAIL(mem_profile_map_.get_refactored(mem_profile_key, mem_profile_info))) { - if (ret == OB_HASH_NOT_EXIST) { - ret = OB_SUCCESS; - if (OB_FAIL(init_mem_profile(mem_profile_key, mem_profile_info, buffer))) { - LOG_WARN("fail to init mem_profile", K(ret), K(mem_profile_key)); - } - } else { - LOG_WARN("fail to get mem_profile", K(ret), K(mem_profile_key)); + ObAtomicGetIntermMemProfileCall call; + if (OB_FAIL(mem_profile_map_.atomic_refactored(mem_profile_key, call))) { + } else { + ret = call.ret_; + } + if (ret == OB_SUCCESS) { + mem_profile_info = call.mem_profile_info_; + } else if (ret == OB_HASH_NOT_EXIST) { + ret = OB_SUCCESS; + if (OB_FAIL(init_mem_profile(mem_profile_key, mem_profile_info, buffer))) { + LOG_WARN("fail to init mem_profile", K(ret), K(mem_profile_key)); } + } else { + LOG_WARN("fail to get mem_profile", K(ret), K(mem_profile_key)); } if (OB_SUCC(ret) && OB_NOT_NULL(mem_profile_info)) { - inc_mem_profile_ref_count(mem_profile_info); DTL_IR_STORE_DO(interm_res_info, set_allocator, mem_profile_info->allocator_); DTL_IR_STORE_DO(interm_res_info, set_callback, mem_profile_info); DTL_IR_STORE_DO(interm_res_info, set_dir_id, mem_profile_info->sql_mem_processor_.get_dir_id()); @@ -793,21 +812,26 @@ int ObDTLIntermResultManager::init_mem_profile(const ObDTLMemProfileKey &key, ObDtlLinkedBuffer &buffer) { int ret = OB_SUCCESS; - if (buffer.seq_no() > 1) { // seq_no begin from 1 + if (OB_UNLIKELY(buffer.seq_no() > 1)) { // seq_no begin from 1 ret = OB_INVALID_ARGUMENT; LOG_WARN("The buffer is not the first packet, \ but the corresponding mem_profile does not exist.", K(buffer.seq_no())); - } - lib::ObMutexGuard guard(mem_profile_mutex_); - // Possible scenario: Multiple interm results accessing the same mem_profile. - // Through lock control, when the first one is initialized, the second one tries to initialize again. - // At this point, by calling get_refactored, - // it detects that the other end has already initialized the current mem_profile, - // and it directly exits. - if (OB_FAIL(ret)) { - } else if (OB_FAIL(mem_profile_map_.get_refactored(key, info))) { - if (ret == OB_HASH_NOT_EXIST) { + } else { + lib::ObMutexGuard guard(mem_profile_mutex_); + // Possible scenario: Multiple interm results accessing the same mem_profile. + // Through lock control, when the first one is initialized, the second one tries to initialize again. + // At this point, by calling get_refactored, + // it detects that the other end has already initialized the current mem_profile, + // and it directly exits. + ObAtomicGetIntermMemProfileCall call; + if (OB_FAIL(mem_profile_map_.atomic_refactored(key, call))) { + } else { + ret = call.ret_; + } + if (ret == OB_SUCCESS) { + info = call.mem_profile_info_; + } else if (ret == OB_HASH_NOT_EXIST) { ret = OB_SUCCESS; void *info_buf = nullptr; ObMemAttr mem_info_attr(MTL_ID(), "IRMMemInfo", common::ObCtxIds::EXECUTE_CTX_ID); @@ -839,6 +863,8 @@ int ObDTLIntermResultManager::init_mem_profile(const ObDTLMemProfileKey &key, LOG_WARN("failed to init sql memory manager processor", K(ret)); } else if (OB_FAIL(mem_profile_map_.set_refactored(key, info))) { LOG_WARN("fail to set row store in result manager", K(ret)); + } else { + inc_mem_profile_ref_count(info); } if (OB_FAIL(ret)) { free_mem_profile(info); @@ -851,11 +877,19 @@ int ObDTLIntermResultManager::init_mem_profile(const ObDTLMemProfileKey &key, return ret; } +bool MemProfileEraseIfRef0::operator() (common::hash::HashMapPair &entry) +{ + return ATOMIC_LOAD(&(entry.second->ref_count_)) <= 0; +} + int ObDTLIntermResultManager::destroy_mem_profile(const ObDTLMemProfileKey &key) { int ret = OB_SUCCESS; ObDTLMemProfileInfo *info = nullptr; - if (OB_FAIL(mem_profile_map_.erase_refactored(key, &info))) { + MemProfileEraseIfRef0 functor; + bool is_erased = false; + if (OB_FAIL(mem_profile_map_.erase_if(key, functor, is_erased, &info))) { // The reason for the nonexistence of the corresponding mem_profile is as follows: // 1. The ref_cnt of the current thread is decremented (which becomes 1). // 2. The ref_cnt of another thread is decremented, and it executes destroy. @@ -866,7 +900,7 @@ int ObDTLIntermResultManager::destroy_mem_profile(const ObDTLMemProfileKey &key) } else { LOG_WARN("erase mem_profile failed", K(ret), K(key)); } - } else { + } else if (is_erased) { free_mem_profile(info); } return ret; @@ -900,7 +934,6 @@ int ObDTLIntermResultManager::dec_mem_profile_ref_count(const ObDTLMemProfileKey void ObDTLIntermResultManager::free_mem_profile(ObDTLMemProfileInfo *&info) { - int ret = OB_SUCCESS; if (OB_NOT_NULL(info)) { info->sql_mem_processor_.unregister_profile(); info->allocator_.reset(); diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.h b/src/sql/dtl/ob_dtl_interm_result_manager.h index 859813a96..9d30f4c8e 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.h +++ b/src/sql/dtl/ob_dtl_interm_result_manager.h @@ -98,7 +98,7 @@ public: ~ObDTLMemProfileInfo() {} // The local channel and the rpc channel may modify the interm results concurrently, - // and these interme results may be linked to the same profile. + // and these interm results may be linked to the same profile. // Therefore, access to the profile needs to be protected by locks // to prevent concurrent modification issues. void alloc(int64_t size) @@ -304,6 +304,28 @@ public: int ret_; }; +class ObAtomicGetIntermMemProfileCall +{ +public: + explicit ObAtomicGetIntermMemProfileCall() : + ret_(OB_SUCCESS), mem_profile_info_(nullptr) {} + ~ObAtomicGetIntermMemProfileCall() = default; + void operator() (common::hash::HashMapPair &entry); +public: + int ret_; + ObDTLMemProfileInfo *mem_profile_info_; +}; + +class MemProfileEraseIfRef0 +{ +public: + MemProfileEraseIfRef0() {}; + ~MemProfileEraseIfRef0() {}; + bool operator() (common::hash::HashMapPair &entry); +}; + class ObAtomicAppendBlockCall { public: