diff --git a/mittest/logservice/test_ob_simple_log_arb.cpp b/mittest/logservice/test_ob_simple_log_arb.cpp
index dbe1f11536..2a382eee31 100644
--- a/mittest/logservice/test_ob_simple_log_arb.cpp
+++ b/mittest/logservice/test_ob_simple_log_arb.cpp
@@ -450,8 +450,11 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_defensive)
   palf_list[another_f_idx]->get_palf_handle_impl()->set_location_cache_cb(&loc_cb);
   EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
   sleep(2);
+  const int64_t added_member_idx = 3;
+  const common::ObMember added_member = ObMember(palf_list[added_member_idx]->palf_handle_impl_->self_, 1);
 
-  LogConfigChangeArgs args(ObMember(palf_list[3]->palf_handle_impl_->self_, 1), 3, ADD_MEMBER);
+  // add a member, do not allow to append logs until config log reaches majority
+  LogConfigChangeArgs args(added_member, 3, ADD_MEMBER);
   const int64_t proposal_id = leader.palf_handle_impl_->state_mgr_.get_proposal_id();
   const int64_t election_epoch = leader.palf_handle_impl_->state_mgr_.get_leader_epoch();
   LogConfigVersion config_version;
@@ -466,6 +469,21 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_defensive)
       ::ob_usleep(10 * 1000);
     }
   }
+
+  // flashback one follower
+  LogEntryHeader header_origin;
+  SCN base_scn;
+  base_scn.set_base();
+  SCN flashback_scn;
+  palf::AccessMode unused_access_mode;
+  int64_t mode_version;
+  EXPECT_EQ(OB_SUCCESS, get_middle_scn(50, leader, flashback_scn, header_origin));
+  switch_append_to_flashback(leader, mode_version);
+  EXPECT_EQ(OB_SUCCESS, palf_list[another_f_idx]->palf_handle_impl_->flashback(mode_version, flashback_scn, CONFIG_CHANGE_TIMEOUT));
+
+  // remove another follower
+  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(added_member, 2, CONFIG_CHANGE_TIMEOUT));
+
   revert_cluster_palf_handle_guard(palf_list);
   leader.reset();
   delete_paxos_group(id);
diff --git a/src/logservice/ob_log_service.cpp b/src/logservice/ob_log_service.cpp
index af27ff8a52..ede689b4ce 100644
--- a/src/logservice/ob_log_service.cpp
+++ b/src/logservice/ob_log_service.cpp
@@ -443,6 +443,9 @@ int ObLogService::update_replayable_point(const SCN &replayable_point)
     CLOG_LOG(WARN, "log_service is not inited", K(ret));
   } else if (OB_FAIL(replay_service_.update_replayable_point(replayable_point))) {
     CLOG_LOG(WARN, "update_replayable_point failed", K(ret), K(replayable_point));
+  // should be removed in version 4.2.0.0
+  } else if (OB_FAIL(palf_env_->update_replayable_point(replayable_point))) {
+    CLOG_LOG(WARN, "update_replayable_point failed", K(ret), K(replayable_point));
   }
   return ret;
 }
diff --git a/src/logservice/palf/fetch_log_engine.cpp b/src/logservice/palf/fetch_log_engine.cpp
index f76fb4fc5b..281f29a7a4 100644
--- a/src/logservice/palf/fetch_log_engine.cpp
+++ b/src/logservice/palf/fetch_log_engine.cpp
@@ -78,7 +78,8 @@ FetchLogEngine::FetchLogEngine()
   : tg_id_(-1),
     is_inited_(false),
     palf_env_impl_(NULL),
-    allocator_(NULL)
+    allocator_(NULL),
+    replayable_point_()
 {}
 
 
@@ -221,7 +222,8 @@ void FetchLogEngine::handle(void *task)
                                                                   fetch_log_task->get_start_lsn(),
                                                                   fetch_log_task->get_log_size(),
                                                                   fetch_log_task->get_log_count(),
-                                                                  fetch_log_task->get_accepted_mode_meta()))) {
+                                                                  fetch_log_task->get_accepted_mode_meta(),
+                                                                  replayable_point_.atomic_load()))) {
         PALF_LOG(WARN, "fetch_log_from_storage failed", K(ret), K(palf_id), KPC(fetch_log_task));
       } else {
         // do nothing
@@ -279,5 +281,16 @@ bool FetchLogEngine::is_task_queue_timeout_(FetchLogTask *task) const
   return bool_ret;
 }
 
+int FetchLogEngine::update_replayable_point(const share::SCN &replayable_scn)
+{
+  int ret = OB_SUCCESS;
+  if (IS_NOT_INIT) {
+    ret = OB_NOT_INIT;
+  } else {
+    replayable_point_.atomic_store(replayable_scn);
+  }
+  return ret;
+}
+
 } // namespace palf
 } // namespace oceanbase
diff --git a/src/logservice/palf/fetch_log_engine.h b/src/logservice/palf/fetch_log_engine.h
index 6845cd9647..f9286f653b 100644
--- a/src/logservice/palf/fetch_log_engine.h
+++ b/src/logservice/palf/fetch_log_engine.h
@@ -95,6 +95,7 @@ public:
   void handle_drop(void *task);
   FetchLogTask *alloc_fetch_log_task();
   void free_fetch_log_task(FetchLogTask *task);
+  int update_replayable_point(const share::SCN &replayable_scn);
 private:
   bool is_task_queue_timeout_(FetchLogTask *fetch_log_task) const;
 private:
@@ -102,6 +103,7 @@ private:
   bool is_inited_;
   IPalfEnvImpl *palf_env_impl_;
   common::ObILogAllocator *allocator_;
+  share::SCN replayable_point_;
   DISALLOW_COPY_AND_ASSIGN(FetchLogEngine);
 };
 } // namespace logservice
diff --git a/src/logservice/palf/log_config_mgr.cpp b/src/logservice/palf/log_config_mgr.cpp
index a95f3e32be..77c68c2984 100644
--- a/src/logservice/palf/log_config_mgr.cpp
+++ b/src/logservice/palf/log_config_mgr.cpp
@@ -1883,6 +1883,7 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
   added_member_has_new_version = is_add_member_list(args.type_)? false: true;
 
   (void) sw_->get_committed_end_lsn(first_leader_committed_end_lsn);
+  const bool need_skip_log_barrier = mode_mgr_->need_skip_log_barrier();
   if (new_member_list.get_member_number() == 0) {
     ret = OB_INVALID_ARGUMENT;
   } else if (new_member_list.get_member_number() == 1) {
@@ -1891,6 +1892,10 @@ int LogConfigMgr::check_follower_sync_status_(const LogConfigChangeArgs &args,
       conn_timeout_us, first_committed_end_lsn, added_member_has_new_version))) {
     PALF_LOG(WARN, "sync_get_committed_end_lsn failed", K(ret), K_(palf_id), K_(self),
         K(new_member_list), K(new_replica_num), K(added_member_has_new_version));
+  } else if (need_skip_log_barrier) {
+    ret = OB_SUCCESS;
+    PALF_LOG(INFO, "PALF is in FLASHBACK mode, skip log barrier", K(ret), K_(palf_id), K_(self), \
+        "accepted_mode_meta", mode_mgr_->get_accepted_mode_meta());
   } else if (is_check_log_barrier) {
     // check log barrier during log appending are stopped
     LSN prev_log_lsn;
diff --git a/src/logservice/palf/log_req.cpp b/src/logservice/palf/log_req.cpp
index f755b69be0..610ececba4 100644
--- a/src/logservice/palf/log_req.cpp
+++ b/src/logservice/palf/log_req.cpp
@@ -738,12 +738,12 @@ OB_SERIALIZE_MEMBER(LogGetStatReq, get_type_);
 
 // ================= LogGetStatResp start ================
 LogGetStatResp::LogGetStatResp()
-  : max_scn_()
+  : max_scn_(), end_lsn_()
 {
 }
 
-LogGetStatResp::LogGetStatResp(const share::SCN &max_scn)
-  : max_scn_(max_scn)
+LogGetStatResp::LogGetStatResp(const share::SCN &max_scn, const LSN &end_lsn)
+  : max_scn_(max_scn), end_lsn_(end_lsn)
 {
 }
 
@@ -760,9 +760,10 @@ bool LogGetStatResp::is_valid() const
 void LogGetStatResp::reset()
 {
   max_scn_.reset();
+  end_lsn_.reset();
 }
 
-OB_SERIALIZE_MEMBER(LogGetStatResp, max_scn_);
+OB_SERIALIZE_MEMBER(LogGetStatResp, max_scn_, end_lsn_);
 // ================= LogGetStatResp end ================
 } // end namespace palf
 } // end namespace oceanbase
