DM supports retry policy.

This commit is contained in:
obdev
2023-06-08 04:18:00 +00:00
committed by ob-robot
parent 72b3c1ff76
commit 30b4cfd39a
11 changed files with 208 additions and 65 deletions

View File

@ -156,6 +156,12 @@ int ObExprJoinFilter::check_rf_ready(
if (join_filter_ctx->need_wait_ready()) {
while (!join_filter_ctx->is_ready() && OB_SUCC(exec_ctx.fast_check_status())) {
if (OB_NOT_NULL(rf_msg)) {
#ifdef ERRSIM
if (OB_FAIL(OB_E(EventTable::EN_PX_JOIN_FILTER_HOLD_MSG) OB_SUCCESS)) {
LOG_WARN("join filter hold msg by design", K(ret));
ob_usleep(80000000);
}
#endif
if (rf_msg->check_ready()) {
break;
}

View File

@ -265,6 +265,9 @@ int ObP2PDatahubManager::erase_msg_if(ObP2PDhKey &dh_key,
msg->get_register_dm_info().detectable_id_, msg->get_dm_cb_node_seq_id());
}
PX_P2P_DH.free_msg(msg);
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to erase msg, other threads still referencing it", K(dh_key), K(need_unreg_dm));
}
return ret;
}
@ -385,3 +388,47 @@ bool ObP2PDatahubManager::P2PMsgEraseIfCall::operator() (common::hash::HashMapPa
}
return need_erase;
}
int ObP2PDatahubManager::P2PMsgSetCall::operator() (const common::hash::HashMapPair<ObP2PDhKey,
ObP2PDatahubMsgBase *> &entry)
{
// entry.second == &dh_msg_
// 1. register into dm
// 2. do dh_msg_.regenerate()
UNUSED(entry);
int ret = OB_SUCCESS;
#ifdef ERRSIM
if (OB_FAIL(OB_E(EventTable::EN_PX_P2P_MSG_REG_DM_FAILED) OB_SUCCESS)) {
LOG_WARN("p2p msg reg dm failed by design", K(ret));
ret = OB_ALLOCATE_MEMORY_FAILED;
ret_ = ret;
return ret;
}
#endif
if (OB_FAIL(ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(
dh_msg_.get_register_dm_info(), dh_key_, dh_msg_.get_dm_cb_node_seq_id()))) {
LOG_WARN("[DM] failed to register check item to dm", K(dh_msg_.get_register_dm_info()),
K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id()));
} else {
succ_reg_dm_ = true;
LOG_TRACE("[DM] rf register check item to dm", K(dh_msg_.get_register_dm_info()),
K(dh_key_), K(dh_msg_.get_dm_cb_node_seq_id()));
if (OB_FAIL(dh_msg_.regenerate())) {
LOG_WARN("failed to do regen_call", K(dh_key_));
}
}
if (OB_FAIL(ret)) {
(void) revert();
}
ret_ = ret;
return ret;
}
void ObP2PDatahubManager::P2PMsgSetCall::revert()
{
if (succ_reg_dm_) {
(void) ObDetectManagerUtils::p2p_datahub_unregister_check_item_from_dm(
dh_msg_.get_register_dm_info().detectable_id_, dh_msg_.get_dm_cb_node_seq_id());
}
}

View File

@ -61,6 +61,19 @@ public:
int ret_;
};
struct P2PMsgSetCall
{
P2PMsgSetCall(ObP2PDhKey &dh_key, ObP2PDatahubMsgBase &db_msg)
: dh_key_(dh_key), dh_msg_(db_msg), ret_(OB_SUCCESS), succ_reg_dm_(false) {};
~P2PMsgSetCall() = default;
int operator() (const common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry);
void revert();
ObP2PDhKey &dh_key_;
ObP2PDatahubMsgBase &dh_msg_;
int ret_;
bool succ_reg_dm_;
};
public:
ObP2PDatahubManager() : map_(), is_inited_(false),
p2p_dh_proxy_(), p2p_dh_id_(0)

View File

