Fixed the deadlock issue in DM and P2P orthogonal scenarios.

This commit is contained in:
obdev
2024-04-10 11:06:24 +00:00
committed by ob-robot
parent f0ee52cb5b
commit 078371afbd
4 changed files with 35 additions and 37 deletions

View File

@ -395,43 +395,15 @@ int ObP2PDatahubManager::P2PMsgSetCall::operator() (const common::hash::HashMapP
ObP2PDatahubMsgBase *> &entry) ObP2PDatahubMsgBase *> &entry)
{ {
// entry.second == &dh_msg_ // entry.second == &dh_msg_
// 1. register into dm // once the msg is set to p2p datahub map, other threads will access it, so
// 2. do dh_msg_.regenerate() // the regenerate process must be done in the setting process.
UNUSED(entry); UNUSED(entry);
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
#ifdef ERRSIM if (OB_FAIL(dh_msg_.regenerate())) {
if (OB_FAIL(OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS)) { LOG_WARN("failed to do regen_call", K(dh_key_));
LOG_WARN("p2p msg reg dm failed by design", K(ret)); } else if (FALSE_IT(dh_msg_.check_finish_receive())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
ret_ = ret;
return ret;
}
#endif
if (OB_FAIL(ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(
dh_msg_.get_register_dm_info(), dh_key_, dh_msg_.get_dm_cb_node_seq_id()))) {
LOG_WARN("[DM] failed to register check item to dm", K(dh_msg_.get_register_dm_info()),
K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id()));
} else {
succ_reg_dm_ = true;
LOG_TRACE("[DM] rf register check item to dm", K(dh_msg_.get_register_dm_info()),
K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id()));
if (OB_FAIL(dh_msg_.regenerate())) {
LOG_WARN("failed to do regen_call", K(dh_key_));
} else if (FALSE_IT(dh_msg_.check_finish_receive())) {
}
}
if (OB_FAIL(ret)) {
(void) revert();
} }
ret_ = ret; ret_ = ret;
return ret; return ret;
} }
void ObP2PDatahubManager::P2PMsgSetCall::revert()
{
if (succ_reg_dm_) {
(void) ObDetectManagerUtils::p2p_datahub_unregister_check_item_from_dm(
dh_msg_.get_register_dm_info().detectable_id_, dh_msg_.get_dm_cb_node_seq_id());
}
}

View File

@ -60,7 +60,6 @@ public:
: dh_key_(dh_key), dh_msg_(db_msg), ret_(OB_SUCCESS), succ_reg_dm_(false) {}; : dh_key_(dh_key), dh_msg_(db_msg), ret_(OB_SUCCESS), succ_reg_dm_(false) {};
~P2PMsgSetCall() = default; ~P2PMsgSetCall() = default;
int operator() (const common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry); int operator() (const common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry);
void revert();
ObP2PDhKey &dh_key_; ObP2PDhKey &dh_key_;
ObP2PDatahubMsgBase &dh_msg_; ObP2PDatahubMsgBase &dh_msg_;
int ret_; int ret_;

View File

@ -138,6 +138,21 @@ int ObP2PDatahubMsgBase::process_msg_internal(bool &need_free)
need_free = true; need_free = true;
} else { } else {
need_merge = false; // set success, not need to merge need_merge = false; // set success, not need to merge
int reg_dm_ret = OB_SUCCESS;
#ifdef ERRSIM
int reg_dm_ret = OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS;
if (OB_SUCCESS != reg_dm_ret) {
LOG_WARN("p2p msg reg dm failed by design", K(ret));
reg_dm_ret = OB_ALLOCATE_MEMORY_FAILED;
}
#endif
if (OB_SUCCESS == reg_dm_ret) {
reg_dm_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(
register_dm_info_, dh_key, dm_cb_node_seq_id_);
}
if (OB_SUCCESS != reg_dm_ret) {
LOG_WARN("[DM] failed to register check item to dm", K(reg_dm_ret));
}
} }
// merge filter // merge filter

View File

@ -243,6 +243,21 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
need_free = true; need_free = true;
} else { } else {
need_merge = false; // set success, not need to merge need_merge = false; // set success, not need to merge
int reg_dm_ret = OB_SUCCESS;
#ifdef ERRSIM
int reg_dm_ret = OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS;
if (OB_SUCCESS != reg_dm_ret) {
LOG_WARN("p2p msg reg dm failed by design", K(ret));
reg_dm_ret = OB_ALLOCATE_MEMORY_FAILED;
}
#endif
if (OB_SUCCESS == reg_dm_ret) {
reg_dm_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(
register_dm_info_, dh_key, dm_cb_node_seq_id_);
}
if (OB_SUCCESS != reg_dm_ret) {
LOG_WARN("[DM] failed to register check item to dm", K(reg_dm_ret));
}
} }
// merge piece bloom filter // merge piece bloom filter
@ -262,9 +277,6 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
rf_msg_in_map->dec_ref_count(); rf_msg_in_map->dec_ref_count();
} }
} }
if (OB_SUCC(ret) && !need_merge) {
(void)check_finish_receive();
}
if (need_free) { if (need_free) {
// msg not in map, dec ref count // msg not in map, dec ref count
guard.dec_msg_ref_count(); guard.dec_msg_ref_count();