459 lines
22 KiB
C++
459 lines
22 KiB
C++
// owner: wangzhennan.wzn
// owner group: rs

/**
 * Copyright (c) 2022 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
#include <gmock/gmock.h>
|
|
|
|
#define USING_LOG_PREFIX SHARE
|
|
#define protected public
|
|
#define private public
|
|
|
|
#include "env/ob_simple_cluster_test_base.h"
|
|
#include "rootserver/ob_tenant_transfer_service.h" // ObTenantTransferService
|
|
#include "share/transfer/ob_transfer_task_operator.h" // ObTransferTaskOperator
|
|
|
|
namespace oceanbase
|
|
{
|
|
using namespace unittest;
|
|
namespace rootserver
|
|
{
|
|
using namespace share::schema;
|
|
using namespace common;
|
|
using namespace share;
|
|
using namespace transaction::tablelock;
|
|
|
|
static uint64_t g_tenant_id;
|
|
static ObTransferPartList g_part_list;
|
|
static ObTransferPartList g_batch_part_list;
|
|
|
|
// Assigns the given SQL text to the local `sql` and executes it for `tenant_id`
// through the local `inner_sql_proxy`, asserting that both steps return OB_SUCCESS.
// Expects `sql`, `inner_sql_proxy` and `affected_rows` to be in scope at the call site.
// Wrapped in do { } while (0) so the macro expands to exactly one statement and stays
// correct when used under an unbraced if/else.
#define INNER_EXE_SQL(tenant_id, ...)                                                    \
  do {                                                                                   \
    ASSERT_EQ(OB_SUCCESS, sql.assign(__VA_ARGS__));                                      \
    ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(tenant_id, sql.ptr(), affected_rows));   \
  } while (0)
|
|
|
|
// Assigns the given SQL text to the local `sql` and executes it through the local
// `sql_proxy`, asserting that both steps return OB_SUCCESS.
// Expects `sql`, `sql_proxy` and `affected_rows` to be in scope at the call site.
// Wrapped in do { } while (0) so the macro expands to exactly one statement and stays
// correct when used under an unbraced if/else.
#define EXE_SQL(...)                                                    \
  do {                                                                  \
    ASSERT_EQ(OB_SUCCESS, sql.assign(__VA_ARGS__));                     \
    ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));   \
  } while (0)
|
|
|
|
// Formats a query into the local `sql` and feeds it to read_sql(), which appends the
// partitions it finds to `part_list`; asserts that both steps return OB_SUCCESS.
// Expects `sql` to be in scope and read_sql() to be callable at the call site.
// Wrapped in do { } while (0) so the macro expands to exactly one statement and stays
// correct when used under an unbraced if/else.
#define GEN_PART_LIST(proxy, part_list, ...)                    \
  do {                                                          \
    ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__));         \
    ASSERT_EQ(OB_SUCCESS, read_sql(proxy, sql, part_list));     \
  } while (0)
|
|
|
|
class TestTenantTransferService : public unittest::ObSimpleClusterTestBase
|
|
{
|
|
public:
|
|
TestTenantTransferService() : unittest::ObSimpleClusterTestBase("test_tenant_transfer_service") {}
|
|
int read_sql(ObMySQLProxy &sql_proxy, const ObSqlString &sql, ObTransferPartList &part_list);
|
|
int gen_mock_data(const ObTransferTaskID task_id, const ObTransferStatus &status, ObTransferTask &task);
|
|
void create_hidden_table();
|
|
};
|
|
|
|
/**
 * Runs @p sql through @p sql_proxy and appends the partitions it describes to @p part_list.
 *
 * The "object_id" of the first row is taken as the table id. The "object_id" of every
 * following row is taken as a partition id and appended as (table_id, part_id). When the
 * result holds only the first row, a single (table_id, 0) entry is appended instead.
 *
 * @param sql_proxy  proxy used to execute the query
 * @param sql        query text; must select a column named "object_id"
 * @param part_list  output list; entries are appended, existing content is kept
 * @return OB_SUCCESS on success (an empty result set is also success);
 *         OB_ERR_UNEXPECTED if g_tenant_id is invalid or the result is NULL;
 *         otherwise the first error encountered.
 */
