From 120a638ecdd4eb6d82f017ef78e878651537d094 Mon Sep 17 00:00:00 2001 From: tino247 Date: Fri, 9 Feb 2024 15:44:05 +0000 Subject: [PATCH] [CP] Fix garbage tasks remain problem --- deps/oblib/src/lib/utility/ob_tracepoint.h | 1 + src/observer/ob_server_schema_updater.h | 7 ---- src/observer/ob_uniq_task_queue.h | 41 +++++++++---------- src/observer/report/ob_ls_table_updater.cpp | 7 ---- src/observer/report/ob_ls_table_updater.h | 2 - src/observer/report/ob_tablet_table_updater.h | 6 --- ...b_disaster_recovery_task_table_updater.cpp | 7 ---- .../ob_disaster_recovery_task_table_updater.h | 2 - .../ob_location_update_task.cpp | 28 ------------- .../location_cache/ob_location_update_task.h | 8 ---- unittest/observer/test_uniq_task_queue.cpp | 35 +++++++++++++++- 11 files changed, 55 insertions(+), 89 deletions(-) diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index ee982c86e..817273ebe 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -796,6 +796,7 @@ class EventTable EN_ENABLE_ROWKEY_CONFLICT_CHECK = 2103, EN_ENABLE_ORA_DECINT_CONST = 2104, EN_ENABLE_CLEAN_INTERM_RES = 2105, + EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL = 2106, EN_DISABLE_VEC_SORT = 2200, EN_DISABLE_VEC_HASH_DISTINCT = 2201, diff --git a/src/observer/ob_server_schema_updater.h b/src/observer/ob_server_schema_updater.h index 5eda2922b..d90bbbc08 100644 --- a/src/observer/ob_server_schema_updater.h +++ b/src/observer/ob_server_schema_updater.h @@ -78,13 +78,6 @@ public: uint64_t get_tenant_id() const { return schema_info_.get_tenant_id(); } uint64_t get_schema_version() const { return schema_info_.get_schema_version(); } - virtual bool need_assign_when_equal() const { return false; } - virtual int assign_when_equal(const ObServerSchemaTask &other) - { - UNUSED(other); - return common::OB_NOT_SUPPORTED; - } - TO_STRING_KV(K_(type), K_(did_retry), K_(schema_info)); private: diff --git a/src/observer/ob_uniq_task_queue.h b/src/observer/ob_uniq_task_queue.h index 45ff31264..df796440c 100644 --- a/src/observer/ob_uniq_task_queue.h +++ b/src/observer/ob_uniq_task_queue.h @@ -21,6 +21,7 @@ #include "lib/list/ob_dlist.h" #include "lib/queue/ob_dedup_queue.h" #include "lib/lock/ob_thread_cond.h" +#include "lib/utility/ob_tracepoint.h" #include "share/ob_thread_pool.h" #include "share/ob_debug_sync.h" #include "share/ob_debug_sync_point.h" @@ -74,8 +75,6 @@ public: virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }; virtual bool compare_without_version(const Task &other) const = 0; // The following two interfaces are used to update the waiting tasks with same key. - virtual bool need_assign_when_equal() const = 0; - virtual int assign_when_equal(const Task &other) = 0; // get_group_id() is used for classifying tasks and batch processing virtual uint64_t get_group_id() const = 0; // for diagnose @@ -335,17 +334,8 @@ int ObUniqTaskQueue::add(const Task &task) const Task *stored_task = NULL; if (OB_FAIL(task_set_.set_refactored(task, 0))) { if (common::OB_HASH_EXIST == ret) { - if (task.need_assign_when_equal()) { - if (NULL == (stored_task = task_set_.get(task))) { - ret = common::OB_ERR_SYS; - SERVER_LOG(WARN, "get inserted task failed", K(ret), K(task)); - } else if (OB_FAIL(const_cast(stored_task)->assign_when_equal(task))) { - SERVER_LOG(WARN, "assign task failed", K(ret), K(task)); - } - } else { - ret = common::OB_EAGAIN; - SERVER_LOG(TRACE, "same task exist", K(task)); - } + ret = common::OB_EAGAIN; + SERVER_LOG(TRACE, "same task exist", K(ret), K(task)); } else { SERVER_LOG(WARN, "insert into hash failed", K(ret), K(task)); } @@ -356,13 +346,19 @@ int ObUniqTaskQueue::add(const Task &task) Group *group = NULL; const uint64_t group_id = task.get_group_id(); if (OB_FAIL(get_group(group_id, group))) { - SERVER_LOG(WARN, "get group failed", K(group_id), K(ret)); + SERVER_LOG(WARN, "get group failed", K(ret), K(group_id)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(task_set_.erase_refactored(*stored_task))) { + SERVER_LOG(ERROR, "fail to erase task from uniq queue", K(tmp_ret), K(task)); + } else { + stored_task = NULL; + } } else if (NULL == group) { ret = common::OB_ERR_UNEXPECTED; - SERVER_LOG(WARN, "group is null", K(group_id), K(ret)); + SERVER_LOG(WARN, "group is null", K(ret), K(group_id)); } else if (!group->list_.add_last(const_cast(stored_task))) { ret = common::OB_ERR_SYS; - SERVER_LOG(WARN, "add task to list failed", K(ret)); + SERVER_LOG(ERROR, "add task to list failed", K(ret)); } else { ++task_count_; int tmp_ret = common::OB_SUCCESS; @@ -425,12 +421,12 @@ void ObUniqTaskQueue::run1() SERVER_LOG(WARN, "remove first return null", K(ret)); } else if (OB_FAIL(tasks.push_back(*t))) { SERVER_LOG(WARN, "push_back failed", K(ret)); + } else if (OB_FAIL(task_set_.erase_refactored(*t))) { + SERVER_LOG(ERROR, "erase task from task map failed", + K(ret), "task", tasks.at(tasks.count() - 1)); } else { - if (OB_FAIL(task_set_.erase_refactored(*t))) { - SERVER_LOG(WARN, "erase task from task map failed", K(ret), "task", *t); - } else { - --task_count_; - } + t = NULL; + --task_count_; } if (OB_SUCC(ret) && tasks.at(tasks.count() - 1).need_process_alone()) { @@ -618,6 +614,9 @@ int ObUniqTaskQueue::get_group(const uint64_t group_id, Group *&g } else if (common::OB_INVALID_ID == group_id) { ret = common::OB_INVALID_ARGUMENT; SERVER_LOG(WARN, "invalid group_id", K(group_id), K(ret)); + } else if (OB_UNLIKELY(EVENT_CALL(EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL))) { + ret = common::OB_ALLOCATE_MEMORY_FAILED; + SERVER_LOG(WARN, "inject error when uniq task queue get_group() failed", K(ret)); } else { Group new_group; if (OB_FAIL(group_map_.set_refactored(group_id, new_group)) && common::OB_HASH_EXIST != ret) { diff --git a/src/observer/report/ob_ls_table_updater.cpp b/src/observer/report/ob_ls_table_updater.cpp index 11c412415..4c5eaa716 100644 --- a/src/observer/report/ob_ls_table_updater.cpp +++ b/src/observer/report/ob_ls_table_updater.cpp @@ -108,13 +108,6 @@ bool ObLSTableUpdateTask::compare_without_version( return (*this == other); } -int ObLSTableUpdateTask::assign_when_equal( - const ObLSTableUpdateTask &other) -{ - UNUSED(other); - return OB_NOT_SUPPORTED; -} - ObLSTableUpdateQueueSet::ObLSTableUpdateQueueSet( ObLSTableUpdater *updater) : inited_(false), diff --git a/src/observer/report/ob_ls_table_updater.h b/src/observer/report/ob_ls_table_updater.h index 12616c4e5..d7f2cfcc2 100644 --- a/src/observer/report/ob_ls_table_updater.h +++ b/src/observer/report/ob_ls_table_updater.h @@ -47,7 +47,6 @@ public: virtual void reset(); virtual bool is_barrier() const { return false; } virtual bool need_process_alone() const { return true; } - virtual bool need_assign_when_equal() const { return false; } virtual bool is_valid() const; virtual int64_t hash() const; virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }; @@ -55,7 +54,6 @@ public: virtual bool operator!=(const ObLSTableUpdateTask &other) const; virtual bool compare_without_version(const ObLSTableUpdateTask &other) const; virtual uint64_t get_group_id() const { return tenant_id_; } - virtual int assign_when_equal(const ObLSTableUpdateTask &other); inline int64_t get_tenant_id() const { return tenant_id_; } inline share::ObLSID get_ls_id() const { return ls_id_; } inline int64_t get_add_timestamp() const { return add_timestamp_; } diff --git a/src/observer/report/ob_tablet_table_updater.h b/src/observer/report/ob_tablet_table_updater.h index 86674a527..b0f228ac7 100644 --- a/src/observer/report/ob_tablet_table_updater.h +++ b/src/observer/report/ob_tablet_table_updater.h @@ -111,12 +111,6 @@ public: int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }; bool compare_without_version( const ObTabletTableUpdateTask &other) const; - inline bool need_assign_when_equal() const { return false; } - inline int assign_when_equal(const ObTabletTableUpdateTask &other) - { - UNUSED(other); - return common::OB_NOT_SUPPORTED; - } // TODO: need to realize barrier related functions bool is_barrier() const; diff --git a/src/rootserver/ob_disaster_recovery_task_table_updater.cpp b/src/rootserver/ob_disaster_recovery_task_table_updater.cpp index 73d056589..32fd9cd9c 100644 --- a/src/rootserver/ob_disaster_recovery_task_table_updater.cpp +++ b/src/rootserver/ob_disaster_recovery_task_table_updater.cpp @@ -126,13 +126,6 @@ bool ObDRTaskTableUpdateTask::compare_without_version( return (*this == other); } -int ObDRTaskTableUpdateTask::assign_when_equal( - const ObDRTaskTableUpdateTask &other) -{ - UNUSED(other); - return OB_NOT_SUPPORTED; -} - int ObDRTaskTableUpdater::init( common::ObMySQLProxy *sql_proxy, ObDRTaskMgr *task_mgr) diff --git a/src/rootserver/ob_disaster_recovery_task_table_updater.h b/src/rootserver/ob_disaster_recovery_task_table_updater.h index df7ac7d24..c4d0aa471 100644 --- a/src/rootserver/ob_disaster_recovery_task_table_updater.h +++ b/src/rootserver/ob_disaster_recovery_task_table_updater.h @@ -92,8 +92,6 @@ public: // unused functions virtual bool is_barrier() const { return false; } virtual bool need_process_alone() const { return true; } - virtual bool need_assign_when_equal() const { return false; } - virtual int assign_when_equal(const ObDRTaskTableUpdateTask &other); TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(task_type), K_(task_id), K_(task_key), K_(ret_code), K_(need_clear_server_data_in_limit), diff --git a/src/share/location_cache/ob_location_update_task.cpp b/src/share/location_cache/ob_location_update_task.cpp index 7d0571c76..ef6792532 100644 --- a/src/share/location_cache/ob_location_update_task.cpp +++ b/src/share/location_cache/ob_location_update_task.cpp @@ -110,13 +110,6 @@ bool ObLSLocationUpdateTask::compare_without_version( return (*this == other); } -int ObLSLocationUpdateTask::assign_when_equal( - const ObLSLocationUpdateTask &other) -{ - UNUSED(other); - return OB_NOT_SUPPORTED; -} - int ObTabletLSUpdateTask::init( const uint64_t tenant_id, const ObTabletID &tablet_id, @@ -187,13 +180,6 @@ bool ObTabletLSUpdateTask::compare_without_version( return (*this == other); } -int ObTabletLSUpdateTask::assign_when_equal( - const ObTabletLSUpdateTask &other) -{ - UNUSED(other); - return OB_NOT_SUPPORTED; -} - ObLSLocationTimerTask::ObLSLocationTimerTask( ObLSLocationService &ls_loc_service) : ls_loc_service_(ls_loc_service) @@ -318,13 +304,6 @@ bool ObVTableLocUpdateTask::compare_without_version( return (*this == other); } -int ObVTableLocUpdateTask::assign_when_equal( - const ObVTableLocUpdateTask &other) -{ - UNUSED(other); - return OB_NOT_SUPPORTED; -} - ObClearTabletLSCacheTimerTask::ObClearTabletLSCacheTimerTask( ObTabletLSService &tablet_ls_service) : tablet_ls_service_(tablet_ls_service) @@ -427,13 +406,6 @@ bool ObTabletLocationBroadcastTask::compare_without_version return (*this == other); } -int ObTabletLocationBroadcastTask::assign_when_equal( - const ObTabletLocationBroadcastTask &other) -{ - UNUSED(other); - return OB_NOT_SUPPORTED; -} - OB_SERIALIZE_MEMBER(ObTabletLocationBroadcastTask, tenant_id_, task_id_, diff --git a/src/share/location_cache/ob_location_update_task.h b/src/share/location_cache/ob_location_update_task.h index 2a66f5e6a..9fbbeaaaf 100644 --- a/src/share/location_cache/ob_location_update_task.h +++ b/src/share/location_cache/ob_location_update_task.h @@ -57,7 +57,6 @@ public: virtual void reset(); virtual bool is_barrier() const { return false; } virtual bool need_process_alone() const { return true; } - virtual bool need_assign_when_equal() const { return false; } virtual bool is_valid() const; virtual int64_t hash() const; virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; } @@ -65,7 +64,6 @@ public: virtual bool operator!=(const ObLSLocationUpdateTask &other) const; virtual bool compare_without_version(const ObLSLocationUpdateTask &other) const; virtual uint64_t get_group_id() const { return tenant_id_; } - virtual int assign_when_equal(const ObLSLocationUpdateTask &other); inline int64_t get_cluster_id() const { return cluster_id_; } inline int64_t get_tenant_id() const { return tenant_id_; } @@ -106,7 +104,6 @@ public: virtual void reset(); virtual bool is_barrier() const { return false; } virtual bool need_process_alone() const { return false; } - virtual bool need_assign_when_equal() const { return false; } virtual bool is_valid() const; virtual int64_t hash() const; virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; } @@ -114,7 +111,6 @@ public: virtual bool operator!=(const ObTabletLSUpdateTask &other) const; virtual bool compare_without_version(const ObTabletLSUpdateTask &other) const; virtual uint64_t get_group_id() const { return tenant_id_; } - virtual int assign_when_equal(const ObTabletLSUpdateTask &other); inline int64_t get_tenant_id() const { return tenant_id_; } inline ObTabletID get_tablet_id() const { return tablet_id_; } @@ -181,7 +177,6 @@ public: virtual void reset(); virtual bool is_barrier() const { return false; } virtual bool need_process_alone() const { return true; } - virtual bool need_assign_when_equal() const { return false; } virtual bool is_valid() const; virtual int64_t hash() const; virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; } @@ -189,7 +184,6 @@ public: virtual bool operator!=(const ObVTableLocUpdateTask &other) const; virtual bool compare_without_version(const ObVTableLocUpdateTask &other) const; virtual uint64_t get_group_id() const { return tenant_id_; } - virtual int assign_when_equal(const ObVTableLocUpdateTask &other); inline int64_t get_tenant_id() const { return tenant_id_; } inline uint64_t get_table_id() const { return table_id_; } @@ -229,7 +223,6 @@ public: virtual void reset(); virtual bool is_barrier() const { return false; } virtual bool need_process_alone() const { return true; } // process 1 task each time - virtual bool need_assign_when_equal() const { return false; } virtual bool is_valid() const; virtual int64_t hash() const; virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; } @@ -237,7 +230,6 @@ public: virtual bool operator!=(const ObTabletLocationBroadcastTask &other) const; virtual bool compare_without_version(const ObTabletLocationBroadcastTask &other) const; virtual uint64_t get_group_id() const { return tenant_id_; } - virtual int assign_when_equal(const ObTabletLocationBroadcastTask &other); inline uint64_t get_tenant_id() const { return tenant_id_; } inline const ObTransferTaskID &get_task_id() const { return task_id_; } diff --git a/unittest/observer/test_uniq_task_queue.cpp b/unittest/observer/test_uniq_task_queue.cpp index a061d0744..975a45cb8 100644 --- a/unittest/observer/test_uniq_task_queue.cpp +++ b/unittest/observer/test_uniq_task_queue.cpp @@ -203,6 +203,39 @@ TEST_F(TestUniqTaskQueue, test_concurrency_execute) } */ +// bugfix:53694448 +TEST_F(TestUniqTaskQueue, test_get_queue_fail) +{ + MockTaskQueue queue; + MockTaskProcesser processor(queue); + ASSERT_EQ(OB_SUCCESS, queue.init(&processor, 1 /*thread_num*/, 1024 /*queue_size*/)); + + // error injection + TP_SET_EVENT(common::EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL, OB_ALLOCATE_MEMORY_FAILED, 0, 1); + + // add task failed + MockTask task(0 /*group_id*/, 0 /*task_id*/); + ASSERT_EQ(OB_ALLOCATE_MEMORY_FAILED, queue.add(task)); + + // reset error injection + TP_SET_EVENT(common::EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL, OB_SUCCESS, 0, 1); + + // add the same task again + + // before bugfix + // add task will fail and do nothing + // ASSERT_EQ(OB_EAGAIN, queue.add(task)); + // usleep(1 * 1000 * 1000L); //1s + // ASSERT_EQ(0, processor.get_results().count()); + + // after bugfix + // add task will success and task will be scheduled + ASSERT_EQ(OB_SUCCESS, queue.add(task)); + usleep(1 * 1000 * 1000L); //1s + ASSERT_EQ(1, processor.get_results().count()); + ASSERT_EQ(task, processor.get_results().at(0)); +} + // bugfix: workitem/49006474 TEST_F(TestUniqTaskQueue, test_queue_starvation) { @@ -297,7 +330,7 @@ TEST_F(TestUniqTaskQueue, test_queue_starvation) int main(int argc, char **argv) { system("rm -f test_uniq_task_queue.log"); - oceanbase::common::ObLogger::get_logger().set_log_level("INFO"); + oceanbase::common::ObLogger::get_logger().set_log_level("WDIAG"); OB_LOGGER.set_file_name("test_uniq_task_queue.log", true); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();