fix restore tenant sql does not exit after creating tenant failed
This commit is contained in:
@ -8019,11 +8019,11 @@ int ObRootService::admin_rolling_upgrade_cmd(const obrpc::ObAdminRollingUpgradeA
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObRootService::physical_restore_tenant(const obrpc::ObPhysicalRestoreTenantArg &arg)
|
int ObRootService::physical_restore_tenant(const obrpc::ObPhysicalRestoreTenantArg &arg, obrpc::Int64 &res_job_id)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool has_standby_cluster = false;
|
bool has_standby_cluster = false;
|
||||||
|
res_job_id = OB_INVALID_ID;
|
||||||
int64_t current_timestamp = ObTimeUtility::current_time();
|
int64_t current_timestamp = ObTimeUtility::current_time();
|
||||||
int64_t start_ts = ObTimeUtility::current_time();
|
int64_t start_ts = ObTimeUtility::current_time();
|
||||||
int64_t job_id = OB_INVALID_ID;
|
int64_t job_id = OB_INVALID_ID;
|
||||||
@ -8075,6 +8075,7 @@ int ObRootService::physical_restore_tenant(const obrpc::ObPhysicalRestoreTenantA
|
|||||||
LOG_WARN("fail to fill physical restore job", K(ret), K(job_id), K(arg));
|
LOG_WARN("fail to fill physical restore job", K(ret), K(job_id), K(arg));
|
||||||
} else {
|
} else {
|
||||||
job_info.set_restore_start_ts(start_ts);
|
job_info.set_restore_start_ts(start_ts);
|
||||||
|
res_job_id = job_id;
|
||||||
}
|
}
|
||||||
if (FAILEDx(check_restore_tenant_valid(job_info, schema_guard))) {
|
if (FAILEDx(check_restore_tenant_valid(job_info, schema_guard))) {
|
||||||
LOG_WARN("failed to check restore tenant vailid", KR(ret), K(job_info));
|
LOG_WARN("failed to check restore tenant vailid", KR(ret), K(job_info));
|
||||||
|
|||||||
@ -692,7 +692,7 @@ public:
|
|||||||
int admin_set_tracepoint(const obrpc::ObAdminSetTPArg &arg);
|
int admin_set_tracepoint(const obrpc::ObAdminSetTPArg &arg);
|
||||||
int admin_set_backup_config(const obrpc::ObAdminSetConfigArg &arg);
|
int admin_set_backup_config(const obrpc::ObAdminSetConfigArg &arg);
|
||||||
/* physical restore */
|
/* physical restore */
|
||||||
int physical_restore_tenant(const obrpc::ObPhysicalRestoreTenantArg &arg);
|
int physical_restore_tenant(const obrpc::ObPhysicalRestoreTenantArg &arg, obrpc::Int64 &job_id);
|
||||||
int check_restore_tenant_valid(const share::ObPhysicalRestoreJob &job_info,
|
int check_restore_tenant_valid(const share::ObPhysicalRestoreJob &job_info,
|
||||||
share::schema::ObSchemaGetterGuard &guard);
|
share::schema::ObSchemaGetterGuard &guard);
|
||||||
int rebuild_index_in_restore(const obrpc::ObRebuildIndexInRestoreArg &arg);
|
int rebuild_index_in_restore(const obrpc::ObRebuildIndexInRestoreArg &arg);
|
||||||
|
|||||||
@ -498,7 +498,7 @@ DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_HANDLE_LABEL_SE_LABEL_DDL, ObRpcHandleLabe
|
|||||||
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_HANDLE_LABEL_SE_USER_LEVEL_DDL, ObRpcHandleLabelSeUserLevelDDLP, handle_label_se_user_level_ddl(arg_));
|
DEFINE_DDL_RS_RPC_PROCESSOR(obrpc::OB_HANDLE_LABEL_SE_USER_LEVEL_DDL, ObRpcHandleLabelSeUserLevelDDLP, handle_label_se_user_level_ddl(arg_));
|
||||||
|
|
||||||
// backup and restore
|
// backup and restore
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_PHYSICAL_RESTORE_TENANT, ObRpcPhysicalRestoreTenantP, physical_restore_tenant(arg_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_PHYSICAL_RESTORE_TENANT, ObRpcPhysicalRestoreTenantP, physical_restore_tenant(arg_, result_));
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_REBUILD_INDEX_IN_RESTORE, ObRpcRebuildIndexInRestoreP, rebuild_index_in_restore(arg_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_REBUILD_INDEX_IN_RESTORE, ObRpcRebuildIndexInRestoreP, rebuild_index_in_restore(arg_));
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_ARCHIVE_LOG, ObArchiveLogP, handle_archive_log(arg_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_ARCHIVE_LOG, ObArchiveLogP, handle_archive_log(arg_));
|
||||||
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_BACKUP_DATABASE, ObBackupDatabaseP, handle_backup_database(arg_));
|
DEFINE_RS_RPC_PROCESSOR(obrpc::OB_BACKUP_DATABASE, ObBackupDatabaseP, handle_backup_database(arg_));
|
||||||
|
|||||||
@ -936,7 +936,7 @@ int ObRestoreUtil::get_restore_ls_palf_base_info(
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObRestoreUtil::check_physical_restore_finish(
|
int ObRestoreUtil::check_physical_restore_finish(
|
||||||
common::ObISQLClient &proxy, uint64_t tenant_id, bool &is_finish, bool &is_failed) {
|
common::ObISQLClient &proxy, const int64_t job_id, bool &is_finish, bool &is_failed) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
is_failed = false;
|
is_failed = false;
|
||||||
is_finish = false;
|
is_finish = false;
|
||||||
@ -946,8 +946,8 @@ int ObRestoreUtil::check_physical_restore_finish(
|
|||||||
HEAP_VAR(ObMySQLProxy::ReadResult, res) {
|
HEAP_VAR(ObMySQLProxy::ReadResult, res) {
|
||||||
common::sqlclient::ObMySQLResult *result = nullptr;
|
common::sqlclient::ObMySQLResult *result = nullptr;
|
||||||
int64_t cnt = 0;
|
int64_t cnt = 0;
|
||||||
if (OB_FAIL(sql.assign_fmt("select status from %s where tenant_id=%lu and restore_tenant_id=%lu",
|
if (OB_FAIL(sql.assign_fmt("select status from %s where tenant_id=%lu and job_id=%ld",
|
||||||
OB_ALL_RESTORE_JOB_HISTORY_TNAME, OB_SYS_TENANT_ID, tenant_id))) {
|
OB_ALL_RESTORE_JOB_HISTORY_TNAME, OB_SYS_TENANT_ID, job_id))) {
|
||||||
LOG_WARN("failed to assign fmt", K(ret));
|
LOG_WARN("failed to assign fmt", K(ret));
|
||||||
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
|
} else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql.ptr()))) {
|
||||||
LOG_WARN("failed to exec sql", K(ret), K(sql));
|
LOG_WARN("failed to exec sql", K(ret), K(sql));
|
||||||
@ -958,7 +958,7 @@ int ObRestoreUtil::check_physical_restore_finish(
|
|||||||
if (OB_ITER_END == ret) {
|
if (OB_ITER_END == ret) {
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("failed to get next", K(ret), K(tenant_id));
|
LOG_WARN("failed to get next", K(ret), K(job_id));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
EXTRACT_STRBUF_FIELD_MYSQL(*result, OB_STR_STATUS, status_str, OB_DEFAULT_STATUS_LENTH, real_length);
|
EXTRACT_STRBUF_FIELD_MYSQL(*result, OB_STR_STATUS, status_str, OB_DEFAULT_STATUS_LENTH, real_length);
|
||||||
|
|||||||
@ -71,7 +71,7 @@ public:
|
|||||||
static int get_restore_ls_palf_base_info(const share::ObPhysicalRestoreJob &job_info,
|
static int get_restore_ls_palf_base_info(const share::ObPhysicalRestoreJob &job_info,
|
||||||
const share::ObLSID &ls_id,
|
const share::ObLSID &ls_id,
|
||||||
palf::PalfBaseInfo &palf_base_info);
|
palf::PalfBaseInfo &palf_base_info);
|
||||||
static int check_physical_restore_finish(common::ObISQLClient &proxy, uint64_t tenant_id, bool &is_finish, bool &is_failed);
|
static int check_physical_restore_finish(common::ObISQLClient &proxy, const int64_t job_id, bool &is_finish, bool &is_failed);
|
||||||
static int get_restore_tenant_cpu_count(common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count);
|
static int get_restore_tenant_cpu_count(common::ObMySQLProxy &proxy, const uint64_t tenant_id, double &cpu_count);
|
||||||
private:
|
private:
|
||||||
static int fill_backup_info_(
|
static int fill_backup_info_(
|
||||||
|
|||||||
@ -262,7 +262,7 @@ public:
|
|||||||
RPC_S(PR5 get_recycle_schema_versions, OB_GET_RECYCLE_SCHEMA_VERSIONS, (obrpc::ObGetRecycleSchemaVersionsArg), obrpc::ObGetRecycleSchemaVersionsResult);
|
RPC_S(PR5 get_recycle_schema_versions, OB_GET_RECYCLE_SCHEMA_VERSIONS, (obrpc::ObGetRecycleSchemaVersionsArg), obrpc::ObGetRecycleSchemaVersionsResult);
|
||||||
|
|
||||||
// backup and restore
|
// backup and restore
|
||||||
RPC_S(PRD physical_restore_tenant, OB_PHYSICAL_RESTORE_TENANT, (obrpc::ObPhysicalRestoreTenantArg));
|
RPC_S(PRD physical_restore_tenant, OB_PHYSICAL_RESTORE_TENANT, (obrpc::ObPhysicalRestoreTenantArg), obrpc::Int64);
|
||||||
RPC_S(PRD rebuild_index_in_restore, OB_REBUILD_INDEX_IN_RESTORE, (obrpc::ObRebuildIndexInRestoreArg));
|
RPC_S(PRD rebuild_index_in_restore, OB_REBUILD_INDEX_IN_RESTORE, (obrpc::ObRebuildIndexInRestoreArg));
|
||||||
RPC_S(PR5 archive_log, obrpc::OB_ARCHIVE_LOG, (ObArchiveLogArg));
|
RPC_S(PR5 archive_log, obrpc::OB_ARCHIVE_LOG, (ObArchiveLogArg));
|
||||||
RPC_S(PRD backup_database, obrpc::OB_BACKUP_DATABASE, (ObBackupDatabaseArg)); // use ddl thread
|
RPC_S(PRD backup_database, obrpc::OB_BACKUP_DATABASE, (ObBackupDatabaseArg)); // use ddl thread
|
||||||
|
|||||||
@ -48,6 +48,7 @@ int ObPhysicalRestoreTenantExecutor::execute(
|
|||||||
const bool is_preview = stmt.get_is_preview();
|
const bool is_preview = stmt.get_is_preview();
|
||||||
ObString first_stmt;
|
ObString first_stmt;
|
||||||
ObObj value;
|
ObObj value;
|
||||||
|
obrpc::Int64 job_id = OB_INVALID_ID;
|
||||||
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
|
if (OB_FAIL(stmt.get_first_stmt(first_stmt))) {
|
||||||
LOG_WARN("fail to get first stmt" , K(ret));
|
LOG_WARN("fail to get first stmt" , K(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -72,7 +73,7 @@ int ObPhysicalRestoreTenantExecutor::execute(
|
|||||||
} else if (OB_ISNULL(common_rpc_proxy)){
|
} else if (OB_ISNULL(common_rpc_proxy)){
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("common rpc proxy should not be null", K(ret));
|
LOG_WARN("common rpc proxy should not be null", K(ret));
|
||||||
} else if (OB_FAIL(common_rpc_proxy->physical_restore_tenant(restore_tenant_arg))) {
|
} else if (OB_FAIL(common_rpc_proxy->physical_restore_tenant(restore_tenant_arg, job_id))) {
|
||||||
LOG_WARN("rpc proxy restore tenant failed", K(ret), "dst", common_rpc_proxy->get_server());
|
LOG_WARN("rpc proxy restore tenant failed", K(ret), "dst", common_rpc_proxy->get_server());
|
||||||
}
|
}
|
||||||
if (session_info->user_variable_exists(OB_RESTORE_SOURCE_NAME_SESSION_STR)) {
|
if (session_info->user_variable_exists(OB_RESTORE_SOURCE_NAME_SESSION_STR)) {
|
||||||
@ -84,7 +85,7 @@ int ObPhysicalRestoreTenantExecutor::execute(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (OB_FAIL(sync_wait_tenant_created_(ctx, restore_tenant_arg.tenant_name_))) {
|
} else if (OB_FAIL(sync_wait_tenant_created_(ctx, restore_tenant_arg.tenant_name_, job_id))) {
|
||||||
LOG_WARN("failed to sync wait tenant created", K(ret));
|
LOG_WARN("failed to sync wait tenant created", K(ret));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,7 +101,8 @@ int ObPhysicalRestoreTenantExecutor::execute(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObPhysicalRestoreTenantExecutor::sync_wait_tenant_created_(ObExecContext &ctx, const ObString &tenant_name)
|
int ObPhysicalRestoreTenantExecutor::sync_wait_tenant_created_(
|
||||||
|
ObExecContext &ctx, const ObString &tenant_name, const int64_t job_id)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const int64_t timeout = 10 * 60 * 1000 * 1000; // 10min
|
const int64_t timeout = 10 * 60 * 1000 * 1000; // 10min
|
||||||
@ -157,8 +159,8 @@ int ObPhysicalRestoreTenantExecutor::sync_wait_tenant_created_(ObExecContext &ct
|
|||||||
if (OB_ERR_INVALID_TENANT_NAME == ret || OB_EAGAIN == ret) {
|
if (OB_ERR_INVALID_TENANT_NAME == ret || OB_EAGAIN == ret) {
|
||||||
bool is_failed = false;
|
bool is_failed = false;
|
||||||
bool is_finish = false;
|
bool is_finish = false;
|
||||||
if (OB_FAIL(ObRestoreUtil::check_physical_restore_finish(*sql_proxy, user_tenant_id, is_finish, is_failed))) {
|
if (OB_FAIL(ObRestoreUtil::check_physical_restore_finish(*sql_proxy, job_id, is_finish, is_failed))) {
|
||||||
LOG_WARN("failed to check physical restore finish", K(ret), K(user_tenant_id));
|
LOG_WARN("failed to check physical restore finish", K(ret), K(job_id));
|
||||||
} else if (!is_finish) {
|
} else if (!is_finish) {
|
||||||
sleep(1);
|
sleep(1);
|
||||||
LOG_DEBUG("restore not finish, wait later", K(ret), K(user_tenant_id));
|
LOG_DEBUG("restore not finish, wait later", K(ret), K(user_tenant_id));
|
||||||
|
|||||||
@ -27,7 +27,7 @@ public:
|
|||||||
virtual ~ObPhysicalRestoreTenantExecutor();
|
virtual ~ObPhysicalRestoreTenantExecutor();
|
||||||
int execute(ObExecContext &ctx, ObPhysicalRestoreTenantStmt &stmt);
|
int execute(ObExecContext &ctx, ObPhysicalRestoreTenantStmt &stmt);
|
||||||
private:
|
private:
|
||||||
int sync_wait_tenant_created_(ObExecContext &ctx, const ObString &tenant_name);
|
int sync_wait_tenant_created_(ObExecContext &ctx, const ObString &tenant_name, const int64_t job_id);
|
||||||
int physical_restore_preview(ObExecContext &ctx, ObPhysicalRestoreTenantStmt &stmt);
|
int physical_restore_preview(ObExecContext &ctx, ObPhysicalRestoreTenantStmt &stmt);
|
||||||
};
|
};
|
||||||
} //end namespace sql
|
} //end namespace sql
|
||||||
|
|||||||
Reference in New Issue
Block a user