diff --git a/src/clog/ob_clog_config.h b/src/clog/ob_clog_config.h index 566fa53d34..266f2e9a06 100644 --- a/src/clog/ob_clog_config.h +++ b/src/clog/ob_clog_config.h @@ -126,6 +126,7 @@ const int64_t FAKE_ACK_MSG_VALID_TIME = CLOG_LEADER_ACTIVE_SYNC_TIMEOUT; const int64_t CLOG_LEADER_RECONFIRM_SYNC_TIMEOUT = 10 * 1000 * 1000; // time interval for requesting max_log_id during leader reconfirming. const int64_t CLOG_RECONFIRM_FETCH_MAX_LOG_ID_INTERVAL = 2 * 1000 * 1000; +const int64_t CLOG_RECONFIRM_PRIMARY_NOTIFY_STANDBY_INTERVAL = 2 * 1000 * 1000; const int64_t CLOG_AIO_WRITE_TIMEOUT = 30 * 1000 * 1000; // 30s const int64_t RECONFIRM_LOG_ARRAY_LENGTH = 128; // log array size for reconfirm diff --git a/src/clog/ob_clog_mgr.cpp b/src/clog/ob_clog_mgr.cpp index 8bb0eb5061..da155777f2 100644 --- a/src/clog/ob_clog_mgr.cpp +++ b/src/clog/ob_clog_mgr.cpp @@ -320,7 +320,11 @@ int ObCLogMgr::create_partition_(const ObPartitionKey& partition_key, const int6 OB_FAIL(pls->set_offline())) { // physical restore data phase need set clog offline STORAGE_LOG(WARN, "fail to set_offline", K(ret), K(partition_key)); - } else if (archive_restore_state >= share::REPLICA_RESTORE_DATA && OB_FAIL(pls->set_scan_disk_log_finished())) { + } else if ((archive_restore_state >= share::REPLICA_RESTORE_DATA + // A two-phase-created non-paxos replica won't execute set_election_leader or + // set_member_list, so it needs to call set_scan_disk_log_finished here. + || !ObReplicaTypeCheck::is_paxos_replica(replica_type)) && + OB_FAIL(pls->set_scan_disk_log_finished())) { // partitions created with physical restore flag need skip scan disk log stage, similar with add_partition // standby replica will always run here. CLOG_LOG(WARN, "set_scan_disk_log_finished failed", K(ret), K(partition_key)); diff --git a/src/clog/ob_i_log_engine.h b/src/clog/ob_i_log_engine.h index 6a1d2f0ebd..f1d217ef0a 100644 --- a/src/clog/ob_i_log_engine.h +++ b/src/clog/ob_i_log_engine.h @@ -134,6 +134,12 @@ public: virtual int notify_follower_log_missing(const common::ObAddr& server, const int64_t cluster_id, const common::ObPartitionKey& partition_key, const uint64_t start_log_id, const bool is_in_member_list, const int32_t msg_type) = 0; + virtual int send_restore_check_rqst(const common::ObAddr &server, const int64_t dst_cluster_id, + const common::ObPartitionKey &key, const ObRestoreCheckType restore_type) = 0; + virtual int send_query_restore_end_id_resp(const common::ObAddr &server, + const int64_t cluster_id, + const common::ObPartitionKey &partition_key, + const uint64_t last_restore_log_id) = 0; virtual void update_clog_info(const int64_t max_submit_timestamp) = 0; virtual void update_clog_info( const common::ObPartitionKey& partition_key, const uint64_t log_id, const int64_t submit_timestamp) = 0; diff --git a/src/clog/ob_log_cascading_mgr.cpp b/src/clog/ob_log_cascading_mgr.cpp index b38beddb57..b24491edf2 100644 --- a/src/clog/ob_log_cascading_mgr.cpp +++ b/src/clog/ob_log_cascading_mgr.cpp @@ -219,15 +219,6 @@ int ObLogCascadingMgr::set_parent_(const common::ObAddr& new_parent_addr, const K(new_parent), K(cur_parent), K(leader)); - } else if (STANDBY_LEADER == state_mgr_->get_role() && GCTX.is_sync_level_on_standby() && primary_leader.is_valid() && - new_parent_addr != primary_leader) { - // in max protection mode, the parent of standby leader must be primary leader - CLOG_LOG(WARN, - "standby_leader is in max_protection mode, but new_parent is not primary_leader", - K_(partition_key), - K(new_parent), - K(cur_parent), -
K(primary_leader)); } else if (self_ == new_parent_addr) { // ignore self } else if (cur_parent == new_parent) { @@ -241,8 +232,15 @@ int ObLogCascadingMgr::set_parent_(const common::ObAddr& new_parent_addr, const prev_parent_ = parent_; parent_ = new_parent; if (partition_reach_time_interval(60 * 1000 * 1000, update_parent_warn_time_)) { - CLOG_LOG( - INFO, "update parent", K_(partition_key), K(cur_parent), K(new_parent), K(prev_parent_), K_(children_list)); + CLOG_LOG(INFO, + "update parent", + K_(partition_key), + K(cur_parent), + K(new_parent), + K(prev_parent_), + K_(children_list), + K(leader), + K(primary_leader)); } } if (parent_.get_server().is_valid()) { @@ -376,7 +374,8 @@ int ObLogCascadingMgr::reject_server( return ret; } -int ObLogCascadingMgr::process_reject_msg(const common::ObAddr& server, const int32_t msg_type) +int ObLogCascadingMgr::process_reject_msg( + const common::ObAddr& server, const int64_t cluster_id, const int32_t msg_type) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -435,6 +434,14 @@ int ObLogCascadingMgr::process_reject_msg(const common::ObAddr& server, const in "leader", state_mgr_->get_leader()); } + } else if (OB_REPLICA_MSG_TYPE_QUICK_REGISTER == msg_type) { + ObRegion self_region = DEFAULT_REGION_NAME; + if (OB_FAIL(get_server_region_(self_, self_region))) { + CLOG_LOG(WARN, "get_server_region_ failed", K_(partition_key), K(ret)); + } else if (OB_FAIL(fetch_register_server(server, cluster_id, self_region, state_mgr_->get_idc(), true, false))) { + CLOG_LOG(WARN, "fetch_register_server failed", K(ret), K(partition_key_)); + } else { + } } else { // do nothing } @@ -561,13 +568,17 @@ int ObLogCascadingMgr::primary_process_protect_mode_switch() if (old_sync_child.is_valid() && OB_FAIL(try_add_child_unlock_(old_sync_child))) { CLOG_LOG(WARN, "try_add_child_unlock_ failed", K_(partition_key), K(ret), K(old_sync_child)); } + // reset sync_standby_child_ sync_standby_child_.reset(); } else if (GCTX.need_sync_to_standby()) { // changed to sync mode(max protection/max availability), try to update sync child int64_t sync_cluster_id = OB_INVALID_CLUSTER_ID; ObAddr dst_standby_child; - if (OB_FAIL(get_sync_standby_cluster_id(sync_cluster_id)) || OB_INVALID_CLUSTER_ID == sync_cluster_id) { + if (OB_FAIL(get_sync_standby_cluster_id(sync_cluster_id))) { CLOG_LOG(WARN, "get_sync_standby_cluster_id failed", K_(partition_key), K(ret)); + } else if (OB_INVALID_CLUSTER_ID == sync_cluster_id) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, "sync_cluster_id returned by GCTX is invalid", K_(partition_key), K(sync_cluster_id), K(ret)); } else if (has_async_standby_child_(sync_cluster_id, dst_standby_child)) { // if it is aysnc child, change it to sync child if (OB_FAIL(async_standby_children_.remove_server(dst_standby_child))) { @@ -579,6 +590,8 @@ int ObLogCascadingMgr::primary_process_protect_mode_switch() CLOG_LOG(INFO, "update sync_standby_child_ success", K_(partition_key), K_(sync_standby_child)); } } else { + // reset sync_standby_child_ + sync_standby_child_.reset(); // need get sync stadnby_leader from location cache, it may returns -4023, just ignore int tmp_ret = OB_SUCCESS; if (OB_SUCCESS != (tmp_ret = leader_try_update_sync_standby_child_unlock_(true))) { diff --git a/src/clog/ob_log_cascading_mgr.h b/src/clog/ob_log_cascading_mgr.h index b5755c574c..071102aeed 100644 --- a/src/clog/ob_log_cascading_mgr.h +++ b/src/clog/ob_log_cascading_mgr.h @@ -76,7 +76,7 @@ public: } bool is_valid_child(const common::ObAddr& server) const; int reject_server(const 
common::ObAddr& server, const int64_t cluster_id, const int32_t msg_type) const; - int process_reject_msg(const common::ObAddr& server, const int32_t msg_type); + int process_reject_msg(const common::ObAddr& server, const int64_t cluster_id, const int32_t msg_type); int process_region_change(const common::ObRegion& new_region); int process_fetch_reg_server_req(const common::ObAddr& server, const common::ObReplicaType replica_type, const bool is_self_lag_behind, const bool is_request_leader, const bool is_need_force_register, diff --git a/src/clog/ob_log_define.h b/src/clog/ob_log_define.h index d2422bc88f..b456f77d0f 100644 --- a/src/clog/ob_log_define.h +++ b/src/clog/ob_log_define.h @@ -141,6 +141,7 @@ enum ObReplicaMsgType { OB_REPLICA_MSG_TYPE_NOT_CHILD = 3, // I'm not your child OB_REPLICA_MSG_TYPE_NOT_EXIST = 4, // partition not exist OB_REPLICA_MSG_TYPE_DISABLED_STATE = 5, // server in disabled state + OB_REPLICA_MSG_TYPE_QUICK_REGISTER= 6, // quick register to me }; enum ObRegRespMsgType { @@ -160,7 +161,16 @@ enum ObFetchLogType { OB_FETCH_LOG_TYPE_MAX, }; -enum ReceiveLogType { +enum ObRestoreCheckType +{ + OB_CHECK_UNKNOWN = 0, + OB_CHECK_STANDBY_RESTORE = 1, + OB_CHECK_RESTORE_END_ID = 2, + OB_CHECK_MAX, +}; + +enum ReceiveLogType +{ RL_TYPE_UNKNOWN = 0, PUSH_LOG = 1, FETCH_LOG = 2, diff --git a/src/clog/ob_log_engine.cpp b/src/clog/ob_log_engine.cpp index 99397d7abd..69090bc80d 100644 --- a/src/clog/ob_log_engine.cpp +++ b/src/clog/ob_log_engine.cpp @@ -95,11 +95,11 @@ int ObLogEnv::init(const Config& cfg, const ObAddr& self_addr, ObIInfoBlockHandl ret = OB_INIT_FAIL; CLOG_LOG(WARN, "create file store failed.", K(ret)); } else if (OB_FAIL(direct_reader_.init(cfg.log_dir_, - nullptr/*no shared memory*/, - use_log_cache, - &log_cache_, - &log_tail_, - write_pool_type))) { + nullptr /*no shared memory*/, + use_log_cache, + &log_cache_, + &log_tail_, + write_pool_type))) { CLOG_LOG(WARN, "direct reader init error", K(ret), K(enable_log_cache), K(write_pool_type)); } else if (OB_FAIL(init_log_file_writer(cfg.log_dir_, file_store_))) { CLOG_LOG(WARN, "Fail to init log file writer ", K(ret)); @@ -338,7 +338,7 @@ bool ObLogEnv::cluster_version_before_2000_() const return GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_2000; } -int ObLogEnv::init_log_file_writer(const char *log_dir, const ObILogFileStore *file_store) +int ObLogEnv::init_log_file_writer(const char* log_dir, const ObILogFileStore* file_store) { int ret = OB_SUCCESS; if (nullptr == @@ -763,12 +763,7 @@ int ObLogEngine::init(const ObLogEnv::Config& cfg, const ObAddr& self_addr, obrp self_addr, cfg.index_cache_name_, cfg.index_cache_priority_, ilog_hot_cache_size))) { CLOG_LOG(WARN, "failed to init ilog_log_cache", K(ret)); } else if (OB_FAIL(ilog_storage_.init( - cfg.index_log_dir_, - server_seq, - self_addr, - &ilog_log_cache_, - partition_service, - &clog_env_))) { + cfg.index_log_dir_, server_seq, self_addr, &ilog_log_cache_, partition_service, &clog_env_))) { CLOG_LOG(WARN, "ilog_storage_ init failed", K(ret)); } else { batch_rpc_ = batch_rpc; @@ -1017,7 +1012,7 @@ static bool is_need_batch(int pcode) OB_REREGISTER_MSG == pcode || OB_CHECK_REBUILD_REQ == pcode || OB_FAKE_ACK_LOG == pcode || OB_RESTORE_LEADER_TAKEOVER_MSG == pcode || OB_RESTORE_ALIVE_REQ == pcode || OB_RESTORE_ALIVE_RESP == pcode || OB_RENEW_MS_CONFIRMED_INFO_REQ == pcode || OB_RENEW_MS_LOG_ACK == pcode || OB_FAKE_PUSH_LOG == pcode || - OB_SYNC_LOG_ARCHIVE_PROGRESS == pcode; + OB_SYNC_LOG_ARCHIVE_PROGRESS == pcode || OB_RESTORE_CHECK_REQ == pcode; } 
template @@ -1300,6 +1295,23 @@ int ObLogEngine::fetch_log_from_leader(const common::ObAddr& server, const int64 return ret; } +int ObLogEngine::send_restore_check_rqst(const common::ObAddr& server, const int64_t dst_cluster_id, + const common::ObPartitionKey& key, const ObRestoreCheckType restore_type) +{ + int ret = OB_SUCCESS; + const uint64_t tenant_id = OB_SERVER_TENANT_ID; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (!server.is_valid() || !key.is_valid() || OB_INVALID_CLUSTER_ID == dst_cluster_id) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(server), K(key), K(dst_cluster_id)); + } else { + ObRestoreCheckReq req(restore_type); + ret = post_packet(tenant_id, server, dst_cluster_id, key, OB_RESTORE_CHECK_REQ, req); + } + return ret; +} + int ObLogEngine::submit_check_rebuild_req( const common::ObAddr& server, const int64_t dst_cluster_id, const ObPartitionKey& key, const uint64_t start_id) { @@ -1719,6 +1731,25 @@ int ObLogEngine::send_restore_alive_resp( return ret; } +int ObLogEngine::send_query_restore_end_id_resp(const common::ObAddr& server, const int64_t cluster_id, + const common::ObPartitionKey& partition_key, const uint64_t last_restore_log_id) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (!server.is_valid() || OB_INVALID_ID == last_restore_log_id || !partition_key.is_valid()) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(server), K(partition_key), K(last_restore_log_id), K(ret)); + } else { + const uint64_t tenant_id = OB_SERVER_TENANT_ID; + ObQueryRestoreEndIdResp req(last_restore_log_id); + ret = post_packet(tenant_id, server, cluster_id, partition_key, OB_QUERY_RESTORE_END_ID_RESP, req); + CLOG_LOG( + DEBUG, "send_query_restore_end_id_resp finished", K(ret), K(server), K(partition_key), K(last_restore_log_id)); + } + return ret; +} + int ObLogEngine::notify_restore_leader_takeover(const common::ObAddr& server, const common::ObPartitionKey& key) { int ret = OB_SUCCESS; diff --git a/src/clog/ob_log_engine.h b/src/clog/ob_log_engine.h index 2f5261270c..49edfcb7cb 100644 --- a/src/clog/ob_log_engine.h +++ b/src/clog/ob_log_engine.h @@ -143,7 +143,7 @@ protected: // the clog_writer is returned to be busy static const int64_t BUFFER_ITEM_CONGESTED_PERCENTAGE = 50; bool cluster_version_before_2000_() const; - int init_log_file_writer(const char *log_dir, const ObILogFileStore *file_store); + int init_log_file_writer(const char* log_dir, const ObILogFileStore* file_store); bool is_inited_; Config config_; @@ -434,6 +434,8 @@ public: const common::ObProposalID proposal_id) override; int broadcast_info(const common::ObMemberList& mem_list, const common::ObPartitionKey& key, const common::ObReplicaType& replica_type, const uint64_t max_confirmed_log_id) override; + int send_restore_check_rqst(const common::ObAddr& server, const int64_t dst_cluster_id, + const common::ObPartitionKey& key, const ObRestoreCheckType restore_type); // confirmed_info msg is special that no need compare proposal_id int submit_confirmed_info(const share::ObCascadMemberList& mem_list, const common::ObPartitionKey& key, const uint64_t log_id, const ObConfirmedInfo& confirmed_info, const bool batch_committed) override; @@ -473,6 +475,8 @@ public: bool& remote_replica_is_normal) override; int get_remote_mc_ctx_array( const common::ObAddr& server, const common::ObPartitionArray& partition_array, McCtxArray& mc_ctx_array); + int send_query_restore_end_id_resp(const common::ObAddr& server, const int64_t cluster_id, + 
const common::ObPartitionKey& partition_key, const uint64_t last_restore_log_id); int update_min_using_file_id(); uint32_t get_clog_min_using_file_id() const override; uint32_t get_clog_min_file_id() const override; diff --git a/src/clog/ob_log_event_scheduler.cpp b/src/clog/ob_log_event_scheduler.cpp index d5ba8fb3fd..c0471616d2 100644 --- a/src/clog/ob_log_event_scheduler.cpp +++ b/src/clog/ob_log_event_scheduler.cpp @@ -26,15 +26,17 @@ ObLogEventScheduler::~ObLogEventScheduler() int ObLogEventScheduler::init() { int ret = common::OB_SUCCESS; - const char* CLOG_EVENT_TIME_WHEEL_NAME = "ClogEventTimeWheel"; + const char *CLOG_EVENT_TIME_WHEEL_NAME = "ClogEventTimeWheel"; + const int64_t thread_num = get_time_wheel_thread_num_(); if (IS_INIT) { ret = common::OB_INIT_TWICE; CLOG_LOG(ERROR, "ObLogEventScheduler init twice", K(ret)); - } else if (OB_FAIL(time_wheel_.init( - CLOG_EVENT_TIME_WHEEL_PRECISION, get_time_wheel_thread_num_(), CLOG_EVENT_TIME_WHEEL_NAME))) { + } else if (OB_FAIL(time_wheel_.init(CLOG_EVENT_TIME_WHEEL_PRECISION, + thread_num, CLOG_EVENT_TIME_WHEEL_NAME))) { CLOG_LOG(ERROR, "ObTimeWheel init fail", K(ret)); } else { is_inited_ = true; + CLOG_LOG(INFO, "ObTimeWheel init success", K(thread_num)); } return ret; } @@ -103,7 +105,10 @@ int ObLogEventScheduler::schedule_task_(ObLogStateEventTaskV2* task, const int64 int64_t ObLogEventScheduler::get_time_wheel_thread_num_() const { - int64_t thread_num = MAX(common::get_cpu_num() / 4, 4); + int64_t thread_num = MAX(common::get_cpu_num()/2, 4); + if (thread_num > common::ObTimeWheel::MAX_THREAD_NUM) { + thread_num = common::ObTimeWheel::MAX_THREAD_NUM; + } return thread_num; } } // namespace clog diff --git a/src/clog/ob_log_membership_mgr_V2.h b/src/clog/ob_log_membership_mgr_V2.h index 4ce0d53f9a..24b417087f 100644 --- a/src/clog/ob_log_membership_mgr_V2.h +++ b/src/clog/ob_log_membership_mgr_V2.h @@ -58,6 +58,7 @@ public: virtual int change_quorum(const common::ObMemberList& curr_member_list, const int64_t curr_quorum, const int64_t new_quorum, obrpc::ObMCLogInfo& log_info) = 0; virtual int64_t get_replica_num() const = 0; + virtual bool is_single_member_mode() const = 0; virtual common::ObReplicaType get_replica_type() const = 0; virtual common::ObReplicaProperty get_replica_property() const = 0; virtual const common::ObMemberList& get_curr_member_list() const = 0; @@ -114,6 +115,10 @@ public: const int64_t new_quorum, obrpc::ObMCLogInfo& log_info) override; virtual int64_t get_replica_num() const override; virtual common::ObReplicaType get_replica_type() const override; + virtual bool is_single_member_mode() const + { + return (1 == replica_num_); + } virtual common::ObReplicaProperty get_replica_property() const override; virtual const common::ObMemberList& get_curr_member_list() const override; virtual int receive_log(const ObLogEntry& log_entry, const common::ObAddr& server, const int64_t cluster_id, diff --git a/src/clog/ob_log_reconfirm.cpp b/src/clog/ob_log_reconfirm.cpp index 37aadabdac..50ded8c29c 100644 --- a/src/clog/ob_log_reconfirm.cpp +++ b/src/clog/ob_log_reconfirm.cpp @@ -126,6 +126,8 @@ ObLogReconfirm::ObLogReconfirm() last_renew_sync_standby_loc_ts_(OB_INVALID_TIMESTAMP), failover_truncate_log_id_(OB_INVALID_ID), max_membership_version_(OB_INVALID_TIMESTAMP), + last_check_start_id_(OB_INVALID_ID), + last_notify_sync_standby_time_(OB_INVALID_TIMESTAMP), is_standby_reconfirm_(false), receive_previous_max_log_ts_(false), is_inited_(false) @@ -195,11 +197,21 @@ int ObLogReconfirm::init_reconfirm_() 
CLOG_LOG(WARN, "submit_replay_task failed", K_(partition_key), K(ret)); } else if (!state_mgr_->is_cluster_allow_handle_prepare()) { ret = OB_STATE_NOT_MATCH; - CLOG_LOG(WARN, "cluster state do not allow handle prepare", K_(partition_key), K(ret)); + common::ObClusterType cluster_type = INVALID_CLUSTER_TYPE; + share::ServerServiceStatus server_status = share::OBSERVER_INVALID_STATUS; + GCTX.get_cluster_type_and_status(cluster_type, server_status); + CLOG_LOG(ERROR, + "cluster state do not allow handle prepare, it's maybe executing switchover", + K_(partition_key), + K(ret), + K(cluster_type), + K(server_status)); } else if (GCTX.is_primary_cluster() && GCTX.need_sync_to_standby() && !share::ObMultiClusterUtil::is_cluster_private_table(partition_key_.get_table_id()) && - OB_FAIL(cascading_mgr_->leader_try_update_sync_standby_child(false))) { // not trigger renew location + OB_FAIL(cascading_mgr_->leader_try_update_sync_standby_child(true))) { // primary leader cannot get standby leader from location cache, need renew + // if sw is empty, leader need renew location here, + // because state_mgr just triggers renew when sw is not empty const int64_t now = ObTimeUtility::current_time(); if (OB_INVALID_TIMESTAMP == last_renew_sync_standby_loc_ts_ || now - last_renew_sync_standby_loc_ts_ >= PRIMARY_RENEW_LOCATION_TIME_INTERVAL) { @@ -541,7 +553,9 @@ void ObLogReconfirm::reset() last_push_renew_ms_log_ts_ = OB_INVALID_TIMESTAMP; last_renew_sync_standby_loc_ts_ = OB_INVALID_TIMESTAMP; failover_truncate_log_id_ = OB_INVALID_ID; + last_check_start_id_ = OB_INVALID_ID; max_membership_version_ = OB_INVALID_TIMESTAMP; + last_notify_sync_standby_time_ = OB_INVALID_TIMESTAMP; is_standby_reconfirm_ = false; receive_previous_max_log_ts_ = false; } @@ -859,6 +873,68 @@ bool ObLogReconfirm::need_fetch_log_() return bool_ret; } +bool ObLogReconfirm::is_log_task_waiting_standby_ack_(const uint64_t start_id) +{ + bool bool_ret = false; + if (IS_NOT_INIT) { + } else { + const int64_t* ref = NULL; + ObLogTask* log_task = NULL; + if (OB_SUCCESS == (sw_->get_log_task(start_id, log_task, ref))) { + log_task->lock(); + if (GCTX.is_primary_cluster() && GCTX.need_sync_to_standby() && log_task->is_local_majority_flushed() && + !log_task->is_standby_majority_finished()) { + bool_ret = true; + } + log_task->unlock(); + } + if (NULL != ref) { + sw_->revert_log_task(ref); + ref = NULL; + } + } + return bool_ret; +} + +int ObLogReconfirm::primary_leader_try_notify_sync_standby_() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (GCTX.is_primary_cluster() && GCTX.need_sync_to_standby() && + !share::ObMultiClusterUtil::is_cluster_private_table(partition_key_.get_table_id()) && !sw_->is_empty()) { + const uint64_t start_id = sw_->get_start_id(); + const int64_t now = ObTimeUtility::current_time(); + if (last_check_start_id_ != start_id) { + last_check_start_id_ = start_id; + last_notify_sync_standby_time_ = OB_INVALID_TIMESTAMP; + } else if (is_log_task_waiting_standby_ack_(start_id)) { + if (OB_INVALID_TIMESTAMP == last_notify_sync_standby_time_ || + now - last_notify_sync_standby_time_ <= CLOG_RECONFIRM_PRIMARY_NOTIFY_STANDBY_INTERVAL) { + last_notify_sync_standby_time_ = now; + // primary leader update location cache to get sync_standby_child + (void)cascading_mgr_->leader_try_update_sync_standby_child(true); + share::ObCascadMember sync_standby_child = cascading_mgr_->get_sync_standby_child(); + if (!sync_standby_child.is_valid()) { + // sync_standby_child is invalid, skip + } else if 
(OB_FAIL(log_engine_->reject_server(sync_standby_child.get_server(), + sync_standby_child.get_cluster_id(), + partition_key_, + OB_REPLICA_MSG_TYPE_QUICK_REGISTER))) { + CLOG_LOG(WARN, "reject_server failed", K_(partition_key), K(ret)); + } else { + CLOG_LOG(INFO, + "reject sync_standby_child to trigger quick register success", + K_(partition_key), + K(sync_standby_child)); + } + } + } else { + } + } + return ret; +} + int ObLogReconfirm::reconfirm_log_() { int ret = OB_SUCCESS; @@ -868,6 +944,7 @@ int ObLogReconfirm::reconfirm_log_() if (need_fetch_log_()) { ret = try_fetch_log_(); } + (void)primary_leader_try_notify_sync_standby_(); if (OB_SUCC(ret)) { ret = confirm_log_(); } @@ -930,6 +1007,11 @@ int ObLogReconfirm::reconfirm() if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else { + if (GCTX.is_primary_cluster() && + !share::ObMultiClusterUtil::is_cluster_private_table(partition_key_.get_table_id())) { + // debug sync block point for primary non-private table + DEBUG_SYNC(BLOCK_CLOG_PRIMARY_RECONFIRM); + } uint64_t new_start_id = sw_->get_start_id(); // For newly created partition, follower replica maybe not ready. // Leader need retry send prepare request quickly. @@ -1070,6 +1152,8 @@ int ObLogReconfirm::reconfirm() "start_working log_id", max_flushed_id_ + 1); state_ = FINISHED; + } else { + (void)primary_leader_try_notify_sync_standby_(); } } if (state_ != FINISHED) { diff --git a/src/clog/ob_log_reconfirm.h b/src/clog/ob_log_reconfirm.h index df3b696dac..330c6f9881 100644 --- a/src/clog/ob_log_reconfirm.h +++ b/src/clog/ob_log_reconfirm.h @@ -234,6 +234,8 @@ private: int init_log_info_range_(const uint64_t range_start_id); int try_update_last_ts_(const int64_t log_ts); int try_update_failover_truncate_log_id_(ObLogEntryHeader* header); + bool is_log_task_waiting_standby_ack_(const uint64_t start_id); + int primary_leader_try_notify_sync_standby_(); private: static const int64_t BUF_SIZE = 2048; @@ -270,6 +272,8 @@ private: int64_t last_renew_sync_standby_loc_ts_; uint64_t failover_truncate_log_id_; int64_t max_membership_version_; + uint64_t last_check_start_id_; + int64_t last_notify_sync_standby_time_; bool is_standby_reconfirm_; bool receive_previous_max_log_ts_; bool is_inited_; diff --git a/src/clog/ob_log_req.cpp b/src/clog/ob_log_req.cpp index ec6b2582eb..2199abe7eb 100644 --- a/src/clog/ob_log_req.cpp +++ b/src/clog/ob_log_req.cpp @@ -635,5 +635,37 @@ DEF_TO_STRING(ObCheckRebuildReq) OB_SERIALIZE_MEMBER(ObCheckRebuildReq, start_id_); +void ObRestoreCheckReq::reset() +{ + restore_type_ = OB_CHECK_UNKNOWN; +} + +DEF_TO_STRING(ObRestoreCheckReq) +{ + int64_t pos = 0; + J_OBJ_START(); + J_KV("restore_type", restore_type_); + J_OBJ_END(); + return pos; +} + +OB_SERIALIZE_MEMBER(ObRestoreCheckReq, restore_type_); + +void ObQueryRestoreEndIdResp::reset() +{ + last_restore_log_id_ = OB_INVALID_ID; +} + +DEF_TO_STRING(ObQueryRestoreEndIdResp) +{ + int64_t pos = 0; + J_OBJ_START(); + J_KV("last_restore_log_id", last_restore_log_id_); + J_OBJ_END(); + return pos; +} + +OB_SERIALIZE_MEMBER(ObQueryRestoreEndIdResp, last_restore_log_id_); + }; // end namespace clog }; // end namespace oceanbase diff --git a/src/clog/ob_log_req.h b/src/clog/ob_log_req.h index d43cca7178..893e7db734 100644 --- a/src/clog/ob_log_req.h +++ b/src/clog/ob_log_req.h @@ -72,8 +72,11 @@ enum ObLogReqType { OB_STANDBY_ACK_LOG = 43, OB_STANDBY_QUERY_SYNC_START_ID = 44, OB_STANDBY_SYNC_START_ID_RESP = 45, + OB_RESTORE_CHECK_REQ = 46, + + OB_QUERY_RESTORE_END_ID_RESP = 50, // sentry req - OB_LOG_MAX_REQ_TYPE_ID = 46, + 
OB_LOG_MAX_REQ_TYPE_ID = 51, }; inline bool is_batch_submit_msg(const ObLogReqType type) @@ -786,6 +789,36 @@ public: TO_STRING_KV(K_(status)); }; +struct ObRestoreCheckReq : public ObINetReq { + OB_UNIS_VERSION(1); + +public: + ObRestoreCheckReq() : restore_type_(OB_CHECK_UNKNOWN) + {} + explicit ObRestoreCheckReq(const ObRestoreCheckType restore_type) : restore_type_(restore_type) + {} + ~ObRestoreCheckReq() + {} + void reset(); + DECLARE_TO_STRING; + ObRestoreCheckType restore_type_; +}; + +struct ObQueryRestoreEndIdResp : public ObINetReq { + OB_UNIS_VERSION(1); + +public: + ObQueryRestoreEndIdResp() : last_restore_log_id_(OB_INVALID_ID) + {} + explicit ObQueryRestoreEndIdResp(const uint64_t last_restore_log_id) : last_restore_log_id_(last_restore_log_id) + {} + ~ObQueryRestoreEndIdResp() + {} + void reset(); + DECLARE_TO_STRING; + uint64_t last_restore_log_id_; +}; + }; // end namespace clog }; // end namespace oceanbase diff --git a/src/clog/ob_log_restore_mgr.cpp b/src/clog/ob_log_restore_mgr.cpp index 8e2f539306..0af9cf18af 100644 --- a/src/clog/ob_log_restore_mgr.cpp +++ b/src/clog/ob_log_restore_mgr.cpp @@ -76,6 +76,7 @@ void ObLogRestoreMgr::destroy() sw_ = NULL; log_engine_ = NULL; mm_ = NULL; + restore_end_id_ack_list_.reset(); CLOG_LOG(INFO, "ObLogRestoreMgr destroy finished", K_(partition_key)); } } @@ -104,6 +105,8 @@ int ObLogRestoreMgr::leader_takeover_() if (OB_FAIL(try_submit_restore_task_())) { CLOG_LOG(WARN, "try_submit_restore_task_ failed", K(ret), K_(partition_key)); } + // reset restore_end_id_ack_list_ + (void)restore_end_id_ack_list_.reset(); } if (OB_FAIL(ret)) { // revoke when failure @@ -205,16 +208,22 @@ int ObLogRestoreMgr::follower_check_state_() CLOG_LOG(WARN, "get_leader_from_loc_cache failed", K_(partition_key), K(tmp_ret)); } } - if (!loc_leader.is_valid() || restore_leader_ == loc_leader) { + if (!loc_leader.is_valid() || restore_leader_ == loc_leader || self_ == loc_leader) { if (now - last_leader_resp_time_ >= RESTORE_LEADER_TIMEOUT_THRESHOLD && !ObReplicaTypeCheck::is_log_replica(mm_->get_replica_type()) && (share::REPLICA_RESTORE_DATA <= archive_restore_state_ && - archive_restore_state_ <= share::REPLICA_RESTORE_WAIT_ALL_DUMPED)) { - CLOG_LOG(INFO, "detect leader dead, self try takeover", K_(partition_key), K_(restore_leader)); + archive_restore_state_ <= share::REPLICA_RESTORE_LOG)) { + // If the restore state is greater than REPLICA_RESTORE_LOG, this replica cannot take over as RESTORE_LEADER, + // because a lagged restore replica may increase to_leader_time(is_previous_leader) in the meta table + // to a value greater than the strong leader's, + // which may prevent the root balance thread from finding the leader replica.
+ CLOG_LOG(INFO, + "detect leader dead, self try takeover", + K_(partition_key), + K_(restore_leader), + K_(archive_restore_state)); (void)leader_takeover_(); } - } else if (self_ == loc_leader) { - (void)leader_takeover_(); } else { restore_leader_ = loc_leader; } @@ -430,5 +439,61 @@ int ObLogRestoreMgr::change_restore_leader(const ObAddr& new_leader) return ret; } +int ObLogRestoreMgr::check_last_restore_id_majority_equal(const common::ObMemberList& member_list) +{ + int ret = OB_SUCCESS; + const int64_t member_num = member_list.get_member_number(); + if (!member_list.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else if (restore_end_id_ack_list_.get_count() >= member_num / 2) { + // already majority, do nothing + CLOG_LOG(INFO, "check_last_restore_id_majority_equal success", K_(partition_key), K(restore_end_id_ack_list_)); + } else { + // no majority yet, return OB_EAGAIN and try to send rpc to other members + ret = OB_EAGAIN; + } + const int64_t now = ObTimeUtility::current_time(); + if (OB_EAGAIN == ret && (OB_INVALID_TIMESTAMP == last_query_last_restore_id_time_ || + now - last_query_last_restore_id_time_ >= QUERY_RESTORE_ID_INTERVAL)) { + const int64_t dst_cluster_id = obrpc::ObRpcNetHandler::CLUSTER_ID; + int tmp_ret = OB_SUCCESS; + common::ObAddr cur_server; + for (int64_t i = 0; i < member_num; ++i) { + cur_server.reset(); + if (OB_SUCCESS != (tmp_ret = member_list.get_server_by_index(i, cur_server))) { + CLOG_LOG(WARN, "get_server_by_index failed", K(tmp_ret), K_(partition_key)); + } else if (self_ == cur_server) { + // skip self + } else if (restore_end_id_ack_list_.contains(cur_server)) { + // no need to query + } else if (OB_SUCCESS != (tmp_ret = log_engine_->send_restore_check_rqst( + cur_server, dst_cluster_id, partition_key_, OB_CHECK_RESTORE_END_ID))) { + CLOG_LOG(WARN, "send_restore_check_rqst failed", K(tmp_ret), K_(partition_key), K(cur_server)); + } else { + CLOG_LOG(DEBUG, "send_restore_check_rqst success", K(tmp_ret), K_(partition_key), K(cur_server)); + } + } + last_query_last_restore_id_time_ = now; + } + return ret; +} + +int ObLogRestoreMgr::process_restore_end_id_resp(const common::ObAddr& server) +{ + int ret = OB_SUCCESS; + + if (!server.is_valid()) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid arguments", K(ret), K_(partition_key), K(server)); + } else if (restore_end_id_ack_list_.contains(server)) { + // already exists, ignore + } else if (OB_FAIL(restore_end_id_ack_list_.add_server(server))) { + CLOG_LOG(WARN, "restore_end_id_ack_list_.add_server failed", K(ret), K_(partition_key), K(server)); + } else { + CLOG_LOG(INFO, "restore_end_id_ack_list_.add_server success", K_(partition_key), K(server)); + } + return ret; +} + } // namespace clog } // namespace oceanbase diff --git a/src/clog/ob_log_restore_mgr.h b/src/clog/ob_log_restore_mgr.h index c8524d6cdb..1a6665ae74 100644 --- a/src/clog/ob_log_restore_mgr.h +++ b/src/clog/ob_log_restore_mgr.h @@ -14,6 +14,8 @@ #define OCEANBASE_CLOG_OB_LOG_RESTORE_MGR_H_ #include "ob_log_define.h" +#include "ob_simple_member_list.h" +#include "common/ob_member_list.h" #include "common/ob_partition_key.h" #include "common/ob_role.h" #include "lib/lock/ob_spin_lock.h" @@ -48,12 +50,14 @@ public: last_renew_location_time_(common::OB_INVALID_TIMESTAMP), last_check_state_time_(common::OB_INVALID_TIMESTAMP), restore_log_finish_ts_(common::OB_INVALID_TIMESTAMP), + last_query_last_restore_id_time_(OB_INVALID_TIMESTAMP), role_(common::INVALID_ROLE), partition_key_(), self_(), restore_leader_(),
archive_restore_state_(share::REPLICA_NOT_RESTORE), fetch_log_result_(OB_ARCHIVE_FETCH_LOG_INIT), + restore_end_id_ack_list_(), is_inited_(false) {} ~ObLogRestoreMgr() @@ -103,6 +107,8 @@ public: return restore_log_finish_ts_; } bool is_standby_restore_state() const; + int check_last_restore_id_majority_equal(const common::ObMemberList& member_list); + int process_restore_end_id_resp(const common::ObAddr& server); private: int leader_takeover_(); @@ -122,6 +128,8 @@ private: static const int64_t RESTORE_CHECK_STATE_INTERVAL = 10 * 1000 * 1000l; // interval for follower sending alive req static const int64_t RESTORE_ALIVE_REQ_INTERVAL = 5 * 1000 * 1000l; + // time interval for restore_leader check restore_end_id + static const int64_t QUERY_RESTORE_ID_INTERVAL = 2 * 1000 * 1000l; private: common::ObSpinLock lock_; @@ -136,12 +144,14 @@ private: int64_t last_renew_location_time_; int64_t last_check_state_time_; int64_t restore_log_finish_ts_; + int64_t last_query_last_restore_id_time_; common::ObRole role_; common::ObPartitionKey partition_key_; common::ObAddr self_; common::ObAddr restore_leader_; int16_t archive_restore_state_; ObArchiveFetchLogResult fetch_log_result_; + ObSimpleMemberList restore_end_id_ack_list_; bool is_inited_; }; } // namespace clog diff --git a/src/clog/ob_log_sliding_window.cpp b/src/clog/ob_log_sliding_window.cpp index 86b4516071..58434a5e19 100644 --- a/src/clog/ob_log_sliding_window.cpp +++ b/src/clog/ob_log_sliding_window.cpp @@ -67,13 +67,13 @@ ObLogSlidingWindow::ObLogSlidingWindow() next_index_log_id_(OB_INVALID_ID), scan_next_index_log_id_(OB_INVALID_ID), last_flushed_log_id_(0), - next_index_log_ts_(OB_INVALID_TIMESTAMP), switchover_info_lock_(common::ObLatchIds::CLOG_SWITCH_INFO_LOCK), leader_max_log_info_(), last_replay_log_(), fake_ack_info_mgr_(), last_slide_fid_(OB_INVALID_FILE_ID), check_can_receive_larger_log_warn_time_(OB_INVALID_TIMESTAMP), + set_index_log_submitted_debug_time_(OB_INVALID_TIMESTAMP), insert_log_try_again_warn_time_(OB_INVALID_TIMESTAMP), receive_confirmed_info_warn_time_(OB_INVALID_TIMESTAMP), get_end_log_id_warn_time_(OB_INVALID_TIMESTAMP), @@ -137,7 +137,6 @@ int ObLogSlidingWindow::init(ObLogReplayEngineWrapper* replay_engine, ObILogEngi leader_ts_ = epoch_id; saved_accum_checksum_ = accum_checksum; next_index_log_id_ = last_replay_log_id + 1; - next_index_log_ts_ = last_submit_ts; last_replay_log_.set(last_replay_log_id, last_submit_ts); set_next_replay_log_id_info(last_replay_log_id + 1, last_submit_ts + 1); last_slide_fid_ = 0; @@ -3696,6 +3695,12 @@ bool ObLogSlidingWindow::check_need_fetch_log_(const uint64_t start_log_id, bool K_(partition_key), K(need_check_rebuild)); } + } else if (restore_mgr_->is_standby_restore_state()) { + // standby replica no need fetch log when waiting restore + bool_ret = false; + if (REACH_TIME_INTERVAL(1000 * 1000)) { + CLOG_LOG(INFO, "self is waiting standby restore, no need fetch log", K_(partition_key)); + } } else if (OB_SUCCESS != (tmp_ret = replay_engine_->is_tenant_out_of_memory(partition_key_, is_tenant_out_of_mem))) { CLOG_LOG(WARN, "is_tenant_out_of_memory failed", K(tmp_ret), K_(partition_key)); } else if (is_tenant_out_of_mem) { @@ -3834,8 +3839,6 @@ int ObLogSlidingWindow::do_fetch_log(const uint64_t start_id, const uint64_t end is_fetched = true; if (restore_mgr_->is_archive_restoring()) { fetch_type = OB_FETCH_LOG_RESTORE_FOLLOWER; - } else if (restore_mgr_->is_standby_restore_state()) { - fetch_type = OB_FETCH_LOG_STANDBY_RESTORE; } else if (STANDBY_LEADER == 
state_mgr_->get_role() && dst_cluster_id != self_cluster_id) { fetch_type = OB_FETCH_LOG_STANDBY_REPLICA; CLOG_LOG(DEBUG, @@ -3886,8 +3889,6 @@ int ObLogSlidingWindow::do_fetch_log(const uint64_t start_id, const uint64_t end get_max_log_id(), "next_ilog_id", ATOMIC_LOAD(&next_index_log_id_), - "next_ilog_ts", - ATOMIC_LOAD(&next_index_log_ts_), "leader", state_mgr_->get_leader(), "parent", @@ -3998,6 +3999,15 @@ int ObLogSlidingWindow::sliding_cb(const int64_t sn, const ObILogExtRingBufferDa try_update_submit_timestamp(submit_timestamp + 1); } } + if (next_index_log_id_ < get_start_id()) { + CLOG_LOG(ERROR, + "next_index_log_id_ is smaller than start_id, unexpected", + K_(partition_key), + "start_id", + get_start_id(), + K_(next_index_log_id), + K(sn)); + } } else { ret = OB_EAGAIN; } @@ -4131,7 +4141,6 @@ int ObLogSlidingWindow::truncate_second_stage(const common::ObBaseStorageInfo& b leader_ts_ = leader_ts; if (next_index_log_id_ <= new_start_id) { next_index_log_id_ = new_start_id; - next_index_log_ts_ = base_storage_info.get_submit_timestamp(); checksum_->set_accum_checksum(new_start_id, accum_checksum); } else { checksum_->set_verify_checksum(new_start_id, accum_checksum); @@ -4200,7 +4209,6 @@ void ObLogSlidingWindow::destroy() leader_ts_ = OB_INVALID_TIMESTAMP; saved_accum_checksum_ = 0; next_index_log_id_ = OB_INVALID_ID; - next_index_log_ts_ = OB_INVALID_TIMESTAMP; fake_ack_info_mgr_.reset(); is_inited_ = false; CLOG_LOG(INFO, "ObLogSlidingWindow::destroy finished", K_(partition_key)); @@ -4219,7 +4227,7 @@ int ObLogSlidingWindow::handle_first_index_log_( ret = OB_INVALID_ARGUMENT; } else if (NULL != log_task && log_task->is_flush_local_finished() && log_task->is_log_confirmed()) { need_check_succeeding_log = false; - if (ATOMIC_LOAD(&next_index_log_id_) > log_id && (test_and_set_index_log_submitted_(log_task))) { + if (ATOMIC_LOAD(&next_index_log_id_) > log_id && (test_and_set_index_log_submitted_(log_id, log_task))) { if (do_pop && state_mgr_->can_slide_sw() && OB_SUCCESS != (tmp_ret = sw_.pop(false, CLOG_MAX_REPLAY_TIMEOUT, is_replay_failed, this)) && OB_CLOG_SLIDE_TIMEOUT != tmp_ret) { @@ -4292,7 +4300,7 @@ int ObLogSlidingWindow::handle_succeeding_index_log_( return ret; } -bool ObLogSlidingWindow::test_and_set_index_log_submitted_(ObLogTask* log_task) +bool ObLogSlidingWindow::test_and_set_index_log_submitted_(const uint64_t log_id, ObLogTask* log_task) { // caller guarantees log_task is not NULL bool bool_ret = false; @@ -4303,6 +4311,15 @@ bool ObLogSlidingWindow::test_and_set_index_log_submitted_(ObLogTask* log_task) } else { log_task->set_index_log_submitted(); bool_ret = true; + if (partition_reach_time_interval(30 * 1000 * 1000, set_index_log_submitted_debug_time_)) { + CLOG_LOG(INFO, + "set_index_log_submitted", + K(bool_ret), + K_(partition_key), + K_(next_index_log_id), + K(log_id), + K(*log_task)); + } } log_task->unlock(); } @@ -4320,6 +4337,14 @@ bool ObLogSlidingWindow::test_and_submit_index_log_(const uint64_t log_id, ObLog log_task->lock(); if (log_task->is_index_log_submitted()) { bool_ret = false; + CLOG_LOG(INFO, + "is_index_log_submitted is already true", + K(ret), + K_(partition_key), + K(bool_ret), + K_(next_index_log_id), + K(log_id), + K(*log_task)); } else if (OB_SUCCESS == (ret = submit_index_log_(log_id, log_task, accum_checksum))) { log_task->set_index_log_submitted(); bool_ret = true; @@ -4356,11 +4381,6 @@ bool ObLogSlidingWindow::test_and_submit_index_log_(const uint64_t log_id, ObLog const int64_t log_submit_ts = 
log_task->get_submit_timestamp(); // inc next_index_log_id after Leader submit_confirm_info ATOMIC_INC(&next_index_log_id_); - if (OB_INVALID_TIMESTAMP != log_submit_ts) { - ATOMIC_STORE(&next_index_log_ts_, log_submit_ts); - } else { - CLOG_LOG(WARN, "log_submit_ts is invalid", K_(partition_key), K(log_task)); - } // follower send confirmed clog to standby children if (OB_SUCCESS != (tmp_ret = follower_send_log_to_standby_children_(log_id, log_task))) { @@ -4473,7 +4493,6 @@ int ObLogSlidingWindow::set_next_index_log_id(const uint64_t log_id, const int64 const int64_t now = ObTimeUtility::current_time(); ATOMIC_STORE(&next_index_log_id_, log_id); ATOMIC_STORE(&scan_next_index_log_id_, log_id); - ATOMIC_STORE(&next_index_log_ts_, now); checksum_->set_accum_checksum(accum_checksum); } } @@ -4484,9 +4503,7 @@ int ObLogSlidingWindow::set_next_index_log_id(const uint64_t log_id, const int64 "next_index_log_id", ATOMIC_LOAD(&next_index_log_id_), K(log_id), - K(accum_checksum), - "next_index_log_ts", - ATOMIC_LOAD(&next_index_log_ts_)); + K(accum_checksum)); return ret; } diff --git a/src/clog/ob_log_sliding_window.h b/src/clog/ob_log_sliding_window.h index 996f7f76a1..6d404bff15 100644 --- a/src/clog/ob_log_sliding_window.h +++ b/src/clog/ob_log_sliding_window.h @@ -99,7 +99,6 @@ public: virtual void start_fetch_log_from_leader(bool& is_fetched) = 0; virtual int get_next_replay_log_timestamp(int64_t& next_replay_log_timestamp) const = 0; virtual uint64_t get_next_index_log_id() const = 0; - virtual int64_t get_next_index_log_ts() = 0; virtual int leader_active() = 0; virtual int leader_takeover() = 0; virtual int leader_revoke() = 0; @@ -129,6 +128,7 @@ public: virtual int try_update_submit_timestamp(const int64_t base_ts) = 0; virtual int64_t get_last_submit_timestamp() const = 0; virtual uint64_t get_max_confirmed_log_id() const = 0; + virtual bool is_empty() const = 0; }; class ObILogSWForMS { @@ -151,7 +151,6 @@ public: const bool is_batch_commited) = 0; virtual int alloc_log_id(const int64_t base_timestamp, uint64_t& log_id, int64_t& submit_timestamp) = 0; virtual uint64_t get_next_index_log_id() const = 0; - virtual int64_t get_next_index_log_ts() = 0; virtual int do_fetch_log(const uint64_t start_id, const uint64_t end_id, const enum ObFetchLogExecuteType& fetch_log_execute_type, bool& is_fetched) = 0; virtual int set_log_confirmed(const uint64_t log_id, const bool batch_committed) = 0; @@ -425,10 +424,6 @@ public: { return ATOMIC_LOAD(&next_index_log_id_); } - int64_t get_next_index_log_ts() override - { - return ATOMIC_LOAD(&next_index_log_ts_); - } int submit_replay_task(const bool need_async, bool& is_replayed, bool& is_replay_failed) override; void destroy(); int alloc_log_id(const int64_t base_timestamp, uint64_t& log_id, int64_t& submit_timestamp) override; @@ -608,7 +603,7 @@ private: int handle_succeeding_index_log_(const uint64_t log_id, const bool need_check_succeeding_log, const bool do_pop); int submit_index_log_(const uint64_t log_id, const ObLogTask* log_task, int64_t& accum_checksum); - bool test_and_set_index_log_submitted_(ObLogTask* log_task); + bool test_and_set_index_log_submitted_(const uint64_t log_id, ObLogTask* log_task); bool test_and_submit_index_log_(const uint64_t log_id, ObLogTask* log_task, int& ret); int try_submit_mc_success_cb_(const ObLogType& log_type, const uint64_t log_id, const char* log_buf, const int64_t log_buf_len, const common::ObProposalID& proposal_id); @@ -664,13 +659,13 @@ private: uint64_t next_index_log_id_; uint64_t 
scan_next_index_log_id_; uint64_t last_flushed_log_id_; - int64_t next_index_log_ts_; mutable common::ObSpinLock switchover_info_lock_; // protect leader_max_log_info_ LeaderMaxLogInfo leader_max_log_info_; LogIdTsPair last_replay_log_; FakeAckInfoMgr fake_ack_info_mgr_; file_id_t last_slide_fid_; mutable int64_t check_can_receive_larger_log_warn_time_; + mutable int64_t set_index_log_submitted_debug_time_; mutable int64_t insert_log_try_again_warn_time_; mutable int64_t receive_confirmed_info_warn_time_; mutable int64_t get_end_log_id_warn_time_; diff --git a/src/clog/ob_log_state_mgr.cpp b/src/clog/ob_log_state_mgr.cpp index 8617840681..22fd581058 100644 --- a/src/clog/ob_log_state_mgr.cpp +++ b/src/clog/ob_log_state_mgr.cpp @@ -59,6 +59,7 @@ ObLogStateMgr::ObLogStateMgr() prepared_primary_leader_(), region_(DEFAULT_REGION_NAME), leader_epoch_(OB_INVALID_TIMESTAMP), + prev_leader_epoch_(OB_INVALID_TIMESTAMP), self_(), lock_(ObLatchIds::CLOG_STAT_MGR_LOCK), region_lock_(ObLatchIds::CLOG_LOCALITY_LOCK), @@ -67,7 +68,8 @@ ObLogStateMgr::ObLogStateMgr() start_role_change_time_(OB_INVALID_TIMESTAMP), last_check_start_id_(OB_INVALID_ID), last_check_start_id_time_(OB_INVALID_TIMESTAMP), - // FIXME(): Need to define a separate id + last_check_restore_state_time_(OB_INVALID_TIMESTAMP), + last_check_standby_ms_time_(OB_INVALID_TIMESTAMP), fetch_state_lock_(ObLatchIds::CLOG_STAT_MGR_LOCK), curr_fetch_log_interval_(CLOG_FETCH_LOG_INTERVAL_LOWER_BOUND), curr_round_first_fetch_(true), @@ -145,6 +147,7 @@ int ObLogStateMgr::init(ObILogSWForStateMgr* sw, ObILogReconfirm* reconfirm, ObI prepared_primary_leader_.reset(); region_ = DEFAULT_REGION_NAME; leader_epoch_ = OB_INVALID_TIMESTAMP; + prev_leader_epoch_ = OB_INVALID_TIMESTAMP; self_ = self; freeze_version_ = freeze_version; start_role_change_time_ = OB_INVALID_TIMESTAMP; @@ -945,17 +948,42 @@ int ObLogStateMgr::switch_state(bool& need_retry) return ret; } +int ObLogStateMgr::standby_follower_try_trigger_restore_() +{ + // caller guarantees to hold state lock + int ret = OB_SUCCESS; + const int64_t now = ObTimeUtility::current_time(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_UNLIKELY(!restore_mgr_->is_standby_restore_state())) { + // no need check + } else if (!leader_.is_valid()) { + // leader is invalid, skip + } else if (OB_INVALID_TIMESTAMP == last_check_restore_state_time_ || + now - last_check_restore_state_time_ >= CHECK_STANDBY_RESTORE_STATE_INTERVAL) { + if (OB_FAIL(log_engine_->send_restore_check_rqst( + leader_.get_server(), leader_.get_cluster_id(), partition_key_, OB_CHECK_STANDBY_RESTORE))) { + CLOG_LOG(WARN, "send_restore_check_rqst failed", K(ret), K_(partition_key), K_(leader)); + } else { + last_check_restore_state_time_ = now; + } + } else { + } + return ret; +} + bool ObLogStateMgr::check_sliding_window_state() { bool state_changed = false; const int64_t now = ObTimeUtility::current_time(); if (!restore_mgr_->is_archive_restoring()) { - share::ObCascadMember new_primary_leader; + ObCascadMember new_leader; + ObCascadMember new_primary_leader; if (is_follower_replay_()) { state_changed = follower_replay_need_switch_(); } else if (is_leader_revoking_() || is_standby_leader_revoking_()) { state_changed = leader_revoking_need_switch_(); - } else if (need_update_leader_()) { + } else if (need_update_leader_(new_leader)) { state_changed = true; } else if (need_update_primary_leader_(new_primary_leader)) { state_changed = true; @@ -974,11 +1002,23 @@ bool ObLogStateMgr::check_sliding_window_state() } } else if 
(is_follower_active_()) { (void)check_and_try_fetch_log_(); + // standby follower tries to trigger restore + if (restore_mgr_->is_standby_restore_state()) { + (void)standby_follower_try_trigger_restore_(); + } } else if (is_standby_leader_active_()) { (void)check_and_try_fetch_log_(); - if (GCTX.is_in_flashback_state() || GCTX.is_primary_cluster() || GCTX.is_in_cleanup_state()) { + // check if standby_leader needs to switch role + bool is_error = false; + if (leader_active_need_switch_(is_error)) { state_changed = true; } + // standby_leader checks whether renew_ms_log has been sent to all followers + (void)standby_leader_check_renew_ms_log_state_(); + if (GCTX.is_sync_level_on_standby()) { + // a standby leader in strong sync mode needs to check protection_level + (void)standby_leader_check_protection_level_(); + } } else { // do nothing; } @@ -1359,10 +1399,13 @@ void ObLogStateMgr::destroy() region_.reset(); idc_.reset(); leader_epoch_ = OB_INVALID_TIMESTAMP; + prev_leader_epoch_ = OB_INVALID_TIMESTAMP; self_.reset(); freeze_version_ = ObVersion(); last_check_start_id_ = OB_INVALID_ID; last_check_start_id_time_ = OB_INVALID_TIMESTAMP; + last_check_restore_state_time_ = OB_INVALID_TIMESTAMP; + last_check_standby_ms_time_ = OB_INVALID_TIMESTAMP; curr_fetch_log_interval_ = CLOG_FETCH_LOG_INTERVAL_LOWER_BOUND; curr_round_first_fetch_ = true; curr_round_fetch_begin_time_ = OB_INVALID_TIMESTAMP; @@ -1458,7 +1501,9 @@ int ObLogStateMgr::replay_to_follower_active_() ret = OB_ERR_UNEXPECTED; ; CLOG_LOG(ERROR, "election_ is NULL", K_(partition_key), K(ret)); - } else if (ObReplicaTypeCheck::is_paxos_replica_V2(mm_->get_replica_type()) && !election_started_) { + } else if (ObReplicaTypeCheck::is_paxos_replica_V2(mm_->get_replica_type()) && !election_started_ && + !restore_mgr_->is_archive_restoring()) { // physical restore replica cannot start election before its + // member_list is set int64_t before_start_partition = common::ObTimeUtility::current_time(); if (OB_FAIL(election_->start())) { CLOG_LOG(WARN, "election_ start failed", K_(partition_key), K(ret)); @@ -1525,17 +1570,28 @@ int ObLogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch, leader_ = new_leader; previous_leader_ = previous_leader; leader_epoch_ = new_leader_epoch; - last_leader_active_time_ = ObTimeUtility::current_time(); start_role_change_time_ = common::ObTimeUtility::current_time(); (void)standby_update_protection_level(); (void)mm_->reset_renew_ms_log_task(); - if (OB_FAIL(partition_service_->submit_pt_update_role_task(partition_key_))) { - CLOG_LOG(WARN, "ps_cb->submit_pt_update_task failed", K_(partition_key), K(ret)); - } } CLOG_LOG(INFO, "follower_active_to_reconfirm, switch role to STANDBY_LEADER", K_(partition_key), K_(role)); } else { - if (OB_FAIL(sw_->clean_log())) { + // first check whether leader_epoch has changed + if (OB_INVALID_TIMESTAMP != prev_leader_epoch_ && prev_leader_epoch_ >= new_leader_epoch) { + // switchover/failover multiple times may lead to this case + ret = OB_EAGAIN; + CLOG_LOG(INFO, + "new_leader_epoch is not greater than previous, need change leader to self and retry", + K_(partition_key), + K(ret), + K_(prev_leader_epoch), + K(new_leader_epoch)); + // change leader to self, which can increase epoch + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = election_->change_leader_to_self())) { + CLOG_LOG(WARN, "change_leader_to_self failed", K(tmp_ret), K_(partition_key)); + } + } else if (OB_FAIL(sw_->clean_log())) { CLOG_LOG(ERROR, "clean sliding window failed", K_(partition_key), K(ret)); } else {
reset_status_(); @@ -1545,11 +1601,7 @@ int ObLogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch, leader_ = new_leader; previous_leader_ = previous_leader; leader_epoch_ = new_leader_epoch; - last_leader_active_time_ = ObTimeUtility::current_time(); start_role_change_time_ = common::ObTimeUtility::current_time(); - if (OB_FAIL(partition_service_->submit_pt_update_role_task(partition_key_))) { - CLOG_LOG(WARN, "ps_cb->submit_pt_update_task failed", K_(partition_key), K(ret)); - } (void)try_renew_sync_standby_location(); } } @@ -1642,11 +1694,10 @@ int ObLogStateMgr::taking_over_to_leader_active_() { int ret = OB_SUCCESS; const int64_t reconfirm_to_active_cost = ObTimeUtility::current_time() - reconfirm_start_time_; - bool is_standby_tbl = is_can_elect_standby_leader(); if (OB_FAIL(on_leader_active_())) { CLOG_LOG(WARN, "on_leader_active_ failed, try again", K_(partition_key), K(ret)); - } else if (!is_standby_tbl && OB_FAIL(sw_->leader_active())) { + } else if (common::is_strong_leader(role_) && OB_FAIL(sw_->leader_active())) { CLOG_LOG(ERROR, "sw leader_active failed", K(ret), K(partition_key_)); } else { // do nothing @@ -1659,6 +1710,7 @@ int ObLogStateMgr::taking_over_to_leader_active_() } // role_ = LEADER; state_ = ACTIVE; + last_leader_active_time_ = ObTimeUtility::current_time(); start_role_change_time_ = OB_INVALID_TIMESTAMP; } @@ -1667,8 +1719,7 @@ int ObLogStateMgr::taking_over_to_leader_active_() K(ret), K_(partition_key), K(reconfirm_to_active_cost), - K(last_leader_active_time_), - K(is_standby_tbl)); + K(last_leader_active_time_)); return ret; } @@ -1694,6 +1745,8 @@ int ObLogStateMgr::leader_active_to_revoking_() } else if (OB_FAIL(on_leader_revoke_())) { CLOG_LOG(WARN, "on_leader_revoke_ failed, try again", K_(partition_key), K(ret)); } else { + // record current leader_epoch + prev_leader_epoch_ = leader_epoch_; reset_status_(); } } else { @@ -1726,6 +1779,8 @@ void ObLogStateMgr::reset_status_() mm_->reset_status(); last_check_start_id_ = sw_->get_start_id(); last_check_start_id_time_ = ObClockGenerator::getClock(); + last_check_restore_state_time_ = OB_INVALID_TIMESTAMP; + last_check_standby_ms_time_ = OB_INVALID_TIMESTAMP; reset_fetch_state(); last_wait_standby_ack_start_time_ = OB_INVALID_TIMESTAMP; last_check_pending_replay_cnt_ = 0; @@ -1875,8 +1930,13 @@ bool ObLogStateMgr::leader_revoking_need_switch_() bool ObLogStateMgr::follower_active_need_switch_() { bool state_changed = false; - if (need_update_leader_()) { + ObCascadMember new_leader; + if (need_update_leader_(new_leader)) { state_changed = true; + } else if (new_leader.is_valid() && self_ == new_leader.get_server()) { + // Self is new_leader, but my role_ is FOLLOWER, need to update role_. + state_changed = true; + } else { } return state_changed; } @@ -1922,7 +1982,8 @@ int ObLogStateMgr::get_pending_replay_count_(int64_t& pending_replay_cnt) const bool ObLogStateMgr::is_reconfirm_role_change_or_sync_timeout_() { bool bool_ret = false; - if (need_update_leader_()) { + ObCascadMember new_leader; + if (need_update_leader_(new_leader)) { bool_ret = true; } else { const uint64_t start_id = sw_->get_start_id(); @@ -1934,7 +1995,8 @@ bool ObLogStateMgr::is_reconfirm_role_change_or_sync_timeout_() } else { int tmp_ret = OB_SUCCESS; bool is_waiting_standby_ack = false; - // 1) primary leader counts the time waiting for the standby ack + // 1) Primary leader counts the time waiting for the standby ack. + // If is_waiting_standby_ack is true, the primary leader needs to wait until the standby ack arrives.
bool unused_bool = false; (void)primary_leader_check_start_log_state_(start_id, is_waiting_standby_ack, unused_bool); // 2) check if log sync timeout const int64_t now = ObClockGenerator::getClock(); bool is_sw_timeout = false; int64_t pending_replay_cnt = 0; if (now - last_check_start_id_time_ > CLOG_LEADER_RECONFIRM_SYNC_TIMEOUT) { - // start log is waiting more than 10s - if (!sw_->is_empty()) { - // sw is not empty, start log sync timeout - is_sw_timeout = true; - } + // start log of sw has timed out, + // standby reconfirm timeout can be detected even though sw is empty. + is_sw_timeout = true; int tmp_ret = OB_SUCCESS; if (OB_SUCCESS != (tmp_ret = get_pending_replay_count_(pending_replay_cnt))) { CLOG_LOG(ERROR, "get_pending_replay_count_ failed", K_(partition_key), K(tmp_ret)); @@ -1982,15 +2042,19 @@ bool ObLogStateMgr::is_reconfirm_role_change_or_sync_timeout_() CLOG_LOG(ERROR, "is_reconfirm_role_change_or_sync_timeout_", K_(partition_key), + K_(role), K(now), K(last_check_start_id_time_), K(max_log_id), K(start_id), K(is_wait_replay)); } else { + // partition does not exist, no need to revoke + bool_ret = false; CLOG_LOG(WARN, "is_reconfirm_role_change_or_sync_timeout_, partition has been dropped", K_(partition_key), + K_(role), K(now), K(last_check_start_id_time_), K(max_log_id), @@ -2003,6 +2067,7 @@ bool ObLogStateMgr::is_reconfirm_role_change_or_sync_timeout_() CLOG_LOG(INFO, "leader reconfirm need wait", K_(partition_key), + K_(role), K(last_check_pending_replay_cnt_), K(pending_replay_cnt), K(now), @@ -2077,8 +2142,9 @@ bool ObLogStateMgr::leader_taking_over_need_switch_() bool ObLogStateMgr::leader_active_need_switch_(bool& is_error) { bool state_changed = false; + ObCascadMember new_leader; is_error = false; - if (need_update_leader_()) { + if (need_update_leader_(new_leader)) { state_changed = true; } else if (LEADER == role_) { bool need_switch_leader_to_self = false; @@ -2087,7 +2153,7 @@ bool ObLogStateMgr::leader_active_need_switch_(bool& is_error) is_error = true; } else { // state_changed is false, check cluster type and status - // if primary cluster switches to standby, clog need do role chagne: leader->follower->standby_leader + // if primary cluster switches to standby, clog needs to do role change: leader->follower->standby_leader if (!ObMultiClusterUtil::is_cluster_private_table(partition_key_.get_table_id()) && GCTX.is_in_standby_active_state()) { state_changed = true; @@ -2099,8 +2165,8 @@ bool ObLogStateMgr::leader_active_need_switch_(bool& is_error) // primary leader switch leader to self, trigger start log reach majority after // mode degraded (max_availability).
int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = election_->change_leader_to_self_async())) { - CLOG_LOG(WARN, "change_leader_to_self_async failed", K(tmp_ret), K_(partition_key)); + if (OB_SUCCESS != (tmp_ret = election_->change_leader_to_self())) { + CLOG_LOG(WARN, "change_leader_to_self failed", K(tmp_ret), K_(partition_key)); } } } else if (STANDBY_LEADER == role_) { @@ -2116,8 +2182,9 @@ bool ObLogStateMgr::leader_active_need_switch_(bool& is_error) void ObLogStateMgr::primary_leader_check_start_log_state_( const uint64_t start_id, bool& is_waiting_standby_ack, bool& need_switch_leader_to_self) { - // primary leader check if log sync timeout because of standby cluster in max availablility mode - if (LEADER == role_ && GCTX.is_in_max_availability_mode() && + // 1) Primary leader checks whether log sync has timed out because of the standby cluster in max availability mode. + // 2) If it is in max protection mode, assign a value to is_waiting_standby_ack. + if (LEADER == role_ && (GCTX.need_sync_to_standby() || GCTX.is_in_max_availability_mode()) && !ObMultiClusterUtil::is_cluster_private_table(partition_key_.get_table_id())) { const int64_t* ref = NULL; ObLogTask* log_task = NULL; @@ -2126,7 +2193,11 @@ void ObLogStateMgr::primary_leader_check_start_log_state_( if (GCTX.need_sync_to_standby() && log_task->is_local_majority_flushed() && !log_task->is_standby_majority_finished()) { + // max_protect or max_availability mode is_waiting_standby_ack = true; + } + + if (is_waiting_standby_ack && GCTX.need_sync_to_standby() && GCTX.is_in_max_availability_mode()) { const int64_t net_timeout = GCTX.get_sync_standby_net_timeout(); const int64_t now = ObTimeUtility::current_time(); if (OB_INVALID_TIMESTAMP == last_wait_standby_ack_start_time_) { @@ -2155,7 +2226,8 @@ void ObLogStateMgr::primary_leader_check_start_log_state_( K(*log_task), K_(last_wait_standby_ack_start_time)); } - if (!GCTX.need_sync_to_standby() && log_task->is_local_majority_flushed()) { + + if (!GCTX.need_sync_to_standby() && GCTX.is_in_max_availability_mode() && log_task->is_local_majority_flushed()) { // primary leader in max_availability mode, already degraded to max_perf level // start log reach majority, need swtich leader to self need_switch_leader_to_self = true; @@ -2192,6 +2264,14 @@ int ObLogStateMgr::primary_process_protect_mode_switch() last_check_start_id_time_ = now; } last_wait_standby_ack_start_time_ = OB_INVALID_TIMESTAMP; + // check whether re-takeover is needed + if (!GCTX.need_sync_to_standby() && mm_->is_single_member_mode() && !sw_->is_empty()) { + // a single-member primary leader switches leader to itself when it switches to async mode and its sw is not empty, + // which can trigger the start log to reach majority after the mode switch.
+ if (OB_FAIL(election_->change_leader_to_self())) { + CLOG_LOG(WARN, "change_leader_to_self failed", K(ret), K_(partition_key)); + } + } } return ret; } @@ -2233,14 +2313,36 @@ bool ObLogStateMgr::check_leader_sliding_window_not_slide_(bool& need_switch_lea } } else { state_changed = true; - CLOG_LOG(ERROR, - "leader_active_need_switch_", - K_(partition_key), - K(now), - K(last_check_start_id_time_), - "sw max_log_id", - sw_->get_max_log_id(), - K(start_id)); + // log sync timeout, check if partition exists + bool is_exist = true; + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = partition_service_->check_partition_exist(partition_key_, is_exist))) { + CLOG_LOG(WARN, "check_partition_exist failed", K_(partition_key), K(tmp_ret)); + } + if (is_exist) { + CLOG_LOG(ERROR, + "leader_active_need_switch_", + K_(partition_key), + K(now), + K(last_check_start_id_time_), + "sw max_log_id", + sw_->get_max_log_id(), + K(start_id)); + } else { + // partition does not exist, no need to revoke + state_changed = false; + // When the partition is being GCed, the initial leader may be destroyed first, + // a newly elected leader may take over successfully and generate a checkpoint log, + // but if other followers have been destroyed, the checkpoint log sync will time out. + CLOG_LOG(WARN, + "leader_active_need_switch_, but partition has been dropped", + K_(partition_key), + K(now), + K(last_check_start_id_time_), + "sw max_log_id", + sw_->get_max_log_id(), + K(start_id)); + } report_start_id_trace(start_id); } } else { @@ -2292,7 +2394,7 @@ bool ObLogStateMgr::check_leader_sliding_window_not_slide_(bool& need_switch_lea } } } - // 2) primary leader count time for waiting standby ack in max protection mode + // 2) primary leader checks timeout for max availability mode bool unused_bool = false; (void)primary_leader_check_start_log_state_(start_id, unused_bool, need_switch_leader_to_self); } @@ -2372,11 +2474,10 @@ bool ObLogStateMgr::need_update_primary_leader_(share::ObCascadMember& new_prima return bool_ret; } -bool ObLogStateMgr::need_update_leader_() +bool ObLogStateMgr::need_update_leader_(ObCascadMember& new_leader) { bool bool_ret = false; int ret = OB_SUCCESS; - ObCascadMember new_leader; ObAddr new_elect_real_leader; // ignore it ObAddr previous_leader; // ignore it int64_t new_leader_epoch = OB_INVALID_TIMESTAMP; @@ -2389,10 +2490,26 @@ bool ObLogStateMgr::need_update_leader_() if (leader_.is_valid()) { // leader is valid now but get failed, return true bool_ret = true; + CLOG_LOG(WARN, + "get_elect_leader_ failed, leader_ is valid, need update", + K(ret), + K_(partition_key), + K_(self), + K(leader_), + K(bool_ret)); } } else { if (new_leader.get_server() != leader_.get_server() || new_leader_epoch != leader_epoch_) { bool_ret = true; + CLOG_LOG(WARN, + "leader or epoch has changed, need update", + K_(partition_key), + K(bool_ret), + K(new_leader), + K(new_leader_epoch), + K(leader_), + K(leader_epoch_), + K_(self)); } } @@ -2703,7 +2820,9 @@ int ObLogStateMgr::try_update_leader_from_loc_cache() int tmp_ret = OB_SUCCESS; const int64_t now = ObTimeUtility::current_time(); - if (STANDBY_LEADER == role_) { + if (LEADER == role_) { + // self is LEADER, no need to check location cache + } else if (STANDBY_LEADER == role_) { // standby_leader refresh primary leader if (need_renew && (OB_INVALID_TIMESTAMP == last_renew_primary_leader_time_ || now - last_renew_primary_leader_time_ >= RENEW_LOCATION_TIME_INTERVAL)) { @@ -2745,9 +2864,11 @@ int ObLogStateMgr::try_update_leader_from_loc_cache() CLOG_LOG(WARN,
"nonblock_get_leader_by_election_from_loc_cache failed", K_(partition_key), K(tmp_ret)); } } else { + ObCascadMember old_leader = leader_; new_cluster_id = get_self_cluster_id(); ObCascadMember new_leader_member(new_leader, new_cluster_id); leader_ = new_leader_member; + CLOG_LOG(INFO, "update leader according to location cache", K(old_leader), "new leader", leader_); } } @@ -3128,6 +3249,7 @@ int ObLogStateMgr::standby_set_election_leader(const common::ObAddr& leader, con if (OB_INVALID_TIMESTAMP != leader_epoch) { if (leader == self_) { leader_epoch_ = leader_epoch; + is_new_created_leader_ = true; } else { leader_epoch_ = OB_INVALID_TIMESTAMP; } @@ -3221,7 +3343,26 @@ int ObLogStateMgr::try_send_sync_start_id_request_() return ret; } -int ObLogStateMgr::standby_leader_check_protection_level() +int ObLogStateMgr::standby_leader_check_renew_ms_log_state_() +{ + int ret = OB_SUCCESS; + const int64_t now = ObTimeUtility::current_time(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_UNLIKELY(STANDBY_LEADER != role_)) { + // self is not STANDBY_LEADER, skip + } else if (ATOMIC_LOAD(&last_check_standby_ms_time_) == OB_INVALID_TIMESTAMP || + now - ATOMIC_LOAD(&last_check_standby_ms_time_) >= STANDBY_CHECK_MS_INTERVAL) { + last_check_standby_ms_time_ = now; + if (OB_FAIL(mm_->check_renew_ms_log_sync_state())) { + CLOG_LOG(WARN, "check_renew_ms_log_sync_state failed", K(ret), K_(partition_key)); + } + } else { + } + return ret; +} + +int ObLogStateMgr::standby_leader_check_protection_level_() { int ret = OB_SUCCESS; diff --git a/src/clog/ob_log_state_mgr.h b/src/clog/ob_log_state_mgr.h index 3bf67a8c3b..3d0ab9de16 100644 --- a/src/clog/ob_log_state_mgr.h +++ b/src/clog/ob_log_state_mgr.h @@ -525,7 +525,6 @@ public: bool has_valid_member_list() const override; virtual int try_renew_sync_standby_location() override; int standby_update_protection_level(); - int standby_leader_check_protection_level(); int handle_sync_start_id_resp( const ObAddr& server, const int64_t cluster_id, const int64_t original_send_ts, const uint64_t sync_start_id); int get_standby_protection_level(uint32_t& protection_level) const; @@ -588,7 +587,7 @@ private: bool check_leader_sliding_window_not_slide_(bool& need_switch_leader_to_self); void check_and_try_fetch_log_(); - bool need_update_leader_(); + bool need_update_leader_(share::ObCascadMember& new_leader); bool follower_need_update_role_(share::ObCascadMember& new_leader, common::ObAddr& elect_real_leader, common::ObAddr& previous_leader, int64_t& new_leader_epoch); void set_leader_and_epoch_(const share::ObCascadMember& new_leader, const int64_t new_leader_epoch); @@ -611,6 +610,9 @@ private: const uint64_t start_id, bool& is_waiting_standby_ack, bool& need_switch_leader_to_self); bool need_fetch_log_() const; void reset_fetch_state_(); + int standby_leader_check_protection_level_(); + int standby_leader_check_renew_ms_log_state_(); + int standby_follower_try_trigger_restore_(); private: typedef common::SpinRWLock RWLock; @@ -620,6 +622,8 @@ private: static const int64_t START_PARTITION_WARN_INTERVAL = 1 * 1000; static const int64_t BUF_SIZE = 2048; static const int64_t STANDBY_CHECK_SYNC_START_ID_INTERVAL = 3l * 1000l * 1000l; + static const int64_t STANDBY_CHECK_MS_INTERVAL = 60l * 1000l * 1000l; + static const int64_t CHECK_STANDBY_RESTORE_STATE_INTERVAL = 2l * 1000l * 1000l; ObILogSWForStateMgr* sw_; ObILogReconfirm* reconfirm_; @@ -650,6 +654,7 @@ private: common::ObRegion region_; common::ObIDC idc_; int64_t leader_epoch_; + int64_t 
prev_leader_epoch_; common::ObAddr self_; mutable common::ObSpinLock lock_; // protect freeze_version_; mutable RWLock region_lock_; @@ -659,6 +664,8 @@ private: uint64_t last_check_start_id_; int64_t last_check_start_id_time_; + int64_t last_check_restore_state_time_; + int64_t last_check_standby_ms_time_; // congestion control mutable common::ObSpinLock fetch_state_lock_; diff --git a/src/clog/ob_partition_log_packet_handler.cpp b/src/clog/ob_partition_log_packet_handler.cpp index 0ec88d1c81..c6630b4c17 100644 --- a/src/clog/ob_partition_log_packet_handler.cpp +++ b/src/clog/ob_partition_log_packet_handler.cpp @@ -184,6 +184,12 @@ int ObPartitionLogPacketHandler::handle_single_request(ObLogReqContext& ctx) case OB_RENEW_MS_CONFIRMED_INFO_REQ: ret = receive_renew_ms_log_confirmed_info(log_service, ctx); break; + case OB_RESTORE_CHECK_REQ: + ret = process_restore_check_req(log_service, ctx); + break; + case OB_QUERY_RESTORE_END_ID_RESP: + ret = process_query_restore_end_id_resp(log_service, ctx); + break; default: ret = OB_ERR_UNEXPECTED; break; @@ -296,7 +302,7 @@ int ObPartitionLogPacketHandler::receive_renew_ms_log(LogService* log_service, C { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - ObPushLogReq req; + ObPushMsLogReq req; ObLogEntry entry; int64_t pos = 0; if (OB_ISNULL(log_service) || !ctx.is_valid()) { @@ -551,6 +557,42 @@ int ObPartitionLogPacketHandler::receive_confirmed_info(LogService* log_service, return ret; } +int ObPartitionLogPacketHandler::process_restore_check_req(LogService* log_service, Context& ctx) +{ + int ret = OB_SUCCESS; + ObRestoreCheckReq req; + if (OB_ISNULL(log_service) || !ctx.is_valid()) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(ret), K(log_service), K(ctx)); + } else if (OB_FAIL(deserialize(ctx, req))) { + CLOG_LOG(WARN, "ObRestoreCheckReq deserialize error", K(ret), K(log_service), K(ctx)); + } else { + if (OB_FAIL(log_service->process_restore_check_req(ctx.server_, ctx.cluster_id_, req.restore_type_))) { + CLOG_LOG(WARN, "process_restore_check_req failed", K(ret), K(log_service), K(ctx), K(req)); + } + } + CLOG_LOG(DEBUG, "process_restore_check_req finished", K(ret), K(log_service), K(ctx), K(req)); + return ret; +} + +int ObPartitionLogPacketHandler::process_query_restore_end_id_resp(LogService* log_service, Context& ctx) +{ + int ret = OB_SUCCESS; + ObQueryRestoreEndIdResp req; + if (OB_ISNULL(log_service) || !ctx.is_valid()) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(ret), K(log_service), K(ctx)); + } else if (OB_FAIL(deserialize(ctx, req))) { + CLOG_LOG(WARN, "ObRestoreCheckReq deserialize error", K(ret), K(log_service), K(ctx)); + } else { + if (OB_FAIL(log_service->process_query_restore_end_id_resp(ctx.server_, req.last_restore_log_id_))) { + CLOG_LOG(WARN, "process_query_restore_end_id_resp failed", K(ret), K(log_service), K(ctx), K(req)); + } + } + CLOG_LOG(DEBUG, "process_query_restore_end_id_resp finished", K(ret), K(log_service), K(ctx), K(req)); + return ret; +} + int ObPartitionLogPacketHandler::receive_renew_ms_log_confirmed_info(LogService* log_service, Context& ctx) { int ret = OB_SUCCESS; @@ -565,7 +607,7 @@ int ObPartitionLogPacketHandler::receive_renew_ms_log_confirmed_info(LogService* CLOG_LOG(WARN, "receive_renew_ms_log_confirmed_info failed", K(ret), K(log_service), K(ctx), K(req)); } } - CLOG_LOG(INFO, "receive_renew_ms_log_confirmed_info", K(ret), K(log_service), K(ctx), K(req)); + CLOG_LOG(DEBUG, "receive_renew_ms_log_confirmed_info", K(ret), K(log_service), K(ctx), 
K(req)); return ret; } @@ -645,7 +687,7 @@ int ObPartitionLogPacketHandler::process_reject_msg(LogService* log_service, Con } else if (OB_FAIL(deserialize(ctx, req))) { CLOG_LOG(WARN, "deserialize failed", K(ret), K(ctx)); } else { - ret = log_service->process_reject_msg(ctx.server_, req.msg_type_, req.timestamp_); + ret = log_service->process_reject_msg(ctx.server_, ctx.cluster_id_, req.msg_type_, req.timestamp_); if (OB_SUCCESS != ret && REACH_TIME_INTERVAL(5 * 1000 * 1000)) { CLOG_LOG(WARN, "process_reject_msg failed", K(ret), K(req), K(ctx)); } diff --git a/src/clog/ob_partition_log_packet_handler.h b/src/clog/ob_partition_log_packet_handler.h index 5c2c8df05d..1462cb58bd 100644 --- a/src/clog/ob_partition_log_packet_handler.h +++ b/src/clog/ob_partition_log_packet_handler.h @@ -73,6 +73,8 @@ protected: static int replace_sick_child(LogService* log_service, Context& ctx); static int process_leader_max_log_msg(LogService* log_service, Context& ctx); static int process_check_rebuild_req(LogService* log_service, Context& ctx); + static int process_restore_check_req(LogService* log_service, Context& ctx); + static int process_query_restore_end_id_resp(LogService* log_service, Context& ctx); private: storage::ObPartitionService* partition_service_; diff --git a/src/clog/ob_partition_log_service.cpp b/src/clog/ob_partition_log_service.cpp index 31309b6ed4..d764565a55 100644 --- a/src/clog/ob_partition_log_service.cpp +++ b/src/clog/ob_partition_log_service.cpp @@ -104,7 +104,6 @@ ObPartitionLogService::ObPartitionLogService() zone_priority_(UINT64_MAX), is_candidate_(true), // default true for recording change last_rebuild_time_(OB_INVALID_TIMESTAMP), - last_check_standby_ms_time_(OB_INVALID_TIMESTAMP), ack_log_time_(OB_INVALID_TIMESTAMP), recv_child_next_ilog_ts_time_(OB_INVALID_TIMESTAMP), submit_log_mc_time_(OB_INVALID_TIMESTAMP), @@ -585,53 +584,6 @@ int ObPartitionLogService::get_role_unlock(int64_t& leader_epoch, ObTsWindows& c return ret; } -// used for reporting meta_table -int ObPartitionLogService::get_role_for_partition_table(common::ObRole& role) const -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else { - RLockGuard guard(lock_); - role = FOLLOWER; - if (!restore_mgr_.is_archive_restoring()) { - int64_t unused_epoch = 0; - ObTsWindows unused_windows; - if (OB_SUCCESS == get_role_for_partition_table_unlock(unused_epoch, unused_windows)) { - // LEADER or STANDBY_LEADER or RESTORE_LEADER, and lease is valid - role = state_mgr_.get_role(); - } else { - // lease is expired or role is FOLLOWER - role = FOLLOWER; - } - } else { - role = restore_mgr_.get_role(); - } - } - return ret; -} - -int ObPartitionLogService::get_role_for_partition_table_unlock( - int64_t& leader_epoch, ObTsWindows& changing_leader_windows) const -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else { - const common::ObRole role = state_mgr_.get_role(); - if (!common::is_leader_by_election(role)) { - // check role - ret = OB_NOT_MASTER; - } else if (!check_election_leader_(leader_epoch, changing_leader_windows)) { - // check election lease - ret = OB_NOT_MASTER; - } else { - ret = OB_SUCCESS; - } - } - return ret; -} - int ObPartitionLogService::get_role_and_last_leader_active_time(common::ObRole& role, int64_t& timestamp) const { int ret = OB_SUCCESS; @@ -645,7 +597,7 @@ int ObPartitionLogService::get_role_and_last_leader_active_time(common::ObRole& if (!restore_mgr_.is_archive_restoring()) { int64_t unused_epoch = 0; ObTsWindows unused_windows; - if 
(OB_SUCC(get_role_for_partition_table_unlock(unused_epoch, unused_windows))) { + if (OB_SUCC(get_role_unlock(unused_epoch, unused_windows))) { role = state_mgr_.get_role(); } else if (OB_NOT_MASTER == ret) { ret = OB_SUCCESS; @@ -1203,6 +1155,27 @@ int ObPartitionLogService::change_restore_leader(const common::ObAddr& leader) return ret; } +int ObPartitionLogService::report_physical_restore_unexpected_error_() +{ + int ret = OB_SUCCESS; + int64_t job_id = 0; + const uint64_t tenant_id = partition_key_.get_tenant_id(); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(ObBackupInfoMgr::get_instance().get_restore_job_id(tenant_id, job_id))) { + if (OB_ENTRY_NOT_EXIST != ret) { + CLOG_LOG(WARN, "failed to get restore info", KR(ret), K(ret), K(tenant_id)); + } else { + CLOG_LOG(INFO, "physical restore info not exist", K(tenant_id), K(ret)); + } + } else if (OB_FAIL(ObRestoreFatalErrorReporter::get_instance().add_restore_error_task( + tenant_id, PHYSICAL_RESTORE_MOD_CLOG, OB_ERR_UNEXPECTED, job_id, MYADDR))) { + CLOG_LOG(INFO, "failed to report restore error", KR(ret), K(tenant_id), KR(ret)); + } else { + } + return ret; +} + int ObPartitionLogService::check_and_set_restore_progress() { // responsible for switching state: RESTORE_LOG->NOT_RESTORE @@ -1241,20 +1214,7 @@ int ObPartitionLogService::check_and_set_restore_progress() ret = OB_ERR_UNEXPECTED; CLOG_LOG(ERROR, "log is not enough for restoring", K_(partition_key)); SERVER_EVENT_ADD("clog_restore", "clog is not enough", "partition", partition_key_); - const uint64_t tenant_id = partition_key_.get_tenant_id(); - int tmp_ret = OB_SUCCESS; - int64_t job_id = 0; - if (OB_SUCCESS != (tmp_ret = ObBackupInfoMgr::get_instance().get_restore_job_id(tenant_id, job_id))) { - if (OB_ENTRY_NOT_EXIST != tmp_ret) { - CLOG_LOG(WARN, "failed to get restore info", KR(ret), K(tmp_ret), K(tenant_id)); - } else { - CLOG_LOG(INFO, "physical restore info not exist", K(tenant_id), K(tmp_ret)); - } - } else if (OB_SUCCESS != (tmp_ret = ObRestoreFatalErrorReporter::get_instance().add_restore_error_task( - tenant_id, PHYSICAL_RESTORE_MOD_CLOG, ret, job_id, MYADDR))) { - CLOG_LOG(INFO, "failed to report restore error", KR(tmp_ret), K(tenant_id), KR(ret)); - } else { /*do nothing*/ - } + (void)report_physical_restore_unexpected_error_(); } } @@ -1287,6 +1247,17 @@ int ObPartitionLogService::check_and_set_restore_progress() K(last_restore_log_id)); } else if (ms_log_id > last_slide_log_id) { CLOG_LOG(INFO, "need wait membership_log_id slide out", K_(partition_key), K(last_slide_log_id), K(ms_log_id)); + } else if (OB_FAIL(restore_mgr_.check_last_restore_id_majority_equal(mm_.get_curr_member_list()))) { + // all replicas need check majority's last_restore_log_id equal before update state to NOT_RESTORE. 
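The branch that follows treats an unconfirmed majority check as retriable rather than fatal. A minimal sketch of that idiom, assuming hypothetical stand-in error codes and a stand-in for restore_mgr_.check_last_restore_id_majority_equal():

#include <cstdio>

// Hypothetical error codes and checker; the real ones live in OceanBase headers.
constexpr int OB_SUCCESS_SKETCH = 0;
constexpr int OB_EAGAIN_SKETCH = -4023;

// Stand-in for restore_mgr_.check_last_restore_id_majority_equal(): pretend the
// majority of members has not confirmed its last_restore_log_id yet.
static int check_majority_restore_id_equal() { return OB_EAGAIN_SKETCH; }

// One periodic step: EAGAIN means "majority not confirmed yet, retry next round",
// so it is rewritten to success and the replica simply stays in RESTORE_LOG state.
static int try_finish_restore_step(bool& can_set_not_restore)
{
  int ret = check_majority_restore_id_equal();
  can_set_not_restore = (OB_SUCCESS_SKETCH == ret);
  if (OB_EAGAIN_SKETCH == ret) {
    ret = OB_SUCCESS_SKETCH;
  }
  return ret;
}

int main()
{
  bool can_set_not_restore = false;
  const int ret = try_finish_restore_step(can_set_not_restore);
  std::printf("ret=%d, can_set_not_restore=%d\n", ret, static_cast<int>(can_set_not_restore));
  return 0;
}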
+ CLOG_LOG(INFO, + "check_last_restore_id_majority_equal failed, need retry", + K_(partition_key), + K(last_restore_log_id), + K(ret)); + if (OB_EAGAIN == ret) { + // rewrite ret to SUCCESS + ret = OB_SUCCESS; + } } else { ObReplicaRestoreStatus flag = ObReplicaRestoreStatus::REPLICA_NOT_RESTORE; if (OB_FAIL(partition_service_->set_restore_flag(partition_key_, flag))) { @@ -2181,6 +2152,113 @@ bool ObPartitionLogService::is_primary_need_sync_to_standby_() const return bool_ret; } +int ObPartitionLogService::process_restore_check_req( + const common::ObAddr& server, const int64_t cluster_id, const ObRestoreCheckType restore_type) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (!server.is_valid() || OB_INVALID_CLUSTER_ID == cluster_id || OB_CHECK_UNKNOWN == restore_type) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_CHECK_STANDBY_RESTORE == restore_type) { + RLockGuard guard(lock_); + if (restore_mgr_.is_standby_restore_state()) { + if (REACH_TIME_INTERVAL(100 * 1000)) { + CLOG_LOG(INFO, "self is still in restore state, ignore check restore msg", K_(partition_key), K(server)); + } + } else if (STANDBY_LEADER == state_mgr_.get_role()) { + // STANDBY_LEADER notify follower to exec restore + bool is_in_member_list = mm_.get_curr_member_list().contains(server); + const uint64_t fake_start_id = OB_INVALID_ID; + if (OB_FAIL(log_engine_->notify_follower_log_missing( + server, cluster_id, partition_key_, fake_start_id, is_in_member_list, OB_STANDBY_RESTORE_MSG))) { + CLOG_LOG(WARN, "notify_follower_log_missing failed", K_(partition_key), K(server)); + } else { + if (REACH_TIME_INTERVAL(100 * 1000)) { + CLOG_LOG(INFO, "notify follower restore finished", K_(partition_key), K(server)); + } + } + } else { + // other role, do nothing + } + } else if (OB_CHECK_RESTORE_END_ID == restore_type) { + if (OB_FAIL(process_query_restore_end_id_req_(server, cluster_id))) { + CLOG_LOG(WARN, "process_query_restore_end_id_req_ failed", K(ret), K_(partition_key), K(server)); + } + } else { + } + return ret; +} + +int ObPartitionLogService::process_query_restore_end_id_req_(const common::ObAddr& server, const int64_t cluster_id) +{ + int ret = OB_SUCCESS; + uint64_t local_restore_end_id = OB_INVALID_ID; + int64_t unused_log_ts = OB_INVALID_TIMESTAMP; + int64_t unused_version = OB_INVALID_TIMESTAMP; + + RLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL( + partition_service_->get_restore_replay_info(partition_key_, local_restore_end_id, unused_version))) { + CLOG_LOG(WARN, "failed to get_restore_replay_info", K(ret), K_(partition_key), K(local_restore_end_id)); + } else if (OB_INVALID_ID == local_restore_end_id) { + CLOG_LOG(WARN, "local_restore_end_id is invalid, cannot response", K_(partition_key), K(local_restore_end_id)); + } else if (OB_FAIL(log_engine_->send_query_restore_end_id_resp( + server, cluster_id, partition_key_, local_restore_end_id))) { + CLOG_LOG( + WARN, "send_query_restore_end_id_resp failed", K(ret), K_(partition_key), K(server), K(local_restore_end_id)); + } else { + } + return ret; +} + +int ObPartitionLogService::process_query_restore_end_id_resp( + const common::ObAddr& server, const uint64_t last_restore_log_id) +{ + int ret = OB_SUCCESS; + uint64_t local_restore_end_id = OB_INVALID_ID; + int64_t unused_log_ts = OB_INVALID_TIMESTAMP; + int64_t unused_version = OB_INVALID_TIMESTAMP; + + RLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (!server.is_valid() || OB_INVALID_ID == last_restore_log_id) 
{ + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid arguments", K(ret), K_(partition_key), K(server), K(last_restore_log_id)); + } else if (!restore_mgr_.is_archive_restoring_mlist()) { + // self is not in restoring mlist state, ignore + } else if (!mm_.get_curr_member_list().contains(server)) { + CLOG_LOG(WARN, "server is not in member_list, ignore it", K_(partition_key), K(server), K(last_restore_log_id)); + } else if (OB_FAIL( + partition_service_->get_restore_replay_info(partition_key_, local_restore_end_id, unused_version))) { + CLOG_LOG(WARN, "failed to get_restore_replay_info", K(ret), K_(partition_key), K(local_restore_end_id)); + } else if (last_restore_log_id != local_restore_end_id) { + // last_restore_log_id does not match the local one, unexpected error + (void)report_physical_restore_unexpected_error_(); + SERVER_EVENT_ADD("clog_restore", + "last_restore_log_id not match", + "partition", + partition_key_, + "last_restore_log_id", + local_restore_end_id); + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, + "last_restore_log_id does not match local, unexpected", + K(ret), + K_(partition_key), + K(local_restore_end_id), + K(last_restore_log_id), + K(server)); + } else if (OB_FAIL(restore_mgr_.process_restore_end_id_resp(server))) { + CLOG_LOG(WARN, "process_restore_end_id_resp failed", K(ret), K_(partition_key), K(server), K(last_restore_log_id)); + } else { + } + return ret; +} + int ObPartitionLogService::get_log(const common::ObAddr& server, const uint64_t start_log_id, const int64_t log_num, const ObFetchLogType fetch_type, const ObProposalID& proposal_id, const int64_t cluster_id, const common::ObReplicaType replica_type, const int64_t network_limit, const uint64_t max_confirmed_log_id) @@ -2241,6 +2319,17 @@ int ObPartitionLogService::get_log(const common::ObAddr& server, const uint64_t } else { // do nothing } + } else if (OB_FETCH_LOG_LEADER_RECONFIRM == fetch_type && + (restore_mgr_.is_archive_restoring() || restore_mgr_.is_archive_restoring_mlist())) { + // Restoring replica cannot respond to reconfirm fetch log req, because its last_restore_log_id may + // not match the current leader's.
+ // fix issue #35207594 + if (REACH_TIME_INTERVAL(1000 * 1000)) { + CLOG_LOG(INFO, + "self is still in archive restore state, cannot response reconfirm fetch log req", + K_(partition_key), + K(server)); + } } else { bool is_in_member_list = mm_.get_curr_member_list().contains(server); bool is_in_children_list = (cascading_mgr_.is_valid_child(server)); @@ -2364,12 +2453,6 @@ int ObPartitionLogService::get_log(const common::ObAddr& server, const uint64_t } if (OB_FETCH_LOG_RESTORE_FOLLOWER == fetch_type) { int tmp_ret = OB_SUCCESS; - if (RESTORE_LEADER == restore_mgr_.get_role()) { - if (OB_SUCCESS != - (tmp_ret = log_engine_->send_restore_alive_msg(server, cluster_id, partition_key_, start_log_id))) { - CLOG_LOG(WARN, "send_restore_alive_msg failed", K(tmp_ret), K(partition_key_), K(server)); - } - } if (start_log_id > sw_.get_max_log_id() && sw_.get_max_log_id() <= sw_.get_max_confirmed_log_id() && restore_mgr_.is_restore_log_finished()) { if (OB_SUCCESS != @@ -3188,7 +3271,7 @@ int ObPartitionLogService::primary_process_protect_mode_switch() // need use wlock to protect role // Because it may be concurrent with standby_ack_log(), if sync_standby_child changes, // it will cause primary leader incorrectly advance the log_task state - WLockGuardWithRetry guard(lock_, WRLOCK_TIMEOUT_US + ObTimeUtility::current_time()); + WLockGuard guard(lock_); timeguard.click(); if (LEADER != state_mgr_.get_role()) { // skip @@ -3489,24 +3572,7 @@ int ObPartitionLogService::check_state_() CLOG_LOG(WARN, "restore_mgr_.check_state failed", K_(partition_key), K(tmp_ret)); } } - - if (state_mgr_.is_can_elect_standby_leader()) { - const int64_t now = ObTimeUtility::current_time(); - if (STANDBY_LEADER == state_mgr_.get_role()) { - if (ATOMIC_LOAD(&last_check_standby_ms_time_) == OB_INVALID_TIMESTAMP || - now - ATOMIC_LOAD(&last_check_standby_ms_time_) >= STANDBY_CHECK_MS_INTERVAL) { - last_check_standby_ms_time_ = now; - if (OB_SUCCESS != (tmp_ret = ms_task_mgr_.check_renew_ms_log_sync_state())) { - CLOG_LOG(WARN, "check_renew_ms_log_sync_state failed", K(tmp_ret), K_(partition_key)); - } - } - } - } - - if (GCTX.is_sync_level_on_standby() && GCTX.is_standby_cluster() && STANDBY_LEADER == state_mgr_.get_role()) { - (void)state_mgr_.standby_leader_check_protection_level(); - } - + // update check time last_check_state_time_ = now; } return ret; @@ -3561,13 +3627,16 @@ int ObPartitionLogService::flush_cb(const ObLogFlushCbArg& arg) ObProposalID proposal_id = arg.proposal_id_; uint64_t max_log_id = OB_INVALID_ID; int64_t max_log_ts = OB_INVALID_TIMESTAMP; + common::ObRole role = INVALID_ROLE; do { RLockGuard guard(lock_); replica_type = mm_.get_replica_type(); sw_.get_max_log_id_info(max_log_id, max_log_ts); + role = state_mgr_.get_role(); } while (0); - if (arg_leader != self_ && arg_leader.is_valid() && ObReplicaTypeCheck::is_paxos_replica_V2(replica_type)) { + if (arg_leader != self_ && arg_leader.is_valid() && FOLLOWER == role && + ObReplicaTypeCheck::is_paxos_replica_V2(replica_type)) { tmp_ret = log_engine_->prepare_response( arg_leader, arg.cluster_id_, partition_key_, proposal_id, max_log_id, max_log_ts); CLOG_LOG(INFO, @@ -4559,7 +4628,8 @@ int ObPartitionLogService::process_keepalive_msg(const common::ObAddr& server, c if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else if (LEADER == state_mgr_.get_role() || (parent.is_valid() && !is_valid_parent)) { + } else if (LEADER == state_mgr_.get_role() || (parent.is_valid() && !is_valid_parent) || + state_mgr_.is_diff_cluster_req_in_disabled_state(cluster_id)) { if 
(partition_reach_time_interval(30 * 1000 * 1000, send_reject_msg_warn_time_)) { CLOG_LOG(WARN, "sender is unexpected, need reject", @@ -4571,13 +4641,21 @@ int ObPartitionLogService::process_keepalive_msg(const common::ObAddr& server, c K(next_log_id), K(next_log_ts_lb), "replica_type", - replica_type); + replica_type, + "self_cluster_id", + state_mgr_.get_self_cluster_id()); } ObReplicaMsgType msg_type = OB_REPLICA_MSG_TYPE_NOT_CHILD; (void)cascading_mgr_.reject_server(server, cluster_id, msg_type); } else if (deliver_cnt > MAX_DELIVER_CNT) { - ret = OB_ERR_UNEXPECTED; - CLOG_LOG(ERROR, + // Because every cascading msg is async, a replica A is first added to its new parent B's children_list, + // and only then is A notified to update its parent to B. There is a time gap between these two events. + // During this time gap, B may assign A as another replica C's parent, so the topology may be C->D->A before A updates + // its parent. So in order to support smooth parent switching, we need to accept this short-term loop case and just break + // it immediately. + ObCascadMemberList children_list; + (void)cascading_mgr_.get_children_list(children_list); + CLOG_LOG(WARN, "deliver_cnt is larger than MAX_DELIVER_CNT, maybe loop appears", K(ret), K_(partition_key), @@ -4585,7 +4663,8 @@ int ObPartitionLogService::process_keepalive_msg(const common::ObAddr& server, c K(next_log_ts_lb), K(deliver_cnt), "parent", - cascading_mgr_.get_parent()); + cascading_mgr_.get_parent(), + K(children_list)); // if predict there is loop, reset children_list to break it cascading_mgr_.reset_children_list(); } else if (OB_FAIL(sw_.follower_update_leader_next_log_info(next_log_id, next_log_ts_lb))) { @@ -5566,6 +5645,15 @@ int ObPartitionLogService::set_member_list(const ObMemberList& member_list, cons K(assigned_leader), K(lease_start)); } + } else { + // assigned_leader may be invalid in physical restoring scenario, skip + CLOG_LOG(WARN, + "assigned_leader is invalid", + K(ret), + K_(partition_key), + K(member_list), + K(assigned_leader), + K(lease_start)); } if (OB_SUCC(ret)) { @@ -6058,10 +6146,29 @@ int ObPartitionLogService::try_update_leader_from_loc_cache() int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else if (state_mgr_.try_update_leader_from_loc_cache()) { - CLOG_LOG(WARN, "try_update_leader_from_loc_cache failed", K_(partition_key), K(ret)); } else { - CLOG_LOG(DEBUG, "try_update_leader_from_loc_cache", K_(partition_key)); + bool is_self_strong_leader = false; + do { + // It needs an RDLock here, because state_mgr_ may update role concurrently. + RLockGuard guard(lock_); + if (LEADER == state_mgr_.get_role()) { + is_self_strong_leader = true; + } + } while (0); + + if (is_self_strong_leader) { + // strong leader does not need to check location cache + } else { + // WRLock is needed here, because state_mgr_ may update the leader_ member.
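The rewritten try_update_leader_from_loc_cache() peeks at the role under a read lock and only takes the write lock when the cached leader may actually be rewritten. A minimal sketch of that pattern using std::shared_mutex (the real code uses OceanBase's RLockGuard/WLockGuardWithTimeout, so all names here are stand-ins):

#include <mutex>
#include <shared_mutex>
#include <string>

// Hypothetical state protected by the lock; stand-ins for role and leader_ in the log service.
struct LeaderState {
  bool is_strong_leader = false;
  std::string cached_leader;
};

class LocCacheUpdater {
public:
  // Read path: a shared lock is enough to check whether we are the strong leader.
  // Write path: taken only when the cached leader may really be replaced.
  void try_update_leader_from_loc_cache(const std::string& leader_from_loc_cache)
  {
    {
      std::shared_lock<std::shared_mutex> rlock(lock_);
      if (state_.is_strong_leader) {
        return;  // strong leader: the location cache is irrelevant, skip the exclusive lock
      }
    }
    std::unique_lock<std::shared_mutex> wlock(lock_);
    if (!state_.is_strong_leader) {  // re-check: the role may have changed between the two locks
      state_.cached_leader = leader_from_loc_cache;
    }
  }

private:
  mutable std::shared_mutex lock_;
  LeaderState state_;
};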
+ WLockGuardWithTimeout guard(lock_, WRLOCK_TIMEOUT_US + ObTimeUtility::current_time(), ret); + if (OB_FAIL(ret)) { + CLOG_LOG(WARN, "wrlock failed", K(ret), K(partition_key_)); + } else if (state_mgr_.try_update_leader_from_loc_cache()) { + CLOG_LOG(WARN, "try_update_leader_from_loc_cache failed", K_(partition_key), K(ret)); + } else { + } + } + CLOG_LOG(DEBUG, "try_update_leader_from_loc_cache", K_(partition_key), K(is_self_strong_leader)); } return ret; } @@ -6375,7 +6482,7 @@ int ObPartitionLogService::process_replica_type_change_() } int ObPartitionLogService::process_reject_msg( - const common::ObAddr& server, const int32_t msg_type, const int64_t send_ts) + const common::ObAddr& server, const int64_t cluster_id, const int32_t msg_type, const int64_t send_ts) { int ret = OB_SUCCESS; const int64_t now = ObTimeUtility::current_time(); @@ -6388,7 +6495,7 @@ int ObPartitionLogService::process_reject_msg( CLOG_LOG(WARN, "msg is expired", K(ret), K_(partition_key), K(server), K(msg_type), K(send_ts)); } else { RLockGuard guard(lock_); - ret = cascading_mgr_.process_reject_msg(server, msg_type); + ret = cascading_mgr_.process_reject_msg(server, cluster_id, msg_type); } return ret; } @@ -6576,6 +6683,7 @@ int ObPartitionLogService::check_is_normal_partition(bool& is_normal_partition) bool is_disk_not_enough = false; bool is_disk_error = false; bool is_clog_disk_error = false; + bool is_archive_restoring = false; if (IS_NOT_INIT) { ret = OB_NOT_INIT; CLOG_LOG(ERROR, "ObPartitionLogService is not inited", K(ret), K(partition_key_)); @@ -6586,7 +6694,11 @@ int ObPartitionLogService::check_is_normal_partition(bool& is_normal_partition) } else { is_clog_disk_error = log_engine_->is_clog_disk_error(); is_disk_not_enough = !log_engine_->is_disk_space_enough(); - is_normal_partition = !(is_disk_not_enough || is_out_of_memory || is_disk_error || is_clog_disk_error); + // physical restoring replica cannot participate in member change. + // because its election module has not been started, it cannot vote. 
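The extended check keeps a restoring replica out of member changes by folding one more flag into the normality predicate. A minimal sketch, with PartitionHealth as a hypothetical stand-in for the flags gathered in check_is_normal_partition():

// Hypothetical health flags; stand-ins for the checks gathered in check_is_normal_partition().
struct PartitionHealth {
  bool disk_not_enough = false;
  bool out_of_memory = false;
  bool data_disk_error = false;
  bool clog_disk_error = false;
  bool archive_restoring = false;  // newly considered: a restoring replica cannot vote yet
};

// A partition may take part in member changes only when none of the abnormal flags is set;
// an archive-restoring replica is excluded because its election module is not started.
inline bool is_normal_partition(const PartitionHealth& h)
{
  return !(h.disk_not_enough || h.out_of_memory || h.data_disk_error || h.clog_disk_error ||
           h.archive_restoring);
}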
+ is_archive_restoring = restore_mgr_.is_archive_restoring(); + is_normal_partition = + !(is_disk_not_enough || is_out_of_memory || is_disk_error || is_clog_disk_error || is_archive_restoring); } return ret; } diff --git a/src/clog/ob_partition_log_service.h b/src/clog/ob_partition_log_service.h index 0feda65c68..e5f5889406 100644 --- a/src/clog/ob_partition_log_service.h +++ b/src/clog/ob_partition_log_service.h @@ -256,7 +256,6 @@ public: virtual int set_restore_fetch_log_finished(ObArchiveFetchLogResult fetch_log_result) = 0; virtual int try_update_next_replay_log_ts_in_restore(const int64_t new_ts) = 0; virtual int get_role(common::ObRole& role) const = 0; - virtual int get_role_for_partition_table(common::ObRole& role) const = 0; virtual int get_role_unsafe(int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) const = 0; virtual int get_role_unlock(int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) const = 0; virtual int get_role_and_last_leader_active_time(common::ObRole& role, int64_t& timestamp) const = 0; @@ -281,6 +280,8 @@ public: virtual int ack_renew_ms_log(const ObAddr& server, const uint64_t log_id, const int64_t submit_timestamp, const ObProposalID& ms_proposal_id) = 0; virtual int fake_receive_log(const ObAddr& server, const uint64_t log_id, const ObProposalID& proposal_id) = 0; + virtual int process_restore_check_req( + const common::ObAddr& server, const int64_t cluster_id, const ObRestoreCheckType restore_type) = 0; virtual int get_log(const common::ObAddr& server, const uint64_t log_id, const int64_t log_num, const ObFetchLogType fetch_type, const common::ObProposalID& proposal_id, const int64_t cluster_id, const common::ObReplicaType replica_type, const int64_t network_limit, const uint64_t max_confirmed_log_id) = 0; @@ -344,7 +345,8 @@ public: const share::ObCascadMemberList& candidate_list, const int32_t msg_type) = 0; virtual int replace_sick_child( const common::ObAddr& sender, const int64_t cluster_id, const common::ObAddr& sick_child) = 0; - virtual int process_reject_msg(const common::ObAddr& server, const int32_t msg_type, const int64_t timestamp) = 0; + virtual int process_reject_msg( + const common::ObAddr& server, const int64_t cluster_id, const int32_t msg_type, const int64_t timestamp) = 0; virtual int process_reregister_msg( const common::ObAddr& src_server, const share::ObCascadMember& new_leader, const int64_t send_ts) = 0; virtual int process_restore_alive_msg(const common::ObAddr& server, const uint64_t start_log_id) = 0; @@ -482,6 +484,7 @@ public: virtual int check_and_try_leader_revoke(const election::ObElection::RevokeType& revoke_type) = 0; virtual int renew_ms_log_flush_cb(const storage::ObMsInfoTask& task) = 0; virtual int try_update_leader_from_loc_cache() = 0; + virtual int process_query_restore_end_id_resp(const common::ObAddr& server, const uint64_t last_restore_log_id) = 0; }; class ObPartitionLogService : public ObIPartitionLogService { @@ -527,11 +530,8 @@ public: virtual int process_restore_takeover_msg(const int64_t send_ts) override; virtual int try_update_next_replay_log_ts_in_restore(const int64_t new_ts) override; virtual int get_role(common::ObRole& role) const override; - virtual int get_role_for_partition_table(common::ObRole& role) const override; virtual int get_role_unsafe(int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) const override; virtual int get_role_unlock(int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) const override; - virtual int 
get_role_for_partition_table_unlock( - int64_t& leader_epoch, common::ObTsWindows& changing_leader_windows) const; virtual int get_role_and_last_leader_active_time(common::ObRole& role, int64_t& timestamp) const override; virtual int get_role_and_leader_epoch(common::ObRole& role, int64_t& leader_epoch) override; virtual int get_role_and_leader_epoch(common::ObRole& role, int64_t& leader_epoch, int64_t& takeover_time) override; @@ -555,6 +555,8 @@ public: virtual int ack_renew_ms_log(const ObAddr& server, const uint64_t log_id, const int64_t submit_timestamp, const ObProposalID& ms_proposal_id) override; virtual int fake_receive_log(const ObAddr& server, const uint64_t log_id, const ObProposalID& proposal_id) override; + virtual int process_restore_check_req( + const common::ObAddr& server, const int64_t cluster_id, const ObRestoreCheckType restore_type); virtual int get_log(const common::ObAddr& server, const uint64_t log_id, const int64_t log_num, const ObFetchLogType fetch_type, const common::ObProposalID& proposal_id, const int64_t cluster_id, const common::ObReplicaType replica_type, const int64_t network_limit, @@ -746,6 +748,8 @@ public: virtual int restore_leader_try_confirm_log() override; virtual bool is_standby_restore_state() const override; virtual int check_and_try_leader_revoke(const election::ObElection::RevokeType& revoke_type) override; + virtual int process_query_restore_end_id_resp( + const common::ObAddr& server, const uint64_t last_restore_log_id) override; private: enum { DEFAULT_TIMEOUT = 10 * 1000 * 1000 }; @@ -787,7 +791,8 @@ private: int64_t get_zone_priority() const; int response_sliding_window_info_(const common::ObAddr& server, const bool is_leader); int process_replica_type_change_(); - int process_reject_msg(const common::ObAddr& server, const int32_t msg_type, const int64_t timestamp) override; + int process_reject_msg( + const common::ObAddr& server, const int64_t cluster_id, const int32_t msg_type, const int64_t timestamp); int process_reregister_msg( const common::ObAddr& src_server, const share::ObCascadMember& new_leader, const int64_t send_ts) override; int process_restore_alive_msg(const common::ObAddr& server, const uint64_t start_log_id) override; @@ -815,6 +820,7 @@ private: const int64_t switchover_epoch, const uint64_t leader_max_log_id, const int64_t leader_next_log_ts) const; int send_max_log_msg_to_mlist_( const int64_t switchover_epoch, const uint64_t leader_max_log_id, const int64_t leader_next_log_ts) const; + int process_query_restore_end_id_req_(const common::ObAddr& server, const int64_t cluster_id); bool is_paxos_offline_replica_() const; bool is_no_update_next_replay_log_id_info_too_long_() const; int check_state_(); @@ -854,6 +860,7 @@ private: bool is_primary_need_sync_to_standby_() const; bool is_tenant_out_of_memory_() const; int get_role_and_leader_epoch_unlock_(common::ObRole& role, int64_t& leader_epoch, int64_t& takeover_time); + int report_physical_restore_unexpected_error_(); private: typedef common::RWLock RWLock; @@ -863,8 +870,6 @@ private: typedef RWLock::WLockGuardWithRetry WLockGuardWithRetry; typedef common::ObSEArray ServerArray; static const int64_t REBUILD_REPLICA_INTERVAL = 2l * 60l * 1000l * 1000l; - static const int64_t STANDBY_CHECK_RESTORE_STATE_INTERVAL = 15l * 1000l * 1000l; - static const int64_t STANDBY_CHECK_MS_INTERVAL = 60l * 1000l * 1000l; static const int64_t WRLOCK_TIMEOUT_US = 100 * 1000; // 100ms bool is_inited_; @@ -892,8 +897,6 @@ private: uint64_t zone_priority_; bool is_candidate_; int64_t 
last_rebuild_time_; - int64_t last_check_restore_state_time_; - int64_t last_check_standby_ms_time_; mutable int64_t ack_log_time_; mutable int64_t recv_child_next_ilog_ts_time_; mutable int64_t submit_log_mc_time_; diff --git a/src/election/ob_election.cpp b/src/election/ob_election.cpp index e8d0d042b6..9e7b79bd94 100644 --- a/src/election/ob_election.cpp +++ b/src/election/ob_election.cpp @@ -50,7 +50,8 @@ const char* const ObElection::REVOKE_REASON_STR[REVOKE_TYPE_MAX] = {"leader leas "standby replica restore failed", "memory alloc failed when submit_log", "clog disk full", - "clog disk hang"}; + "clog disk hang", + "takeover with old epoch"}; void ObElection::reset() { @@ -1424,7 +1425,7 @@ int ObElection::change_leader_async(const ObAddr& leader, ObTsWindows& changing_ } // This function can be used to trigger self revoke->takeover, and update leader_epoch. -int ObElection::change_leader_to_self_async() +int ObElection::change_leader_to_self() { int ret = OB_SUCCESS; const ObAddr new_leader = self_; diff --git a/src/election/ob_election.h b/src/election/ob_election.h index 1536233499..77395d45bc 100644 --- a/src/election/ob_election.h +++ b/src/election/ob_election.h @@ -83,7 +83,7 @@ public: virtual int set_candidate( const int64_t replica_num, const common::ObMemberList& curr_mlist, const int64_t membership_version) = 0; virtual int change_leader_async(const common::ObAddr& leader, common::ObTsWindows& changing_leader_windows) = 0; - virtual int change_leader_to_self_async() = 0; + virtual int change_leader_to_self() = 0; virtual int force_leader_async() = 0; virtual int get_curr_candidate(common::ObMemberList& mlist) const = 0; virtual int get_valid_candidate(common::ObMemberList& mlist) const = 0; @@ -168,7 +168,7 @@ public: int set_candidate( const int64_t replica_num, const common::ObMemberList& curr_mlist, const int64_t membership_version) override; int change_leader_async(const common::ObAddr& leader, common::ObTsWindows& changing_leader_windows) override; - int change_leader_to_self_async() override; + int change_leader_to_self() override; int force_leader_async() override; int get_curr_candidate(common::ObMemberList& mlist) const override; int get_valid_candidate(common::ObMemberList& mlist) const override; @@ -240,21 +240,19 @@ public: LEASE_EXPIRED = 0, NOT_CANDIDATE, DISK_ERROR, - RECONFIRM_TIMEOUT, // CLOG reconfirm timeout - CLOG_SW_TIMEOUT, // CLOG sliding window timeout - ROLE_CHANGE_TIMEOUT, // CLOG role change timeout - REPLICA_TYPE_DISALLOW, // replica_type now allow to be leader - MEMBER_LIST_DISALLOW, // member_list not contain self - PS_LEADER_ACTIVE_FAIL, // partition_service leader_active fail - CLUSTER_ROLE_SWITCH, // all leader revoke when cluster role switching - OFS_MIGRATE_REVOKE_SRC, // src server's leader revoke in OFS mode - TRANS_CB_ERROR, // leader need revoke when CLOG call transation's callback failed - STANDBY_RESTORE_FAIL, // standby restore fail - SUBMIT_LOG_MEMORY_ALLOC_FAIL, // submit_log memory alloc fail - RESTORE_LEADER_SUBMIT_TASK_FAIL, // standby restore fail` - CLOG_DISK_FULL, // CLOG disk full - CLOG_DISK_HANG, // CLOG DISK HANG - SHARED_STORAGE_LEASE_EXPIRED, // lease expired in shared storage mode + RECONFIRM_TIMEOUT, // CLOG reconfirm timeout + CLOG_SW_TIMEOUT, // CLOG sliding window timeout + ROLE_CHANGE_TIMEOUT, // CLOG role change timeout + REPLICA_TYPE_DISALLOW, // replica_type now allow to be leader + MEMBER_LIST_DISALLOW, // member_list not contain self + PS_LEADER_ACTIVE_FAIL, // partition_service leader_active fail + 
CLUSTER_ROLE_SWITCH, // all leader revoke when cluster role switching + TRANS_CB_ERROR, // leader need revoke when CLOG call transation's callback failed + STANDBY_RESTORE_FAIL, // standby restore fail + SUBMIT_LOG_MEMORY_ALLOC_FAIL, // submit_log memory alloc fail + CLOG_DISK_FULL, // CLOG disk full + CLOG_DISK_HANG, // CLOG DISK HANG + EPOCH_NOT_CHANGE, // takeover with same epoch REVOKE_TYPE_MAX }; static const char* const REVOKE_REASON_STR[RevokeType::REVOKE_TYPE_MAX]; diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index e49c44a1a2..1dffd98302 100644 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -269,6 +269,7 @@ class ObString; ACT(BEFORE_UPDATE_RESTORE_FLAG_RESTORE_LOG, ) \ ACT(SLOW_TXN_DURING_2PC_COMMIT_PHASE_FOR_PHYSICAL_BACKUP_1055, ) \ ACT(BEFORE_FORCE_DROP_SCHEMA, ) \ + ACT(BLOCK_CLOG_PRIMARY_RECONFIRM,) \ ACT(MAX_DEBUG_SYNC_POINT, ) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/ob_server_locality_cache.cpp b/src/share/ob_server_locality_cache.cpp index ae9145d224..250bc7d96d 100644 --- a/src/share/ob_server_locality_cache.cpp +++ b/src/share/ob_server_locality_cache.cpp @@ -242,11 +242,16 @@ int ObServerLocalityCache::get_server_region(const common::ObAddr& server, commo for (int64_t i = 0; !is_found && i < server_locality_array_.count(); ++i) { const share::ObServerLocality& server_locality = server_locality_array_.at(i); if (server_locality.get_addr() == server) { - region = server_locality.get_region(); - is_found = true; + if (!server_locality.get_region().is_empty()) { + // assign region only when it is not empty + region = server_locality.get_region(); + is_found = true; + } } } if (!is_found) { + // if its locality is not found or its region is empty, we will try + // get region from region_map. 
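The lookup above now treats an empty region in the locality array as unknown and falls back to a secondary map. A minimal sketch of that two-level lookup, with std::string and std::unordered_map standing in for ObAddr, ObRegion and the internal containers:

#include <string>
#include <unordered_map>
#include <vector>

// Stand-ins for the ObServerLocality entries and the region_map_ fallback.
struct ServerLocalitySketch {
  std::string addr;
  std::string region;  // may be empty when the locality row is incomplete
};

// Prefers the locality array, but only when the recorded region is non-empty;
// otherwise falls back to the secondary region map. Returns true on success.
bool get_server_region_sketch(const std::vector<ServerLocalitySketch>& locality_array,
                              const std::unordered_map<std::string, std::string>& region_map,
                              const std::string& server, std::string& region)
{
  for (const auto& locality : locality_array) {
    if (locality.addr == server && !locality.region.empty()) {
      region = locality.region;
      return true;
    }
  }
  const auto it = region_map.find(server);
  if (it != region_map.end()) {
    region = it->second;
    return true;
  }
  return false;
}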
ret = get_server_region_from_map_(server, region); LOG_TRACE("not found server in server_locality_array_", K(ret), K(server), K(region)); } diff --git a/src/share/partition_table/ob_persistent_partition_table.cpp b/src/share/partition_table/ob_persistent_partition_table.cpp index ba08278a35..062dca63f8 100644 --- a/src/share/partition_table/ob_persistent_partition_table.cpp +++ b/src/share/partition_table/ob_persistent_partition_table.cpp @@ -483,8 +483,9 @@ int ObPersistentPartitionTable::execute(ObPartitionTableProxy* proxy, const ObPa LOG_WARN("data_version must > 0 for leader replica", K(ret), K(replica)); } else if (replica.to_leader_time_ <= 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("change to leader time must > 0 for leader replica", K(ret), K(replica)); - } else if (OB_FAIL(GCTX.par_ser_->get_role_for_partition_table(replica.partition_key(), new_role))) { + LOG_WARN("change to leader time must > 0 for leader replica", + K(ret), K(replica)); + } else if (OB_FAIL(GCTX.par_ser_->get_role(replica.partition_key(), new_role))) { ret = OB_SUCCESS; new_replica.role_ = FOLLOWER; } else if (is_follower(new_role)) { diff --git a/src/storage/ob_i_partition_group.h b/src/storage/ob_i_partition_group.h index 7da4d52e79..30520dd77c 100644 --- a/src/storage/ob_i_partition_group.h +++ b/src/storage/ob_i_partition_group.h @@ -256,7 +256,6 @@ public: // leader or follower virtual int get_role(common::ObRole& role) const = 0; - virtual int get_role_for_partition_table(common::ObRole& role) const = 0; virtual int get_role_unsafe(common::ObRole& role) const = 0; virtual int get_leader_curr_member_list(common::ObMemberList& member_list) const = 0; virtual int get_leader(common::ObAddr& leader) const = 0; diff --git a/src/storage/ob_partition_group.cpp b/src/storage/ob_partition_group.cpp index d75b291198..a530c0a08a 100644 --- a/src/storage/ob_partition_group.cpp +++ b/src/storage/ob_partition_group.cpp @@ -435,16 +435,7 @@ int ObPartitionGroup::get_role(common::ObRole& role) const return ret; } -int ObPartitionGroup::get_role_for_partition_table(common::ObRole& role) const -{ - int ret = OB_SUCCESS; - if (OB_SUCCESS == (ret = check_init_(pls_, "partition log service"))) { - ret = pls_->get_role_for_partition_table(role); - } - return ret; -} - -int ObPartitionGroup::get_role_unsafe(common::ObRole& role) const +int ObPartitionGroup::get_role_unsafe(common::ObRole &role) const { int ret = OB_SUCCESS; if (OB_SUCCESS == (ret = check_init_(pls_, "partition log service"))) { diff --git a/src/storage/ob_partition_group.h b/src/storage/ob_partition_group.h index e243819e1d..1db82057c8 100644 --- a/src/storage/ob_partition_group.h +++ b/src/storage/ob_partition_group.h @@ -140,7 +140,6 @@ public: const common::ObNewRow& row, ObLockFlag lock_flag) override; virtual int get_role(common::ObRole& role) const; - virtual int get_role_for_partition_table(common::ObRole& role) const; virtual int get_role_unsafe(common::ObRole& role) const; virtual int get_leader_curr_member_list(common::ObMemberList& member_list) const; virtual int get_leader(common::ObAddr& leader) const; diff --git a/src/storage/ob_partition_service.cpp b/src/storage/ob_partition_service.cpp index e9b13f3918..f500dcf9a1 100644 --- a/src/storage/ob_partition_service.cpp +++ b/src/storage/ob_partition_service.cpp @@ -2274,6 +2274,37 @@ void ObPartitionService::submit_pt_update_task_(const ObPartitionKey& pkey, cons UNUSED(ret); } +int ObPartitionService::submit_pg_pt_update_task_(const ObPartitionKey& pkey) +{ + int ret = OB_SUCCESS; + ObPartitionArray 
pkeys; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + } else if (!pkey.is_valid()) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(pkey)); + } else if (pkey.is_pg()) { + ObIPartitionGroupGuard guard; + ObIPartitionGroup* partition = NULL; + if (OB_FAIL(get_partition(pkey, guard))) { + STORAGE_LOG(WARN, "get partition failed, ", K(ret), K(pkey)); + } else if (OB_UNLIKELY(NULL == (partition = guard.get_partition_group()))) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "Unexpected error, the partition is NULL, ", K(pkey)); + } else if (OB_FAIL(partition->get_all_pg_partition_keys(pkeys))) { + STORAGE_LOG(WARN, "get all pg partition keys error", K(ret), K(pkey)); + } + rs_cb_->submit_pg_pt_update_task(pkeys); + } else { + if (OB_FAIL(pkeys.push_back(pkey))) { + STORAGE_LOG(WARN, "pkeys push back error", K(ret), K(pkey)); + } + rs_cb_->submit_pg_pt_update_task(pkeys); + } + return ret; +} + /** create partition group and create partitions (1) create pg @@ -4462,29 +4493,6 @@ int ObPartitionService::get_role(const common::ObPartitionKey& pkey, common::ObR return ret; } -int ObPartitionService::get_role_for_partition_table(const common::ObPartitionKey& pkey, common::ObRole& role) const -{ - int ret = OB_SUCCESS; - ObIPartitionGroupGuard guard; - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - STORAGE_LOG(WARN, "partition service is not initiated", K(ret)); - } else if (OB_UNLIKELY(!pkey.is_valid())) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(DEBUG, "invalid argument", K(ret)); - } else if (OB_FAIL(get_partition(pkey, guard))) { - STORAGE_LOG(DEBUG, "get partition failed", K(pkey), K(ret)); - } else if (OB_ISNULL(guard.get_partition_group())) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret)); - } else if (OB_FAIL(guard.get_partition_group()->get_role_for_partition_table(role))) { - STORAGE_LOG(WARN, "get_role_for_partition_table failed", K(pkey), K(ret)); - } else { - // do nothing - } - return ret; -} - int ObPartitionService::get_role_unsafe(const common::ObPartitionKey& pkey, common::ObRole& role) const { class GetRoleFunctor { @@ -7908,6 +7916,7 @@ int ObPartitionService::process_ms_info_task(ObMsInfoTask& task) { int ret = OB_SUCCESS; ObTimeGuard timeguard("process_ms_info_task", 50L * 1000L); + const int64_t now = ObTimeUtility::current_time(); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; @@ -7915,6 +7924,10 @@ int ObPartitionService::process_ms_info_task(ObMsInfoTask& task) } else if (!task.is_valid()) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), K(task)); + } else if (now - task.get_gen_ts() >= ObSlogWriterQueueThread::SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD) { + if (REACH_TIME_INTERVAL(100 * 1000)) { + STORAGE_LOG(WARN, "task has been timeout, drop it", K(ret), K(task)); + } } else if (OB_FAIL(on_member_change_success(task.get_pkey(), task.get_log_type(), task.get_ms_log_id(), @@ -9942,7 +9955,8 @@ int ObPartitionService::submit_ms_info_task(const common::ObPartitionKey& pkey, K(prev_member_list), K(curr_member_list)); } else { - ObMsInfoTask task(pkey, server, cluster_id, log_type, ms_log_id, mc_timestamp, replica_num, ms_proposal_id); + const int64_t now = ObTimeUtility::current_time(); + ObMsInfoTask task(pkey, server, cluster_id, log_type, ms_log_id, mc_timestamp, replica_num, ms_proposal_id, now); if (OB_FAIL(task.update_prev_member_list(prev_member_list))) { STORAGE_LOG(WARN, "update_prev_member_list failed", K(task), K(ret)); } else if 
(OB_FAIL(task.update_curr_member_list(curr_member_list))) { @@ -10248,6 +10262,8 @@ int ObPartitionService::internal_leader_active(const ObCbTask& active_task) STORAGE_LOG(WARN, "get partition failed", K(pkey), K(ret)); } else if (OB_FAIL(rs_cb_->submit_pt_update_role_task(pkey))) { STORAGE_LOG(WARN, "internal_leader_active callback failed", K(pkey), K(ret)); + } else if (OB_FAIL(submit_pg_pt_update_task_(pkey))) { + STORAGE_LOG(WARN, "submit_pg_pt_update_task_ failed", K(pkey), K(ret)); } else if (LEADER != active_task.role_) { // only LEADER need exec ret = OB_ERR_UNEXPECTED; @@ -11941,9 +11957,8 @@ bool ObReplicaOpArg::is_physical_restore_follower() const bool ObReplicaOpArg::is_FtoL() const { - return CHANGE_REPLICA_OP == type_ - && ObReplicaType::REPLICA_TYPE_FULL == src_.get_replica_type() - && ObReplicaType::REPLICA_TYPE_LOGONLY == dst_.get_replica_type(); + return CHANGE_REPLICA_OP == type_ && ObReplicaType::REPLICA_TYPE_FULL == src_.get_replica_type() && + ObReplicaType::REPLICA_TYPE_LOGONLY == dst_.get_replica_type(); } bool ObReplicaOpArg::is_standby_restore() const diff --git a/src/storage/ob_partition_service.h b/src/storage/ob_partition_service.h index c1b7a20cae..e4744e22c4 100644 --- a/src/storage/ob_partition_service.h +++ b/src/storage/ob_partition_service.h @@ -554,7 +554,6 @@ public: const uint64_t table_id, const common::ObAddr& server, DupReplicaType& dup_replica_type); VIRTUAL_FOR_UNITTEST int get_replica_status(const common::ObPartitionKey& pkey, share::ObReplicaStatus& status) const; VIRTUAL_FOR_UNITTEST int get_role(const common::ObPartitionKey& pkey, common::ObRole& role) const; - VIRTUAL_FOR_UNITTEST int get_role_for_partition_table(const common::ObPartitionKey& pkey, common::ObRole& role) const; VIRTUAL_FOR_UNITTEST int get_role_unsafe(const common::ObPartitionKey& pkey, common::ObRole& role) const; VIRTUAL_FOR_UNITTEST int get_leader_curr_member_list( const common::ObPartitionKey& pkey, common::ObMemberList& member_list) const; @@ -997,6 +996,7 @@ private: common::ObIArray& target_batch_arg, common::ObIArray& batch_res); void free_partition_list(ObArray& partition_list); void submit_pt_update_task_(const ObPartitionKey& pkey, const bool need_report_checksum = true); + int submit_pg_pt_update_task_(const ObPartitionKey &pkey); int try_inc_total_partition_cnt(const int64_t new_partition_cnt, const bool need_check); int physical_flashback(); int clean_all_clog_files_(); diff --git a/src/storage/ob_slog_writer_queue_thread.cpp b/src/storage/ob_slog_writer_queue_thread.cpp index ab340099a6..598ed4636c 100644 --- a/src/storage/ob_slog_writer_queue_thread.cpp +++ b/src/storage/ob_slog_writer_queue_thread.cpp @@ -27,6 +27,7 @@ ObMsInfoTask& ObMsInfoTask::operator=(const ObMsInfoTask& rv) this->set_cluster_id(rv.get_cluster_id()); this->set_ms_log_id(rv.get_ms_log_id()); this->set_mc_timestamp(rv.get_mc_timestamp()); + this->set_gen_ts(rv.get_gen_ts()); this->set_replica_num(rv.get_replica_num()); this->set_ms_proposal_id(rv.get_ms_proposal_id()); (void)this->update_prev_member_list(rv.get_prev_member_list()); @@ -66,6 +67,11 @@ void ObMsInfoTask::set_mc_timestamp(const int64_t mc_timestamp) mc_timestamp_ = mc_timestamp; } +void ObMsInfoTask::set_gen_ts(const int64_t gen_ts) +{ + gen_ts_ = gen_ts; +} + void ObMsInfoTask::set_replica_num(const int64_t replica_num) { replica_num_ = replica_num; diff --git a/src/storage/ob_slog_writer_queue_thread.h b/src/storage/ob_slog_writer_queue_thread.h index 0981881961..982f999d45 100644 --- 
a/src/storage/ob_slog_writer_queue_thread.h
+++ b/src/storage/ob_slog_writer_queue_thread.h
@@ -36,11 +36,12 @@ public:
         replica_num_(0),
         prev_member_list_(),
         curr_member_list_(),
-        ms_proposal_id_()
+        ms_proposal_id_(),
+        gen_ts_(OB_INVALID_TIMESTAMP)
   {}
   ObMsInfoTask(const common::ObPartitionKey& pkey, const common::ObAddr& server, const int64_t cluster_id,
       const clog::ObLogType log_type, const uint64_t ms_log_id, const int64_t mc_timestamp, const int64_t replica_num,
-      const common::ObProposalID& ms_proposal_id)
+      const common::ObProposalID& ms_proposal_id, const int64_t gen_ts)
       : pkey_(pkey),
         server_(server),
         cluster_id_(cluster_id),
@@ -48,13 +49,14 @@ public:
         ms_log_id_(ms_log_id),
         mc_timestamp_(mc_timestamp),
         replica_num_(replica_num),
-        ms_proposal_id_(ms_proposal_id)
+        ms_proposal_id_(ms_proposal_id),
+        gen_ts_(gen_ts)
   {}
   ~ObMsInfoTask()
   {}
   bool is_valid() const
   {
-    return pkey_.is_valid();
+    return (pkey_.is_valid() && OB_INVALID_TIMESTAMP != gen_ts_);
   }
   common::ObPartitionKey get_pkey() const
   {
@@ -103,15 +105,24 @@ public:
   void set_ms_log_id(const uint64_t ms_log_id);
   void set_mc_timestamp(const int64_t mc_timestamp);
   void set_replica_num(const int64_t replica_num);
-  void set_ms_proposal_id(const common::ObProposalID& ms_proposal_id);
-  int update_prev_member_list(const common::ObMemberList& prev_member_list);
-  int update_curr_member_list(const common::ObMemberList& curr_member_list);
-  ObMsInfoTask& operator=(const ObMsInfoTask& rv);
-
-  TO_STRING_KV(N_KEY, pkey_, "server", server_, "cluster_id", cluster_id_, "log_type", log_type_, "ms_log_id",
-      ms_log_id_, "mc_timestamp", mc_timestamp_, "replica_num", replica_num_, "prev_member_list", prev_member_list_,
-      "curr_member_list", curr_member_list_, "ms_proposal_id", ms_proposal_id_);
+  void set_ms_proposal_id(const common::ObProposalID &ms_proposal_id);
+  int update_prev_member_list(const common::ObMemberList &prev_member_list);
+  int update_curr_member_list(const common::ObMemberList &curr_member_list);
+  void set_gen_ts(const int64_t gen_ts);
+  int64_t get_gen_ts() const { return gen_ts_; }
+  ObMsInfoTask &operator=(const ObMsInfoTask &rv);
+  TO_STRING_KV(N_KEY, pkey_,
+      "server", server_,
+      "cluster_id", cluster_id_,
+      "log_type", log_type_,
+      "ms_log_id", ms_log_id_,
+      "mc_timestamp", mc_timestamp_,
+      "replica_num", replica_num_,
+      "prev_member_list", prev_member_list_,
+      "curr_member_list", curr_member_list_,
+      "ms_proposal_id", ms_proposal_id_,
+      "gen_ts", gen_ts_);

 private:
   common::ObPartitionKey pkey_;
   common::ObAddr server_;
@@ -123,12 +134,14 @@ private:
   common::ObMemberList prev_member_list_;
   common::ObMemberList curr_member_list_;
   common::ObProposalID ms_proposal_id_;
+  int64_t gen_ts_;
 };

 class ObSlogWriterQueueThread : public lib::TGTaskHandler {
 public:
   static const int64_t QUEUE_THREAD_NUM = 4;
   static const int64_t MINI_MODE_QUEUE_THREAD_NUM = 2;
+  static const int64_t SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD = clog::CLOG_LEADER_RECONFIRM_SYNC_TIMEOUT;

   ObSlogWriterQueueThread();
   virtual ~ObSlogWriterQueueThread();
diff --git a/src/storage/transaction/ob_time_wheel.h b/src/storage/transaction/ob_time_wheel.h
index 82a418107b..e14cf8c35b 100644
--- a/src/storage/transaction/ob_time_wheel.h
+++ b/src/storage/transaction/ob_time_wheel.h
@@ -181,11 +181,11 @@ public:
   }

 public:
-  int schedule(ObTimeWheelTask* task, const int64_t delay);
-  int cancel(ObTimeWheelTask* task);
-
-private:
+  int schedule(ObTimeWheelTask *task, const int64_t delay);
+  int cancel(ObTimeWheelTask *task);
+public:
   static const int64_t MAX_THREAD_NUM = 64;
+private:
   static const int64_t MAX_TIMER_NAME_LEN = 16;

 private:
diff --git a/unittest/clog/clog_mock_container/mock_log_membership_mgr.h b/unittest/clog/clog_mock_container/mock_log_membership_mgr.h
index 2fef5e2d29..dc666c0bf9 100644
--- a/unittest/clog/clog_mock_container/mock_log_membership_mgr.h
+++ b/unittest/clog/clog_mock_container/mock_log_membership_mgr.h
@@ -91,6 +91,10 @@ public:
   {
     return 3;
   }
+  bool is_single_member_mode() const
+  {
+    return false;
+  }
   common::ObReplicaType get_replica_type() const
   {
     return REPLICA_TYPE_MAX;
diff --git a/unittest/clog/mock_ob_partition_log_service.h b/unittest/clog/mock_ob_partition_log_service.h
index b0ecb7fbce..f575a3da6a 100644
--- a/unittest/clog/mock_ob_partition_log_service.h
+++ b/unittest/clog/mock_ob_partition_log_service.h
@@ -203,9 +203,11 @@ public:
     UNUSED(sick_child);
     return OB_SUCCESS;
   }
-  int process_reject_msg(const common::ObAddr& server, const int32_t msg_type, const int64_t timestamp)
+  int process_reject_msg(const common::ObAddr &server, const int64_t cluster_id,
+      const int32_t msg_type, const int64_t timestamp)
   {
     UNUSED(server);
+    UNUSED(cluster_id);
     UNUSED(msg_type);
     UNUSED(timestamp);
     return OB_SUCCESS;
@@ -477,11 +479,6 @@ public:
     UNUSED(role);
     return common::OB_SUCCESS;
   }
-  virtual int get_role_for_partition_table(common::ObRole& role) const
-  {
-    UNUSED(role);
-    return common::OB_SUCCESS;
-  }
   virtual int set_archive_restore_state(const int16_t archive_restore_state)
   {
     UNUSED(archive_restore_state);
@@ -512,13 +509,7 @@ public:
     UNUSED(leader_epoch);
     return common::OB_SUCCESS;
   }
-  virtual int get_role_for_partition_table_unlock(bool& in_changing_leader_windows, int64_t& leader_epoch) const
-  {
-    UNUSED(in_changing_leader_windows);
-    UNUSED(leader_epoch);
-    return common::OB_SUCCESS;
-  }
-  virtual int get_leader_curr_member_list(common::ObMemberList& member_list) const
+  virtual int get_leader_curr_member_list(common::ObMemberList &member_list) const
   {
     UNUSED(member_list);
     return common::OB_SUCCESS;
   }
@@ -1492,6 +1483,15 @@
   {
     return true;
   }
+  virtual int process_restore_check_req(const common::ObAddr &server,
+      const int64_t cluster_id,
+      const ObRestoreCheckType restore_type)
+  {
+    UNUSED(server);
+    UNUSED(cluster_id);
+    UNUSED(restore_type);
+    return OB_SUCCESS;
+  }
   virtual int check_and_try_leader_revoke(const election::ObElection::RevokeType& revoke_type)
   {
     UNUSED(revoke_type);
@@ -1501,7 +1501,13 @@
   {
     return OB_SUCCESS;
   }
-
+  virtual int process_query_restore_end_id_resp(const common::ObAddr &server,
+      const uint64_t last_restore_log_id) override
+  {
+    UNUSED(server);
+    UNUSED(last_restore_log_id);
+    return OB_SUCCESS;
+  }
 private:
   common::ObPartitionKey p_k_;
 };
diff --git a/unittest/election/mock_election.h b/unittest/election/mock_election.h
index 3aa03e257d..78f4d9d5c1 100644
--- a/unittest/election/mock_election.h
+++ b/unittest/election/mock_election.h
@@ -79,7 +79,7 @@ public:
   {
     return 0;
   }
-  virtual int change_leader_to_self_async() override
+  virtual int change_leader_to_self() override
   {
     return 0;
   }
diff --git a/unittest/storage/mock_ob_partition.h b/unittest/storage/mock_ob_partition.h
index ffee155c9b..bc8e4cabba 100644
--- a/unittest/storage/mock_ob_partition.h
+++ b/unittest/storage/mock_ob_partition.h
@@ -236,12 +236,7 @@ public:
     UNUSED(role);
     return OB_NOT_SUPPORTED;
   }
-  int get_role_for_partition_table(common::ObRole& role) const
-  {
-    UNUSED(role);
-    return OB_NOT_SUPPORTED;
-  }
-  int get_leader_curr_member_list(common::ObMemberList& member_list) const
+  int get_leader_curr_member_list(common::ObMemberList &member_list) const
   {
     UNUSED(member_list);
     return OB_NOT_SUPPORTED;
   }
diff --git a/unittest/storage/mock_ob_partition_service.h b/unittest/storage/mock_ob_partition_service.h
index edde4984a7..00d8e87cdd 100644
--- a/unittest/storage/mock_ob_partition_service.h
+++ b/unittest/storage/mock_ob_partition_service.h
@@ -177,8 +177,6 @@ public:
       int(const int64_t cmd_type, const ObPartitionKey &pkey));
   MOCK_CONST_METHOD2(get_role,
       int(const common::ObPartitionKey &pkey, common::ObRole &role));
-  MOCK_CONST_METHOD2(get_role_for_partition_table,
-      int(const common::ObPartitionKey &pkey, common::ObRole &role));
   MOCK_CONST_METHOD2(get_leader_curr_member_list,
       int(const common::ObPartitionKey &pkey, common::ObMemberList &member_list));
   MOCK_METHOD2(change_leader,
diff --git a/unittest/storage/mockcontainer/mock_ob_partition.h b/unittest/storage/mockcontainer/mock_ob_partition.h
index 8c9d40173b..eb4152c081 100644
--- a/unittest/storage/mockcontainer/mock_ob_partition.h
+++ b/unittest/storage/mockcontainer/mock_ob_partition.h
@@ -92,7 +92,6 @@ public:

   // leader or follower
   MOCK_CONST_METHOD1(get_role, int(common::ObRole& role));
-  MOCK_CONST_METHOD1(get_role_for_partition_table, int(common::ObRole& role));
   MOCK_CONST_METHOD1(get_role_unsafe, int(common::ObRole& role));
   MOCK_CONST_METHOD1(get_leader_curr_member_list, int(common::ObMemberList& member_list));
   MOCK_CONST_METHOD1(get_leader, int(common::ObAddr& addr));
diff --git a/unittest/storage/mockcontainer/mock_ob_partition_service.h b/unittest/storage/mockcontainer/mock_ob_partition_service.h
index a331819c63..2e259f493d 100644
--- a/unittest/storage/mockcontainer/mock_ob_partition_service.h
+++ b/unittest/storage/mockcontainer/mock_ob_partition_service.h
@@ -256,7 +256,6 @@ public:
   MOCK_METHOD1(submit_freeze_log_finished, int(const ObPartitionKey& pkey));
   MOCK_METHOD1(is_freeze_replay_finished, bool(const ObPartitionKey& pkey));
   MOCK_CONST_METHOD2(get_role, int(const common::ObPartitionKey& pkey, common::ObRole& role));
-  MOCK_CONST_METHOD2(get_role_for_partition_table, int(const common::ObPartitionKey& pkey, common::ObRole& role));
   MOCK_CONST_METHOD2(
       get_leader_curr_member_list, int(const common::ObPartitionKey& pkey, common::ObMemberList& member_list));
   MOCK_CONST_METHOD7(get_curr_leader_and_memberlist,
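Note on the ObMsInfoTask changes above: the new gen_ts_ field, the stricter is_valid() check, and the SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD constant suggest that member-ship info flush tasks are now stamped at creation time so that entries which sat too long in the slog writer queue can be discarded rather than applied stale. A minimal sketch of how a consumer might use the threshold follows; the handler name handle_ms_info_task and the choice of OB_TIMEOUT as the error code are illustrative assumptions, not part of this patch.

// Sketch only (assumed handler): drop ObMsInfoTask entries that have waited in the
// slog flush queue longer than SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD.
int handle_ms_info_task(const ObMsInfoTask &task)
{
  int ret = OB_SUCCESS;
  const int64_t now = common::ObTimeUtility::current_time();
  if (!task.is_valid()) {
    // is_valid() now also requires a valid gen_ts_
    ret = OB_INVALID_ARGUMENT;
  } else if (now - task.get_gen_ts() > ObSlogWriterQueueThread::SLOG_FLUSH_TASK_TIMEOUT_THRESHOLD) {
    // the task has outlived the reconfirm timeout; skip the flush instead of applying stale info
    ret = OB_TIMEOUT;
  } else {
    // ... flush the member-list change as before ...
  }
  return ret;
}

Under this reading, producers would stamp each task when it is generated, e.g. task.set_gen_ts(common::ObTimeUtility::current_time()), so the age check has a reference point.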