diff --git a/src/logservice/palf/log_req.h b/src/logservice/palf/log_req.h
index 18c2682918..9f3f699e57 100644
--- a/src/logservice/palf/log_req.h
+++ b/src/logservice/palf/log_req.h
@@ -383,12 +383,13 @@ struct LogGetStatResp {
   OB_UNIS_VERSION(1);
 public:
   LogGetStatResp();
-  LogGetStatResp(const share::SCN &max_scn);
+  LogGetStatResp(const share::SCN &max_scn, const LSN &end_lsn);
   ~LogGetStatResp();
   bool is_valid() const;
   void reset();
-  TO_STRING_KV(K_(max_scn));
+  TO_STRING_KV(K_(max_scn), K_(end_lsn));
   share::SCN max_scn_;
+  LSN end_lsn_;
 };
 
 } // end namespace palf
diff --git a/src/logservice/palf/log_request_handler.cpp b/src/logservice/palf/log_request_handler.cpp
index 6b40ca4d7f..401892af3d 100644
--- a/src/logservice/palf/log_request_handler.cpp
+++ b/src/logservice/palf/log_request_handler.cpp
@@ -486,6 +486,7 @@ int LogRequestHandler::handle_sync_request(
         CLOG_LOG(INFO, "i am not leader", K(ret), K(palf_id), K(req), K(role), K(is_pending_state));
       } else {
         resp.max_scn_ = guard.get_palf_handle_impl()->get_max_scn();
+        resp.end_lsn_ = guard.get_palf_handle_impl()->get_end_lsn();
         CLOG_LOG(TRACE, "get_leader_max_scn success", K(ret), K(palf_id), K(server), K(req), K(resp));
       }
     }
diff --git a/src/logservice/palf/palf_env.cpp b/src/logservice/palf/palf_env.cpp
index d65ea47241..4099ebb21a 100644
--- a/src/logservice/palf/palf_env.cpp
+++ b/src/logservice/palf/palf_env.cpp
@@ -180,5 +180,11 @@ int PalfEnv::get_io_start_time(int64_t &last_working_time)
   return palf_env_impl_.get_io_start_time(last_working_time);
 }
 
+// should be removed in version 4.2.0.0
+int PalfEnv::update_replayable_point(const SCN &replayable_scn)
+{
+  return palf_env_impl_.update_replayable_point(replayable_scn);
+}
+
 } // end namespace palf
 } // end namespace oceanbase
diff --git a/src/logservice/palf/palf_env.h b/src/logservice/palf/palf_env.h
index 39b66a40bf..4c3d8cc69f 100644
--- a/src/logservice/palf/palf_env.h
+++ b/src/logservice/palf/palf_env.h
@@ -105,6 +105,8 @@ public:
   int for_each(const ObFunction &func);
   // just for LogRpc
   palf::IPalfEnvImpl *get_palf_env_impl() { return &palf_env_impl_; }
+  // should be removed in version 4.2.0.0
+  int update_replayable_point(const SCN &replayable_scn);
 private:
   int start_();
   void stop_();
diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp
index ff833a3f9b..fd0a2d307e 100644
--- a/src/logservice/palf/palf_env_impl.cpp
+++ b/src/logservice/palf/palf_env_impl.cpp
@@ -1149,6 +1149,16 @@ int64_t PalfEnvImpl::get_tenant_id()
 {
   return tenant_id_;
 }
+int PalfEnvImpl::update_replayable_point(const SCN &replayable_scn)
+{
+  int ret = OB_SUCCESS;
+  if (IS_NOT_INIT) {
+    ret = OB_NOT_INIT;
+  } else if (OB_FAIL(fetch_log_engine_.update_replayable_point(replayable_scn))) {
+    PALF_LOG(WARN, "update_replayable_point failed", K(ret), KPC(this), K(replayable_scn));
+  }
+  return ret;
+}
 
 } // end namespace palf
 } // end namespace oceanbase
