// Files
// oceanbase/unittest/storage/ddl/test_auto_split_polling_mgr.cpp
//
// 350 lines
// 13 KiB
// C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE
#define ASSERT_OK(x) ASSERT_EQ(OB_SUCCESS, (x))
#include <algorithm>
#include <random>
#include <gtest/gtest.h>
#define private public
#define protected public
#include "share/scheduler/ob_partition_auto_split_helper.h"
#undef private
namespace oceanbase
{
using namespace common;
using namespace lib;
using namespace share;
using namespace storage;
using namespace blocksstable;
using namespace share::schema;
using namespace compaction;
// Tenant ids used by the test cases below. The fixture constructor registers a
// tenant allocator for each of them.
static const uint64_t TEST_TENANT_A_ID = 1002;
static const uint64_t TEST_TENANT_B_ID = 1003;
static const uint64_t TEST_TENANT_C_ID = 1004;
// Log stream id handed to every ObAutoSplitTask constructed in this file.
static const ObLSID ls_id{1};
/**
 * Fixture shared by every test case in this file.
 *
 * The scheduler is obtained from ObRsAutoSplitScheduler::get_instance() and the
 * polling manager is that scheduler's polling_mgr_ member, so both references
 * point at objects that are not owned by the fixture.
 * NOTE(review): get_instance() presumably returns one process-wide object, in
 * which case tasks left behind by one test are visible to the next - confirm.
 */
class TestSplitTaskScheduler : public ::testing::Test
{
public:
  TestSplitTaskScheduler()
    : rs_scheduler_(ObRsAutoSplitScheduler::get_instance()),
      polling_mgr_(rs_scheduler_.polling_mgr_),
      sys_tenant_(OB_SYS_TENANT_ID)
  {
    // Register a tenant allocator for each tenant id the tests push tasks for.
    const uint64_t test_tenant_ids[] = {TEST_TENANT_A_ID, TEST_TENANT_B_ID, TEST_TENANT_C_ID};
    for (const uint64_t test_tenant_id : test_tenant_ids) {
      ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(test_tenant_id);
    }
    // The return code of init() is not inspected here.
    init();
  }

  // Runs before each test body; see the out-of-class definition.
  virtual void SetUp();

  // Forwards to rs_scheduler_.reset().
  void reset();

