From a70508fb0919733f58c9e2c02b31fd7b3b79fe3e Mon Sep 17 00:00:00 2001 From: wxhwang Date: Mon, 27 Nov 2023 09:15:21 +0000 Subject: [PATCH] [CP] fix get NULL table schema --- .../restore/ob_import_table_job_scheduler.cpp | 103 +++++++++++++----- .../restore/ob_import_table_job_scheduler.h | 1 + 2 files changed, 74 insertions(+), 30 deletions(-) diff --git a/src/rootserver/restore/ob_import_table_job_scheduler.cpp b/src/rootserver/restore/ob_import_table_job_scheduler.cpp index 2730eb0375..3ecab7869d 100644 --- a/src/rootserver/restore/ob_import_table_job_scheduler.cpp +++ b/src/rootserver/restore/ob_import_table_job_scheduler.cpp @@ -121,46 +121,86 @@ int ObImportTableJobScheduler::check_compatible_() const int ObImportTableJobScheduler::process_(share::ObImportTableJob &job) { int ret = OB_SUCCESS; - switch(job.get_status()) { - case ObImportTableJobStatus::INIT: { - if (OB_FAIL(gen_import_table_task_(job))) { - LOG_WARN("failed to gen import table task", K(ret), K(job)); - } - break; + if (OB_FAIL(wait_src_tenant_schema_refreshed_(job.get_src_tenant_id()))) { + if (OB_SCHEMA_EAGAIN != ret) { + LOG_WARN("failed to wait src tenant schema refreshed", K(ret), K(job)); } - case ObImportTableJobStatus::IMPORT_TABLE: { - if (OB_FAIL(deal_with_import_table_task_(job))) { - LOG_WARN("failed to deal with import table task", K(ret), K(job)); + } else { + switch(job.get_status()) { + case ObImportTableJobStatus::INIT: { + if (OB_FAIL(gen_import_table_task_(job))) { + LOG_WARN("failed to gen import table task", K(ret), K(job)); + } + break; } - break; - } - case ObImportTableJobStatus::RECONSTRUCT_REF_CONSTRAINT: { - if (OB_FAIL(reconstruct_ref_constraint_(job))) { - LOG_WARN("failed to deal with reconstrcut ref constraint", K(ret)); + case ObImportTableJobStatus::IMPORT_TABLE: { + if (OB_FAIL(deal_with_import_table_task_(job))) { + LOG_WARN("failed to deal with import table task", K(ret), K(job)); + } + break; } - break; - } - case ObImportTableJobStatus::CANCELING: { - if (OB_FAIL(canceling_(job))) { - LOG_WARN("failed to cancel", K(ret), K(job)); + case ObImportTableJobStatus::RECONSTRUCT_REF_CONSTRAINT: { + if (OB_FAIL(reconstruct_ref_constraint_(job))) { + LOG_WARN("failed to deal with reconstrcut ref constraint", K(ret)); + } + break; } - break; - } - case ObImportTableJobStatus::IMPORT_FINISH: { - if (OB_FAIL(finish_(job))) { - LOG_WARN("failed to cancel", K(ret), K(job)); + case ObImportTableJobStatus::CANCELING: { + if (OB_FAIL(canceling_(job))) { + LOG_WARN("failed to cancel", K(ret), K(job)); + } + break; + } + case ObImportTableJobStatus::IMPORT_FINISH: { + if (OB_FAIL(finish_(job))) { + LOG_WARN("failed to cancel", K(ret), K(job)); + } + break; + } + default: { + ret = OB_ERR_SYS; + LOG_WARN("invalid import job status", K(ret)); + break; } - break; - } - default: { - ret = OB_ERR_SYS; - LOG_WARN("invalid import job status", K(ret)); - break; } } return ret; } +int ObImportTableJobScheduler::wait_src_tenant_schema_refreshed_(const uint64_t tenant_id) +{ + // Only if the aux tenant schema refreshed to newest, then we can confirm src table exist or not. + int ret = OB_SUCCESS; + int64_t max_schema_version = OB_INVALID_VERSION; + ObSchemaService *sql_schema_service = nullptr; + ObRefreshSchemaStatus status; + status.tenant_id_ = tenant_id; + MTL_SWITCH (OB_SYS_TENANT_ID) { + if (OB_ISNULL(schema_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema_service_ is null", K(ret)); + } else if (OB_ISNULL(sql_schema_service = schema_service_->get_schema_service())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql_schema_service is null", K(ret)); + } else if (OB_FAIL(sql_schema_service->fetch_schema_version(status, *sql_proxy_, max_schema_version))) { + LOG_WARN("fail to fetch max schema version", K(ret), K(tenant_id), K(status)); + } else { + int64_t refreshed_schema_version = 0; + if (OB_FAIL(schema_service_->get_tenant_refreshed_schema_version( + tenant_id, refreshed_schema_version))) { + LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id)); + } else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < max_schema_version) { + ret = OB_SCHEMA_EAGAIN; + if (REACH_TIME_INTERVAL(1000L * 1000L)) { + LOG_WARN("tenant schema not refreshed to the newest version", K(ret), K(tenant_id), K(max_schema_version), K(refreshed_schema_version)); + } + } + } + } + + return ret; +} + int ObImportTableJobScheduler::reconstruct_ref_constraint_(share::ObImportTableJob &job) { int ret = OB_SUCCESS; @@ -682,6 +722,9 @@ int ObImportTableTaskScheduler::construct_import_table_arg_(obrpc::ObRecoverRest false, src_table_schema))) { LOG_WARN("failed to get table schema", K(ret), KPC_(import_task)); + } else if (OB_ISNULL(src_table_schema)) { + ret = OB_TABLE_NOT_EXIST; + LOG_WARN("src table not exist", K(ret), KPC_(import_task)); } } if (FAILEDx(construct_import_table_schema_(*src_table_schema, arg.target_schema_))) { diff --git a/src/rootserver/restore/ob_import_table_job_scheduler.h b/src/rootserver/restore/ob_import_table_job_scheduler.h index e0219751d2..076b42897d 100644 --- a/src/rootserver/restore/ob_import_table_job_scheduler.h +++ b/src/rootserver/restore/ob_import_table_job_scheduler.h @@ -51,6 +51,7 @@ public: private: int check_compatible_() const; int process_(share::ObImportTableJob &job); + int wait_src_tenant_schema_refreshed_(const uint64_t tenant_id); int gen_import_table_task_(share::ObImportTableJob &job); int deal_with_import_table_task_(share::ObImportTableJob &job); int process_import_table_task_(share::ObImportTableTask &task);