diff --git a/src/logservice/palf/palf_env_impl.h b/src/logservice/palf/palf_env_impl.h
index 7275aa8118..e450896018 100644
--- a/src/logservice/palf/palf_env_impl.h
+++ b/src/logservice/palf/palf_env_impl.h
@@ -162,6 +162,8 @@ public:
   virtual bool check_disk_space_enough() = 0;
   virtual int get_io_start_time(int64_t &last_working_time) = 0;
   virtual int64_t get_tenant_id() = 0;
+  // should be removed in version 4.2.0.0
+  virtual int update_replayable_point(const SCN &replayable_scn) = 0;
   VIRTUAL_TO_STRING_KV("IPalfEnvImpl", "Dummy");
 };
 
@@ -225,6 +227,7 @@ public:
   common::ObILogAllocator* get_log_allocator() override final;
   int get_io_start_time(int64_t &last_working_time) override final;
   int64_t get_tenant_id() override final;
+  int update_replayable_point(const SCN &replayable_scn) override final;
   INHERIT_TO_STRING_KV("IPalfEnvImpl", IPalfEnvImpl, K_(self), K_(log_dir), K_(disk_options_wrapper),
       KPC(log_alloc_mgr_));
   // =================== disk space management ==================
diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp
index aef55779af..af34c745ed 100644
--- a/src/logservice/palf/palf_handle_impl.cpp
+++ b/src/logservice/palf/palf_handle_impl.cpp
@@ -758,6 +758,12 @@ int PalfHandleImpl::change_access_mode(const int64_t proposal_id,
     ret = OB_EAGAIN;
     PALF_LOG(WARN, "another change_access_mode is running, try again", K(ret), K_(palf_id),
         K_(self), K(proposal_id),K(access_mode), K(ref_scn));
+  } else if (OB_FAIL(config_change_lock_.trylock())) {
+    // forbid to change access mode when reconfiguration is doing
+    mode_change_lock_.unlock();
+    ret = OB_EAGAIN;
+    PALF_LOG(WARN, "reconfiguration is running, try again", K(ret), K_(palf_id),
+        K_(self), K(proposal_id), K(access_mode), K(ref_scn));
   } else {
     PALF_EVENT("start change_access_mode", palf_id_, K(ret), KPC(this),
         K(proposal_id), K(access_mode), K(ref_scn), K_(sw));
@@ -800,6 +806,7 @@ int PalfHandleImpl::change_access_mode(const int64_t proposal_id,
         ob_usleep(1000);
       }
     }
+    config_change_lock_.unlock();
     mode_change_lock_.unlock();
   }
   return ret;