int TestTenantTransferService::read_sql(
    ObMySQLProxy &sql_proxy,
    const ObSqlString &sql,
    ObTransferPartList &part_list)
{
  int ret = OB_SUCCESS;
  SMART_VAR(ObMySQLProxy::MySQLResult, result) {
    if (OB_UNLIKELY(!is_valid_tenant_id(g_tenant_id))) {
      ret = OB_ERR_UNEXPECTED;
    } else if (OB_FAIL(sql_proxy.read(result, sql.ptr()))) {
    } else if (OB_ISNULL(result.get_result())) {
      ret = OB_ERR_UNEXPECTED;
    } else {
      sqlclient::ObMySQLResult &res = *result.get_result();
      uint64_t table_id = OB_INVALID_ID;
      uint64_t part_id = OB_INVALID_ID;
      int64_t part_count = 0;
      // First row: the table itself.
      if (OB_SUCC(res.next())) {
        EXTRACT_INT_FIELD_MYSQL(res, "object_id", table_id, uint64_t);
      }
      // Remaining rows: one partition each. The loop ends on OB_ITER_END or on error.
      while (OB_SUCC(ret)) {
        part_id = OB_INVALID_ID;
        ObTransferPartInfo part_info;
        if (OB_SUCC(res.next())) {
          ++part_count;
          EXTRACT_INT_FIELD_MYSQL(res, "object_id", part_id, uint64_t);
          // EXTRACT_INT_FIELD_MYSQL reports failure through ret; check it before the
          // next OB_FAIL() call overwrites it.
          if (OB_FAIL(ret)) {
          } else if (OB_FAIL(part_info.init(table_id, part_id))) {
          } else if (OB_FAIL(part_list.push_back(part_info))) {
          }
        }
      }
      if (OB_ITER_END == ret) {
        ret = OB_SUCCESS;
        // Only the table row was returned: record it with part id 0.
        if (0 == part_count && OB_INVALID_ID != table_id && OB_INVALID_ID == part_id) {
          ObTransferPartInfo part_info(table_id, 0);
          if (OB_FAIL(part_list.push_back(part_info))) {
            LOG_WARN("fail to push back", KR(ret), K(part_info));
          }
        }
      } else {
        LOG_WARN("fail to generate data", KR(ret), K(sql));
      }
      LOG_INFO("finish read sql", K(sql), K(part_list), K(table_id), K(part_id));
    }
  }
  return ret;
}
|
|
|
|
int TestTenantTransferService::gen_mock_data(const ObTransferTaskID task_id, const ObTransferStatus &status, ObTransferTask &task)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ObLSID src_ls(1001);
|
|
ObLSID dest_ls(1);
|
|
ObTableLockOwnerID owner_id;
|
|
owner_id.convert_from_value(999);
|
|
share::SCN start_scn;
|
|
share::SCN finish_scn;
|
|
start_scn.convert_for_inner_table_field(1666844202200632);
|
|
finish_scn.convert_for_inner_table_field(1666844202208490);
|
|
ObCurTraceId::TraceId trace_id;
|
|
trace_id.init(GCONF.self_addr_);
|
|
uint64_t data_version = 0;
|
|
ret = task.init(task_id, src_ls, dest_ls, ObString::make_string("500016:500014"), ObString("500030:500031"), ObString("500016:500015"),
|
|
ObString::make_string("1152921504606846983"), ObString::make_string("1152921504606846983:0"), start_scn, finish_scn, status, trace_id, OB_SUCCESS,
|
|
ObTransferTaskComment::EMPTY_COMMENT, ObBalanceTaskID(123), owner_id, data_version);
|
|
return ret;
|
|
}
|
|
|
|
// Creates the test tenant and the tables used by later cases, and fills the global
// partition lists (g_part_list for ttt1 and its global index, g_batch_part_list for ttt2).
TEST_F(TestTenantTransferService, prepare_valid_data)
{
  // `sql` and `affected_rows` are used implicitly by EXE_SQL / GEN_PART_LIST.
  ObSqlString sql;
  int64_t affected_rows = 0;

  // Reset the shared tenant id, then create the tenant and look its id up.
  g_tenant_id = OB_INVALID_TENANT_ID;
  ASSERT_EQ(OB_SUCCESS, create_tenant());
  ASSERT_EQ(OB_SUCCESS, get_tenant_id(g_tenant_id));

  ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
  ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();

  // Table with 2 hash partitions, a local index and a 2-partition global index.
  EXE_SQL("create table ttt1(c1 int primary key, c2 blob) partition by hash(c1) partitions 2");
  EXE_SQL("create index ttt1_idx on ttt1(c1) local");
  EXE_SQL("create index ttt1_global ON ttt1(c1) global partition by hash(c1) partitions 2");
  GEN_PART_LIST(sql_proxy, g_part_list, "select object_id from oceanbase.DBA_OBJECTS where OBJECT_NAME='ttt1'");
  GEN_PART_LIST(sql_proxy, g_part_list, "select object_id from oceanbase.DBA_OBJECTS where OBJECT_NAME='ttt1_global'");

  // Table with 101 partitions for the batch case.
  EXE_SQL("create table ttt2(c1 int primary key) partition by hash(c1) partitions 101");
  GEN_PART_LIST(sql_proxy, g_batch_part_list, "select object_id from oceanbase.DBA_OBJECTS where OBJECT_NAME='ttt2'");
}
|
|
|
|
// End-to-end walk through ObTenantTransferService: generate a transfer task, check its
// part list and tablet list, cancel and clear tasks, then exercise the retry-interval
// handling (and, in shared-storage builds, the wait-for-major-compaction path).
TEST_F(TestTenantTransferService, test_service)
{
  int ret = OB_SUCCESS;
  ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy();
  ObSqlString sql;
  int64_t affected_rows = 0;
  // stuck storage transfer
  INNER_EXE_SQL(OB_SYS_TENANT_ID, "alter system set debug_sync_timeout = '1000s'");
  usleep(100000); // wait for debug_sync_timeout to take effect
  sql.reset();
  INNER_EXE_SQL(OB_SYS_TENANT_ID, "set ob_global_debug_sync = 'AFTER_TRANSFER_PROCESS_INIT_TASK_AND_BEFORE_NOTIFY_STORAGE wait_for signal execute 10000'");

  ASSERT_EQ(4, g_part_list.count());
  ASSERT_TRUE(is_valid_tenant_id(g_tenant_id));
  share::ObTenantSwitchGuard tenant_guard;
  ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(g_tenant_id));
  ObTenantTransferService *tenant_transfer = MTL(ObTenantTransferService*);
  ASSERT_TRUE(OB_NOT_NULL(tenant_transfer));

  // generate transfer task
  ObTransferTaskID task_id;
  ObTransferTask transfer_task;
  ObMySQLTransaction trans;
  ASSERT_EQ(OB_SUCCESS, trans.start(&inner_sql_proxy, g_tenant_id));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->generate_transfer_task(trans, ObLSID(1001), ObLSID(1),
      g_part_list, ObBalanceTaskID(123), transfer_task));
  task_id = transfer_task.get_task_id();
  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
      LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  // Fail right here if the commit failed: the ARRAY_FOREACH loops below depend on ret
  // and would otherwise skip their checks.
  ASSERT_EQ(OB_SUCCESS, ret);
  ObTransferTask task;
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/));
  ASSERT_TRUE(task.get_status().is_init_status() || task.get_status().is_start_status());
  ASSERT_TRUE(task.get_part_list().count() == g_part_list.count());
  ARRAY_FOREACH(g_part_list, idx) {
    ASSERT_TRUE(is_contain(task.get_part_list(), g_part_list.at(idx)));
  }
  ASSERT_TRUE(task.get_data_version() > 0);
  LOG_INFO("generate transfer task", K(task));

  // generate tablet_list
  sleep(10);
  task.reset();
  ObArenaAllocator allocator;
  ObString tablet_list_str;
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/));
  LOG_INFO("generate tablet list", K(task));
  ASSERT_TRUE(task.is_valid());
  const ObTransferTabletList &tablet_list = task.get_tablet_list();
  ASSERT_TRUE(10 == tablet_list.count()); // 2 * (primary + local_index + lob_meta + lob_piece) + 2 * global_index
  ASSERT_EQ(OB_SUCCESS, tablet_list.to_display_str(allocator, tablet_list_str));
  LOG_INFO("tablet list string", K(tablet_list_str));
  ASSERT_TRUE(0 == tablet_list_str.case_compare("200001:0,200002:0,1152921504606846977:0,1152921504606846978:0,"
      "1152921504606846979:0,1152921504606846980:0,1152921504606846981:0,1152921504606846982:0,1152921504606846983:0,1152921504606846984:0"));

  // try cancel transfer task
  ObTransferTask init_task;
  ObTransferTask aborted_task;
  ObTransferTask tmp_task;
  ObTransferTaskID init_task_id(222);
  ObTransferTaskID aborted_task_id(333);
  ASSERT_EQ(OB_SUCCESS, gen_mock_data(init_task_id, ObTransferStatus(ObTransferStatus::INIT), init_task));
  ASSERT_EQ(OB_SUCCESS, gen_mock_data(aborted_task_id, ObTransferStatus(ObTransferStatus::ABORTED), aborted_task));
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::insert(inner_sql_proxy, g_tenant_id, init_task));
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::insert(inner_sql_proxy, g_tenant_id, aborted_task));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->try_cancel_transfer_task(ObTransferTaskID(555)));
  ASSERT_EQ(OB_OP_NOT_ALLOW, tenant_transfer->try_cancel_transfer_task(aborted_task_id));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->try_cancel_transfer_task(init_task_id));
  ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, init_task_id, false, init_task, 0/*group_id*/));

  // try clear transfer task
  task.reset();
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/));
  sql.reset();
  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("update oceanbase.__all_transfer_task set status = 'COMPLETED' where task_id = %ld", task.get_task_id().id()));
  ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows));

  ObTransferPartList all_part_list;
  ObTransferPartList finished_part_list;
  ASSERT_EQ(OB_NEED_RETRY, tenant_transfer->try_clear_transfer_task(aborted_task_id, tmp_task, all_part_list, finished_part_list));
  ASSERT_TRUE(all_part_list.empty() && finished_part_list.empty());
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->try_clear_transfer_task(task_id, tmp_task, all_part_list, finished_part_list));
  ASSERT_TRUE(all_part_list.count() == g_part_list.count());
  ARRAY_FOREACH(g_part_list, idx) {
    ASSERT_TRUE(is_contain(all_part_list, g_part_list.at(idx)));
  }
  ASSERT_TRUE(finished_part_list.count() == all_part_list.count());
  ARRAY_FOREACH(all_part_list, idx) {
    ASSERT_TRUE(is_contain(finished_part_list, all_part_list.at(idx)));
  }
  ObString finished_part_list_str;
  ASSERT_EQ(OB_SUCCESS, finished_part_list.to_display_str(allocator, finished_part_list_str));
  LOG_INFO("finished_part_list", K(finished_part_list_str));
  ASSERT_TRUE(0 == finished_part_list_str.case_compare("500002:500005,500002:500006,500016:500014,500016:500015"));
  ASSERT_EQ(OB_ENTRY_NOT_EXIST, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, task_id, false, task, 0/*group_id*/));
  int64_t create_time = OB_INVALID_TIMESTAMP;
  int64_t finish_time = OB_INVALID_TIMESTAMP;
  ObTransferTask history_task;
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get_history_task(inner_sql_proxy, g_tenant_id, task_id, history_task, create_time, finish_time));
  ASSERT_TRUE(history_task.get_status().is_completed_status());
  INNER_EXE_SQL(OB_SYS_TENANT_ID, "set ob_global_debug_sync = 'AFTER_TRANSFER_PROCESS_INIT_TASK_AND_BEFORE_NOTIFY_STORAGE clear'");
  INNER_EXE_SQL(OB_SYS_TENANT_ID, "set ob_global_debug_sync = 'now signal signal'");

  uint64_t data_version = 0;
  ASSERT_EQ(OB_SUCCESS, ObShareUtil::fetch_current_data_version(inner_sql_proxy, g_tenant_id, data_version));
  ASSERT_TRUE(data_version == history_task.get_data_version());

  // test retry task with interval
  sql.reset();
  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("alter system set _transfer_task_retry_interval = '1h'"));
  ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows));
  sql.reset();
  // NOTE(review): init_task was the out-parameter of the get() above that returned
  // OB_ENTRY_NOT_EXIST; presumably it still carries task id 222 - verify.
  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("update oceanbase.__all_transfer_task_history set status = 'FAILED' where task_id = %ld", init_task.get_task_id().id()));
  ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows));
  ObTransferTask retry_task;
  ObTransferTaskID retry_task_id(444);
  ASSERT_EQ(OB_SUCCESS, gen_mock_data(retry_task_id, ObTransferStatus(ObTransferStatus::INIT), retry_task));
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::insert(inner_sql_proxy, g_tenant_id, retry_task));
  ASSERT_EQ(OB_NEED_RETRY, tenant_transfer->process_init_task_(retry_task_id));
  ObTransferTask retry_task_after_process;
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, retry_task_id, false, retry_task_after_process, 0/*group_id*/));
  ASSERT_TRUE(WAIT_DUE_TO_LAST_FAILURE == retry_task_after_process.get_comment() && retry_task_after_process.get_status().is_init_status());
  // NOTE(review): the original comment here read "20s" although this call sleeps 10s;
  // possibly it counts the earlier sleep(10) as well - confirm the intended wait.
  sleep(10);
  ASSERT_EQ(OB_NEED_RETRY, tenant_transfer->process_init_task_(retry_task_id)); // _transfer_task_retry_interval effect
  retry_task_after_process.reset();
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, retry_task_id, false, retry_task_after_process, 0/*group_id*/));
  ASSERT_TRUE(WAIT_DUE_TO_LAST_FAILURE == retry_task_after_process.get_comment() && retry_task_after_process.get_status().is_init_status());
  sql.reset();
  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("alter system set _transfer_task_retry_interval = '0'"));
  ASSERT_EQ(OB_SUCCESS, inner_sql_proxy.write(g_tenant_id, sql.ptr(), affected_rows));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->process_init_task_(retry_task_id));
  INNER_EXE_SQL(g_tenant_id, "update __all_transfer_task set status = 'FAILED' where task_id = 444");
  ObTransferTask::TaskStatus task_stat;
  ASSERT_EQ(OB_SUCCESS, task_stat.init(retry_task_id, ObTransferStatus(ObTransferStatus::FAILED)));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->process_task_(task_stat));

  // test wait tenant major compaction
