[Arb] optimize requirements for arbitration degrading and upgrading

This commit is contained in:
BinChenn
2023-06-05 05:02:27 +00:00
committed by ob-robot
parent 26aa25d647
commit b94bbba9b5
6 changed files with 132 additions and 0 deletions

View File

@ -111,6 +111,10 @@ public:
{blacklist_.block_net(src);}
void unblock_net(const ObAddr &src) override final
{blacklist_.unblock_net(src);}
void block_pcode(const ObRpcPacketCode &pcode) override final
{blacklist_.block_pcode(pcode);}
void unblock_pcode(const ObRpcPacketCode &pcode) override final
{blacklist_.unblock_pcode(pcode);}
void set_rpc_loss(const ObAddr &src, const int loss_rate) override final
{blacklist_.set_rpc_loss(src, loss_rate);}
void reset_rpc_loss(const ObAddr &src) override final

View File

@ -781,6 +781,22 @@ void ObSimpleLogClusterTestEnv::unblock_net(const int64_t id1, const int64_t id2
SERVER_LOG(INFO, "unblock_net success", K(addr1), K(addr2));
}
void ObSimpleLogClusterTestEnv::block_pcode(const int64_t id1, const ObRpcPacketCode &pcode)
{
auto cluster = get_cluster();
ObAddr addr1 = cluster[id1]->get_addr();
cluster[id1]->block_pcode(pcode);
SERVER_LOG(INFO, "block_pcode success", K(addr1), K(pcode));
}
void ObSimpleLogClusterTestEnv::unblock_pcode(const int64_t id1, const ObRpcPacketCode &pcode)
{
auto cluster = get_cluster();
ObAddr addr1 = cluster[id1]->get_addr();
cluster[id1]->unblock_pcode(pcode);
SERVER_LOG(INFO, "unblock_pcode success", K(addr1), K(pcode));
}
// set rpc loss by rate from id1 to id2
void ObSimpleLogClusterTestEnv::set_rpc_loss(const int64_t id1, const int64_t id2, const int loss_rate)
{

View File

@ -191,6 +191,8 @@ public:
virtual void unblock_all_net(const int64_t id);
virtual void block_net(const int64_t id1, const int64_t id2, const bool is_single_direction = false);
virtual void unblock_net(const int64_t id1, const int64_t id2);
virtual void block_pcode(const int64_t id1, const ObRpcPacketCode &pcode);
virtual void unblock_pcode(const int64_t id1, const ObRpcPacketCode &pcode);
virtual void set_rpc_loss(const int64_t id1, const int64_t id2, const int loss_rate);
virtual void reset_rpc_loss(const int64_t id1, const int64_t id2);
virtual int submit_log(PalfHandleImplGuard &leader, int count, int id);

View File

@ -376,6 +376,10 @@ int ObMittestBlacklist::init(const common::ObAddr &self)
SERVER_LOG(WARN, "create blacklist_ failed", K(ret));
} else if (false == blacklist_.created()) {
SERVER_LOG(WARN, "blacklist_ created failed");
} else if (OB_FAIL(pcode_blacklist_.create(1024))) {
SERVER_LOG(WARN, "create pcode_blacklist_ failed", K(ret));
} else if (false == pcode_blacklist_.created()) {
SERVER_LOG(WARN, "pcode_blacklist_ created failed");
} else {
self_ = self;
}
@ -394,11 +398,27 @@ void ObMittestBlacklist::unblock_net(const ObAddr &src)
SERVER_LOG(INFO, "unblock_net", K(src), K(blacklist_));
}
void ObMittestBlacklist::block_pcode(const ObRpcPacketCode &pcode)
{
pcode_blacklist_.set_refactored((int64_t)pcode);
SERVER_LOG(INFO, "block_pcode", K(pcode), K(pcode_blacklist_));
}
void ObMittestBlacklist::unblock_pcode(const ObRpcPacketCode &pcode)
{
pcode_blacklist_.erase_refactored((int64_t)pcode);
SERVER_LOG(INFO, "unblock_pcode", K(pcode), K(pcode_blacklist_));
}
bool ObMittestBlacklist::need_filter_packet_by_blacklist(const ObAddr &addr)
{
return OB_HASH_EXIST == blacklist_.exist_refactored(addr);
}
bool ObMittestBlacklist::need_filter_packet_by_pcode_blacklist(const ObRpcPacketCode &pcode)
{
return OB_HASH_EXIST == pcode_blacklist_.exist_refactored((int64_t)pcode);
}
void ObMittestBlacklist::set_rpc_loss(const ObAddr &src, const int loss_rate)
{
@ -501,6 +521,7 @@ void ObLogDeliver::destroy(const bool is_shutdown)
TG_DESTROY(tg_id_);
if (is_shutdown) {
blacklist_.destroy();
pcode_blacklist_.destroy();
}
tg_id_ = 0;
SERVER_LOG(INFO, "destroy ObLogDeliver");
@ -605,6 +626,10 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req)
}
return false;
};
if (this->need_filter_packet_by_pcode_blacklist(pcode)) {
SERVER_LOG(WARN, "need_filter_packet_by_pcode_blacklist", K(ret), K(pcode), K(pcode_blacklist_), KPC(palf_env_impl_));
return OB_SUCCESS;
}
switch (pkt.get_pcode()) {
#define PROCESS(processer) \
processer p;\

View File

@ -102,7 +102,10 @@ public:
int init(const common::ObAddr &self);
void block_net(const ObAddr &src);
void unblock_net(const ObAddr &src);
void block_pcode(const ObRpcPacketCode &pcode);
void unblock_pcode(const ObRpcPacketCode &pcode);
bool need_filter_packet_by_blacklist(const ObAddr &address);
bool need_filter_packet_by_pcode_blacklist(const ObRpcPacketCode &pcode);
void set_need_drop_packet(const bool need_drop_packet) { need_drop_packet_ = need_drop_packet; }
void set_rpc_loss(const ObAddr &src, const int loss_rate);
void reset_rpc_loss(const ObAddr &src);
@ -111,6 +114,7 @@ public:
TO_STRING_KV(K_(blacklist), K_(rpc_loss_config));
protected:
hash::ObHashSet<ObAddr> blacklist_;
hash::ObHashSet<int64_t> pcode_blacklist_;
bool need_drop_packet_;
common::ObSEArray<LossConfig, 4> rpc_loss_config_;
common::ObAddr self_;
@ -206,6 +210,8 @@ public:
virtual void set_need_drop_packet(const bool need_drop_packet) = 0;
virtual void block_net(const ObAddr &src) = 0;
virtual void unblock_net(const ObAddr &src) = 0;
virtual void block_pcode(const ObRpcPacketCode &pcode) = 0;
virtual void unblock_pcode(const ObRpcPacketCode &pcode) = 0;
virtual void set_rpc_loss(const ObAddr &src, const int loss_rate) = 0;
virtual void reset_rpc_loss(const ObAddr &src) = 0;
virtual int simple_init(const std::string &cluster_name,
@ -279,6 +285,10 @@ public:
{ deliver_.block_net(src); }
void unblock_net(const ObAddr &src) override final
{ deliver_.unblock_net(src); }
void block_pcode(const ObRpcPacketCode &pcode) override final
{ deliver_.block_pcode(pcode); }
void unblock_pcode(const ObRpcPacketCode &pcode) override final
{ deliver_.unblock_pcode(pcode); }
void set_rpc_loss(const ObAddr &src, const int loss_rate) override final
{ deliver_.set_rpc_loss(src, loss_rate); }
void reset_rpc_loss(const ObAddr &src) override final

View File

@ -179,6 +179,8 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading)
EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_EQ(leader_max_flushed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
dynamic_cast<ObSimpleLogServer*>(get_cluster()[b_idx])->log_service_.get_arbitration_service()->start();
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
@ -289,6 +291,8 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_de
EXPECT_TRUE(a_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
EXPECT_TRUE(a_handle->palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
dynamic_cast<ObSimpleLogServer*>(get_cluster()[b_idx])->log_service_.get_arbitration_service()->start();
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
@ -372,6 +376,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_no_leader
EXPECT_UNTIL_EQ(true, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(b_addr));
unblock_net(leader_idx, b_idx);
dynamic_cast<ObSimpleLogServer*>(get_cluster()[b_idx])->log_service_.get_arbitration_service()->start();
get_cluster()[c_idx]->get_palf_env()->revert_palf_handle_impl(c_ihandle);
revert_cluster_palf_handle_guard(palf_list);
}
@ -436,6 +441,76 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_arb_crash
PALF_LOG(INFO, "end test test_2f1a_degrade_when_arb_crash", K(id));
}
TEST_F(TestObSimpleLogClusterArbMockEleService, test_arb_degrade_probe)
{
int ret = OB_SUCCESS;
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L;
OB_LOGGER.set_log_level("TRACE");
SET_CASE_LOG_FILE(TEST_NAME, "test_arb_degrade_probe");
PALF_LOG(INFO, "begin test test_arb_degrade_probe", K(id));
{
int64_t leader_idx = 0;
int64_t arb_replica_idx = 0;
PalfHandleImplGuard leader;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, CONFIG_CHANGE_TIMEOUT));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
const int64_t b_idx = (leader_idx + 1) % 5;
const int64_t c_idx = (leader_idx + 2) % 5;
const int64_t d_idx = (leader_idx + 3) % 5;
const int64_t e_idx = (leader_idx + 4) % 5;
const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr();
const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
const common::ObAddr d_addr = get_cluster()[d_idx]->get_addr();
const common::ObAddr e_addr = get_cluster()[e_idx]->get_addr();
PalfHandleImplGuard *a_handle = palf_list[leader_idx];
PalfHandleImplGuard *b_handle = palf_list[b_idx];
// CASE 1. D and E UNKNOWN, do not degrade
block_pcode(d_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
block_pcode(e_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
EXPECT_TRUE(dynamic_cast<ObSimpleLogServer*>(get_cluster()[d_idx])->deliver_.need_filter_packet_by_pcode_blacklist(ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG));
EXPECT_TRUE(dynamic_cast<ObSimpleLogServer*>(get_cluster()[e_idx])->deliver_.need_filter_packet_by_pcode_blacklist(ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG));
sleep(6);
EXPECT_EQ(0, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.get_member_number());
// CASE 2. D block_net, E UNKNOWN, do not degrade
block_net(leader_idx, d_idx);
sleep(2);
EXPECT_EQ(0, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.get_member_number());
// CASE 3. D block_net, E block_net, degrade
block_net(leader_idx, e_idx);
EXPECT_TRUE(is_degraded(leader, d_idx));
EXPECT_TRUE(is_degraded(leader, e_idx));
// CASE 4. D and E unblock_net but block_pcode, do not upgrade
unblock_net(leader_idx, d_idx);
unblock_net(leader_idx, e_idx);
sleep(2);
EXPECT_EQ(2, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.get_member_number());
// CASE 5. D unblock_net and unblock_pcode, upgrade E
unblock_pcode(d_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
EXPECT_UNTIL_EQ(false, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(d_addr));
// CASE 6. E unblock_net and unblock_pcode, upgrade E
unblock_pcode(e_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
EXPECT_UNTIL_EQ(false, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(d_addr));
revert_cluster_palf_handle_guard(palf_list);
}
delete_paxos_group(id);
PALF_LOG(INFO, "end test test_arb_degrade_probe", K(id));
}
} // end unittest
} // end oceanbase