Resolve the issue of concurrent access to mem_profile_map in the interm result manager.

This commit is contained in:
qingsuijiu 2024-10-30 05:44:30 +00:00 committed by ob-robot
parent 065eca4a09
commit 89ec337ad6
2 changed files with 80 additions and 25 deletions

View File

@ -61,6 +61,19 @@ void ObAtomicGetIntermResultInfoCall::operator() (common::hash::HashMapPair<ObDT
LOG_DEBUG("debug start read", K(entry.second->is_read_), K(entry.first));
}
void ObAtomicGetIntermMemProfileCall::operator() (common::hash::HashMapPair<ObDTLMemProfileKey,
ObDTLMemProfileInfo *> &entry)
{
int &ret = ret_;
if (OB_NOT_NULL(entry.second)) {
mem_profile_info_ = entry.second;
ATOMIC_INC(&(mem_profile_info_->ref_count_));
} else {
ret_ = OB_ERR_UNEXPECTED;
LOG_WARN("mem_profile_info is null", K(ret_), K(entry.first));
}
}
void ObAtomicAppendBlockCall::operator() (common::hash::HashMapPair<ObDTLIntermResultKey,
ObDTLIntermResultInfo *> &entry)
{
@ -347,7 +360,9 @@ int ObDTLIntermResultManager::erase_interm_result_info(const ObDTLIntermResultKe
if (need_unregister_check_item_from_dm) {
ObDetectManagerUtils::intern_result_unregister_check_item_from_dm(result_info);
}
dec_interm_result_ref_count(result_info);
if (OB_FAIL(dec_interm_result_ref_count(result_info))) {
LOG_WARN("Fail to dec interm_result ref_count", K(ret));
}
}
return ret;
}
@ -766,18 +781,22 @@ int ObDTLIntermResultManager::access_mem_profile(const ObDTLMemProfileKey &mem_p
ObDtlLinkedBuffer &buffer)
{
int ret = OB_SUCCESS;
if (OB_FAIL(mem_profile_map_.get_refactored(mem_profile_key, mem_profile_info))) {
if (ret == OB_HASH_NOT_EXIST) {
ret = OB_SUCCESS;
if (OB_FAIL(init_mem_profile(mem_profile_key, mem_profile_info, buffer))) {
LOG_WARN("fail to init mem_profile", K(ret), K(mem_profile_key));
}
} else {
LOG_WARN("fail to get mem_profile", K(ret), K(mem_profile_key));
ObAtomicGetIntermMemProfileCall call;
if (OB_FAIL(mem_profile_map_.atomic_refactored(mem_profile_key, call))) {
} else {
ret = call.ret_;
}
if (ret == OB_SUCCESS) {
mem_profile_info = call.mem_profile_info_;
} else if (ret == OB_HASH_NOT_EXIST) {
ret = OB_SUCCESS;
if (OB_FAIL(init_mem_profile(mem_profile_key, mem_profile_info, buffer))) {
LOG_WARN("fail to init mem_profile", K(ret), K(mem_profile_key));
}
} else {
LOG_WARN("fail to get mem_profile", K(ret), K(mem_profile_key));
}
if (OB_SUCC(ret) && OB_NOT_NULL(mem_profile_info)) {
inc_mem_profile_ref_count(mem_profile_info);
DTL_IR_STORE_DO(interm_res_info, set_allocator, mem_profile_info->allocator_);
DTL_IR_STORE_DO(interm_res_info, set_callback, mem_profile_info);
DTL_IR_STORE_DO(interm_res_info, set_dir_id, mem_profile_info->sql_mem_processor_.get_dir_id());
@ -793,21 +812,26 @@ int ObDTLIntermResultManager::init_mem_profile(const ObDTLMemProfileKey &key,
ObDtlLinkedBuffer &buffer)
{
int ret = OB_SUCCESS;
if (buffer.seq_no() > 1) { // seq_no begin from 1
if (OB_UNLIKELY(buffer.seq_no() > 1)) { // seq_no begin from 1
ret = OB_INVALID_ARGUMENT;
LOG_WARN("The buffer is not the first packet, \
but the corresponding mem_profile does not exist.",
K(buffer.seq_no()));
}
lib::ObMutexGuard guard(mem_profile_mutex_);
// Possible scenario: Multiple interm results accessing the same mem_profile.
// Through lock control, when the first one is initialized, the second one tries to initialize again.
// At this point, by calling get_refactored,
// it detects that the other end has already initialized the current mem_profile,
// and it directly exits.
if (OB_FAIL(ret)) {
} else if (OB_FAIL(mem_profile_map_.get_refactored(key, info))) {
if (ret == OB_HASH_NOT_EXIST) {
} else {
lib::ObMutexGuard guard(mem_profile_mutex_);
// Possible scenario: Multiple interm results accessing the same mem_profile.
// Through lock control, when the first one is initialized, the second one tries to initialize again.
// At this point, by calling get_refactored,
// it detects that the other end has already initialized the current mem_profile,
// and it directly exits.
ObAtomicGetIntermMemProfileCall call;
if (OB_FAIL(mem_profile_map_.atomic_refactored(key, call))) {
} else {
ret = call.ret_;
}
if (ret == OB_SUCCESS) {
info = call.mem_profile_info_;
} else if (ret == OB_HASH_NOT_EXIST) {
ret = OB_SUCCESS;
void *info_buf = nullptr;
ObMemAttr mem_info_attr(MTL_ID(), "IRMMemInfo", common::ObCtxIds::EXECUTE_CTX_ID);
@ -839,6 +863,8 @@ int ObDTLIntermResultManager::init_mem_profile(const ObDTLMemProfileKey &key,
LOG_WARN("failed to init sql memory manager processor", K(ret));
} else if (OB_FAIL(mem_profile_map_.set_refactored(key, info))) {
LOG_WARN("fail to set row store in result manager", K(ret));
} else {
inc_mem_profile_ref_count(info);
}
if (OB_FAIL(ret)) {
free_mem_profile(info);
@ -851,11 +877,19 @@ int ObDTLIntermResultManager::init_mem_profile(const ObDTLMemProfileKey &key,
return ret;
}
bool MemProfileEraseIfRef0::operator() (common::hash::HashMapPair<ObDTLMemProfileKey,
ObDTLMemProfileInfo *> &entry)
{
return ATOMIC_LOAD(&(entry.second->ref_count_)) <= 0;
}
int ObDTLIntermResultManager::destroy_mem_profile(const ObDTLMemProfileKey &key)
{
int ret = OB_SUCCESS;
ObDTLMemProfileInfo *info = nullptr;
if (OB_FAIL(mem_profile_map_.erase_refactored(key, &info))) {
MemProfileEraseIfRef0 functor;
bool is_erased = false;
if (OB_FAIL(mem_profile_map_.erase_if(key, functor, is_erased, &info))) {
// The reason for the nonexistence of the corresponding mem_profile is as follows:
// 1. The ref_cnt of the current thread is decremented (which becomes 1).
// 2. The ref_cnt of another thread is decremented, and it executes destroy.
@ -866,7 +900,7 @@ int ObDTLIntermResultManager::destroy_mem_profile(const ObDTLMemProfileKey &key)
} else {
LOG_WARN("erase mem_profile failed", K(ret), K(key));
}
} else {
} else if (is_erased) {
free_mem_profile(info);
}
return ret;
@ -900,7 +934,6 @@ int ObDTLIntermResultManager::dec_mem_profile_ref_count(const ObDTLMemProfileKey
void ObDTLIntermResultManager::free_mem_profile(ObDTLMemProfileInfo *&info)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(info)) {
info->sql_mem_processor_.unregister_profile();
info->allocator_.reset();

View File

@ -98,7 +98,7 @@ public:
~ObDTLMemProfileInfo() {}
// The local channel and the rpc channel may modify the interm results concurrently,
// and these interme results may be linked to the same profile.
// and these interm results may be linked to the same profile.
// Therefore, access to the profile needs to be protected by locks
// to prevent concurrent modification issues.
void alloc(int64_t size)
@ -304,6 +304,28 @@ public:
int ret_;
};
class ObAtomicGetIntermMemProfileCall
{
public:
explicit ObAtomicGetIntermMemProfileCall() :
ret_(OB_SUCCESS), mem_profile_info_(nullptr) {}
~ObAtomicGetIntermMemProfileCall() = default;
void operator() (common::hash::HashMapPair<ObDTLMemProfileKey,
ObDTLMemProfileInfo *> &entry);
public:
int ret_;
ObDTLMemProfileInfo *mem_profile_info_;
};
class MemProfileEraseIfRef0
{
public:
MemProfileEraseIfRef0() {};
~MemProfileEraseIfRef0() {};
bool operator() (common::hash::HashMapPair<ObDTLMemProfileKey,
ObDTLMemProfileInfo *> &entry);
};
class ObAtomicAppendBlockCall
{
public: