diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index e7c884ed0..63386daa0 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -132,6 +132,7 @@ int ObSimpleLogServer::simple_init( } tenant_base_->init(); tenant_base_->set(&log_service_); + tenant_base_->set(&detector_); ObTenantEnv::set_tenant(tenant_base_); assert(&log_service_ == MTL(logservice::ObLogService*)); guard.click("init tenant_base"); diff --git a/mittest/logservice/env/ob_simple_log_server.h b/mittest/logservice/env/ob_simple_log_server.h index 253b64ce0..4acfdf580 100644 --- a/mittest/logservice/env/ob_simple_log_server.h +++ b/mittest/logservice/env/ob_simple_log_server.h @@ -45,6 +45,7 @@ #include "share/ob_occam_timer.h" #include "share/resource_manager/ob_cgroup_ctrl.h" #include "logservice/ob_net_keepalive_adapter.h" +#include "logservice/leader_coordinator/ob_failure_detector.h" #include #include @@ -283,6 +284,7 @@ private: common::ObMySQLProxy sql_proxy_; MockNetKeepAliveAdapter *net_keepalive_; ObSrvRpcProxy srv_proxy_; + logservice::coordinator::ObFailureDetector detector_; }; } // end unittest diff --git a/mittest/logservice/test_ob_simple_log_arb.cpp b/mittest/logservice/test_ob_simple_log_arb.cpp index 2a382eee3..559e3745d 100644 --- a/mittest/logservice/test_ob_simple_log_arb.cpp +++ b/mittest/logservice/test_ob_simple_log_arb.cpp @@ -42,6 +42,34 @@ class TestObSimpleLogClusterArbService : public ObSimpleLogClusterTestEnv public: TestObSimpleLogClusterArbService() : ObSimpleLogClusterTestEnv() {} + bool is_degraded(const PalfHandleImplGuard &leader, + const int64_t degraded_server_idx) + { + bool has_degraded = false; + while (!has_degraded) { + common::GlobalLearnerList degraded_learner_list; + leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); + has_degraded = degraded_learner_list.contains(get_cluster()[degraded_server_idx]->get_addr()); + sleep(1); + PALF_LOG(INFO, "wait degrade"); + } + return has_degraded; + } + + bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id) + { + bool has_upgraded = false; + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id)); + while (!has_upgraded) { + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id)); + common::GlobalLearnerList degraded_learner_list; + leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); + has_upgraded = (0 == degraded_learner_list.get_member_number()); + sleep(1); + PALF_LOG(INFO, "wait upgrade"); + } + return has_upgraded; + } }; int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3; @@ -68,37 +96,49 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_upgrade) EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list)); const int64_t another_f_idx = (leader_idx+1)%3; EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id)); - sleep(2); // 为备副本设置location cb,用于备副本找leader palf_list[another_f_idx]->get_palf_handle_impl()->set_location_cache_cb(&loc_cb); block_net(leader_idx, another_f_idx); // do not check OB_SUCCESS, may return OB_NOT_MASTER during degrading member submit_log(leader, 100, id); - bool has_degraded = false; - while (!has_degraded) { - common::GlobalLearnerList degraded_learner_list; - leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - has_degraded = degraded_learner_list.contains(palf_list[another_f_idx]->palf_handle_impl_->self_); - sleep(1); - PALF_LOG(INFO, "wait degrade"); - } - EXPECT_TRUE(has_degraded); - // 等待lease过期,验证location cb是否可以找到leader - sleep(5); + EXPECT_TRUE(is_degraded(leader, another_f_idx)); + loc_cb.leader_ = leader.palf_handle_impl_->self_; unblock_net(leader_idx, another_f_idx); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id)); + + EXPECT_TRUE(is_upgraded(leader, id)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id)); + + // set clog disk error + ObTenantEnv::set_tenant(get_cluster()[leader_idx+1]->get_tenant_base()); + logservice::coordinator::ObFailureDetector *detector = MTL(logservice::coordinator::ObFailureDetector *); + if (NULL != detector) { + detector->has_add_clog_full_event_ = true; + } + + EXPECT_TRUE(is_degraded(leader, another_f_idx)); + + if (NULL != detector) { + detector->has_add_clog_full_event_ = false; + } + + EXPECT_TRUE(is_upgraded(leader, id)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id)); - bool has_upgraded = false; - while (!has_upgraded) { - common::GlobalLearnerList degraded_learner_list; - leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - has_upgraded = (0 == degraded_learner_list.get_member_number()); - sleep(1); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id)); - PALF_LOG(INFO, "wait upgrade"); - } + // test disable sync + palf_list[another_f_idx]->palf_handle_impl_->disable_sync(); + EXPECT_TRUE(is_degraded(leader, another_f_idx)); + palf_list[another_f_idx]->palf_handle_impl_->enable_sync(); + EXPECT_TRUE(is_upgraded(leader, id)); + + // test disbale vote + palf_list[another_f_idx]->palf_handle_impl_->disable_vote(); + EXPECT_TRUE(is_degraded(leader, another_f_idx)); + palf_list[another_f_idx]->palf_handle_impl_->enable_vote(); + EXPECT_TRUE(is_upgraded(leader, id)); + revert_cluster_palf_handle_guard(palf_list); leader.reset(); delete_paxos_group(id); @@ -133,30 +173,19 @@ TEST_F(TestObSimpleLogClusterArbService, test_4f1a_degrade_upgrade) block_all_net(another_f1_idx); block_all_net(another_f2_idx); - bool has_degraded = false; - while (!has_degraded) { - common::GlobalLearnerList degraded_learner_list; - leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - has_degraded = degraded_learner_list.contains(get_cluster()[another_f1_idx]->get_addr()); - has_degraded = has_degraded && degraded_learner_list.contains(get_cluster()[another_f2_idx]->get_addr()); - sleep(1); - } - EXPECT_TRUE(has_degraded); + + EXPECT_TRUE(is_degraded(leader, another_f1_idx)); + EXPECT_TRUE(is_degraded(leader, another_f2_idx)); + // 确保lease过期,验证loc_cb是否可以找到leader拉日志 sleep(5); unblock_all_net(another_f1_idx); unblock_all_net(another_f2_idx); loc_cb.leader_ = leader.palf_handle_impl_->self_; - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id)); + EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id)); + + EXPECT_TRUE(is_upgraded(leader, id)); - bool has_upgraded = false; - while (!has_upgraded) { - common::GlobalLearnerList degraded_learner_list; - leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - has_upgraded = (0 == degraded_learner_list.get_member_number()); - sleep(1); - EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id)); - } revert_cluster_palf_handle_guard(palf_list); leader.reset(); delete_paxos_group(id); @@ -191,16 +220,8 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade) submit_log(leader, 20, id); // submit some logs which will be truncated - bool has_degraded = false; - while (!has_degraded) { - common::GlobalLearnerList degraded_learner_list; - palf_list[another_f_idx]->palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - ObAddr old_leader_addr = palf_list[leader_idx]->palf_handle_impl_->self_; - has_degraded = degraded_learner_list.contains(old_leader_addr); - PALF_LOG(INFO, "check has_degraded", K(has_degraded), K(degraded_learner_list), K(old_leader_addr)); - sleep(1); - } - EXPECT_TRUE(has_degraded); + EXPECT_TRUE(is_degraded(*palf_list[another_f_idx], leader_idx)); + int64_t new_leader_idx = -1; PalfHandleImplGuard new_leader; EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, new_leader_idx)); @@ -210,15 +231,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade) unblock_net(leader_idx, arb_replica_idx); EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, id)); - bool has_upgraded = false; - while (!has_upgraded) { - common::GlobalLearnerList degraded_learner_list; - new_leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - has_upgraded = (0 == degraded_learner_list.get_member_number()); - PALF_LOG(INFO, "wait upgrade", K(degraded_learner_list)); - EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id)); - sleep(1); - } + EXPECT_TRUE(is_upgraded(new_leader, id)); revert_cluster_palf_handle_guard(palf_list); leader.reset(); new_leader.reset(); @@ -267,16 +280,8 @@ TEST_F(TestObSimpleLogClusterArbService, test_4f1a_reconfirm_degrade_upgrade) EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, new_leader_idx)); } - bool has_degraded = false; - while (!has_degraded) { - common::GlobalLearnerList degraded_learner_list; - new_leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - has_degraded = degraded_learner_list.contains(cluster[another_f1_idx]->get_addr()); - has_degraded = has_degraded && degraded_learner_list.contains(cluster[leader_idx]->get_addr()); - sleep(1); - EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id)); - } - EXPECT_TRUE(has_degraded); + EXPECT_TRUE(is_degraded(new_leader, another_f1_idx)); + EXPECT_TRUE(is_degraded(new_leader, leader_idx)); sleep(5); loc_cb.leader_ = new_leader.palf_handle_impl_->self_; @@ -286,15 +291,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_4f1a_reconfirm_degrade_upgrade) EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, id)); - bool has_upgraded = false; - while (!has_upgraded) { - common::GlobalLearnerList degraded_learner_list; - new_leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list); - has_upgraded = (0 == degraded_learner_list.get_member_number()); - PALF_LOG(INFO, "wait upgrade", K(ret), K(degraded_learner_list)); - EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id)); - sleep(1); - } + EXPECT_TRUE(is_upgraded(new_leader, id)); leader.reset(); new_leader.reset(); revert_cluster_palf_handle_guard(palf_list); @@ -479,6 +476,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_defensive) int64_t mode_version; EXPECT_EQ(OB_SUCCESS, get_middle_scn(50, leader, flashback_scn, header_origin)); switch_append_to_flashback(leader, mode_version); + sleep(1); EXPECT_EQ(OB_SUCCESS, palf_list[another_f_idx]->palf_handle_impl_->flashback(mode_version, flashback_scn, CONFIG_CHANGE_TIMEOUT)); // remove another follower diff --git a/src/logservice/logrpc/ob_log_rpc_req.cpp b/src/logservice/logrpc/ob_log_rpc_req.cpp index d97e6928a..cfbe7c99f 100644 --- a/src/logservice/logrpc/ob_log_rpc_req.cpp +++ b/src/logservice/logrpc/ob_log_rpc_req.cpp @@ -248,17 +248,23 @@ OB_SERIALIZE_MEMBER(LogGetPalfStatResp, palf_stat_); // ============= LogServerProbeMsg start ============= LogServerProbeMsg::LogServerProbeMsg() : src_(), + palf_id_(-1), + req_id_(-1), msg_type_(PROBE_REQ), - req_ts_(OB_INVALID_TIMESTAMP) { } + server_status_(-1) { } LogServerProbeMsg::LogServerProbeMsg( const common::ObAddr &src, + const int64_t palf_id, + const int64_t req_id, const LogServerProbeType msg_type, - const int64_t req_ts) + const int64_t status) : src_(src), + palf_id_(palf_id), + req_id_(req_id), msg_type_(msg_type), - req_ts_(req_ts) { } + server_status_(status) { } LogServerProbeMsg::~LogServerProbeMsg() { @@ -267,17 +273,19 @@ LogServerProbeMsg::~LogServerProbeMsg() bool LogServerProbeMsg::is_valid() const { - return src_.is_valid() && req_ts_ != OB_INVALID_TIMESTAMP; + return src_.is_valid() && -1 != palf_id_ && req_id_ != -1 && server_status_ != -1; } void LogServerProbeMsg::reset() { src_.reset(); + palf_id_ = -1; + req_id_ = -1; msg_type_ = PROBE_REQ; - req_ts_ = OB_INVALID_TIMESTAMP; + server_status_ = -1; } -OB_SERIALIZE_MEMBER(LogServerProbeMsg, src_, msg_type_, req_ts_); +OB_SERIALIZE_MEMBER(LogServerProbeMsg, src_, palf_id_, req_id_, msg_type_, server_status_); // ============= LogServerProbeMsg end ============= // ============= LogChangeAccessModeCmd start ============= diff --git a/src/logservice/logrpc/ob_log_rpc_req.h b/src/logservice/logrpc/ob_log_rpc_req.h index 5dcfe6732..e241fdd87 100644 --- a/src/logservice/logrpc/ob_log_rpc_req.h +++ b/src/logservice/logrpc/ob_log_rpc_req.h @@ -170,14 +170,20 @@ struct LogServerProbeMsg { OB_UNIS_VERSION(1); public: LogServerProbeMsg(); - LogServerProbeMsg(const common::ObAddr &src, const LogServerProbeType msg_type, const int64_t req_ts); + LogServerProbeMsg(const common::ObAddr &src, + const int64_t palf_id, + const int64_t req_id, + const LogServerProbeType msg_type, + const int64_t status); ~LogServerProbeMsg(); bool is_valid() const; void reset(); - TO_STRING_KV(K_(src), K_(msg_type), K_(req_ts)); + TO_STRING_KV(K_(src), K_(palf_id), K_(req_id), K_(msg_type), K_(server_status)); common::ObAddr src_; + int64_t palf_id_; + int64_t req_id_; LogServerProbeType msg_type_; - int64_t req_ts_; + int64_t server_status_; }; struct LogChangeAccessModeCmd { diff --git a/src/logservice/ob_arbitration_service.h b/src/logservice/ob_arbitration_service.h deleted file mode 100644 index 1082e84eb..000000000 --- a/src/logservice/ob_arbitration_service.h +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright (c) 2021 OceanBase -// OceanBase is licensed under Mulan PubL v2. -// You can use this software according to the terms and conditions of the Mulan PubL v2. -// You may obtain a copy of Mulan PubL v2 at: -// http://license.coscl.org.cn/MulanPubL-2.0 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -// See the Mulan PubL v2 for more details. - -#ifndef OCEANBASE_LOGSERVICE_OB_ARBITRATION_SERVICE_H_ -#define OCEANBASE_LOGSERVICE_OB_ARBITRATION_SERVICE_H_ - -#include "share/ob_thread_pool.h" -#include "share/ob_occam_timer.h" -#include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap -#include "lib/lock/ob_tc_rwlock.h" // RWLock -#include "common/ob_member_list.h" // common::ObMemberList -#include "logrpc/ob_log_rpc_req.h" // ProbeMsg - -namespace oceanbase -{ -namespace palf -{ -class PalfEnv; -class LogMemberAckInfo; -} - -namespace obrpc -{ -class ObLogServiceRpcProxy; -} - -namespace logservice -{ -class IObNetKeepAliveAdapter; -enum ServerAliveStatus -{ - UNKNOWN = 0, - ALIVE, - DEAD, -}; - -inline const char *server_status_to_string(ServerAliveStatus status) -{ - #define SERVER_ALIVE_STATUS_TO_STR(x) case(ServerAliveStatus::x): return #x - switch(status) - { - SERVER_ALIVE_STATUS_TO_STR(ALIVE); - SERVER_ALIVE_STATUS_TO_STR(DEAD); - default: - return "UNKNOWN"; - } - #undef SERVER_ALIVE_STATUS_TO_STR -} - -struct ServerProbeCtx -{ - ServerProbeCtx() - : last_req_ts_(OB_INVALID_TIMESTAMP), - last_resp_ts_(OB_INVALID_TIMESTAMP), - alive_status_(UNKNOWN) - { } - - void reset_probe_ts() - { - last_req_ts_ = OB_INVALID_TIMESTAMP; - last_resp_ts_ = OB_INVALID_TIMESTAMP; - } - - // ts are valid only in current status - int64_t last_req_ts_; - int64_t last_resp_ts_; - ServerAliveStatus alive_status_; - TO_STRING_KV(K_(last_req_ts), K_(last_resp_ts), "alive_status", server_status_to_string(alive_status_)); -}; - -class ServerProbeService -{ -public: - ServerProbeService() - : self_(), - probing_servers_(), - rpc_proxy_(NULL), - timer_(), - is_running_(false), - is_inited_(false) - { } - ~ServerProbeService() { destroy(); } - int init(const common::ObAddr &self, obrpc::ObLogServiceRpcProxy *rpc_proxy); - void destroy(); - int start_probe_server(const common::ObAddr &server); - int stop_probe_server(const common::ObAddr &server); - int get_server_alive_status(const common::ObAddr &server, ServerAliveStatus &status) const; - void run_probe_once(); - int handle_server_probe_msg(const common::ObAddr &sender, const LogServerProbeMsg &req); -private: - void run_probe_once_(); -private: - typedef RWLock::RLockGuard RLockGuard; - typedef RWLock::WLockGuard WLockGuard; - static constexpr int64_t DEAD_SERVER_TIMEOUT = 2 * 1000 * 1000; // 2s - typedef common::ObLinearHashMap ServerProbeMap; -private: - mutable RWLock lock_; - common::ObAddr self_; - ServerProbeMap probing_servers_; - obrpc::ObLogServiceRpcProxy *rpc_proxy_; - ObOccamTimer timer_; - bool is_running_; - bool is_inited_; -}; - -class ObArbitrationService : public share::ObThreadPool -{ -public: - ObArbitrationService(); - virtual ~ObArbitrationService(); - int init(const common::ObAddr &self, - palf::PalfEnv *palf_env, - obrpc::ObLogServiceRpcProxy *rpc_proxy, - IObNetKeepAliveAdapter *net_keepalive); - int start(); - void destroy(); - void run1(); - int handle_server_probe_msg(const common::ObAddr &sender, const LogServerProbeMsg &req); -private: - static constexpr int64_t MIN_LOOP_INTERVAL_US = 10 * 1000; // 10ms - static constexpr int64_t DEGRADE_ACTION_TIMEOUT_US = 10 * 1000 * 1000L; // 10s - static constexpr int64_t UPGRADE_ACTION_TIMEOUT_US = 10 * 1000 * 1000L; // 10s - static constexpr int64_t MAX_PALF_COUNT = 200; - typedef common::ObLinearHashMap PalfAckInfoMap; -private: - class DoDegradeFunctor - { - public: - explicit DoDegradeFunctor(const common::ObAddr &addr, - palf::PalfEnv *palf_env, - ServerProbeService *probe_srv, - IObNetKeepAliveAdapter *net_keepalive) - : self_(addr), - palf_env_(palf_env), - probe_srv_(probe_srv), - net_keepalive_(net_keepalive) - { } - bool operator()(const palf::LSKey &ls_key, const palf::LogMemberAckInfoList °rade_servers); - - public: - common::ObSEArray need_removed_palfs_; - bool is_valid() - { - return self_.is_valid() && OB_NOT_NULL(palf_env_) && - OB_NOT_NULL(probe_srv_) && OB_NOT_NULL(net_keepalive_); - } - private: - bool is_all_other_servers_alive_( - const common::ObMemberList &all_servers, - const common::GlobalLearnerList °raded_list, - const palf::LogMemberAckInfoList &excepted_servers); - bool is_allow_degrade_( - const common::ObMemberList &expected_member_list, - const int64_t replica_num, - const common::GlobalLearnerList °raded_list, - const palf::LogMemberAckInfoList &may_be_degraded_servers); - common::ObAddr self_; - palf::PalfEnv *palf_env_; - ServerProbeService *probe_srv_; - IObNetKeepAliveAdapter *net_keepalive_; - }; - - class DoUpgradeFunctor - { - public: - explicit DoUpgradeFunctor(palf::PalfEnv *palf_env, ServerProbeService *probe_srv, IObNetKeepAliveAdapter *net_keepalive) - : palf_env_(palf_env), - probe_srv_(probe_srv), - net_keepalive_(net_keepalive) - { } - bool operator()(const palf::LSKey &ls_key, const palf::LogMemberAckInfoList &upgrade_servers); - - public: - common::ObSEArray need_removed_palfs_; - private: - palf::PalfEnv *palf_env_; - ServerProbeService *probe_srv_; - IObNetKeepAliveAdapter *net_keepalive_; - }; - -private: - int start_probe_servers_(const palf::LogMemberAckInfoList &servers); - int start_probe_servers_(const common::ObMemberList &servers); - void start_probe_server_(const common::ObAddr &server) const; - int follower_probe_others_(const int64_t palf_id, const common::ObMemberList &paxos_member_list); - int get_server_sync_info_(); - void run_loop_(); - void update_arb_timeout_(); - int update_server_map_(PalfAckInfoMap &palf_map, - const palf::LSKey &key, - const palf::LogMemberAckInfoList &val, - bool &is_updated) - { - int ret = OB_SUCCESS; - palf::LogMemberAckInfoList existed_val; - is_updated = false; - if (OB_FAIL(palf_map.get(key, existed_val))) { - if (OB_ENTRY_NOT_EXIST == ret) { - if (OB_FAIL(palf_map.insert(key, val))) { - CLOG_LOG(WARN, "palf_map insert failed", K(key), K(val)); - } else { - is_updated = true; - } - } - } else if (palf::ack_info_list_addr_equal(existed_val, val)) { - // pass, do not insert - } else if (OB_FAIL(palf_map.insert_or_update(key, val))) { - CLOG_LOG(WARN, "palf_map insert_or_update failed", K(key), K(val)); - } else { - is_updated = true; - CLOG_LOG(TRACE, "update_server_map_ success", KR(ret), K(key), K(val), K(is_updated)); - } - CLOG_LOG(TRACE, "update_server_map_ finish", KR(ret), K(key), K(val), K(is_updated)); - return ret; - } -private: - common::ObAddr self_; - ServerProbeService probe_srv_; - PalfAckInfoMap may_be_degraded_palfs_; - PalfAckInfoMap may_be_upgraded_palfs_; - int64_t arb_timeout_us_; - int64_t follower_last_probe_time_us_; - palf::PalfEnv *palf_env_; - obrpc::ObLogServiceRpcProxy *rpc_proxy_; - IObNetKeepAliveAdapter *net_keepalive_; - bool is_inited_; -}; - -} // logservice -} // oceanbase - -#endif diff --git a/src/logservice/palf/log_ack_info.h b/src/logservice/palf/log_ack_info.h index 475236198..075ddc1cc 100644 --- a/src/logservice/palf/log_ack_info.h +++ b/src/logservice/palf/log_ack_info.h @@ -82,7 +82,8 @@ struct LogMemberAckInfo typedef common::ObSEArray LogMemberAckInfoList; -inline int64_t ack_info_list_get_index(const LogMemberAckInfoList &list_a, +template +inline int64_t ack_info_list_get_index(const common::ObSEArray &list_a, const common::ObAddr &addr) { int64_t index = -1; @@ -95,8 +96,9 @@ inline int64_t ack_info_list_get_index(const LogMemberAckInfoList &list_a, return index; } +template inline bool ack_info_list_addr_equal(const common::GlobalLearnerList &list_a, - const LogMemberAckInfoList &list_b) + const common::ObSEArray &list_b) { bool bool_ret = true; if (list_a.get_member_number() != list_b.count()) { @@ -112,8 +114,9 @@ inline bool ack_info_list_addr_equal(const common::GlobalLearnerList &list_a, return bool_ret; } -inline bool ack_info_list_addr_equal(const LogMemberAckInfoList &list_a, - const LogMemberAckInfoList &list_b) +template +inline bool ack_info_list_addr_equal(const common::ObSEArray &list_a, + const common::ObSEArray &list_b) { bool bool_ret = true; if (list_a.count() != list_b.count()) { diff --git a/src/logservice/palf/palf_handle.cpp b/src/logservice/palf/palf_handle.cpp index 5ae214608..704627976 100644 --- a/src/logservice/palf/palf_handle.cpp +++ b/src/logservice/palf/palf_handle.cpp @@ -412,6 +412,15 @@ int PalfHandle::disable_vote() return ret; } +bool PalfHandle::is_vote_enabled() const +{ + int ret = OB_SUCCESS; + bool bool_ret = false; + CHECK_VALID; + bool_ret = palf_handle_impl_->is_vote_enabled(); + return bool_ret; +} + int PalfHandle::enable_vote() { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/palf_handle.h b/src/logservice/palf/palf_handle.h index 35b02dd1f..6865da376 100644 --- a/src/logservice/palf/palf_handle.h +++ b/src/logservice/palf/palf_handle.h @@ -318,6 +318,11 @@ public: // OB_SUCCESS int get_access_mode(int64_t &mode_version, AccessMode &access_mode) const; int get_access_mode(AccessMode &access_mode) const; + + // @brief: check whether the palf instance is allowed to vote for logs + // By default, return true; + // After calling disable_vote(), return false. + bool is_vote_enabled() const; // @brief: store a persistent flag which means this paxos replica // can not reply ack when receiving logs. // By default, paxos replica can reply ack. diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index fe21cf04b..1b13be9eb 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -1207,6 +1207,16 @@ int PalfHandleImpl::disable_sync() return ret; } +bool PalfHandleImpl::is_vote_enabled() const +{ + bool bool_ret = false; + if (IS_NOT_INIT) { + } else { + bool_ret = state_mgr_.is_allow_vote(); + } + return bool_ret; +} + int PalfHandleImpl::disable_vote() { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index b0943bc24..1c87b3698 100644 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -558,6 +558,11 @@ public: const int64_t prev_log_id, const int64_t &prev_log_proposal_id, const LSN &committed_end_lsn) = 0; + + // @brief: check whether the palf instance is allowed to vote for logs + // By default, return true; + // After calling disable_vote(), return false. + virtual bool is_vote_enabled() const = 0; // @brief: store a persistent flag which means this paxos replica // can not reply ack when receiving logs. // By default, paxos replica can reply ack. @@ -695,6 +700,7 @@ public: int advance_base_info(const PalfBaseInfo &palf_base_info, const bool is_rebuild) override final; int locate_by_scn_coarsely(const share::SCN &scn, LSN &result_lsn) override final; int locate_by_lsn_coarsely(const LSN &lsn, share::SCN &result_scn) override final; + bool is_vote_enabled() const override final; int disable_vote() override final; int enable_vote() override final; public: diff --git a/unittest/logservice/test_ob_arbitration_service.cpp b/unittest/logservice/test_ob_arbitration_service.cpp index fe71fb9f6..e7383d226 100644 --- a/unittest/logservice/test_ob_arbitration_service.cpp +++ b/unittest/logservice/test_ob_arbitration_service.cpp @@ -60,65 +60,65 @@ TEST_F(TestObArbitrationService, locality_allow_degrade_test) { // 2F, degrade 1, allow MockNetKeepAliveAdapter net_keepalive; - ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive); + ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL); const int64_t palf_id = 1; const int64_t replica_num = 2; paxos_list.add_server(addr1); paxos_list.add_server(addr2); - palf::LogMemberAckInfoList dead_servers; + LogMemberStatusList dead_servers; common::GlobalLearnerList degraded_servers; - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000)))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000))))); EXPECT_TRUE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers)); } { // 4F, degrade 3, not allow MockNetKeepAliveAdapter net_keepalive; - ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive); + ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL); const int64_t palf_id = 1; paxos_list.add_server(addr3); paxos_list.add_server(addr4); const int64_t replica_num = 4; - palf::LogMemberAckInfoList dead_servers; + LogMemberStatusList dead_servers; common::GlobalLearnerList degraded_servers; - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000)))); - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000)))); - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000)))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000))))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000))))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000))))); EXPECT_FALSE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers)); } { // 4F, degrade 1, not allow MockNetKeepAliveAdapter net_keepalive; - ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive); + ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL); const int64_t palf_id = 1; const int64_t replica_num = 4; - palf::LogMemberAckInfoList dead_servers; + LogMemberStatusList dead_servers; common::GlobalLearnerList degraded_servers; - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000)))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr1, 1), 1, LSN(1000))))); EXPECT_FALSE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers)); } { // 3F1A, degrade 1, not allow MockNetKeepAliveAdapter net_keepalive; - ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive); + ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL); const int64_t palf_id = 1; paxos_list.remove_server(addr4); const int64_t replica_num = 3; - palf::LogMemberAckInfoList dead_servers; + LogMemberStatusList dead_servers; common::GlobalLearnerList degraded_servers; - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000)))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000))))); EXPECT_FALSE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers)); } { // 4F1A, degrade 2(addr2, addr3), allow MockNetKeepAliveAdapter net_keepalive; - ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL, &net_keepalive); + ObArbitrationService::DoDegradeFunctor do_degrade_func(addr1, NULL, NULL); const int64_t palf_id = 1; paxos_list.add_server(addr4); const int64_t replica_num = 4; - palf::LogMemberAckInfoList dead_servers; + LogMemberStatusList dead_servers; common::GlobalLearnerList degraded_servers; - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000)))); - EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000)))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr2, 1), 1, LSN(1000))))); + EXPECT_EQ(OB_SUCCESS, dead_servers.push_back(LogMemberStatus(LogMemberAckInfo(ObMember(addr3, 1), 1, LSN(1000))))); EXPECT_TRUE(do_degrade_func.is_allow_degrade_(paxos_list, replica_num, degraded_servers, dead_servers)); } }