[BUG] fix transfer between rollback to
parent 51c0f131ae
commit afe7b912d6

deps/oblib/src/lib/container/ob_mask_set2.h (vendored, 35 changes)
@@ -70,12 +70,35 @@ public:
          ret = OB_MASK_SET_NO_NODE;
        }
      }
    }
    return ret;
  }
  int unmask(const T &key)
  {
    int ret = OB_SUCCESS;
    if (!is_inited_) {
      ret = OB_NOT_INIT;
    } else {
      bool hit = false;
      for (int64_t i = 0 ; OB_SUCCESS == ret && i < array_->count(); i++) {
        if (array_->at(i) == key) {
          hit = true;
          if (OB_FAIL(bitset_.del_member(i))) {
          }
          break;
        }
      }
      if (OB_SUCCESS == ret) {
        if (!hit) {
          ret = OB_MASK_SET_NO_NODE;
        }
      }
    }
    return ret;
  }
  int mask(const T &key, bool &is_new_mask)
  {
    int ret = OB_SUCCESS;
    bool tmp_new_mask = false;
    if (!is_inited_) {
      ret = OB_NOT_INIT;
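For readers unfamiliar with ObMaskSet2, the following is a self-contained toy, not code from this commit and not the real class: it only mirrors the bookkeeping visible in the hunk above. mask() sets the bit recorded for a known key, the new unmask() clears it again so the key must be acknowledged once more, and is_all_mask() reports completion only while every key is masked.

// Illustrative toy only (not the real ObMaskSet2): an array of keys plus a
// bitset of masked positions, with unmask() undoing a previous mask().
#include <cassert>
#include <utility>
#include <vector>

struct ToyMaskSet {
  std::vector<int> keys;       // plays the role of array_
  std::vector<bool> masked;    // plays the role of bitset_

  explicit ToyMaskSet(std::vector<int> k) : keys(std::move(k)), masked(keys.size(), false) {}

  bool mask(int key) {                 // returns false for an unknown key
    for (size_t i = 0; i < keys.size(); ++i) {
      if (keys[i] == key) { masked[i] = true; return true; }
    }
    return false;                      // corresponds to OB_MASK_SET_NO_NODE
  }
  bool unmask(int key) {               // clears the bit, like bitset_.del_member(i)
    for (size_t i = 0; i < keys.size(); ++i) {
      if (keys[i] == key) { masked[i] = false; return true; }
    }
    return false;
  }
  bool is_all_mask() const {
    for (bool b : masked) { if (!b) return false; }
    return true;
  }
};

int main() {
  ToyMaskSet s({1001, 1002});
  s.mask(1001);
  s.mask(1002);
  assert(s.is_all_mask());
  s.unmask(1002);                      // this participant must be waited on again
  assert(!s.is_all_mask());
  return 0;
}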
@@ -111,6 +111,7 @@ ob_unittest_observer(test_callbacks_with_reverse_order test_callbacks_with_rever
ob_unittest_observer(test_transfer_tx_data test_transfer_with_smaller_tx_data.cpp)
ob_unittest_observer(test_transfer_in_after_abort test_transfer_in_after_abort.cpp)
ob_unittest_observer(test_transfer_commit_action test_transfer_with_commit_action.cpp)
ob_unittest_observer(test_transfer_rollback_to test_transfer_between_rollback_to.cpp)
ob_unittest_observer(test_memtable_new_safe_to_destroy test_memtable_new_safe_to_destroy.cpp)
ob_unittest_observer(test_tablet_to_ls_cache test_tablet_to_ls_cache.cpp)
# TODO(muwei.ym): open later
@@ -123,4 +124,4 @@ ob_ha_unittest_observer(test_transfer_doing_stage_restart_with_mds_flush storage
ob_ha_unittest_observer(test_transfer_complete_restart_with_mds_flush storage_ha/test_transfer_complete_restart_with_mds_flush.cpp)
ob_ha_unittest_observer(test_transfer_with_empty_shell storage_ha/test_transfer_with_empty_shell.cpp)
ob_ha_unittest_observer(test_mds_transaction test_mds_transaction.cpp)
errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp)
errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp)
mittest/simple_server/test_transfer_between_rollback_to.cpp (new file, 446 lines)
@@ -0,0 +1,446 @@
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#include <gtest/gtest.h>
#include <iostream>
#define protected public
#define private public

#include "env/ob_simple_cluster_test_base.h"
#include "rootserver/ob_tenant_balance_service.h"
#include "share/balance/ob_balance_job_table_operator.h"
#include "mittest/env/ob_simple_server_helper.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx/ob_tx_loop_worker.h"
#include "storage/tx/ob_trans_part_ctx.h"

namespace oceanbase
{

static ObTransID global_tx_id;
static share::ObLSID global_ls_id;

namespace storage
{
int ObTransferHandler::wait_src_ls_advance_weak_read_ts_(
    const share::ObTransferTaskInfo &task_info,
    ObTimeoutCtx &timeout_ctx)
{
  UNUSED(task_info);
  UNUSED(timeout_ctx);
  return OB_SUCCESS;
}
}

namespace transaction
{

int ObTransService::batch_post_rollback_savepoint_msg_(ObTxDesc &tx,
                                                       ObTxRollbackSPMsg &msg,
                                                       const ObTxRollbackParts &list,
                                                       int &post_succ_num)
{
  int ret = OB_SUCCESS;
  int last_ret = OB_SUCCESS;
  post_succ_num = 0;
  const ObTxDesc *msg_tx_ptr = msg.tx_ptr_;
  ARRAY_FOREACH_NORET(list, idx) {
    const ObTxExecPart &p = list.at(idx);
    msg.receiver_ = p.ls_id_;
    msg.epoch_ = p.exec_epoch_;
    if (msg.epoch_ > 0) {
      msg.tx_ptr_ = NULL;
    }
    if (p.exec_epoch_ <= 0 && p.transfer_epoch_ > 0) {
      msg.set_for_transfer();
    }

    if (global_ls_id.is_valid() && global_ls_id == msg.receiver_) {
      fprintf(stdout, "qcc encounter failure %ld, %s\n", global_ls_id.id(), to_cstring(tx));
    }

    if ((!global_ls_id.is_valid() || global_ls_id != msg.receiver_)
        && OB_FAIL(rpc_->post_msg(msg.receiver_, msg))) {
      if (OB_LS_IS_DELETED == ret) {
        ObSpinLockGuard lock(tx.lock_);
        ObAddr fake_addr;
        on_sp_rollback_succ_(p, tx, ObTxPart::EPOCH_DEAD, fake_addr);
        ret = OB_SUCCESS;
      } else {
        TRANS_LOG(WARN, "post msg failed", K(ret), K(msg), K(p));
        last_ret = ret;
      }
    } else { ++post_succ_num; }
    msg.tx_ptr_ = msg_tx_ptr;
  }
  return last_ret;
}


}

namespace unittest
{

#define EXE_SQL(sql_str)                              \
  ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));         \
  ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));

#define EXE_SQL_FMT(...)                              \
  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
  ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));

#define WRITE_SQL_BY_CONN(conn, sql_str)              \
  ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));         \
  ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));

#define WRITE_SQL_FMT_BY_CONN(conn, ...)              \
  ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
  ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));

#define READ_SQL_BY_CONN(conn, sql_str)               \
  ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str));         \
  ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res));

class ObTransferBetweenRollbackTo : public ObSimpleClusterTestBase
{
public:
  ObTransferBetweenRollbackTo() : ObSimpleClusterTestBase("test_transfer_between_rollback_to", "200G", "40G") {}

  void prepare_tenant_env()
  {
    common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
    int64_t affected_rows = 0;
    ObSqlString sql;
    sqlclient::ObISQLConnection *connection = nullptr;
    ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
    ASSERT_NE(nullptr, connection);
    WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_timeout = 10000000000");
    WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_idle_timeout = 10000000000");
    WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000");
    WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;");
    WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;");
    WRITE_SQL_BY_CONN(connection, "alter system set partition_balance_schedule_interval = '10s';");
    sleep(5);
  }

  void create_test_tenant(uint64_t &tenant_id)
  {
    TRANS_LOG(INFO, "create_tenant start");
    ASSERT_EQ(OB_SUCCESS, create_tenant("tt1", "20G", "100G"));
    fprintf(stdout, "finish sleep\n");
    ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id));
    ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
    TRANS_LOG(INFO, "create_tenant end", K(tenant_id));
  }

  int wait_balance_clean(uint64_t tenant_id)
  {
    int ret = OB_SUCCESS;
    while (OB_SUCC(ret)) {
      bool is_clean = false;
      MTL_SWITCH(tenant_id) {
        ObBalanceJob job;
        int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
        if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(tenant_id,
                                                               false,
                                                               *GCTX.sql_proxy_,
                                                               job,
                                                               start_time,
                                                               finish_time))) {
          if (OB_ENTRY_NOT_EXIST == ret) {
            ret = OB_SUCCESS;
            is_clean = true;
          }
        } else {
          ob_usleep(200 * 1000);
        }
      }
      if (is_clean) {
        int64_t transfer_task_count = 0;
        if (OB_FAIL(SSH::g_select_int64(tenant_id, "select count(*) as val from __all_transfer_task", transfer_task_count))) {
        } else if (transfer_task_count == 0) {
          break;
        } else {
          ob_usleep(200 * 1000);
        }
      }
    }
    return ret;
  }

  void get_tablet_info_with_table_name(const char *name,
                                       int64_t &table_id,
                                       int64_t &object_id,
                                       int64_t &tablet_id,
                                       int64_t &ls_id)
  {
    common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();

    int ret = OB_SUCCESS;
    ObSqlString sql;
    int64_t affected_rows = 0;

    ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("SELECT table_id, object_id, tablet_id, ls_id FROM oceanbase.DBA_OB_TABLE_LOCATIONS WHERE TABLE_NAME= '%s';", name));
    SMART_VAR(ObMySQLProxy::MySQLResult, res) {
      ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr()));
      sqlclient::ObMySQLResult *result = res.get_result();
      ASSERT_NE(nullptr, result);
      ASSERT_EQ(OB_SUCCESS, result->next());
      ASSERT_EQ(OB_SUCCESS, result->get_int("table_id", table_id));
      ASSERT_EQ(OB_SUCCESS, result->get_int("object_id", object_id));
      ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id));
      ASSERT_EQ(OB_SUCCESS, result->get_int("ls_id", ls_id));
    }
  }

  int do_balance_inner_(uint64_t tenant_id)
  {
    int ret = OB_SUCCESS;
    static std::mutex mutex;
    mutex.lock();
    MTL_SWITCH(tenant_id) {
      TRANS_LOG(INFO, "worker to do partition_balance");
      auto b_svr = MTL(rootserver::ObTenantBalanceService*);
      b_svr->reset();
      int64_t job_cnt = 0;
      int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
      ObBalanceJob job;
      if (OB_FAIL(b_svr->gather_stat_())) {
        TRANS_LOG(WARN, "failed to gather stat", KR(ret));
      } else if (OB_FAIL(b_svr->gather_ls_status_stat(tenant_id, b_svr->ls_array_))) {
        TRANS_LOG(WARN, "failed to gather ls stat", KR(ret));
      } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
                     tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) {
        if (OB_ENTRY_NOT_EXIST == ret) {
          // no job, need to check current ls status
          ret = OB_SUCCESS;
          job_cnt = 0;
        } else {
          TRANS_LOG(WARN, "failed to get balance job", KR(ret), K(tenant_id));
        }
      } else if (OB_FAIL(b_svr->try_finish_current_job_(job, job_cnt))) {
        TRANS_LOG(WARN, "failed to finish current job", KR(ret), K(job));
      }
      if (OB_SUCC(ret) && job_cnt == 0 && OB_FAIL(b_svr->partition_balance_(true))) {
        TRANS_LOG(WARN, "failed to do partition balance", KR(ret));
      }
    }
    mutex.unlock();
    return ret;
  }

  int do_balance(uint64_t tenant_id)
  {
    int ret = OB_SUCCESS;
    if (OB_FAIL(do_balance_inner_(tenant_id))) {
    } else if (OB_FAIL(do_balance_inner_(tenant_id))) {
    }
    return ret;
  }

  ObLS *get_ls(const int64_t tenant_id, const ObLSID ls_id)
  {
    int ret = OB_SUCCESS;
    ObLS *ls = nullptr;
    MTL_SWITCH(tenant_id)
    {
      ObLSHandle ls_handle;
      ObLSService *ls_svr = MTL(ObLSService *);
      OB_ASSERT(OB_SUCCESS == ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
      OB_ASSERT(nullptr != (ls = ls_handle.get_ls()));
    }
    return ls;
  }
};

TEST_F(ObTransferBetweenRollbackTo, transfer_between_rollback_to)
{
  ObSqlString sql;
  int64_t affected_rows = 0;

  global_ls_id.reset();
  global_tx_id.reset();

  // ============================== Phase1. create tenant ==============================
  TRANS_LOG(INFO, "create tenant start");
  uint64_t tenant_id = 0;
  create_test_tenant(tenant_id);
  TRANS_LOG(INFO, "create tenant end");

  share::ObTenantSwitchGuard tenant_guard;
  ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id));

  prepare_tenant_env();

  // ============================== Phase2. create new ls ==============================
  ASSERT_EQ(0, SSH::create_ls(tenant_id, get_curr_observer().self_addr_));
  int64_t ls_count = 0;
  ASSERT_EQ(0, SSH::g_select_int64(tenant_id, "select count(ls_id) as val from __all_ls where ls_id!=1", ls_count));
  ASSERT_EQ(2, ls_count);

  // ============================== Phase3. create new tables ==============================
  common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
  TRANS_LOG(INFO, "create table qcc1 start");
  EXE_SQL("create table qcc1 (a int)");
  TRANS_LOG(INFO, "create_table qcc1 end");

  TRANS_LOG(INFO, "create table qcc2 start");
  EXE_SQL("create table qcc2 (a int)");
  TRANS_LOG(INFO, "create_table qcc2 end");
  usleep(3 * 1000 * 1000);

  ObLSID loc1, loc2;
  ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1));
  ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2));
  ASSERT_NE(loc1, loc2);
  int64_t table1, table2;
  int64_t object1, object2;
  int64_t tablet1, tablet2;
  int64_t ls1, ls2;
  get_tablet_info_with_table_name("qcc1", table1, object1, tablet1, ls1);
  get_tablet_info_with_table_name("qcc2", table2, object2, tablet2, ls2);
  fprintf(stdout, "qcc is created successfully, loc1: %ld, loc2: %ld, table1: %ld, table2: %ld, tablet1: %ld, tablet2: %ld, ls1: %ld, ls2: %ld\n",
          loc1.id(), loc2.id(), table1, table2, tablet1, tablet2, ls1, ls2);

  EXE_SQL("create tablegroup tg1 sharding='NONE';");

  // ============================== Phase4. wait minor freeze to remove retain ctx ==============================
  sqlclient::ObISQLConnection *sys_conn = nullptr;
  common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
  ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
  ASSERT_NE(nullptr, sys_conn);

  WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant sys;");
  WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_user;");
  WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_meta;");
  sleep(5);

  int ret = OB_SUCCESS;
  HEAP_VAR(ObMySQLProxy::MySQLResult, res_0)
  {
    common::sqlclient::ObMySQLResult *result = nullptr;
    sql.assign(
        "SELECT count(*) as cnt FROM oceanbase.__all_virtual_trans_stat where tenant_id = 1002 and ls_id = 1001;");
    int retry_times = 100;
    int64_t cnt = 0;

    while (--retry_times >= 0) {
      res_0.reuse();
      ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_0));
      result = res_0.mysql_result();
      ASSERT_EQ(OB_SUCCESS, result->next());

      ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt));
      if (0 == cnt) {
        break;
      } else {
        fprintf(stdout, "waiting for tx ctx table mini merge to clear retain ctx ... \n");
        sleep(1);
      }
    }
    ASSERT_EQ(0, cnt);
  }

  // ============================== Phase5. start the user txn ==============================
  sqlclient::ObISQLConnection *user_connection = nullptr;
  ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection));
  ASSERT_NE(nullptr, user_connection);

  WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_timeout = 10000000000");
  WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_idle_timeout = 10000000000");
  WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_query_timeout = 10000000000");

  TRANS_LOG(INFO, "start the txn");
  WRITE_SQL_BY_CONN(user_connection, "begin;");

  WRITE_SQL_FMT_BY_CONN(user_connection, "savepoint qcqc;");

  WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc2 values(1);");
  WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc1 values(1);");

  // remember the frozen point
  ASSERT_EQ(0, SSH::find_tx(user_connection, global_tx_id));
  global_ls_id = ls2;

  fprintf(stdout, "txn is created successfully, tx_id: %ld\n", global_tx_id.get_id());

  std::thread th([user_connection] () {
    ObSqlString sql;
    int64_t affected_rows = 0;

    fprintf(stdout, "start to rollback to savepoint, tx_id: %ld\n", global_tx_id.get_id());

    WRITE_SQL_FMT_BY_CONN(user_connection, "rollback to savepoint qcqc;");

    fprintf(stdout, "rollback to savepoint successfully, tx_id: %ld\n", global_tx_id.get_id());
  });

  // ============================== Phase6. start the transfer ==============================
  EXE_SQL("alter tablegroup tg1 add qcc1,qcc2;");
  usleep(1 * 1000 * 1000);
  ASSERT_EQ(0, do_balance(tenant_id));
  int64_t begin_time = ObTimeUtility::current_time();
  while (true) {
    ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1));
    ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2));
    if (loc1 == loc2) {
      fprintf(stdout, "succeed to wait for balancer\n");
      break;
    } else if (ObTimeUtility::current_time() - begin_time > 300 * 1000 * 1000) {
      fprintf(stdout, "ERROR: fail to wait for balancer\n");
      break;
    } else {
      usleep(1 * 1000 * 1000);
      fprintf(stdout, "wait for balancer\n");
    }
  }
  ASSERT_EQ(loc1, loc2);

  usleep(1000 * 1000);

  global_ls_id.reset();

  th.join();

  ObISQLClient::ReadResult read_res;

  READ_SQL_BY_CONN(user_connection, "select * from qcc2;");
  sqlclient::ObMySQLResult *result = read_res.get_result();
  int i = 0;
  while (OB_SUCC(result->next())) {
    i++;
    int64_t a;
    ASSERT_EQ(OB_SUCCESS, result->get_int("a", a));
    ASSERT_EQ(a, 1);
  }

  EXPECT_EQ(0, i);
}

} // namespace unittest
} // namespace oceanbase

int main(int argc, char **argv)
{
  using namespace oceanbase::unittest;

  oceanbase::unittest::init_log_and_gtest(argc, argv);
  OB_LOGGER.set_log_level("info");
  ::testing::InitGoogleTest(&argc, argv);

  return RUN_ALL_TESTS();
}
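The interleaving the test drives can be summarized as follows. The sketch below is not part of the commit; suppressed_ls and rollback_done are placeholder names standing in for global_ls_id and the SQL connection, and the sleeps stand in for the balancer polling. Only the ordering of the rollback thread against the transfer is the point.

// Standard-C++ sketch of the test's concurrency structure (illustration only).
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

int main() {
  std::atomic<long> suppressed_ls{1002};   // like global_ls_id = ls2 in the test
  std::atomic<bool> rollback_done{false};

  // Background thread: "rollback to savepoint qcqc;" keeps retrying while its
  // message to the suppressed LS is not delivered (the stubbed
  // batch_post_rollback_savepoint_msg_ above refuses to post it).
  std::thread rollback_thread([&] {
    while (suppressed_ls.load() != 0) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));  // retry loop
    }
    rollback_done = true;   // the rollback finally reaches every participant
  });

  // Main thread: trigger the tablegroup change, wait until the balancer has
  // moved qcc2 next to qcc1 (loc1 == loc2 in the test), then stop suppressing
  // the rollback message (global_ls_id.reset() in the test).
  std::this_thread::sleep_for(std::chrono::milliseconds(50));  // "wait for balancer"
  suppressed_ls = 0;

  rollback_thread.join();   // th.join() in the test
  assert(rollback_done);
  // The test then reads qcc2 on the user connection and expects zero rows:
  // the insert made after the savepoint must not survive the rollback, even
  // though its tablet was transferred to another LS while the rollback was
  // still in flight.
  return 0;
}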
@@ -1024,6 +1024,9 @@ int RollbackMaskSet::merge_part(const share::ObLSID add_ls_id, const int64_t exe
  for (int64_t i = 0; i < rollback_parts_->count(); i++) {
    if (rollback_parts_->at(i).ls_id_ == add_ls_id) {
      is_exist = true;
      if (OB_FAIL(mask_set_.unmask(rollback_parts_->at(i)))) {
        TRANS_LOG(WARN, "unmask fail", KR(ret), K(add_ls_id));
      }
      break;
    }
  }
@@ -393,6 +393,10 @@ public:
    ObSpinLockGuard guard(lock_);
    return mask_set_.mask(part);
  }
  int unmask(const ObTxExecPart &part) {
    ObSpinLockGuard guard(lock_);
    return mask_set_.unmask(part);
  }
  bool is_all_mask() {
    ObSpinLockGuard guard(lock_);
    return mask_set_.is_all_mask();
@@ -308,6 +308,11 @@ void ObPartTransCtx::destroy()
    timeout_task_.destroy();
    trace_info_.reset();
    block_frozen_memtable_ = nullptr;

    last_rollback_to_request_id_ = 0;
    last_rollback_to_timestamp_ = 0;
    last_transfer_in_timestamp_ = 0;

    is_inited_ = false;
  }
}
@@ -371,6 +376,9 @@ void ObPartTransCtx::default_init_()
  standby_part_collected_.reset();
  trace_log_.reset();
  transfer_deleted_ = false;
  last_rollback_to_request_id_ = 0;
  last_rollback_to_timestamp_ = 0;
  last_transfer_in_timestamp_ = 0;
}

int ObPartTransCtx::init_log_cbs_(const ObLSID &ls_id, const ObTransID &tx_id)
@@ -8237,6 +8245,7 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
                                          ObTxSEQ from_scn,
                                          const ObTxSEQ to_scn,
                                          const int64_t seq_base,
                                          const int64_t request_id,
                                          ObIArray<ObTxLSEpochPair> &downstream_parts)
{
  int ret = OB_SUCCESS;
@@ -8273,13 +8282,49 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
  } else if (to_scn.get_branch() == 0) {
    last_scn_ = to_scn;
  }
  // must add downstream parts when return success
  for (int64_t idx = 0; OB_SUCC(ret) && idx < exec_info_.intermediate_participants_.count(); idx++) {
    if (OB_FAIL(downstream_parts.push_back(ObTxLSEpochPair(exec_info_.intermediate_participants_.at(idx).ls_id_,
                                                           exec_info_.intermediate_participants_.at(idx).transfer_epoch_)))) {
      TRANS_LOG(WARN, "push parts to array failed", K(ret), KPC(this));

  if (OB_SUCC(ret)) {
    bool need_downstream = true;
    int64_t current_rollback_to_timestamp = ObTimeUtility::current_time();
    if (request_id != 0 && // comes from a rollback to request
        request_id == last_rollback_to_request_id_) { // the same rollback to as the last one
      if (last_transfer_in_timestamp_ != 0 &&
          last_rollback_to_timestamp_ != 0 &&
          // encounter transfer between two same rollback to
          last_transfer_in_timestamp_ > last_rollback_to_timestamp_) {
        need_downstream = true;
        TRANS_LOG(INFO, "transfer between rollback to happened", K(ret), K(request_id),
                  K(last_rollback_to_timestamp_), K(last_transfer_in_timestamp_),
                  K(last_rollback_to_request_id_), KPC(this));
      } else {
        need_downstream = false;
        TRANS_LOG(INFO, "no transfer between rollback to happened", K(ret), K(request_id),
                  K(last_rollback_to_timestamp_), K(last_transfer_in_timestamp_),
                  K(last_rollback_to_request_id_), KPC(this));
      }
    } else {
      need_downstream = true;
    }

    // must add downstream parts when return success
    for (int64_t idx = 0;
         OB_SUCC(ret) &&
         need_downstream &&
         idx < exec_info_.intermediate_participants_.count();
         idx++) {
      if (OB_FAIL(downstream_parts.push_back(
              ObTxLSEpochPair(exec_info_.intermediate_participants_.at(idx).ls_id_,
                              exec_info_.intermediate_participants_.at(idx).transfer_epoch_)))) {
        TRANS_LOG(WARN, "push parts to array failed", K(ret), KPC(this));
      }
    }

    if (OB_SUCC(ret)) {
      last_rollback_to_request_id_ = request_id;
      last_rollback_to_timestamp_ = current_rollback_to_timestamp;
    }
  }

  REC_TRANS_TRACE_EXT(tlog_, rollback_savepoint,
                      OB_ID(ret), ret,
                      OB_ID(from), from_scn.cast_to_int(),
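Read in isolation, the new branch boils down to one decision: may a retried rollback-to skip re-reporting downstream parts? The snippet below is an illustrative restatement of that rule, not code from the commit; the parameter names mirror the fields added in this patch.

// Only a retry of the same rollback-to request (same non-zero request_id) may
// skip downstream parts, and only if no transfer-in landed on this participant
// after the previous rollback-to was processed.
#include <cassert>
#include <cstdint>

bool need_downstream_parts(int64_t request_id,
                           int64_t last_rollback_to_request_id,
                           int64_t last_rollback_to_timestamp,
                           int64_t last_transfer_in_timestamp)
{
  if (request_id == 0 || request_id != last_rollback_to_request_id) {
    return true;  // first attempt, or a different request: always report downstream parts
  }
  // A retry of the same request: report downstream parts again only when a
  // transfer-in is newer than the previous rollback-to on this participant.
  return last_transfer_in_timestamp != 0
      && last_rollback_to_timestamp != 0
      && last_transfer_in_timestamp > last_rollback_to_timestamp;
}

int main()
{
  // Plain retry with no transfer in between: downstream parts can be skipped.
  assert(!need_downstream_parts(7, 7, /*last rollback-to*/ 200, /*last transfer-in*/ 100));
  // Retry after a transfer-in newer than the last rollback-to: downstream
  // parts must be reported again (the situation this commit fixes).
  assert(need_downstream_parts(7, 7, /*last rollback-to*/ 200, /*last transfer-in*/ 300));
  // A local rollback (request_id == 0) always reports downstream parts.
  assert(need_downstream_parts(0, 0, 200, 300));
  return 0;
}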
@@ -9838,6 +9883,8 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
      exec_info_.is_transfer_blocking_ = false;
      if (OB_FAIL(transfer_op_log_cb_(move_tx_param.op_scn_, move_tx_param.op_type_))) {
        TRANS_LOG(WARN, "transfer op log_cb failed", KR(ret), K(move_tx_param));
      } else {
        last_transfer_in_timestamp_ = ObTimeUtility::current_time();
      }
    }
  }
@@ -173,7 +173,10 @@ public:
      coord_prepare_info_arr_(OB_MALLOC_NORMAL_BLOCK_SIZE,
                              ModulePageAllocator(reserve_allocator_, "PREPARE_INFO")),
      standby_part_collected_(), ask_state_info_interval_(100 * 1000), refresh_state_info_interval_(100 * 1000),
      transfer_deleted_(false)
      transfer_deleted_(false),
      last_rollback_to_request_id_(0),
      last_rollback_to_timestamp_(0),
      last_transfer_in_timestamp_(0)
  { /*reset();*/ }
  ~ObPartTransCtx() { destroy(); }
  void destroy();
@@ -900,6 +903,7 @@ public:
                            ObTxSEQ from_seq,
                            const ObTxSEQ to_seq,
                            const int64_t seq_base,
                            const int64_t request_id,
                            ObIArray<ObTxLSEpochPair> &downstream_parts);
  bool is_xa_trans() const { return !exec_info_.xid_.empty(); }
  bool is_transfer_deleted() const { return transfer_deleted_; }
@@ -1097,6 +1101,11 @@ private:

  // for transfer move tx ctx to clean for abort
  bool transfer_deleted_;

  // TODO(handora.qc): remove after fixing the transfer between rollback_to bug
  int64_t last_rollback_to_request_id_;
  int64_t last_rollback_to_timestamp_;
  int64_t last_transfer_in_timestamp_;
  // ========================================================
};

@@ -2106,6 +2106,7 @@ int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg,
                                    msg.tx_ptr_,
                                    msg.for_transfer(),
                                    msg.specified_from_scn_,
                                    msg.request_id_,
                                    result.downstream_parts_);
  if (msg.use_async_resp()) {
    ObTxRollbackSPRespMsg resp;
@@ -360,6 +360,7 @@ int ls_rollback_to_savepoint_(const ObTransID &tx_id,
                              const ObTxDesc *tx,
                              const bool for_transfer,
                              const ObTxSEQ from_scn,
                              const int64_t request_id,
                              ObIArray<ObTxLSEpochPair> &downstream_parts,
                              int64_t expire_ts = -1);
int sync_rollback_savepoint__(ObTxDesc &tx,
@@ -388,6 +389,7 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
                                 const int64_t tx_seq_base,
                                 const int64_t expire_ts,
                                 const ObTxSEQ specified_from_scn,
                                 const int64_t request_id,
                                 ObIArray<ObTxLSEpochPair> &downstream_parts);
void tx_post_terminate_(ObTxDesc &tx);
int start_epoch_(ObTxDesc &tx);
@@ -1114,6 +1114,7 @@ int ObTransService::rollback_to_local_implicit_savepoint_(ObTxDesc &tx,
                                                 tx.seq_base_,
                                                 expire_ts,
                                                 from_scn,
                                                 0, /*request_id, only used for request*/
                                                 downstream_parts))) {
      TRANS_LOG(WARN, "LS rollback savepoint fail", K(ret), K(savepoint), K(tx));
    } else {
@@ -1290,13 +1291,19 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
                                                 const int64_t tx_seq_base,
                                                 const int64_t expire_ts,
                                                 const ObTxSEQ specified_from_scn,
                                                 const int64_t request_id,
                                                 ObIArray<ObTxLSEpochPair> &downstream_parts)
{
  int ret = OB_SUCCESS;
  int64_t retry_cnt = 0;
  bool blockable = expire_ts > 0;
  do {
    ret = part_ctx->rollback_to_savepoint(op_sn, specified_from_scn, savepoint, tx_seq_base, downstream_parts);
    ret = part_ctx->rollback_to_savepoint(op_sn,
                                          specified_from_scn,
                                          savepoint,
                                          tx_seq_base,
                                          request_id,
                                          downstream_parts);
    if ((OB_NEED_RETRY == ret || OB_EAGAIN == ret) && blockable) {
      if (ObTimeUtility::current_time() >= expire_ts) {
        ret = OB_TIMEOUT;
@@ -1523,6 +1530,7 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx,
                                        &tx,
                                        false, /*for transfer*/
                                        ObTxSEQ::INVL(),
                                        0, /*request_id, only for rollback_to request*/
                                        downstream_parts,
                                        -1 /*non-blocking*/))) {
    if (common_retryable_error_(ret)) {
@@ -1593,6 +1601,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id,
                                              const ObTxDesc *tx,
                                              const bool for_transfer,
                                              const ObTxSEQ from_scn,
                                              const int64_t request_id,
                                              ObIArray<ObTxLSEpochPair> &downstream_parts,
                                              int64_t expire_ts)
{
@@ -1656,6 +1665,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id,
                                              tx_seq_base,
                                              expire_ts,
                                              from_scn,
                                              request_id,
                                              downstream_parts))) {
    TRANS_LOG(WARN, "LS rollback to savepoint fail", K(ret), K(tx_id), K(ls), K(op_sn), K(savepoint), KPC(ctx));
  }