diff --git a/src/observer/table_load/ob_table_load_redef_table.cpp b/src/observer/table_load/ob_table_load_redef_table.cpp index 39394e606e..be16aa1510 100644 --- a/src/observer/table_load/ob_table_load_redef_table.cpp +++ b/src/observer/table_load/ob_table_load_redef_table.cpp @@ -80,7 +80,7 @@ int ObTableLoadRedefTable::finish(const ObTableLoadRedefTableFinishArg &arg, copy_table_dependents_arg.copy_constraints_ = true; copy_table_dependents_arg.copy_triggers_ = false; copy_table_dependents_arg.ignore_errors_ = false; - if (OB_FAIL(ObDDLServerClient::copy_table_dependents(copy_table_dependents_arg))) { + if (OB_FAIL(ObDDLServerClient::copy_table_dependents(copy_table_dependents_arg, session_info))) { LOG_WARN("failed to copy table dependents", KR(ret), K(copy_table_dependents_arg)); } else { LOG_INFO("succeed to copy table dependents", K(copy_table_dependents_arg)); diff --git a/src/sql/engine/cmd/ob_ddl_executor_util.h b/src/sql/engine/cmd/ob_ddl_executor_util.h index 5d390829aa..a90f4f2eb7 100644 --- a/src/sql/engine/cmd/ob_ddl_executor_util.h +++ b/src/sql/engine/cmd/ob_ddl_executor_util.h @@ -66,9 +66,9 @@ public: int64_t &affected_rows); static int wait_build_index_finish(const uint64_t tenant_id, const int64_t task_id, bool &is_finish); static int handle_session_exception(ObSQLSessionInfo &session); + static int cancel_ddl_task(const int64_t tenant_id, obrpc::ObCommonRpcProxy *common_rpc_proxy); private: static inline bool is_server_stopped() { return observer::ObServer::get_instance().is_stopped(); } - static int cancel_ddl_task(const int64_t tenant_id, obrpc::ObCommonRpcProxy *common_rpc_proxy); private: DISALLOW_COPY_AND_ASSIGN(ObDDLExecutorUtil); }; diff --git a/src/storage/ddl/ob_ddl_server_client.cpp b/src/storage/ddl/ob_ddl_server_client.cpp index 29699812c8..0d867329e8 100644 --- a/src/storage/ddl/ob_ddl_server_client.cpp +++ b/src/storage/ddl/ob_ddl_server_client.cpp @@ -111,9 +111,10 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg, } return ret; } -int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg) +int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg, sql::ObSQLSessionInfo &session) { int ret = OB_SUCCESS; + const uint64_t tenant_id = arg.tenant_id_; const int64_t retry_interval = 100 * 1000L; ObAddr rs_leader_addr; obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_; @@ -126,11 +127,15 @@ int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsA } else { while (OB_SUCC(ret)) { int tmp_ret = OB_SUCCESS; - omt::ObTenant *tenant = nullptr; - if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) { + if (OB_FAIL(check_need_stop(tenant_id))) { + LOG_WARN("fail to basic check", K(ret), K(tenant_id)); + } else if (OB_FAIL(ObDDLExecutorUtil::handle_session_exception(session))) { + LOG_WARN("session execption happened", K(ret)); + if (OB_TMP_FAIL(ObDDLExecutorUtil::cancel_ddl_task(tenant_id, common_rpc_proxy))) { + LOG_WARN("cancel ddl task failed", K(tmp_ret)); + } + } else if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) { LOG_WARN("fail to rootservice address", K(tmp_ret)); - } else if (OB_FAIL(GCTX.omt_->get_tenant(arg.tenant_id_, tenant))) { - LOG_WARN("fail to get tenant, maybe tenant deleted", K_(arg.tenant_id)); } else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).copy_table_dependents(arg))) { LOG_WARN("copy table dependents failed", K(ret), K(arg)); if (OB_ENTRY_NOT_EXIST == ret) { @@ -156,6 +161,7 @@ int ObDDLServerClient::copy_table_dependents(const obrpc::ObCopyTableDependentsA int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, sql::ObSQLSessionInfo &session) { int ret = OB_SUCCESS; + const uint64_t tenant_id = arg.tenant_id_; const int64_t retry_interval = 100 * 1000L; ObAddr rs_leader_addr; obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_; @@ -168,11 +174,10 @@ int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, } else { while (OB_SUCC(ret)) { int tmp_ret = OB_SUCCESS; - omt::ObTenant *tenant = nullptr; - if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) { + if (OB_FAIL(check_need_stop(tenant_id))) { + LOG_WARN("fail to basic check", K(ret), K(tenant_id)); + } else if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) { LOG_WARN("fail to get rootservice address", K(tmp_ret)); - } else if (OB_FAIL(GCTX.omt_->get_tenant(arg.tenant_id_, tenant))) { - LOG_WARN("fail to get tenant, maybe tenant deleted", K_(arg.tenant_id)); } else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).abort_redef_table(arg))) { LOG_WARN("abort redef table failed", K(ret), K(arg)); if (OB_ENTRY_NOT_EXIST == ret) { @@ -217,6 +222,7 @@ int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &fi { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; + const uint64_t tenant_id = finish_redef_arg.tenant_id_; const int64_t retry_interval = 100 * 1000L; ObAddr rs_leader_addr; obrpc::ObCommonRpcProxy *common_rpc_proxy = GCTX.rs_rpc_proxy_; @@ -229,11 +235,16 @@ int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &fi } else { while (OB_SUCC(ret)) { int tmp_ret = OB_SUCCESS; - omt::ObTenant *tenant = nullptr; - if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) { + if (OB_FAIL(check_need_stop(tenant_id))) { + LOG_WARN("fail to basic check", K(ret), K(tenant_id)); + } else if (OB_FAIL(ObDDLExecutorUtil::handle_session_exception(session))) { + LOG_WARN("session execption happened", K(ret)); + if (OB_TMP_FAIL(ObDDLExecutorUtil::cancel_ddl_task(tenant_id, common_rpc_proxy))) { + LOG_WARN("cancel ddl task failed", K(tmp_ret)); + ret = OB_SUCCESS; + } + } else if (OB_TMP_FAIL(GCTX.rs_mgr_->get_master_root_server(rs_leader_addr))) { LOG_WARN("fail to rootservice address", K(tmp_ret)); - } else if (OB_FAIL(GCTX.omt_->get_tenant(finish_redef_arg.tenant_id_, tenant))) { - LOG_WARN("fail to get tenant, maybe tenant deleted", K_(finish_redef_arg.tenant_id)); } else if (OB_FAIL(common_rpc_proxy->to(rs_leader_addr).finish_redef_table(finish_redef_arg))) { LOG_WARN("finish redef table failed", K(ret), K(finish_redef_arg)); if (OB_ENTRY_NOT_EXIST == ret) { @@ -345,6 +356,36 @@ int ObDDLServerClient::heart_beat_clear(const int64_t task_id) return ret; } +int ObDDLServerClient::check_need_stop(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + bool is_tenant_dropped = false; + bool is_tenant_standby = false; + if (OB_FAIL(ret)) { + } else if (OB_TMP_FAIL(GSCHEMASERVICE.check_if_tenant_has_been_dropped(tenant_id, is_tenant_dropped))) { + LOG_WARN("check if tenant has been droopped failed", K(tmp_ret), K(tenant_id)); + } else if (is_tenant_dropped) { + ret = OB_TENANT_HAS_BEEN_DROPPED; + LOG_WARN("tenant has been dropped", K(ret), K(tenant_id)); + } + + if (OB_FAIL(ret)) { + } else if (OB_TMP_FAIL(ObAllTenantInfoProxy::is_standby_tenant(GCTX.sql_proxy_, tenant_id, is_tenant_standby))) { + LOG_WARN("check is standby tenant failed", K(tmp_ret), K(tenant_id)); + } else if (is_tenant_standby) { + ret = OB_STANDBY_READ_ONLY; + LOG_WARN("tenant is standby now, stop wait", K(ret), K(tenant_id)); + } + + if (OB_FAIL(ret)) { + } else if (observer::ObServer::get_instance().is_stopped()) { + ret = OB_TIMEOUT; + LOG_WARN("server is stopping", K(ret), K(tenant_id)); + } + return ret; +} + } // end of namespace storage } // end of namespace oceanbase diff --git a/src/storage/ddl/ob_ddl_server_client.h b/src/storage/ddl/ob_ddl_server_client.h index 6e27952828..5f85be5819 100644 --- a/src/storage/ddl/ob_ddl_server_client.h +++ b/src/storage/ddl/ob_ddl_server_client.h @@ -25,7 +25,7 @@ class ObDDLServerClient final public: static int create_hidden_table(const obrpc::ObCreateHiddenTableArg &arg, obrpc::ObCreateHiddenTableRes &res, int64_t &snapshot_version, sql::ObSQLSessionInfo &session); static int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res, sql::ObSQLSessionInfo &session); - static int copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg); + static int copy_table_dependents(const obrpc::ObCopyTableDependentsArg &arg, sql::ObSQLSessionInfo &session); static int finish_redef_table(const obrpc::ObFinishRedefTableArg &finish_redef_arg, const obrpc::ObDDLBuildSingleReplicaResponseArg &build_single_arg, sql::ObSQLSessionInfo &session); @@ -34,6 +34,7 @@ public: private: static int wait_task_reach_pending(const uint64_t tenant_id, const int64_t task_id, int64_t &snapshot_version, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session); static int heart_beat_clear(const int64_t task_id); + static int check_need_stop(const uint64_t tenant_id); }; } // end of namespace observer