From 078371afbd7f611c69a7008fb65cc21a77141504 Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 10 Apr 2024 11:06:24 +0000 Subject: [PATCH] Fixed the deadlock issue in DM and P2P orthogonal scenarios. --- .../engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp | 38 +++---------------- src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h | 1 - .../engine/px/p2p_datahub/ob_p2p_dh_msg.cpp | 15 ++++++++ .../px/p2p_datahub/ob_runtime_filter_msg.cpp | 18 +++++++-- 4 files changed, 35 insertions(+), 37 deletions(-) diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp index 6719ffad28..67a3f73b4a 100644 --- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp @@ -395,43 +395,15 @@ int ObP2PDatahubManager::P2PMsgSetCall::operator() (const common::hash::HashMapP ObP2PDatahubMsgBase *> &entry) { // entry.second == &dh_msg_ - // 1. register into dm - // 2. do dh_msg_.regenerate() + // once the msg is set to p2p datahub map, other threads will access it, so + // the regenerate process must be done in the setting process. UNUSED(entry); int ret = OB_SUCCESS; -#ifdef ERRSIM - if (OB_FAIL(OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS)) { - LOG_WARN("p2p msg reg dm failed by design", K(ret)); - ret = OB_ALLOCATE_MEMORY_FAILED; - ret_ = ret; - return ret; - } -#endif - if (OB_FAIL(ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm( - dh_msg_.get_register_dm_info(), dh_key_, dh_msg_.get_dm_cb_node_seq_id()))) { - LOG_WARN("[DM] failed to register check item to dm", K(dh_msg_.get_register_dm_info()), - K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id())); - } else { - succ_reg_dm_ = true; - LOG_TRACE("[DM] rf register check item to dm", K(dh_msg_.get_register_dm_info()), - K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id())); - if (OB_FAIL(dh_msg_.regenerate())) { - LOG_WARN("failed to do regen_call", K(dh_key_)); - } else if (FALSE_IT(dh_msg_.check_finish_receive())) { - } - } - if (OB_FAIL(ret)) { - (void) revert(); + if (OB_FAIL(dh_msg_.regenerate())) { + LOG_WARN("failed to do regen_call", K(dh_key_)); + } else if (FALSE_IT(dh_msg_.check_finish_receive())) { } ret_ = ret; return ret; } - -void ObP2PDatahubManager::P2PMsgSetCall::revert() -{ - if (succ_reg_dm_) { - (void) ObDetectManagerUtils::p2p_datahub_unregister_check_item_from_dm( - dh_msg_.get_register_dm_info().detectable_id_, dh_msg_.get_dm_cb_node_seq_id()); - } -} diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h index c72c7c72ea..3006a259f2 100644 --- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h +++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h @@ -60,7 +60,6 @@ public: : dh_key_(dh_key), dh_msg_(db_msg), ret_(OB_SUCCESS), succ_reg_dm_(false) {}; ~P2PMsgSetCall() = default; int operator() (const common::hash::HashMapPair &entry); - void revert(); ObP2PDhKey &dh_key_; ObP2PDatahubMsgBase &dh_msg_; int ret_; diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp index 8d8baa9af5..466660b14d 100644 --- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp @@ -138,6 +138,21 @@ int ObP2PDatahubMsgBase::process_msg_internal(bool &need_free) need_free = true; } else { need_merge = false; // set success, not need to merge + int reg_dm_ret = OB_SUCCESS; +#ifdef ERRSIM + int reg_dm_ret = OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS; + if (OB_SUCCESS != reg_dm_ret) { + LOG_WARN("p2p msg reg dm failed by design", K(ret)); + reg_dm_ret = OB_ALLOCATE_MEMORY_FAILED; + } +#endif + if (OB_SUCCESS == reg_dm_ret) { + reg_dm_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm( + register_dm_info_, dh_key, dm_cb_node_seq_id_); + } + if (OB_SUCCESS != reg_dm_ret) { + LOG_WARN("[DM] failed to register check item to dm", K(reg_dm_ret)); + } } // merge filter diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp index 1915e46868..a17f14572e 100644 --- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp @@ -243,6 +243,21 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free) need_free = true; } else { need_merge = false; // set success, not need to merge + int reg_dm_ret = OB_SUCCESS; +#ifdef ERRSIM + int reg_dm_ret = OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS; + if (OB_SUCCESS != reg_dm_ret) { + LOG_WARN("p2p msg reg dm failed by design", K(ret)); + reg_dm_ret = OB_ALLOCATE_MEMORY_FAILED; + } +#endif + if (OB_SUCCESS == reg_dm_ret) { + reg_dm_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm( + register_dm_info_, dh_key, dm_cb_node_seq_id_); + } + if (OB_SUCCESS != reg_dm_ret) { + LOG_WARN("[DM] failed to register check item to dm", K(reg_dm_ret)); + } } // merge piece bloom filter @@ -262,9 +277,6 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free) rf_msg_in_map->dec_ref_count(); } } - if (OB_SUCC(ret) && !need_merge) { - (void)check_finish_receive(); - } if (need_free) { // msg not in map, dec ref count guard.dec_msg_ref_count();