diff --git a/mittest/logservice/env/ob_simple_arb_server.h b/mittest/logservice/env/ob_simple_arb_server.h
index 59aabd0c7a..525b45b6f8 100644
--- a/mittest/logservice/env/ob_simple_arb_server.h
+++ b/mittest/logservice/env/ob_simple_arb_server.h
@@ -111,6 +111,10 @@ public:
   {blacklist_.block_net(src);}
   void unblock_net(const ObAddr &src) override final
   {blacklist_.unblock_net(src);}
+  void block_pcode(const ObRpcPacketCode &pcode) override final
+  {blacklist_.block_pcode(pcode);}
+  void unblock_pcode(const ObRpcPacketCode &pcode) override final
+  {blacklist_.unblock_pcode(pcode);}
   void set_rpc_loss(const ObAddr &src, const int loss_rate) override final
   {blacklist_.set_rpc_loss(src, loss_rate);}
   void reset_rpc_loss(const ObAddr &src) override final
diff --git a/mittest/logservice/env/ob_simple_log_cluster_env.cpp b/mittest/logservice/env/ob_simple_log_cluster_env.cpp
index a247336096..0790ce4fdb 100755
--- a/mittest/logservice/env/ob_simple_log_cluster_env.cpp
+++ b/mittest/logservice/env/ob_simple_log_cluster_env.cpp
@@ -781,6 +781,22 @@ void ObSimpleLogClusterTestEnv::unblock_net(const int64_t id1, const int64_t id2
   SERVER_LOG(INFO, "unblock_net success", K(addr1), K(addr2));
 }
 
+void ObSimpleLogClusterTestEnv::block_pcode(const int64_t id1, const ObRpcPacketCode &pcode)
+{
+  auto cluster = get_cluster();
+  ObAddr addr1 = cluster[id1]->get_addr();
+  cluster[id1]->block_pcode(pcode);
+  SERVER_LOG(INFO, "block_pcode success", K(addr1), K(pcode));
+}
+
+void ObSimpleLogClusterTestEnv::unblock_pcode(const int64_t id1, const ObRpcPacketCode &pcode)
+{
+  auto cluster = get_cluster();
+  ObAddr addr1 = cluster[id1]->get_addr();
+  cluster[id1]->unblock_pcode(pcode);
+  SERVER_LOG(INFO, "unblock_pcode success", K(addr1), K(pcode));
+}
+
 // set rpc loss by rate from id1 to id2
 void ObSimpleLogClusterTestEnv::set_rpc_loss(const int64_t id1, const int64_t id2, const int loss_rate)
 {
diff --git a/mittest/logservice/env/ob_simple_log_cluster_env.h b/mittest/logservice/env/ob_simple_log_cluster_env.h
index 446688449d..4456fdbdf5 100644
--- a/mittest/logservice/env/ob_simple_log_cluster_env.h
+++ b/mittest/logservice/env/ob_simple_log_cluster_env.h
@@ -191,6 +191,8 @@ public:
   virtual void unblock_all_net(const int64_t id);
   virtual void block_net(const int64_t id1, const int64_t id2, const bool is_single_direction = false);
   virtual void unblock_net(const int64_t id1, const int64_t id2);
+  virtual void block_pcode(const int64_t id1, const ObRpcPacketCode &pcode);
+  virtual void unblock_pcode(const int64_t id1, const ObRpcPacketCode &pcode);
   virtual void set_rpc_loss(const int64_t id1, const int64_t id2, const int loss_rate);
   virtual void reset_rpc_loss(const int64_t id1, const int64_t id2);
   virtual int submit_log(PalfHandleImplGuard &leader, int count, int id);
diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp
index e5d0010707..96563f29c7 100644
--- a/mittest/logservice/env/ob_simple_log_server.cpp
+++ b/mittest/logservice/env/ob_simple_log_server.cpp
@@ -376,6 +376,10 @@ int ObMittestBlacklist::init(const common::ObAddr &self)
     SERVER_LOG(WARN, "create blacklist_ failed", K(ret));
   } else if (false == blacklist_.created()) {
     SERVER_LOG(WARN, "blacklist_ created failed");
+  } else if (OB_FAIL(pcode_blacklist_.create(1024))) {
+    SERVER_LOG(WARN, "create pcode_blacklist_ failed", K(ret));
+  } else if (false == pcode_blacklist_.created()) {
+    SERVER_LOG(WARN, "pcode_blacklist_ created failed");
   } else {
     self_ = self;
   }
@@ -394,11 +398,27 @@ void ObMittestBlacklist::unblock_net(const ObAddr &src)
   SERVER_LOG(INFO, "unblock_net", K(src), K(blacklist_));
 }
 
