diff --git a/mittest/simple_server/test_ls_status_operator.cpp b/mittest/simple_server/test_ls_status_operator.cpp index 904d8e287a..53564efa09 100644 --- a/mittest/simple_server/test_ls_status_operator.cpp +++ b/mittest/simple_server/test_ls_status_operator.cpp @@ -129,7 +129,12 @@ TEST_F(TestLSStatusOperator, SQLProxy) TEST_F(TestLSStatusOperator, LSLifeAgent) { int ret = OB_SUCCESS; - tenant_id_ = 1001; + ASSERT_EQ(OB_SUCCESS, create_tenant()); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id_)); + ObSqlString sql; + //common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + tenant_id_ = 1002; ObLSLifeAgentManager ls_life(get_curr_simple_server().get_observer().get_mysql_proxy()); ObLSStatusOperator status_operator; ObLSStatusInfoArray ls_array; @@ -152,7 +157,7 @@ TEST_F(TestLSStatusOperator, LSLifeAgent) create_scn.set_min(); //创建新日志流 - ObLSID ls_id(1001); + ObLSID ls_id(1002); ret = info.init(tenant_id_, ls_id, 0, share::OB_LS_CREATING, 0, primary_zone, flag); ASSERT_EQ(OB_SUCCESS, ret); ret = ls_life.create_new_ls(info, create_scn, zone_priority.str(), share::NORMAL_SWITCHOVER_STATUS); @@ -241,17 +246,21 @@ TEST_F(TestLSStatusOperator, LSLifeAgent) ASSERT_EQ(OB_NEED_RETRY, ret); ObLSRecoveryStat recovery_stat; ObLSRecoveryStatOperator recovery_op; + palf::LogConfigVersion config_version; + ret = config_version.generate(1,2); + ASSERT_EQ(OB_SUCCESS, ret); scn.convert_for_logservice(99); - ret = recovery_stat.init_only_recovery_stat(tenant_id_, ls_id, scn, scn); + ret = recovery_stat.init_only_recovery_stat(tenant_id_, ls_id, scn, scn, config_version); ASSERT_EQ(OB_SUCCESS, ret); ret = recovery_op.update_ls_recovery_stat(recovery_stat, false, get_curr_simple_server().get_observer().get_mysql_proxy()); ASSERT_EQ(OB_SUCCESS, ret); ret = ls_life.drop_ls(tenant_id_, ls_id, share::NORMAL_SWITCHOVER_STATUS); ASSERT_EQ(OB_NEED_RETRY, ret); - scn.convert_for_logservice(98); + scn.convert_for_logservice(100); share::SCN recovery_scn; - recovery_scn.convert_for_logservice(100); - ret = recovery_stat.init_only_recovery_stat(tenant_id_, ls_id, scn, recovery_scn); + recovery_scn.convert_for_logservice(98); + //readable scn 大于sync_scn + ret = recovery_stat.init_only_recovery_stat(tenant_id_, ls_id, scn, recovery_scn,config_version); ASSERT_EQ(OB_SUCCESS, ret); //recovery_stat的取最大值 ret = recovery_op.update_ls_recovery_stat(recovery_stat, false, get_curr_simple_server().get_observer().get_mysql_proxy()); @@ -259,8 +268,8 @@ TEST_F(TestLSStatusOperator, LSLifeAgent) ret = ls_life.drop_ls(tenant_id_, ls_id, share::NORMAL_SWITCHOVER_STATUS); ASSERT_EQ(OB_NEED_RETRY, ret); - scn.convert_for_logservice(100); - ret = recovery_stat.init_only_recovery_stat(tenant_id_, ls_id, scn, recovery_scn); + recovery_scn.convert_for_logservice(100); + ret = recovery_stat.init_only_recovery_stat(tenant_id_, ls_id, scn, recovery_scn, config_version); ASSERT_EQ(OB_SUCCESS, ret); ret = recovery_op.update_ls_recovery_stat(recovery_stat, false, get_curr_simple_server().get_observer().get_mysql_proxy()); ASSERT_EQ(OB_SUCCESS, ret); diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 113b3ba1ab..d0c882f72a 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -76,6 +76,7 @@ #include "observer/report/ob_tenant_meta_checker.h"//ObTenantMetaChecker #include "rootserver/backup/ob_backup_task_scheduler.h" // ObBackupTaskScheduler #include "rootserver/backup/ob_backup_schedule_task.h" // ObBackupScheduleTask +#include "rootserver/ob_ls_recovery_stat_handler.h"//get_all_ls_replica_readbable_scn #ifdef OB_BUILD_TDE_SECURITY #include "share/ob_master_key_getter.h" #endif @@ -3024,7 +3025,6 @@ int ObService::get_ls_replayed_scn( } else if (arg.get_tenant_id() != MTL_ID() && OB_FAIL(guard.switch_to(arg.get_tenant_id()))) { LOG_WARN("switch tenant failed", KR(ret), K(arg)); } - if (OB_SUCC(ret)) { ObLSService *ls_svr = MTL(ObLSService*); ObLSHandle ls_handle; @@ -3039,12 +3039,17 @@ int ObService::get_ls_replayed_scn( LOG_WARN("log stream is null", KR(ret), K(arg), K(ls_handle)); } else if (OB_FAIL(ls->get_max_decided_scn(cur_readable_scn))) { LOG_WARN("failed to get_max_decided_scn", KR(ret), K(arg), KPC(ls)); - } else if (OB_FAIL(result.init(arg.get_tenant_id(), arg.get_ls_id(), cur_readable_scn))) { - LOG_WARN("failed to init res", KR(ret), K(arg), K(cur_readable_scn)); - } else { - LOG_INFO("finish get_ls_replayed_scn", KR(ret), K(cur_readable_scn), K(arg), K(result)); + } else if (arg.is_all_replica()) { + if (OB_FAIL(ls->get_all_replica_min_readable_scn(cur_readable_scn))) { + LOG_WARN("failed to get all replica readable scn", KR(ret)); + } } + if (FAILEDx(result.init(arg.get_tenant_id(), arg.get_ls_id(), cur_readable_scn, GCTX.self_addr()))) { + LOG_WARN("failed to init res", KR(ret), K(arg), K(cur_readable_scn)); + } + LOG_INFO("finish get_ls_replayed_scn", KR(ret), K(cur_readable_scn), K(arg), K(result)); } + return ret; } diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 156934c21f..8a1b593875 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -273,7 +273,6 @@ private: share::ObTabletReplica &tablet_replica, share::ObTabletReplicaChecksumItem &tablet_checksum, const bool need_checksum); - int register_self(); int check_server_empty(bool &server_empty); diff --git a/src/rootserver/ob_ls_recovery_reportor.cpp b/src/rootserver/ob_ls_recovery_reportor.cpp index 000f0365cc..f50e50c6da 100755 --- a/src/rootserver/ob_ls_recovery_reportor.cpp +++ b/src/rootserver/ob_ls_recovery_reportor.cpp @@ -263,6 +263,14 @@ int ObLSRecoveryReportor::update_ls_recovery_stat_() } else if (OB_TMP_FAIL(ls->update_ls_replayable_point(replayable_scn))) { LOG_WARN("failed to update_ls_replayable_point", KR(tmp_ret), KPC(ls), K(replayable_scn)); } + if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_0_0) { + } else if (OB_TMP_FAIL(ls->gather_replica_readable_scn())) { + if (OB_NOT_MASTER == tmp_ret) { + tmp_ret = OB_SUCCESS; + } else { + LOG_WARN("failed to gather replica readable scn", KR(tmp_ret)); + } + } if (ls->is_sys_ls() && !MTL_TENANT_ROLE_CACHE_IS_PRIMARY()) { // nothing todo diff --git a/src/rootserver/ob_ls_recovery_stat_handler.cpp b/src/rootserver/ob_ls_recovery_stat_handler.cpp index 39269f14e6..4683d41cea 100755 --- a/src/rootserver/ob_ls_recovery_stat_handler.cpp +++ b/src/rootserver/ob_ls_recovery_stat_handler.cpp @@ -25,6 +25,19 @@ namespace oceanbase { namespace rootserver { + +int ObLSReplicaReadableSCN::init(const common::ObAddr &server, const share::SCN &readable_scn) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!server.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("server is invalid", KR(ret), K(server)); + } else { + server_ = server; + readable_scn_ = readable_scn; + } + return ret; +} int ObLSRecoveryStatHandler::init(const uint64_t tenant_id, ObLS *ls) { int ret = OB_SUCCESS; @@ -39,6 +52,8 @@ int ObLSRecoveryStatHandler::init(const uint64_t tenant_id, ObLS *ls) ls_ = ls; tenant_id_ = tenant_id; is_inited_ = true; + replicas_scn_.set_tenant_id(tenant_id); + replicas_scn_.set_label("LSReadableSCN"); LOG_INFO("ObLSRecoveryStatHandler init success", K(this)); } @@ -50,6 +65,12 @@ void ObLSRecoveryStatHandler::reset() is_inited_ = false; ls_ = NULL; tenant_id_ = OB_INVALID_TENANT_ID; + SpinWLockGuard guard(lock_); + readable_scn_in_inner_.reset(); + config_version_in_inner_.reset(); + extra_server_.reset(); + config_version_.reset(); + replicas_scn_.reset(); } int ObLSRecoveryStatHandler::check_inner_stat_() @@ -90,6 +111,63 @@ int ObLSRecoveryStatHandler::get_ls_replica_readable_scn(share::SCN &readable_sc return ret; } + +int ObLSRecoveryStatHandler::get_all_replica_min_readable_scn(share::SCN &readable_scn) +{ + int ret = OB_SUCCESS; + palf::PalfStat palf_stat_first; + palf::PalfStat palf_stat_second; + readable_scn = SCN::max_scn(); + logservice::ObLogService *ls_svr = MTL(logservice::ObLogService*); + int64_t first_proposal_id = 0; + ObRole first_role; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) { + LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); + } else if (OB_FAIL(ls_svr->get_palf_role(ls_->get_ls_id(), first_role, first_proposal_id))) { + LOG_WARN("failed to get first role", KR(ret), K(ls_->get_ls_id()), KP(ls_svr), KPC_(ls)); + } else if (!is_strong_leader(first_role)) { + ret = OB_NOT_MASTER; + LOG_WARN("not master, need retry", KR(ret), K(ls_->get_ls_id())); + } else { + ObMemberList &paxos_member_list = palf_stat_first.paxos_member_list_; + SpinRLockGuard guard(lock_); + if (palf_stat_first.config_version_ != config_version_) { + ret = OB_NEED_RETRY; + LOG_WARN("config version not match", KR(ret), K(config_version_), K(palf_stat_first)); + } else if (replicas_scn_.count() < paxos_member_list.get_member_number()) { + ret = OB_NEED_RETRY; + LOG_WARN("not enough replica", KR(ret), K(replicas_scn_), K(paxos_member_list)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < paxos_member_list.get_member_number(); ++i) { + ObAddr member; + int64_t index = 0; + if (OB_FAIL(paxos_member_list.get_server_by_index(i, member))) { + LOG_WARN("failed to get server by index", KR(ret), K(i)); + } + for (; OB_SUCC(ret) && index < replicas_scn_.count(); ++index) { + if (replicas_scn_.at(index).get_server() == member) { + readable_scn = SCN::min(readable_scn, replicas_scn_.at(index).get_readable_scn()); + break; + } + } + if (OB_SUCC(ret) && index >= replicas_scn_.count()) { + ret = OB_NEED_RETRY; + LOG_WARN("replica has no readable scn", KR(ret), K(member), K(replicas_scn_)); + } + } + //TODO maybe need consider readable scn in inner table + } + if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) { + LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); + } else if (palf_stat_first.config_version_ != palf_stat_second.config_version_) { + ret = OB_EAGAIN; + LOG_WARN("config_version changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second)); + } + return ret; +} + int ObLSRecoveryStatHandler::increase_ls_replica_readable_scn_(SCN &readable_scn) { int ret = OB_SUCCESS; @@ -146,36 +224,48 @@ int ObLSRecoveryStatHandler::get_ls_level_recovery_stat(ObLSRecoveryStat &ls_rec int ret = OB_SUCCESS; share::SCN sync_scn = SCN::min_scn(); share::SCN readable_scn = SCN::min_scn(); - logservice::ObLogService *ls_svr = MTL(logservice::ObLogService*); - common::ObRole role; - int64_t first_proposal_id = palf::INVALID_PROPOSAL_ID; - int64_t second_proposal_id = palf::INVALID_PROPOSAL_ID; ls_recovery_stat.reset(); + palf::PalfStat palf_stat_first; + palf::PalfStat palf_stat_second; if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(is_inited)); - } else if (OB_ISNULL(ls_svr)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("pointer is null", KR(ret), KP(ls_svr)); - } else if (OB_FAIL(ls_svr->get_palf_role(ls_->get_ls_id(), role, first_proposal_id))) { - LOG_WARN("failed to get first role", KR(ret), K(ls_->get_ls_id()), KPC_(ls)); - } else if (!is_strong_leader(role)) { + } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) { + LOG_WARN("failed to get lastest palf stat", KR(ret)); + } else if (!is_strong_leader(palf_stat_first.role_)) { ret = OB_NOT_MASTER; - LOG_TRACE("not leader", KR(ret), K(role), KPC_(ls)); + LOG_TRACE("not leader", KR(ret), K(palf_stat_first)); } else if (OB_FAIL(do_get_ls_level_readable_scn_(readable_scn))) { LOG_WARN("failed to do_get_ls_level_readable_scn_", KR(ret), KPC_(ls)); // scn get order: read_scn before replayable_scn before sync_scn } else if (OB_FAIL(ObLSServiceHelper::get_ls_replica_sync_scn(MTL_ID(), ls_->get_ls_id(), sync_scn))) { LOG_WARN("failed to get ls sync scn", KR(ret), "tenant_id", MTL_ID()); } else if (OB_FAIL(ls_recovery_stat.init_only_recovery_stat(tenant_id_, ls_->get_ls_id(), - sync_scn, readable_scn))) { - LOG_WARN("failed to init ls recovery stat", KR(ret), KPC_(ls), K_(tenant_id), K(sync_scn), K(readable_scn)); - } else if (OB_FAIL(ls_svr->get_palf_role(ls_->get_ls_id(), role, second_proposal_id))) { - LOG_WARN("failed to get palf role again", KR(ret), K(role), KPC_(ls)); - } else if (first_proposal_id != second_proposal_id || !is_strong_leader(role)) { + sync_scn, readable_scn, + palf_stat_first.config_version_))) { + LOG_WARN("failed to init ls recovery stat", KR(ret), K_(tenant_id), K(sync_scn), K(readable_scn), + K(palf_stat_first), "ls_id", ls_->get_ls_id()); + } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_second))) { + LOG_WARN("failed to get lastest palf stat", KR(ret)); + } else if (palf_stat_first.config_version_ != palf_stat_second.config_version_ + || !is_strong_leader(palf_stat_second.role_)) { ret = OB_EAGAIN; - LOG_INFO("role changed, try again", KR(ret), K(role), - K(first_proposal_id), K(second_proposal_id), KPC_(ls)); + LOG_INFO("role changed, try again", KR(ret), K(palf_stat_first), K(palf_stat_second)); + } + + return ret; +} + +int ObLSRecoveryStatHandler::set_add_replica_server( + const common::ObAddr &server) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!server.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("server is invalid", KR(ret), K(server)); + } else { + SpinWLockGuard guard(lock_); + extra_server_ = server; } return ret; @@ -184,8 +274,7 @@ int ObLSRecoveryStatHandler::get_ls_level_recovery_stat(ObLSRecoveryStat &ls_rec int ObLSRecoveryStatHandler::do_get_ls_level_readable_scn_(SCN &read_scn) { int ret = OB_SUCCESS; - palf::AccessMode access_mode; - int64_t unused_mode_version = 0; + share::SCN majority_min_readable_scn = SCN::min_scn(); read_scn = SCN::min_scn(); @@ -255,6 +344,52 @@ int ObLSRecoveryStatHandler::construct_new_member_list_( return ret; } +int ObLSRecoveryStatHandler::try_reload_and_fix_config_version_(const palf::LogConfigVersion ¤t_version) +{ + int ret = OB_SUCCESS; + bool need_update = false; + share::SCN readable_scn; + const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_); + uint64_t tenant_data_version = 0; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret)); + } else if (OB_UNLIKELY(!current_version.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("config_version is invalid", KR(ret), K(current_version)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql proxy is null", KR(ret)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(meta_tenant_id, tenant_data_version))) { + LOG_WARN("failed to get min data version", KR(ret), K(tenant_id_), K(meta_tenant_id)); + } else if (tenant_data_version < DATA_VERSION_4_3_0_0) { + need_update = false; + LOG_INFO("not ready to load and update config version", KR(ret), K(tenant_data_version)); + } else { + SpinRLockGuard guard(lock_); + if (current_version != config_version_in_inner_) { + need_update = true; + FLOG_INFO("config version not match, need update", + K(config_version_in_inner_), K(current_version), "ls_id", + ls_->get_ls_id()); + } + } + if (OB_SUCC(ret) && need_update) { + ObLSRecoveryStatOperator op; + ObLSID ls_id = ls_->get_ls_id(); + if (OB_FAIL(op.update_ls_config_version(tenant_id_, ls_id, current_version, + *GCTX.sql_proxy_, readable_scn))) { + LOG_WARN("failed to update ls config version", KR(ret), K(tenant_id_), K(ls_id), K(current_version)); + //set invalid config version + SpinWLockGuard guard(lock_); + config_version_in_inner_.reset(); + } + SpinWLockGuard guard(lock_); + readable_scn_in_inner_ = readable_scn; + config_version_in_inner_ = current_version; + } + return ret; +} + int ObLSRecoveryStatHandler::get_palf_stat_( palf::PalfStat &palf_stat) { @@ -301,6 +436,134 @@ int ObLSRecoveryStatHandler::get_latest_palf_stat_( return ret; } +int ObLSRecoveryStatHandler::gather_replica_readable_scn() +{ + int ret = OB_SUCCESS; + ObArray replicas_scn; + palf::PalfStat palf_stat_first; + palf::PalfStat palf_stat_second; + ObArray addr_list; + + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_first))) { + LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); + } else if (!is_strong_leader(palf_stat_first.role_)) { + ret = OB_NOT_MASTER; + LOG_TRACE("not leader", KR(ret), K(palf_stat_first)); + } else if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_0_0) { + ret = OB_NEED_WAIT; + LOG_WARN("not ready to gather replica readable scn", KR(ret)); + } else { + common::ObMember member; + common::ObMemberList &member_list = palf_stat_first.paxos_member_list_; + for (int64_t i = 0; OB_SUCC(ret) && i < member_list.get_member_number(); ++i) { + if (OB_FAIL(member_list.get_member_by_index(i, member))) { + LOG_WARN("failed to get member by index", KR(ret), K(i)); + } else if (OB_FAIL(addr_list.push_back(member.get_server()))) { + LOG_WARN("failed to push back member", KR(ret), K(member)); + } + } + if (OB_SUCC(ret)) { + SpinWLockGuard guard(lock_); + if (extra_server_.is_valid() && OB_FAIL(addr_list.push_back(extra_server_))) { + LOG_WARN("failed to push back member", KR(ret), K(extra_server_)); + } + } + } + if (FAILEDx(do_get_each_replica_readable_scn_(addr_list, replicas_scn))) { + LOG_WARN("failed to get each replica readable", KR(ret), K(addr_list)); + } else if (OB_FAIL(get_palf_stat_(palf_stat_second))) { + LOG_WARN("failed to get palf stat", KR(ret)); + } else if (palf_stat_second.config_version_ != palf_stat_first.config_version_) { + ret = OB_EAGAIN; + LOG_WARN("config version change", KR(ret), K(palf_stat_second), K(palf_stat_first)); + } else { + SpinWLockGuard guard(lock_); + config_version_ = palf_stat_second.config_version_; + replicas_scn_.reset(); + if (OB_FAIL(replicas_scn_.assign(replicas_scn))) { + LOG_WARN("failed to replicas scn", KR(ret), K(replicas_scn)); + } + const int64_t PRINT_INTERVAL = 10 * 1000 * 1000L; + if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) { + LOG_INFO("ls readable scn in memory", KR(ret), K(replicas_scn_)); + } else { + LOG_TRACE("ls readable scn in memory", KR(ret), K(replicas_scn_)); + } + } + if (FAILEDx(try_reload_and_fix_config_version_(palf_stat_second.config_version_))) { + LOG_WARN("failed to try reload and fix config version", KR(ret), K(palf_stat_second)); + } + return ret; +} + +int ObLSRecoveryStatHandler::do_get_each_replica_readable_scn_( + const ObIArray &ob_member_list, + ObArray &replicas_scn) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else if (OB_UNLIKELY(0 >= ob_member_list.count())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(ob_member_list)); + } else { + obrpc::ObGetLSReplayedScnArg arg; + ObGetLSReplayedScnProxy proxy( + *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_replayed_scn); + ObTimeoutCtx ctx; + int tmp_ret = OB_SUCCESS; + ObArray return_code_array; + ObLSReplicaReadableSCN replica_scn; + if (OB_FAIL(arg.init(tenant_id_, ls_->get_ls_id(), false))) { + LOG_WARN("failed to init arg", KR(ret), K_(tenant_id), KPC_(ls)); + } else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) { + LOG_WARN("fail to get timeout ctx", KR(ret), K(ctx)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < ob_member_list.count(); ++i) { + const ObAddr &member = ob_member_list.at(i); + if (GCTX.self_addr() == member) { + SCN self_readable_scn; + if (OB_FAIL(ls_->get_max_decided_scn(self_readable_scn))) { + LOG_WARN("failed to get max decide scn", KR(ret)); + } else if (OB_FAIL(replica_scn.init(member, self_readable_scn))) { + LOG_WARN("failed to init replica scn", KR(ret), K(member), K(self_readable_scn)); + } else if (OB_FAIL(replicas_scn.push_back(replica_scn))) { + LOG_WARN("failed to push back replica scn", KR(ret), K(replica_scn)); + } + } else if (OB_TMP_FAIL(proxy.call(member, ctx.get_timeout(), tenant_id_, arg))) { + LOG_WARN("failed to send rpc", KR(ret), K(member), K(i), K(ctx), + K_(tenant_id), K(arg), K(ob_member_list)); + } + } + if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) { + LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret)); + ret = OB_SUCC(ret) ? tmp_ret : ret; + } else if (OB_SUCC(ret) && return_code_array.count() != proxy.get_results().count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("return code array not match with result", KR(ret), K(return_code_array), + K(proxy.get_results())); + } + for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) { + if (OB_FAIL(return_code_array.at(i))) { + LOG_WARN("send rpc is failed", KR(ret), K(i), K(return_code_array)); + } else { + const auto *result = proxy.get_results().at(i); + if (OB_ISNULL(result)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", KR(ret), K(i), K(return_code_array)); + } else if (OB_FAIL(replica_scn.init(result->get_server(), result->get_cur_readable_scn()))) { + LOG_WARN("failed to init replica scn", KR(ret), KPC(result)); + } else if (OB_FAIL(replicas_scn.push_back(replica_scn))) { + LOG_WARN("failed to push back", KR(ret), K(replica_scn)); + } + } + } + } + return ret; +} + int ObLSRecoveryStatHandler::get_majority_readable_scn_( const share::SCN &leader_readable_scn, share::SCN &majority_min_readable_scn) @@ -325,11 +588,20 @@ int ObLSRecoveryStatHandler::get_majority_readable_scn_( member_list_new, paxos_replica_number_new))) { LOG_WARN("construct_new_member_list failed", KR(ret), KPC_(ls), K(palf_stat_first)); + } else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_3_0_0) { + //use readable in memory + if (OB_FAIL(do_get_majority_readable_scn_V2_(member_list_new, + rootserver::majority(paxos_replica_number_new), palf_stat_first.config_version_, + majority_min_readable_scn))) { + LOG_WARN("failed to get majority readable scn", KR(ret), K(palf_stat_first), + K(paxos_replica_number_new), K(member_list_new)); + } } else if (OB_FAIL(do_get_majority_readable_scn_(member_list_new, leader_readable_scn, rootserver::majority(paxos_replica_number_new), majority_min_readable_scn))) { LOG_WARN("do_get_majority_readable_scn_ failed", KR(ret), K(member_list_new), K(leader_readable_scn), K(paxos_replica_number_new), K(palf_stat_first), K(majority_min_readable_scn)); - } else if (OB_FAIL(get_latest_palf_stat_(palf_stat_second))) { + } + if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) { LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls)); } else if (palf_stat_first.config_version_ != palf_stat_second.config_version_) { ret = OB_EAGAIN; @@ -374,7 +646,7 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_( majority_min_readable_scn = leader_readable_scn; LOG_INFO("single replica, majority_min_readable_scn = leader_readable_scn", KR(ret), K(ob_member_list), K(leader_readable_scn)); - } else if (OB_FAIL(arg.init(tenant_id_, ls_->get_ls_id()))) { + } else if (OB_FAIL(arg.init(tenant_id_, ls_->get_ls_id(), false))) { LOG_WARN("failed to init arg", KR(ret), K_(tenant_id), KPC_(ls)); } else { int tmp_ret = OB_SUCCESS; @@ -441,6 +713,48 @@ int ObLSRecoveryStatHandler::do_get_majority_readable_scn_( return ret; } +int ObLSRecoveryStatHandler::do_get_majority_readable_scn_V2_( + const ObIArray &ob_member_list, + const int64_t need_query_member_cnt, + const palf::LogConfigVersion &config_version, + share::SCN &majority_min_readable_scn) +{ + int ret = OB_SUCCESS; + majority_min_readable_scn = SCN::min_scn(); + ObArray replica_readble_scn; + + if (OB_FAIL(check_inner_stat_())) { + LOG_WARN("inner stat error", KR(ret), K_(is_inited)); + } else if ( ob_member_list.count() <= 0 + || 0 >= need_query_member_cnt + || !config_version.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(need_query_member_cnt), + K(ob_member_list), K(config_version)); + } else { + SpinRLockGuard guard(lock_); + if (config_version_ != config_version) { + ret = OB_NEED_RETRY; + LOG_WARN("config version not match, need retry", KR(ret), K(config_version), + K(config_version_)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < replicas_scn_.count(); ++i) { + ObAddr &server = replicas_scn_.at(i).get_server(); + SCN readable_scn= replicas_scn_.at(i).get_readable_scn(); + if (has_exist_in_array(ob_member_list, server)) { + if (OB_FAIL(replica_readble_scn.push_back(readable_scn))) { + LOG_WARN("failed to push back", KR(ret), K(i), K(server), K(readable_scn)); + } + } + } + } + if (FAILEDx(do_calc_majority_min_readable_scn_(need_query_member_cnt, + replica_readble_scn, majority_min_readable_scn))) { + LOG_WARN("failed to calc majority readable scn", KR(ret), + K(need_query_member_cnt), K(replica_readble_scn)); + } + return ret; +} int ObLSRecoveryStatHandler::calc_majority_min_readable_scn_( const SCN &leader_readable_scn, @@ -450,8 +764,8 @@ int ObLSRecoveryStatHandler::calc_majority_min_readable_scn_( SCN &majority_min_readable_scn) { int ret = OB_SUCCESS; - ObArray readable_scn_list; majority_min_readable_scn = SCN::max_scn(); + ObArray readable_scn_list; if (OB_FAIL(check_inner_stat_())) { LOG_WARN("inner stat error", KR(ret), K_(is_inited)); } else if (!leader_readable_scn.is_valid_and_not_min() || 0 >= majority_cnt) { @@ -485,24 +799,42 @@ int ObLSRecoveryStatHandler::calc_majority_min_readable_scn_( } } } - - if (OB_FAIL(ret)) { - } else if (readable_scn_list.count() < majority_cnt) { - ret = OB_EAGAIN; - LOG_WARN("can not get majority readable_scn count", KR(ret), K(majority_cnt), K(readable_scn_list), K(return_code_array)); - } else { - (void)std::sort(readable_scn_list.begin(), readable_scn_list.end(), std::greater()); - for (int64_t i = 0; OB_SUCC(ret) && i < readable_scn_list.count() && i < majority_cnt; ++i) { - if (majority_min_readable_scn > readable_scn_list.at(i)) { - majority_min_readable_scn = readable_scn_list.at(i); - } - } - LOG_TRACE("calculate majority min readable_scn finished", KR(ret), K(leader_readable_scn), - K(majority_min_readable_scn), K(readable_scn_list), K(majority_cnt), K(return_code_array)); + if (FAILEDx(do_calc_majority_min_readable_scn_(majority_cnt, + readable_scn_list, majority_min_readable_scn))) { + LOG_WARN("do calc majority min readable scn", KR(ret), K(majority_cnt), K(readable_scn_list)); } } return ret; } - + int ObLSRecoveryStatHandler::do_calc_majority_min_readable_scn_( + const int64_t majority_cnt, + ObArray &readable_scn_list, + share::SCN &majority_min_readable_scn) +{ + int ret = OB_SUCCESS; + majority_min_readable_scn.set_max(); + if (OB_UNLIKELY(0 >= majority_cnt)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(majority_cnt)); + } else if (readable_scn_list.count() < majority_cnt) { + ret = OB_EAGAIN; + LOG_WARN("can not get majority readable_scn count", KR(ret), + K(majority_cnt), K(readable_scn_list)); + } else { + (void)std::sort(readable_scn_list.begin(), readable_scn_list.end(), + std::greater()); + for (int64_t i = 0; + OB_SUCC(ret) && i < readable_scn_list.count() && i < majority_cnt; + ++i) { + if (majority_min_readable_scn > readable_scn_list.at(i)) { + majority_min_readable_scn = readable_scn_list.at(i); + } + } + LOG_TRACE("calculate majority min readable_scn finished", KR(ret), + K(majority_min_readable_scn), + K(readable_scn_list), K(majority_cnt)); + } + return ret; +} } } diff --git a/src/rootserver/ob_ls_recovery_stat_handler.h b/src/rootserver/ob_ls_recovery_stat_handler.h index ebf928d507..a628dafaee 100644 --- a/src/rootserver/ob_ls_recovery_stat_handler.h +++ b/src/rootserver/ob_ls_recovery_stat_handler.h @@ -17,6 +17,7 @@ #include "rootserver/ob_rs_async_rpc_proxy.h" //ObGetLSReplayedScnProxy #include "share/ls/ob_ls_recovery_stat_operator.h" // ObLSRecoveryStatOperator #include "logservice/palf/palf_handle_impl.h" // PalfStat +#include "logservice/palf/log_meta_info.h"//LogConfigVersion namespace oceanbase { @@ -28,6 +29,27 @@ class ObLS; namespace rootserver { +struct ObLSReplicaReadableSCN +{ +public: + ObLSReplicaReadableSCN() : server_(), readable_scn_() {} + ~ObLSReplicaReadableSCN() {} + int init(const common::ObAddr &server, const share::SCN &readable_scn); + + share::SCN get_readable_scn()const + { + return readable_scn_; + } + common::ObAddr &get_server() + { + return server_; + } + TO_STRING_KV(K_(server), K_(readable_scn)); +private: + common::ObAddr server_; + share::SCN readable_scn_; +}; + /** * @description: * ObLSRecoveryStatHandler exists on the LS of each observer and is responsible for @@ -48,6 +70,14 @@ public: */ int get_ls_replica_readable_scn(share::SCN &readable_scn); + /* + * @description: + * Get the readable_scn of other replicas + * @param[out] readable_scn ls readable_scn + * @return return code + * */ + int get_all_replica_min_readable_scn(share::SCN &readable_scn); + /** * @description: * get ls level recovery_stat by LS leader. @@ -57,6 +87,13 @@ public: */ int get_ls_level_recovery_stat(share::ObLSRecoveryStat &ls_recovery_stat); + int set_add_replica_server(const common::ObAddr &server); + /* + * @description: + * get all ls replica readable and set to replicas_scn_; + */ + int gather_replica_readable_scn(); + TO_STRING_KV(K_(tenant_id), K_(ls)); private: @@ -96,7 +133,9 @@ private: */ int get_latest_palf_stat_( palf::PalfStat &palf_stat); - + int do_get_each_replica_readable_scn_( + const ObIArray &ob_member_list, + ObArray &replicas_scn); int get_majority_readable_scn_( const share::SCN &leader_readable_scn, share::SCN &majority_min_readable_scn); @@ -105,12 +144,21 @@ private: const share::SCN &leader_readable_scn, const int64_t need_query_member_cnt, share::SCN &majority_min_readable_scn); + int do_get_majority_readable_scn_V2_( + const ObIArray &ob_member_list, + const int64_t need_query_member_cnt, + const palf::LogConfigVersion &config_version, + share::SCN &majority_min_readable_scn); int calc_majority_min_readable_scn_( const share::SCN &leader_readable_scn, const int64_t majority_cnt, const ObIArray &return_code_array, const ObGetLSReplayedScnProxy &proxy, share::SCN &majority_min_readable_scn); + int do_calc_majority_min_readable_scn_( + const int64_t majority_cnt, + ObArray &readable_scn_list, + share::SCN &majority_min_readable_scn); int construct_new_member_list_( const common::ObMemberList &member_list_ori, @@ -119,12 +167,20 @@ private: ObIArray &member_list_new, int64_t &paxos_replica_number_new); + int try_reload_and_fix_config_version_(const palf::LogConfigVersion ¤t_version); + DISALLOW_COPY_AND_ASSIGN(ObLSRecoveryStatHandler); private: bool is_inited_; uint64_t tenant_id_; ObLS *ls_; + common::SpinRWLock lock_; + share::SCN readable_scn_in_inner_;//readable_scn of inner_table + palf::LogConfigVersion config_version_in_inner_;//config_version in inner_table + common::ObAddr extra_server_;//for add replica, need to gather add_replica's readable_scn + palf::LogConfigVersion config_version_; + ObArray replicas_scn_; }; } diff --git a/src/rootserver/ob_ls_service_helper.cpp b/src/rootserver/ob_ls_service_helper.cpp index 63895837aa..1ab73e8466 100755 --- a/src/rootserver/ob_ls_service_helper.cpp +++ b/src/rootserver/ob_ls_service_helper.cpp @@ -1396,5 +1396,113 @@ int ObTenantLSInfo::get_next_primary_zone( } return ret; } +int ObLSServiceHelper::check_transfer_task_replay(const uint64_t tenant_id, + const share::ObLSID &src_ls, + const share::ObLSID &dest_ls, + const share::SCN &transfer_scn, + bool &replay_finish) +{ + int ret = OB_SUCCESS; + ObLSStatusOperator ls_operator; + share::ObLSStatusInfo ls_status; + SCN readble_scn; + replay_finish = true; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || !src_ls.is_valid() || !dest_ls.is_valid() + || !transfer_scn.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(src_ls), + K(dest_ls), K(transfer_scn)); + } else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, src_ls, transfer_scn, replay_finish))) { + LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(src_ls), K(transfer_scn)); + } else if (!replay_finish) { + LOG_WARN("src ls has not replay transfer finish", K(tenant_id), K(src_ls)); + } else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, src_ls, transfer_scn, replay_finish))) { + LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(dest_ls), K(transfer_scn)); + } else if (!replay_finish) { + LOG_WARN("dest ls has not replay transfer finish", K(tenant_id), K(dest_ls)); + } + return ret; +} + +int ObLSServiceHelper::check_ls_transfer_replay_(const uint64_t tenant_id, + const share::ObLSID &ls_id, + const share::SCN &transfer_scn, + bool &replay_finish) +{ + int ret = OB_SUCCESS; + ObLSStatusOperator ls_operator; + share::ObLSStatusInfo ls_status; + SCN readable_scn; + replay_finish = true; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || !ls_id.is_valid() + || !transfer_scn.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(transfer_scn)); + } else if (OB_ISNULL(GCTX.sql_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_)); + } else if (OB_FAIL(ls_operator.get_ls_status_info(tenant_id, ls_id, + ls_status, *GCTX.sql_proxy_))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + LOG_INFO("src ls not exist, no need check", K(tenant_id), K(ls_id)); + } else { + LOG_WARN("failed to get ls status info", KR(ret), K(tenant_id), K(ls_id)); + } + } else if (OB_FAIL(get_ls_all_replica_readable_scn_(tenant_id, ls_id, readable_scn))) { + LOG_WARN("failed to get ls all replica readable scn", KR(ret), K(tenant_id), K(ls_id)); + } else if (readable_scn < transfer_scn) { + replay_finish = false; + LOG_INFO("need wait, ls has not replay finish transfer", K(tenant_id), + K(ls_id), K(readable_scn), K(transfer_scn)); + } + return ret; +} + +int ObLSServiceHelper::get_ls_all_replica_readable_scn_(const uint64_t tenant_id, + const share::ObLSID &ls_id, + share::SCN &readable_scn) +{ + int ret = OB_SUCCESS; + ObTimeoutCtx ctx; + if (OB_UNLIKELY(!ls_id.is_valid() || !is_valid_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(ls_id), K(tenant_id)); + } else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("location service is null", KR(ret), KP(GCTX.location_service_), + KP(GCTX.srv_rpc_proxy_)); + } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) { + LOG_WARN("fail to set timeout ctx", KR(ret)); + } else { + obrpc::ObGetLSReplayedScnArg arg; + ObAddr leader; + ObGetLSReplayedScnRes result; + const int64_t timeout = ctx.get_timeout(); + ObGetLSReplayedScnProxy proxy( + *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_replayed_scn); + if (OB_FAIL(arg.init(tenant_id, ls_id, true))) { + LOG_WARN("failed to init arg", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(GCTX.location_service_->get_leader( + GCONF.cluster_id, tenant_id, ls_id, false, leader))) { + LOG_WARN("failed to get leader", KR(ret), K(tenant_id), K(ls_id)); + } else if (OB_FAIL(proxy.call(leader, timeout, tenant_id, arg))) { + LOG_WARN("failed to get ls repalyed scn", KR(ret), K(leader), K(tenant_id), + K(timeout), K(arg)); + } else if (OB_FAIL(proxy.wait())) { + LOG_WARN("failed to get wait all rpc", KR(ret), K(tenant_id), K(ls_id), K(leader)); + } else if (OB_ISNULL(proxy.get_results().at(0))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result is null", KR(ret), K(tenant_id), K(leader), K(ls_id)); + } else { + readable_scn = proxy.get_results().at(0)->get_cur_readable_scn(); + } + } + return ret; + +} + }//end of rootserver } diff --git a/src/rootserver/ob_ls_service_helper.h b/src/rootserver/ob_ls_service_helper.h index 4d48996cc9..2f89d32087 100644 --- a/src/rootserver/ob_ls_service_helper.h +++ b/src/rootserver/ob_ls_service_helper.h @@ -251,6 +251,11 @@ public: ObTenantLSInfo& tenant_ls_info, common::ObISQLClient &sql_proxy); static int wait_all_tenants_user_ls_sync_scn(common::hash::ObHashMap &tenants_sys_ls_target_scn); + static int check_transfer_task_replay(const uint64_t tenant_id, + const share::ObLSID &src_ls, + const share::ObLSID &dest_id, + const share::SCN &transfer_scn, + bool &replay_finish); private: static int check_if_need_wait_user_ls_sync_scn_(const uint64_t tenant_id, const share::SCN &sys_ls_target_scn); static int revision_to_equal_status_( @@ -268,6 +273,13 @@ private: const uint64_t ls_group_id, ObUnitGroupInfo &src_info, ObUnitGroupInfo &dest_info); + static int get_ls_all_replica_readable_scn_(const uint64_t tenant_id, + const share::ObLSID &src_ls, + share::SCN &readable_scn); + static int check_ls_transfer_replay_(const uint64_t tenant_id, + const share::ObLSID &ls_id, + const share::SCN &transfer_scn, + bool &replay_finish); }; diff --git a/src/rootserver/ob_recovery_ls_service.cpp b/src/rootserver/ob_recovery_ls_service.cpp index 59a142e9df..43a8fe4a38 100755 --- a/src/rootserver/ob_recovery_ls_service.cpp +++ b/src/rootserver/ob_recovery_ls_service.cpp @@ -716,7 +716,8 @@ int ObRecoveryLSService::construct_sys_ls_recovery_stat_based_on_sync_scn_( readable_scn = tmp_ls_stat.get_readable_scn(); } - if (FAILEDx(ls_stat.init_only_recovery_stat(tenant_id_, SYS_LS, sync_scn, readable_scn))) { + if (FAILEDx(ls_stat.init_only_recovery_stat(tenant_id_, SYS_LS, sync_scn, readable_scn, + tmp_ls_stat.get_config_version()))) { LOG_WARN("failed to init ls recovery stat", KR(ret), K(tenant_id_), K(sync_scn), K(readable_scn), K(tmp_ls_stat), K(tenant_info)); } @@ -1106,6 +1107,8 @@ int ObRecoveryLSService::do_ls_balance_task_() LOG_WARN("get_tenant_info failed", K(ret)); } else if (tenant_info.get_standby_scn() >= ls_balance_task.get_operation_scn()) { const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_); + //transfer_scn maybe sync_scn + SCN transfer_scn; START_TRANSACTION(proxy_, exec_tenant_id) if (OB_FAIL(ret)) { LOG_WARN("failed to start trans", KR(ret)); @@ -1114,6 +1117,7 @@ int ObRecoveryLSService::do_ls_balance_task_() LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task)); } } else if (ls_balance_task.get_task_op().is_transfer_end()) { + transfer_scn = ls_balance_task.get_operation_scn(); } else if (ls_balance_task.get_task_op().is_transfer_begin()) { //find transfer end, or tenant is in flashback ObBalanceTaskHelper transfer_end_task; @@ -1122,28 +1126,43 @@ int ObRecoveryLSService::do_ls_balance_task_() ls_balance_task.get_dest_ls(), trans, transfer_end_task); if (OB_SUCC(ret)) { //if has transfer end, can remove transfer begin - LOG_INFO("has transfer end task, can remove transfer begin", KR(ret), - K(ls_balance_task), K(transfer_end_task)); + transfer_scn = ls_balance_task.get_operation_scn(); + LOG_INFO("has transfer end task", KR(ret), K(ls_balance_task), K(transfer_end_task)); } else if (OB_ENTRY_NOT_EXIST != ret) { LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task)); } else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status() || tenant_info.is_prepare_flashback_for_failover_to_primary_status()) { //check tenant_info status and check wait readable_scn is equal to sync_scn ret = OB_SUCCESS; + transfer_scn = tenant_info.get_sync_scn(); if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) { ret = OB_NEED_WAIT; LOG_WARN("must wait repay to newest", KR(ret), K(tenant_id_), K(tenant_info), K(ls_balance_task)); } else { - LOG_INFO("repay to newest, can remove transfer begin before failover", K(tenant_info), K(ls_balance_task)); + LOG_INFO("replay to newest", K(tenant_info), K(ls_balance_task)); } } else { ret = OB_NEED_RETRY; LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task)); } - } else { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls balance task op is unexpected", KR(ret), K(ls_balance_task)); + } + if (OB_FAIL(ret)) { + } else if (ls_balance_task.get_task_op().is_transfer_begin() + || ls_balance_task.get_task_op().is_transfer_end()) { + bool is_replay_finish = false; + if (!transfer_scn.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("transfer scn is invalid", KR(ret), K(transfer_scn), K(tenant_info), K(ls_balance_task)); + } else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( + tenant_id_, ls_balance_task.get_src_ls(), + ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) { + LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task), + K(tenant_info), K(transfer_scn)); + } else if (!is_replay_finish) { + ret = OB_NEED_RETRY; + LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn)); + } } if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_, ls_balance_task.get_operation_scn(), trans))) { @@ -1162,6 +1181,8 @@ int ObRecoveryLSService::do_ls_balance_task_() return ret; } + + int ObRecoveryLSService::do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task, common::ObMySQLTransaction &trans) { diff --git a/src/rootserver/ob_recovery_ls_service.h b/src/rootserver/ob_recovery_ls_service.h index 5eff671254..0b2ec7a29e 100755 --- a/src/rootserver/ob_recovery_ls_service.h +++ b/src/rootserver/ob_recovery_ls_service.h @@ -144,6 +144,7 @@ private: int get_restore_source_value_(ObLogRestoreSourceItem &item, ObSqlString &standby_source_value); int do_update_restore_source_(ObRestoreSourceServiceAttr &old_attr, ObLogRestoreSourceMgr &restore_source_mgr); int update_source_inner_table_(char *buf, const int64_t buf_size, ObMySQLTransaction &trans, const ObLogRestoreSourceItem &item); + int get_ls_all_replica_readable_scn_(const share::ObLSID &ls_id, share::SCN &reabable_scn); private: bool inited_; uint64_t tenant_id_; diff --git a/src/rootserver/ob_tenant_role_transition_service.cpp b/src/rootserver/ob_tenant_role_transition_service.cpp index 0694f40a03..bbf05b8481 100644 --- a/src/rootserver/ob_tenant_role_transition_service.cpp +++ b/src/rootserver/ob_tenant_role_transition_service.cpp @@ -555,7 +555,19 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_() LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id_)); } else if (cur_tenant_info.get_sync_scn() == cur_tenant_info.get_standby_scn()) { is_finish = true; - LOG_INFO("has transfer task, and repaly to newest", KR(ret), K(cur_tenant_info)); + for (int64_t i = 0; OB_SUCC(ret) && i < balance_task_array.count() && !is_finish; ++i) { + const ObBalanceTask &task = balance_task_array.at(i); + if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(tenant_id_, + task.get_src_ls_id(), task.get_dest_ls_id(), cur_tenant_info.get_sync_scn(), + is_finish))) { + LOG_WARN("failed to check transfer task replay", KR(ret), K(cur_tenant_info), K(task)); + } else if (!is_finish) { + LOG_INFO("has transfe task, and not replay to newest", K(task)); + } + }//end for + if (OB_SUCC(ret) && is_finish) { + LOG_INFO("has transfer task, and replay to newest", KR(ret), K(cur_tenant_info)); + } } } else if (OB_FAIL(ret)) { LOG_WARN("failed to pop task", KR(ret), K(tenant_id_)); diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp index 672a20c84e..0a35fa771b 100644 --- a/src/rootserver/restore/ob_restore_scheduler.cpp +++ b/src/rootserver/restore/ob_restore_scheduler.cpp @@ -1032,18 +1032,14 @@ int ObRestoreScheduler::restore_init_ls(const share::ObPhysicalRestoreJob &job_i LOG_WARN("failed to read ls info", KR(ret)); } else { const SCN &sync_scn = backup_ls_attr.backup_scn_; - const SCN readable_scn = SCN::base_scn(); ObLSRecoveryStatOperator ls_recovery; - ObLSRecoveryStat ls_recovery_stat; - LOG_INFO("start to create ls and set sync scn", K(sync_scn), K(backup_ls_attr)); - if (OB_FAIL(ls_recovery_stat.init_only_recovery_stat( - tenant_id_, SYS_LS, sync_scn, readable_scn))) { - LOG_WARN("failed to init ls recovery stat", KR(ret), K(backup_ls_attr.backup_scn_), - K(sync_scn), K(readable_scn)); - } else if (OB_FAIL(ls_recovery.update_ls_recovery_stat(ls_recovery_stat, false, *sql_proxy_))) { - LOG_WARN("failed to update ls recovery stat", KR(ret), - K(ls_recovery_stat)); + const uint64_t exec_tenant_id = get_private_table_exec_tenant_id(tenant_id_); + START_TRANSACTION(sql_proxy_, exec_tenant_id) + LOG_INFO("start to create ls and set sync scn", K(sync_scn), K(backup_ls_attr), KR(ret)); + if (FAILEDx(ls_recovery.update_sys_ls_sync_scn(tenant_id_, trans, sync_scn))) { + LOG_WARN("failed to update sync ls sync scn", KR(ret), K(sync_scn)); } + END_TRANSACTION(trans) } if (FAILEDx(create_all_ls_(*tenant_schema, backup_ls_attr.ls_attr_array_))) { LOG_WARN("failed to create all ls", KR(ret), K(backup_ls_attr), KPC(tenant_schema)); diff --git a/src/share/ls/ob_ls_i_life_manager.h b/src/share/ls/ob_ls_i_life_manager.h index e599c25bd0..933a205dc2 100644 --- a/src/share/ls/ob_ls_i_life_manager.h +++ b/src/share/ls/ob_ls_i_life_manager.h @@ -175,7 +175,13 @@ public: const common::ObSqlString &sql, ObISQLClient &client, TableOperator *table_operator, common::ObIArray &res); + template + static int hex_str_to_type(const common::ObString &str, T &list); + template + static int type_to_hex_str(const T &list, + common::ObIAllocator &allocator, + common::ObString &hex_str); private: DISALLOW_COPY_AND_ASSIGN(ObLSTemplateOperator); }; @@ -269,6 +275,77 @@ int ObLSTemplateOperator::exec_write(const uint64_t &tenant_id, return ret; } +template +int ObLSTemplateOperator::hex_str_to_type( + const common::ObString &str, + T &list) +{ + int ret = OB_SUCCESS; + list.reset(); + char *deserialize_buf = NULL; + const int64_t str_size = str.length(); + const int64_t deserialize_size = str.length() / 2 + 1; + int64_t deserialize_pos = 0; + ObArenaAllocator allocator("HexValue"); + if (OB_UNLIKELY(str.empty())) { + ret = OB_INVALID_ARGUMENT; + SHARE_LOG(WARN, "str is empty", KR(ret)); + } else if (OB_ISNULL(deserialize_buf = static_cast(allocator.alloc(deserialize_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + SHARE_LOG(WARN, "fail to alloc memory", KR(ret), K(deserialize_size)); + } else if (OB_FAIL(hex_to_cstr(str.ptr(), str_size, deserialize_buf, deserialize_size))) { + SHARE_LOG(WARN, "fail to get cstr from hex", KR(ret), K(str_size), K(deserialize_size), K(str)); + } else if (OB_FAIL(list.deserialize(deserialize_buf, deserialize_size, deserialize_pos))) { + SHARE_LOG(WARN, "fail to deserialize set member list arg", KR(ret), K(deserialize_pos), K(deserialize_size), + K(str)); + } else if (OB_UNLIKELY(deserialize_pos > deserialize_size)) { + ret = OB_SIZE_OVERFLOW; + SHARE_LOG(WARN, "deserialize error", KR(ret), K(deserialize_pos), K(deserialize_size)); + } + return ret; +} + +template +int ObLSTemplateOperator::type_to_hex_str( + const T &list, + common::ObIAllocator &allocator, + common::ObString &hex_str) +{ + int ret = OB_SUCCESS; + char *serialize_buf = NULL; + const int64_t serialize_size = list.get_serialize_size(); + int64_t serialize_pos = 0; + char *hex_buf = NULL; + const int64_t hex_size = 2 * serialize_size; + int64_t hex_pos = 0; + if (OB_UNLIKELY(!list.is_valid())) { + ret = OB_INVALID_ARGUMENT; + SHARE_LOG(WARN, "list is invalid", KR(ret), K(list)); + } else if (OB_UNLIKELY(hex_size > OB_MAX_LONGTEXT_LENGTH + 1)) { + ret = OB_SIZE_OVERFLOW; + SHARE_LOG(WARN, "format str is too long", KR(ret), K(hex_size), K(list)); + } else if (OB_ISNULL(serialize_buf = static_cast(allocator.alloc(serialize_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + SHARE_LOG(WARN, "fail to alloc buf", KR(ret), K(serialize_size)); + } else if (OB_FAIL(list.serialize(serialize_buf, serialize_size, serialize_pos))) { + SHARE_LOG(WARN, "failed to serialize set list arg", KR(ret), K(list), K(serialize_size), K(serialize_pos)); + } else if (OB_UNLIKELY(serialize_pos > serialize_size)) { + ret = OB_SIZE_OVERFLOW; + SHARE_LOG(WARN, "serialize error", KR(ret), K(serialize_pos), K(serialize_size)); + } else if (OB_ISNULL(hex_buf = static_cast(allocator.alloc(hex_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + SHARE_LOG(WARN, "fail to alloc memory", KR(ret), K(hex_size)); + } else if (OB_FAIL(hex_print(serialize_buf, serialize_pos, hex_buf, hex_size, hex_pos))) { + SHARE_LOG(WARN, "fail to print hex", KR(ret), K(serialize_pos), K(hex_size), K(serialize_buf)); + } else if (OB_UNLIKELY(hex_pos > hex_size)) { + ret = OB_SIZE_OVERFLOW; + SHARE_LOG(WARN, "encode error", KR(ret), K(hex_pos), K(hex_size)); + } else { + hex_str.assign_ptr(hex_buf, static_cast(hex_pos)); + } + return ret; +} + #define DEFINE_IN_TRANS_FUC(func_name, ...)\ int func_name ##_in_trans(__VA_ARGS__, ObMySQLTransaction &trans);\ int func_name(__VA_ARGS__); diff --git a/src/share/ls/ob_ls_recovery_stat_operator.cpp b/src/share/ls/ob_ls_recovery_stat_operator.cpp index d3564ae258..c99412fd31 100644 --- a/src/share/ls/ob_ls_recovery_stat_operator.cpp +++ b/src/share/ls/ob_ls_recovery_stat_operator.cpp @@ -40,13 +40,15 @@ bool ObLSRecoveryStat::is_valid() const && sync_scn_ >= readable_scn_ && create_scn_.is_valid() && drop_scn_.is_valid(); + //config_version maybe invalid, no need check } int ObLSRecoveryStat::init(const uint64_t tenant_id, const ObLSID &id, const SCN &sync_scn, const SCN &readable_scn, const SCN &create_scn, - const SCN &drop_scn) + const SCN &drop_scn, + const palf::LogConfigVersion &config_version) { int ret = OB_SUCCESS; reset(); @@ -69,23 +71,26 @@ int ObLSRecoveryStat::init(const uint64_t tenant_id, readable_scn_ = readable_scn; create_scn_ = create_scn; drop_scn_ = drop_scn; + config_version_ = config_version; } return ret; } int ObLSRecoveryStat::init_only_recovery_stat(const uint64_t tenant_id, const ObLSID &id, - const SCN &sync_scn, const SCN &readable_scn) + const SCN &sync_scn, const SCN &readable_scn, + const palf::LogConfigVersion &config_version) { int ret = OB_SUCCESS; reset(); if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !id.is_valid() || !sync_scn.is_valid() - || !readable_scn.is_valid())) { + || !readable_scn.is_valid() + || !config_version.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(id), - K(sync_scn), K(readable_scn)); + K(sync_scn), K(readable_scn), K(config_version)); } else if (OB_UNLIKELY(sync_scn < readable_scn)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("readable_scn > sync_scn, invalid argument", KR(ret), K(sync_scn), K(readable_scn), @@ -97,6 +102,7 @@ int ObLSRecoveryStat::init_only_recovery_stat(const uint64_t tenant_id, const Ob readable_scn_ = readable_scn; drop_scn_ = SCN::base_scn(); create_scn_ = SCN::base_scn(); + config_version_ = config_version; } return ret; } @@ -108,6 +114,7 @@ void ObLSRecoveryStat::reset() readable_scn_.reset(); create_scn_.reset(); drop_scn_.reset(); + config_version_.reset(); } int ObLSRecoveryStat::assign(const ObLSRecoveryStat &other) @@ -120,6 +127,7 @@ int ObLSRecoveryStat::assign(const ObLSRecoveryStat &other) readable_scn_ = other.readable_scn_; create_scn_ = other.create_scn_; drop_scn_ = other.drop_scn_; + config_version_ = other.config_version_; } return ret; } @@ -248,7 +256,14 @@ int ObLSRecoveryStatOperator::update_ls_recovery_stat_in_trans( readable_scn.get_val_for_inner_table_field(), ls_recovery.get_ls_id().id(), ls_recovery.get_tenant_id()))) { LOG_WARN("failed to assign sql", KR(ret), K(sync_scn), K(readable_scn), K(sql)); - } else if (OB_FAIL(exec_write(ls_recovery.get_tenant_id(), sql, this, trans, true))) { + } else if (old_ls_recovery.get_config_version().is_valid()) { + //if config version is valid in inner table, no need check compat_version + if (ls_recovery.get_config_version() != old_ls_recovery.get_config_version()) { + ret = OB_NEED_RETRY; + LOG_WARN("configversion not match, maybe leader change", KR(ret), K(ls_recovery), K(old_ls_recovery)); + } + } + if (FAILEDx(exec_write(ls_recovery.get_tenant_id(), sql, this, trans, true))) { LOG_WARN("failed to exec write", KR(ret), K(ls_recovery), K(sql)); } } @@ -370,12 +385,16 @@ int ObLSRecoveryStatOperator::fill_cell(common::sqlclient::ObMySQLResult*result, int64_t readable_ts = OB_INVALID_SCN_VAL; int64_t create_ts = OB_INVALID_SCN_VAL; int64_t drop_ts = OB_INVALID_SCN_VAL; + ObString config_version_val; + palf::LogConfigVersion config_version; EXTRACT_INT_FIELD_MYSQL(*result, "tenant_id", tenant_id, uint64_t); EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", id_value, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result, "sync_scn", sync_ts, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result, "readable_scn", readable_ts, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result, "create_scn", create_ts, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result, "drop_scn", drop_ts, int64_t); + EXTRACT_VARCHAR_FIELD_MYSQL_WITH_DEFAULT_VALUE(*result, "bconfig_version", config_version_val, + true /* skip_null_error */, true /* skip_column_error */, ""); if (OB_FAIL(ret)) { LOG_WARN("failed to get result", KR(ret), K(id_value), K(tenant_id), K(sync_ts), K(readable_ts), K(create_ts), K(drop_ts)); @@ -394,9 +413,12 @@ int ObLSRecoveryStatOperator::fill_cell(common::sqlclient::ObMySQLResult*result, LOG_WARN("failed to convert_for_inner_table_field", KR(ret), K(tenant_id), K(id_value), K(create_ts)); } else if (OB_FAIL(drop_scn.convert_for_inner_table_field(drop_ts))) { LOG_WARN("failed to convert_for_inner_table_field", KR(ret), K(tenant_id), K(id_value), K(drop_ts)); - } else if (OB_FAIL(ls_recovery.init(tenant_id, ls_id, sync_scn, readable_scn, create_scn, drop_scn))) { + } else if (!config_version_val.empty() && OB_FAIL(hex_str_to_type(config_version_val, config_version))) { + LOG_WARN("failed to get config version from fix", KR(ret), K(config_version_val)); + } else if (OB_FAIL(ls_recovery.init(tenant_id, ls_id, sync_scn, readable_scn, + create_scn, drop_scn, config_version))) { LOG_WARN("failed to init ls operation", KR(ret), K(tenant_id), K(sync_scn), K(readable_scn), - K(ls_id), K(create_scn), K(drop_scn)); + K(ls_id), K(create_scn), K(drop_scn), K(config_version)); } } } @@ -464,13 +486,63 @@ int ObLSRecoveryStatOperator::get_tenant_min_user_ls_create_scn(const uint64_t t return ret; } -int ObLSRecoveryStatOperator::get_min_create_scn_( - const uint64_t tenant_id, - const common::ObSqlString &sql, - ObISQLClient &client, - SCN &min_create_scn) +int ObLSRecoveryStatOperator::update_ls_config_version( + const uint64_t tenant_id, const ObLSID &ls_id, + const palf::LogConfigVersion &config_version, ObMySQLProxy &client, + SCN &readable_scn) { int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || !ls_id.is_valid() || !config_version.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid_argument", KR(ret), K(tenant_id), K(ls_id), K(config_version)); + } else { + const uint64_t exec_tenant_id = get_exec_tenant_id(tenant_id); + ObLSRecoveryStat ls_recovery_stat; + START_TRANSACTION(&client, exec_tenant_id) + if (FAILEDx(get_ls_recovery_stat(tenant_id, ls_id, true, + ls_recovery_stat, trans))) { + LOG_WARN("failed to get ls recovery stat", KR(ret), K(tenant_id), K(ls_id)); + } else if (ls_recovery_stat.get_config_version().is_valid() + && ls_recovery_stat.get_config_version() > config_version) { + ret = OB_NEED_RETRY; + LOG_WARN("config version can not fallback", KR(ret), K(ls_recovery_stat), K(config_version)); + } else if (ls_recovery_stat.get_config_version() == config_version) { + } else { + //config_version inmaybe invalid or larger than inner table + common::ObSqlString sql; + ObString config_version_str; + ObArenaAllocator allocator("VersionStr"); + char config_version_val[128] = {0}; + if (OB_FAIL(type_to_hex_str(config_version, allocator, + config_version_str))) { + LOG_WARN("failed to type to hex", KR(ret), K(config_version)); + } else if (0 > config_version.to_string(config_version_val, 128)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("config_version to string failed", KR(ret), K(config_version)); + } else if (OB_FAIL(sql.assign_fmt("UPDATE %s SET config_version = '%s', bconfig_version = '%.*s' " + "WHERE ls_id = %ld and tenant_id = %lu", + OB_ALL_LS_RECOVERY_STAT_TNAME, + config_version_val, + static_cast(config_version_str.length()), config_version_str.ptr(), + ls_id.id(), tenant_id))) { + LOG_WARN("failed to assign sql", KR(ret), K(ls_id), K(tenant_id), K(sql)); + } else if (OB_FAIL(exec_write(tenant_id, sql, this, trans, true))) { + LOG_WARN("failed to exec write", KR(ret), K(tenant_id), K(sql)); + } + } + if (OB_SUCC(ret)) { + readable_scn = ls_recovery_stat.get_readable_scn(); + } + END_TRANSACTION(trans) + } + return ret; +} + +int ObLSRecoveryStatOperator::get_min_create_scn_( + const uint64_t tenant_id, const common::ObSqlString &sql, + ObISQLClient &client, SCN &min_create_scn) { + int ret = OB_SUCCESS; min_create_scn = SCN::base_scn(); if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { ret = OB_INVALID_ARGUMENT; diff --git a/src/share/ls/ob_ls_recovery_stat_operator.h b/src/share/ls/ob_ls_recovery_stat_operator.h index 87ceda605d..df1475e88d 100644 --- a/src/share/ls/ob_ls_recovery_stat_operator.h +++ b/src/share/ls/ob_ls_recovery_stat_operator.h @@ -21,6 +21,7 @@ #include "lib/container/ob_iarray.h"//ObIArray #include "logservice/palf/log_define.h"//SCN #include "share/scn.h"//SCN +#include "logservice/palf/log_meta_info.h"//LogConfigVersion namespace oceanbase { @@ -49,7 +50,8 @@ struct ObLSRecoveryStat sync_scn_(), readable_scn_(), create_scn_(), - drop_scn_() {} + drop_scn_(), + config_version_() {} virtual ~ObLSRecoveryStat() {} bool is_valid() const; int init(const uint64_t tenant_id, @@ -57,10 +59,12 @@ struct ObLSRecoveryStat const SCN &sync_scn, const SCN &readable_scn, const SCN &create_scn, - const SCN &drop_scn); + const SCN &drop_scn, + const palf::LogConfigVersion &config_version); int init_only_recovery_stat(const uint64_t tenant_id, const ObLSID &id, const SCN &sync_scn, - const SCN &readable_scn); + const SCN &readable_scn, + const palf::LogConfigVersion &config_version); void reset(); int assign(const ObLSRecoveryStat &other); uint64_t get_tenant_id() const @@ -87,8 +91,12 @@ struct ObLSRecoveryStat { return drop_scn_; } + const palf::LogConfigVersion &get_config_version() const + { + return config_version_; + } TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(sync_scn), K_(readable_scn), - K_(create_scn), K_(drop_scn)); + K_(create_scn), K_(drop_scn), K(config_version_)); private: uint64_t tenant_id_; @@ -97,6 +105,7 @@ struct ObLSRecoveryStat SCN readable_scn_;//min weak read timestamp TODO need different majorty replicas and all replicas SCN create_scn_;//ts less than first clog ts SCN drop_scn_; //ts larger than last user data's clog and before offline + palf::LogConfigVersion config_version_; }; /* @@ -254,6 +263,17 @@ public: int get_user_ls_sync_scn(const uint64_t tenant_id, ObISQLClient &client, SCN &sync_scn); + /* + * description: update ls config_version and return current readable_scn + * @param[in] tenant_id + * @param[in] ls_id + * @param[in] config_version :will not fallback + * @param[in] client + * @param[out] readable scn + */ + int update_ls_config_version(const uint64_t tenant_id, + const ObLSID &ls_id, const palf::LogConfigVersion &config_version, + ObMySQLProxy &client, SCN &readable_scn); private: int get_min_create_scn_(const uint64_t tenant_id, diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index 24f0506b32..2455ee3627 100755 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -5937,7 +5937,7 @@ int ObGetLSSyncScnRes::assign(const ObGetLSSyncScnRes &other) return ret; } -OB_SERIALIZE_MEMBER(ObGetLSReplayedScnArg, tenant_id_, ls_id_); +OB_SERIALIZE_MEMBER(ObGetLSReplayedScnArg, tenant_id_, ls_id_, all_replica_); bool ObGetLSReplayedScnArg::is_valid() const { @@ -5945,7 +5945,7 @@ bool ObGetLSReplayedScnArg::is_valid() const && ls_id_.is_valid(); } int ObGetLSReplayedScnArg::init( - const uint64_t tenant_id, const ObLSID &ls_id) + const uint64_t tenant_id, const ObLSID &ls_id, const bool all_replica) { int ret = OB_SUCCESS; if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id @@ -5955,6 +5955,7 @@ int ObGetLSReplayedScnArg::init( } else { tenant_id_ = tenant_id; ls_id_ = ls_id; + all_replica_ = all_replica; } return ret; } @@ -5964,33 +5965,38 @@ int ObGetLSReplayedScnArg::assign(const ObGetLSReplayedScnArg &other) if (this != &other) { tenant_id_ = other.tenant_id_; ls_id_ = other.ls_id_; + all_replica_ = other.all_replica_; } return ret; } -OB_SERIALIZE_MEMBER(ObGetLSReplayedScnRes, tenant_id_, ls_id_, cur_readable_scn_); +OB_SERIALIZE_MEMBER(ObGetLSReplayedScnRes, tenant_id_, ls_id_, cur_readable_scn_, self_addr_); bool ObGetLSReplayedScnRes::is_valid() const { return OB_INVALID_TENANT_ID != tenant_id_ && ls_id_.is_valid() && cur_readable_scn_.is_valid_and_not_min(); + //no need check server valid } int ObGetLSReplayedScnRes::init( const uint64_t tenant_id, const share::ObLSID &ls_id, - const share::SCN &cur_readable_scn) + const share::SCN &cur_readable_scn, + const common::ObAddr &server) { int ret = OB_SUCCESS; if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !ls_id.is_valid() - || !cur_readable_scn.is_valid_and_not_min())) { + || !cur_readable_scn.is_valid_and_not_min() + || !server.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(cur_readable_scn)); + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(cur_readable_scn), K(server)); } else { tenant_id_ = tenant_id; ls_id_ = ls_id; cur_readable_scn_ = cur_readable_scn; + self_addr_ = server; } return ret; } @@ -6002,6 +6008,7 @@ int ObGetLSReplayedScnRes::assign(const ObGetLSReplayedScnRes &other) tenant_id_ = other.tenant_id_; ls_id_ = other.ls_id_; cur_readable_scn_ = other.cur_readable_scn_; + self_addr_ = other.self_addr_; } return ret; } diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 6477b692aa..698d5aaf8b 100755 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -7015,12 +7015,12 @@ struct ObGetLSReplayedScnArg { OB_UNIS_VERSION(1); public: - ObGetLSReplayedScnArg(): tenant_id_(OB_INVALID_TENANT_ID), ls_id_() {} + ObGetLSReplayedScnArg(): tenant_id_(OB_INVALID_TENANT_ID), ls_id_(), all_replica_(false) {} ~ObGetLSReplayedScnArg() {} bool is_valid() const; - int init(const uint64_t tenant_id, const share::ObLSID &ls_id); + int init(const uint64_t tenant_id, const share::ObLSID &ls_id, const bool all_replica); int assign(const ObGetLSReplayedScnArg &other); - TO_STRING_KV(K_(tenant_id), K_(ls_id)); + TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(all_replica)); uint64_t get_tenant_id() const { @@ -7030,11 +7030,16 @@ public: { return ls_id_; } + bool is_all_replica() const + { + return all_replica_; + } private: DISALLOW_COPY_AND_ASSIGN(ObGetLSReplayedScnArg); private: uint64_t tenant_id_; share::ObLSID ls_id_; + bool all_replica_;//add in 4.3.0, for get all ls replica readable_scn }; struct ObGetLSReplayedScnRes @@ -7043,10 +7048,12 @@ struct ObGetLSReplayedScnRes public: ObGetLSReplayedScnRes(): tenant_id_(OB_INVALID_TENANT_ID), ls_id_(), - cur_readable_scn_(share::SCN::min_scn()) {} + cur_readable_scn_(share::SCN::min_scn()), + self_addr_() {} ~ObGetLSReplayedScnRes() {} bool is_valid() const; - int init(const uint64_t tenant_id, const share::ObLSID &ls_id, const share::SCN &cur_readable_scn); + int init(const uint64_t tenant_id, const share::ObLSID &ls_id, const share::SCN &cur_readable_scn, + const common::ObAddr &server); int assign(const ObGetLSReplayedScnRes &other); TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(cur_readable_scn)); uint64_t get_tenant_id() const @@ -7061,12 +7068,17 @@ public: { return cur_readable_scn_; } + common::ObAddr get_server() const + { + return self_addr_; + } private: DISALLOW_COPY_AND_ASSIGN(ObGetLSReplayedScnRes); private: uint64_t tenant_id_; share::ObLSID ls_id_; share::SCN cur_readable_scn_; + common::ObAddr self_addr_;//add in 4.3.0 }; struct ObSwitchTenantArg diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 649d4443b3..7f54a9a801 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -648,6 +648,15 @@ public: // @param[out] ls_recovery_stat // int get_ls_replica_readable_scn(share::SCN &readable_scn) DELEGATE_WITH_RET(ls_recovery_stat_handler_, get_ls_level_recovery_stat, int); + //gather all replicas of ls's readable scn + // If follower LS replica call this function, it will return OB_NOT_MASTER. + //int gather_replica_readable_scn(); + DELEGATE_WITH_RET(ls_recovery_stat_handler_, gather_replica_readable_scn, int); + + // get all ls readable_scn: it will be failed while has replica is offline + // @param[out] readable_scn ls readable_scn + // int get_all_replica_min_readable_scn(share::SCN &readable_scn) + DELEGATE_WITH_RET(ls_recovery_stat_handler_, get_all_replica_min_readable_scn, int); // disable clog sync. // with ls read lock and log write lock.