diff --git a/src/share/schema/ob_schema_utils.cpp b/src/share/schema/ob_schema_utils.cpp index 377c8921c5..689f63bd03 100644 --- a/src/share/schema/ob_schema_utils.cpp +++ b/src/share/schema/ob_schema_utils.cpp @@ -25,6 +25,7 @@ #include "sql/resolver/expr/ob_raw_expr_util.h" #include "sql/session/ob_sql_session_info.h" #include "observer/ob_server_struct.h" +#include "sql/engine/cmd/ob_ddl_executor_util.h" namespace oceanbase { using namespace common; @@ -506,6 +507,7 @@ int ObSchemaUtils::construct_inner_table_schemas( int ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( const ObTimeoutCtx &ctx, + sql::ObSQLSessionInfo *session, const uint64_t tenant_id, const int64_t schema_version) { @@ -517,19 +519,27 @@ int ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( if (tenant_config.is_valid()) { consensus_timeout = tenant_config->_wait_interval_after_parallel_ddl; } - if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + if (OB_ISNULL(session) || OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || schema_version <= 0 || consensus_timeout < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(schema_version), K(consensus_timeout)); + LOG_WARN("invalid arg", KR(ret), KP(session), K(tenant_id), K(schema_version), K(consensus_timeout)); } else if (OB_ISNULL(schema_service = GCTX.schema_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("schema_service is null", KR(ret)); } + bool is_dropped = false; while (OB_SUCC(ret) && ctx.get_timeout() > 0) { int64_t refreshed_schema_version = OB_INVALID_VERSION; int64_t consensus_schema_version = OB_INVALID_VERSION; - if (OB_FAIL(schema_service->get_tenant_refreshed_schema_version(tenant_id, refreshed_schema_version))) { + if (OB_FAIL(schema_service->check_if_tenant_has_been_dropped(tenant_id, is_dropped))) { + LOG_WARN("fail to check if tenant has been dropped", KR(ret), K(tenant_id)); + } else if (OB_UNLIKELY(is_dropped)) { + ret = OB_TENANT_HAS_BEEN_DROPPED; + LOG_WARN("tenant has been dropped", KR(ret), K(tenant_id)); + } else if (OB_FAIL(ObDDLExecutorUtil::handle_session_exception(*session))) { + LOG_WARN("fail to handle session exception", KR(ret)); + } else if (OB_FAIL(schema_service->get_tenant_refreshed_schema_version(tenant_id, refreshed_schema_version))) { LOG_WARN("get refreshed schema_version fail", KR(ret), K(tenant_id)); } else if (OB_FAIL(schema_service->get_tenant_broadcast_consensus_version(tenant_id, consensus_schema_version))) { LOG_WARN("get consensus schema_version fail", KR(ret), K(tenant_id)); diff --git a/src/share/schema/ob_schema_utils.h b/src/share/schema/ob_schema_utils.h index ab27b55d28..7e5c79c0fb 100644 --- a/src/share/schema/ob_schema_utils.h +++ b/src/share/schema/ob_schema_utils.h @@ -178,6 +178,7 @@ public: static int try_check_parallel_ddl_schema_in_sync( const ObTimeoutCtx &ctx, + sql::ObSQLSessionInfo *session, const uint64_t tenant_id, const int64_t schema_version); private: diff --git a/src/sql/engine/cmd/ob_table_executor.cpp b/src/sql/engine/cmd/ob_table_executor.cpp index 4caf531b79..8752942d12 100644 --- a/src/sql/engine/cmd/ob_table_executor.cpp +++ b/src/sql/engine/cmd/ob_table_executor.cpp @@ -586,7 +586,7 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt) } else { int64_t refresh_time = ObTimeUtility::current_time(); if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( - ctx, tenant_id, res.schema_version_))) { + ctx, my_session, tenant_id, res.schema_version_))) { LOG_WARN("fail to check paralleld ddl schema in sync", KR(ret), K(res)); } int64_t end_time = ObTimeUtility::current_time(); @@ -2254,7 +2254,7 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st ret = OB_ERR_UNEXPECTED; LOG_WARN("truncate invalid ddl_res", KR(ret), K(res)); } else if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( - ctx, tenant_id, res.task_id_))) { + ctx, my_session, tenant_id, res.task_id_))) { LOG_WARN("fail to check parallel ddl schema in sync", KR(ret), K(res)); } int64_t end_time = ObTimeUtility::current_time();