@@ -2766,13 +2773,14 @@ int PalfHandleImpl::fetch_log_from_storage(const common::ObAddr &server,
                                            const LSN &fetch_start_lsn,
                                            const int64_t fetch_log_size,
                                            const int64_t fetch_log_count,
-                                           const int64_t accepted_mode_pid)
+                                           const int64_t accepted_mode_pid,
+                                           const SCN &replayable_point)
 {
   int ret = OB_SUCCESS;
   if (IS_NOT_INIT) {
     ret = OB_NOT_INIT;
   } else if (OB_FAIL(fetch_log_from_storage_(server, fetch_type, msg_proposal_id, prev_lsn,
-      fetch_start_lsn, fetch_log_size, fetch_log_count))) {
+      fetch_start_lsn, fetch_log_size, fetch_log_count, replayable_point))) {
     PALF_LOG(WARN, "fetch_log_from_storage_ failed", K(ret), K_(palf_id), K_(self),
         K(server), K(fetch_type), K(msg_proposal_id), K(prev_lsn), K(fetch_start_lsn),
         K(fetch_log_size), K(fetch_log_count), K(accepted_mode_pid));
@@ -2817,7 +2825,8 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
                                             const LSN &prev_lsn,
                                             const LSN &fetch_start_lsn,
                                             const int64_t fetch_log_size,
-                                            const int64_t fetch_log_count)
+                                            const int64_t fetch_log_count,
+                                            const SCN &replayable_point)
 {
   int ret = OB_SUCCESS;
   PalfGroupBufferIterator iterator;
@@ -2857,6 +2866,10 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
   LogInfo prev_log_info;
   const bool no_need_fetch_log = (prev_lsn >= max_flushed_end_lsn) ||
       (AccessMode::FLASHBACK == access_mode);
+  common::ObMemberList member_list;
+  int64_t replica_num = 0;
+  (void) config_mgr_.get_curr_member_list(member_list, replica_num);
+  const bool is_dest_in_memberlist = (member_list.contains(server));
   if (no_need_fetch_log) {
     PALF_LOG(INFO, "no need fetch_log_from_storage", K(ret), KPC(this), K(server),
         K(fetch_start_lsn), K(prev_lsn), K(max_flushed_end_lsn), K(access_mode));
@@ -2864,9 +2877,14 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
       && OB_FAIL(get_prev_log_info_(fetch_start_lsn, prev_log_info))) {
     PALF_LOG(WARN, "get_prev_log_info_ failed", K(ret), K_(palf_id), K(prev_lsn), K(fetch_start_lsn));
   } else if (true == need_check_prev_log && prev_log_info.lsn_ != prev_lsn) {
-    ret = OB_ERR_UNEXPECTED;
-    PALF_LOG(ERROR, "the LSN between each replica is not same, unexpected error!!!", K(ret),
-        K_(palf_id), K(fetch_start_lsn), K(prev_log_info));
+    if (is_dest_in_memberlist) {
+      ret = OB_ERR_UNEXPECTED;
+      PALF_LOG(ERROR, "the LSN between each replica is not same, unexpected error!!!", K(ret),
+          K_(palf_id), K(fetch_start_lsn), K(prev_log_info));
+    } else {
+      PALF_LOG(INFO, "the LSN between leader and non paxos member is not same, do not fetch log",
+          K_(palf_id), K(fetch_start_lsn), K(prev_log_info));
+    }
   } else if (OB_FAIL(iterator.init(fetch_start_lsn, get_file_end_lsn, log_engine_.get_log_storage()))) {
     PALF_LOG(WARN, "PalfGroupBufferIterator init failed", K(ret), K_(palf_id));
   } else {
@@ -2892,6 +2910,13 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
         is_reach_end = true;
         PALF_LOG(INFO, "reach committed_end_lsn(not leader active replica), end fetch", K(ret), K_(palf_id), K(server),
             K(msg_proposal_id), K(curr_lsn), K(curr_log_end_lsn), K(committed_end_lsn));
+      } else if (false == is_dest_in_memberlist &&
+          curr_group_entry.get_header().is_raw_write() &&
+          replayable_point.is_valid() &&
+          curr_group_entry.get_scn() > replayable_point) {
+        is_reach_end = true;
+        PALF_LOG(INFO, "non paxos member could not fetch logs which scn is bigger than replayable_point, end fetch",
+            K_(palf_id), K(server), K(msg_proposal_id), K(curr_lsn), K(replayable_point));
       } else if (OB_FAIL(submit_fetch_log_resp_(server, msg_proposal_id, prev_log_proposal_id, \
               each_round_prev_lsn, curr_lsn, curr_group_entry))) {
         PALF_LOG(WARN, "submit_fetch_log_resp_ failed", K(ret), K_(palf_id), K(server),
@@ -4007,20 +4032,26 @@ void PalfHandleImpl::is_in_sync_(bool &is_log_sync, bool &is_use_cache)
 {
   int ret = OB_SUCCESS;
   SCN leader_max_scn;
+  LSN leader_end_lsn;
   is_log_sync = false;
   is_use_cache = false;
-  const share::SCN local_max_scn = sw_.get_max_scn();
+  share::SCN local_max_scn = sw_.get_max_scn();
+  LSN local_end_lsn;
 
   if (state_mgr_.get_leader() == self_) {
     is_log_sync = true;
   } else if (false == local_max_scn.is_valid()) {
   } else if (palf_reach_time_interval(PALF_LOG_SYNC_DELAY_THRESHOLD_US, last_check_sync_time_us_)) {
     // if reachs time interval, get max_scn of leader with sync RPC
-    if (OB_FAIL(get_leader_max_scn_(leader_max_scn))) {
+    if (OB_FAIL(get_leader_max_scn_(leader_max_scn, leader_end_lsn))) {
       CLOG_LOG(WARN, "get_palf_max_scn failed", K(ret), K_(self), K_(palf_id));
       last_check_sync_time_us_ = OB_INVALID_TIMESTAMP;
-    } else if (leader_max_scn.is_valid()) {
-      is_log_sync = (leader_max_scn.convert_to_ts() - local_max_scn.convert_to_ts() <= PALF_LOG_SYNC_DELAY_THRESHOLD_US);
+    } else if (leader_max_scn.is_valid() && leader_end_lsn.is_valid()) {
+      local_max_scn = sw_.get_max_scn();
+      sw_.get_committed_end_lsn(local_end_lsn);
+      const bool is_scn_sync = (leader_max_scn.convert_to_ts() - local_max_scn.convert_to_ts() <= PALF_LOG_SYNC_DELAY_THRESHOLD_US);
+      const bool is_log_size_sync = (leader_end_lsn - local_end_lsn) < 2 * PALF_BLOCK_SIZE;
+      is_log_sync = is_scn_sync || is_log_size_sync;
     }
   } else {
     is_use_cache = true;
@@ -4034,7 +4065,7 @@ void PalfHandleImpl::is_in_sync_(bool &is_log_sync, bool &is_use_cache)
   }
 }
 
-int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn)
+int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn, LSN &end_lsn)
 {
   int ret = OB_SUCCESS;
   common::ObAddr leader;
@@ -4043,6 +4074,7 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn)
   bool need_renew_leader = false;
 
   max_scn.reset();
+  end_lsn.reset();
   // use lc_cb_ in here without rlock is safe, because we don't reset lc_cb_
   // until this PalfHandleImpl is destoryed.
   if (OB_ISNULL(lc_cb_)) {
@@ -4057,6 +4089,7 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn)
     need_renew_leader = true;
   } else {
     max_scn = resp.max_scn_;
+    end_lsn = resp.end_lsn_;
   }
   if (need_renew_leader && palf_reach_time_interval(500 * 1000, last_renew_loc_time_us_)) {
     (void) lc_cb_->nonblock_renew_leader(palf_id_);
diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h
index 0fc1c54b0f..6381a43a76 100644
--- a/src/logservice/palf/palf_handle_impl.h
+++ b/src/logservice/palf/palf_handle_impl.h
@@ -490,7 +490,8 @@ public:
                                      const LSN &log_offset,
                                      const int64_t fetch_log_size,
                                      const int64_t fetch_log_count,
-                                     const int64_t accepted_mode_pid) = 0;
+                                     const int64_t accepted_mode_pid,
+                                     const SCN &replayable_point) = 0;
   virtual int receive_config_log(const common::ObAddr &server,
                                  const int64_t &msg_proposal_id,
                                  const int64_t &prev_log_proposal_id,
@@ -800,7 +801,8 @@ public:
                              const LSN &fetch_start_lsn,
                              const int64_t fetch_log_size,
                              const int64_t fetch_log_count,
-                             const int64_t accepted_mode_pid) override final;
+                             const int64_t accepted_mode_pid,
+                             const SCN &replayable_point) override final;
   int receive_config_log(const common::ObAddr &server,
                          const int64_t &msg_proposal_id,
                          const int64_t &prev_log_proposal_id,
@@ -884,7 +886,8 @@ private:
                               const LSN &prev_lsn,
                               const LSN &fetch_start_lsn,
                               const int64_t fetch_log_size,
-                              const int64_t fetch_log_count);
+                              const int64_t fetch_log_count,
+                              const SCN &replayable_point);
   int submit_fetch_log_resp_(const common::ObAddr &server,
                              const int64_t &msg_proposal_id,
                              const int64_t &prev_log_proposal_id,
@@ -938,7 +941,7 @@ private:
   // =================================================================
   int leader_sync_mode_meta_to_arb_member_();
   void is_in_sync_(bool &is_log_sync, bool &is_use_cache);
-  int get_leader_max_scn_(SCN &max_scn);
+  int get_leader_max_scn_(SCN &max_scn, LSN &end_lsn);
 private:
   class ElectionMsgSender : public election::ElectionMsgSender
   {