[CP] switchover to standby timeout
This commit is contained in:
@ -1641,13 +1641,14 @@ int ObRpcChangeLSAccessModeP::process()
|
|||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ls_svr = MTL(ObLSService*);
|
ls_svr = MTL(ObLSService*);
|
||||||
|
logservice::ObLogService *log_ls_svr = MTL(logservice::ObLogService*);
|
||||||
ObLS *ls = nullptr;
|
ObLS *ls = nullptr;
|
||||||
ObLSID ls_id = arg_.get_ls_id();
|
ObLSID ls_id = arg_.get_ls_id();
|
||||||
ObLSHandle handle;
|
ObLSHandle handle;
|
||||||
logservice::ObLogHandler *log_handler = NULL;
|
logservice::ObLogHandler *log_handler = NULL;
|
||||||
if (OB_ISNULL(ls_svr)) {
|
if (OB_ISNULL(ls_svr) || OB_ISNULL(log_ls_svr)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
COMMON_LOG(ERROR, "mtl ObLSService should not be null", K(ret));
|
COMMON_LOG(ERROR, "mtl ObLSService or ObLogService should not be null", KR(ret), KP(ls_svr), KP(log_ls_svr));
|
||||||
} else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::OBSERVER_MOD))) {
|
} else if (OB_FAIL(ls_svr->get_ls(ls_id, handle, ObLSGetMod::OBSERVER_MOD))) {
|
||||||
COMMON_LOG(WARN, "get ls failed", KR(ret), K(ls_id));
|
COMMON_LOG(WARN, "get ls failed", KR(ret), K(ls_id));
|
||||||
} else if (OB_ISNULL(ls = handle.get_ls())) {
|
} else if (OB_ISNULL(ls = handle.get_ls())) {
|
||||||
@ -1659,11 +1660,13 @@ int ObRpcChangeLSAccessModeP::process()
|
|||||||
} else if (palf::AccessMode::RAW_WRITE == arg_.get_access_mode() && !ls_id.is_sys_ls()) {
|
} else if (palf::AccessMode::RAW_WRITE == arg_.get_access_mode() && !ls_id.is_sys_ls()) {
|
||||||
// switchover to standby
|
// switchover to standby
|
||||||
// user ls end scn should be larger than sys ls end scn at first
|
// user ls end scn should be larger than sys ls end scn at first
|
||||||
|
DEBUG_SYNC(BEFORE_WAIT_SYS_LS_END_SCN);
|
||||||
if (OB_UNLIKELY(!arg_.get_sys_ls_end_scn().is_valid_and_not_min())) {
|
if (OB_UNLIKELY(!arg_.get_sys_ls_end_scn().is_valid_and_not_min())) {
|
||||||
FLOG_WARN("invalid sys_ls_end_scn, no need to let user ls wait, "
|
FLOG_WARN("invalid sys_ls_end_scn, no need to let user ls wait, "
|
||||||
"the version might be smaller than V4.2.0", KR(ret), K(arg_.get_sys_ls_end_scn()));
|
"the version might be smaller than V4.2.0", KR(ret), K(arg_.get_sys_ls_end_scn()));
|
||||||
} else if (OB_FAIL(share::ObShareUtil::wait_user_ls_sync_scn_locally(
|
} else if (OB_FAIL(ObRootUtils::wait_user_ls_sync_scn_locally(
|
||||||
arg_.get_sys_ls_end_scn(),
|
arg_.get_sys_ls_end_scn(),
|
||||||
|
log_ls_svr,
|
||||||
*ls))) {
|
*ls))) {
|
||||||
LOG_WARN("fail to wait user ls sync scn locally", KR(ret), K(ls_id), K(arg_.get_sys_ls_end_scn()));
|
LOG_WARN("fail to wait user ls sync scn locally", KR(ret), K(ls_id), K(arg_.get_sys_ls_end_scn()));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2311,6 +2311,71 @@ int ObRootUtils::check_ls_balance_and_commit_rs_job(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN);
|
||||||
|
// Poll the given ls until its log end SCN is >= sys_ls_target_scn.
//
// @param [in] sys_ls_target_scn  target SCN; rejected with OB_INVALID_ARGUMENT
//                                unless is_valid_and_not_min() holds
// @param [in] log_ls_svr         log service used to query the palf role of the ls
//                                on every iteration; must not be null
// @param [in] ls                 the ls whose log handler / keep-alive handler are used
// @return OB_SUCCESS           end SCN reached the target
//         OB_TIMEOUT           the timeout ctx expired before the target was reached
//         OB_NOT_MASTER        the palf role of the ls is not strong leader
//         OB_INVALID_ARGUMENT  sys_ls_target_scn is invalid or min
//         OB_ERR_UNEXPECTED    a required handler/service pointer is null
//         other                error returned by set_default_timeout_ctx,
//                              get_palf_role or get_end_scn
int ObRootUtils::wait_user_ls_sync_scn_locally(
|
||||||
|
const share::SCN &sys_ls_target_scn,
|
||||||
|
logservice::ObLogService *log_ls_svr,
|
||||||
|
storage::ObLS &ls)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
logservice::ObLogHandler *log_handler = ls.get_log_handler();
|
||||||
|
transaction::ObKeepAliveLSHandler *keep_alive_handler = ls.get_keep_alive_ls_handler();
|
||||||
|
ObLSID ls_id = ls.get_ls_id();
|
||||||
|
uint64_t tenant_id = ls.get_tenant_id();
|
||||||
|
ObTimeoutCtx ctx;
|
||||||
|
// All three pointers are dereferenced below, so validate them up front.
if (OB_ISNULL(keep_alive_handler) || OB_ISNULL(log_handler ) || OB_ISNULL(log_ls_svr)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("keep_alive_ls_handler, log_handler or ls_svr is null", KR(ret), K(ls_id),
|
||||||
|
KP(keep_alive_handler), KP(log_handler), KP(log_ls_svr));
|
||||||
|
} else if (OB_UNLIKELY(!sys_ls_target_scn.is_valid_and_not_min())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid sys_ls_target_scn", KR(ret), K(sys_ls_target_scn));
|
||||||
|
// The wait is bounded by ctx, configured from GCONF.rpc_timeout.
// NOTE(review): presumably rpc_timeout is only the default when no timeout is already set - confirm.
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
|
||||||
|
LOG_WARN("fail to set timeout", KR(ret));
|
||||||
|
} else {
|
||||||
|
bool need_retry = true;
|
||||||
|
share::SCN curr_end_scn;
|
||||||
|
curr_end_scn.set_min();
|
||||||
|
common::ObRole role;
|
||||||
|
int64_t leader_epoch = 0;
|
||||||
|
// Hand the target SCN to the keep-alive handler; its return value is deliberately ignored.
// NOTE(review): presumably this lets the keep-alive handler push the ls end SCN forward - confirm.
(void) keep_alive_handler->set_sys_ls_end_scn(sys_ls_target_scn);
|
||||||
|
// Retry loop: exits on success (need_retry = false) or on any failure (ret != OB_SUCCESS).
do {
|
||||||
|
if (OB_UNLIKELY(ctx.is_timeouted())) {
|
||||||
|
ret = OB_TIMEOUT;
|
||||||
|
need_retry = false;
|
||||||
|
LOG_WARN("ctx timeout", KR(ret), K(ctx));
|
||||||
|
// Re-check the role on every iteration so the wait stops as soon as
// this replica is no longer strong leader instead of running into the timeout.
} else if (OB_FAIL(log_ls_svr->get_palf_role(ls_id, role, leader_epoch))) {
|
||||||
|
LOG_WARN("fail to get palf role", KR(ret), K(ls_id));
|
||||||
|
} else if (OB_UNLIKELY(!is_strong_leader(role))) {
|
||||||
|
ret = OB_NOT_MASTER;
|
||||||
|
LOG_WARN("ls on this server is not master", KR(ret), K(ls_id), K(role));
|
||||||
|
} else {
|
||||||
|
if (OB_FAIL(log_handler->get_end_scn(curr_end_scn))) {
|
||||||
|
LOG_WARN("fail to get ls end scn", KR(ret), K(ls_id));
|
||||||
|
} else {
|
||||||
|
// Error-injection hook: when ERRSIM_USER_LS_SYNC_SCN is set, the observed end SCN
// is replaced by scn_dec(sys_ls_target_scn).
// NOTE(review): presumably just below the target, so the wait can never succeed - confirm.
curr_end_scn = ERRSIM_USER_LS_SYNC_SCN ? SCN::scn_dec(sys_ls_target_scn) : curr_end_scn;
|
||||||
|
LOG_TRACE("wait curr_end_scn >= sys_ls_target_scn", K(curr_end_scn), K(sys_ls_target_scn),
|
||||||
|
"is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false);
|
||||||
|
}
|
||||||
|
// Target reached: stop retrying and leave ret as OB_SUCCESS.
if (OB_SUCC(ret) && curr_end_scn >= sys_ls_target_scn) {
|
||||||
|
LOG_INFO("current user ls end scn >= sys ls target scn now", K(curr_end_scn),
|
||||||
|
K(sys_ls_target_scn), "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false,
|
||||||
|
K(tenant_id), K(ls_id));
|
||||||
|
need_retry = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Sleep only when another iteration will actually run; a failed ret exits immediately.
if (need_retry && OB_SUCC(ret)) {
|
||||||
|
ob_usleep(50 * 1000); // wait 50ms
|
||||||
|
}
|
||||||
|
} while (need_retry && OB_SUCC(ret));
|
||||||
|
// Defensive check: the loop condition guarantees this is false on exit,
// so reaching the branch would indicate a logic error.
if (OB_UNLIKELY(need_retry && OB_SUCC(ret))) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("the wait loop should not be terminated", KR(ret), K(curr_end_scn), K(sys_ls_target_scn));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
///////////////////////////////
|
///////////////////////////////
|
||||||
|
|
||||||
ObClusterRole ObClusterInfoGetter::get_cluster_role_v2()
|
ObClusterRole ObClusterInfoGetter::get_cluster_role_v2()
|
||||||
|
|||||||
@ -655,6 +655,18 @@ public:
|
|||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const int64_t rs_job_id,
|
const int64_t rs_job_id,
|
||||||
const ObRsJobType rs_job_type);
|
const ObRsJobType rs_job_type);
|
||||||
|
// Wait until the given ls's end_scn is larger than or equal to sys_ls_target_scn.
|
||||||
|
// @params[in]: sys_ls_target_scn, the SCN to wait for; must be valid and not min
|
||||||
|
// @params[in]: log_ls_svr, log service used to query the palf role of the ls; must not be null
|
||||||
|
// @params[in]: ls, the ls whose end_scn is polled
|
||||||
|
// @ret OB_SUCCESS the ls's end_scn >= sys_ls_target_scn
|
||||||
|
// @ret OB_NOT_MASTER the current replica is not leader, no need to wait.
|
||||||
|
// the rpc sender needs to find the new leader and send the rpc again
|
||||||
|
// @ret OB_TIMEOUT the timeout context expired before the target was reached
// @ret OB_INVALID_ARGUMENT sys_ls_target_scn is invalid or min
// @ret OB_ERR_UNEXPECTED log_ls_svr or one of the ls's handlers is null
// @ret other error code failure
|
||||||
|
static int wait_user_ls_sync_scn_locally(
|
||||||
|
const share::SCN &sys_ls_target_scn,
|
||||||
|
logservice::ObLogService *log_ls_svr,
|
||||||
|
storage::ObLS &ls);
|
||||||
|
|
||||||
template<class T>
|
template<class T>
|
||||||
static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr,
|
static int check_left_f_in_primary_zone(ObZoneManager &zone_mgr,
|
||||||
|
|||||||
@ -565,6 +565,7 @@ class ObString;
|
|||||||
ACT(BEFORE_FETCH_SIMPLE_TABLES,)\
|
ACT(BEFORE_FETCH_SIMPLE_TABLES,)\
|
||||||
ACT(BEFORE_SEND_PARALLEL_CREATE_TABLE,)\
|
ACT(BEFORE_SEND_PARALLEL_CREATE_TABLE,)\
|
||||||
ACT(BEFORE_DROP_TENANT,)\
|
ACT(BEFORE_DROP_TENANT,)\
|
||||||
|
ACT(BEFORE_WAIT_SYS_LS_END_SCN,)\
|
||||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||||
|
|
||||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||||
|
|||||||
@ -22,7 +22,6 @@
|
|||||||
#endif
|
#endif
|
||||||
#include "lib/mysqlclient/ob_isql_client.h"
|
#include "lib/mysqlclient/ob_isql_client.h"
|
||||||
#include "observer/omt/ob_tenant_config_mgr.h" // ObTenantConfigGuard
|
#include "observer/omt/ob_tenant_config_mgr.h" // ObTenantConfigGuard
|
||||||
#include "storage/ls/ob_ls.h" //ObLS
|
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -447,61 +446,5 @@ bool ObShareUtil::is_tenant_enable_transfer(const uint64_t tenant_id)
|
|||||||
return bret;
|
return bret;
|
||||||
}
|
}
|
||||||
|
|
||||||
ERRSIM_POINT_DEF(ERRSIM_USER_LS_SYNC_SCN);
|
|
||||||
// Poll the given ls until its log end SCN is >= sys_ls_target_scn.
//
// @param [in] sys_ls_target_scn  target SCN; rejected with OB_INVALID_ARGUMENT
//                                unless is_valid_and_not_min() holds
// @param [in] ls                 the ls whose log handler / keep-alive handler are used
// @return OB_SUCCESS           end SCN reached the target
//         OB_TIMEOUT           the timeout ctx expired before the target was reached
//         OB_INVALID_ARGUMENT  sys_ls_target_scn is invalid or min
//         OB_ERR_UNEXPECTED    log handler or keep-alive handler is null
//         other                error returned by set_default_timeout_ctx or get_end_scn
// Note: this version never checks the replica's role, so the only ways out of
// the loop are reaching the target, a get_end_scn failure, or the timeout.
int ObShareUtil::wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
logservice::ObLogHandler *log_handler = ls.get_log_handler();
|
|
||||||
transaction::ObKeepAliveLSHandler *keep_alive_handler = ls.get_keep_alive_ls_handler();
|
|
||||||
ObLSID ls_id = ls.get_ls_id();
|
|
||||||
uint64_t tenant_id = ls.get_tenant_id();
|
|
||||||
ObTimeoutCtx ctx;
|
|
||||||
// Both handlers are dereferenced below, so validate them up front.
if (OB_ISNULL(keep_alive_handler) || OB_ISNULL(log_handler )) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("keep_alive_ls_handler or log_handler is null", KR(ret), K(ls_id),
|
|
||||||
KP(keep_alive_handler), KP(log_handler));
|
|
||||||
} else if (OB_UNLIKELY(!sys_ls_target_scn.is_valid_and_not_min())) {
|
|
||||||
ret = OB_INVALID_ARGUMENT;
|
|
||||||
LOG_WARN("invalid sys_ls_target_scn", KR(ret), K(sys_ls_target_scn));
|
|
||||||
// The wait is bounded by ctx, configured from GCONF.rpc_timeout.
} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
|
|
||||||
LOG_WARN("fail to set timeout", KR(ret));
|
|
||||||
} else {
|
|
||||||
bool need_retry = true;
|
|
||||||
share::SCN curr_end_scn;
|
|
||||||
curr_end_scn.set_min();
|
|
||||||
// Hand the target SCN to the keep-alive handler; its return value is deliberately ignored.
(void) keep_alive_handler->set_sys_ls_end_scn(sys_ls_target_scn);
|
|
||||||
// Retry loop: exits on success (need_retry = false) or on any failure (ret != OB_SUCCESS).
do {
|
|
||||||
if (OB_UNLIKELY(ctx.is_timeouted())) {
|
|
||||||
ret = OB_TIMEOUT;
|
|
||||||
need_retry = false;
|
|
||||||
LOG_WARN("ctx timeout", KR(ret), K(ctx));
|
|
||||||
} else {
|
|
||||||
if (OB_FAIL(log_handler->get_end_scn(curr_end_scn))) {
|
|
||||||
LOG_WARN("fail to get ls end scn", KR(ret), K(ls_id));
|
|
||||||
} else {
|
|
||||||
// Error-injection hook: when ERRSIM_USER_LS_SYNC_SCN is set, the observed end SCN is
// replaced by scn_dec(sys_ls_target_scn).
// NOTE(review): presumably used to simulate a switchover-to-standby timeout - confirm.
|
|
||||||
curr_end_scn = ERRSIM_USER_LS_SYNC_SCN ? SCN::scn_dec(sys_ls_target_scn) : curr_end_scn;
|
|
||||||
LOG_TRACE("wait curr_end_scn >= sys_ls_target_scn", K(curr_end_scn), K(sys_ls_target_scn),
|
|
||||||
"is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false);
|
|
||||||
}
|
|
||||||
// Target reached: stop retrying and leave ret as OB_SUCCESS.
if (OB_SUCC(ret) && curr_end_scn >= sys_ls_target_scn) {
|
|
||||||
LOG_INFO("current user ls end scn >= sys ls target scn now", K(curr_end_scn),
|
|
||||||
K(sys_ls_target_scn), "is_errsim_opened", ERRSIM_USER_LS_SYNC_SCN ? true : false,
|
|
||||||
K(tenant_id), K(ls_id));
|
|
||||||
need_retry = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Note: the sleep is keyed on need_retry only, so it also runs once when
// get_end_scn failed, just before the loop exits on the failed ret.
if (need_retry) {
|
|
||||||
ob_usleep(50 * 1000); // wait 50ms
|
|
||||||
}
|
|
||||||
} while (need_retry && OB_SUCC(ret));
|
|
||||||
// Defensive check: the loop condition guarantees this is false on exit.
if (OB_UNLIKELY(need_retry && OB_SUCC(ret))) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("the wait loop should not be terminated", KR(ret), K(curr_end_scn), K(sys_ls_target_scn));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
} //end namespace share
|
} //end namespace share
|
||||||
} //end namespace oceanbase
|
} //end namespace oceanbase
|
||||||
|
|||||||
@ -21,10 +21,6 @@ namespace common
|
|||||||
class ObTimeoutCtx;
|
class ObTimeoutCtx;
|
||||||
class ObISQLClient;
|
class ObISQLClient;
|
||||||
}
|
}
|
||||||
namespace storage
|
|
||||||
{
|
|
||||||
class ObLS;
|
|
||||||
}
|
|
||||||
namespace share
|
namespace share
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -122,11 +118,6 @@ public:
|
|||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const ObSqlString &sql,
|
const ObSqlString &sql,
|
||||||
SCN &ora_rowscn);
|
SCN &ora_rowscn);
|
||||||
// Wait until the given ls's end_scn is larger than or equal to sys_ls_target_scn.
|
|
||||||
// @params[in]: sys_ls_target_scn, the SCN to wait for; must be valid and not min
|
|
||||||
// @params[in]: ls, the ls whose end_scn is polled
|
|
||||||
static int wait_user_ls_sync_scn_locally(const share::SCN &sys_ls_target_scn, storage::ObLS &ls);
|
|
||||||
|
|
||||||
static bool is_tenant_enable_rebalance(const uint64_t tenant_id);
|
static bool is_tenant_enable_rebalance(const uint64_t tenant_id);
|
||||||
static bool is_tenant_enable_transfer(const uint64_t tenant_id);
|
static bool is_tenant_enable_transfer(const uint64_t tenant_id);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user