@ -116,31 +116,36 @@ int ObP2PDatahubMsgBase::process_msg_internal(bool &need_free)
{
int ret = OB_SUCCESS;
ObP2PDhKey dh_key(p2p_datahub_id_, px_sequence_id_, task_id_);
ObP2PDatahubManager::P2PMsgMergeCall call(*this);
ObP2PDatahubManager::P2PMsgSetCall set_call(dh_key, *this);
ObP2PDatahubManager::P2PMsgMergeCall merge_call(*this);
ObP2PDatahubManager::MsgMap &map = PX_P2P_DH.get_map();
start_time_ = ObTimeUtility::current_time();
ObP2PDatahubMsgGuard guard(this);
do {
if (OB_HASH_EXIST == (ret = map.set_refactored(dh_key, this))) {
if (OB_FAIL(map.atomic_refactored(dh_key, call))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to merge p2p dh msg", K(ret));
}
}
} else if (OB_SUCCESS == ret) {
(void)check_finish_receive();
// set_refactored success, means this msg is in map, so register check item into dm
int reg_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(register_dm_info_,
dh_key, dm_cb_node_seq_id_);
if (OB_SUCCESS != reg_ret) {
LOG_WARN("[DM] failed to register check item to dm", K(reg_ret));
}
LOG_TRACE("[DM] p2p msg register check item to dm", K(reg_ret), K(register_dm_info_),
K(dh_key), K(dm_cb_node_seq_id_), K(this));
bool need_merge = true;
if (OB_FAIL(map.set_refactored(dh_key, this, 0/*flag*/, 0/*broadcast*/, 0/*overwrite_key*/, &set_call))) {
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to set refactored", K(ret));
}
} while (ret == OB_HASH_NOT_EXIST);
if (call.need_free_) {
need_free = true;
} else {
need_merge = false; // set success, not need to merge
}
// merge filter
if (OB_SUCC(ret) && need_merge) {
if (OB_FAIL(map.atomic_refactored(dh_key, merge_call))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to merge p2p dh msg", K(ret));
}
}
}
if (OB_SUCC(ret) && !need_merge) {
(void)check_finish_receive();
}
if (need_free) {
// msg not in map, dec ref count
guard.dec_msg_ref_count();
}

View File

@ -209,18 +209,19 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
{
int ret = OB_SUCCESS;
ObP2PDhKey dh_key(p2p_datahub_id_, px_sequence_id_, task_id_);
ObP2PDatahubManager::P2PMsgMergeCall call(*this);
ObP2PDatahubManager::P2PRegenerateCall regen_call(*this);
ObP2PDatahubManager::P2PMsgSetCall set_call(dh_key, *this);
ObP2PDatahubManager::P2PMsgMergeCall merge_call(*this);
ObP2PDatahubManager::MsgMap &map = PX_P2P_DH.get_map();
start_time_ = ObTimeUtility::current_time();
bool need_merge = true;
if (OB_FAIL(generate_receive_count_array(piece_size_, bloom_filter_.get_begin_idx()))) {
need_free = true;
LOG_WARN("fail to generate receive count array", K(ret));
} else {
//set msg
ObP2PDatahubMsgGuard guard(this);
if (OB_FAIL(map.set_refactored(dh_key, this))) {
if (OB_FAIL(map.set_refactored(dh_key, this, 0/*flag*/, 0/*broadcast*/, 0/*overwrite_key*/, &set_call))) {
if (OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
} else {
@ -228,25 +229,12 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
}
need_free = true;
} else {
need_merge = false;
// set_refactored success, means this msg is in map, so register check item into dm
int reg_ret = ObDetectManagerUtils::p2p_datahub_register_check_item_into_dm(register_dm_info_,
dh_key, dm_cb_node_seq_id_);
if (OB_SUCCESS != reg_ret) {
LOG_WARN("[DM] failed to register check item to dm", K(reg_ret));
}
LOG_TRACE("[DM] rf register check item to dm", K(reg_ret), K(register_dm_info_),
K(dh_key), K(dm_cb_node_seq_id_), K(this));
}
// create whole bloom filter, no need wait
if (OB_SUCC(ret)) {
if (OB_FAIL(map.atomic_refactored(dh_key, regen_call))) {
LOG_WARN("fail to update bloom filter msg", K(ret));
}
need_merge = false; // set success, not need to merge
}
// merge piece bloom filter
if (OB_SUCC(ret) && need_merge) {
if (OB_FAIL(map.read_atomic(dh_key, call))) {
if (OB_FAIL(map.read_atomic(dh_key, merge_call))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to merge p2p dh msg", K(ret));
}