diff --git a/src/storage/ddl/ob_ddl_heart_beat_task.cpp b/src/storage/ddl/ob_ddl_heart_beat_task.cpp index b2fc5d4181..66a192828c 100644 --- a/src/storage/ddl/ob_ddl_heart_beat_task.cpp +++ b/src/storage/ddl/ob_ddl_heart_beat_task.cpp @@ -90,14 +90,14 @@ int ObDDLHeartBeatTaskContainer::set_register_task_id(const int64_t task_id, con LOG_WARN("ObDDLHeartBeatTaskContainer not inited", K(ret)); } else { ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id); - if (OB_FAIL(register_tasks_.set_refactored(task_id, tenant_id))) { + if (OB_FAIL(register_tasks_.set_refactored(rootserver::ObDDLTaskID(tenant_id, task_id), 0))) { LOG_ERROR("set register task id failed", KR(ret)); } } return ret; } -int ObDDLHeartBeatTaskContainer::remove_register_task_id(const int64_t task_id) +int ObDDLHeartBeatTaskContainer::remove_register_task_id(const int64_t task_id, const uint64_t tenant_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -105,11 +105,11 @@ int ObDDLHeartBeatTaskContainer::remove_register_task_id(const int64_t task_id) LOG_WARN("ObDDLHeartBeatTaskContainer not inited", K(ret)); } else { ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id); - if (OB_FAIL(register_tasks_.erase_refactored(task_id))) { + if (OB_FAIL(register_tasks_.erase_refactored(rootserver::ObDDLTaskID(tenant_id, task_id)))) { if (OB_HASH_NOT_EXIST == ret) { ret = OB_SUCCESS; } else { - LOG_WARN("remove register task id failed", KR(ret), K(task_id)); + LOG_WARN("remove register task id failed", KR(ret), K(task_id), K(tenant_id)); } } } @@ -136,9 +136,9 @@ int ObDDLHeartBeatTaskContainer::send_task_status_to_rs() sleep(RETRY_TIME_INTERVAL); } } else { - for (common::hash::ObHashMap::iterator it = register_tasks_.begin(); OB_SUCC(ret) && it != register_tasks_.end(); it++) { - int64_t task_id = it->first; - uint64_t tenant_id = it->second; + for (common::hash::ObHashMap::iterator it = register_tasks_.begin(); OB_SUCC(ret) && it != register_tasks_.end(); it++) { + int64_t task_id = it->first.task_id_; + uint64_t tenant_id = it->first.tenant_id_; if (OB_FAIL(heart_beart_task_infos.push_back(ObDDLHeartBeatTaskInfo(task_id, tenant_id)))) { LOG_WARN("task_ids push_back failed", K(ret)); } diff --git a/src/storage/ddl/ob_ddl_heart_beat_task.h b/src/storage/ddl/ob_ddl_heart_beat_task.h index c4ac857af4..e83f829322 100644 --- a/src/storage/ddl/ob_ddl_heart_beat_task.h +++ b/src/storage/ddl/ob_ddl_heart_beat_task.h @@ -14,6 +14,7 @@ #define OCEANBASE_STORAGE_OB_DDL_HEART_BEAT_TASK_H #include "observer/ob_server_struct.h" +#include "rootserver/ddl_task/ob_ddl_task.h" namespace oceanbase { @@ -43,13 +44,13 @@ public: ~ObDDLHeartBeatTaskContainer(); int init(); int set_register_task_id(const int64_t task_id, const uint64_t tenant_id); - int remove_register_task_id(const int64_t task_id); + int remove_register_task_id(const int64_t task_id, const uint64_t tenant_id); int send_task_status_to_rs(); private: static const int64_t BUCKET_LOCK_BUCKET_CNT = 10243L; static const int64_t RETRY_COUNT = 3L; static const int64_t RETRY_TIME_INTERVAL = 100L; - common::hash::ObHashMap register_tasks_; + common::hash::ObHashMap register_tasks_; bool is_inited_; common::ObBucketLock bucket_lock_; }; diff --git a/src/storage/ddl/ob_ddl_server_client.cpp b/src/storage/ddl/ob_ddl_server_client.cpp index aae9d7ed9c..521f0e5246 100644 --- a/src/storage/ddl/ob_ddl_server_client.cpp +++ b/src/storage/ddl/ob_ddl_server_client.cpp @@ -288,7 +288,7 @@ int ObDDLServerClient::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg, } } int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(heart_beat_clear(arg.task_id_))) { + if (OB_TMP_FAIL(heart_beat_clear(arg.task_id_, tenant_id))) { LOG_WARN("heart beat clear failed", K(tmp_ret), K(arg.task_id_)); } } @@ -355,7 +355,7 @@ int ObDDLServerClient::finish_redef_table(const obrpc::ObFinishRedefTableArg &fi } else if (OB_FAIL(sql::ObDDLExecutorUtil::wait_ddl_finish(finish_redef_arg.tenant_id_, finish_redef_arg.task_id_, &session, common_rpc_proxy))) { LOG_WARN("failed to wait ddl finish", K(ret), K(finish_redef_arg.tenant_id_), K(finish_redef_arg.task_id_)); } - if (OB_TMP_FAIL(heart_beat_clear(finish_redef_arg.task_id_))) { + if (OB_TMP_FAIL(heart_beat_clear(finish_redef_arg.task_id_, tenant_id))) { LOG_WARN("heart beat clear failed", K(tmp_ret), K(finish_redef_arg.task_id_)); } } @@ -442,13 +442,13 @@ int ObDDLServerClient::wait_task_reach_pending( return ret; } -int ObDDLServerClient::heart_beat_clear(const int64_t task_id) +int ObDDLServerClient::heart_beat_clear(const int64_t task_id, const uint64_t tenant_id) { int ret = OB_SUCCESS; if (task_id <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(task_id)); - } else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(task_id))) { + } else if (OB_FAIL(OB_DDL_HEART_BEAT_TASK_CONTAINER.remove_register_task_id(task_id, tenant_id))) { LOG_WARN("failed to remove register task id", K(ret), K(task_id)); } return ret; diff --git a/src/storage/ddl/ob_ddl_server_client.h b/src/storage/ddl/ob_ddl_server_client.h index 95d62589b4..28c51b7544 100644 --- a/src/storage/ddl/ob_ddl_server_client.h +++ b/src/storage/ddl/ob_ddl_server_client.h @@ -53,7 +53,7 @@ private: int64_t &snapshot_version, uint64_t &data_format_version, ObMySQLProxy &sql_proxy); - static int heart_beat_clear(const int64_t task_id); + static int heart_beat_clear(const int64_t task_id, const uint64_t tenant_id); static int check_need_stop(const uint64_t tenant_id); };