+void ObMittestBlacklist::block_pcode(const ObRpcPacketCode &pcode)
+{
+  pcode_blacklist_.set_refactored(static_cast<int64_t>(pcode));
+  SERVER_LOG(INFO, "block_pcode", K(pcode), K(pcode_blacklist_));
+}
+
+void ObMittestBlacklist::unblock_pcode(const ObRpcPacketCode &pcode)
+{
+  pcode_blacklist_.erase_refactored(static_cast<int64_t>(pcode));
+  SERVER_LOG(INFO, "unblock_pcode", K(pcode), K(pcode_blacklist_));
+}
+
 bool ObMittestBlacklist::need_filter_packet_by_blacklist(const ObAddr &addr)
 {
   return OB_HASH_EXIST == blacklist_.exist_refactored(addr);
 }
 
+bool ObMittestBlacklist::need_filter_packet_by_pcode_blacklist(const ObRpcPacketCode &pcode)
+{
+  return OB_HASH_EXIST == pcode_blacklist_.exist_refactored(static_cast<int64_t>(pcode));
+}
 
 void ObMittestBlacklist::set_rpc_loss(const ObAddr &src, const int loss_rate)
 {
@@ -501,6 +521,7 @@ void ObLogDeliver::destroy(const bool is_shutdown)
   TG_DESTROY(tg_id_);
   if (is_shutdown) {
     blacklist_.destroy();
+    pcode_blacklist_.destroy();
   }
   tg_id_ = 0;
   SERVER_LOG(INFO, "destroy ObLogDeliver");
@@ -605,6 +626,10 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req)
     }
     return false;
   };
