[CP] switchover to standby timeout

This commit is contained in:
linqiucen
2023-11-29 10:11:42 +00:00
committed by ob-robot
parent b03a138f0a
commit 9caad1da27
6 changed files with 84 additions and 69 deletions

View File

@ -1641,13 +1641,14 @@ int ObRpcChangeLSAccessModeP::process()
}
if (OB_SUCC(ret)) {
ls_svr = MTL(ObLSService*);
logservice::ObLogService *log_ls_svr = MTL(logservice::ObLogService*);
ObLS *ls = nullptr;
ObLSID ls_id = arg_.get_ls_id();
ObLSHandle handle;
logservice::ObLogHandler *log_handler = NULL;
if (OB_ISNULL(ls_svr)) {
if (OB_ISNULL(ls_svr) || OB_ISNULL(log_ls_svr)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret));
COMMON_LOG(ERROR, "mtl ObLSService or ObLogService should not be null", KR(ret), KP(ls_svr), KP(log_ls_svr));
} else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::OBSERVER_MOD))) {
COMMON_LOG(WARN, "get ls failed", KR(ret), K(ls_id));
} else if (OB_ISNULL(ls = handle.get_ls())) {
@ -1659,11 +1660,13 @@ int ObRpcChangeLSAccessModeP::process()
} else if (palf::AccessMode::RAW_WRITE == arg_.get_access_mode() && !ls_id.is_sys_ls()) {
// switchover to standby
// user ls end scn should be larger than sys ls end scn at first
DEBUG_SYNC(BEFORE_WAIT_SYS_LS_END_SCN);
if (OB_UNLIKELY(!arg_.get_sys_ls_end_scn().is_valid_and_not_min())) {
FLOG_WARN("invalid sys_ls_end_scn, no need to let user ls wait, "
"the version might be smaller than V4.2.0", KR(ret), K(arg_.get_sys_ls_end_scn()));
} else if (OB_FAIL(share::ObShareUtil::wait_user_ls_sync_scn_locally(
} else if (OB_FAIL(ObRootUtils::wait_user_ls_sync_scn_locally(
arg_.get_sys_ls_end_scn(),
log_ls_svr,
*ls))) {
LOG_WARN("fail to wait user ls sync scn locally", KR(ret), K(ls_id), K(arg_.get_sys_ls_end_scn()));
}

View File

@ -2311,6 +2311,71 @@ int ObRootUtils::check_ls_balance_and_commit_rs_job(
return ret;
}
ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN);
int ObRootUtils::wait_user_ls_sync_scn_locally(
const share::SCN &sys_ls_target_scn,
logservice::ObLogService *log_ls_svr,
storage::ObLS &ls)
{
int ret = OB_SUCCESS;
logservice::ObLogHandler *log_handler = ls.get_log_handler();
transaction::ObKeepAliveLSHandler *keep_alive_handler = ls.get_keep_alive_ls_handler();
ObLSID ls_id = ls.get_ls_id();
uint64_t tenant_id = ls.get_tenant_id();
ObTimeoutCtx ctx;
if (OB_ISNULL(keep_alive_handler) || OB_ISNULL(log_handler ) || OB_ISNULL(log_ls_svr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("keep_alive_ls_handler, log_handler or ls_svr is null", KR(ret), K(ls_id),
KP(keep_alive_handler), KP(log_handler), KP(log_ls_svr));
} else if (OB_UNLIKELY(!sys_ls_target_scn.is_valid_and_not_min())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid sys_ls_target_scn", KR(ret), K(sys_ls_target_scn));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
LOG_WARN("fail to set timeout", KR(ret));
} else {
bool need_retry = true;
share::SCN curr_end_scn;
curr_end_scn.set_min();
common::ObRole role;
int64_t leader_epoch = 0;
(void) keep_alive_handler->set_sys_ls_end_scn(sys_ls_target_scn);
do {
if (OB_UNLIKELY(ctx.is_timeouted())) {
ret = OB_TIMEOUT;
need_retry = false;
LOG_WARN("ctx timeout", KR(ret), K(ctx));
} else if (OB_FAIL(log_ls_svr->get_palf_role(ls_id, role, leader_epoch))) {
LOG_WARN("fail to get palf role", KR(ret), K(ls_id));
} else if (OB_UNLIKELY(!is_strong_leader(role))) {
ret = OB_NOT_MASTER;
LOG_WARN("ls on this server is not master", KR(ret), K(ls_id), K(role));
} else {
if (OB_FAIL(log_handler->get_end_scn(curr_end_scn))) {
LOG_WARN("fail to get ls end scn", KR(ret), K(ls_id));
} else {
curr_end_scn = ERRSIM_USER_LS_SYNC_SCN ? SCN::scn_dec(sys_ls_target_scn) : curr_end_scn;
LOG_TRACE("wait curr_end_scn >= sys_ls_target_scn", K(curr_end_scn), K(sys_ls_target_scn),
"is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false);
}
if (OB_SUCC(ret) && curr_end_scn >= sys_ls_target_scn) {
LOG_INFO("current user ls end scn >= sys ls target scn now", K(curr_end_scn),
K(sys_ls_target_scn), "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false,
K(tenant_id), K(ls_id));
need_retry = false;
}
}
if (need_retry && OB_SUCC(ret)) {
ob_usleep(50 * 1000); // wait 50ms
}
} while (need_retry && OB_SUCC(ret));
if (OB_UNLIKELY(need_retry && OB_SUCC(ret))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the wait loop should not be terminated", KR(ret), K(curr_end_scn), K(sys_ls_target_scn));
}
}
return ret;
}
///////////////////////////////
ObClusterRole ObClusterInfoGetter::get_cluster_role_v2()

View File

@ -655,6 +655,18 @@ public:
const uint64_t tenant_id,
const int64_t rs_job_id,
const ObRsJobType rs_job_type);
// wait the given ls's end_scn be larger than or equal to sys_ls_target_scn
// @params[in]: sys_ls_target_scn
// @params[in]: log_ls_svr
// @params[in]: ls
// @ret OB_SUCCESS user_ls_sync_scn >= sys_ls_sync_scn
// @ret OB_NOT_MASTER the current replica is not leader, no need to wait.
// the rpc sender need to find the new leader and send rpc again
// @ret other error code failure
static int wait_user_ls_sync_scn_locally(
const share::SCN &sys_ls_target_scn,
logservice::ObLogService *log_ls_svr,
storage::ObLS &ls);
template<class T>
static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr,

View File

@ -565,6 +565,7 @@ class ObString;
ACT(BEFORE_FETCH_SIMPLE_TABLES,)\
ACT(BEFORE_SEND_PARALLEL_CREATE_TABLE,)\
ACT(BEFORE_DROP_TENANT,)\
ACT(BEFORE_WAIT_SYS_LS_END_SCN,)\
ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);

View File

@ -22,7 +22,6 @@
#endif
#include "lib/mysqlclient/ob_isql_client.h"
#include "observer/omt/ob_tenant_config_mgr.h" // ObTenantConfigGuard
#include "storage/ls/ob_ls.h" //ObLS
namespace oceanbase
{
@ -447,61 +446,5 @@ bool ObShareUtil::is_tenant_enable_transfer(const uint64_t tenant_id)
return bret;
}
ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN);
int ObShareUtil::wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls)
{
int ret = OB_SUCCESS;
logservice::ObLogHandler *log_handler = ls.get_log_handler();
transaction::ObKeepAliveLSHandler *keep_alive_handler = ls.get_keep_alive_ls_handler();
ObLSID ls_id = ls.get_ls_id();
uint64_t tenant_id = ls.get_tenant_id();
ObTimeoutCtx ctx;
if (OB_ISNULL(keep_alive_handler) || OB_ISNULL(log_handler )) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("keep_alive_ls_handler or log_handler is null", KR(ret), K(ls_id),
KP(keep_alive_handler), KP(log_handler));
} else if (OB_UNLIKELY(!sys_ls_target_scn.is_valid_and_not_min())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid sys_ls_target_scn", KR(ret), K(sys_ls_target_scn));
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
LOG_WARN("fail to set timeout", KR(ret));
} else {
bool need_retry = true;
share::SCN curr_end_scn;
curr_end_scn.set_min();
(void) keep_alive_handler->set_sys_ls_end_scn(sys_ls_target_scn);
do {
if (OB_UNLIKELY(ctx.is_timeouted())) {
ret = OB_TIMEOUT;
need_retry = false;
LOG_WARN("ctx timeout", KR(ret), K(ctx));
} else {
if (OB_FAIL(log_handler->get_end_scn(curr_end_scn))) {
LOG_WARN("fail to get ls end scn", KR(ret), K(ls_id));
} else {
// switchover to standby timeout
curr_end_scn = ERRSIM_USER_LS_SYNC_SCN ? SCN::scn_dec(sys_ls_target_scn) : curr_end_scn;
LOG_TRACE("wait curr_end_scn >= sys_ls_target_scn", K(curr_end_scn), K(sys_ls_target_scn),
"is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false);
}
if (OB_SUCC(ret) && curr_end_scn >= sys_ls_target_scn) {
LOG_INFO("current user ls end scn >= sys ls target scn now", K(curr_end_scn),
K(sys_ls_target_scn), "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false,
K(tenant_id), K(ls_id));
need_retry = false;
}
}
if (need_retry) {
ob_usleep(50 * 1000); // wait 50ms
}
} while (need_retry && OB_SUCC(ret));
if (OB_UNLIKELY(need_retry && OB_SUCC(ret))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the wait loop should not be terminated", KR(ret), K(curr_end_scn), K(sys_ls_target_scn));
}
}
return ret;
}
} //end namespace share
} //end namespace oceanbase

View File

@ -21,10 +21,6 @@ namespace common
class ObTimeoutCtx;
class ObISQLClient;
}
namespace storage
{
class ObLS;
}
namespace share
{
@ -122,11 +118,6 @@ public:
const uint64_t tenant_id,
const ObSqlString &sql,
SCN &ora_rowscn);
// wait the given ls's end_scn be larger than or equal to sys_ls_target_scn
// @params[in]: sys_ls_target_scn
// @params[in]: ls
static int wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls);
static bool is_tenant_enable_rebalance(const uint64_t tenant_id);
static bool is_tenant_enable_transfer(const uint64_t tenant_id);
};