[CP] fix get NULL table schema
This commit is contained in:
@ -121,46 +121,86 @@ int ObImportTableJobScheduler::check_compatible_() const
|
||||
int ObImportTableJobScheduler::process_(share::ObImportTableJob &job)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
switch(job.get_status()) {
|
||||
case ObImportTableJobStatus::INIT: {
|
||||
if (OB_FAIL(gen_import_table_task_(job))) {
|
||||
LOG_WARN("failed to gen import table task", K(ret), K(job));
|
||||
}
|
||||
break;
|
||||
if (OB_FAIL(wait_src_tenant_schema_refreshed_(job.get_src_tenant_id()))) {
|
||||
if (OB_SCHEMA_EAGAIN != ret) {
|
||||
LOG_WARN("failed to wait src tenant schema refreshed", K(ret), K(job));
|
||||
}
|
||||
case ObImportTableJobStatus::IMPORT_TABLE: {
|
||||
if (OB_FAIL(deal_with_import_table_task_(job))) {
|
||||
LOG_WARN("failed to deal with import table task", K(ret), K(job));
|
||||
} else {
|
||||
switch(job.get_status()) {
|
||||
case ObImportTableJobStatus::INIT: {
|
||||
if (OB_FAIL(gen_import_table_task_(job))) {
|
||||
LOG_WARN("failed to gen import table task", K(ret), K(job));
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObImportTableJobStatus::RECONSTRUCT_REF_CONSTRAINT: {
|
||||
if (OB_FAIL(reconstruct_ref_constraint_(job))) {
|
||||
LOG_WARN("failed to deal with reconstrcut ref constraint", K(ret));
|
||||
case ObImportTableJobStatus::IMPORT_TABLE: {
|
||||
if (OB_FAIL(deal_with_import_table_task_(job))) {
|
||||
LOG_WARN("failed to deal with import table task", K(ret), K(job));
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObImportTableJobStatus::CANCELING: {
|
||||
if (OB_FAIL(canceling_(job))) {
|
||||
LOG_WARN("failed to cancel", K(ret), K(job));
|
||||
case ObImportTableJobStatus::RECONSTRUCT_REF_CONSTRAINT: {
|
||||
if (OB_FAIL(reconstruct_ref_constraint_(job))) {
|
||||
LOG_WARN("failed to deal with reconstrcut ref constraint", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObImportTableJobStatus::IMPORT_FINISH: {
|
||||
if (OB_FAIL(finish_(job))) {
|
||||
LOG_WARN("failed to cancel", K(ret), K(job));
|
||||
case ObImportTableJobStatus::CANCELING: {
|
||||
if (OB_FAIL(canceling_(job))) {
|
||||
LOG_WARN("failed to cancel", K(ret), K(job));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObImportTableJobStatus::IMPORT_FINISH: {
|
||||
if (OB_FAIL(finish_(job))) {
|
||||
LOG_WARN("failed to cancel", K(ret), K(job));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("invalid import job status", K(ret));
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("invalid import job status", K(ret));
|
||||
break;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObImportTableJobScheduler::wait_src_tenant_schema_refreshed_(const uint64_t tenant_id)
|
||||
{
|
||||
// Only if the aux tenant schema refreshed to newest, then we can confirm src table exist or not.
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t max_schema_version = OB_INVALID_VERSION;
|
||||
ObSchemaService *sql_schema_service = nullptr;
|
||||
ObRefreshSchemaStatus status;
|
||||
status.tenant_id_ = tenant_id;
|
||||
MTL_SWITCH (OB_SYS_TENANT_ID) {
|
||||
if (OB_ISNULL(schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema_service_ is null", K(ret));
|
||||
} else if (OB_ISNULL(sql_schema_service = schema_service_->get_schema_service())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sql_schema_service is null", K(ret));
|
||||
} else if (OB_FAIL(sql_schema_service->fetch_schema_version(status, *sql_proxy_, max_schema_version))) {
|
||||
LOG_WARN("fail to fetch max schema version", K(ret), K(tenant_id), K(status));
|
||||
} else {
|
||||
int64_t refreshed_schema_version = 0;
|
||||
if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version(
|
||||
tenant_id, refreshed_schema_version))) {
|
||||
LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id));
|
||||
} else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < max_schema_version) {
|
||||
ret = OB_SCHEMA_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(1000L * 1000L)) {
|
||||
LOG_WARN("tenant schema not refreshed to the newest version", K(ret), K(tenant_id), K(max_schema_version), K(refreshed_schema_version));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObImportTableJobScheduler::reconstruct_ref_constraint_(share::ObImportTableJob &job)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -682,6 +722,9 @@ int ObImportTableTaskScheduler::construct_import_table_arg_(obrpc::ObRecoverRest
|
||||
false,
|
||||
src_table_schema))) {
|
||||
LOG_WARN("failed to get table schema", K(ret), KPC_(import_task));
|
||||
} else if (OB_ISNULL(src_table_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("src table not exist", K(ret), KPC_(import_task));
|
||||
}
|
||||
}
|
||||
if (FAILEDx(construct_import_table_schema_(*src_table_schema, arg.target_schema_))) {
|
||||
|
@ -51,6 +51,7 @@ public:
|
||||
private:
|
||||
int check_compatible_() const;
|
||||
int process_(share::ObImportTableJob &job);
|
||||
int wait_src_tenant_schema_refreshed_(const uint64_t tenant_id);
|
||||
int gen_import_table_task_(share::ObImportTableJob &job);
|
||||
int deal_with_import_table_task_(share::ObImportTableJob &job);
|
||||
int process_import_table_task_(share::ObImportTableTask &task);
|
||||
|
Reference in New Issue
Block a user