#ifdef OB_BUILD_SHARED_STORAGE
  GCTX.startup_mode_ = observer::ObServerMode::SHARED_STORAGE_MODE;
  LOG_INFO("test wait major compaction begin", K(GCTX.is_shared_storage_mode()));
  INNER_EXE_SQL(g_tenant_id, "update __all_freeze_info set frozen_scn = now()");
  ObTransferTask conflict_compaction_task;
  ObTransferTaskID conflict_task_id(4444);
  ASSERT_EQ(OB_SUCCESS, gen_mock_data(conflict_task_id, ObTransferStatus(ObTransferStatus::INIT), conflict_compaction_task));
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::insert(inner_sql_proxy, g_tenant_id, conflict_compaction_task));
  ASSERT_EQ(OB_NEED_WAIT, tenant_transfer->process_init_task_(conflict_task_id));
  ObTransferTask conflict_task_after_process;
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, conflict_task_id, false, conflict_task_after_process, 0/*group_id*/));
  ASSERT_TRUE(WAIT_FOR_MAJOR_COMPACTION == conflict_task_after_process.get_comment() && conflict_task_after_process.get_status().is_init_status());
  INNER_EXE_SQL(g_tenant_id, "update __all_freeze_info set frozen_scn = 1");
  LOG_INFO("test wait major compaction finished");
