diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index b3d19e74a4..481d4b1ee3 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -1641,13 +1641,14 @@ int ObRpcChangeLSAccessModeP::process() } if (OB_SUCC(ret)) { ls_svr = MTL(ObLSService*); + logservice::ObLogService *log_ls_svr = MTL(logservice::ObLogService*); ObLS *ls = nullptr; ObLSID ls_id = arg_.get_ls_id(); ObLSHandle handle; logservice::ObLogHandler *log_handler = NULL; - if (OB_ISNULL(ls_svr)) { + if (OB_ISNULL(ls_svr) || OB_ISNULL(log_ls_svr)) { ret = OB_ERR_UNEXPECTED; - COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret)); + COMMON_LOG(ERROR, "mtl ObLSService or ObLogService should not be null", KR(ret), KP(ls_svr), KP(log_ls_svr)); } else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::OBSERVER_MOD))) { COMMON_LOG(WARN, "get ls failed", KR(ret), K(ls_id)); } else if (OB_ISNULL(ls = handle.get_ls())) { @@ -1659,11 +1660,13 @@ int ObRpcChangeLSAccessModeP::process() } else if (palf::AccessMode::RAW_WRITE == arg_.get_access_mode() && !ls_id.is_sys_ls()) { // switchover to standby // user ls end scn should be larger than sys ls end scn at first + DEBUG_SYNC(BEFORE_WAIT_SYS_LS_END_SCN); if (OB_UNLIKELY(!arg_.get_sys_ls_end_scn().is_valid_and_not_min())) { FLOG_WARN("invalid sys_ls_end_scn, no need to let user ls wait, " "the version might be smaller than V4.2.0", KR(ret), K(arg_.get_sys_ls_end_scn())); - } else if (OB_FAIL(share::ObShareUtil::wait_user_ls_sync_scn_locally( + } else if (OB_FAIL(ObRootUtils::wait_user_ls_sync_scn_locally( arg_.get_sys_ls_end_scn(), + log_ls_svr, *ls))) { LOG_WARN("fail to wait user ls sync scn locally", KR(ret), K(ls_id), K(arg_.get_sys_ls_end_scn())); } diff --git a/src/rootserver/ob_root_utils.cpp b/src/rootserver/ob_root_utils.cpp index 7daba6fd69..7bf28d6260 100644 --- a/src/rootserver/ob_root_utils.cpp +++ b/src/rootserver/ob_root_utils.cpp @@ -2311,6 +2311,71 @@ int ObRootUtils::check_ls_balance_and_commit_rs_job( return ret; } +ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN); +int ObRootUtils::wait_user_ls_sync_scn_locally( + const share::SCN &sys_ls_target_scn, + logservice::ObLogService *log_ls_svr, + storage::ObLS &ls) +{ + int ret = OB_SUCCESS; + logservice::ObLogHandler *log_handler = ls.get_log_handler(); + transaction::ObKeepAliveLSHandler *keep_alive_handler = ls.get_keep_alive_ls_handler(); + ObLSID ls_id = ls.get_ls_id(); + uint64_t tenant_id = ls.get_tenant_id(); + ObTimeoutCtx ctx; + if (OB_ISNULL(keep_alive_handler) || OB_ISNULL(log_handler ) || OB_ISNULL(log_ls_svr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("keep_alive_ls_handler, log_handler or ls_svr is null", KR(ret), K(ls_id), + KP(keep_alive_handler), KP(log_handler), KP(log_ls_svr)); + } else if (OB_UNLIKELY(!sys_ls_target_scn.is_valid_and_not_min())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid sys_ls_target_scn", KR(ret), K(sys_ls_target_scn)); + } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) { + LOG_WARN("fail to set timeout", KR(ret)); + } else { + bool need_retry = true; + share::SCN curr_end_scn; + curr_end_scn.set_min(); + common::ObRole role; + int64_t leader_epoch = 0; + (void) keep_alive_handler->set_sys_ls_end_scn(sys_ls_target_scn); + do { + if (OB_UNLIKELY(ctx.is_timeouted())) { + ret = OB_TIMEOUT; + need_retry = false; + LOG_WARN("ctx timeout", KR(ret), K(ctx)); + } else if (OB_FAIL(log_ls_svr->get_palf_role(ls_id, role, leader_epoch))) { + LOG_WARN("fail to get palf role", KR(ret), K(ls_id)); + } else if (OB_UNLIKELY(!is_strong_leader(role))) { + ret = OB_NOT_MASTER; + LOG_WARN("ls on this server is not master", KR(ret), K(ls_id), K(role)); + } else { + if (OB_FAIL(log_handler->get_end_scn(curr_end_scn))) { + LOG_WARN("fail to get ls end scn", KR(ret), K(ls_id)); + } else { + curr_end_scn = ERRSIM_USER_LS_SYNC_SCN ? SCN::scn_dec(sys_ls_target_scn) : curr_end_scn; + LOG_TRACE("wait curr_end_scn >= sys_ls_target_scn", K(curr_end_scn), K(sys_ls_target_scn), + "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false); + } + if (OB_SUCC(ret) && curr_end_scn >= sys_ls_target_scn) { + LOG_INFO("current user ls end scn >= sys ls target scn now", K(curr_end_scn), + K(sys_ls_target_scn), "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false, + K(tenant_id), K(ls_id)); + need_retry = false; + } + } + if (need_retry && OB_SUCC(ret)) { + ob_usleep(50 * 1000); // wait 50ms + } + } while (need_retry && OB_SUCC(ret)); + if (OB_UNLIKELY(need_retry && OB_SUCC(ret))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("the wait loop should not be terminated", KR(ret), K(curr_end_scn), K(sys_ls_target_scn)); + } + } + return ret; +} + /////////////////////////////// ObClusterRole ObClusterInfoGetter::get_cluster_role_v2() diff --git a/src/rootserver/ob_root_utils.h b/src/rootserver/ob_root_utils.h index b3efd8d31f..c2f9300a2a 100644 --- a/src/rootserver/ob_root_utils.h +++ b/src/rootserver/ob_root_utils.h @@ -655,6 +655,18 @@ public: const uint64_t tenant_id, const int64_t rs_job_id, const ObRsJobType rs_job_type); + // wait the given ls's end_scn be larger than or equal to sys_ls_target_scn + // @params[in]: sys_ls_target_scn + // @params[in]: log_ls_svr + // @params[in]: ls + // @ret OB_SUCCESS user_ls_sync_scn >= sys_ls_sync_scn + // @ret OB_NOT_MASTER the current replica is not leader, no need to wait. + // the rpc sender need to find the new leader and send rpc again + // @ret other error code failure + static int wait_user_ls_sync_scn_locally( + const share::SCN &sys_ls_target_scn, + logservice::ObLogService *log_ls_svr, + storage::ObLS &ls); template static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr, diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index aff7666c0e..b017e926fc 100755 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -565,6 +565,7 @@ class ObString; ACT(BEFORE_FETCH_SIMPLE_TABLES,)\ ACT(BEFORE_SEND_PARALLEL_CREATE_TABLE,)\ ACT(BEFORE_DROP_TENANT,)\ + ACT(BEFORE_WAIT_SYS_LS_END_SCN,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/ob_share_util.cpp b/src/share/ob_share_util.cpp index 0821ba35f6..60370e6b9a 100644 --- a/src/share/ob_share_util.cpp +++ b/src/share/ob_share_util.cpp @@ -22,7 +22,6 @@ #endif #include "lib/mysqlclient/ob_isql_client.h" #include "observer/omt/ob_tenant_config_mgr.h" // ObTenantConfigGuard -#include "storage/ls/ob_ls.h" //ObLS namespace oceanbase { @@ -447,61 +446,5 @@ bool ObShareUtil::is_tenant_enable_transfer(const uint64_t tenant_id) return bret; } -ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN); -int ObShareUtil::wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls) -{ - int ret = OB_SUCCESS; - logservice::ObLogHandler *log_handler = ls.get_log_handler(); - transaction::ObKeepAliveLSHandler *keep_alive_handler = ls.get_keep_alive_ls_handler(); - ObLSID ls_id = ls.get_ls_id(); - uint64_t tenant_id = ls.get_tenant_id(); - ObTimeoutCtx ctx; - if (OB_ISNULL(keep_alive_handler) || OB_ISNULL(log_handler )) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("keep_alive_ls_handler or log_handler is null", KR(ret), K(ls_id), - KP(keep_alive_handler), KP(log_handler)); - } else if (OB_UNLIKELY(!sys_ls_target_scn.is_valid_and_not_min())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid sys_ls_target_scn", KR(ret), K(sys_ls_target_scn)); - } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) { - LOG_WARN("fail to set timeout", KR(ret)); - } else { - bool need_retry = true; - share::SCN curr_end_scn; - curr_end_scn.set_min(); - (void) keep_alive_handler->set_sys_ls_end_scn(sys_ls_target_scn); - do { - if (OB_UNLIKELY(ctx.is_timeouted())) { - ret = OB_TIMEOUT; - need_retry = false; - LOG_WARN("ctx timeout", KR(ret), K(ctx)); - } else { - if (OB_FAIL(log_handler->get_end_scn(curr_end_scn))) { - LOG_WARN("fail to get ls end scn", KR(ret), K(ls_id)); - } else { - // switchover to standby timeout - curr_end_scn = ERRSIM_USER_LS_SYNC_SCN ? SCN::scn_dec(sys_ls_target_scn) : curr_end_scn; - LOG_TRACE("wait curr_end_scn >= sys_ls_target_scn", K(curr_end_scn), K(sys_ls_target_scn), - "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false); - } - if (OB_SUCC(ret) && curr_end_scn >= sys_ls_target_scn) { - LOG_INFO("current user ls end scn >= sys ls target scn now", K(curr_end_scn), - K(sys_ls_target_scn), "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false, - K(tenant_id), K(ls_id)); - need_retry = false; - } - } - if (need_retry) { - ob_usleep(50 * 1000); // wait 50ms - } - } while (need_retry && OB_SUCC(ret)); - if (OB_UNLIKELY(need_retry && OB_SUCC(ret))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("the wait loop should not be terminated", KR(ret), K(curr_end_scn), K(sys_ls_target_scn)); - } - } - return ret; -} - } //end namespace share } //end namespace oceanbase diff --git a/src/share/ob_share_util.h b/src/share/ob_share_util.h index 0ec8de2fc6..4093041a72 100644 --- a/src/share/ob_share_util.h +++ b/src/share/ob_share_util.h @@ -21,10 +21,6 @@ namespace common class ObTimeoutCtx; class ObISQLClient; } -namespace storage -{ - class ObLS; -} namespace share { @@ -122,11 +118,6 @@ public: const uint64_t tenant_id, const ObSqlString &sql, SCN &ora_rowscn); - // wait the given ls's end_scn be larger than or equal to sys_ls_target_scn - // @params[in]: sys_ls_target_scn - // @params[in]: ls - static int wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls); - static bool is_tenant_enable_rebalance(const uint64_t tenant_id); static bool is_tenant_enable_transfer(const uint64_t tenant_id); };