support check ls restore status when creating standby tenant
This commit is contained in:
@ -23,10 +23,12 @@
|
||||
#include "share/ls/ob_ls_recovery_stat_operator.h"// ObLSRecoveryStatOperator
|
||||
#include "share/ls/ob_ls_life_manager.h" //ObLSLifeAgentManager
|
||||
#include "share/ls/ob_ls_operator.h" //ObLSAttr
|
||||
#include "storage/tx/ob_timestamp_service.h" // ObTimestampService
|
||||
#include "share/schema/ob_multi_version_schema_service.h" // for GSCHEMASERVICE
|
||||
#include "share/ob_standby_upgrade.h" // ObStandbyUpgrade
|
||||
#include "share/backup/ob_backup_config.h" // ObBackupConfigParserMgr
|
||||
#include "observer/ob_inner_sql_connection.h"//ObInnerSQLConnection
|
||||
#include "storage/tx/ob_trans_service.h" //ObTransService
|
||||
#include "storage/tx/ob_timestamp_service.h" // ObTimestampService
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -679,5 +681,183 @@ int ObPrimaryStandbyService::write_upgrade_barrier_log(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPrimaryStandbyService::check_can_create_standby_tenant(
|
||||
const common::ObString &log_restore_source,
|
||||
ObCompatibilityMode &compat_mode)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t start_ts = ObTimeUtility::current_time();
|
||||
share::ObBackupConfigParserMgr config_parser_mgr;
|
||||
common::ObSqlString name;
|
||||
common::ObSqlString value;
|
||||
compat_mode = ObCompatibilityMode::OCEANBASE_MODE;
|
||||
|
||||
if (OB_FAIL(check_inner_stat_())) {
|
||||
LOG_WARN("inner stat error", KR(ret), K_(inited));
|
||||
} else if (OB_FAIL(name.assign("log_restore_source"))) {
|
||||
LOG_WARN("assign sql failed", KR(ret));
|
||||
} else if (OB_FAIL(value.assign(log_restore_source))) {
|
||||
LOG_WARN("fail to assign value", KR(ret), K(log_restore_source));
|
||||
} else if (OB_FAIL(config_parser_mgr.init(name, value, 1002 /* fake_user_tenant_id */))) {
|
||||
LOG_WARN("fail to init backup config parser mgr", KR(ret), K(name), K(value));
|
||||
} else if (OB_FAIL(config_parser_mgr.only_check_before_update(compat_mode))) {
|
||||
LOG_WARN("fail to only_check_before_update", KR(ret), K(name), K(value));
|
||||
}
|
||||
|
||||
LOG_INFO("[CREATE STANDBY TENANT] check can create standby tenant", KR(ret), K(log_restore_source),
|
||||
K(compat_mode), "cost", ObTimeUtility::current_time() - start_ts);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPrimaryStandbyService::wait_create_standby_tenant_end(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t start_ts = ObTimeUtility::current_time();
|
||||
bool is_dropped = false;
|
||||
const uint64_t user_tenant_id = gen_user_tenant_id(tenant_id);
|
||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||
int64_t user_schema_version = OB_INVALID_VERSION;
|
||||
int64_t meta_schema_version = OB_INVALID_VERSION;
|
||||
const ObSimpleTenantSchema *tenant_schema = nullptr;
|
||||
|
||||
if (OB_FAIL(check_inner_stat_())) {
|
||||
LOG_WARN("inner stat error", KR(ret), K_(inited));
|
||||
} else if (!is_user_tenant(tenant_id)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
while (OB_SUCC(ret)) {
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
if (THIS_WORKER.is_timeout()) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("failed to wait creating standby tenant end", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(meta_tenant_id, is_dropped))) {
|
||||
LOG_WARN("meta tenant has been dropped", KR(ret), K(meta_tenant_id));
|
||||
} else if (is_dropped) {
|
||||
ret = OB_TENANT_HAS_BEEN_DROPPED;
|
||||
LOG_WARN("meta tenant has been dropped", KR(ret), K(meta_tenant_id));
|
||||
} else if (OB_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(user_tenant_id, is_dropped))) {
|
||||
LOG_WARN("user tenant has been dropped", KR(ret), K(user_tenant_id));
|
||||
} else if (is_dropped) {
|
||||
ret = OB_TENANT_HAS_BEEN_DROPPED;
|
||||
LOG_WARN("user tenant has been dropped", KR(ret), K(user_tenant_id));
|
||||
} else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
|
||||
LOG_WARN("failed to get schema guard", KR(ret));
|
||||
} else if (OB_FAIL(schema_guard.get_tenant_info(user_tenant_id, tenant_schema))) {
|
||||
LOG_WARN("failed to get tenant info", KR(ret), K(user_tenant_id));
|
||||
} else if (OB_ISNULL(tenant_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tenant_schema is null", KR(ret), K(user_tenant_id));
|
||||
} else if (tenant_schema->is_creating_standby_tenant_status()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(check_ls_restore_status_(user_tenant_id))) {
|
||||
LOG_WARN("failed to check ls restore_status", KR(ret), KR(tmp_ret), K(user_tenant_id));
|
||||
if (OB_CREATE_STANDBY_TENANT_FAILED == tmp_ret) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
}
|
||||
}
|
||||
} else if (tenant_schema->is_normal()) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(GSCHEMASERVICE.get_tenant_refreshed_schema_version(
|
||||
meta_tenant_id, meta_schema_version))) {
|
||||
if (OB_ENTRY_NOT_EXIST != tmp_ret) {
|
||||
ret = tmp_ret;
|
||||
LOG_WARN("get refreshed schema version failed", KR(ret), K(meta_tenant_id));
|
||||
}
|
||||
} else if (OB_TMP_FAIL(GSCHEMASERVICE.get_tenant_refreshed_schema_version(
|
||||
user_tenant_id, user_schema_version))) {
|
||||
if (OB_ENTRY_NOT_EXIST != tmp_ret) {
|
||||
ret = tmp_ret;
|
||||
LOG_WARN("get refreshed schema version failed", KR(ret), K(user_tenant_id));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (ObSchemaService::is_formal_version(meta_schema_version)
|
||||
&& ObSchemaService::is_formal_version(user_schema_version)) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected tenant status", KR(ret), K(user_tenant_id), K(meta_tenant_id), KPC(tenant_schema));
|
||||
LOG_USER_ERROR(OB_ERR_UNEXPECTED, "unexpected tenant status, create standby tenant failed");
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else {
|
||||
LOG_INFO("[CREATE STANDBY TENANT] wait create standby tenant end", K(tenant_id), K(user_tenant_id),
|
||||
K(meta_tenant_id), K(meta_schema_version), K(user_schema_version), KPC(tenant_schema));
|
||||
ob_usleep(1000 * 1000L); // 1s
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG_INFO("[CREATE STANDBY TENANT] finish to wait create standby tenant end", KR(ret), K(tenant_id),
|
||||
"cost", ObTimeUtility::current_time() - start_ts, K(user_tenant_id),
|
||||
K(meta_tenant_id), K(meta_schema_version), K(user_schema_version), KPC(tenant_schema));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPrimaryStandbyService::check_ls_restore_status_(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMySQLProxy *sql_proxy = GCTX.sql_proxy_;
|
||||
|
||||
if (OB_FAIL(check_inner_stat_())) {
|
||||
LOG_WARN("inner stat error", KR(ret), K_(inited));
|
||||
} else if (OB_ISNULL(sql_proxy)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_proxy is null", KR(ret), K(tenant_id));
|
||||
} else if (!is_user_tenant(tenant_id)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
|
||||
ObSqlString sql;
|
||||
if (OB_FAIL(sql.assign_fmt("SELECT LS_ID, SYNC_STATUS, SYNC_SCN FROM %s WHERE TENANT_ID = '%lu' and SYNC_STATUS !='NORMAL' and ls_id = %ld",
|
||||
OB_V_OB_LS_LOG_RESTORE_STATUS_TNAME, tenant_id, SYS_LS.id()))) {
|
||||
LOG_WARN("fail to generate sql", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(sql_proxy->read(result, sql.ptr()))) {
|
||||
LOG_WARN("read V$OB_LS_LOG_RESTORE_STATUS failed", KR(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result is null", KR(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_FAIL(result.get_result()->next())) {
|
||||
if (OB_ITER_END == ret) {
|
||||
// there isn't abnormal ls
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("get result next failed", KR(ret), K(tenant_id), K(sql));
|
||||
}
|
||||
} else if (OB_ISNULL(result.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("result is null", KR(ret), K(tenant_id), K(sql));
|
||||
} else {
|
||||
int64_t ls_id = 0;
|
||||
ObString sync_status_str;
|
||||
SCN sync_scn;
|
||||
uint64_t sync_scn_val = OB_INVALID_SCN_VAL;
|
||||
EXTRACT_INT_FIELD_MYSQL(*result.get_result(), "LS_ID", ls_id, int64_t);
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result.get_result(), "SYNC_STATUS", sync_status_str);
|
||||
EXTRACT_UINT_FIELD_MYSQL(*result.get_result(), "SYNC_SCN", sync_scn_val, uint64_t);
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to get result", KR(ret), K(tenant_id), K(sql));
|
||||
} else if (OB_FAIL(sync_scn.convert_for_inner_table_field(sync_scn_val))) {
|
||||
LOG_WARN("failed to convert_for_inner_table_field", KR(ret), K(sync_scn_val));
|
||||
} else {
|
||||
LOG_WARN("get LS restore status", KR(ret), K(tenant_id), K(ls_id), K(sync_status_str), K(sync_scn), K(sql));
|
||||
ret = OB_CREATE_STANDBY_TENANT_FAILED;
|
||||
std::string fail_reason = "SYS LS sync status is abnormal: "
|
||||
+ std::string(sync_status_str.ptr(), sync_status_str.length())
|
||||
+ ", please check V$OB_LS_LOG_RESTORE_STATUS";
|
||||
LOG_USER_ERROR(OB_CREATE_STANDBY_TENANT_FAILED, fail_reason.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user