+  if (this->need_filter_packet_by_pcode_blacklist(pcode)) {
+    SERVER_LOG(WARN, "need_filter_packet_by_pcode_blacklist", K(ret), K(pcode), K(pcode_blacklist_), KPC(palf_env_impl_));
+    return OB_SUCCESS;
+  }
   switch (pkt.get_pcode()) {
     #define PROCESS(processer) \
     processer p;\
diff --git a/mittest/logservice/env/ob_simple_log_server.h b/mittest/logservice/env/ob_simple_log_server.h
index 84f45ca26b..873896fb72 100644
--- a/mittest/logservice/env/ob_simple_log_server.h
+++ b/mittest/logservice/env/ob_simple_log_server.h
@@ -102,7 +102,10 @@ public:
   int init(const common::ObAddr &self);
   void block_net(const ObAddr &src);
   void unblock_net(const ObAddr &src);
+  void block_pcode(const ObRpcPacketCode &pcode);
+  void unblock_pcode(const ObRpcPacketCode &pcode);
   bool need_filter_packet_by_blacklist(const ObAddr &address);
+  bool need_filter_packet_by_pcode_blacklist(const ObRpcPacketCode &pcode);
   void set_need_drop_packet(const bool need_drop_packet) { need_drop_packet_ = need_drop_packet; }
   void set_rpc_loss(const ObAddr &src, const int loss_rate);
   void reset_rpc_loss(const ObAddr &src);
@@ -111,6 +114,7 @@ public:
   TO_STRING_KV(K_(blacklist), K_(rpc_loss_config));
 protected:
   hash::ObHashSet blacklist_;
+  hash::ObHashSet pcode_blacklist_;
   bool need_drop_packet_;
   common::ObSEArray rpc_loss_config_;
   common::ObAddr self_;
@@ -206,6 +210,8 @@ public:
   virtual void set_need_drop_packet(const bool need_drop_packet) = 0;
   virtual void block_net(const ObAddr &src) = 0;
   virtual void unblock_net(const ObAddr &src) = 0;
+  virtual void block_pcode(const ObRpcPacketCode &pcode) = 0;
+  virtual void unblock_pcode(const ObRpcPacketCode &pcode) = 0;
   virtual void set_rpc_loss(const ObAddr &src, const int loss_rate) = 0;
   virtual void reset_rpc_loss(const ObAddr &src) = 0;
   virtual int simple_init(const std::string &cluster_name,
@@ -279,6 +285,10 @@ public:
   { deliver_.block_net(src); }
   void unblock_net(const ObAddr &src) override final
   { deliver_.unblock_net(src); }
+  void block_pcode(const ObRpcPacketCode &pcode) override final
+  { deliver_.block_pcode(pcode); }
+  void unblock_pcode(const ObRpcPacketCode &pcode) override final
+  { deliver_.unblock_pcode(pcode); }
   void set_rpc_loss(const ObAddr &src, const int loss_rate) override final
   { deliver_.set_rpc_loss(src, loss_rate); }
   void reset_rpc_loss(const ObAddr &src) override final
diff --git a/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp b/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp
index 9542a2fa40..00cb9d7d92 100644
--- a/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp
+++ b/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp
@@ -179,6 +179,8 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading)
     EXPECT_FALSE(leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
     EXPECT_EQ(leader_max_flushed_end_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
 
+    dynamic_cast(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
+    dynamic_cast(get_cluster()[b_idx])->log_service_.get_arbitration_service()->start();
     revert_cluster_palf_handle_guard(palf_list);
   }
   delete_paxos_group(id);
@@ -289,6 +291,8 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_de
     EXPECT_TRUE(a_handle->palf_handle_impl_->config_mgr_.log_ms_meta_.curr_.log_sync_memberlist_.contains(b_addr));
     EXPECT_TRUE(a_handle->palf_handle_impl_->config_mgr_.config_meta_.curr_.log_sync_memberlist_.contains(b_addr));
 
+    dynamic_cast(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
+    dynamic_cast(get_cluster()[b_idx])->log_service_.get_arbitration_service()->start();
     revert_cluster_palf_handle_guard(palf_list);
   }
   delete_paxos_group(id);
@@ -372,6 +376,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_no_leader
     EXPECT_UNTIL_EQ(true, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(b_addr));
 
     unblock_net(leader_idx, b_idx);
+    dynamic_cast(get_cluster()[b_idx])->log_service_.get_arbitration_service()->start();
     get_cluster()[c_idx]->get_palf_env()->revert_palf_handle_impl(c_ihandle);
     revert_cluster_palf_handle_guard(palf_list);
   }
@@ -436,6 +441,76 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_degrade_when_arb_crash
   PALF_LOG(INFO, "end test test_2f1a_degrade_when_arb_crash", K(id));
 }
 
+TEST_F(TestObSimpleLogClusterArbMockEleService, test_arb_degrade_probe)
+{
+  int ret = OB_SUCCESS;
+  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
+  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L;
+  OB_LOGGER.set_log_level("TRACE");
+  SET_CASE_LOG_FILE(TEST_NAME, "test_arb_degrade_probe");
+  PALF_LOG(INFO, "begin test test_arb_degrade_probe", K(id));
+  {
+    int64_t leader_idx = 0;
+    int64_t arb_replica_idx = 0;
+    PalfHandleImplGuard leader;
+    std::vector palf_list;
+    EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
+    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
+    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, CONFIG_CHANGE_TIMEOUT));
+    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
+    EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
+    dynamic_cast(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
+
+    const int64_t b_idx = (leader_idx + 1) % 5;
+    const int64_t c_idx = (leader_idx + 2) % 5;
+    const int64_t d_idx = (leader_idx + 3) % 5;
+    const int64_t e_idx = (leader_idx + 4) % 5;
+    const common::ObAddr a_addr = get_cluster()[leader_idx]->get_addr();
+    const common::ObAddr b_addr = get_cluster()[b_idx]->get_addr();
+    const common::ObAddr c_addr = get_cluster()[c_idx]->get_addr();
+    const common::ObAddr d_addr = get_cluster()[d_idx]->get_addr();
+    const common::ObAddr e_addr = get_cluster()[e_idx]->get_addr();
+    PalfHandleImplGuard *a_handle = palf_list[leader_idx];
+    PalfHandleImplGuard *b_handle = palf_list[b_idx];
+
+    // CASE 1. D and E UNKNOWN, do not degrade
+    block_pcode(d_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
+    block_pcode(e_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
+    EXPECT_TRUE(dynamic_cast(get_cluster()[d_idx])->deliver_.need_filter_packet_by_pcode_blacklist(ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG));
+    EXPECT_TRUE(dynamic_cast(get_cluster()[e_idx])->deliver_.need_filter_packet_by_pcode_blacklist(ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG));
+    sleep(6);
+    EXPECT_EQ(0, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.get_member_number());
+
+    // CASE 2. D block_net, E UNKNOWN, do not degrade
+    block_net(leader_idx, d_idx);
+    sleep(2);
+    EXPECT_EQ(0, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.get_member_number());
+
+    // CASE 3. D block_net, E block_net, degrade
+    block_net(leader_idx, e_idx);
+    EXPECT_TRUE(is_degraded(leader, d_idx));
+    EXPECT_TRUE(is_degraded(leader, e_idx));
+
+    // CASE 4. D and E unblock_net but pcode is still blocked, do not upgrade
+    unblock_net(leader_idx, d_idx);
+    unblock_net(leader_idx, e_idx);
+    sleep(2);
+    EXPECT_EQ(2, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.get_member_number());
+
+    // CASE 5. D unblock_net and unblock_pcode, upgrade D
+    unblock_pcode(d_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
+    EXPECT_UNTIL_EQ(false, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(d_addr));
+
+    // CASE 6. E unblock_net and unblock_pcode, upgrade E (must check e_addr, D was already verified in CASE 5)
+    unblock_pcode(e_idx, ObRpcPacketCode::OB_LOG_ARB_PROBE_MSG);
+    EXPECT_UNTIL_EQ(false, leader.palf_handle_impl_->config_mgr_.config_meta_.curr_.degraded_learnerlist_.contains(e_addr));
+
+    revert_cluster_palf_handle_guard(palf_list);
+  }
+  delete_paxos_group(id);
+  PALF_LOG(INFO, "end test test_arb_degrade_probe", K(id));
+}
+
 } // end unittest
 } // end oceanbase
 