diff --git a/deps/oblib/src/lib/hash/ob_hashmap.h b/deps/oblib/src/lib/hash/ob_hashmap.h
index d69581887..98a1f74e7 100644
--- a/deps/oblib/src/lib/hash/ob_hashmap.h
+++ b/deps/oblib/src/lib/hash/ob_hashmap.h
@@ -196,15 +196,16 @@ public:
     const _value_type *ret = get(const_cast<_key_type &>(key));
     return const_cast<_value_type*>(ret);
   }
-  inline int set_refactored(const _key_type &key, const _value_type &value, int flag = 0,
-                            int broadcast = 0, int overwrite_key = 0)
+  template <typename _callback = void>
+  int set_refactored(const _key_type &key, const _value_type &value, int flag = 0,
+                     int broadcast = 0, int overwrite_key = 0, _callback *callback = nullptr)
   {
     int ret = OB_SUCCESS;
     pair_type pair;
     if (OB_FAIL(pair.init(key, value))) {
       HASH_WRITE_LOG(HASH_WARNING, "init pair failed, ret=%d", ret);
     } else {
-      ret = ht_.set_refactored(key, pair, flag, broadcast, overwrite_key);
+      ret = ht_.set_refactored(key, pair, flag, broadcast, overwrite_key, callback);
     }
     return ret;
   };
diff --git a/deps/oblib/src/lib/hash/ob_hashtable.h b/deps/oblib/src/lib/hash/ob_hashtable.h
index 0bcf29250..afc783f02 100644
--- a/deps/oblib/src/lib/hash/ob_hashtable.h
+++ b/deps/oblib/src/lib/hash/ob_hashtable.h
@@ -1015,6 +1015,32 @@ private:
     return ret;
   }
+  inline int internal_erase(hashbucket &bucket,
+                            const _key_type &key)
+  {
+    int ret = OB_HASH_NOT_EXIST;
+    hashnode *node = bucket.node;
+    hashnode *prev = nullptr;
+    while (nullptr != node) {
+      if (equal_(getkey_(node->data), key)) {
+        ret = OB_SUCCESS;
+        if (nullptr == prev) {
+          bucket.node = node->next;
+        } else {
+          prev->next = node->next;
+        }
+        allocer_->free(node);
+        node = nullptr;
+        ATOMIC_DEC((uint64_t *) &size_);
+        break;
+      } else {
+        prev = node;
+        node = node->next;
+      }
+    }
+    return ret;
+  }
+
   // This function is unsafe in that the pointer it returns might be invalid when
   // the user uses it. The multi-thread safeness is left to the caller.
   int internal_get_with_timeout_unsafe(hashbucket &bucket,
@@ -1123,9 +1149,30 @@ public:
     return ret;
   }
-  // flag: 0 shows that not cover existing object.
+  template <typename _callback>
+  int callback_helper(const _value_type &value, _callback *callback)
+  {
+    return (*callback)(value);
+  }
+
+  template <>
+  int callback_helper(const _value_type &value, void *callback)
+  {
+    return OB_SUCCESS;
+  }
+
+  // if key not exist, set node and execute callback
+  // if callback failed, erase the node
+  // parameters:
+  //   flag: 0 shows that not cover existing object.
+  //   callback: MUST with a int operater()
+  // return value:
+  //   OB_SUCCESS for success, the node is set
+  //   OB_HASH_EXIST for node already exist
+  //   others for error
+  template <typename _callback = void, bool with_callback = !std::is_same<_callback, void>::value>
   int set_refactored(const _key_type &key, const _value_type &value, int flag = 0,
-                     int broadcast = 0, int overwrite_key = 0)
+                     int broadcast = 0, int overwrite_key = 0, _callback *callback = nullptr)
   {
     int ret = OB_SUCCESS;
     uint64_t hash_value = 0;
@@ -1190,6 +1237,17 @@ public:
       }
       if (OB_SUCC(ret) && NULL == node) {
         ret = internal_set(bucket, value, false);
+        if (with_callback) {
+          if (OB_SUCC(ret)) {
+            int tmp_ret = callback_helper(value, callback);
+            if (OB_SUCCESS != tmp_ret) {
+              HASH_WRITE_LOG(HASH_WARNING, "hashtable executes callback failed, tmp_ret=%d", tmp_ret);
+              ret = OB_ERR_UNEXPECTED;
+              // never fail because internal_set succeed
+              (void) internal_erase(bucket, key);
+            }
+          }
+        }
       }
     }
     return ret;
diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h
index 42d03b2cd..9c420cd5f 100644
--- a/deps/oblib/src/lib/utility/ob_tracepoint.h
+++ b/deps/oblib/src/lib/utility/ob_tracepoint.h
@@ -620,6 +620,8 @@ class EventTable
       EN_PX_NOT_ERASE_P2P_DH_MSG = 608,
       EN_PX_SLOW_PROCESS_SQC_FINISH_MSG = 609,
       EN_PX_JOIN_FILTER_NOT_MERGE_MSG = 610,
+      EN_PX_P2P_MSG_REG_DM_FAILED= 611,
+      EN_PX_JOIN_FILTER_HOLD_MSG = 612,
       // please add new trace point after 700 or before 600
       // Compaction Related 700-750
diff --git a/src/share/detect/ob_detect_callback.cpp b/src/share/detect/ob_detect_callback.cpp
index 3e49cfb29..067ab7718 100644
--- a/src/share/detect/ob_detect_callback.cpp
+++ b/src/share/detect/ob_detect_callback.cpp
@@ -53,6 +53,20 @@ int ObIDetectCallback::atomic_set_finished(const common::ObAddr &addr, ObTaskState *state)
   return ret;
 }

+int ObIDetectCallback::atomic_set_running(const common::ObAddr &addr)
+{
+  int ret = OB_SEARCH_NOT_FOUND;
+  ARRAY_FOREACH_NORET(peer_states_, idx) {
+    if (peer_states_.at(idx).peer_addr_ == addr) {
+      ATOMIC_SET((int32_t*)&peer_states_.at(idx).peer_state_,
+          (int32_t)ObTaskState::RUNNING);
+      ret = OB_SUCCESS;
+      break;
+    }
+  }
+  return ret;
+}
+
 int64_t ObIDetectCallback::inc_ref_count(int64_t count)
 {
   return ATOMIC_AAF(&ref_count_, count);
@@ -212,11 +226,13 @@ int ObQcDetectCB::atomic_set_finished(const common::ObAddr &addr, ObTaskState *state)
   int ret = OB_SEARCH_NOT_FOUND;
   for (int i = 0; i < get_peer_states().count(); ++i) {
     if (get_peer_states().at(i).peer_addr_ == addr) {
-      sql::dtl::ObDtlRpcChannel* dtl_rpc_channel = static_cast<sql::dtl::ObDtlRpcChannel*>(dtl_channels_.at(i));
-      if (dtl_rpc_channel->recv_sqc_fin_res()) {
-        ATOMIC_SET((int32_t*)&get_peer_states().at(i).peer_state_, (int32_t)ObTaskState::FINISHED);
-        if (OB_NOT_NULL(state)) {
+      ATOMIC_SET((int32_t*)&get_peer_states().at(i).peer_state_, (int32_t)ObTaskState::FINISHED);
+      if (OB_NOT_NULL(state)) {
+        sql::dtl::ObDtlRpcChannel* dtl_rpc_channel = static_cast<sql::dtl::ObDtlRpcChannel*>(dtl_channels_.at(i));
+        if (dtl_rpc_channel->recv_sqc_fin_res()) {
           *state = ObTaskState::FINISHED;
+        } else {
+          *state = ObTaskState::RUNNING;
         }
       }
       ret = OB_SUCCESS;
diff --git a/src/share/detect/ob_detect_callback.h b/src/share/detect/ob_detect_callback.h
index 17434bca8..f91c59b03 100644
--- a/src/share/detect/ob_detect_callback.h
+++ b/src/share/detect/ob_detect_callback.h
@@ -81,7 +81,10 @@ public:
   ObArray<ObPeerTaskState> &get_peer_states() { return peer_states_; }
+  // set peer state to finished and get the old state
   virtual int atomic_set_finished(const common::ObAddr &addr, ObTaskState *state=nullptr);
+  // if do_callback failed, reset state to running for next detect loop
+  virtual int atomic_set_running(const common::ObAddr &addr);
   int64_t get_ref_count() { return ATOMIC_LOAD(&ref_count_); }
   int64_t inc_ref_count(int64_t count = 1);
   int64_t dec_ref_count();
diff --git a/src/share/detect/ob_detect_manager.cpp b/src/share/detect/ob_detect_manager.cpp
index 30cf049cc..44623e683 100644
--- a/src/share/detect/ob_detect_manager.cpp
+++ b/src/share/detect/ob_detect_manager.cpp
@@ -454,38 +454,42 @@ bool ObDetectManager::ObDetectCallbackNodeExecuteCall::operator()(hash::HashMapPair<ObDetectableId, ObDetectCallbackNode *> &entry)
   ObDetectCallbackNode *node = entry.second;
   int node_cnt = 0; // node_cnt in linked list, remove kv pair if the last node is removed
   while (OB_NOT_NULL(node)) {
+    ret = OB_SUCCESS;
     node_cnt++;
     ObDetectCallbackNode *next_node = node->next_;
     // if a callback can be reentrant, don't set is_executed so that it can be do for several times
     // typical scene: qc detects sqc, if at least 2 sqc failed, both of them should be set not_need_report,
     // the callback should be reentrant and node not set to executed
     if (!node->is_executed()) {
-      if (!node->cb_->reentrant()) {
-        node->set_executed();
-      }
       LIB_LOG(WARN, "[DM] DM found peer not exist, execute detect callback", K(node->cb_->get_trace_id()), K(from_svr_addr_), K(detectable_id), K(node->cb_->get_detect_callback_type()), K(node->sequence_id_));
       node->cb_->set_from_svr_addr(from_svr_addr_);
       if (OB_FAIL(node->cb_->do_callback())) {
+        // if do_callback failed, reset state to running for next detect loop
+        node->cb_->atomic_set_running(from_svr_addr_);
         LIB_LOG(WARN, "[DM] failed to do_callback", K(node->cb_->get_trace_id()), K(from_svr_addr_), K(detectable_id), K(node->cb_->get_detect_callback_type()), K(node->sequence_id_));
-      }
-      // ref_count > 0 means that cb is still referred by work thread, don‘t remove it from the linked list
-      int64_t ref_count = node->cb_->get_ref_count();
-      if (0 == ref_count) {
-        if (node->next_ != nullptr) {
-          node->next_->prev_ = node->prev_;
+      } else {
+        if (!node->cb_->reentrant()) {
+          node->set_executed();
         }
-        if (node->prev_ != nullptr) {
-          node->prev_->next_ = node->next_;
-        } else {
-          // Prev is empty, which means that the current deleted element is the head of the linked list pointed to by map_value, and head is set to next
-          entry.second = node->next_;
+        // ref_count > 0 means that cb is still referred by work thread, don‘t remove it from the linked list
+        int64_t ref_count = node->cb_->get_ref_count();
+        if (0 == ref_count) {
+          if (node->next_ != nullptr) {
+            node->next_->prev_ = node->prev_;
+          }
+          if (node->prev_ != nullptr) {
+            node->prev_->next_ = node->next_;
+          } else {
+            // Prev is empty, which means that the current deleted element is the head of the linked list pointed to by map_value, and head is set to next
+            entry.second = node->next_;
+          }
+          dm_->delete_cb_node(node);
+          node_cnt--;
         }
-        dm_->delete_cb_node(node);
-        node_cnt--;
       }
     }
     node = next_node;
@@ -499,7 +503,7 @@ void ObDetectManager::ObCheckStateFinishCall::operator()(hash::HashMapPair<ObDetectableId, ObDetectCallbackNode *> &entry)
   ObDetectCallbackNode *node = entry.second;
   while (OB_NOT_NULL(node)) {
-    ObTaskState state = ObTaskState::FINISHED;
+    ObTaskState state = ObTaskState::RUNNING;
    if (OB_SUCCESS == node->cb_->atomic_set_finished(addr_, &state)) {
      if (ObTaskState::FINISHED == state) {
        finished_ = true;
diff --git a/src/sql/engine/expr/ob_expr_join_filter.cpp b/src/sql/engine/expr/ob_expr_join_filter.cpp
index 6632eb8aa..a6b088bbb 100644
--- a/src/sql/engine/expr/ob_expr_join_filter.cpp
+++ b/src/sql/engine/expr/ob_expr_join_filter.cpp
@@ -156,6 +156,12 @@ int ObExprJoinFilter::check_rf_ready(
   if (join_filter_ctx->need_wait_ready()) {
     while (!join_filter_ctx->is_ready() && OB_SUCC(exec_ctx.fast_check_status())) {
       if (OB_NOT_NULL(rf_msg)) {
+#ifdef ERRSIM
+        if (OB_FAIL(OB_E(EventTable::EN_PX_JOIN_FILTER_HOLD_MSG) OB_SUCCESS)) {
+          LOG_WARN("join filter hold msg by design", K(ret));
+          ob_usleep(80000000);
+        }
+#endif
         if (rf_msg->check_ready()) {
           break;
         }
diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp
index 0a7690338..047b1f3bf 100644
--- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp
+++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp
@@ -265,6 +265,9 @@ int ObP2PDatahubManager::erase_msg_if(ObP2PDhKey &dh_key,
          msg->get_register_dm_info().detectable_id_, msg->get_dm_cb_node_seq_id());
    }
    PX_P2P_DH.free_msg(msg);
+  } else {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_WARN("failed to erase msg, other threads still referencing it", K(dh_key), K(need_unreg_dm));
  }
  return ret;
}
@@ -385,3 +388,47 @@ bool ObP2PDatahubManager::P2PMsgEraseIfCall::operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry)
  }
  return need_erase;
}
+
+int ObP2PDatahubManager::P2PMsgSetCall::operator() (const common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry)
+{
+  // entry.second == &dh_msg_
+  // 1. register into dm
+  // 2. do dh_msg_.regenerate()
+  UNUSED(entry);
+  int ret = OB_SUCCESS;
+
+#ifdef ERRSIM
+  if (OB_FAIL(OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS)) {
+    LOG_WARN("p2p msg reg dm failed by design", K(ret));
+    ret = OB_ALLOCATE_MEMORY_FAILED;
+    ret_ = ret;
+    return ret;
+  }
+#endif
+  if (OB_FAIL(ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(
+      dh_msg_.get_register_dm_info(), dh_key_, dh_msg_.get_dm_cb_node_seq_id()))) {
+    LOG_WARN("[DM] failed to register check item to dm", K(dh_msg_.get_register_dm_info()),
+        K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id()));
+  } else {
+    succ_reg_dm_ = true;
+    LOG_TRACE("[DM] rf register check item to dm", K(dh_msg_.get_register_dm_info()),
+        K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id()));
+    if (OB_FAIL(dh_msg_.regenerate())) {
+      LOG_WARN("failed to do regen_call", K(dh_key_));
+    }
+  }
+  if (OB_FAIL(ret)) {
+    (void) revert();
+  }
+  ret_ = ret;
+  return ret;
+}
+
+void ObP2PDatahubManager::P2PMsgSetCall::revert()
+{
+  if (succ_reg_dm_) {
+    (void) ObDetectManagerUtils::p2p_datahub_unregister_check_item_from_dm(
+        dh_msg_.get_register_dm_info().detectable_id_, dh_msg_.get_dm_cb_node_seq_id());
+  }
+}
diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h
index ce9550529..4820c5347 100644
--- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h
+++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h
@@ -61,6 +61,19 @@ public:
    int ret_;
  };

+  struct P2PMsgSetCall
+  {
+    P2PMsgSetCall(ObP2PDhKey &dh_key, ObP2PDatahubMsgBase &db_msg)
+        : dh_key_(dh_key), dh_msg_(db_msg), ret_(OB_SUCCESS), succ_reg_dm_(false) {};
+    ~P2PMsgSetCall() = default;
+    int operator() (const common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry);
+    void revert();
+    ObP2PDhKey &dh_key_;
+    ObP2PDatahubMsgBase &dh_msg_;
+    int ret_;
+    bool succ_reg_dm_;
+  };
+
public:
  ObP2PDatahubManager() : map_(), is_inited_(false),
      p2p_dh_proxy_(), p2p_dh_id_(0)
diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp
index f09cc5d5d..cb89b5153 100644
--- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp
+++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp
@@ -116,31 +116,36 @@ int ObP2PDatahubMsgBase::process_msg_internal(bool &need_free)
{
  int ret = OB_SUCCESS;
  ObP2PDhKey dh_key(p2p_datahub_id_, px_sequence_id_, task_id_);
-  ObP2PDatahubManager::P2PMsgMergeCall call(*this);
+  ObP2PDatahubManager::P2PMsgSetCall set_call(dh_key, *this);
+  ObP2PDatahubManager::P2PMsgMergeCall merge_call(*this);
  ObP2PDatahubManager::MsgMap &map = PX_P2P_DH.get_map();
  start_time_ = ObTimeUtility::current_time();
  ObP2PDatahubMsgGuard guard(this);
-  do {
-    if (OB_HASH_EXIST == (ret = map.set_refactored(dh_key, this))) {
-      if (OB_FAIL(map.atomic_refactored(dh_key, call))) {
-        if (OB_HASH_NOT_EXIST != ret) {
-          LOG_WARN("fail to merge p2p dh msg", K(ret));
-        }
-      }
-    } else if (OB_SUCCESS == ret) {
-      (void)check_finish_receive();
-      // set_refactored success, means this msg is in map, so register check item into dm
-      int reg_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(register_dm_info_,
-          dh_key, dm_cb_node_seq_id_);
-      if (OB_SUCCESS != reg_ret) {
-        LOG_WARN("[DM] failed to register check item to dm", K(reg_ret));
-      }
-      LOG_TRACE("[DM] p2p msg register check item to dm", K(reg_ret), K(register_dm_info_),
-          K(dh_key), K(dm_cb_node_seq_id_), K(this));
+
+  bool need_merge = true;
+  if (OB_FAIL(map.set_refactored(dh_key, this, 0/*flag*/, 0/*broadcast*/, 0/*overwrite_key*/, &set_call))) {
+    if (OB_HASH_EXIST == ret) {
+      ret = OB_SUCCESS;
+    } else {
+      LOG_WARN("fail to set refactored", K(ret));
    }
-  } while (ret == OB_HASH_NOT_EXIST);
-  if (call.need_free_) {
    need_free = true;
+  } else {
+    need_merge = false; // set success, not need to merge
+  }
+
+  // merge filter
+  if (OB_SUCC(ret) && need_merge) {
+    if (OB_FAIL(map.atomic_refactored(dh_key, merge_call))) {
+      if (OB_HASH_NOT_EXIST != ret) {
+        LOG_WARN("fail to merge p2p dh msg", K(ret));
+      }
+    }
+  }
+  if (OB_SUCC(ret) && !need_merge) {
+    (void)check_finish_receive();
+  }
+  if (need_free) { // msg not in map, dec ref count
    guard.dec_msg_ref_count();
  }
diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp
index 1db9a74ce..4164b147e 100644
--- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp
+++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp
@@ -209,18 +209,19 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
{
  int ret = OB_SUCCESS;
  ObP2PDhKey dh_key(p2p_datahub_id_, px_sequence_id_, task_id_);
-  ObP2PDatahubManager::P2PMsgMergeCall call(*this);
-  ObP2PDatahubManager::P2PRegenerateCall regen_call(*this);
+  ObP2PDatahubManager::P2PMsgSetCall set_call(dh_key, *this);
+  ObP2PDatahubManager::P2PMsgMergeCall merge_call(*this);
  ObP2PDatahubManager::MsgMap &map = PX_P2P_DH.get_map();
  start_time_ = ObTimeUtility::current_time();
  bool need_merge = true;
  if (OB_FAIL(generate_receive_count_array(piece_size_, bloom_filter_.get_begin_idx()))) {
+    need_free = true;
    LOG_WARN("fail to generate receive count array", K(ret));
  } else {
    //set msg
    ObP2PDatahubMsgGuard guard(this);
-    if (OB_FAIL(map.set_refactored(dh_key, this))) {
+    if (OB_FAIL(map.set_refactored(dh_key, this, 0/*flag*/, 0/*broadcast*/, 0/*overwrite_key*/, &set_call))) {
      if (OB_HASH_EXIST == ret) {
        ret = OB_SUCCESS;
      } else {
@@ -228,25 +229,12 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
      }
      need_free = true;
    } else {
-      need_merge = false;
-      // set_refactored success, means this msg is in map, so register check item into dm
-      int reg_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(register_dm_info_,
-          dh_key, dm_cb_node_seq_id_);
-      if (OB_SUCCESS != reg_ret) {
-        LOG_WARN("[DM] failed to register check item to dm", K(reg_ret));
-      }
-      LOG_TRACE("[DM] rf register check item to dm", K(reg_ret), K(register_dm_info_),
-          K(dh_key), K(dm_cb_node_seq_id_), K(this));
-    }
-    // create whole bloom filter, no need wait
-    if (OB_SUCC(ret)) {
-      if (OB_FAIL(map.atomic_refactored(dh_key, regen_call))) {
-        LOG_WARN("fail to update bloom filter msg", K(ret));
-      }
+      need_merge = false; // set success, not need to merge
    }
+    // merge piece bloom filter
    if (OB_SUCC(ret) && need_merge) {
-      if (OB_FAIL(map.read_atomic(dh_key, call))) {
+      if (OB_FAIL(map.read_atomic(dh_key, merge_call))) {
        if (OB_HASH_NOT_EXIST != ret) {
          LOG_WARN("fail to merge p2p dh msg", K(ret));
        }