[BUG] for txn with aborted txn transfer in, we need set as aborted
This commit is contained in:
@ -107,6 +107,7 @@ ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp)
|
||||
ob_unittest_observer(test_tenant_snapshot_service test_tenant_snapshot_service.cpp)
|
||||
ob_unittest_observer(test_callbacks_with_reverse_order test_callbacks_with_reverse_order.cpp)
|
||||
ob_unittest_observer(test_transfer_tx_data test_transfer_with_smaller_tx_data.cpp)
|
||||
ob_unittest_observer(test_transfer_in_after_abort test_transfer_in_after_abort.cpp)
|
||||
# TODO(muwei.ym): open later
|
||||
ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp)
|
||||
ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp)
|
||||
|
399
mittest/simple_server/test_transfer_in_after_abort.cpp
Normal file
399
mittest/simple_server/test_transfer_in_after_abort.cpp
Normal file
@ -0,0 +1,399 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#include <iostream>
|
||||
#define protected public
|
||||
#define private public
|
||||
|
||||
#include "env/ob_simple_cluster_test_base.h"
|
||||
#include "rootserver/ob_tenant_balance_service.h"
|
||||
#include "share/balance/ob_balance_job_table_operator.h"
|
||||
#include "mittest/simple_server/env/ob_simple_server_helper.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "storage/tx/ob_tx_loop_worker.h"
|
||||
#include "storage/tx/ob_trans_part_ctx.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace unittest
|
||||
{
|
||||
|
||||
#define EXE_SQL(sql_str) \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
|
||||
|
||||
#define EXE_SQL_FMT(...) \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows));
|
||||
|
||||
#define WRITE_SQL_BY_CONN(conn, sql_str) \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
|
||||
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
||||
|
||||
#define WRITE_SQL_FMT_BY_CONN(conn, ...) \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
|
||||
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
||||
|
||||
#define READ_SQL_BY_CONN(conn, sql_str) \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
|
||||
ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res));
|
||||
|
||||
|
||||
class ObTransferInAfterAbort : public ObSimpleClusterTestBase
|
||||
{
|
||||
public:
|
||||
ObTransferInAfterAbort(): ObSimpleClusterTestBase("test_transfer_in_after_abort", "200G", "40G") {}
|
||||
|
||||
void prepare_tenant_env()
|
||||
{
|
||||
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
int64_t affected_rows = 0;
|
||||
ObSqlString sql;
|
||||
sqlclient::ObISQLConnection *connection = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection));
|
||||
ASSERT_NE(nullptr, connection);
|
||||
WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_timeout = 10000000000");
|
||||
WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_idle_timeout = 10000000000");
|
||||
WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000");
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;");
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;");
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set partition_balance_schedule_interval = '10s';");
|
||||
sleep(5);
|
||||
}
|
||||
|
||||
void set_fast_commit_count(const int64_t count)
|
||||
{
|
||||
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy();
|
||||
int64_t affected_rows = 0;
|
||||
ObSqlString sql;
|
||||
EXE_SQL_FMT("alter system set _fast_commit_callback_count = %ld;", count);
|
||||
}
|
||||
|
||||
void create_test_tenant(uint64_t &tenant_id)
|
||||
{
|
||||
TRANS_LOG(INFO, "create_tenant start");
|
||||
ASSERT_EQ(OB_SUCCESS, create_tenant("tt1", "20G", "100G"));
|
||||
fprintf(stdout, "finish sleep\n");
|
||||
ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id));
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
TRANS_LOG(INFO, "create_tenant end", K(tenant_id));
|
||||
}
|
||||
|
||||
int wait_balance_clean(uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
while (OB_SUCC(ret)) {
|
||||
bool is_clean = false;
|
||||
MTL_SWITCH(tenant_id) {
|
||||
ObBalanceJob job;
|
||||
int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
|
||||
if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(tenant_id,
|
||||
false,
|
||||
*GCTX.sql_proxy_,
|
||||
job,
|
||||
start_time,
|
||||
finish_time))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
is_clean = true;
|
||||
}
|
||||
} else {
|
||||
ob_usleep(200 * 1000);
|
||||
}
|
||||
}
|
||||
if (is_clean) {
|
||||
int64_t transfer_task_count = 0;
|
||||
if (OB_FAIL(SSH::g_select_int64(tenant_id, "select count(*) as val from __all_transfer_task", transfer_task_count))) {
|
||||
} else if (transfer_task_count == 0) {
|
||||
break;
|
||||
} else {
|
||||
ob_usleep(200 * 1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void get_tablet_info_with_table_name(const char *name,
|
||||
int64_t &table_id,
|
||||
int64_t &object_id,
|
||||
int64_t &tablet_id,
|
||||
int64_t &ls_id)
|
||||
{
|
||||
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
int64_t affected_rows = 0;
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("SELECT table_id, object_id, tablet_id, ls_id FROM oceanbase.DBA_OB_TABLE_LOCATIONS WHERE TABLE_NAME= '%s';", name));
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr()));
|
||||
sqlclient::ObMySQLResult *result = res.get_result();
|
||||
ASSERT_NE(nullptr, result);
|
||||
ASSERT_EQ(OB_SUCCESS, result->next());
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("table_id", table_id));
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("object_id", object_id));
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id));
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("ls_id", ls_id));
|
||||
}
|
||||
}
|
||||
|
||||
int do_balance_inner_(uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
static std::mutex mutex;
|
||||
mutex.lock();
|
||||
MTL_SWITCH(tenant_id) {
|
||||
TRANS_LOG(INFO, "worker to do partition_balance");
|
||||
auto b_svr = MTL(rootserver::ObTenantBalanceService*);
|
||||
b_svr->reset();
|
||||
int64_t job_cnt = 0;
|
||||
int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
|
||||
ObBalanceJob job;
|
||||
if (OB_FAIL(b_svr->gather_stat_())) {
|
||||
TRANS_LOG(WARN, "failed to gather stat", KR(ret));
|
||||
} else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
|
||||
tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
//NO JOB, need check current ls status
|
||||
ret = OB_SUCCESS;
|
||||
job_cnt = 0;
|
||||
} else {
|
||||
TRANS_LOG(WARN, "failed to get balance job", KR(ret), K(tenant_id));
|
||||
}
|
||||
} else if (OB_FAIL(b_svr->try_finish_current_job_(job, job_cnt))) {
|
||||
TRANS_LOG(WARN, "failed to finish current job", KR(ret), K(job));
|
||||
}
|
||||
if (OB_SUCC(ret) && job_cnt == 0 && OB_FAIL(b_svr->partition_balance_(true))) {
|
||||
TRANS_LOG(WARN, "failed to do partition balance", KR(ret));
|
||||
}
|
||||
}
|
||||
mutex.unlock();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int do_balance(uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(do_balance_inner_(tenant_id))) {
|
||||
} else if (OB_FAIL(do_balance_inner_(tenant_id))) {
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObLS *get_ls(const int64_t tenant_id, const ObLSID ls_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLS *ls = nullptr;
|
||||
MTL_SWITCH(tenant_id)
|
||||
{
|
||||
ObLSHandle ls_handle;
|
||||
ObLSService *ls_svr = MTL(ObLSService *);
|
||||
OB_ASSERT(OB_SUCCESS == ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
|
||||
OB_ASSERT(nullptr != (ls = ls_handle.get_ls()));
|
||||
}
|
||||
return ls;
|
||||
}
|
||||
|
||||
void get_memtable(const ObTabletID tablet_id,
|
||||
const ObLSID ls_id,
|
||||
ObTableHandleV2 &handle)
|
||||
{
|
||||
ObLS *ls = get_ls(1002, ls_id);
|
||||
ObTabletHandle tablet_handle;
|
||||
ObTablet *tablet = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, ls->get_tablet_svr()->get_tablet(tablet_id, tablet_handle));
|
||||
tablet = tablet_handle.get_obj();
|
||||
ASSERT_EQ(OB_SUCCESS, tablet->get_active_memtable(handle));
|
||||
}
|
||||
|
||||
void check_memtable_cleanout(ObTableHandleV2 &handle,
|
||||
const bool memtable_is_all_delay_cleanout)
|
||||
{
|
||||
memtable::ObIMemtable *imemtable;
|
||||
handle.get_memtable(imemtable);
|
||||
memtable::ObMemtable *memtable = dynamic_cast<memtable::ObMemtable *>(imemtable);
|
||||
|
||||
bool is_all_cleanout = true;
|
||||
bool is_all_delay_cleanout = true;
|
||||
int64_t count = 0;
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, memtable->check_cleanout(is_all_cleanout,
|
||||
is_all_delay_cleanout,
|
||||
count));
|
||||
|
||||
ASSERT_EQ(memtable_is_all_delay_cleanout, is_all_delay_cleanout);
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(ObTransferInAfterAbort, transfer_in_after_abort)
|
||||
{
|
||||
ObSqlString sql;
|
||||
int64_t affected_rows = 0;
|
||||
|
||||
// ============================== Phase1. create tenant ==============================
|
||||
TRANS_LOG(INFO, "create tenant start");
|
||||
uint64_t tenant_id = 0;
|
||||
create_test_tenant(tenant_id);
|
||||
TRANS_LOG(INFO, "create tenant end");
|
||||
|
||||
share::ObTenantSwitchGuard tenant_guard;
|
||||
ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id));
|
||||
|
||||
prepare_tenant_env();
|
||||
|
||||
set_fast_commit_count(0);
|
||||
|
||||
// ============================== Phase2. create new ls ==============================
|
||||
ASSERT_EQ(0, SSH::create_ls(tenant_id, get_curr_observer().self_addr_));
|
||||
int64_t ls_count = 0;
|
||||
ASSERT_EQ(0, SSH::g_select_int64(tenant_id, "select count(ls_id) as val from __all_ls where ls_id!=1", ls_count));
|
||||
ASSERT_EQ(2, ls_count);
|
||||
|
||||
// ============================== Phase3. create new tables ==============================
|
||||
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
|
||||
TRANS_LOG(INFO, "create table qcc2 start");
|
||||
EXE_SQL("create table qcc1 (a int primary key)");
|
||||
TRANS_LOG(INFO, "create_table qcc2 end");
|
||||
|
||||
TRANS_LOG(INFO, "create table qcc2 start");
|
||||
EXE_SQL("create table qcc2 (a int primary key)");
|
||||
TRANS_LOG(INFO, "create_table qcc2 end");
|
||||
usleep(3 * 1000 * 1000);
|
||||
|
||||
ObLSID loc1, loc2;
|
||||
ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1));
|
||||
ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2));
|
||||
ASSERT_NE(loc1, loc2);
|
||||
int64_t table1, table2;
|
||||
int64_t object1, object2;
|
||||
int64_t tablet1, tablet2;
|
||||
int64_t ls1, ls2;
|
||||
get_tablet_info_with_table_name("qcc1", table1, object1, tablet1, ls1);
|
||||
get_tablet_info_with_table_name("qcc2", table2, object2, tablet2, ls2);
|
||||
fprintf(stdout, "qcc is created successfully, loc1: %ld, loc2: %ld, table1: %ld, table2: %ld, tablet1: %ld, tablet2: %ld, ls1: %ld, ls2: %ld\n",
|
||||
loc1.id(), loc2.id(), table1, table2, tablet1, tablet2, ls1, ls2);
|
||||
|
||||
EXE_SQL("create tablegroup tg1 sharding='NONE';");
|
||||
|
||||
// ============================== Phase4. wait minor freeze to remove retain ctx ==============================
|
||||
sqlclient::ObISQLConnection *sys_conn = nullptr;
|
||||
common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy();
|
||||
ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn));
|
||||
ASSERT_NE(nullptr, sys_conn);
|
||||
|
||||
WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant sys;");
|
||||
WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_user;");
|
||||
WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_meta;");
|
||||
sleep(5);
|
||||
|
||||
int ret = OB_SUCCESS;
|
||||
HEAP_VAR(ObMySQLProxy::MySQLResult, res_0)
|
||||
{
|
||||
common::sqlclient::ObMySQLResult *result = nullptr;
|
||||
sql.assign(
|
||||
"SELECT count(*) as cnt FROM oceanbase.__all_virtual_trans_stat where tenant_id = 1002 and ls_id = 1001;");
|
||||
int retry_times = 100;
|
||||
int64_t cnt = 0;
|
||||
|
||||
while (--retry_times >= 0) {
|
||||
res_0.reuse();
|
||||
ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_0));
|
||||
result = res_0.mysql_result();
|
||||
ASSERT_EQ(OB_SUCCESS, result->next());
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt));
|
||||
if (0 == cnt) {
|
||||
break;
|
||||
} else {
|
||||
fprintf(stdout, "waitting for tx ctx table mini merge to clear retain ctx ... \n");
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(0, cnt);
|
||||
}
|
||||
|
||||
// ============================== Phase5. start the user txn ==============================
|
||||
sqlclient::ObISQLConnection *user_connection = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection));
|
||||
ASSERT_NE(nullptr, user_connection);
|
||||
|
||||
WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_timeout = 10000000000");
|
||||
WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_idle_timeout = 10000000000");
|
||||
WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_query_timeout = 10000000000");
|
||||
|
||||
TRANS_LOG(INFO, "start the txn");
|
||||
WRITE_SQL_BY_CONN(user_connection, "begin;");
|
||||
WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc2 values(1);");
|
||||
// Step1: let the first ls logging
|
||||
ASSERT_EQ(0, SSH::submit_redo(tenant_id, loc2));
|
||||
// Step2: sleep 5 seconds
|
||||
usleep(5 * 1000 * 1000);
|
||||
WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc1 values(1);");
|
||||
// Step3: let the second ls logging
|
||||
ASSERT_EQ(0, SSH::submit_redo(tenant_id, loc1));
|
||||
|
||||
ObTransID tx_id;
|
||||
ASSERT_EQ(0, SSH::find_tx(user_connection, tx_id));
|
||||
ASSERT_EQ(0, SSH::abort_tx(tenant_id, loc1, tx_id));
|
||||
|
||||
// ============================== Phase6. start the transfer ==============================
|
||||
EXE_SQL("alter tablegroup tg1 add qcc1,qcc2;");
|
||||
usleep(1 * 1000 * 1000);
|
||||
ASSERT_EQ(0, do_balance(tenant_id));
|
||||
int64_t begin_time = ObTimeUtility::current_time();
|
||||
while (true) {
|
||||
ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1));
|
||||
ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2));
|
||||
if (loc1 == loc2) {
|
||||
fprintf(stdout, "succeed wait for balancer\n");
|
||||
break;
|
||||
} else if (ObTimeUtility::current_time() - begin_time > 300 * 1000 * 1000) {
|
||||
fprintf(stdout, "ERROR: fail to wait for balancer\n");
|
||||
break;
|
||||
} else {
|
||||
usleep(1 * 1000 * 1000);
|
||||
fprintf(stdout, "wait for balancer\n");
|
||||
}
|
||||
}
|
||||
ASSERT_EQ(loc1, loc2);
|
||||
|
||||
// ============================== Phase7. start the user txn2 ==============================
|
||||
ObTableHandleV2 handle;
|
||||
ObTabletID tablet_id1(tablet1);
|
||||
get_memtable(tablet_id1, loc1, handle);
|
||||
check_memtable_cleanout(handle, true/*all_delay_cleanout*/);
|
||||
|
||||
sqlclient::ObISQLConnection *user_connection2 = nullptr;
|
||||
ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection2));
|
||||
ASSERT_NE(nullptr, user_connection2);
|
||||
|
||||
WRITE_SQL_FMT_BY_CONN(user_connection2, "begin;");
|
||||
WRITE_SQL_FMT_BY_CONN(user_connection2, "insert into qcc1 values(1);");
|
||||
WRITE_SQL_FMT_BY_CONN(user_connection2, "commit");
|
||||
}
|
||||
|
||||
} // namespace unittest
|
||||
} // namespace oceanbase
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
using namespace oceanbase::unittest;
|
||||
|
||||
oceanbase::unittest::init_log_and_gtest(argc, argv);
|
||||
OB_LOGGER.set_log_level("info");
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -196,7 +196,7 @@ int ObPartTransCtx::init_for_transfer_move(const ObTxCtxMoveArg &arg)
|
||||
exec_info_.prepare_version_ = arg.prepare_version_;
|
||||
ctx_tx_data_.set_commit_version(arg.commit_version_);
|
||||
}
|
||||
exec_info_.state_ = arg.tx_state_;
|
||||
set_durable_state_(arg.tx_state_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -9224,6 +9224,52 @@ int ObPartTransCtx::collect_tx_ctx(const ObLSID dest_ls_id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
// For the transfer, the src txn may transfer into the dst txn that has already
|
||||
// aborted, and the txn may already release the lock which causes two alive txn
|
||||
// on the same row
|
||||
int ObPartTransCtx::check_is_aborted_in_tx_data_(const ObTransID tx_id,
|
||||
bool &is_aborted)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxTable *tx_table = nullptr;
|
||||
ObTxTableGuard guard;
|
||||
int64_t state;
|
||||
share::SCN trans_version;
|
||||
share::SCN recycled_scn;
|
||||
ctx_tx_data_.get_tx_table(tx_table);
|
||||
|
||||
if (OB_FAIL(tx_table->get_tx_table_guard(guard))) {
|
||||
TRANS_LOG(WARN, "fail to get tx table guard", K(ret));
|
||||
} else if (!guard.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "tx table is null", K(ret));
|
||||
} else if (OB_FAIL(guard.try_get_tx_state(tx_id,
|
||||
state,
|
||||
trans_version,
|
||||
recycled_scn))) {
|
||||
if (OB_TRANS_CTX_NOT_EXIST == ret) {
|
||||
is_aborted = false;
|
||||
ret = OB_SUCCESS;
|
||||
} else if (OB_REPLICA_NOT_READABLE == ret) {
|
||||
is_aborted = false;
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
TRANS_LOG(WARN, "get tx state from tx data failed", K(ret), KPC(this));
|
||||
}
|
||||
} else if (ObTxData::ABORT == state) {
|
||||
is_aborted = true;
|
||||
TRANS_LOG(INFO, "check is aborted in tx data", K(tx_id), K(state), KPC(this));
|
||||
} else if (ObTxData::COMMIT == state) {
|
||||
is_aborted = false;
|
||||
TRANS_LOG(ERROR, "check is committed in tx data", K(tx_id), K(state), KPC(this));
|
||||
} else {
|
||||
is_aborted = false;
|
||||
TRANS_LOG(INFO, "check is not aborted in tx data", K(tx_id), K(state), KPC(this));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// NB: This function can report a retryable error because the outer while loop
|
||||
// will ignore the error and continuously retry until it succeeds within the
|
||||
// callback function.
|
||||
@ -9297,8 +9343,12 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
|
||||
}
|
||||
}
|
||||
} else if (NotifyType::ON_COMMIT == move_tx_param.op_type_) {
|
||||
bool is_aborted = false;
|
||||
if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
|
||||
// do nothing
|
||||
} else if (epoch_ == arg.epoch_ && // created by myself
|
||||
OB_FAIL(check_is_aborted_in_tx_data_(trans_id_, is_aborted))) {
|
||||
TRANS_LOG(WARN, "check is aborted in tx data failed", K(ret), KPC(this));
|
||||
} else {
|
||||
if (is_new_created && is_follower_()) {
|
||||
exec_info_.is_empty_ctx_created_by_transfer_ = true;
|
||||
@ -9333,6 +9383,15 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
|
||||
ctx_tx_data_.set_start_log_ts(arg.tx_start_scn_);
|
||||
}
|
||||
}
|
||||
|
||||
if (is_aborted) {
|
||||
// If the dest txn already aborted before transfer, the dest txn may
|
||||
// already release its lock and new txn may write new data onto it which
|
||||
// will cause two alive tx node on one row at the same time if transfer
|
||||
// into an alive txn. So we need set these transferred txn as aborted
|
||||
ctx_tx_data_.set_state(ObTxData::ABORT);
|
||||
}
|
||||
|
||||
if (!arg.happened_before_) {
|
||||
bool epoch_exist = false;
|
||||
for (int64_t idx = 0; idx < exec_info_.transfer_parts_.count(); idx++) {
|
||||
|
@ -684,6 +684,10 @@ private:
|
||||
int refresh_rec_log_ts_();
|
||||
int get_tx_ctx_table_info_(ObTxCtxTableInfo &info);
|
||||
const share::SCN get_rec_log_ts_() const;
|
||||
|
||||
int check_is_aborted_in_tx_data_(const ObTransID tx_id,
|
||||
bool &is_aborted);
|
||||
|
||||
// ========================================================
|
||||
|
||||
// ======================== C2PC MSG HANDLER BEGIN ========================
|
||||
|
@ -471,7 +471,10 @@ int ObTxDataTable::check_with_tx_data(const ObTransID tx_id,
|
||||
K(tablet_id_));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(tx_data_guard.tx_data()) && (ObTxData::RUNNING == tx_data_guard.tx_data()->state_)) {
|
||||
if (OB_SUCC(ret) &&
|
||||
OB_NOT_NULL(tx_data_guard.tx_data()) &&
|
||||
!fn.may_exist_undecided_state_in_tx_data_table() &&
|
||||
(ObTxData::RUNNING == tx_data_guard.tx_data()->state_)) {
|
||||
ret = OB_EAGAIN;
|
||||
STORAGE_LOG(WARN, "read a running state tx data from tx data table, need retry", KR(ret), K(tx_data_guard));
|
||||
}
|
||||
@ -517,7 +520,8 @@ int ObTxDataTable::check_tx_data_with_cache_once_(const transaction::ObTransID t
|
||||
}
|
||||
} else {
|
||||
if (find) {
|
||||
if (ObTxData::RUNNING == tx_data_guard.tx_data()->state_) {
|
||||
if (ObTxData::RUNNING == tx_data_guard.tx_data()->state_ &&
|
||||
!fn.may_exist_undecided_state_in_tx_data_table()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR,
|
||||
"read a running state tx data from tx data table",
|
||||
|
@ -883,6 +883,7 @@ int ObTxTable::try_get_tx_state(ObReadTxDataArg &read_tx_data_arg,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
GetTxStateWithSCNFunctor fn(SCN::max_scn(), state, trans_version);
|
||||
fn.set_may_exist_undecided_state_in_tx_data_table();
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("tx table is not init.", KR(ret), K(read_tx_data_arg));
|
||||
|
@ -446,6 +446,16 @@ void ObITxDataCheckFunctor::resolve_tx_data_check_data_(const int32_t state,
|
||||
tx_data_check_data_.is_rollback_ = is_rollback;
|
||||
}
|
||||
|
||||
bool ObITxDataCheckFunctor::may_exist_undecided_state_in_tx_data_table() const
|
||||
{
|
||||
return may_exist_undecided_state_in_tx_data_table_;
|
||||
}
|
||||
|
||||
void ObITxDataCheckFunctor::set_may_exist_undecided_state_in_tx_data_table()
|
||||
{
|
||||
may_exist_undecided_state_in_tx_data_table_ = true;
|
||||
}
|
||||
|
||||
} // end namespace transaction
|
||||
} // end namespace oceanbase
|
||||
|
||||
|
@ -224,7 +224,8 @@ class ObITxDataCheckFunctor
|
||||
{
|
||||
public:
|
||||
ObITxDataCheckFunctor()
|
||||
: tx_data_check_data_() {}
|
||||
: tx_data_check_data_(),
|
||||
may_exist_undecided_state_in_tx_data_table_(false) {}
|
||||
virtual int operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx = nullptr) = 0;
|
||||
virtual bool recheck() { return false; }
|
||||
virtual bool is_decided() const;
|
||||
@ -233,6 +234,15 @@ public:
|
||||
const share::SCN commit_version,
|
||||
const share::SCN end_scn,
|
||||
const bool is_rollback);
|
||||
// In the tx_data_table, defensive error reporting strategies are implemented,
|
||||
// which means that potential errors are proactively handled and reported
|
||||
// before they can escalate. Concurrently, there might be cases where
|
||||
// information is retrieved directly from the tx data table without undergoing
|
||||
// these checks. To address this scenario, we utilize configuration settings
|
||||
// to govern the behavior of error reporting, determining when and how such
|
||||
// errors should be reported or managed.
|
||||
bool may_exist_undecided_state_in_tx_data_table() const;
|
||||
void set_may_exist_undecided_state_in_tx_data_table();
|
||||
|
||||
VIRTUAL_TO_STRING_KV(K_(tx_data_check_data));
|
||||
public:
|
||||
@ -276,6 +286,14 @@ public:
|
||||
// determined from the txn state, commit version, or rollback sequence on
|
||||
// the destination side.
|
||||
ObTxDataCheckData tx_data_check_data_;
|
||||
// In the tx_data_table, defensive error reporting strategies are implemented,
|
||||
// which means that potential errors are proactively handled and reported
|
||||
// before they can escalate. Concurrently, there might be cases where
|
||||
// information is retrieved directly from the tx data table without undergoing
|
||||
// these checks. To address this scenario, we utilize configuration settings
|
||||
// to govern the behavior of error reporting, determining when and how such
|
||||
// errors should be reported or managed.
|
||||
bool may_exist_undecided_state_in_tx_data_table_;
|
||||
};
|
||||
|
||||
class ObCommitVersionsArray
|
||||
|
Reference in New Issue
Block a user