Files
oceanbase/unittest/observer/test_uniq_task_queue.cpp
2024-05-27 15:11:47 +00:00

330 lines
9.8 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "observer/ob_uniq_task_queue.h"
#include "rpc/mock_ob_common_rpc_proxy.h"
#include "lib/ob_define.h"
#include "share/inner_table/ob_inner_table_schema_constants.h"
#include "share/config/ob_server_config.h"
#include "lib/oblog/ob_log_module.h"
namespace oceanbase
{
namespace observer
{
using namespace common;
using namespace share;
using testing::_;
using ::testing::Return;
using ::testing::AtLeast;
using ::testing::Invoke;
/**
 * Fixture shared by the TEST_F cases in this file.
 *
 * It holds no state: the constructor, the destructor and the per-test
 * SetUp()/TearDown() hooks are all empty, so every test builds the queue and
 * processor it needs locally.
 */
class TestUniqTaskQueue : public ::testing::Test
{
public:
  TestUniqTaskQueue() {}
  ~TestUniqTaskQueue() {}

  // Per-test hooks; there is nothing to prepare or to release.
  virtual void SetUp() {}
  virtual void TearDown() {}
};
/**
 * Minimal task type used to drive MockTaskQueue in these tests.
 *
 * A task is identified by the pair (group_id, task_id): hashing and equality
 * use exactly these two fields. The remaining members are fixed answers to the
 * questions the queue template asks of a task type (validity, barrier,
 * process-alone, assign-when-equal).
 */
class MockTask : public common::ObDLinkBase<MockTask>
{
public:
  friend class MockTaskProcesser;

  // A default-constructed task carries OB_INVALID_ID in both fields.
  MockTask() : group_id_(common::OB_INVALID_ID), task_id_(common::OB_INVALID_ID) {}
  MockTask(const uint64_t group_id, const uint64_t task_id)
    : group_id_(group_id), task_id_(task_id)
  {}
  virtual ~MockTask() {}

  // Overwrites both identifiers in place.
  void init(const uint64_t group_id, const uint64_t task_id)
  {
    group_id_ = group_id;
    task_id_ = task_id;
  }

  // Copies both identifiers from `other`; always returns OB_SUCCESS.
  int assign(const MockTask &other)
  {
    init(other.group_id_, other.task_id_);
    return OB_SUCCESS;
  }

  // Accessors.
  uint64_t get_group_id() const { return group_id_; }
  uint64_t get_task_id() const { return task_id_; }

  // Intentionally a no-op for the mock.
  virtual void set_start_timestamp() {}

  // Fixed properties of the mock task.
  bool is_valid() const { return true; }
  bool is_barrier() const { return false; }
  bool need_process_alone() const { return true; }
  bool need_assign_when_equal() const { return false; }

  // Merging equal tasks is not supported by the mock.
  int assign_when_equal(const MockTask &other)
  {
    UNUSED(other);
    return OB_NOT_SUPPORTED;
  }

  // Hash over group_id_ first, then task_id_, starting from seed 0.
  virtual int64_t hash() const
  {
    uint64_t seed = 0;
    uint64_t group_hash = murmurhash(&group_id_, sizeof(group_id_), seed);
    uint64_t full_hash = murmurhash(&task_id_, sizeof(task_id_), group_hash);
    return static_cast<int64_t>(full_hash);
  }

  // Out-parameter flavour of hash(); always returns OB_SUCCESS.
  inline int hash(uint64_t &hash_val) const
  {
    hash_val = hash();
    return OB_SUCCESS;
  }

  // Two tasks are equal when both identifiers match.
  virtual bool operator==(const MockTask &other) const
  {
    return (group_id_ == other.group_id_) && (task_id_ == other.task_id_);
  }

  // The mock has no version field, so this is plain equality.
  virtual bool compare_without_version(const MockTask &other) const
  {
    return *this == other;
  }

  TO_STRING_KV(K_(group_id), K_(task_id));

private:
  uint64_t group_id_;
  uint64_t task_id_;
};
class MockTaskProcesser;
typedef ObUniqTaskQueue<MockTask, MockTaskProcesser> MockTaskQueue;
/**
 * Processor plugged into MockTaskQueue by the tests in this file.
 *
 * Every task handed to batch_process_tasks() is appended to results_, so a test
 * can inspect the order in which tasks were processed. Processing the task
 * (group 1, task 0) additionally enqueues a follow-up task (group 1, task 1).
 *
 * NOTE(review): results_ is written by batch_process_tasks() and read by the
 * tests through get_results(); no synchronization is visible in this class --
 * confirm that the way the queue invokes the processor makes this safe.
 */
class MockTaskProcesser
{
public:
  // `explicit` so that a queue reference is never silently converted into a processor.
  explicit MockTaskProcesser(MockTaskQueue &queue) : results_(), queue_(queue) {}
  // The class has a virtual member function, so give it a virtual destructor
  // as well (consistent with MockTask).
  virtual ~MockTaskProcesser() {}