  // Forwards to rs_scheduler_.init() and returns its result code.
  int init()
  {
    return rs_scheduler_.init();
  }

public:
  ObRsAutoSplitScheduler &rs_scheduler_;    // scheduler under test
  ObAutoSplitTaskPollingMgr &polling_mgr_;  // rs_scheduler_.polling_mgr_
  ObTenantBase sys_tenant_;                 // constructed with OB_SYS_TENANT_ID
};
/**
 * Drains polling_mgr into task_array. Used as a thread entry point by
 * parallel_push_and_pop.
 *
 * Keeps calling pop_tasks() while polling_mgr.total_tasks_ is positive, and
 * stops early when a call hands back no per-tenant batch at all (another
 * thread may have emptied the manager in between). The requested batch size
 * cycles through 1, 2, ..., 10, 1, ... so different pop sizes get exercised.
 *
 * @param task_array   receives every per-tenant batch returned by pop_tasks()
 * @param polling_mgr  manager to drain
 *
 * Any non-OB_SUCCESS result raises a fatal gtest failure and returns from
 * this function.
 */
void pop(ObArray<ObAutoSplitTask> &task_array, ObAutoSplitTaskPollingMgr &polling_mgr)
{
  ObArray<ObArray<ObAutoSplitTask>> tenant_task_array;
  int64_t pop_nums = 1;
  while (ATOMIC_LOAD(&polling_mgr.total_tasks_) > 0) {
    tenant_task_array.reuse();
    ASSERT_OK(polling_mgr.pop_tasks(pop_nums, tenant_task_array));
    if (tenant_task_array.count() == 0) {
      break;
    }
    for (int64_t i = 0; i < tenant_task_array.count(); ++i) {
      // The result code used to be dropped here; a failed append would have
      // silently lost tasks and surfaced later as a confusing count mismatch.
      ASSERT_OK(task_array.push_back(tenant_task_array.at(i)));
    }
    // Advance the batch size: 1 -> 2 -> ... -> 10 -> 1.
    pop_nums = pop_nums % 10 + 1;
  }
}
/**
 * Feeds every batch in tenants_task_array to polling_mgr.push_tasks(), in
 * order. Used as a thread entry point by parallel_push_and_pop.
 *
 * @param tenants_task_array  batches to push, one push_tasks() call per entry
 * @param polling_mgr         manager receiving the tasks
 *
 * The first push that does not return OB_SUCCESS raises a fatal gtest failure
 * and ends the function.
 */
void push(const ObArray<ObArray<ObAutoSplitTask>> &tenants_task_array, ObAutoSplitTaskPollingMgr &polling_mgr)
{
  const int64_t batch_cnt = tenants_task_array.count();
  int64_t batch_idx = 0;
  while (batch_idx < batch_cnt) {
    ASSERT_OK(polling_mgr.push_tasks(tenants_task_array.at(batch_idx)));
    ++batch_idx;
  }
}
void TestSplitTaskScheduler::reset()
{
rs_scheduler_.reset();
}
void TestSplitTaskScheduler::SetUp()
{
ObTenantEnv::set_tenant(&sys_tenant_);
}
// Pushes three single-task batches (tablets 1 and 2 for tenant A, tablet 3 for
// tenant B) and pops them back one task at a time, checking that exactly the
// tablets {1, 2, 3} come out.
TEST_F(TestSplitTaskScheduler, simple_push_and_pop)
{
  ObArray<ObAutoSplitTask> task_array;
  ObArray<uint64_t> ids;
  ObAutoSplitTask task_a(TEST_TENANT_A_ID, ls_id, 1/*tablet_id*/, 1/*auto_split_size*/, 2/*used_disk_size*/, 0);
  ObAutoSplitTask task_b(TEST_TENANT_A_ID, ls_id, 2/*tablet_id*/, 1/*auto_split_size*/, 3/*used_disk_size*/, 0);
  ObAutoSplitTask task_c(TEST_TENANT_B_ID, ls_id, 3/*tablet_id*/, 1/*auto_split_size*/, 4/*used_disk_size*/, 0);

  // Push order is A(tablet 1), B(tablet 3), A(tablet 2), each as its own batch.
  const ObAutoSplitTask *push_order[] = {&task_a, &task_c, &task_b};
  for (const ObAutoSplitTask *task : push_order) {
    ASSERT_OK(task_array.push_back(*task));
    ASSERT_OK(polling_mgr_.push_tasks(task_array));
    task_array.reuse();
  }

  // Each pop asks for one task and must hand back exactly one tenant batch
  // holding exactly one task.
  // The test was written with the pop order 2, 3, 1 in mind: tenant A's cache
  // first (tablet 2 has the highest priority there), then tenant B's cache
  // (tablet 3), then tenant A again (tablet 1).
  // NOTE(review): that order depends on how the polling manager rotates over
  // tenant caches, which is not visible from this file. Only the set of popped
  // tablet ids is asserted below, not their order.
  const int64_t expected_task_cnt = 3;
  ObArray<ObArray<ObAutoSplitTask>> tenants_task_array;
  for (int64_t round = 0; round < expected_task_cnt; ++round) {
    ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array));
    ASSERT_EQ(1, tenants_task_array.count());
    ASSERT_EQ(1, tenants_task_array.at(0).count());
    ASSERT_OK(ids.push_back(tenants_task_array.at(0).at(0).tablet_id_.id_));
    tenants_task_array.reuse();
  }

  // Guard against the comparison loop below passing on a short id list.
  ASSERT_EQ(expected_task_cnt, ids.count());
  std::sort(ids.begin(), ids.end());
  for (int64_t i = 0; i < ids.count(); ++i) {
    ASSERT_EQ(static_cast<uint64_t>(i + 1), ids.at(i));
  }
}
// Pushes two tasks for tenant A, resets and re-initialises the scheduler,
// pushes tablet 1 again and checks that a pop of one task returns tablet 1.
// NOTE(review): the intent appears to be that reset() discards the tasks that
// were queued before it (tablet 2 was pushed with the larger used_disk_size);
// the assertions only pin the result of the single pop.
TEST_F(TestSplitTaskScheduler, test_reset)
{
  ObArray<ObAutoSplitTask> task_array;
  ObAutoSplitTask task_a(TEST_TENANT_A_ID, ls_id, 1/*tablet_id*/, 1/*auto_split_size*/, 2/*used_disk_size*/, 0);
  ObAutoSplitTask task_b(TEST_TENANT_A_ID, ls_id, 2/*tablet_id*/, 1/*auto_split_size*/, 3/*used_disk_size*/, 0);

  // Queue both tasks before the reset, one batch each.
  ASSERT_OK(task_array.push_back(task_a));
  ASSERT_OK(polling_mgr_.push_tasks(task_array));
  task_array.reuse();
  ASSERT_OK(task_array.push_back(task_b));
  ASSERT_OK(polling_mgr_.push_tasks(task_array));
  task_array.reuse();

  reset();
  // NOTE(review): the result code of init() is not checked, matching the
  // fixture constructor. Confirm whether it should be OB_SUCCESS after reset().
  init();

  // Only tablet 1 is queued after the reset. The push_back result is now
  // checked like every other push_back in this test.
  ASSERT_OK(task_array.push_back(task_a));
  ASSERT_OK(polling_mgr_.push_tasks(task_array));
  task_array.reuse();

  ObArray<ObArray<ObAutoSplitTask>> tenants_task_array;
  ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array));
  ASSERT_EQ(1, tenants_task_array.count());
  ASSERT_EQ(1, tenants_task_array.at(0).count());
  ObAutoSplitTask &task1 = tenants_task_array.at(0).at(0);
  ASSERT_EQ(1, task1.tablet_id_.id_);
  tenants_task_array.reuse();
}
// Pushes more tasks for a single tenant than ObAutoSplitTaskCache::CACHE_MAX_CAPACITY
// and checks what comes back out:
//   - tablet 1000 (used_disk_size 1000) is popped first;
//   - CACHE_MAX_CAPACITY - 1 tasks remain after that pop, and they are popped
//     in descending tablet id order starting at CACHE_MAX_CAPACITY + 9 (every
//     one of those tasks has used_disk_size equal to its tablet id);
//   - afterwards every tenant cache known to the manager is empty.
TEST_F(TestSplitTaskScheduler, single_tenant_hard_push_and_pop)
{
  ObArray<ObAutoSplitTask> task_array;
  ObAutoSplitTask task_high_prio(TEST_TENANT_A_ID, ls_id, 1000/*tablet_id*/, 1/*auto_split_size*/, 1000/*used_disk_size*/, 0);
  ASSERT_OK(task_array.push_back(task_high_prio));
  ASSERT_OK(polling_mgr_.push_tasks(task_array));
  task_array.reuse();

  // Tablets 2 .. CACHE_MAX_CAPACITY + 9, one batch per task.
  for (int64_t i = 2; i < ObAutoSplitTaskCache::CACHE_MAX_CAPACITY + 10; ++i) {
    ASSERT_OK(task_array.push_back(ObAutoSplitTask(TEST_TENANT_A_ID, ls_id, i/*tablet_id*/, 1/*auto_split_size*/, i/*used_disk_size*/, 0)));
    ASSERT_OK(polling_mgr_.push_tasks(task_array));
    task_array.reuse();
  }

  ObArray<ObArray<ObAutoSplitTask>> tenants_task_array;
  ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array));
  ASSERT_EQ(1, tenants_task_array.count());
  ASSERT_EQ(1, tenants_task_array.at(0).count());
  ObAutoSplitTask &high_prio_task = tenants_task_array.at(0).at(0);
  ASSERT_EQ(1000, high_prio_task.tablet_id_.id_);
  tenants_task_array.reuse();

  int64_t expected_tablet_id = ObAutoSplitTaskCache::CACHE_MAX_CAPACITY + 9;
  int64_t total_tasks = ObAutoSplitTaskCache::CACHE_MAX_CAPACITY - 1;
  while (total_tasks > 0) {
    tenants_task_array.reuse();
    ASSERT_OK(polling_mgr_.pop_tasks(5, tenants_task_array));
    // Exactly one tenant has tasks, so exactly one batch is expected. This
    // also guarantees the loop makes progress.
    ASSERT_EQ(1, tenants_task_array.count());
    const int64_t task_arr_len = tenants_task_array.at(0).count();
    ASSERT_GT(task_arr_len, 0);
    total_tasks -= task_arr_len;
    ASSERT_EQ(total_tasks, polling_mgr_.total_tasks_);
    // Five tasks are requested per pop; the test tolerates a batch of four
    // (the final pop when the remaining count is not a multiple of five).
    ASSERT_GE(task_arr_len, 4);
    LOG_INFO("tenants_task_array", K(tenants_task_array));
    for (int64_t i = 0; i < task_arr_len; ++i) {
      ObAutoSplitTask &task = tenants_task_array.at(0).at(i);
      ASSERT_EQ(expected_tablet_id, task.tablet_id_.id_);
      --expected_tablet_id;
    }
  }

  // Every cache registered in the manager must have been drained.
  for (hash::ObHashMap<uint64_t, ObAutoSplitTaskCache*>::iterator iter = polling_mgr_.map_tenant_to_cache_.begin(); iter != polling_mgr_.map_tenant_to_cache_.end(); iter++) {
    ObAutoSplitTaskCache *tenant_cache = iter->second;
    ASSERT_TRUE(tenant_cache != nullptr);
    ASSERT_EQ(0, tenant_cache->get_tasks_num());
  }
}
// Pushes the same 100 tablets (ids 2..101, in shuffled order) for tenants A, B
// and C, pops everything back five tasks at a time, and checks that
//   - every popped batch contains tasks of a single tenant,
//   - each tenant yields all 100 tasks, and
//   - each tenant's tasks come out in descending tablet id order (every task
//     has used_disk_size equal to its tablet id).
TEST_F(TestSplitTaskScheduler, mutiple_tenants_hard_push_and_pop)
{
  ObArray<ObAutoSplitTask> task_array;
  ObArray<int64_t> nums;
  for (int64_t i = 2; i < 102; ++i) {
    ASSERT_OK(nums.push_back(i));
  }
  // std::random_shuffle is deprecated since C++14 and removed in C++17. A
  // fixed seed keeps the push order reproducible from run to run.
  std::mt19937 rng(20240521u);
  std::shuffle(nums.begin(), nums.end(), rng);

  // For every tablet id push one single-task batch per tenant, in A, B, C order.
  const uint64_t tenant_ids[] = {TEST_TENANT_A_ID, TEST_TENANT_B_ID, TEST_TENANT_C_ID};
  for (int64_t i = 0; i < nums.count(); ++i) {
    const int64_t num = nums.at(i);
    for (const uint64_t push_tenant_id : tenant_ids) {
      ASSERT_OK(task_array.push_back(ObAutoSplitTask(push_tenant_id, ls_id, num/*tablet_id*/, 1/*auto_split_size*/, num/*used_disk_size*/, 0)));
      ASSERT_OK(polling_mgr_.push_tasks(task_array));
      task_array.reuse();
    }
  }
  ASSERT_EQ(3 * nums.count(), polling_mgr_.total_tasks_);
  ASSERT_EQ(3, polling_mgr_.get_total_tenants());

  const int64_t tasks_per_tenant = nums.count();
  ObArray<ObArray<ObAutoSplitTask>> tenants_task_array;
  int64_t total_tasks = tasks_per_tenant * 3;
  ObArray<int64_t> tenant_A_task_id;
  ObArray<int64_t> tenant_B_task_id;
  ObArray<int64_t> tenant_C_task_id;
  // A productive pop returns at least one task, so draining needs at most
  // total_tasks productive rounds. The generous bound turns a manager that
  // stops returning tasks into a test failure instead of an endless loop.
  const int64_t max_pop_rounds = 10 * total_tasks;
  int64_t pop_rounds = 0;
  while (total_tasks > 0) {
    ASSERT_LT(pop_rounds, max_pop_rounds);
    ++pop_rounds;
    tenants_task_array.reuse();
    ASSERT_OK(polling_mgr_.pop_tasks(5, tenants_task_array));
    for (int64_t i = 0; i < tenants_task_array.count(); ++i) {
      ObArray<ObAutoSplitTask> &tenant_tasks = tenants_task_array.at(i);
      ASSERT_TRUE(tenant_tasks.count() > 0);
      // All tasks of one batch must belong to the tenant of its first task.
      const uint64_t tenant_id = tenant_tasks.at(0).tenant_id_;
      for (int64_t j = 0; j < tenant_tasks.count(); ++j) {
        total_tasks -= 1;
        ASSERT_EQ(tenant_id, tenant_tasks.at(j).tenant_id_);
        if (tenant_id == TEST_TENANT_A_ID) {
          ASSERT_OK(tenant_A_task_id.push_back(tenant_tasks.at(j).tablet_id_.id_));
        } else if (tenant_id == TEST_TENANT_B_ID) {
          ASSERT_OK(tenant_B_task_id.push_back(tenant_tasks.at(j).tablet_id_.id_));
        } else if (tenant_id == TEST_TENANT_C_ID) {
          ASSERT_OK(tenant_C_task_id.push_back(tenant_tasks.at(j).tablet_id_.id_));
        } else {
          FAIL() << "popped a task of unexpected tenant " << tenant_id;
        }
      }
    }
  }

  // Every cache registered in the manager must have been drained.
  for (hash::ObHashMap<uint64_t, ObAutoSplitTaskCache*>::iterator iter = polling_mgr_.map_tenant_to_cache_.begin(); iter != polling_mgr_.map_tenant_to_cache_.end(); iter++) {
    ObAutoSplitTaskCache *tenant_cache = iter->second;
    ASSERT_TRUE(tenant_cache != nullptr);
    ASSERT_EQ(0, tenant_cache->get_tasks_num());
  }

  ASSERT_EQ(tasks_per_tenant, tenant_A_task_id.count());
  ASSERT_EQ(tasks_per_tenant, tenant_B_task_id.count());
  ASSERT_EQ(tasks_per_tenant, tenant_C_task_id.count());
  // Tablet ids run from 2 to tasks_per_tenant + 1, and the highest id is
  // expected first for each tenant.
  for (int64_t idx = 0; idx < tasks_per_tenant; ++idx) {
    const int64_t expected_id = tasks_per_tenant + 1 - idx;
    ASSERT_EQ(expected_id, tenant_A_task_id.at(idx));
    ASSERT_EQ(expected_id, tenant_B_task_id.at(idx));
    ASSERT_EQ(expected_id, tenant_C_task_id.at(idx));
  }
}
// Two threads push tablets 2..49999 for tenant A concurrently (even ids on one
// thread, odd ids on the other), then two threads drain the manager
// concurrently. The test expects exactly 100 tasks to come back and, once
// sorted, their tablet ids to be the 100 largest ones pushed (49900..49999).
TEST_F(TestSplitTaskScheduler, parallel_push_and_pop)
{
  ObArray<ObArray<ObAutoSplitTask>> task_array_1;
  ObArray<ObArray<ObAutoSplitTask>> task_array_2;
  ObArray<int64_t> nums_1;
  ObArray<int64_t> nums_2;
  for (int64_t i = 2; i < 50000; ++i) {
    if (i % 2 == 0) {
      ASSERT_OK(nums_1.push_back(i));
    } else {
      ASSERT_OK(nums_2.push_back(i));
    }
  }
  // The build loop below indexes nums_2 with indices taken from nums_1.
  ASSERT_EQ(nums_1.count(), nums_2.count());

  // std::random_shuffle is deprecated since C++14 and removed in C++17. A
  // fixed seed keeps the push order reproducible from run to run.
  // NOTE(review): only the even ids are shuffled, as in the original test;
  // nums_2 is pushed in ascending order - confirm this is intended.
  std::mt19937 rng(20240521u);
  std::shuffle(nums_1.begin(), nums_1.end(), rng);

  // Wrap every id into its own single-task batch.
  ObArray<ObAutoSplitTask> tmp_array;
  for (int64_t i = 0; i < nums_1.count(); ++i) {
    tmp_array.reuse();
    const int64_t num_1 = nums_1.at(i);
    ASSERT_OK(tmp_array.push_back(ObAutoSplitTask(TEST_TENANT_A_ID, ls_id, num_1/*tablet_id*/, 1/*auto_split_size*/, num_1/*used_disk_size*/, 0)));
    ASSERT_OK(task_array_1.push_back(tmp_array));
    tmp_array.reuse();
    const int64_t num_2 = nums_2.at(i);
    ASSERT_OK(tmp_array.push_back(ObAutoSplitTask(TEST_TENANT_A_ID, ls_id, num_2/*tablet_id*/, 1/*auto_split_size*/, num_2/*used_disk_size*/, 0)));
    ASSERT_OK(task_array_2.push_back(tmp_array));
  }

  // Concurrent producers. No assertion sits between thread creation and
  // join(), so a fatal failure cannot skip the joins.
  std::thread t1(&push, std::ref(task_array_1), std::ref(polling_mgr_));
  std::thread t2(&push, std::ref(task_array_2), std::ref(polling_mgr_));
  t1.join();
  t2.join();

  // Concurrent consumers, each collecting into its own result array.
  ObArray<ObAutoSplitTask> result_array_1;
  ObArray<ObAutoSplitTask> result_array_2;
  std::thread t3(&pop, std::ref(result_array_1), std::ref(polling_mgr_));
  std::thread t4(&pop, std::ref(result_array_2), std::ref(polling_mgr_));
  t3.join();
  t4.join();

  // Merge both consumers' results.
  tmp_array.reuse();
  ASSERT_OK(tmp_array.push_back(result_array_1));
  ASSERT_OK(tmp_array.push_back(result_array_2));
  ASSERT_EQ(100, tmp_array.count());

  ObArray<int64_t> id_array;
  for (int64_t i = 0; i < tmp_array.count(); ++i) {
    ASSERT_OK(id_array.push_back(tmp_array.at(i).tablet_id_.id()));
  }
  std::sort(id_array.begin(), id_array.end());
  // Walk the sorted ids from the largest down: 49999, 49998, ...
  int64_t expected_id = 49999;
  for (int64_t i = id_array.count() - 1; i >= 0; --i) {
    ASSERT_EQ(expected_id, id_array.at(i));
    --expected_id;
  }
}
}
// Test entry point: initialises GoogleTest, removes the log file of a previous
// run, points the OceanBase logger at a fresh log file at INFO level, and runs
// every registered test.
int main(int argc, char **argv)
{
  ::testing::InitGoogleTest(&argc, argv);

  // Start from a clean log file; the exit status of the command is not checked.
  system("rm -rf test_auto_split_polling_mgr.log");

  OB_LOGGER.set_log_level("INFO");
  const char *const log_file_name = "test_auto_split_polling_mgr.log";
  OB_LOGGER.set_file_name(log_file_name, true);

  const int test_result = RUN_ALL_TESTS();
  return test_result;
}