Files
oceanbase/mittest/simple_server/rootservice/test_transfer_partition_task.cpp

235 lines
11 KiB
C++

// owner: msy164651
// owner group: rs
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#include <gmock/gmock.h>
#include "env/ob_simple_cluster_test_base.h"
#define private public
#include "share/balance/ob_transfer_partition_task_table_operator.h"
namespace oceanbase
{
using namespace unittest;
namespace share
{
using ::testing::_;
using ::testing::Invoke;
using ::testing::Return;
using namespace schema;
using namespace rootserver;
using namespace common;
class TestTransferPartition : public unittest::ObSimpleClusterTestBase
{
public:
TestTransferPartition() : unittest::ObSimpleClusterTestBase("test_transfer_partition_task") {}
protected:
uint64_t tenant_id_;
};
TEST_F(TestTransferPartition, TransferPartitionTask)
{
int ret = OB_SUCCESS;
// ASSERT_EQ(OB_SUCCESS, create_tenant());
//ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id_));
tenant_id_ = 1;
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_observer().get_mysql_proxy();
ObTransferPartitionTask task;
ObBalanceJobID job_id(1);
ObBalanceJobID invalid_job;
ObTransferPartInfo part_info(1, 1);
ObTransferPartInfo invalid_part;
ObLSID invalid_ls;
ObLSID ls(1);
ObTransferPartitionTaskID invalid_task_id;
ObTransferPartitionTaskID task_id(1);
ObTransferPartitionTaskStatus status(ObTransferPartitionTaskStatus::TRP_TASK_STATUS_WAITING);
ObTransferPartitionTaskStatus invalid_status;
ObString comment;
// case 1: 验证task的init
//simple_init不校验task_id,但是init校验
ASSERT_EQ(OB_INVALID_ARGUMENT, task.simple_init(tenant_id_, invalid_part, ls, task_id));
ASSERT_EQ(OB_INVALID_ARGUMENT, task.simple_init(tenant_id_, part_info, invalid_ls, task_id));
ASSERT_EQ(OB_SUCCESS, task.simple_init(tenant_id_, part_info, ls, invalid_task_id));
ASSERT_EQ(OB_SUCCESS, task.simple_init(tenant_id_, part_info, ls, task_id));
ASSERT_EQ(OB_INVALID_ARGUMENT, task.init(tenant_id_, part_info, ls, invalid_task_id,
job_id, ObTransferTaskID(1), status, comment));
ASSERT_EQ(OB_INVALID_ARGUMENT, task.init(tenant_id_, part_info, ls, task_id,
job_id, ObTransferTaskID(1), invalid_status, comment));
ASSERT_EQ(OB_SUCCESS, task.init(tenant_id_, part_info, ls, task_id, job_id,
ObTransferTaskID(1), status, comment));
// case 2: 验证task的插入
ObMySQLTransaction trans;
ObArray<ObTransferPartitionTask> task_array;
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_SUCCESS, task.simple_init(tenant_id_, part_info, ls, task_id));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::insert_new_task(tenant_id_, part_info, ls, trans));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::load_all_task(tenant_id_,
task_array, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(1, task_array.count());
LOG_INFO("[MITTEST]new task", K(task_array));
ASSERT_EQ(status, task_array.at(0).get_task_status());
// case 3: 验证不同的事务,在第一个事务没有提交之前,另外一个事务不能写入
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ObTransferPartitionTask new_task;
ObTransferPartInfo part_info2(1, 2);
ASSERT_EQ(OB_SUCCESS, new_task.simple_init(tenant_id_, part_info2, ls, task_id));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::insert_new_task(tenant_id_, part_info2, ls, trans));
ObMySQLTransaction trans1;
ASSERT_EQ(OB_SUCCESS, trans1.start(&sql_proxy, tenant_id_));
ObTransferPartInfo part_info3(1, 3);
ASSERT_EQ(OB_SUCCESS, new_task.simple_init(tenant_id_, part_info3, ls, task_id));
ASSERT_EQ(OB_TRANS_TIMEOUT, ObTransferPartitionTaskTableOperator::insert_new_task(tenant_id_, part_info3, ls, trans1));
ASSERT_EQ(OB_SUCCESS, trans.end(false));
ASSERT_EQ(OB_SUCCESS, trans1.end(false));
ASSERT_EQ(OB_SUCCESS, trans1.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::insert_new_task(tenant_id_, part_info3, ls, trans1));
ASSERT_EQ(OB_SUCCESS, new_task.simple_init(tenant_id_, part_info2, ls, task_id));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::insert_new_task(tenant_id_, part_info2, ls, trans1));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::load_all_task(tenant_id_,
task_array, trans1));
ASSERT_EQ(3, task_array.count());
LOG_INFO("[MITTEST]new task", K(task_array));
ASSERT_EQ(OB_SUCCESS, trans1.end(true));
// case 4: 验证table_id,object_id不能重复
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_ERR_PRIMARY_KEY_DUPLICATE, ObTransferPartitionTaskTableOperator::insert_new_task(tenant_id_, part_info, ls, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(false));
// case 5: 验证使用第一个task_id开始执行
ObTransferPartitionTaskID task_id2 = task_array.at(1).get_task_id();
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_ERR_UNEXPECTED, ObTransferPartitionTaskTableOperator::set_all_tasks_schedule(
tenant_id_, task_id2, job_id, 1, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(false));
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::set_all_tasks_schedule(
tenant_id_, task_id2, job_id, 2, trans));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::load_all_task(tenant_id_,
task_array, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(3, task_array.count());
LOG_INFO("[MITTEST]new task", K(task_array));
ASSERT_EQ(1, task_array.at(0).get_balance_job_id().id());
ASSERT_EQ(1, task_array.at(1).get_balance_job_id().id());
ASSERT_EQ(-1, task_array.at(2).get_balance_job_id().id());
//case 6: 开始transfer,三个分区放进去会不匹配
ObTransferPartList part_list;
ASSERT_EQ(OB_SUCCESS, part_list.push_back(part_info));
ASSERT_EQ(OB_SUCCESS, part_list.push_back(part_info2));
ASSERT_EQ(OB_SUCCESS, part_list.push_back(part_info3));
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
//ASSERT_EQ(OB_ERR_UNEXPECTED, ObTransferPartitionTaskTableOperator::start_transfer_task(tenant_id_, part_list, ObTransferTaskID(100), trans));
ASSERT_EQ(OB_SUCCESS, part_list.remove(1));
ASSERT_EQ(OB_INVALID_ARGUMENT, ObTransferPartitionTaskTableOperator::start_transfer_task(tenant_id_,
invalid_job, part_list, ObTransferTaskID(100), trans));
ASSERT_EQ(OB_SUCCESS, trans.end(false));
ObBalanceJobID tmp_job_id(999);
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::start_transfer_task(tenant_id_,
tmp_job_id, part_list, ObTransferTaskID(100), trans));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::load_all_task(tenant_id_,
task_array, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(3, task_array.count());
LOG_INFO("[MITTEST]new task", K(task_array));
ASSERT_EQ(-1, task_array.at(0).get_transfer_task_id().id());
ASSERT_EQ(-1, task_array.at(1).get_transfer_task_id().id());
ASSERT_EQ(-1, task_array.at(2).get_transfer_task_id().id());
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::start_transfer_task(tenant_id_,
job_id, part_list, ObTransferTaskID(100), trans));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::load_all_task(tenant_id_,
task_array, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(3, task_array.count());
LOG_INFO("[MITTEST]new task", K(task_array));
ASSERT_EQ(100, task_array.at(0).get_transfer_task_id().id());
ASSERT_EQ(100, task_array.at(1).get_transfer_task_id().id());
ASSERT_EQ(-1, task_array.at(2).get_transfer_task_id().id());
//case 7: 结束transfer
ASSERT_EQ(OB_SUCCESS, part_list.remove(0));
ASSERT_EQ(OB_SUCCESS, part_list.push_back(part_info2));
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ObTransferPartitionTaskID max_task_id(1000);
ASSERT_EQ(OB_INVALID_ARGUMENT, ObTransferPartitionTaskTableOperator::finish_task(tenant_id_, part_list, max_task_id, status, "", trans));
ObTransferPartitionTaskStatus status1(ObTransferPartitionTaskStatus::TRP_TASK_STATUS_FAILED);
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::finish_task(tenant_id_, part_list,max_task_id, status1,"12343453", trans));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::load_all_task(tenant_id_,
task_array, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(1, task_array.count());
LOG_INFO("[MITTEST]new task", K(task_array));
ObSqlString sql;
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("select *, time_to_usec(create_time) as create_1, time_to_usec(finish_time) as finish from __all_transfer_partition_task_history"));
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
int64_t start_time = 0;
int64_t finish_time = 0;
ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, tenant_id_, sql.ptr()));
sqlclient::ObMySQLResult *result = res.get_result();
ASSERT_NE(nullptr, result);
ASSERT_EQ(OB_SUCCESS, result->next());
ObTransferPartitionTask new_task3;
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::fill_cell_(tenant_id_, result, new_task3));
ASSERT_EQ(OB_SUCCESS, result->get_int("create_1", start_time));
ASSERT_EQ(OB_SUCCESS, result->get_int("finish", finish_time));
LOG_INFO("[MITTEST]balance_task", K(start_time), K(finish_time), K(new_task3));
ASSERT_EQ(OB_SUCCESS, result->next());
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::fill_cell_(tenant_id_, result, new_task3));
ASSERT_EQ(OB_SUCCESS, result->get_int("create_1", start_time));
ASSERT_EQ(OB_SUCCESS, result->get_int("finish", finish_time));
LOG_INFO("[MITTEST]balance_task", K(start_time), K(finish_time), K(new_task3));
}
//case 8: 验证rollback
ASSERT_EQ(OB_SUCCESS, trans.start(&sql_proxy, tenant_id_));
ASSERT_EQ(OB_INVALID_ARGUMENT, ObTransferPartitionTaskTableOperator::rollback_all_to_waitting(
tenant_id_, ObBalanceJobID(), trans));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::rollback_all_to_waitting(
tenant_id_, job_id, trans));
ASSERT_EQ(OB_SUCCESS, ObTransferPartitionTaskTableOperator::load_all_task(tenant_id_,
task_array, trans));
ASSERT_EQ(OB_SUCCESS, trans.end(true));
ASSERT_EQ(1, task_array.count());
LOG_INFO("[MITTEST]new task", K(task_array));
ASSERT_EQ(-1, task_array.at(0).get_balance_job_id().id());
ASSERT_EQ(-1, task_array.at(0).get_transfer_task_id().id());
ASSERT_EQ(status, task_array.at(0).get_task_status());
}
} // namespace share
} // namespace oceanbase
int main(int argc, char **argv)
{
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level("INFO");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}