  /**
   * Records the single task of the batch and, for task (1, 0), re-enters the
   * queue by adding task (1, 1).
   *
   * @param tasks    batch to process; must contain exactly one task
   *                 (presumably guaranteed because MockTask::need_process_alone()
   *                 returns true -- TODO confirm against ObUniqTaskQueue)
   * @param stopped  unused by the mock
   * @return OB_SUCCESS on success, OB_ERR_UNEXPECTED when the batch does not hold
   *         exactly one task, otherwise the error returned by push_back()/add()
   */
  virtual int batch_process_tasks(const common::ObIArray<MockTask> &tasks, bool &stopped)
  {
    UNUSED(stopped);
    int ret = OB_SUCCESS;
    if (1 != tasks.count()) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("task count is invalid", KR(ret), K(tasks.count()));
    } else if (OB_FAIL(results_.push_back(tasks.at(0)))) {
      LOG_WARN("fail to push back task", KR(ret), K(tasks.count()));
    } else if (tasks.at(0).get_group_id() == 1
               && tasks.at(0).get_task_id() == 0) {
      // Generate a follow-up task in the same group while that group is being processed.
      MockTask task(1 /*group_id*/, 1 /*task_id*/);
      if (OB_FAIL(queue_.add(task))) {
        LOG_WARN("fail to add task", KR(ret), K(task));
      }
    }
    return ret;
  }

  // Barrier tasks are not supported by the mock (MockTask::is_barrier() returns false).
  int process_barrier(const MockTask &task, bool &stopped)
  {
    UNUSEDx(task, stopped);
    return OB_NOT_SUPPORTED;
  }

