/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX STORAGE #define ASSERT_OK(x) ASSERT_EQ(OB_SUCCESS, (x)) #include #define private public #define protected public #include "share/scheduler/ob_partition_auto_split_helper.h" #undef private namespace oceanbase { using namespace common; using namespace lib; using namespace share; using namespace storage; using namespace blocksstable; using namespace share::schema; using namespace compaction; static const uint64_t TEST_TENANT_A_ID = 1002; static const uint64_t TEST_TENANT_B_ID = 1003; static const uint64_t TEST_TENANT_C_ID = 1004; static const ObLSID ls_id{1}; class TestSplitTaskScheduler : public ::testing::Test { public: TestSplitTaskScheduler() : rs_scheduler_(ObRsAutoSplitScheduler::get_instance()), polling_mgr_(rs_scheduler_.polling_mgr_), sys_tenant_(OB_SYS_TENANT_ID) { ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(TEST_TENANT_A_ID); ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(TEST_TENANT_B_ID); ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(TEST_TENANT_C_ID); init(); } virtual void SetUp(); void reset(); int init() { return rs_scheduler_.init(); }; public: ObRsAutoSplitScheduler &rs_scheduler_; ObAutoSplitTaskPollingMgr &polling_mgr_; ObTenantBase sys_tenant_; }; void pop(ObArray &task_array, ObAutoSplitTaskPollingMgr &polling_mgr) { ObArray> tenant_task_array; int64_t pop_nums = 1; while (ATOMIC_LOAD(&polling_mgr.total_tasks_) > 0) { tenant_task_array.reuse(); ASSERT_OK(polling_mgr.pop_tasks(pop_nums, tenant_task_array)); if (tenant_task_array.count() == 0) { break; } for (int64_t i = 0; i < tenant_task_array.count(); ++i) { task_array.push_back(tenant_task_array.at(i)); } pop_nums%=10; pop_nums+=1; } } void push(const ObArray> &tenants_task_array, ObAutoSplitTaskPollingMgr &polling_mgr) { for (int64_t i = 0; i < tenants_task_array.count(); ++i) { int ret = polling_mgr.push_tasks(tenants_task_array.at(i)); ASSERT_OK(ret); } } void TestSplitTaskScheduler::reset() { rs_scheduler_.reset(); } void TestSplitTaskScheduler::SetUp() { ObTenantEnv::set_tenant(&sys_tenant_); } TEST_F(TestSplitTaskScheduler, simple_push_and_pop) { ObArray task_array; ObArray ids; ObAutoSplitTask task_a(TEST_TENANT_A_ID, ls_id, 1/*tablet_id*/, 1/*auto_split_size*/, 2/*used_disk_size*/, 0); ObAutoSplitTask task_b(TEST_TENANT_A_ID, ls_id, 2/*tablet_id*/, 1/*auto_split_size*/, 3/*used_disk_size*/, 0); ObAutoSplitTask task_c(TEST_TENANT_B_ID, ls_id, 3/*tablet_id*/, 1/*auto_split_size*/, 4/*used_disk_size*/, 0); ASSERT_OK(task_array.push_back(task_a)); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); ASSERT_OK(task_array.push_back(task_c)); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); ASSERT_OK(task_array.push_back(task_b)); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); // we expect the first tablet_id popped from mgr to be 2, // because the tenant_cache A is at index 0 and the last_access_index is 0, and the tablet with id 2 has the hightest priority in that cache ObArray> tenants_task_array; ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array)); ASSERT_EQ(1, tenants_task_array.count()); ASSERT_EQ(1, tenants_task_array.at(0).count()); ObAutoSplitTask &task1 = tenants_task_array.at(0).at(0); ASSERT_OK(ids.push_back(task1.tablet_id_.id_)); tenants_task_array.reuse(); // we expect the second tablet_id popped from mgr to be 2, // because the tenant_cache B is at index 1, and the last_access_index is 1, the tablet with id 3 has the hightest priority in that cache ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array)); ASSERT_EQ(1, tenants_task_array.count()); ASSERT_EQ(1, tenants_task_array.at(0).count()); ObAutoSplitTask &task2 = tenants_task_array.at(0).at(0); ASSERT_OK(ids.push_back(task2.tablet_id_.id_)); tenants_task_array.reuse(); // we expect the third tablet_id popped from mgr to be 2, // because the tenant_cache B is at index 1, and the last_access_index is 0, the tablet with id 3 has the hightest priority in that cache ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array)); ASSERT_EQ(1, tenants_task_array.count()); ASSERT_EQ(1, tenants_task_array.at(0).count()); ObAutoSplitTask &task3 = tenants_task_array.at(0).at(0); ASSERT_OK(ids.push_back(task3.tablet_id_.id_)); tenants_task_array.reuse(); std::sort(ids.begin(), ids.end()); for (int64_t i = 0; i < ids.count(); ++i) { ASSERT_EQ(i + 1, ids.at(i)); } } TEST_F(TestSplitTaskScheduler, test_reset) { ObArray task_array; ObAutoSplitTask task_a(TEST_TENANT_A_ID, ls_id, 1/*tablet_id*/, 1/*auto_split_size*/, 2/*used_disk_size*/, 0); ObAutoSplitTask task_b(TEST_TENANT_A_ID, ls_id, 2/*tablet_id*/, 1/*auto_split_size*/, 3/*used_disk_size*/, 0); ASSERT_OK(task_array.push_back(task_a)); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); ASSERT_OK(task_array.push_back(task_b)); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); reset(); init(); task_array.push_back(task_a); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); ObArray> tenants_task_array; ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array)); ASSERT_EQ(1, tenants_task_array.count()); ASSERT_EQ(1, tenants_task_array.at(0).count()); ObAutoSplitTask &task1 = tenants_task_array.at(0).at(0); ASSERT_EQ(1, task1.tablet_id_.id_); tenants_task_array.reuse(); } TEST_F(TestSplitTaskScheduler, single_tenant_hard_push_and_pop) { ObArray task_array; ObAutoSplitTask task_high_prio(TEST_TENANT_A_ID, ls_id, 1000/*tablet_id*/, 1/*auto_split_size*/, 1000/*used_disk_size*/, 0); ASSERT_OK(task_array.push_back(task_high_prio)); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); for (int64_t i = 2; i < ObAutoSplitTaskCache::CACHE_MAX_CAPACITY + 10; ++i ) { ASSERT_OK(task_array.push_back(ObAutoSplitTask(TEST_TENANT_A_ID, ls_id, i/*tablet_id*/, 1/*auto_split_size*/, i/*used_disk_size*/, 0))); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); } ObArray> tenants_task_array; ASSERT_OK(polling_mgr_.pop_tasks(1, tenants_task_array)); ASSERT_EQ(1, tenants_task_array.count()); ASSERT_EQ(1, tenants_task_array.at(0).count()); ObAutoSplitTask &high_prio_task = tenants_task_array.at(0).at(0); ASSERT_EQ(1000, high_prio_task.tablet_id_.id_); tenants_task_array.reuse(); int64_t expect_hightest_prio_tablet_id = ObAutoSplitTaskCache::CACHE_MAX_CAPACITY + 9; int64_t total_tasks = ObAutoSplitTaskCache::CACHE_MAX_CAPACITY - 1; while (total_tasks > 0) { tenants_task_array.reuse(); ASSERT_OK(polling_mgr_.pop_tasks(5, tenants_task_array)); ASSERT_EQ(1, tenants_task_array.count()); int64_t task_arr_len = tenants_task_array.at(0).count(); ASSERT_TRUE(task_arr_len != 0); total_tasks -= task_arr_len; ASSERT_EQ(total_tasks, polling_mgr_.total_tasks_); ASSERT_TRUE(tenants_task_array.at(0).count() >= 4); LOG_INFO("tenants_task_array", K(tenants_task_array)); for (int64_t i = 0; i < tenants_task_array.at(0).count(); ++i) { ObAutoSplitTask &task = tenants_task_array.at(0).at(i); ASSERT_EQ(expect_hightest_prio_tablet_id--, task.tablet_id_.id_); } } for (hash::ObHashMap::iterator iter = polling_mgr_.map_tenant_to_cache_.begin(); iter != polling_mgr_.map_tenant_to_cache_.end(); iter++) { uint64_t tenant_id = iter->first; ObAutoSplitTaskCache *&tenant_cache = iter->second; ASSERT_TRUE(tenant_cache != nullptr); ASSERT_EQ(0, tenant_cache->get_tasks_num()); } } TEST_F(TestSplitTaskScheduler, mutiple_tenants_hard_push_and_pop) { ObArray task_array; ObArray nums; for (int64_t i = 2; i < 102; ++i) { ASSERT_OK(nums.push_back(i)); } std::random_shuffle(nums.begin(), nums.end()); for (int64_t i = 0; i < nums.count(); ++i ) { int64_t num = nums.at(i); ASSERT_OK(task_array.push_back(ObAutoSplitTask(TEST_TENANT_A_ID, ls_id, num/*tablet_id*/, 1/*auto_split_size*/, num/*used_disk_size*/, 0))); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); ASSERT_OK(task_array.push_back(ObAutoSplitTask(TEST_TENANT_B_ID, ls_id, num/*tablet_id*/, 1/*auto_split_size*/, num/*used_disk_size*/, 0))); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); ASSERT_OK(task_array.push_back(ObAutoSplitTask(TEST_TENANT_C_ID, ls_id, num/*tablet_id*/, 1/*auto_split_size*/, num/*used_disk_size*/, 0))); ASSERT_OK(polling_mgr_.push_tasks(task_array)); task_array.reuse(); } ASSERT_EQ(3 * nums.count(), polling_mgr_.total_tasks_); ASSERT_EQ(3, polling_mgr_.get_total_tenants()); ObArray> tenants_task_array; int64_t total_tasks = nums.count() * 3; ObArray tenant_A_task_id; ObArray tenant_B_task_id; ObArray tenant_C_task_id; while (total_tasks > 0) { tenants_task_array.reuse(); ASSERT_OK(polling_mgr_.pop_tasks(5, tenants_task_array)); for (int64_t i = 0; i < tenants_task_array.count(); ++i) { ObArray &task_array = tenants_task_array.at(i); ASSERT_TRUE(task_array.count() > 0); uint64_t tenant_id = task_array.at(0).tenant_id_; for (int64_t j = 0; j < task_array.count(); ++j) { total_tasks -= 1; ASSERT_EQ(tenant_id, task_array.at(j).tenant_id_); if (tenant_id == TEST_TENANT_A_ID) { ASSERT_OK(tenant_A_task_id.push_back(task_array.at(j).tablet_id_.id_)); } else if (tenant_id == TEST_TENANT_B_ID) { ASSERT_OK(tenant_B_task_id.push_back(task_array.at(j).tablet_id_.id_)); } else if (tenant_id == TEST_TENANT_C_ID) { ASSERT_OK(tenant_C_task_id.push_back(task_array.at(j).tablet_id_.id_)); } } } } for (hash::ObHashMap::iterator iter = polling_mgr_.map_tenant_to_cache_.begin(); iter != polling_mgr_.map_tenant_to_cache_.end(); iter++) { uint64_t tenant_id = iter->first; ObAutoSplitTaskCache *&tenant_cache = iter->second; ASSERT_TRUE(tenant_cache != nullptr); ASSERT_EQ(0, tenant_cache->get_tasks_num()); } ASSERT_EQ(100, tenant_A_task_id.count()); ASSERT_EQ(100, tenant_B_task_id.count()); ASSERT_EQ(100, tenant_C_task_id.count()); for (int64_t i = 100; i > 0 ; --i) { ASSERT_EQ(i+1, tenant_A_task_id.at(100 - i)); ASSERT_EQ(i+1, tenant_B_task_id.at(100 - i)); ASSERT_EQ(i+1, tenant_C_task_id.at(100 - i)); } } TEST_F(TestSplitTaskScheduler, parallel_push_and_pop) { ObArray> task_array_1; ObArray> task_array_2; ObArray nums_1; ObArray nums_2; for (int64_t i = 2; i < 50000; ++i) { if (i%2 == 0) { ASSERT_OK(nums_1.push_back(i)); } else { ASSERT_OK(nums_2.push_back(i)); } } std::random_shuffle(nums_1.begin(), nums_1.end()); ObArray tmp_array; for (int64_t i = 0; i < nums_1.count(); ++i ) { tmp_array.reuse(); int64_t num_1 = nums_1.at(i); ASSERT_OK(tmp_array.push_back(ObAutoSplitTask(TEST_TENANT_A_ID, ls_id, num_1/*tablet_id*/, 1/*auto_split_size*/, num_1/*used_disk_size*/, 0))); ASSERT_OK(task_array_1.push_back(tmp_array)); tmp_array.reuse(); int64_t num_2 = nums_2.at(i); ASSERT_OK(tmp_array.push_back(ObAutoSplitTask(TEST_TENANT_A_ID, ls_id, num_2/*tablet_id*/, 1/*auto_split_size*/, num_2/*used_disk_size*/, 0))); ASSERT_OK(task_array_2.push_back(tmp_array)); } std::thread t1(&push, std::ref(task_array_1), std::ref(polling_mgr_)); std::thread t2(&push, std::ref(task_array_2), std::ref(polling_mgr_)); t1.join(); t2.join(); ObArray result_array_1; ObArray result_array_2; std::thread t3(&pop, std::ref(result_array_1), std::ref(polling_mgr_)); std::thread t4(&pop, std::ref(result_array_2), std::ref(polling_mgr_)); t3.join(); t4.join(); tmp_array.reuse(); ASSERT_OK(tmp_array.push_back(result_array_1)); ASSERT_OK(tmp_array.push_back(result_array_2)); ASSERT_EQ(100, tmp_array.count()); int64_t max = 49999; ObArray id_array; for (int64_t i = 0; i < tmp_array.count(); ++i) { id_array.push_back(tmp_array.at(i).tablet_id_.id()); } std::sort(id_array.begin(),id_array.end()); for (int64_t i = id_array.count()-1; i > -1; --i) { ASSERT_EQ(max--, id_array.at(i)); } } } int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); system("rm -rf test_auto_split_polling_mgr.log"); OB_LOGGER.set_log_level("INFO"); OB_LOGGER.set_file_name("test_auto_split_polling_mgr.log", true); return RUN_ALL_TESTS(); }