[CP] Fix garbage tasks remain problem

This commit is contained in:
tino247
2024-01-18 04:43:10 +00:00
committed by ob-robot
parent e52f385a43
commit d50e4d0662
11 changed files with 55 additions and 89 deletions

View File

@ -796,6 +796,7 @@ class EventTable
EN_ENABLE_ROWKEY_CONFLICT_CHECK = 2103,
EN_ENABLE_ORA_DECINT_CONST = 2104,
EN_ENABLE_CLEAN_INTERM_RES = 2105,
EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL = 2106,
EN_DISABLE_VEC_SORT = 2200,
EN_DISABLE_VEC_HASH_DISTINCT = 2201,

View File

@ -78,13 +78,6 @@ public:
uint64_t get_tenant_id() const { return schema_info_.get_tenant_id(); }
uint64_t get_schema_version() const { return schema_info_.get_schema_version(); }
virtual bool need_assign_when_equal() const { return false; }
virtual int assign_when_equal(const ObServerSchemaTask &other)
{
UNUSED(other);
return common::OB_NOT_SUPPORTED;
}
TO_STRING_KV(K_(type), K_(did_retry), K_(schema_info));
private:

View File

@ -21,6 +21,7 @@
#include "lib/list/ob_dlist.h"
#include "lib/queue/ob_dedup_queue.h"
#include "lib/lock/ob_thread_cond.h"
#include "lib/utility/ob_tracepoint.h"
#include "share/ob_thread_pool.h"
#include "share/ob_debug_sync.h"
#include "share/ob_debug_sync_point.h"
@ -74,8 +75,6 @@ public:
virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; };
virtual bool compare_without_version(const Task &other) const = 0;
// The following two interfaces are used to update the waiting tasks with same key.
virtual bool need_assign_when_equal() const = 0;
virtual int assign_when_equal(const Task &other) = 0;
// get_group_id() is used for classifying tasks and batch processing
virtual uint64_t get_group_id() const = 0;
// for diagnose
@ -335,17 +334,8 @@ int ObUniqTaskQueue<Task, Process>::add(const Task &task)
const Task *stored_task = NULL;
if (OB_FAIL(task_set_.set_refactored(task, 0))) {
if (common::OB_HASH_EXIST == ret) {
if (task.need_assign_when_equal()) {
if (NULL == (stored_task = task_set_.get(task))) {
ret = common::OB_ERR_SYS;
SERVER_LOG(WARN, "get inserted task failed", K(ret), K(task));
} else if (OB_FAIL(const_cast<Task *>(stored_task)->assign_when_equal(task))) {
SERVER_LOG(WARN, "assign task failed", K(ret), K(task));
}
} else {
ret = common::OB_EAGAIN;
SERVER_LOG(TRACE, "same task exist", K(task));
}
SERVER_LOG(TRACE, "same task exist", K(ret), K(task));
} else {
SERVER_LOG(WARN, "insert into hash failed", K(ret), K(task));
}
@ -356,13 +346,19 @@ int ObUniqTaskQueue<Task, Process>::add(const Task &task)
Group *group = NULL;
const uint64_t group_id = task.get_group_id();
if (OB_FAIL(get_group(group_id, group))) {
SERVER_LOG(WARN, "get group failed", K(group_id), K(ret));
SERVER_LOG(WARN, "get group failed", K(ret), K(group_id));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(task_set_.erase_refactored(*stored_task))) {
SERVER_LOG(ERROR, "fail to erase task from uniq queue", K(tmp_ret), K(task));
} else {
stored_task = NULL;
}
} else if (NULL == group) {
ret = common::OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "group is null", K(group_id), K(ret));
SERVER_LOG(WARN, "group is null", K(ret), K(group_id));
} else if (!group->list_.add_last(const_cast<Task *>(stored_task))) {
ret = common::OB_ERR_SYS;
SERVER_LOG(WARN, "add task to list failed", K(ret));
SERVER_LOG(ERROR, "add task to list failed", K(ret));
} else {
++task_count_;
int tmp_ret = common::OB_SUCCESS;
@ -425,13 +421,13 @@ void ObUniqTaskQueue<Task, Process>::run1()
SERVER_LOG(WARN, "remove first return null", K(ret));
} else if (OB_FAIL(tasks.push_back(*t))) {
SERVER_LOG(WARN, "push_back failed", K(ret));
} else if (OB_FAIL(task_set_.erase_refactored(*t))) {
SERVER_LOG(ERROR, "erase task from task map failed",
K(ret), "task", tasks.at(tasks.count() - 1));
} else {
if (OB_FAIL(task_set_.erase_refactored(*t))) {
SERVER_LOG(WARN, "erase task from task map failed", K(ret), "task", *t);
} else {
t = NULL;
--task_count_;
}
}
if (OB_SUCC(ret)
&& tasks.at(tasks.count() - 1).need_process_alone()) {
break;
@ -618,6 +614,9 @@ int ObUniqTaskQueue<Task, Process>::get_group(const uint64_t group_id, Group *&g
} else if (common::OB_INVALID_ID == group_id) {
ret = common::OB_INVALID_ARGUMENT;
SERVER_LOG(WARN, "invalid group_id", K(group_id), K(ret));
} else if (OB_UNLIKELY(EVENT_CALL(EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
SERVER_LOG(WARN, "inject error when uniq task queue get_group() failed", K(ret));
} else {
Group new_group;
if (OB_FAIL(group_map_.set_refactored(group_id, new_group)) && common::OB_HASH_EXIST != ret) {

View File

@ -108,13 +108,6 @@ bool ObLSTableUpdateTask::compare_without_version(
return (*this == other);
}
int ObLSTableUpdateTask::assign_when_equal(
const ObLSTableUpdateTask &other)
{
UNUSED(other);
return OB_NOT_SUPPORTED;
}
ObLSTableUpdateQueueSet::ObLSTableUpdateQueueSet(
ObLSTableUpdater *updater)
: inited_(false),

View File

@ -47,7 +47,6 @@ public:
virtual void reset();
virtual bool is_barrier() const { return false; }
virtual bool need_process_alone() const { return true; }
virtual bool need_assign_when_equal() const { return false; }
virtual bool is_valid() const;
virtual int64_t hash() const;
virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; };
@ -55,7 +54,6 @@ public:
virtual bool operator!=(const ObLSTableUpdateTask &other) const;
virtual bool compare_without_version(const ObLSTableUpdateTask &other) const;
virtual uint64_t get_group_id() const { return tenant_id_; }
virtual int assign_when_equal(const ObLSTableUpdateTask &other);
inline int64_t get_tenant_id() const { return tenant_id_; }
inline share::ObLSID get_ls_id() const { return ls_id_; }
inline int64_t get_add_timestamp() const { return add_timestamp_; }

View File

@ -111,12 +111,6 @@ public:
int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; };
bool compare_without_version(
const ObTabletTableUpdateTask &other) const;
inline bool need_assign_when_equal() const { return false; }
inline int assign_when_equal(const ObTabletTableUpdateTask &other)
{
UNUSED(other);
return common::OB_NOT_SUPPORTED;
}
// TODO: need to realize barrier related functions
bool is_barrier() const;

View File

@ -126,13 +126,6 @@ bool ObDRTaskTableUpdateTask::compare_without_version(
return (*this == other);
}
int ObDRTaskTableUpdateTask::assign_when_equal(
const ObDRTaskTableUpdateTask &other)
{
UNUSED(other);
return OB_NOT_SUPPORTED;
}
int ObDRTaskTableUpdater::init(
common::ObMySQLProxy *sql_proxy,
ObDRTaskMgr *task_mgr)

View File

@ -92,8 +92,6 @@ public:
// unused functions
virtual bool is_barrier() const { return false; }
virtual bool need_process_alone() const { return true; }
virtual bool need_assign_when_equal() const { return false; }
virtual int assign_when_equal(const ObDRTaskTableUpdateTask &other);
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(task_type), K_(task_id),
K_(task_key), K_(ret_code), K_(need_clear_server_data_in_limit),

View File

@ -110,13 +110,6 @@ bool ObLSLocationUpdateTask::compare_without_version(
return (*this == other);
}
int ObLSLocationUpdateTask::assign_when_equal(
const ObLSLocationUpdateTask &other)
{
UNUSED(other);
return OB_NOT_SUPPORTED;
}
int ObTabletLSUpdateTask::init(
const uint64_t tenant_id,
const ObTabletID &tablet_id,
@ -187,13 +180,6 @@ bool ObTabletLSUpdateTask::compare_without_version(
return (*this == other);
}
int ObTabletLSUpdateTask::assign_when_equal(
const ObTabletLSUpdateTask &other)
{
UNUSED(other);
return OB_NOT_SUPPORTED;
}
ObLSLocationTimerTask::ObLSLocationTimerTask(
ObLSLocationService &ls_loc_service)
: ls_loc_service_(ls_loc_service)
@ -318,13 +304,6 @@ bool ObVTableLocUpdateTask::compare_without_version(
return (*this == other);
}
int ObVTableLocUpdateTask::assign_when_equal(
const ObVTableLocUpdateTask &other)
{
UNUSED(other);
return OB_NOT_SUPPORTED;
}
ObClearTabletLSCacheTimerTask::ObClearTabletLSCacheTimerTask(
ObTabletLSService &tablet_ls_service)
: tablet_ls_service_(tablet_ls_service)
@ -427,13 +406,6 @@ bool ObTabletLocationBroadcastTask::compare_without_version
return (*this == other);
}
int ObTabletLocationBroadcastTask::assign_when_equal(
const ObTabletLocationBroadcastTask &other)
{
UNUSED(other);
return OB_NOT_SUPPORTED;
}
OB_SERIALIZE_MEMBER(ObTabletLocationBroadcastTask,
tenant_id_,
task_id_,

View File

@ -57,7 +57,6 @@ public:
virtual void reset();
virtual bool is_barrier() const { return false; }
virtual bool need_process_alone() const { return true; }
virtual bool need_assign_when_equal() const { return false; }
virtual bool is_valid() const;
virtual int64_t hash() const;
virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }
@ -65,7 +64,6 @@ public:
virtual bool operator!=(const ObLSLocationUpdateTask &other) const;
virtual bool compare_without_version(const ObLSLocationUpdateTask &other) const;
virtual uint64_t get_group_id() const { return tenant_id_; }
virtual int assign_when_equal(const ObLSLocationUpdateTask &other);
inline int64_t get_cluster_id() const { return cluster_id_; }
inline int64_t get_tenant_id() const { return tenant_id_; }
@ -106,7 +104,6 @@ public:
virtual void reset();
virtual bool is_barrier() const { return false; }
virtual bool need_process_alone() const { return false; }
virtual bool need_assign_when_equal() const { return false; }
virtual bool is_valid() const;
virtual int64_t hash() const;
virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }
@ -114,7 +111,6 @@ public:
virtual bool operator!=(const ObTabletLSUpdateTask &other) const;
virtual bool compare_without_version(const ObTabletLSUpdateTask &other) const;
virtual uint64_t get_group_id() const { return tenant_id_; }
virtual int assign_when_equal(const ObTabletLSUpdateTask &other);
inline int64_t get_tenant_id() const { return tenant_id_; }
inline ObTabletID get_tablet_id() const { return tablet_id_; }
@ -181,7 +177,6 @@ public:
virtual void reset();
virtual bool is_barrier() const { return false; }
virtual bool need_process_alone() const { return true; }
virtual bool need_assign_when_equal() const { return false; }
virtual bool is_valid() const;
virtual int64_t hash() const;
virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }
@ -189,7 +184,6 @@ public:
virtual bool operator!=(const ObVTableLocUpdateTask &other) const;
virtual bool compare_without_version(const ObVTableLocUpdateTask &other) const;
virtual uint64_t get_group_id() const { return tenant_id_; }
virtual int assign_when_equal(const ObVTableLocUpdateTask &other);
inline int64_t get_tenant_id() const { return tenant_id_; }
inline uint64_t get_table_id() const { return table_id_; }
@ -229,7 +223,6 @@ public:
virtual void reset();
virtual bool is_barrier() const { return false; }
virtual bool need_process_alone() const { return true; } // process 1 task each time
virtual bool need_assign_when_equal() const { return false; }
virtual bool is_valid() const;
virtual int64_t hash() const;
virtual int hash(uint64_t &hash_val) const { hash_val = hash(); return OB_SUCCESS; }
@ -237,7 +230,6 @@ public:
virtual bool operator!=(const ObTabletLocationBroadcastTask &other) const;
virtual bool compare_without_version(const ObTabletLocationBroadcastTask &other) const;
virtual uint64_t get_group_id() const { return tenant_id_; }
virtual int assign_when_equal(const ObTabletLocationBroadcastTask &other);
inline uint64_t get_tenant_id() const { return tenant_id_; }
inline const ObTransferTaskID &get_task_id() const { return task_id_; }

View File

@ -203,6 +203,39 @@ TEST_F(TestUniqTaskQueue, test_concurrency_execute)
}
*/
// bugfix:53694448
TEST_F(TestUniqTaskQueue, test_get_queue_fail)
{
MockTaskQueue queue;
MockTaskProcesser processor(queue);
ASSERT_EQ(OB_SUCCESS, queue.init(&processor, 1 /*thread_num*/, 1024 /*queue_size*/));
// error injection
TP_SET_EVENT(common::EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL, OB_ALLOCATE_MEMORY_FAILED, 0, 1);
// add task failed
MockTask task(0 /*group_id*/, 0 /*task_id*/);
ASSERT_EQ(OB_ALLOCATE_MEMORY_FAILED, queue.add(task));
// reset error injection
TP_SET_EVENT(common::EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL, OB_SUCCESS, 0, 1);
// add the same task again
// before bugfix
// add task will fail and do nothing
// ASSERT_EQ(OB_EAGAIN, queue.add(task));
// usleep(1 * 1000 * 1000L); //1s
// ASSERT_EQ(0, processor.get_results().count());
// after bugfix
// add task will success and task will be scheduled
ASSERT_EQ(OB_SUCCESS, queue.add(task));
usleep(1 * 1000 * 1000L); //1s
ASSERT_EQ(1, processor.get_results().count());
ASSERT_EQ(task, processor.get_results().at(0));
}
// bugfix: workitem/49006474
TEST_F(TestUniqTaskQueue, test_queue_starvation)
{
@ -297,7 +330,7 @@ TEST_F(TestUniqTaskQueue, test_queue_starvation)
int main(int argc, char **argv)
{
system("rm -f test_uniq_task_queue.log");
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
oceanbase::common::ObLogger::get_logger().set_log_level("WDIAG");
OB_LOGGER.set_file_name("test_uniq_task_queue.log", true);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();