Files
oceanbase/mittest/simple_server/rootservice/test_tenant_balance_operator.cpp

613 lines
29 KiB
C++

// owner: msy164651
// owner group: rs
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#include <gmock/gmock.h>
#include "env/ob_simple_cluster_test_base.h"
#define private public
#include "share/balance/ob_balance_task_helper_operator.h"
#include "rootserver/ob_ls_balance_helper.h"//ObLSBalanceTaskHelper
namespace oceanbase
{
using namespace unittest;
namespace share
{
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using namespace schema;
using namespace rootserver;
using namespace common;
class TestBalanceOperator : public unittest::ObSimpleClusterTestBase
{
public:
TestBalanceOperator() : unittest::ObSimpleClusterTestBase("test_balance_operator") {}
protected:
uint64_t tenant_id_;
};
TEST_F(TestBalanceOperator, BalanceJob)
{
int ret = OB_SUCCESS;
tenant_id_ = OB_SYS_TENANT_ID;
//ObBalanceJob INIT
ObBalanceJob job;
ObBalanceJobID job_id(1);
int64_t job_start_time = 1;
int64_t job_finish_time = 1;
ObBalanceJobType job_type;
ObBalanceJobStatus job_status;
int64_t primary_zone_num = 0;
int64_t unit_group_num = 0;
ObString comment;
ObString balance_strategy(share::LS_BALANCE_BY_EXPAND);
ASSERT_EQ(OB_INVALID_ARGUMENT, job.init(0, job_id, job_type, job_status, primary_zone_num, unit_group_num, comment, balance_strategy));
job_type = ObBalanceJobType(ObString("LS_BALANCE"));
ASSERT_EQ(OB_INVALID_ARGUMENT, job.init(tenant_id_, job_id, job_type, job_status, primary_zone_num, unit_group_num, comment, balance_strategy));
job_status = ObBalanceJobStatus(ObString("DOING"));
ASSERT_EQ(OB_INVALID_ARGUMENT, job.init(tenant_id_, job_id, job_type, job_status, primary_zone_num, unit_group_num, comment, balance_strategy));
primary_zone_num = 1;
unit_group_num = 1;
ASSERT_EQ(OB_SUCCESS, job.init(tenant_id_, job_id, job_type, job_status, primary_zone_num, unit_group_num, comment, balance_strategy));
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2("sys", "oceanbase"));
LOG_INFO("[MITTEST]balance_job", K(job));
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
//insert
ASSERT_EQ(OB_SUCCESS, ObBalanceJobTableOperator::insert_new_job(job, sql_proxy));
ObBalanceJob new_job;
//select
ASSERT_EQ(OB_SUCCESS, ObBalanceJobTableOperator::get_balance_job(OB_SYS_TENANT_ID, false, sql_proxy, new_job, job_start_time, job_finish_time));
LOG_INFO("[MITTEST]balance_job", K(new_job));
ASSERT_EQ(new_job.get_tenant_id(), job.get_tenant_id());
ASSERT_EQ(new_job.get_job_id(), job.get_job_id());
ASSERT_EQ(new_job.get_primary_zone_num(), job.get_primary_zone_num());
ASSERT_EQ(new_job.get_unit_group_num(), job.get_unit_group_num());
ASSERT_EQ(new_job.get_job_type(), job_type);
ASSERT_EQ(new_job.get_job_status(), job_status);
ASSERT_EQ(OB_OP_NOT_ALLOW, ObBalanceJobTableOperator::clean_job(OB_SYS_TENANT_ID, job_id, sql_proxy));
//update
ObBalanceJobStatus new_job_status(2);
ASSERT_EQ(OB_SUCCESS, ObBalanceJobTableOperator::update_job_status(OB_SYS_TENANT_ID, job_id, job_status, new_job_status, false, ObString(), sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceJobTableOperator::get_balance_job(OB_SYS_TENANT_ID, false, sql_proxy, new_job, job_start_time, job_finish_time));
ASSERT_EQ(new_job.get_job_status(), new_job_status);
//clean
ASSERT_EQ(OB_SUCCESS, ObBalanceJobTableOperator::clean_job(OB_SYS_TENANT_ID, job_id, sql_proxy));
ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObBalanceJobTableOperator::get_balance_job(OB_SYS_TENANT_ID, false, sql_proxy, new_job, job_start_time, job_finish_time));
ObSqlString sql;
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select * from __all_balance_job_history"));
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
int64_t start_time = 0;
int64_t finish_time = 0;
ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr()));
sqlclient::ObMySQLResult *result = res.get_result();
ASSERT_NE(nullptr, result);
ASSERT_EQ(OB_SUCCESS, result->next());
ASSERT_EQ(OB_SUCCESS, result->get_time("create_time", start_time));
ASSERT_EQ(OB_SUCCESS, result->get_time("finish_time", finish_time));
LOG_INFO("[MITTEST]job status", K(start_time), K(finish_time));
}
}
TEST_F(TestBalanceOperator, BalanceTask)
{
int ret = OB_SUCCESS;
//ObBalanceTask INIT
ObBalanceTask task;
uint64_t tenant_id = OB_SYS_TENANT_ID;
ObBalanceJobID job_id;
ObBalanceTaskID task_id;
int64_t ls_group_id = 0;
int64_t start_time = 0;
int64_t finish_time = 0;
int64_t src_ls_id = 0;
int64_t dest_ls_id = 0;
//TODO
ObTransferTaskID transfer_task_id;
ObString task_type, task_status;
ObString part_list_str, finished_part_list_str;
ObString parent_list_str, child_list_str;
ObTransferPartList part_list, finished_part_list;
ObBalanceTaskIDList parent_list, child_list;
ObString comment;
ASSERT_EQ(OB_INVALID_ARGUMENT, task.init(
tenant_id, job_id, task_id,
ObBalanceTaskType(task_type),
ObBalanceTaskStatus(task_status), ls_group_id,
ObLSID(src_ls_id), ObLSID(dest_ls_id),
transfer_task_id, part_list, finished_part_list,
parent_list, child_list, comment));
job_id = 1;
task_id = 2;
ASSERT_EQ(OB_INVALID_ARGUMENT, task.init(
tenant_id, job_id, task_id,
ObBalanceTaskType(task_type),
ObBalanceTaskStatus(task_status), ls_group_id,
ObLSID(src_ls_id), ObLSID(dest_ls_id),
transfer_task_id, part_list, finished_part_list,
parent_list, child_list, comment));
start_time = 1;
src_ls_id = 1;
dest_ls_id = 1;
task_type = ObString("LS_SPLIT");
task_status = ObString("CREATE_LS");
ObTransferPartInfo part_info;
ObObjectID obj_id = 1;
ASSERT_EQ(OB_SUCCESS, part_info.init(obj_id, obj_id));
ObBalanceTaskID parent_task_id(1);
ASSERT_EQ(OB_SUCCESS, parent_list.push_back(parent_task_id));
ASSERT_EQ(OB_SUCCESS, part_list.push_back(part_info));
ASSERT_EQ(OB_SUCCESS, task.init(
tenant_id, job_id, task_id,
ObBalanceTaskType(task_type),
ObBalanceTaskStatus(task_status), ls_group_id,
ObLSID(src_ls_id), ObLSID(dest_ls_id),
transfer_task_id, part_list, finished_part_list,
parent_list, child_list, comment));
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
//insert
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::insert_new_task(task, sql_proxy));
//select
ObBalanceTask new_task;
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::get_balance_task(OB_SYS_TENANT_ID, task_id , false, sql_proxy, new_task, start_time, finish_time));
ASSERT_EQ(new_task.get_parent_task_list().at(0), ObBalanceTaskID(1));
LOG_INFO("[MITTEST]balance_task", K(new_task));
//update balance task
ObBalanceTaskStatus transfer_status = ObBalanceTaskStatus::BALANCE_TASK_STATUS_TRANSFER;
common::ObMySQLTransaction trans2;
ASSERT_EQ(OB_SUCCESS, trans2.start(&sql_proxy, tenant_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::update_task_status(new_task, transfer_status, trans2));
ASSERT_EQ(OB_SUCCESS, trans2.end(true));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::get_balance_task(OB_SYS_TENANT_ID, task_id , false, sql_proxy, new_task, start_time, finish_time));
LOG_INFO("[MITTEST]balance_task", K(new_task));
//start transfer task
transfer_task_id = 1;
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::start_transfer_task(OB_SYS_TENANT_ID, task_id, transfer_task_id, sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::get_balance_task(OB_SYS_TENANT_ID, task_id , false, sql_proxy, new_task, start_time, finish_time));
LOG_INFO("[MITTEST]balance_task", K(new_task));
//finish transfer task
finished_part_list.reset();
ObTransferPartList to_do_part_list;
bool all_part_transferred = false;
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::finish_transfer_task(new_task, transfer_task_id, finished_part_list, sql_proxy, to_do_part_list, all_part_transferred));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::get_balance_task(OB_SYS_TENANT_ID, task_id , false, sql_proxy, new_task, start_time, finish_time));
LOG_INFO("[MITTEST]balance_task", K(new_task));
//remove parent task
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::remove_parent_task(OB_SYS_TENANT_ID, task_id, parent_task_id, sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::get_balance_task(OB_SYS_TENANT_ID, task_id , false, sql_proxy, new_task, start_time, finish_time));
ObBalanceTaskArray task_array;
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::load_can_execute_task(OB_SYS_TENANT_ID, task_array, sql_proxy));
LOG_INFO("[MITTEST]balance_task", K(new_task), K(task_array));
//clean task
//update task status into completed before clean task
ObBalanceTaskStatus completed_status = ObBalanceTaskStatus::BALANCE_TASK_STATUS_COMPLETED;
common::ObMySQLTransaction trans1;
ASSERT_EQ(OB_SUCCESS, trans1.start(&sql_proxy, tenant_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::update_task_status(new_task, completed_status, trans1));
ASSERT_EQ(OB_SUCCESS, trans1.end(true));
ObMySQLTransaction trans;
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::clean_task(OB_SYS_TENANT_ID, task_id, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObBalanceTaskTableOperator::get_balance_task(OB_SYS_TENANT_ID, task_id , false, sql_proxy, new_task, start_time, finish_time));
ObSqlString sql;
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select * from __all_balance_task_history"));
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
int64_t start_time = 0;
int64_t finish_time = 0;
ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr()));
sqlclient::ObMySQLResult *result = res.get_result();
ASSERT_NE(nullptr, result);
ASSERT_EQ(OB_SUCCESS, result->next());
ASSERT_EQ(OB_SUCCESS, result->get_time("create_time", start_time));
ASSERT_EQ(OB_SUCCESS, result->get_time("finish_time", finish_time));
LOG_INFO("[MITTEST]balance_task", K(start_time), K(finish_time));
}
}
TEST_F(TestBalanceOperator, balance_execute)
{
int ret = OB_SUCCESS;
ASSERT_EQ(OB_SUCCESS, create_tenant());
common::ObMySQLProxy *sql_proxy = get_curr_observer().get_gctx().sql_proxy_;
int64_t primary_zone_num = 1;
uint64_t tenant_id = 1002;
ObSimpleUnitGroup unit_group1(1001, share::ObUnit::UNIT_STATUS_ACTIVE);
ObSimpleUnitGroup unit_group2(1002, share::ObUnit::UNIT_STATUS_ACTIVE);
ObSimpleUnitGroup unit_group3(1003, share::ObUnit::UNIT_STATUS_ACTIVE);
ObSimpleUnitGroup unit_group4(1004, share::ObUnit::UNIT_STATUS_ACTIVE);
ObArray<share::ObSimpleUnitGroup> unit_group;
ASSERT_EQ(OB_SUCCESS, unit_group.push_back(unit_group1));
ObLSID ls_id(1001);
ObLSFlag ls_flag;
ObLSStatusInfo info;
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ls_id, 1, share::OB_LS_NORMAL, 1001, "z1", ls_flag));
ObLSStatusInfoArray ls_array;
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
MTL_SWITCH(tenant_id) {
{
ObLSBalanceTaskHelper ls_balance;
//case 1. init
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id, ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest1", K(ls_balance.unit_group_balance_array_));
//case 2. no need balance
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(false, need_balance);
ASSERT_EQ(OB_ERR_UNEXPECTED, ls_balance.generate_ls_balance_task());
}
//case 3 1->2;
primary_zone_num = 2;
MTL_SWITCH(tenant_id) {
ObLSBalanceTaskHelper ls_balance;
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest3", K(ls_balance.unit_group_balance_array_));
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
ASSERT_EQ(primary_zone_num, ls_balance.job_.primary_zone_num_);
ASSERT_EQ(ObBalanceJobType(ObBalanceJobType::BALANCE_JOB_LS), ls_balance.job_.job_type_);
ASSERT_EQ(ObBalanceJobStatus(ObBalanceJobStatus::BALANCE_JOB_STATUS_DOING), ls_balance.job_.job_status_);
ASSERT_EQ(1, ls_balance.task_array_.count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_SPLIT), ls_balance.task_array_[0].get_task_type());
}
//case 4 1-3
primary_zone_num = 3;
MTL_SWITCH(tenant_id) {
ObLSBalanceTaskHelper ls_balance;
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest4", K(ls_balance.unit_group_balance_array_));
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
ASSERT_EQ(primary_zone_num, ls_balance.job_.primary_zone_num_);
ASSERT_EQ(ObBalanceJobType(ObBalanceJobType::BALANCE_JOB_LS), ls_balance.job_.job_type_);
ASSERT_EQ(ObBalanceJobStatus(ObBalanceJobStatus::BALANCE_JOB_STATUS_DOING), ls_balance.job_.job_status_);
ASSERT_EQ(2, ls_balance.task_array_.count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_SPLIT), ls_balance.task_array_[0].get_task_type());
}
//case5 2-3
MTL_SWITCH(tenant_id) {
ObLSID ls_id2(1002);
ObLSStatusInfo info2;
ObLSBalanceTaskHelper ls_balance;
ASSERT_EQ(OB_SUCCESS, info2.init(tenant_id, ls_id2, 1, share::OB_LS_NORMAL, 1001, "z2", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info2));
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest5", K(ls_balance.unit_group_balance_array_));
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
ASSERT_EQ(primary_zone_num, ls_balance.job_.primary_zone_num_);
ASSERT_EQ(ObBalanceJobType(ObBalanceJobType::BALANCE_JOB_LS), ls_balance.job_.job_type_);
ASSERT_EQ(ObBalanceJobStatus(ObBalanceJobStatus::BALANCE_JOB_STATUS_DOING), ls_balance.job_.job_status_);
ASSERT_EQ(3, ls_balance.task_array_.count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_SPLIT), ls_balance.task_array_[0].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_MERGE), ls_balance.task_array_[2].get_task_type());
//ASSERT_EQ(2, ls_balance.task_array_[2].parent_list_.count());
}
//case 6 ls group not in unit group
MTL_SWITCH(tenant_id) {
ObLSID ls_id3(1003);
ObLSStatusInfo info3;
ObLSBalanceTaskHelper ls_balance;
ASSERT_EQ(OB_SUCCESS, info3.init(tenant_id, ls_id3, 1, share::OB_LS_NORMAL, 1002, "z2", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info3));
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest6", K(ls_balance.unit_group_balance_array_));
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
LOG_INFO("testtest6", K(ls_balance.get_balance_job()), K(ls_balance.get_balance_tasks()));
ASSERT_EQ(1, ls_balance.get_balance_tasks().count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[0].get_task_type());
}
}
//case7 8->6
MTL_SWITCH(tenant_id) {
ASSERT_EQ(OB_SUCCESS, unit_group.push_back(unit_group2));
ls_array.reset();
ObLSBalanceTaskHelper ls_balance;
for (int64_t i = 1001; i < 1009; ++i) {
ObLSID ls_id(i);
ObLSStatusInfo info;
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ls_id, 1, share::OB_LS_NORMAL, 1001, "z2", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
i++;
ObLSID ls_id1(i);
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ls_id1, 2, share::OB_LS_NORMAL, 1002, "z2", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
}
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest7", K(ls_balance.unit_group_balance_array_));
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
LOG_INFO("testtest7", K(ls_balance.get_balance_job()), K(ls_balance.get_balance_tasks()));
ASSERT_EQ(6, ls_balance.get_balance_tasks().count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_TRANSFER), ls_balance.task_array_[0].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_TRANSFER), ls_balance.task_array_[1].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_MERGE), ls_balance.task_array_[2].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_TRANSFER), ls_balance.task_array_[3].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_TRANSFER), ls_balance.task_array_[4].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_MERGE), ls_balance.task_array_[5].get_task_type());
}
//case 8 6->9
MTL_SWITCH(tenant_id) {
ASSERT_EQ(OB_SUCCESS, unit_group.push_back(unit_group3));
ls_array.reset();
ObLSBalanceTaskHelper ls_balance;
for (int64_t i = 1001; i < 1006; ++i) {
ObLSID ls_id(i);
ObLSStatusInfo info;
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ls_id, 1, share::OB_LS_NORMAL, 1001, "z2", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
i++;
ObLSID ls_id1(i);
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ls_id1, 2, share::OB_LS_NORMAL, 1002, "z2", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
}
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest8", K(ls_balance.unit_group_balance_array_));
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
LOG_INFO("testtest8", K(ls_balance.get_balance_job()), K(ls_balance.get_balance_tasks()));
ASSERT_EQ(15, ls_balance.get_balance_tasks().count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_SPLIT), ls_balance.task_array_[0].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_SPLIT), ls_balance.task_array_[1].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[2].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[3].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_MERGE), ls_balance.task_array_[4].get_task_type());
}
//case 9 7->4
MTL_SWITCH(tenant_id) {
ASSERT_EQ(OB_SUCCESS, unit_group.push_back(unit_group4));
ObLSBalanceTaskHelper ls_balance;
ls_array.reset();
for (int64_t i = 1001; i < 1007; ++i) {
ObLSID ls_id(i);
ObLSStatusInfo info;
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ls_id, 1, share::OB_LS_NORMAL, 1001, "z1", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
i++;
ObLSID ls_id1(i);
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ls_id1, 2, share::OB_LS_NORMAL, 1002, "z1", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
}
primary_zone_num = 1;
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest9", K(ls_balance.unit_group_balance_array_));
bool need_balance = false;
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
LOG_INFO("testtest9", K(ls_balance.get_balance_job()), K(ls_balance.get_balance_tasks()));
ASSERT_EQ(2, ls_balance.get_balance_tasks().count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[0].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[1].get_task_type());
ls_array.reset();
for (int64_t i = 1001; i < 1006; ++i) {
ObLSStatusInfo info;
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ObLSID(i), i%3, share::OB_LS_NORMAL, 1001, "z1", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
i++;
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ObLSID(i), i%3, share::OB_LS_NORMAL, 1002, "z1", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
i++;
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ObLSID(i), i%3, share::OB_LS_NORMAL, 1003, "z1", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
}
ASSERT_EQ(OB_SUCCESS, info.init(tenant_id, ObLSID(1007), 3, share::OB_LS_NORMAL, 1004, "z1", ls_flag));
ASSERT_EQ(OB_SUCCESS, ls_array.push_back(info));
primary_zone_num = 1;
{
ObLSBalanceTaskHelper ls_balance;
ASSERT_EQ(OB_SUCCESS, ls_balance.init(tenant_id,ls_array, unit_group, primary_zone_num, sql_proxy));
LOG_INFO("testtest9", K(ls_balance.unit_group_balance_array_));
ASSERT_EQ(OB_SUCCESS, ls_balance.check_need_ls_balance(need_balance));
ASSERT_EQ(true, need_balance);
ASSERT_EQ(OB_SUCCESS, ls_balance.generate_ls_balance_task());
LOG_INFO("testtest9", K(ls_balance.get_balance_job()), K(ls_balance.get_balance_tasks()));
ASSERT_EQ(9, ls_balance.get_balance_tasks().count());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_TRANSFER), ls_balance.task_array_[0].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[1].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_MERGE), ls_balance.task_array_[2].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_TRANSFER), ls_balance.task_array_[3].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[4].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_MERGE), ls_balance.task_array_[5].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_TRANSFER), ls_balance.task_array_[6].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_ALTER), ls_balance.task_array_[7].get_task_type());
ASSERT_EQ(ObBalanceTaskType(ObBalanceTaskType::BALANCE_TASK_MERGE), ls_balance.task_array_[8].get_task_type());
}
}
}
//验证merge任务在transfer结束后再次设置part_list然后结束
TEST_F(TestBalanceOperator, merge_task)
{
ObBalanceTask task;
uint64_t tenant_id = OB_SYS_TENANT_ID;
ObBalanceJobID job_id(1);
ObBalanceTaskID task_id(1);
int64_t ls_group_id = OB_INVALID_ID;
int64_t start_time = 0;
int64_t finish_time = 0;
int64_t src_ls_id = 1;
int64_t dest_ls_id = 2;
//TODO
ObTransferTaskID transfer_task_id;
ObString task_type, task_status;
ObString part_list_str, finished_part_list_str;
ObString parent_list_str, child_list_str;
ObTransferPartList part_list, finished_part_list;
ObBalanceTaskIDList parent_list, child_list;
task_type = ObString("LS_MERGE");
task_status = ObString("TRANSFER");
ObString comment;
//防止后台线程结束这个任务
ASSERT_EQ(OB_SUCCESS, parent_list.push_back(task_id));
ASSERT_EQ(OB_SUCCESS, task.init(
tenant_id, job_id, task_id,
ObBalanceTaskType(task_type),
ObBalanceTaskStatus(task_status), ls_group_id,
ObLSID(src_ls_id), ObLSID(dest_ls_id),
transfer_task_id, part_list, finished_part_list,
parent_list, child_list, comment));
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::insert_new_task(task, sql_proxy));
//设置part_list
ObTransferPartInfo part_info(50001, 50001);
ASSERT_EQ(OB_SUCCESS, part_list.push_back(part_info));
transfer_task_id = ObTransferTaskID(1);
ASSERT_EQ(OB_SUCCESS, task.init(
tenant_id, job_id, task_id,
ObBalanceTaskType(task_type),
ObBalanceTaskStatus(task_status), ls_group_id,
ObLSID(src_ls_id), ObLSID(dest_ls_id),
transfer_task_id, part_list, finished_part_list,
parent_list, child_list, comment));
LOG_INFO("testtest7: start set part list");
common::ObMySQLTransaction trans;
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::update_task_part_list(tenant_id, task_id, part_list, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::start_transfer_task(tenant_id, task_id, transfer_task_id, sql_proxy));
ObTransferPartList to_do_part_list;
bool all_part_transferred = false;
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::finish_transfer_task(task, transfer_task_id, part_list, sql_proxy, to_do_part_list, all_part_transferred));
ASSERT_EQ(0, to_do_part_list.count());
ASSERT_EQ(true, all_part_transferred);
LOG_INFO("testtest8: start set part list");
common::ObMySQLTransaction trans1;
ASSERT_EQ(OB_SUCCESS, trans1.start(&sql_proxy, tenant_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::update_task_part_list(tenant_id, task_id, part_list, trans1));
ASSERT_EQ(OB_SUCCESS, trans1.end(true));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::start_transfer_task(tenant_id, task_id, transfer_task_id, sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::finish_transfer_task(task, transfer_task_id, part_list, sql_proxy, to_do_part_list, all_part_transferred));
ASSERT_EQ(0, to_do_part_list.count());
ASSERT_EQ(true, all_part_transferred);
LOG_INFO("testtest9: start set part list");
common::ObMySQLTransaction trans2;
ASSERT_EQ(OB_SUCCESS, trans2.start(&sql_proxy, tenant_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::update_task_part_list(tenant_id, task_id, part_list, trans2));
ASSERT_EQ(OB_SUCCESS, trans2.end(true));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::start_transfer_task(tenant_id, task_id, transfer_task_id, sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskTableOperator::finish_transfer_task(task, transfer_task_id, part_list, sql_proxy, to_do_part_list, all_part_transferred));
ASSERT_EQ(0, to_do_part_list.count());
ASSERT_EQ(true, all_part_transferred);
}
TEST_F(TestBalanceOperator, ls_balance_helper)
{
ObBalanceTaskHelper task;
uint64_t tenant_id = OB_SYS_TENANT_ID;
ObBalanceTaskHelperOp invalid_op;
int64_t ls_group_id = OB_INVALID_ID;
ObLSID src_ls_id(1);
ObLSID dest_ls_id(2);
SCN invalid_scn;
ASSERT_EQ(OB_INVALID_ARGUMENT, task.init(
tenant_id, invalid_scn, invalid_op,
src_ls_id, dest_ls_id, ls_group_id));
ObBalanceTaskHelperOp op(1);
ASSERT_EQ(OB_INVALID_ARGUMENT, task.init(
tenant_id, invalid_scn, op,
src_ls_id, dest_ls_id, ls_group_id));
uint64_t ts = 10000000000000;
SCN scn;
ASSERT_EQ(OB_SUCCESS, scn.convert_from_ts(ts));
ASSERT_EQ(OB_INVALID_ARGUMENT, task.init(
tenant_id, scn, op,
src_ls_id, dest_ls_id, ls_group_id));
ls_group_id = 0;
ASSERT_EQ(OB_SUCCESS, task.init(
tenant_id, scn, op,
src_ls_id, dest_ls_id, ls_group_id));
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy));
ObArray<ObBalanceTaskHelper> new_task;
SCN op_scn;
ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
tenant_id, sql_proxy, op_scn, new_task));
ASSERT_EQ(1, new_task.count());
ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts);
ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id());
ts += 10000000;
ObBalanceTaskHelperOp new_op(0);
ls_group_id = 1001;
ASSERT_EQ(OB_SUCCESS, scn.convert_from_ts(ts));
ASSERT_EQ(OB_SUCCESS, task.init(
tenant_id, scn, new_op,
src_ls_id, dest_ls_id, ls_group_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy));
ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
tenant_id, sql_proxy, op_scn, new_task));
ASSERT_EQ(2, new_task.count());
ts = 10000000000000;
ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts);
ASSERT_EQ(new_task.at(0).get_ls_group_id(), 0);
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::remove_task(tenant_id,
new_task.at(0).get_operation_scn(), sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
tenant_id, sql_proxy, op_scn, new_task));
ASSERT_EQ(1, new_task.count());
ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id());
}
} // namespace share
} // namespace oceanbase
int main(int argc, char **argv)
{
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level("INFO");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}