#endif
}
|
|
|
|
// Generates a transfer task from the 101-partition list and checks that the stored task
// holds exactly PART_COUNT_IN_A_TRANSFER partitions, all taken from the input list.
TEST_F(TestTenantTransferService, test_batch_part_list)
{
  int ret = OB_SUCCESS;
  ASSERT_EQ(101, g_batch_part_list.count());
  ASSERT_TRUE(is_valid_tenant_id(g_tenant_id));
  share::ObTenantSwitchGuard tenant_guard;
  ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(g_tenant_id));
  ObTenantTransferService *tenant_transfer = MTL(ObTenantTransferService*);
  ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy();
  ASSERT_TRUE(OB_NOT_NULL(tenant_transfer));

  // generate transfer task in batch
  ObTransferTaskID batch_task_id;
  ObTransferTask transfer_task;
  ObMySQLTransaction trans;
  ASSERT_EQ(OB_SUCCESS, trans.start(&inner_sql_proxy, g_tenant_id));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->generate_transfer_task(trans, ObLSID(1001), ObLSID(1001),
      g_batch_part_list, ObBalanceTaskID(124), transfer_task));
  batch_task_id = transfer_task.get_task_id();
  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
      LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  // A failed commit must fail the test: ARRAY_FOREACH below depends on ret and would
  // otherwise skip its checks.
  ASSERT_EQ(OB_SUCCESS, ret);
  ObTransferTask task;
  ASSERT_EQ(OB_SUCCESS, ObTransferTaskOperator::get(inner_sql_proxy, g_tenant_id, batch_task_id, false, task, 0/*group_id*/));
  ASSERT_TRUE(ObTenantTransferService::PART_COUNT_IN_A_TRANSFER == task.get_part_list().count());
  ARRAY_FOREACH(task.get_part_list(), idx) {
    ASSERT_TRUE(is_contain(g_batch_part_list, task.get_part_list().at(idx)));
  }
}
|
|
|
|
// Arms the EN_TENANT_TRANSFER_ALL_LIST_EMPTY tracepoint, generates a transfer task and
// expects process_init_task_() to return OB_ERR_UNEXPECTED for it.
TEST_F(TestTenantTransferService, test_empty_list)
{
  int ret = OB_SUCCESS;
  ASSERT_EQ(101, g_batch_part_list.count());
  ASSERT_TRUE(is_valid_tenant_id(g_tenant_id));
  share::ObTenantSwitchGuard tenant_guard;
  ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(g_tenant_id));
  ObTenantTransferService *tenant_transfer = MTL(ObTenantTransferService*);
  ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy();
  ASSERT_TRUE(OB_NOT_NULL(tenant_transfer));
  // errorsim
  ObSqlString sql;
  int64_t affected_rows = 0;
  INNER_EXE_SQL(OB_SYS_TENANT_ID, "alter system set_tp tp_name = EN_TENANT_TRANSFER_ALL_LIST_EMPTY, error_code = 4016, frequency = 1");
  // transfer
  ObTransferTaskID task_id;
  ObTransferTask transfer_task;
  ObMySQLTransaction trans;
  ASSERT_EQ(OB_SUCCESS, trans.start(&inner_sql_proxy, g_tenant_id));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->generate_transfer_task(trans, ObLSID(1001), ObLSID(1),
      g_batch_part_list, ObBalanceTaskID(124), transfer_task));
  task_id = transfer_task.get_task_id();
  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
      LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  // The task must have been committed before it is processed below.
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(OB_ERR_UNEXPECTED, tenant_transfer->process_init_task_(task_id));
}
|
|
|
|
void TestTenantTransferService::create_hidden_table()
|
|
{
|
|
ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy();
|
|
ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
|
ObSqlString sql;
|
|
int64_t affected_rows = 0;
|
|
|
|
INNER_EXE_SQL(OB_SYS_TENANT_ID, "set ob_global_debug_sync = 'reset';");
|
|
INNER_EXE_SQL(OB_SYS_TENANT_ID, "alter system set debug_sync_timeout = '1000s';");
|
|
usleep(100000);
|
|
INNER_EXE_SQL(OB_SYS_TENANT_ID, "set ob_global_debug_sync = 'TABLE_REDEFINITION_REPLICA_BUILD wait_for signal execute 10000'");
|
|
usleep(100000);
|
|
EXE_SQL("create table t_hidden_1(c1 int); ");
|
|
EXE_SQL("alter table t_hidden_1 modify c1 char(10);");
|
|
LOG_INFO("finish create hidden table", K(sql), K(affected_rows));
|
|
}
|
|
|
|
// Starts create_hidden_table() on a background thread, then checks that
// lock_table_and_part_() reports both t_hidden_1 and its associated table as lock
// conflicts and returns no tablets.
TEST_F(TestTenantTransferService, test_offline_ddl_hidden_table)
{
  int ret = OB_SUCCESS;
  ASSERT_TRUE(is_valid_tenant_id(g_tenant_id));
  share::ObTenantSwitchGuard tenant_guard;
  ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(g_tenant_id));
  ObTenantTransferService *tenant_transfer = MTL(ObTenantTransferService*);
  ObMySQLProxy &inner_sql_proxy = get_curr_observer().get_mysql_proxy();
  ASSERT_TRUE(OB_NOT_NULL(tenant_transfer));
  ObSqlString sql; // used implicitly by GEN_PART_LIST
  // NOTE(review): the thread is detached and captures `this`; nothing in this test
  // signals the sync point or waits for the thread - confirm it cannot outlive the fixture.
  std::thread create_hidden_table_thread([this]() { create_hidden_table(); });
  create_hidden_table_thread.detach();
  sleep(3);
  sql.reset();
  ObTransferPartList primary_table_part_list;
  GEN_PART_LIST(inner_sql_proxy, primary_table_part_list, "select object_id from oceanbase.CDB_OBJECTS where CON_ID = %lu and OBJECT_NAME = 't_hidden_1'", g_tenant_id);
  LOG_INFO("read hidden table primary table", K(primary_table_part_list));
  ASSERT_TRUE(1 == primary_table_part_list.count());
  ASSERT_TRUE(500119 == primary_table_part_list.at(0).table_id());
  ObTransferPartList hidden_part_list;
  GEN_PART_LIST(inner_sql_proxy, hidden_part_list, "select association_table_id as object_id from oceanbase.__all_virtual_table where tenant_id = %lu and table_name = 't_hidden_1'", g_tenant_id);
  LOG_INFO("read hidden table", K(hidden_part_list));
  // Check the count before indexing so an empty result fails the test cleanly.
  ASSERT_TRUE(1 == hidden_part_list.count());
  ASSERT_TRUE(500120 == hidden_part_list.at(0).table_id());

  ObMySQLTransaction trans;
  ObTransferPartList offline_ddl_table_part_list;
  ObTransferPartList not_exist_part_list;
  ObTransferPartList lock_conflict_part_list;
  ObDisplayTabletList table_lock_tablet_list;
  ObArray<ObTabletID> tablet_ids;
  ObTableLockOwnerID lock_owner_id;
  ASSERT_EQ(OB_SUCCESS, common::append(offline_ddl_table_part_list, primary_table_part_list));
  ASSERT_EQ(OB_SUCCESS, common::append(offline_ddl_table_part_list, hidden_part_list));
  ASSERT_EQ(OB_SUCCESS, trans.start(&inner_sql_proxy, g_tenant_id));
  ASSERT_EQ(OB_SUCCESS, tenant_transfer->lock_table_and_part_(trans, ObLSID(1001), offline_ddl_table_part_list,
      not_exist_part_list, lock_conflict_part_list, table_lock_tablet_list, tablet_ids, lock_owner_id));
  LOG_INFO("lock_table_and_part", K(offline_ddl_table_part_list), K(not_exist_part_list), K(lock_conflict_part_list), K(table_lock_tablet_list), K(tablet_ids), K(lock_owner_id));
  ASSERT_TRUE(not_exist_part_list.empty() && table_lock_tablet_list.empty() && tablet_ids.empty());
  ASSERT_TRUE(2 == lock_conflict_part_list.count());
  ObArenaAllocator allocator;
  ObString lock_conflict_part_list_str;
  ASSERT_EQ(OB_SUCCESS, lock_conflict_part_list.to_display_str(allocator, lock_conflict_part_list_str));
  LOG_INFO("lock conflict hidden table", K(lock_conflict_part_list_str));
  ASSERT_TRUE(0 == lock_conflict_part_list_str.compare("500119:0,500120:0"));

  if (trans.is_started()) {
    int tmp_ret = OB_SUCCESS;
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
      LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  // Surface a failed commit instead of dropping it silently.
  ASSERT_EQ(OB_SUCCESS, ret);
}
|
|
|
|
} // namespace rootserver
|
|
} // namespace oceanbase
|
|
int main(int argc, char **argv)
|
|
{
|
|
oceanbase::unittest::init_log_and_gtest(argc, argv);
|
|
OB_LOGGER.set_log_level("INFO");
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|