  // Tasks recorded so far, in the order batch_process_tasks() received them.
  ObArray<MockTask> &get_results() { return results_; }

private:
  ObArray<MockTask> results_;
  MockTaskQueue &queue_;
};
/*
class MockUpdate : public ObPartitionTableUpdater
{
public:
MockUpdate() : batch_count_(0) {}
~MockUpdate() {}
int64_t batch_count() {return batch_count_; }
virtual int batch_process_tasks(const common::ObIArray<ObPTUpdateTask> &tasks, bool &stopped)
{
UNUSED(tasks);
LOG_INFO("batch process task", K(tasks.count()), K(stopped), K(batch_count_));
if (batch_count_ < 10) {
sleep(2);
}
batch_count_ ++;
return OB_SUCCESS;
}
int process_barrier(const ObPTUpdateTask &task, bool &stopped)
{
LOG_INFO("barrier process task", K(task), K(stopped));
return OB_SUCCESS;
}
private:
int64_t batch_count_;
};
TEST_F(TestUniqTaskQueue, test_concurrency_execute)
{
MockUpdate updater;
ObUniqTaskQueue<ObPTUpdateTask, MockUpdate> queue;
queue.init(&updater, 1, 100000);
int64_t core_table_id = combine_id(OB_SYS_TENANT_ID, OB_ALL_CORE_TABLE_TID);
int64_t sys_table_id = combine_id(OB_SYS_TENANT_ID, OB_ALL_DDL_OPERATION_TID);
int64_t user_table_id = combine_id(500001, 100001);
int64_t partition_cnt = 1;
int64_t partition_id = 0;
ObPartitionKey core_key(core_table_id, partition_id, partition_cnt);
ObPartitionKey sys_key(sys_table_id, partition_id, partition_cnt);
ObPartitionKey user_key(user_table_id, partition_id, partition_cnt);
ObPTUpdateTask task;
// tasks for the same partition with different keys are processed as separate batches
for (int64_t i = 1; i <= 3000; i++) {
EXPECT_EQ(OB_SUCCESS, task.set_update_task(core_key, i));
EXPECT_EQ(OB_SUCCESS, queue.add(task));
}
while (queue.task_count() > 0) {
sleep(3);
}
EXPECT_EQ(updater.batch_count(), 3000);
// tasks for different partitions can be processed together as one batch
int64_t data_version = 4;
for (int64_t i = 0; i < 3000; i++) {
ObPartitionKey sys_key(sys_table_id, i, 3000);
EXPECT_EQ(OB_SUCCESS, task.set_update_task(sys_key, data_version));
EXPECT_EQ(OB_SUCCESS, queue.add(task));
}
while (queue.task_count() > 0) {
sleep(3);
}
EXPECT_LT(updater.batch_count(), 6000);
}
*/
// bugfix:53694448
//
// Regression test: an add() that fails because of an injected error must not
// prevent the same task from being added (and processed) afterwards.
TEST_F(TestUniqTaskQueue, test_get_queue_fail)
{
  MockTaskQueue queue;
  MockTaskProcesser processor(queue);
  ASSERT_EQ(OB_SUCCESS, queue.init(&processor, 1 /*thread_num*/, 1024 /*queue_size*/));

  // error injection
  TP_SET_EVENT(common::EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL, OB_ALLOCATE_MEMORY_FAILED, 0, 1);
  MockTask task(0 /*group_id*/, 0 /*task_id*/);
  const int injected_ret = queue.add(task);
  // Reset the error injection BEFORE asserting on the result: a failing ASSERT_*
  // returns from the test body immediately, which would otherwise leave the
  // injection armed for the tests that run after this one.
  TP_SET_EVENT(common::EventTable::EN_UNIQ_TASK_QUEUE_GET_GROUP_FAIL, OB_SUCCESS, 0, 1);
  // add task failed
  ASSERT_EQ(OB_ALLOCATE_MEMORY_FAILED, injected_ret);

  // add the same task again
  // before bugfix
  //   add task will fail and do nothing
  //   ASSERT_EQ(OB_EAGAIN, queue.add(task));
  //   usleep(1 * 1000 * 1000L); //1s
  //   ASSERT_EQ(0, processor.get_results().count());
  // after bugfix
  //   add task will success and task will be scheduled
  ASSERT_EQ(OB_SUCCESS, queue.add(task));
  usleep(1 * 1000 * 1000L); //1s
  ASSERT_EQ(1, processor.get_results().count());
  ASSERT_EQ(task, processor.get_results().at(0));
}
// bugfix: workitem/49006474
//
// Checks that the queue rotates between groups: a group that keeps receiving
// new tasks while it is being processed must not starve the other group.
TEST_F(TestUniqTaskQueue, test_queue_starvation)
{
  MockTaskQueue queue;
  MockTaskProcesser processor(queue);
  ASSERT_EQ(OB_SUCCESS, queue.init(&processor, 1 /*thread_num*/, 1024 /*queue_size*/));

  // Stop the queue first so the groups and tasks of the bug scenario can be
  // laid out before anything is processed.
  queue.stop();
  queue.wait();

  MockTask probe(0 /*group_id*/, 0 /*task_id*/);
  ASSERT_EQ(OB_SUCCESS, queue.add(probe));
  probe.init(0 /*group_id*/, 1 /*task_id*/);
  ASSERT_EQ(OB_SUCCESS, queue.add(probe));
  probe.init(1 /*group_id*/, 0 /*task_id*/);
  ASSERT_EQ(OB_SUCCESS, queue.add(probe));
  /*
   * tasks in queue are as follows:
   *
   *      |-----------|         |-----------|
   *      |--group 0--|         |--group 1--|
   *  ... |-----------| <-----> |-----------| <-----> tail
   *      |--task 0---|         |--task 0---|
   *      |--task 1---|
   *      |-----------|
   */

  // Resume processing.
  ASSERT_EQ(OB_SUCCESS, queue.start());
  /*
   * before task(1, 0) runs, tasks in queue are as follows:
   *
   *      |-----------|         |-----------|
   *      |--group 0--|         |--group 1--|
   *  ... |-----------| <-----> |-----------| <-----> tail
   *      |--task 1---|         |--task 0---|
   *      |-----------|
   *        cur_group              group
   *
   * when task(1, 0) runs, task(1, 1) will be generated and tasks in queue are as follows:
   *
   *      |-----------|         |-----------|
   *      |--group 0--|         |--group 1--|
   *  ... |-----------| <-----> |-----------| <-----> tail
   *      |--task 1---|         |--task 1---|
   *      |-----------|
   */

  // Eventually, results should be (0, 0)->(1, 0)->(0, 1)->(1, 1) if group rotates.
  //
  // When group doesn't rotate, results will be (0, 0)->(1, 0)->(1, 1)->(0, 1).
  const uint64_t expected_order[][2] = {
    {0 /*group_id*/, 0 /*task_id*/},
    {1 /*group_id*/, 0 /*task_id*/},
    {0 /*group_id*/, 1 /*task_id*/},
    {1 /*group_id*/, 1 /*task_id*/},
  };
  const int64_t expected_cnt =
      static_cast<int64_t>(sizeof(expected_order) / sizeof(expected_order[0]));

  // Poll once per second, at most ten times, until all four tasks were processed.
  const int64_t max_attempts = 10;
  const int64_t poll_interval_us = 1 * 1000 * 1000L; // 1s
  ObArray<MockTask> &results = processor.get_results();
  for (int64_t attempt = 0;
       attempt < max_attempts && results.count() < expected_cnt;
       ++attempt) {
    usleep(poll_interval_us);
  }
  ASSERT_EQ(expected_cnt, results.count());

  // Compare the processing order against the rotating schedule.
  for (int64_t idx = 0; idx < expected_cnt; ++idx) {
    probe.init(expected_order[idx][0], expected_order[idx][1]);
    ASSERT_EQ(probe, results.at(idx));
  }

  // NOTE(review): the debug-sync check below does not touch the queue; it looks
  // unrelated to the starvation scenario -- confirm why it lives in this test.
  GCONF.debug_sync_timeout.set_value("0");
  ASSERT_FALSE(GCONF.is_debug_sync_enabled());
}
} //namespace observer
} //namespace oceanbase
int main(int argc, char **argv)
{
system("rm -f test_uniq_task_queue.log");
oceanbase::common::ObLogger::get_logger().set_log_level("WDIAG");
OB_LOGGER.set_file_name("test_uniq_task_queue.log", true);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}