在备租户同步到普通日志流创建后,尽早返回创建租户命令
This commit is contained in:
@ -137,7 +137,6 @@ int ObCreateStandbyFromNetActor::check_has_user_ls(const uint64_t tenant_id, com
|
|||||||
int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
|
int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObLSRecoveryStat recovery_stat;
|
|
||||||
ObLSRecoveryStatOperator ls_recovery_operator;
|
ObLSRecoveryStatOperator ls_recovery_operator;
|
||||||
rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
|
rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
|
||||||
obrpc::ObCreateTenantEndArg arg;
|
obrpc::ObCreateTenantEndArg arg;
|
||||||
@ -145,29 +144,30 @@ int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
|
|||||||
arg.tenant_id_ = tenant_id_;
|
arg.tenant_id_ = tenant_id_;
|
||||||
obrpc::ObCommonRpcProxy *rs_rpc_proxy = GCTX.rs_rpc_proxy_;
|
obrpc::ObCommonRpcProxy *rs_rpc_proxy = GCTX.rs_rpc_proxy_;
|
||||||
ObAllTenantInfo tenant_info;
|
ObAllTenantInfo tenant_info;
|
||||||
|
SCN min_user_ls_create_scn = SCN::base_scn();
|
||||||
|
|
||||||
if (OB_ISNULL(rs_rpc_proxy)) {
|
if (OB_ISNULL(rs_rpc_proxy)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
WSTAT("rs_rpc_proxy is null", KP(rs_rpc_proxy));
|
WSTAT("rs_rpc_proxy is null", KP(rs_rpc_proxy));
|
||||||
} else if (OB_FAIL(check_inner_stat_())) {
|
} else if (OB_FAIL(check_inner_stat_())) {
|
||||||
WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
|
WSTAT("error unexpected", KR(ret), K(tenant_id_), KP(sql_proxy_));
|
||||||
} else if (OB_FAIL(ls_recovery_operator.get_ls_recovery_stat(tenant_id_, SYS_LS,
|
} else if (OB_FAIL(ls_recovery_operator.get_tenant_min_user_ls_create_scn(tenant_id_, *sql_proxy_,
|
||||||
false/*for_update*/, recovery_stat, *sql_proxy_))) {
|
min_user_ls_create_scn))) {
|
||||||
WSTAT("failed to get ls recovery stat", KR(ret), K_(tenant_id));
|
WSTAT("failed to get tenant min_user_ls_create_scn", KR(ret), K_(tenant_id));
|
||||||
} else if (OB_ISNULL(tenant_info_loader)) {
|
} else if (OB_ISNULL(tenant_info_loader)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
WSTAT("tenant info loader should not be NULL", KR(ret), KP(tenant_info_loader));
|
WSTAT("tenant info loader should not be NULL", KR(ret), KP(tenant_info_loader));
|
||||||
} else {
|
} else {
|
||||||
ISTAT("start to wait whether can finish restore", K_(tenant_id), K(recovery_stat));
|
ISTAT("start to wait whether can finish restore", K_(tenant_id), K(min_user_ls_create_scn));
|
||||||
// wait 1 minute, sleep 1s and retry 60 times
|
// wait 1 minute, sleep 1s and retry 60 times
|
||||||
for (int64_t retry_cnt = 60; OB_SUCC(ret) && retry_cnt > 0 && !has_set_stop(); --retry_cnt) {
|
for (int64_t retry_cnt = 60; OB_SUCC(ret) && retry_cnt > 0 && !has_set_stop(); --retry_cnt) {
|
||||||
if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
|
if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
|
||||||
WSTAT("failed to get tenant info", KR(ret));
|
WSTAT("failed to get tenant info", KR(ret));
|
||||||
} else if (tenant_info.get_standby_scn() >= recovery_stat.get_sync_scn()) {
|
} else if (tenant_info.get_standby_scn() >= min_user_ls_create_scn) {
|
||||||
ISTAT("tenant readable scn can read inner table", K(tenant_info), K(recovery_stat));
|
ISTAT("tenant readable scn can read inner table", K(tenant_info), K(min_user_ls_create_scn));
|
||||||
|
|
||||||
if (OB_FAIL(ObRestoreService::reset_schema_status(tenant_id_, sql_proxy_))) {
|
if (OB_FAIL(ObRestoreService::reset_schema_status(tenant_id_, sql_proxy_))) {
|
||||||
WSTAT("failed to reset schema status", KR(ret), K_(tenant_id), K(tenant_info), K(recovery_stat));
|
WSTAT("failed to reset schema status", KR(ret), K_(tenant_id), K(tenant_info), K(min_user_ls_create_scn));
|
||||||
} else if (OB_FAIL(rs_rpc_proxy->create_tenant_end(arg))) {
|
} else if (OB_FAIL(rs_rpc_proxy->create_tenant_end(arg))) {
|
||||||
WSTAT("fail to execute create tenant end", KR(ret), K_(tenant_id), K(arg));
|
WSTAT("fail to execute create tenant end", KR(ret), K_(tenant_id), K(arg));
|
||||||
} else {
|
} else {
|
||||||
@ -179,7 +179,7 @@ int ObCreateStandbyFromNetActor::finish_restore_if_possible_()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ISTAT("finish_restore_if_possible", K(ret), K_(tenant_id), K(recovery_stat), K(arg), K(tenant_info));
|
ISTAT("finish_restore_if_possible", K(ret), K_(tenant_id), K(min_user_ls_create_scn), K(arg), K(tenant_info));
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -732,6 +732,7 @@ int ObRecoveryLSService::create_new_ls_(const share::ObLSAttr &ls_attr,
|
|||||||
LOG_WARN("ls not create pre operation", KR(ret), K(ls_attr));
|
LOG_WARN("ls not create pre operation", KR(ret), K(ls_attr));
|
||||||
} else {
|
} else {
|
||||||
//create new ls;
|
//create new ls;
|
||||||
|
DEBUG_SYNC(BEFORE_RECOVER_USER_LS);
|
||||||
share::schema::ObSchemaGetterGuard schema_guard;
|
share::schema::ObSchemaGetterGuard schema_guard;
|
||||||
const share::schema::ObTenantSchema *tenant_schema = NULL;
|
const share::schema::ObTenantSchema *tenant_schema = NULL;
|
||||||
if (OB_ISNULL(GCTX.schema_service_)) {
|
if (OB_ISNULL(GCTX.schema_service_)) {
|
||||||
|
|||||||
@ -486,6 +486,7 @@ int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
|
|||||||
// update last_sql_update_time_ after sql refresh
|
// update last_sql_update_time_ after sql refresh
|
||||||
last_sql_update_time_ = new_refresh_time_us;
|
last_sql_update_time_ = new_refresh_time_us;
|
||||||
} else {
|
} else {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
LOG_WARN("refresh tenant info conflict", K(new_tenant_info), K(new_refresh_time_us),
|
LOG_WARN("refresh tenant info conflict", K(new_tenant_info), K(new_refresh_time_us),
|
||||||
K(tenant_id), K(tenant_info_), K(last_sql_update_time_), K(ora_rowscn_), K(ora_rowscn));
|
K(tenant_id), K(tenant_info_), K(last_sql_update_time_), K(ora_rowscn_), K(ora_rowscn));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -385,6 +385,69 @@ int ObLSRecoveryStatOperator::get_tenant_recovery_stat(const uint64_t tenant_id,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObLSRecoveryStatOperator::get_tenant_min_user_ls_create_scn(const uint64_t tenant_id,
|
||||||
|
ObISQLClient &client,
|
||||||
|
SCN &min_user_ls_create_scn)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
min_user_ls_create_scn = SCN::base_scn();
|
||||||
|
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid_argument", KR(ret), K(tenant_id));
|
||||||
|
} else {
|
||||||
|
common::ObSqlString sql;
|
||||||
|
const uint64_t exec_tenant_id = get_exec_tenant_id(tenant_id);
|
||||||
|
ObSEArray<ObLSRecoveryStat, 1> ls_recovery_array;
|
||||||
|
if (OB_FAIL(sql.assign_fmt(
|
||||||
|
"select min(create_scn) as min_create_scn from %s where tenant_id = %lu and ls_id != %ld",
|
||||||
|
OB_ALL_LS_RECOVERY_STAT_TNAME, tenant_id, SYS_LS.id()))) {
|
||||||
|
LOG_WARN("failed to assign sql", KR(ret), K(sql), K(tenant_id));
|
||||||
|
} else if (OB_FAIL(get_min_create_scn_(tenant_id, sql, client, min_user_ls_create_scn))) {
|
||||||
|
LOG_WARN("failed to get min_create_scn", KR(ret), K(tenant_id), K(sql));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObLSRecoveryStatOperator::get_min_create_scn_(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const common::ObSqlString &sql,
|
||||||
|
ObISQLClient &client,
|
||||||
|
SCN &min_create_scn)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
min_create_scn = SCN::base_scn();
|
||||||
|
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid_argument", KR(ret), K(tenant_id));
|
||||||
|
} else {
|
||||||
|
const uint64_t exec_tenant_id = get_exec_tenant_id(tenant_id);
|
||||||
|
HEAP_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||||
|
common::sqlclient::ObMySQLResult *result = NULL;
|
||||||
|
if (OB_FAIL(client.read(res, exec_tenant_id, sql.ptr()))) {
|
||||||
|
LOG_WARN("failed to read", KR(ret), K(exec_tenant_id), K(tenant_id), K(sql));
|
||||||
|
} else if (OB_ISNULL(result = res.get_result())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("failed to get sql result", KR(ret));
|
||||||
|
} else if (OB_FAIL(result->next())) {
|
||||||
|
LOG_WARN("failed to get next", KR(ret), K(sql));
|
||||||
|
} else if (OB_ISNULL(result)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("result is null", KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));
|
||||||
|
} else {
|
||||||
|
uint64_t min_create_scn_val = OB_INVALID_SCN_VAL;
|
||||||
|
EXTRACT_UINT_FIELD_MYSQL(*result, "min_create_scn", min_create_scn_val, uint64_t);
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
LOG_WARN("failed to get min_create_scn", KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));
|
||||||
|
} else if (OB_FAIL(min_create_scn.convert_for_inner_table_field(min_create_scn_val))) {
|
||||||
|
LOG_WARN("failed to convert_for_inner_table_field", KR(ret), K(min_create_scn_val), K(tenant_id), K(exec_tenant_id), K(sql));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObLSRecoveryStatOperator::get_tenant_max_sync_scn(const uint64_t tenant_id,
|
int ObLSRecoveryStatOperator::get_tenant_max_sync_scn(const uint64_t tenant_id,
|
||||||
ObISQLClient &client,
|
ObISQLClient &client,
|
||||||
SCN &max_sync_scn)
|
SCN &max_sync_scn)
|
||||||
|
|||||||
@ -206,6 +206,18 @@ public:
|
|||||||
SCN &sync_scn,
|
SCN &sync_scn,
|
||||||
SCN &min_wrs);
|
SCN &min_wrs);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @description:
|
||||||
|
* get tenant min user ls create scn
|
||||||
|
* @param[in] tenant_id
|
||||||
|
* @param[in] client
|
||||||
|
* @param[out] min_user_ls_create_scn
|
||||||
|
* @return return code
|
||||||
|
*/
|
||||||
|
int get_tenant_min_user_ls_create_scn(const uint64_t tenant_id,
|
||||||
|
ObISQLClient &client,
|
||||||
|
SCN &min_user_ls_create_scn);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* description: get tenant max sync_scn across all ls
|
* description: get tenant max sync_scn across all ls
|
||||||
* @param[in] tenant_id
|
* @param[in] tenant_id
|
||||||
@ -226,6 +238,12 @@ public:
|
|||||||
ObISQLClient &client,
|
ObISQLClient &client,
|
||||||
SCN &sync_scn);
|
SCN &sync_scn);
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
int get_min_create_scn_(const uint64_t tenant_id,
|
||||||
|
const common::ObSqlString &sql,
|
||||||
|
ObISQLClient &client,
|
||||||
|
SCN &min_create_scn);
|
||||||
|
|
||||||
bool need_update_ls_recovery_(const ObLSRecoveryStat &old_recovery,
|
bool need_update_ls_recovery_(const ObLSRecoveryStat &old_recovery,
|
||||||
const ObLSRecoveryStat &new_recovery);
|
const ObLSRecoveryStat &new_recovery);
|
||||||
int get_all_ls_recovery_stat_(const uint64_t tenant_id,
|
int get_all_ls_recovery_stat_(const uint64_t tenant_id,
|
||||||
|
|||||||
@ -396,6 +396,7 @@ class ObString;
|
|||||||
ACT(BEFORE_DO_FLASHBACK,)\
|
ACT(BEFORE_DO_FLASHBACK,)\
|
||||||
ACT(PREPARE_FLASHBACK_FOR_SWITCH_TO_PRIMARY,)\
|
ACT(PREPARE_FLASHBACK_FOR_SWITCH_TO_PRIMARY,)\
|
||||||
ACT(SWITCHING_TO_STANDBY,)\
|
ACT(SWITCHING_TO_STANDBY,)\
|
||||||
|
ACT(BEFORE_RECOVER_USER_LS,)\
|
||||||
ACT(BEFORE_PREPARE_FLASHBACK,)\
|
ACT(BEFORE_PREPARE_FLASHBACK,)\
|
||||||
ACT(BEFORE_LS_RESTORE_SYS_TABLETS,)\
|
ACT(BEFORE_LS_RESTORE_SYS_TABLETS,)\
|
||||||
ACT(BEFORE_WAIT_RESTORE_SYS_TABLETS,)\
|
ACT(BEFORE_WAIT_RESTORE_SYS_TABLETS,)\
|
||||||
|
|||||||
Reference in New Issue
Block a user