diff --git a/src/storage/ddl/ob_ddl_server_client.cpp b/src/storage/ddl/ob_ddl_server_client.cpp index 4f05d343e9..7dbaf10d65 100644 --- a/src/storage/ddl/ob_ddl_server_client.cpp +++ b/src/storage/ddl/ob_ddl_server_client.cpp @@ -43,18 +43,16 @@ int ObDDLServerClient::create_hidden_table(const obrpc::ObCreateHiddenTableArg & LOG_WARN("failed to set register task id", K(ret), K(res)); } else if (OB_FAIL(wait_table_lock(arg.tenant_id_, res.task_id_, *GCTX.sql_proxy_, session))) { LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res)); - int tmp_ret = ret; - if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(res.task_id_))) { - LOG_ERROR("failed to remove register task id", K(ret), K(res)); - } else { - obrpc::ObAbortRedefTableArg abort_redef_table_arg; - abort_redef_table_arg.task_id_ = res.task_id_; - abort_redef_table_arg.tenant_id_ = arg.tenant_id_; - if (OB_FAIL(abort_redef_table(abort_redef_table_arg, session))) { - LOG_WARN("failed to abort redef table", K(ret), K(abort_redef_table_arg)); - } + int tmp_ret = OB_SUCCESS; + obrpc::ObAbortRedefTableArg abort_redef_table_arg; + abort_redef_table_arg.task_id_ = res.task_id_; + abort_redef_table_arg.tenant_id_ = arg.tenant_id_; + if (OB_TMP_FAIL(abort_redef_table(abort_redef_table_arg, session))) { + LOG_WARN("failed to abort redef table", K(tmp_ret), K(abort_redef_table_arg)); + } + if (OB_TMP_FAIL(heart_beat_clear(res.task_id_))) { + LOG_WARN("heart beat clear failed", K(tmp_ret), K(res.task_id_)); } - ret = tmp_ret; } return ret; } @@ -75,18 +73,16 @@ int ObDDLServerClient::start_redef_table(const obrpc::ObStartRedefTableArg &arg, LOG_WARN("failed to set register task id", K(ret), K(res)); } else if (OB_FAIL(wait_table_lock(arg.orig_tenant_id_, res.task_id_, *GCTX.sql_proxy_, session))) { LOG_WARN("failed to wait table lock. remove register task id and abort redef table task.", K(ret), K(arg), K(res)); - int tmp_ret = ret; - if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(res.task_id_))) { - LOG_ERROR("failed to remove register task id", K(ret), K(res)); - } else { - obrpc::ObAbortRedefTableArg abort_redef_table_arg; - abort_redef_table_arg.task_id_ = res.task_id_; - abort_redef_table_arg.tenant_id_ = arg.orig_tenant_id_; - if (OB_FAIL(abort_redef_table(abort_redef_table_arg, session))) { - LOG_WARN("failed to abort redef table", K(ret), K(abort_redef_table_arg)); - } + int tmp_ret = OB_SUCCESS; + obrpc::ObAbortRedefTableArg abort_redef_table_arg; + abort_redef_table_arg.task_id_ = res.task_id_; + abort_redef_table_arg.tenant_id_ = arg.orig_tenant_id_; + if (OB_TMP_FAIL(abort_redef_table(abort_redef_table_arg, session))) { + LOG_WARN("failed to abort redef table", K(tmp_ret), K(abort_redef_table_arg)); + } + if (OB_TMP_FAIL(heart_beat_clear(res.task_id_))) { + LOG_WARN("heart beat clear failed", K(tmp_ret), K(res.task_id_)); } - ret = tmp_ret; } return ret; } @@ -139,18 +135,12 @@ int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; } - int tmp_ret = 0; - if (OB_TMP_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(arg.tenant_id_, arg.task_id_, session, common_rpc_proxy))) { - LOG_WARN("wait ddl finish failed", K(tmp_ret), K(arg.tenant_id_), K(arg.task_id_)); + if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(arg.tenant_id_, arg.task_id_, session, common_rpc_proxy))) { + LOG_WARN("wait ddl finish failed", K(ret), K(arg.tenant_id_), K(arg.task_id_)); } - if (OB_SUCC(ret)) { - if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(arg.task_id_))) { - if (OB_HASH_NOT_EXIST == ret) { - ret = OB_SUCCESS; - } else { - LOG_ERROR("failed to remove register task id", K(ret), K(arg)); - } - } + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(heart_beat_clear(arg.task_id_))) { + LOG_WARN("heart beat clear failed", K(tmp_ret), K(arg.task_id_)); } } return ret; @@ -173,13 +163,11 @@ int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &fi LOG_WARN("failed to finish redef table", K(ret), K(finish_redef_arg)); } else if (OB_FAIL(build_ddl_single_replica_response(build_single_arg))) { LOG_WARN("build ddl single replica response", K(ret), K(build_single_arg)); - } else if (OB_TMP_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(finish_redef_arg.tenant_id_, finish_redef_arg.task_id_, session, common_rpc_proxy))) { - LOG_WARN("failed to wait ddl finish", K(tmp_ret), K(finish_redef_arg.tenant_id_), K(finish_redef_arg.task_id_)); + } else if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(finish_redef_arg.tenant_id_, finish_redef_arg.task_id_, session, common_rpc_proxy))) { + LOG_WARN("failed to wait ddl finish", K(ret), K(finish_redef_arg.tenant_id_), K(finish_redef_arg.task_id_)); } - if (OB_SUCC(ret)) { - if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(finish_redef_arg.task_id_))) { - LOG_ERROR("failed to remove register task id", K(ret), K(finish_redef_arg)); - } + if (OB_TMP_FAIL(heart_beat_clear(finish_redef_arg.task_id_))) { + LOG_WARN("heart beat clear failed", K(tmp_ret), K(finish_redef_arg.task_id_)); } return ret; } @@ -247,6 +235,22 @@ int ObDDLServerClient::wait_table_lock(const uint64_t tenant_id, const int64_t t return ret; } +int ObDDLServerClient::heart_beat_clear(const int64_t task_id) +{ + int ret = OB_SUCCESS; + if (task_id <= 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(task_id)); + } else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(task_id))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_ERROR("failed to remove register task id", K(ret), K(task_id)); + } + } + return ret; +} + } // end of namespace storage } // end of namespace oceanbase diff --git a/src/storage/ddl/ob_ddl_server_client.h b/src/storage/ddl/ob_ddl_server_client.h index 830bac7c16..e6a8a8b7d7 100644 --- a/src/storage/ddl/ob_ddl_server_client.h +++ b/src/storage/ddl/ob_ddl_server_client.h @@ -33,6 +33,7 @@ public: static int build_ddl_single_replica_response(const obrpc::ObDDLBuildSingleReplicaResponseArg &arg); private: static int wait_table_lock(const uint64_t tenant_id, const int64_t task_id, ObMySQLProxy &sql_proxy, sql::ObSQLSessionInfo &session); + static int heart_beat_clear(const int64_t task_id); }; } // end of namespace observer