[CP] fix get NULL table schema

This commit is contained in:
wxhwang
2024-02-07 01:28:00 +00:00
committed by ob-robot
parent 9aa4871187
commit 78fd12676e
2 changed files with 74 additions and 30 deletions

View File

@ -121,6 +121,11 @@ int ObImportTableJobScheduler::check_compatible_() const
int ObImportTableJobScheduler::process_(share::ObImportTableJob &job) int ObImportTableJobScheduler::process_(share::ObImportTableJob &job)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_FAIL(wait_src_tenant_schema_refreshed_(job.get_src_tenant_id()))) {
if (OB_SCHEMA_EAGAIN != ret) {
LOG_WARN("failed to wait src tenant schema refreshed", K(ret), K(job));
}
} else {
switch(job.get_status()) { switch(job.get_status()) {
case ObImportTableJobStatus::INIT: { case ObImportTableJobStatus::INIT: {
if (OB_FAIL(gen_import_table_task_(job))) { if (OB_FAIL(gen_import_table_task_(job))) {
@ -158,6 +163,41 @@ int ObImportTableJobScheduler::process_(share::ObImportTableJob &job)
break; break;
} }
} }
}
return ret;
}
int ObImportTableJobScheduler::wait_src_tenant_schema_refreshed_(const uint64_t tenant_id)
{
// Only if the aux tenant schema refreshed to newest, then we can confirm src table exist or not.
int ret = OB_SUCCESS;
int64_t max_schema_version = OB_INVALID_VERSION;
ObSchemaService *sql_schema_service = nullptr;
ObRefreshSchemaStatus status;
status.tenant_id_ = tenant_id;
MTL_SWITCH (OB_SYS_TENANT_ID) {
if (OB_ISNULL(schema_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_service_ is null", K(ret));
} else if (OB_ISNULL(sql_schema_service = schema_service_->get_schema_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql_schema_service is null", K(ret));
} else if (OB_FAIL(sql_schema_service->fetch_schema_version(status, *sql_proxy_, max_schema_version))) {
LOG_WARN("fail to fetch max schema version", K(ret), K(tenant_id), K(status));
} else {
int64_t refreshed_schema_version = 0;
if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version(
tenant_id, refreshed_schema_version))) {
LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id));
} else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < max_schema_version) {
ret = OB_SCHEMA_EAGAIN;
if (REACH_TIME_INTERVAL(1000L * 1000L)) {
LOG_WARN("tenant schema not refreshed to the newest version", K(ret), K(tenant_id), K(max_schema_version), K(refreshed_schema_version));
}
}
}
}
return ret; return ret;
} }
@ -682,6 +722,9 @@ int ObImportTableTaskScheduler::construct_import_table_arg_(obrpc::ObRecoverRest
false, false,
src_table_schema))) { src_table_schema))) {
LOG_WARN("failed to get table schema", K(ret), KPC_(import_task)); LOG_WARN("failed to get table schema", K(ret), KPC_(import_task));
} else if (OB_ISNULL(src_table_schema)) {
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("src table not exist", K(ret), KPC_(import_task));
} }
} }
if (FAILEDx(construct_import_table_schema_(*src_table_schema, arg.target_schema_))) { if (FAILEDx(construct_import_table_schema_(*src_table_schema, arg.target_schema_))) {

View File

@ -51,6 +51,7 @@ public:
private: private:
int check_compatible_() const; int check_compatible_() const;
int process_(share::ObImportTableJob &job); int process_(share::ObImportTableJob &job);
int wait_src_tenant_schema_refreshed_(const uint64_t tenant_id);
int gen_import_table_task_(share::ObImportTableJob &job); int gen_import_table_task_(share::ObImportTableJob &job);
int deal_with_import_table_task_(share::ObImportTableJob &job); int deal_with_import_table_task_(share::ObImportTableJob &job);
int process_import_table_task_(share::ObImportTableTask &task); int process_import_table_task_(share::ObImportTableTask &task);