Files
oceanbase/mittest/simple_server/test_transfer_tx.cpp

1603 lines
56 KiB
C++

// owner: handora.qc
// owner group: transaction
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define USING_LOG_PREFIX SERVER
#define protected public
#define private public
#include "env/ob_simple_cluster_test_base.h"
#include "rootserver/ob_tenant_balance_service.h"
#include "storage/tx/ob_trans_part_ctx.h"
#include "share/balance/ob_balance_job_table_operator.h"
#include "rootserver/ob_balance_group_ls_stat_operator.h"
#include "logservice/ob_log_service.h"
#include "mittest/env/ob_simple_server_helper.h"
namespace oceanbase
{
namespace unittest
{
using namespace oceanbase::transaction;
using namespace oceanbase::storage;
#define EQ(x, y) GTEST_ASSERT_EQ(x, y);
#define NEQ(x, y) GTEST_ASSERT_NE(x, y);
#define LE(x, y) GTEST_ASSERT_LE(x, y);
#define GE(x, y) GTEST_ASSERT_GE(x, y);
class TestRunCtx
{
public:
uint64_t tenant_id_ = 0;
int64_t time_sec_ = 0;
int64_t start_time_ = ObTimeUtil::current_time();
bool stop_ = false;
bool stop_balance_ = false;
std::thread worker_;
};
TestRunCtx R;
class ObTransferTx : public ObSimpleClusterTestBase
{
public:
// 指定case运行目录前缀 test_ob_simple_cluster_
ObTransferTx() : ObSimpleClusterTestBase("test_transfer_tx_", "50G", "50G") {}
int do_balance(uint64_t tenant_id);
private:
int do_balance_inner_(uint64_t tenant_id);
int do_transfer_start_abort(uint64_t tenant_id, ObLSID dest_ls_id, ObLSID src_ls_id, ObTransferTabletInfo tablet_info);
int wait_balance_clean(uint64_t tenant_id);
};
int ObTransferTx::do_balance_inner_(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
static std::mutex mutex;
mutex.lock();
MTL_SWITCH(tenant_id) {
LOG_INFO("worker to do partition_balance");
auto b_svr = MTL(rootserver::ObTenantBalanceService*);
b_svr->reset();
b_svr->stop();
int64_t job_cnt = 0;
int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
ObBalanceJob job;
if (OB_FAIL(b_svr->gather_stat_())) {
LOG_WARN("failed to gather stat", KR(ret));
} else if (OB_FAIL(b_svr->gather_ls_status_stat(tenant_id, b_svr->ls_array_))) {
LOG_WARN("failed to gather stat", KR(ret));
} else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) {
if (OB_ENTRY_NOT_EXIST == ret) {
//NO JOB, need check current ls status
ret = OB_SUCCESS;
job_cnt = 0;
} else {
LOG_WARN("failed to get balance job", KR(ret), K(tenant_id));
}
} else if (OB_FAIL(b_svr->try_finish_current_job_(job, job_cnt))) {
LOG_WARN("failed to finish current job", KR(ret), K(job));
}
if (OB_SUCC(ret) && job_cnt == 0 && !R.stop_balance_ && OB_FAIL(b_svr->partition_balance_(true))) {
LOG_WARN("failed to do partition balance", KR(ret));
}
}
mutex.unlock();
return ret;
}
int ObTransferTx::wait_balance_clean(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
int64_t begin_time = ObTimeUtil::current_time();
while (OB_SUCC(ret)) {
bool is_clean = false;
MTL_SWITCH(tenant_id) {
ObBalanceJob job;
int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
is_clean = true;
}
} else {
ob_usleep(200 * 1000);
}
}
if (is_clean) {
int64_t transfer_task_count = 0;
if (OB_FAIL(SSH::g_select_int64(tenant_id, "select count(*) as val from __all_transfer_task", transfer_task_count))) {
} else if (transfer_task_count == 0) {
break;
} else {
ob_usleep(200 * 1000);
}
}
}
return ret;
}
int ObTransferTx::do_balance(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_FAIL(do_balance_inner_(tenant_id))) {
} else if (OB_FAIL(do_balance_inner_(tenant_id))) {
}
return ret;
}
int ObTransferTx::do_transfer_start_abort(uint64_t tenant_id, ObLSID dest_ls_id, ObLSID src_ls_id, ObTransferTabletInfo tablet_info)
{
int ret = OB_SUCCESS;
MTL_SWITCH(tenant_id) {
ObLSHandle ls_handle;
ObTransferHandler *transfer_handler = NULL;
ObTransferTaskInfo task_info;
ObMySQLTransaction trans;
ObTimeoutCtx timeout_ctx;
const int64_t stmt_timeout = 10_s;
const int32_t group_id = 0;
const share::SCN dest_max_desided_scn(share::SCN::min_scn());
if (OB_FAIL(MTL(ObLSService*)->get_ls(src_ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
} else if (FALSE_IT(transfer_handler = ls_handle.get_ls()->get_transfer_handler())) {
} else if (FALSE_IT(task_info.tenant_id_ = tenant_id)) {
} else if (FALSE_IT(task_info.src_ls_id_ = src_ls_id)) {
} else if (FALSE_IT(task_info.dest_ls_id_ = dest_ls_id)) {
} else if (FALSE_IT(task_info.task_id_.id_ = 10000)) {
} else if (FALSE_IT(task_info.trace_id_.set(*ObCurTraceId::get_trace_id()))) {
} else if (FALSE_IT(task_info.status_ = ObTransferStatus::START)) {
} else if (FALSE_IT(task_info.table_lock_owner_id_.id_ = 10000)) {
} else if (OB_FAIL(task_info.tablet_list_.push_back(tablet_info))) {
} else if (OB_FAIL(transfer_handler->start_trans_(stmt_timeout, group_id, timeout_ctx, trans))) {
LOG_WARN("failed to start trans", K(ret), K(task_info));
} else if (OB_FAIL(transfer_handler->precheck_ls_replay_scn_(task_info))) {
LOG_WARN("failed to precheck ls replay scn", K(ret), K(task_info));
} else if (OB_FAIL(transfer_handler->check_start_status_transfer_tablets_(task_info, timeout_ctx))) {
LOG_WARN("failed to check start status transfer tablets", K(ret), K(task_info));
} else if (OB_FAIL(transfer_handler->update_all_tablet_to_ls_(task_info, trans))) {
LOG_WARN("failed to update all tablet to ls", K(ret), K(task_info));
} else if (OB_FAIL(transfer_handler->lock_tablet_on_dest_ls_for_table_lock_(task_info, trans))) {
LOG_WARN("failed to lock tablet on dest ls for table lock", KR(ret), K(task_info));
} else if (OB_FAIL(transfer_handler->do_trans_transfer_start_prepare_(task_info, timeout_ctx, trans))) {
LOG_WARN("failed to do trans transfer start prepare", K(ret), K(task_info));
} else if (OB_FAIL(transfer_handler->do_trans_transfer_start_v2_(task_info, dest_max_desided_scn, timeout_ctx, trans))) {
LOG_WARN("failed to do trans transfer start", K(ret), K(task_info));
}
if (OB_FAIL(trans.end(false))) {
}
}
return ret;
}
TEST_F(ObTransferTx, prepare)
{
LOGI("observer start");
LOGI("create tenant begin");
// 创建普通租户tt1
EQ(OB_SUCCESS, create_tenant("tt1", "40G", "40G", false, 10));
// 获取租户tt1的tenant_id
EQ(OB_SUCCESS, get_tenant_id(R.tenant_id_));
ASSERT_NE(0, R.tenant_id_);
// 初始化普通租户tt1的sql proxy
EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
int tenant_id = R.tenant_id_;
LOGI("create tenant finish");
R.worker_ = std::thread([this, tenant_id] () {
int ret = OB_SUCCESS;
lib::set_thread_name_inner("MY_BALANCE");
MTL_SWITCH(R.tenant_id_) {
MTL(rootserver::ObTenantBalanceService*)->stop();
}
while (!R.stop_) {
do_balance(tenant_id);
::sleep(1);
}
});
// 在单节点ObServer下创建新的日志流, 注意避免被RS任务GC掉
EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_));
int64_t ls_count = 0;
EQ(0, SSH::g_select_int64(R.tenant_id_, "select count(ls_id) as val from __all_ls where ls_id!=1", ls_count));
EQ(2, ls_count);
int64_t affected_rows;
EQ(0, GCTX.sql_proxy_->write("alter system set _enable_parallel_table_creation = false tenant=all", affected_rows));
}
#define TRANSFER_CASE_PREPARE \
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); \
int64_t affected_rows = 0; \
EQ(0, wait_balance_clean(R.tenant_id_)); \
EQ(0, sql_proxy.write("drop database if exists test", affected_rows)); \
EQ(0, sql_proxy.write("create database if not exists test", affected_rows)); \
EQ(0, sql_proxy.write("use test", affected_rows)); \
EQ(0, sql_proxy.write("drop tablegroup if exists tg1", affected_rows)); \
EQ(0, do_balance(R.tenant_id_)); \
EQ(0, wait_balance_clean(R.tenant_id_)); \
rootserver::ObNewTableTabletAllocator::alloc_tablet_ls_offset_ = 0; \
EQ(OB_SUCCESS, sql_proxy.write("create table stu1(col int)", affected_rows)); \
EQ(OB_SUCCESS, sql_proxy.write("create table stu2(col int)", affected_rows)); \
ObLSID loc1,loc2; \
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1)); \
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2)); \
NEQ(loc1, loc2); \
EQ(0, sql_proxy.write("create tablegroup tg1 sharding='NONE';", affected_rows));
TEST_F(ObTransferTx, tx_exit)
{
TRANSFER_CASE_PREPARE;
ObLSID ls_id;
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", ls_id));
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
// timeout with no redo
// tx exit with no log
EQ(0, SSH::write(conn, "set session ob_trx_timeout=1000000"));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu1 values(200)"));
ObTransID tx_id;
EQ(0, SSH::find_tx(conn, tx_id));
LOGI("sp no redo ls_id:%ld, tx:%ld", ls_id.id(), tx_id.get_id());
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, ls_id, tx_id));
conn->commit();
// timeout with write redo
// tx exit with abort log
// ls1: redo --> abort
EQ(0, SSH::write(conn, "insert into stu1 values(200)"));
EQ(0, SSH::find_tx(conn, tx_id));
LOGI("sp with redo ls_id:%ld, tx:%ld", ls_id.id(), tx_id.get_id());
EQ(0, SSH::submit_redo(R.tenant_id_, ls_id));
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, ls_id, tx_id));
conn->commit();
// timeout with write redo
// tx exit with abort log
// ls1: redo --> abort
// ls2:
EQ(0, SSH::write(conn, "insert into stu1 values(200)"));
EQ(0, SSH::write(conn, "insert into stu2 values(200)"));
EQ(0, SSH::find_tx(conn, tx_id));
LOGI("dist trans with redo ls_id:%ld, tx:%ld", ls_id.id(), tx_id.get_id());
EQ(0, SSH::submit_redo(R.tenant_id_, ls_id));
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, ls_id, tx_id));
conn->commit();
// timeout with write prepare
// 协调者持续推进prepare阶段,参与者自身事务上下文不存在,给上游发abort消息
// ls1 redo --> prepare --> abort --> clear
// ls2:
EQ(0, SSH::write(conn, "insert into stu1 values(200)"));
EQ(0, SSH::write(conn, "insert into stu2 values(200)"));
EQ(0, SSH::find_tx(conn, tx_id));
LOGI("dist trans with prepare ls_id:%ld, tx:%ld", ls_id.id(), tx_id.get_id());
InjectTxFaultHelper inject_tx_fault_helper;
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc2, tx_id, ObTxLogType::TX_REDO_LOG));
std::thread th([conn] () {
conn->commit();
});
EQ(0, SSH::wait_tx(R.tenant_id_, loc1, tx_id, ObTxState::PREPARE));
EQ(0, SSH::abort_tx(R.tenant_id_, loc2, tx_id));
inject_tx_fault_helper.release();
th.join();
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, loc1, tx_id));
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, loc2, tx_id));
// timeout with write prepare
// ls1 redo --> prepare --> abort --> clear
// ls2: redo --> abort
EQ(0, SSH::write(conn, "insert into stu1 values(200)"));
EQ(0, SSH::write(conn, "insert into stu2 values(200)"));
EQ(0, SSH::find_tx(conn, tx_id));
LOGI("dist trans with prepare ls_id:%ld, tx:%ld", ls_id.id(), tx_id.get_id());
EQ(0, SSH::submit_redo(R.tenant_id_, loc2));
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc2, tx_id, ObTxLogType::TX_PREPARE_LOG));
std::thread th1([conn] () {
conn->commit();
});
EQ(0, SSH::wait_tx(R.tenant_id_, loc1, tx_id, ObTxState::PREPARE));
EQ(0, SSH::abort_tx(R.tenant_id_, loc2, tx_id));
inject_tx_fault_helper.release();
th1.join();
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, loc1, tx_id));
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, loc2, tx_id));
// timeout with write prepare
// 参与者持续给上游发prepare response,发现参与者不存在,通知下游abort
// ls1
// ls2 redo --> prepare --> abort --> clear
EQ(0, SSH::write(conn, "insert into stu1 values(200)"));
EQ(0, SSH::write(conn, "insert into stu2 values(200)"));
EQ(0, SSH::find_tx(conn, tx_id));
LOGI("dist trans with prare ls_id:%ld, tx:%ld", ls_id.id(), tx_id.get_id());
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc1, tx_id, ObTxLogType::TX_REDO_LOG));
std::thread th2([conn] () {
conn->commit();
});
EQ(0, SSH::wait_tx(R.tenant_id_, loc2, tx_id, ObTxState::PREPARE));
EQ(0, SSH::abort_tx(R.tenant_id_, loc1, tx_id));
inject_tx_fault_helper.release();
th2.join();
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, loc1, tx_id));
EQ(OB_TRANS_CTX_NOT_EXIST, SSH::wait_tx_exit(R.tenant_id_, loc2, tx_id));
}
/*
TEST_F(ObTransferTx, large_query)
{
TRANSFER_CASE_PREPARE;
get_curr_simple_server().get_sql_proxy().write("alter system set writing_throttling_trigger_percentage 100 tenant all", affected_rows);
//get_curr_simple_server().get_sql_proxy().write("alter system set syslog_level='DEBUG'", affected_rows);
// prepare data
EQ(0, sql_proxy.write("insert into stu1 values(100)", affected_rows));
for (int i = 0; i < 19; i++) {
EQ(0, sql_proxy.write("insert into stu1 select * from stu1", affected_rows));
}
bool stop = false;
std::thread th([&stop, loc1, loc2]() {
int ret = OB_SUCCESS;
while (OB_SUCC(ret) && !stop) {
ObMySQLTransaction trans;
trans.start(GCTX.sql_proxy_, R.tenant_id_);
observer::ObInnerSQLConnection *conn = static_cast<observer::ObInnerSQLConnection *>(trans.get_connection());
SCN end_scn;
SSH::get_ls_end_scn(R.tenant_id_, loc2, end_scn);
ObArenaAllocator allocator;
ObTabletID tablet_id;
SSH::select_table_tablet(R.tenant_id_, "stu2", tablet_id);
int64_t pos = 0;
ObRegisterMdsFlag flag;
flag.need_flush_redo_instantly_ = true;
flag.mds_base_scn_ = end_scn;
ObTransferTabletInfo tablet_info;
tablet_info.tablet_id_ = tablet_id;
tablet_info.transfer_seq_ = 0;
ObTXStartTransferOutInfo start_transfer_out_info;
start_transfer_out_info.src_ls_id_ = loc2;
start_transfer_out_info.dest_ls_id_ = loc1;
start_transfer_out_info.tablet_list_.push_back(tablet_info);
start_transfer_out_info.transfer_epoch_= 1;
start_transfer_out_info.data_end_scn_ = end_scn;
int64_t buf_len = start_transfer_out_info.get_serialize_size();
char *buf = (char*)allocator.alloc(buf_len);
start_transfer_out_info.serialize(buf, buf_len, pos);
if (OB_FAIL(conn->register_multi_data_source(R.tenant_id_, loc1,
ObTxDataSourceType::START_TRANSFER_OUT, buf, buf_len, flag))) {
LOG_WARN("failed to register mds", K(ret), K(start_transfer_out_info));
} else {
LOG_INFO("register mds", K(start_transfer_out_info));
}
ob_usleep(1000 * 1000);
trans.end(false);
int rd = rand() % 7000;
ob_usleep(rd * 1000);
}
});
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
int64_t session_id = 0;
EQ(0, SSH::find_session(conn, session_id));
for (int i=0;i<5;i++) {
int64_t start_time = ObTimeUtil::current_time();
EQ(0, SSH::write(conn, "insert into stu2 select * from stu1", affected_rows));
int64_t end_time = ObTimeUtil::current_time();
int64_t query_cost = end_time - start_time;
ObTransID tx_id;
int64_t request_id = 0;
ObString trace_id;
int64_t retry_cnt = 0;
EQ(0, SSH::find_request(R.tenant_id_, session_id, request_id, tx_id,trace_id, retry_cnt));
start_time = ObTimeUtil::current_time();
EQ(0, conn->commit());
end_time = ObTimeUtil::current_time();
LOGI("large_query %d: txid:%ld query_cost:%ld commit_cost:%ld row_count:%ld retry_cnt:%ld trace_id:%s", i,tx_id.get_id(), query_cost,
end_time-start_time, affected_rows, retry_cnt, trace_id.ptr());
}
stop = true;
th.join();
int64_t row_count = 0;
EQ(0, SSH::select_int64(sql_proxy, "select count(*) as val from stu2", row_count));
LOGI("large_query: row_count:%ld", row_count);
//get_curr_simple_server().get_sql_proxy().write("alter system set syslog_level='INFO'", affected_rows);
}
*/
TEST_F(ObTransferTx, epoch_recover_from_active_info)
{
TRANSFER_CASE_PREPARE;
ObTransID tx_id;
ObLSID ls_id;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu1 values(200)"));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", ls_id));
EQ(0, SSH::find_tx(conn, tx_id));
ObPartTransCtx ctx1;
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx1));
LOGI("ls_id:%ld, tx:%ld epoch:%ld", ls_id.id(), tx_id.get_id(), ctx1.epoch_);
// write active info log
EQ(0, SSH::ls_reboot(R.tenant_id_, ls_id));
// recover epoch by TxActiveInfoLog
ObPartTransCtx ctx2;
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx2));
LOGI("ls_id:%ld, tx:%ld epoch:%ld", ls_id.id(), tx_id.get_id(), ctx2.epoch_);
EQ(ctx1.epoch_, ctx2.epoch_);
EQ(0, conn->commit());
}
TEST_F(ObTransferTx, epoch_recover_from_ctx_checkpoint)
{
TRANSFER_CASE_PREPARE;
ObTransID tx_id;
ObLSID ls_id;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu1 values(100)"));
EQ(0, SSH::write(conn, "insert into stu2 values(200)"));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", ls_id));
EQ(0, SSH::find_tx(conn, tx_id));
ObPartTransCtx ctx1;
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx1));
LOGI("ls_id:%ld, tx:%ld epoch:%ld", ls_id.id(), tx_id.get_id(), ctx1.epoch_);
InjectTxFaultHelper inject_tx_fault_helper;
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, ls_id, tx_id, ObTxLogType::TX_COMMIT_LOG));
int commit_ret = -1;
std::thread th([&conn, &commit_ret]() {
commit_ret = conn->commit();
});
// make tx block prepare phase
EQ(0, SSH::wait_tx(R.tenant_id_, ls_id, tx_id, ObTxState::PREPARE));
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx1));
GE(ctx1.exec_info_.state_, ObTxState::PREPARE);
// checkpoint tx ctx to newest
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, ls_id));
// replay from middle
EQ(0, SSH::ls_reboot(R.tenant_id_, ls_id));
ObPartTransCtx ctx2;
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx2));
LOGI("ls_id:%ld, tx:%ld epoch:%ld", ls_id.id(), tx_id.get_id(), ctx2.epoch_);
EQ(ctx1.epoch_, ctx2.epoch_);
inject_tx_fault_helper.release();
th.join();
EQ(0, commit_ret);
}
TEST_F(ObTransferTx, epoch_recover_from_ctx_checkpoint2)
{
TRANSFER_CASE_PREPARE;
ObTransID tx_id;
ObLSID ls_id;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu1 values(100)"));
EQ(0, SSH::write(conn, "insert into stu2 values(200)"));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", ls_id));
EQ(0, SSH::find_tx(conn, tx_id));
ObPartTransCtx ctx1;
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx1));
LOGI("ls_id:%ld, tx:%ld epoch:%ld", ls_id.id(), tx_id.get_id(), ctx1.epoch_);
InjectTxFaultHelper inject_tx_fault_helper;
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, ls_id, tx_id, ObTxLogType::TX_CLEAR_LOG));
int commit_ret = -1;
std::thread th([&conn, &commit_ret]() {
commit_ret = conn->commit();
});
// make tx block prepare phase
EQ(0, SSH::wait_tx(R.tenant_id_, ls_id, tx_id, ObTxState::COMMIT));
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx1));
GE(ctx1.exec_info_.state_, ObTxState::COMMIT);
// checkpoint tx ctx to newest
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, ls_id));
// replay from middle
EQ(0, SSH::ls_reboot(R.tenant_id_, ls_id));
ObPartTransCtx ctx2;
EQ(0, SSH::find_tx_info(R.tenant_id_, ls_id, tx_id, ctx2));
LOGI("ls_id:%ld, tx:%ld epoch:%ld", ls_id.id(), tx_id.get_id(), ctx2.epoch_);
EQ(ctx1.epoch_, ctx2.epoch_);
inject_tx_fault_helper.release();
th.join();
EQ(0, commit_ret);
}
// 空sstable、没有活跃事务
TEST_F(ObTransferTx, transfer_empty_tablet)
{
// 关掉observer内部的均衡,防止LS均衡,只调度分区均衡
TRANSFER_CASE_PREPARE;
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, sql_proxy.write("insert into stu2 values(100)", affected_rows));
int64_t val = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", val));
EQ(100, val);
}
// sstable有数据,没有活跃事务 transfer
TEST_F(ObTransferTx, transfer_no_active_tx)
{
TRANSFER_CASE_PREPARE;
EQ(0, sql_proxy.write("insert into stu1 values(100)", affected_rows));
EQ(0, sql_proxy.write("insert into stu2 values(100)", affected_rows));
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, sql_proxy.write("insert into stu2 values(200)", affected_rows));
int64_t val = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu1", val));
EQ(100, val);
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", val));
EQ(300, val);
}
// sstable有数据,有活跃事务 transfer
TEST_F(ObTransferTx, transfer_active_tx)
{
TRANSFER_CASE_PREPARE;
EQ(0, sql_proxy.write("insert into stu1 values(100)", affected_rows));
EQ(0, sql_proxy.write("insert into stu2 values(100)", affected_rows));
ObTransID tx_id, tx_id1, tx_id2, tx_id3;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu2 values(200)"));
EQ(0, SSH::find_tx(conn, tx_id));
sqlclient::ObISQLConnection *conn1 = NULL;
EQ(0, sql_proxy.acquire(conn1));
EQ(0, conn1->execute_write(OB_SYS_TENANT_ID, "set autocommit=0", affected_rows));
EQ(0, conn1->execute_write(OB_SYS_TENANT_ID, "insert into stu2 values(50)", affected_rows));
EQ(0, SSH::find_tx(conn1, tx_id1));
sqlclient::ObISQLConnection *conn2 = NULL;
EQ(0, sql_proxy.acquire(conn2));
EQ(0, SSH::write(conn2, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu2 values(300)", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu1 values(200)", affected_rows));
EQ(0, SSH::find_tx(conn2, tx_id2));
sqlclient::ObISQLConnection *conn3 = NULL;
EQ(0, sql_proxy.acquire(conn3));
EQ(0, SSH::write(conn3, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn3, "insert into stu2 values(400)", affected_rows));
EQ(0, SSH::write(conn3, "insert into stu1 values(500)", affected_rows));
EQ(0, SSH::find_tx(conn3, tx_id3));
LOGI("find_tx:%ld %ld %ld %ld",tx_id.get_id(), tx_id1.get_id(), tx_id2.get_id(), tx_id3.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
int64_t sum1 = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu1", sum1));
EQ(100, sum1);
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", sum1));
EQ(100, sum1);
EQ(0, SSH::write(conn, "insert into stu2 values(1000)", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu2 values(1000)", affected_rows));
EQ(0, SSH::write(conn3, "insert into stu2 values(1000)", affected_rows));
EQ(0, SSH::submit_redo(R.tenant_id_, loc1));
EQ(0, SSH::submit_redo(R.tenant_id_, loc2));
EQ(0, conn->commit());
EQ(0, conn1->commit());
EQ(0, conn2->commit());
EQ(0, conn3->rollback());
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu1", sum1));
EQ(300, sum1);
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", sum1));
EQ(2650, sum1);
}
// transfer active tx A->B B->A
TEST_F(ObTransferTx, transfer_A_B_AND_B_A)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(100)", affected_rows));
ObTransID tx_id1, tx_id2;
EQ(0, SSH::find_tx(conn, tx_id1));
LOGI("find active_tx tx_id:%ld %ld", tx_id1.get_id(), tx_id2.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, SSH::write(conn, "insert into stu2 values(200)", affected_rows));
EQ(0, sql_proxy.write("alter table stu2 tablegroup=''", affected_rows));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 != loc2) {
break;
}
::sleep(1);
}
EQ(0, SSH::write(conn, "insert into stu2 values(300)", affected_rows));
EQ(0, conn->commit());
int64_t val = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2",val));
EQ(600, val);
}
TEST_F(ObTransferTx, transfer_replay)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(100)", affected_rows));
sqlclient::ObISQLConnection *conn2 = NULL;
EQ(0, sql_proxy.acquire(conn2));
EQ(0, SSH::write(conn2, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu2 values(100)", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu1 values(100)", affected_rows));
sqlclient::ObISQLConnection *conn3 = NULL;
EQ(0, sql_proxy.acquire(conn3));
EQ(0, SSH::write(conn3, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn3, "insert into stu2 values(100)", affected_rows));
ObTransID tx_id1, tx_id2, tx_id3;
EQ(0, SSH::find_tx(conn, tx_id1));
EQ(0, SSH::find_tx(conn2, tx_id2));
EQ(0, SSH::find_tx(conn3, tx_id3));
LOGI("find active_tx tx_id:%ld %ld %ld", tx_id1.get_id(), tx_id2.get_id(), tx_id3.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
ObLSID loc1_tmp, loc2_tmp;
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1_tmp));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2_tmp));
if (loc1_tmp == loc2_tmp) {
break;
}
::sleep(1);
}
EQ(0, conn->commit());
EQ(0, conn2->commit());
EQ(OB_SUCCESS, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(OB_SUCCESS, SSH::ls_reboot(R.tenant_id_, loc2));
EQ(0, conn3->commit());
int64_t val = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2",val));
EQ(300, val);
}
// sstable有数据,有活跃事务 transfer
TEST_F(ObTransferTx, transfer_abort_active_tx)
{
TRANSFER_CASE_PREPARE;
R.stop_balance_ = true;
EQ(0, sql_proxy.write("insert into stu1 values(100)", affected_rows));
EQ(0, sql_proxy.write("insert into stu2 values(100)", affected_rows));
ObTransID tx_id, tx_id1, tx_id2, tx_id3;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(100)", affected_rows));
EQ(0, SSH::find_tx(conn, tx_id));
sqlclient::ObISQLConnection *conn2 = NULL;
EQ(0, sql_proxy.acquire(conn2));
EQ(0, SSH::write(conn2, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu2 values(100)", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu1 values(100)", affected_rows));
EQ(0, SSH::find_tx(conn2, tx_id2));
LOGI("find_tx:%ld %ld",tx_id.get_id(), tx_id2.get_id());
ObTransferTabletInfo tablet_info;
EQ(0, SSH::select_table_tablet(R.tenant_id_, "stu2", tablet_info.tablet_id_));
tablet_info.transfer_seq_ = 0;
EQ(0, do_transfer_start_abort(R.tenant_id_, loc1, loc2, tablet_info));
EQ(0, SSH::write(conn, "insert into stu1 values(1000)", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(1000)", affected_rows));
EQ(0, do_transfer_start_abort(R.tenant_id_, loc1, loc2, tablet_info));
R.stop_balance_ = false;
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, conn->commit());
EQ(0, conn2->commit());
int64_t sum1 = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu1", sum1));
EQ(1200, sum1);
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", sum1));
EQ(1300, sum1);
}
TEST_F(ObTransferTx, transfer_resume)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(100)", affected_rows));
ObTransID tx_id1, tx_id2;
EQ(0, SSH::find_tx(conn, tx_id1));
LOGI("find active_tx tx_id:%ld %ld", tx_id1.get_id(), tx_id2.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, SSH::ls_resume(R.tenant_id_, loc1));
EQ(0, SSH::ls_resume(R.tenant_id_, loc2));
EQ(0, SSH::write(conn, "insert into stu2 values(200)", affected_rows));
EQ(0, conn->commit());
int64_t val = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2",val));
EQ(300, val);
}
// sstable有数据,有活跃事务,但事务数据丢失不完整 transfer
TEST_F(ObTransferTx, transfer_query_lost)
{
TRANSFER_CASE_PREPARE;
EQ(0, sql_proxy.write("insert into stu1 values(100)", affected_rows));
EQ(0, sql_proxy.write("insert into stu2 values(100)", affected_rows));
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(300)", affected_rows));
// 活跃事务操作了stu1
EQ(0, SSH::write(conn, "insert into stu1 values(200)", affected_rows));
sqlclient::ObISQLConnection *conn2 = NULL;
EQ(0, sql_proxy.acquire(conn2));
EQ(0, SSH::write(conn2, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn2, "insert into stu2 values(400)", affected_rows));
ObTransID tx_id1, tx_id2;
EQ(0, SSH::find_tx(conn, tx_id1));
EQ(0, SSH::find_tx(conn2, tx_id2));
LOGI("tx_id:%ld %ld", tx_id1.get_id(), tx_id2.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
// remove tx_ctx simulate sys reboot
EQ(0, SSH::abort_tx(R.tenant_id_, loc1, tx_id1));
EQ(0, SSH::abort_tx(R.tenant_id_, loc1, tx_id2));
// query lost
NEQ(0, conn->commit());
// transfer lost
NEQ(0, conn2->commit());
}
// transfer active tx A->B A->B
TEST_F(ObTransferTx, transfer_A_B_AND_A_B)
{
TRANSFER_CASE_PREPARE;
EQ(0, sql_proxy.write("create table stu3(col int)", affected_rows));
EQ(0, sql_proxy.write("create table stu4(col int)", affected_rows));
ObLSID loc3,loc4;
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu3", loc3));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu4", loc4));
EQ(loc1, loc3);
EQ(loc2, loc4);
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(100)", affected_rows));
ObTransID tx_id1, tx_id2;
EQ(0, SSH::find_tx(conn, tx_id1));
LOGI("find active_tx tx_id:%ld %ld", tx_id1.get_id(), tx_id2.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
// abort tx create by move in simulate transfer tx lost
EQ(0, SSH::abort_tx(R.tenant_id_, loc1, tx_id1));
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu3,stu4;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu3", loc3));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu4", loc4));
if (loc3 == loc4 && loc1 == loc3) {
break;
}
::sleep(1);
}
// transfer lost
NEQ(0, conn->commit());
}
TEST_F(ObTransferTx, transfer_filter_tx)
{
TRANSFER_CASE_PREPARE;
rootserver::ObNewTableTabletAllocator::alloc_tablet_ls_offset_ = 0;
EQ(OB_SUCCESS, sql_proxy.write("create table stu3(col int)", affected_rows));
EQ(OB_SUCCESS, sql_proxy.write("create table stu4(col int)", affected_rows));
ObLSID loc3, loc4;
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu3", loc3));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu4", loc4));
EQ(loc1, loc3);
EQ(loc2, loc4);
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu4 values(100)"));
ObTransID tx_id;
EQ(0, SSH::find_tx(conn, tx_id));
sqlclient::ObISQLConnection *conn2 = NULL;
EQ(0, sql_proxy.acquire(conn2));
EQ(0, SSH::write(conn2, "set autocommit=0"));
EQ(0, SSH::write(conn2, "insert into stu2 values(100)"));
ObTransID tx_id2;
EQ(0, SSH::find_tx(conn2, tx_id2));
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
ObSqlString sql;
sql.assign_fmt("select count(*) as val from __all_virtual_trans_stat where trans_id=%ld", tx_id.tx_id_);
int64_t part_cnt = 0;
EQ(0, SSH::g_select_int64(R.tenant_id_, sql.ptr(), part_cnt));
EQ(1, part_cnt);
sql.assign_fmt("select count(*) as val from __all_virtual_trans_stat where trans_id=%ld", tx_id2.tx_id_);
EQ(0, SSH::g_select_int64(R.tenant_id_, sql.ptr(), part_cnt));
EQ(2, part_cnt);
EQ(0, conn->commit());
sql_proxy.close(conn, 0);
EQ(0, conn2->commit());
sql_proxy.close(conn2, 0);
int64_t sum = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", sum));
EQ(100, sum);
}
TEST_F(ObTransferTx, transfer_AND_ddl)
{
TRANSFER_CASE_PREPARE;
bool case_stop = false;
int case_err = 0;
std::thread th([&]() {
int64_t affected_rows = 0;
int ret = 0;
DEFER(if (OB_FAIL(ret)) {case_err = ret;});
while (!case_stop && OB_SUCC(ret)) {
if (OB_FAIL(sql_proxy.write(OB_SYS_TENANT_ID, "truncate table stu2", affected_rows))) {
}
ob_usleep(500 * 1000);
}
});
std::thread th2([&]() {
int64_t affected_rows = 0;
int ret = 0;
DEFER(if (OB_FAIL(ret)) {case_err = ret;});
while (!case_stop && OB_SUCC(ret)) {
if (OB_FAIL(sql_proxy.write(OB_SYS_TENANT_ID, "insert into stu2 values(100)", affected_rows))) {
}
ob_usleep(500 * 1000);
}
});
int64_t start_time = ObTimeUtil::current_time();
while (ObTimeUtil::current_time() - start_time < 15 * 1000 * 1000) {
int64_t task = 0;
ObSqlString sql;
sql.assign_fmt("select count(*) as val from __all_virtual_transfer_task where tenant_id=%ld", R.tenant_id_);
EQ(0, SSH::g_select_int64(OB_SYS_TENANT_ID, sql.ptr(), task));
if (task == 0) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
EQ(0, sql_proxy.write("alter table stu2 tablegroup=''", affected_rows));
} else {
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2", affected_rows));
}
EQ(0, do_balance(R.tenant_id_));
}
ob_usleep(1 * 1000 * 1000);
}
case_stop = true;
th.join();
th2.join();
EQ(0, case_err);
}
TEST_F(ObTransferTx, transfer_AND_rollback)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(100)", affected_rows));
EQ(0, SSH::write(conn, "savepoint sp1", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(200)", affected_rows));
int64_t val = 0;
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(300, val);
ObTransID tx_id1, tx_id2;
EQ(0, SSH::find_tx(conn, tx_id1));
LOGI("find active_tx tx_id:%ld %ld", tx_id1.get_id(), tx_id2.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, SSH::write(conn, "rollback to sp1", affected_rows));
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(100, val);
EQ(0, SSH::write(conn, "insert into stu2 values(300)", affected_rows));
EQ(0, conn->commit());
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(400, val);
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
}
TEST_F(ObTransferTx, transfer_AND_rollback2)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(100)", affected_rows));
EQ(0, SSH::write(conn, "savepoint sp1", affected_rows));
EQ(0, SSH::write(conn, "insert into stu2 values(200)", affected_rows));
int64_t val = 0;
ObTransID tx_id1, tx_id2;
EQ(0, SSH::find_tx(conn, tx_id1));
LOGI("find active_tx tx_id:%ld %ld", tx_id1.get_id(), tx_id2.get_id());
ObTabletID tablet_id;
EQ(0, SSH::select_table_tablet(R.tenant_id_, "stu2", tablet_id));
EQ(0, SSH::freeze(R.tenant_id_, loc2, tablet_id));
EQ(0, SSH::wait_flush_finish(R.tenant_id_, loc2, tablet_id));
//
EQ(0, SSH::write(conn, "rollback to sp1", affected_rows));
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(100, val);
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(100, val);
EQ(0, wait_balance_clean(R.tenant_id_));
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(100, val);
EQ(0, conn->commit());
EQ(0, SSH::select_int64(conn, "select sum(col) as val from stu2", val));
EQ(100, val);
}
/*
TEST_F(ObTransferTx, transfer_tx_ctx_merge)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu1 values(100)"));
EQ(0, SSH::write(conn, "insert into stu2 values(100)"));
ObTransID tx_id;
EQ(0, SSH::find_tx(conn, tx_id));
LOGI("find active_tx tx_id:%ld", tx_id.get_id());
sqlclient::ObISQLConnection *conn2 = NULL;
EQ(0, sql_proxy.acquire(conn2));
EQ(0, SSH::write(conn2, "set autocommit=0"));
EQ(0, SSH::write(conn2, "insert into stu1 values(100)"));
EQ(0, SSH::write(conn2, "insert into stu2 values(100)"));
ObTransID tx_id2;
EQ(0, SSH::find_tx(conn2, tx_id2));
LOGI("find active_tx tx_id2:%ld", tx_id2.get_id());
sqlclient::ObISQLConnection *conn3 = NULL;
EQ(0, sql_proxy.acquire(conn3));
EQ(0, SSH::write(conn3, "set autocommit=0"));
EQ(0, SSH::write(conn3, "insert into stu1 values(100)"));
EQ(0, SSH::write(conn3, "insert into stu2 values(100)"));
ObTransID tx_id3;
EQ(0, SSH::find_tx(conn3, tx_id3));
LOGI("find active_tx tx_id3:%ld", tx_id3.get_id());
sqlclient::ObISQLConnection *conn4 = NULL;
EQ(0, sql_proxy.acquire(conn4));
EQ(0, SSH::write(conn4, "set autocommit=0"));
EQ(0, SSH::write(conn4, "insert into stu1 values(100)"));
EQ(0, SSH::write(conn4, "insert into stu2 values(100)"));
ObTransID tx_id4;
EQ(0, SSH::find_tx(conn4, tx_id4));
LOGI("find active_tx tx_id4:%ld", tx_id4.get_id());
sqlclient::ObISQLConnection *conn5 = NULL;
EQ(0, sql_proxy.acquire(conn5));
EQ(0, SSH::write(conn5, "set autocommit=0"));
EQ(0, SSH::write(conn5, "insert into stu1 values(100)"));
EQ(0, SSH::write(conn5, "insert into stu2 values(100)"));
ObTransID tx_id5;
EQ(0, SSH::find_tx(conn5, tx_id5));
LOGI("find active_tx tx_id5:%ld", tx_id5.get_id());
EQ(0, SSH::submit_redo(R.tenant_id_, loc1));
InjectTxFaultHelper inject_tx_fault_helper;
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc1, tx_id, ObTxLogType::TX_REDO_LOG));
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc1, tx_id2, ObTxLogType::TX_PREPARE_LOG));
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc1, tx_id3, ObTxLogType::TX_COMMIT_LOG));
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc1, tx_id4, ObTxLogType::TX_CLEAR_LOG));
EQ(0, inject_tx_fault_helper.inject_tx_block(R.tenant_id_, loc1, tx_id5, ObTxLogType::TX_CLEAR_LOG));
int commit_ret = -1;
std::thread th([&conn, &commit_ret]() {
commit_ret = conn->commit();
});
int commit_ret2 = -1;
std::thread th2([&conn2, &commit_ret2]() {
commit_ret2 = conn2->commit();
});
int commit_ret3 = -1;
std::thread th3([&conn3, &commit_ret3]() {
commit_ret3 = conn3->commit();
});
int commit_ret4 = -1;
std::thread th4([&conn4, &commit_ret4]() {
commit_ret4 = conn4->commit();
});
int commit_ret5 = -1;
EQ(0, SSH::abort_tx(R.tenant_id_, loc1, tx_id5));
std::thread th5([&conn5, &commit_ret5]() {
commit_ret5 = conn5->commit();
});
ob_usleep(200 * 1000);
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// make wrs check approve
EQ(0, SSH::modify_wrs(R.tenant_id_, loc2));
// wait
LOGI("debug>");
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
LOGI("debug>");
inject_tx_fault_helper.release();
th.join();
th2.join();
th3.join();
th4.join();
th5.join();
LOGI("debug>");
EQ(0, commit_ret);
EQ(0, commit_ret2);
EQ(0, commit_ret3);
EQ(0, commit_ret4);
NEQ(0, commit_ret5);
int64_t val1 = 0;
int64_t val2 = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu1", val1));
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", val2));
EQ(400, val1);
EQ(400, val2);
LOGI("debug>");
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
LOGI("debug>");
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
LOGI("debug>");
EQ(0, wait_balance_clean(R.tenant_id_));
}
*/
TEST_F(ObTransferTx, transfer_batch)
{
TRANSFER_CASE_PREPARE;
sql_proxy.write("alter system set _transfer_start_trans_timeout = '10s'", affected_rows);
std::set<sqlclient::ObISQLConnection*> jobs;
for (int i =0 ;i< 2100;i++) {
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into stu2 values(100)"));
jobs.insert(conn);
}
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
for (auto iter = jobs.begin();iter !=jobs.end();iter++) {
EQ(0, (*iter)->commit());
sql_proxy.close(*iter, 0);
}
int64_t sum = 0;
EQ(0, SSH::select_int64(sql_proxy, "select sum(col) as val from stu2", sum));
EQ(100 * 2100, sum);
sql_proxy.write("alter system set _transfer_start_trans_timeout='1s'",affected_rows);
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
}
/*
TEST_F(ObTransferTx, replay_from_middle)
{
TRANSFER_CASE_PREPARE;
sqlclient::ObISQLConnection *conn = NULL;
EQ(0, sql_proxy.acquire(conn));
EQ(0, SSH::write(conn, "set autocommit=0"));
EQ(0, SSH::write(conn, "insert into test.stu1 values(100)", affected_rows));
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1));
int64_t val = -1;
EQ(0, SSH::select_int64(conn, "select get_lock('test_lock', 10) val", val));
EQ(1, val);
EQ(0, conn->commit());
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, ObLSID(1)));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, ObLSID(1)));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
EQ(0, SSH::ls_reboot(R.tenant_id_, ObLSID(1)));
LOGI("release_lock");
val = -1;
EQ(0, SSH::select_int64(conn, "select release_lock('test_lock')", val));
// session end lock release
LOGI("get_lock");
val = -1;
EQ(0, SSH::select_int64(sql_proxy, "select get_lock('test_lock', 1) val", val));
EQ(1, val);
// session end lock release
LOGI("get_lock");
val = -1;
EQ(0, SSH::select_int64(sql_proxy, "select get_lock('test_lock', 1) val", val));
EQ(1, val);
}
*/
TEST_F(ObTransferTx, transfer_mds_trans)
{
TRANSFER_CASE_PREPARE;
ObMySQLTransaction trans;
EQ(0, trans.start(GCTX.sql_proxy_, R.tenant_id_));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu1 values(100)", affected_rows));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu2 values(100)", affected_rows));
//EQ(0, sql_proxy.write("alter system minor freeze", affected_rows));
// make it replay from middle
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc1));
EQ(0, SSH::wait_checkpoint_newest(R.tenant_id_, loc2));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu1 values(200)", affected_rows));
EQ(0, trans.write(R.tenant_id_, "insert into test.stu2 values(200)", affected_rows));
observer::ObInnerSQLConnection *conn = static_cast<observer::ObInnerSQLConnection *>(trans.get_connection());
char buf[10];
ObRegisterMdsFlag flag;
EQ(0, conn->register_multi_data_source(R.tenant_id_,
loc1,
ObTxDataSourceType::TEST3,
buf,
10,
flag));
EQ(0, conn->register_multi_data_source(R.tenant_id_,
loc2,
ObTxDataSourceType::TEST3,
buf,
10,
flag));
EQ(0, conn->register_multi_data_source(R.tenant_id_,
loc2,
ObTxDataSourceType::TEST3,
buf,
10,
flag));
ObTransID tx_id;
EQ(0, SSH::g_select_int64(R.tenant_id_, "select trans_id as val from __all_virtual_trans_stat where is_exiting=0 and session_id<=1 limit 1", tx_id.tx_id_));
LOGI("find active_tx tx_id:%ld", tx_id.get_id());
EQ(0, sql_proxy.write("alter tablegroup tg1 add stu1,stu2;", affected_rows));
EQ(0, do_balance(R.tenant_id_));
// wait
while (true) {
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu1", loc1));
EQ(0, SSH::select_table_loc(R.tenant_id_, "stu2", loc2));
if (loc1 == loc2) {
break;
}
::sleep(1);
}
EQ(0, trans.end(false));
int64_t val1 = 0;
int64_t val2 = 0;
EQ(0, SSH::select_int64(sql_proxy, "select count(col) as val from stu1", val1));
EQ(0, SSH::select_int64(sql_proxy, "select count(col) as val from stu2", val2));
EQ(0, val1);
EQ(0, val2);
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_ctx(R.tenant_id_, loc2));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc1));
EQ(0, SSH::freeze_tx_data(R.tenant_id_, loc2));
LOGI("ls_reboot");
EQ(0, SSH::ls_reboot(R.tenant_id_, loc1));
EQ(0, SSH::ls_reboot(R.tenant_id_, loc2));
}
TEST_F(ObTransferTx, create_more_ls)
{
for (int i=0;i<8;i++) {
EQ(0, SSH::create_ls(R.tenant_id_, get_curr_observer().self_addr_));
}
int64_t ls_count = 0;
EQ(0, SSH::g_select_int64(R.tenant_id_, "select count(ls_id) as val from __all_ls where ls_id!=1", ls_count));
EQ(10, ls_count);
}
TEST_F(ObTransferTx, bench)
{
int64_t affected_rows = 0;
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
// clean
EQ(0, sql_proxy.write("drop database if exists test", affected_rows));
EQ(0, sql_proxy.write("create database if not exists test", affected_rows));
EQ(0, sql_proxy.write("drop tablegroup if exists tg1", affected_rows));
EQ(0, sql_proxy.write("use test", affected_rows));
EQ(0, do_balance(R.tenant_id_));
rootserver::ObNewTableTabletAllocator::alloc_tablet_ls_offset_ = 0;
std::vector<std::string> tables;
for (int i=1;i<=10;i++) {
ObSqlString sql;
std::string table_name = "stu"+std::to_string(i);
sql.assign_fmt("create table %s(col int)", table_name.c_str());
tables.push_back(table_name);
EQ(0, sql_proxy.write(sql.ptr(), affected_rows));
}
EQ(0, sql_proxy.write("create tablegroup tg1 sharding='NONE';", affected_rows));
bool bench_stop = false;
int bench_err = 0;
std::vector<std::thread> ths;
for (int idx = 0; idx < 200; idx++) {
ths.push_back(std::thread([&sql_proxy, &bench_stop, &bench_err, tables] () {
int ret = OB_SUCCESS;
DEFER(if (OB_FAIL(ret)) bench_err = ret;);
int64_t affected_rows;
sqlclient::ObISQLConnection *conn = NULL;
if (OB_FAIL(sql_proxy.acquire(conn))) {
LOG_WARN("acquire conn failed", KR(ret));
} else if (OB_FAIL(SSH::write(conn, "set autocommit=0", affected_rows))) {
LOG_WARN("execute write failed", KR(ret));
}
while (OB_SUCC(ret) && !bench_stop) {
std::vector<int> query_count_limit_example = {1, 10, 100, 200};
int query_count = rand() % query_count_limit_example.at(rand() % query_count_limit_example.size());
if (query_count == 0) {
query_count = 1;
}
for (int i = 0; OB_SUCC(ret) && i < query_count; i++) {
ObSqlString sql;
static int64_t pk = 0;
sql.assign_fmt("insert into %s values (%ld)", tables.at(rand() % tables.size()).c_str(), ATOMIC_AAF(&pk, 1));
if (OB_FAIL(SSH::write(conn, sql.ptr()))) {
LOG_WARN("execute write failed", KR(ret), K(sql));
} else {
ob_usleep(rand()%100 * 1000);
}
}
ObTransID tx_id;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(SSH::find_tx(conn, tx_id))) {
LOG_WARN("find_tx failed", KR(ret));
} else {
if (rand() % 100 < 50) {
if (OB_FAIL(conn->commit())) {
LOG_WARN("execute commit failed", KR(ret), K(tx_id));
}
} else {
if (OB_FAIL(ret = conn->rollback())) {
LOG_WARN("execute rollback failed", KR(ret), K(tx_id));
}
}
}
}
}));
}
int64_t start_time = ObTimeUtil::current_time();
int64_t wait_time = 1 * 60 * 1000 * 1000;
ObLSID loc,loc_tmp;
while (ObTimeUtil::current_time() - start_time < wait_time && !bench_err) {
int64_t task = 0;
ObSqlString sql;
sql.assign_fmt("select count(*) as val from __all_virtual_transfer_task where tenant_id=%ld", R.tenant_id_);
EQ(0, SSH::g_select_int64(OB_SYS_TENANT_ID, sql.ptr(), task));
if (task == 0) {
LOGI("transfer task empty want to generate");
bool table_loc_same = true;
for (int i = 0; i < tables.size();i++) {
EQ(0, SSH::select_table_loc(R.tenant_id_, tables.at(i).c_str(), loc_tmp));
if (i == 0) {
loc = loc_tmp;
} else if (loc != loc_tmp) {
table_loc_same = false;
}
}
if (!table_loc_same) {
for (auto &iter : tables) {
sql.assign_fmt("alter tablegroup tg1 add %s", iter.c_str());
EQ(0, sql_proxy.write(sql.ptr(), affected_rows));
}
} else {
for (auto &iter : tables) {
sql.assign_fmt("alter table %s tablegroup=''", iter.c_str());
EQ(0, sql_proxy.write(sql.ptr(), affected_rows));
}
}
int64_t start_time = ObTimeUtil::current_time();
EQ(0, do_balance(R.tenant_id_));
int64_t end_time = ObTimeUtil::current_time();
LOGI("finish do_balance: timeuse=%ld", end_time -start_time);
}
::sleep(3);
}
bench_stop = true;
for (auto &th : ths) {
th.join();
}
EQ(0, bench_err);
}
TEST_F(ObTransferTx, end)
{
int64_t wait_us = R.time_sec_ * 1000 * 1000;
while (ObTimeUtil::current_time() - R.start_time_ < wait_us) {
ob_usleep(1000 * 1000);
}
R.stop_ = true;
if (R.worker_.joinable()) {
R.worker_.join();
}
}
} // end unittest
} // end oceanbase
int main(int argc, char **argv)
{
int c = 0;
int64_t time_sec = 0;
char *log_level = (char*)"INFO";
while(EOF != (c = getopt(argc,argv,"t:l:"))) {
switch(c) {
case 't':
time_sec = atoi(optarg);
break;
case 'l':
log_level = optarg;
oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false;
break;
default:
break;
}
}
oceanbase::unittest::init_log_and_gtest(argc, argv);
OB_LOGGER.set_log_level(log_level);
LOG_INFO("main>>>");
oceanbase::unittest::R.time_sec_ = time_sec;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}