diff --git a/deps/oblib/src/lib/container/ob_mask_set2.h b/deps/oblib/src/lib/container/ob_mask_set2.h index c120bb8ab..518bfcb10 100644 --- a/deps/oblib/src/lib/container/ob_mask_set2.h +++ b/deps/oblib/src/lib/container/ob_mask_set2.h @@ -70,12 +70,35 @@ public: ret = OB_MASK_SET_NO_NODE; } } - } - return ret; - } - int mask(const T &key, bool &is_new_mask) - { - int ret = OB_SUCCESS; + } + return ret; + } + int unmask(const T &key) + { + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + } else { + bool hit = false; + for (int64_t i = 0 ; OB_SUCCESS == ret && i < array_->count(); i++) { + if (array_->at(i) == key) { + hit = true; + if (OB_FAIL(bitset_.del_member(i))) { + } + break; + } + } + if (OB_SUCCESS == ret) { + if (!hit) { + ret = OB_MASK_SET_NO_NODE; + } + } + } + return ret; + } + int mask(const T &key, bool &is_new_mask) + { + int ret = OB_SUCCESS; bool tmp_new_mask = false; if (!is_inited_) { ret = OB_NOT_INIT; diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index f24ba4780..93b7cb193 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -111,6 +111,7 @@ ob_unittest_observer(test_callbacks_with_reverse_order test_callbacks_with_rever ob_unittest_observer(test_transfer_tx_data test_transfer_with_smaller_tx_data.cpp) ob_unittest_observer(test_transfer_in_after_abort test_transfer_in_after_abort.cpp) ob_unittest_observer(test_transfer_commit_action test_transfer_with_commit_action.cpp) +ob_unittest_observer(test_transfer_rollback_to test_transfer_between_rollback_to.cpp) ob_unittest_observer(test_memtable_new_safe_to_destroy test_memtable_new_safe_to_destroy.cpp) ob_unittest_observer(test_tablet_to_ls_cache test_tablet_to_ls_cache.cpp) # TODO(muwei.ym): open later @@ -123,4 +124,4 @@ ob_ha_unittest_observer(test_transfer_doing_stage_restart_with_mds_flush storage ob_ha_unittest_observer(test_transfer_complete_restart_with_mds_flush storage_ha/test_transfer_complete_restart_with_mds_flush.cpp) ob_ha_unittest_observer(test_transfer_with_empty_shell storage_ha/test_transfer_with_empty_shell.cpp) ob_ha_unittest_observer(test_mds_transaction test_mds_transaction.cpp) -errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp) \ No newline at end of file +errsim_ha_unittest_observer(errsim_test_transfer_handler errsim/storage_ha/errsim_test_transfer_handler.cpp) diff --git a/mittest/simple_server/test_transfer_between_rollback_to.cpp b/mittest/simple_server/test_transfer_between_rollback_to.cpp new file mode 100644 index 000000000..272742837 --- /dev/null +++ b/mittest/simple_server/test_transfer_between_rollback_to.cpp @@ -0,0 +1,446 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#define protected public +#define private public + +#include "env/ob_simple_cluster_test_base.h" +#include "rootserver/ob_tenant_balance_service.h" +#include "share/balance/ob_balance_job_table_operator.h" +#include "mittest/env/ob_simple_server_helper.h" +#include "storage/tx_storage/ob_ls_service.h" +#include "storage/tx/ob_tx_loop_worker.h" +#include "storage/tx/ob_trans_part_ctx.h" + +namespace oceanbase +{ + +static ObTransID global_tx_id; +static share::ObLSID global_ls_id; + +namespace storage +{ +int ObTransferHandler::wait_src_ls_advance_weak_read_ts_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx) +{ + UNUSED(task_info); + UNUSED(timeout_ctx); + return OB_SUCCESS; +} +} + +namespace transaction +{ + +int ObTransService::batch_post_rollback_savepoint_msg_(ObTxDesc &tx, + ObTxRollbackSPMsg &msg, + const ObTxRollbackParts &list, + int &post_succ_num) +{ + int ret = OB_SUCCESS; + int last_ret = OB_SUCCESS; + post_succ_num = 0; + const ObTxDesc *msg_tx_ptr = msg.tx_ptr_; + ARRAY_FOREACH_NORET(list, idx) { + const ObTxExecPart &p = list.at(idx); + msg.receiver_ = p.ls_id_; + msg.epoch_ = p.exec_epoch_; + if (msg.epoch_ > 0) { + msg.tx_ptr_ = NULL; + } + if (p.exec_epoch_ <= 0 && p.transfer_epoch_ > 0) { + msg.set_for_transfer(); + } + + if (global_ls_id.is_valid() && global_ls_id == msg.receiver_) { + fprintf(stdout, "qcc encounter failure %ld, %s\n", global_ls_id.id(), to_cstring(tx)); + } + + if ((!global_ls_id.is_valid() || global_ls_id != msg.receiver_) + && OB_FAIL(rpc_->post_msg(msg.receiver_, msg))) { + if (OB_LS_IS_DELETED == ret) { + ObSpinLockGuard lock(tx.lock_); + ObAddr fake_addr; + on_sp_rollback_succ_(p, tx, ObTxPart::EPOCH_DEAD, fake_addr); + ret = OB_SUCCESS; + } else { + TRANS_LOG(WARN, "post msg falied", K(ret), K(msg), K(p)); + last_ret = ret; + } + } else { ++post_succ_num; } + msg.tx_ptr_ = msg_tx_ptr; + } + return last_ret; +} + + + +} + +namespace unittest +{ + +#define EXE_SQL(sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define EXE_SQL_FMT(...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define WRITE_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define WRITE_SQL_FMT_BY_CONN(conn, ...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define READ_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res)); + + +class ObTransferBetweenRollbackTo : public ObSimpleClusterTestBase +{ +public: + ObTransferBetweenRollbackTo(): ObSimpleClusterTestBase("test_transfer_between_rollback_to", "200G", "40G") {} + + void prepare_tenant_env() + { + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + int64_t affected_rows = 0; + ObSqlString sql; + sqlclient::ObISQLConnection *connection = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection)); + ASSERT_NE(nullptr, connection); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_idle_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;"); + WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;"); + WRITE_SQL_BY_CONN(connection, "alter system set partition_balance_schedule_interval = '10s';"); + sleep(5); + } + + void create_test_tenant(uint64_t &tenant_id) + { + TRANS_LOG(INFO, "create_tenant start"); + ASSERT_EQ(OB_SUCCESS, create_tenant("tt1", "20G", "100G")); + fprintf(stdout, "finish sleep\n"); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id)); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + TRANS_LOG(INFO, "create_tenant end", K(tenant_id)); + } + + int wait_balance_clean(uint64_t tenant_id) + { + int ret = OB_SUCCESS; + while (OB_SUCC(ret)) { + bool is_clean = false; + MTL_SWITCH(tenant_id) { + ObBalanceJob job; + int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP; + if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(tenant_id, + false, + *GCTX.sql_proxy_, + job, + start_time, + finish_time))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + is_clean = true; + } + } else { + ob_usleep(200 * 1000); + } + } + if (is_clean) { + int64_t transfer_task_count = 0; + if (OB_FAIL(SSH::g_select_int64(tenant_id, "select count(*) as val from __all_transfer_task", transfer_task_count))) { + } else if (transfer_task_count == 0) { + break; + } else { + ob_usleep(200 * 1000); + } + } + } + return ret; + } + + void get_tablet_info_with_table_name(const char *name, + int64_t &table_id, + int64_t &object_id, + int64_t &tablet_id, + int64_t &ls_id) + { + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affected_rows = 0; + + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("SELECT table_id, object_id, tablet_id, ls_id FROM oceanbase.DBA_OB_TABLE_LOCATIONS WHERE TABLE_NAME= '%s';", name)); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr())); + sqlclient::ObMySQLResult *result = res.get_result(); + ASSERT_NE(nullptr, result); + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_int("table_id", table_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("object_id", object_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("ls_id", ls_id)); + } + } + + int do_balance_inner_(uint64_t tenant_id) + { + int ret = OB_SUCCESS; + static std::mutex mutex; + mutex.lock(); + MTL_SWITCH(tenant_id) { + TRANS_LOG(INFO, "worker to do partition_balance"); + auto b_svr = MTL(rootserver::ObTenantBalanceService*); + b_svr->reset(); + int64_t job_cnt = 0; + int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP; + ObBalanceJob job; + if (OB_FAIL(b_svr->gather_stat_())) { + TRANS_LOG(WARN, "failed to gather stat", KR(ret)); + } else if (OB_FAIL(b_svr->gather_ls_status_stat(tenant_id, b_svr->ls_array_))) { + TRANS_LOG(WARN, "failed to gather ls stat", KR(ret)); + } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( + tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) { + if (OB_ENTRY_NOT_EXIST == ret) { + //NO JOB, need check current ls status + ret = OB_SUCCESS; + job_cnt = 0; + } else { + TRANS_LOG(WARN, "failed to get balance job", KR(ret), K(tenant_id)); + } + } else if (OB_FAIL(b_svr->try_finish_current_job_(job, job_cnt))) { + TRANS_LOG(WARN, "failed to finish current job", KR(ret), K(job)); + } + if (OB_SUCC(ret) && job_cnt == 0 && OB_FAIL(b_svr->partition_balance_(true))) { + TRANS_LOG(WARN, "failed to do partition balance", KR(ret)); + } + } + mutex.unlock(); + return ret; + } + + int do_balance(uint64_t tenant_id) + { + int ret = OB_SUCCESS; + if (OB_FAIL(do_balance_inner_(tenant_id))) { + } else if (OB_FAIL(do_balance_inner_(tenant_id))) { + } + return ret; + } + + ObLS *get_ls(const int64_t tenant_id, const ObLSID ls_id) + { + int ret = OB_SUCCESS; + ObLS *ls = nullptr; + MTL_SWITCH(tenant_id) + { + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService *); + OB_ASSERT(OB_SUCCESS == ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + OB_ASSERT(nullptr != (ls = ls_handle.get_ls())); + } + return ls; + } +}; + +TEST_F(ObTransferBetweenRollbackTo, transfer_between_rollback_to) +{ + ObSqlString sql; + int64_t affected_rows = 0; + + global_ls_id.reset(); + global_tx_id.reset(); + + // ============================== Phase1. create tenant ============================== + TRANS_LOG(INFO, "create tenant start"); + uint64_t tenant_id = 0; + create_test_tenant(tenant_id); + TRANS_LOG(INFO, "create tenant end"); + + share::ObTenantSwitchGuard tenant_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id)); + + prepare_tenant_env(); + + // ============================== Phase2. create new ls ============================== + ASSERT_EQ(0, SSH::create_ls(tenant_id, get_curr_observer().self_addr_)); + int64_t ls_count = 0; + ASSERT_EQ(0, SSH::g_select_int64(tenant_id, "select count(ls_id) as val from __all_ls where ls_id!=1", ls_count)); + ASSERT_EQ(2, ls_count); + + // ============================== Phase3. create new tables ============================== + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + TRANS_LOG(INFO, "create table qcc1 start"); + EXE_SQL("create table qcc1 (a int)"); + TRANS_LOG(INFO, "create_table qcc1 end"); + + TRANS_LOG(INFO, "create table qcc2 start"); + EXE_SQL("create table qcc2 (a int)"); + TRANS_LOG(INFO, "create_table qcc2 end"); + usleep(3 * 1000 * 1000); + + ObLSID loc1, loc2; + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1)); + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2)); + ASSERT_NE(loc1, loc2); + int64_t table1, table2; + int64_t object1, object2; + int64_t tablet1, tablet2; + int64_t ls1, ls2; + get_tablet_info_with_table_name("qcc1", table1, object1, tablet1, ls1); + get_tablet_info_with_table_name("qcc2", table2, object2, tablet2, ls2); + fprintf(stdout, "qcc is created successfully, loc1: %ld, loc2: %ld, table1: %ld, table2: %ld, tablet1: %ld, tablet2: %ld, ls1: %ld, ls2: %ld\n", + loc1.id(), loc2.id(), table1, table2, tablet1, tablet2, ls1, ls2); + + EXE_SQL("create tablegroup tg1 sharding='NONE';"); + + // ============================== Phase4. wait minor freeze to remove retain ctx ============================== + sqlclient::ObISQLConnection *sys_conn = nullptr; + common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy(); + ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn)); + ASSERT_NE(nullptr, sys_conn); + + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant sys;"); + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_user;"); + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_meta;"); + sleep(5); + + int ret = OB_SUCCESS; + HEAP_VAR(ObMySQLProxy::MySQLResult, res_0) + { + common::sqlclient::ObMySQLResult *result = nullptr; + sql.assign( + "SELECT count(*) as cnt FROM oceanbase.__all_virtual_trans_stat where tenant_id = 1002 and ls_id = 1001;"); + int retry_times = 100; + int64_t cnt = 0; + + while (--retry_times >= 0) { + res_0.reuse(); + ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_0)); + result = res_0.mysql_result(); + ASSERT_EQ(OB_SUCCESS, result->next()); + + ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt)); + if (0 == cnt) { + break; + } else { + fprintf(stdout, "waitting for tx ctx table mini merge to clear retain ctx ... \n"); + sleep(1); + } + } + ASSERT_EQ(0, cnt); + } + + // ============================== Phase5. start the user txn ============================== + sqlclient::ObISQLConnection *user_connection = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection)); + ASSERT_NE(nullptr, user_connection); + + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_idle_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_query_timeout = 10000000000"); + + TRANS_LOG(INFO, "start the txn"); + WRITE_SQL_BY_CONN(user_connection, "begin;"); + + WRITE_SQL_FMT_BY_CONN(user_connection, "savepoint qcqc;"); + + WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc2 values(1);"); + WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc1 values(1);"); + + // remember the frozen point + ASSERT_EQ(0, SSH::find_tx(user_connection, global_tx_id)); + global_ls_id = ls2; + + + fprintf(stdout, "txn is created successfully, tx_id: %ld\n", global_tx_id.get_id()); + + + std::thread th([user_connection] () { + ObSqlString sql; + int64_t affected_rows = 0; + + fprintf(stdout, "start to rollback to savepoint, tx_id: %ld\n", global_tx_id.get_id()); + + WRITE_SQL_FMT_BY_CONN(user_connection, "rollback to savepoint qcqc;"); + + fprintf(stdout, "rollback to savepoint successfully, tx_id: %ld\n", global_tx_id.get_id()); + }); + + // ============================== Phase5. start the transfer ============================== + EXE_SQL("alter tablegroup tg1 add qcc1,qcc2;"); + usleep(1 * 1000 * 1000); + ASSERT_EQ(0, do_balance(tenant_id)); + int64_t begin_time = ObTimeUtility::current_time(); + while (true) { + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1)); + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2)); + if (loc1 == loc2) { + fprintf(stdout, "succeed wait for balancer\n"); + break; + } else if (ObTimeUtility::current_time() - begin_time > 300 * 1000 * 1000) { + fprintf(stdout, "ERROR: fail to wait for balancer\n"); + break; + } else { + usleep(1 * 1000 * 1000); + fprintf(stdout, "wait for balancer\n"); + } + } + ASSERT_EQ(loc1, loc2); + + usleep(1000 * 1000); + + global_ls_id.reset(); + + th.join(); + + ObISQLClient::ReadResult read_res; + + READ_SQL_BY_CONN(user_connection, "select * from qcc2;"); + sqlclient::ObMySQLResult *result = read_res.get_result(); + int i = 0; + while (OB_SUCC(result->next())) { + i++; + int64_t a; + ASSERT_EQ(OB_SUCCESS, result->get_int("a", a)); + ASSERT_EQ(a, 1); + } + + EXPECT_EQ(0, i); +} + +} // namespace unittest +} // namespace oceanbase + +int main(int argc, char **argv) +{ + using namespace oceanbase::unittest; + + oceanbase::unittest::init_log_and_gtest(argc, argv); + OB_LOGGER.set_log_level("info"); + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index dd54fc0d1..4bfef44e4 100644 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -1024,6 +1024,9 @@ int RollbackMaskSet::merge_part(const share::ObLSID add_ls_id, const int64_t exe for (int64_t i = 0; i < rollback_parts_->count(); i++) { if (rollback_parts_->at(i).ls_id_ == add_ls_id) { is_exist = true; + if (OB_FAIL(mask_set_.unmask(rollback_parts_->at(i)))) { + TRANS_LOG(WARN, "unmask fail", KR(ret), K(add_ls_id)); + } break; } } diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index d26555217..eb59471ee 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -393,6 +393,10 @@ public: ObSpinLockGuard guard(lock_); return mask_set_.mask(part); } + int unmask(const ObTxExecPart &part) { + ObSpinLockGuard guard(lock_); + return mask_set_.unmask(part); + } bool is_all_mask() { ObSpinLockGuard guard(lock_); return mask_set_.is_all_mask(); diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index c73ce4aa0..9c96d2e9b 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -308,6 +308,11 @@ void ObPartTransCtx::destroy() timeout_task_.destroy(); trace_info_.reset(); block_frozen_memtable_ = nullptr; + + last_rollback_to_request_id_ = 0; + last_rollback_to_timestamp_ = 0; + last_transfer_in_timestamp_ = 0; + is_inited_ = false; } } @@ -371,6 +376,9 @@ void ObPartTransCtx::default_init_() standby_part_collected_.reset(); trace_log_.reset(); transfer_deleted_ = false; + last_rollback_to_request_id_ = 0; + last_rollback_to_timestamp_ = 0; + last_transfer_in_timestamp_ = 0; } int ObPartTransCtx::init_log_cbs_(const ObLSID &ls_id, const ObTransID &tx_id) @@ -8237,6 +8245,7 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, ObTxSEQ from_scn, const ObTxSEQ to_scn, const int64_t seq_base, + const int64_t request_id, ObIArray &downstream_parts) { int ret = OB_SUCCESS; @@ -8273,13 +8282,49 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, } else if (to_scn.get_branch() == 0) { last_scn_ = to_scn; } - // must add downstream parts when return success - for (int64_t idx = 0; OB_SUCC(ret) && idx < exec_info_.intermediate_participants_.count(); idx++) { - if (OB_FAIL(downstream_parts.push_back(ObTxLSEpochPair(exec_info_.intermediate_participants_.at(idx).ls_id_, - exec_info_.intermediate_participants_.at(idx).transfer_epoch_)))) { - TRANS_LOG(WARN, "push parts to array failed", K(ret), KPC(this)); + + if (OB_SUCC(ret)) { + bool need_downstream = true; + int64_t current_rollback_to_timestamp = ObTimeUtility::current_time(); + if (request_id != 0 && // come from rollback to request + request_id == last_rollback_to_request_id_) { // the same rollback to with the last one + if (last_transfer_in_timestamp_ != 0 && + last_rollback_to_timestamp_ != 0 && + // encounter transfer between two same rollback to + last_transfer_in_timestamp_ > last_rollback_to_timestamp_) { + need_downstream = true; + TRANS_LOG(INFO, "transfer between rollback to happened", K(ret), K(request_id), + K(last_rollback_to_timestamp_), K(last_transfer_in_timestamp_), + K(last_rollback_to_request_id_), KPC(this)); + } else { + need_downstream = false; + TRANS_LOG(INFO, "no transfer between rollback to happened", K(ret), K(request_id), + K(last_rollback_to_timestamp_), K(last_transfer_in_timestamp_), + K(last_rollback_to_request_id_), KPC(this)); + } + } else { + need_downstream = true; + } + + // must add downstream parts when return success + for (int64_t idx = 0; + OB_SUCC(ret) && + need_downstream && + idx < exec_info_.intermediate_participants_.count(); + idx++) { + if (OB_FAIL(downstream_parts.push_back( + ObTxLSEpochPair(exec_info_.intermediate_participants_.at(idx).ls_id_, + exec_info_.intermediate_participants_.at(idx).transfer_epoch_)))) { + TRANS_LOG(WARN, "push parts to array failed", K(ret), KPC(this)); + } + } + + if (OB_SUCC(ret)) { + last_rollback_to_request_id_ = request_id; + last_rollback_to_timestamp_ = current_rollback_to_timestamp; } } + REC_TRANS_TRACE_EXT(tlog_, rollback_savepoint, OB_ID(ret), ret, OB_ID(from), from_scn.cast_to_int(), @@ -9838,6 +9883,8 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, exec_info_.is_transfer_blocking_ = false; if (OB_FAIL(transfer_op_log_cb_(move_tx_param.op_scn_, move_tx_param.op_type_))) { TRANS_LOG(WARN, "transfer op loc_cb failed", KR(ret), K(move_tx_param)); + } else { + last_transfer_in_timestamp_ = ObTimeUtility::current_time(); } } } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 1935b43f8..3fb885aee 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -173,7 +173,10 @@ public: coord_prepare_info_arr_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(reserve_allocator_, "PREPARE_INFO")), standby_part_collected_(), ask_state_info_interval_(100 * 1000), refresh_state_info_interval_(100 * 1000), - transfer_deleted_(false) + transfer_deleted_(false), + last_rollback_to_request_id_(0), + last_rollback_to_timestamp_(0), + last_transfer_in_timestamp_(0) { /*reset();*/ } ~ObPartTransCtx() { destroy(); } void destroy(); @@ -900,6 +903,7 @@ public: ObTxSEQ from_seq, const ObTxSEQ to_seq, const int64_t seq_base, + const int64_t request_id, ObIArray &downstream_parts); bool is_xa_trans() const { return !exec_info_.xid_.empty(); } bool is_transfer_deleted() const { return transfer_deleted_; } @@ -1097,6 +1101,11 @@ private: // for transfer move tx ctx to clean for abort bool transfer_deleted_; + + // TODO(handora.qc): remove after fix the transfer bwteen rollback_to bug + int64_t last_rollback_to_request_id_; + int64_t last_rollback_to_timestamp_; + int64_t last_transfer_in_timestamp_; // ======================================================== }; diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index ffa4061d0..9faab0267 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -2106,6 +2106,7 @@ int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg, msg.tx_ptr_, msg.for_transfer(), msg.specified_from_scn_, + msg.request_id_, result.downstream_parts_); if (msg.use_async_resp()) { ObTxRollbackSPRespMsg resp; diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 6ef161fe3..a8a0614d9 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -360,6 +360,7 @@ int ls_rollback_to_savepoint_(const ObTransID &tx_id, const ObTxDesc *tx, const bool for_transfer, const ObTxSEQ from_scn, + const int64_t request_id, ObIArray &downstream_parts, int64_t expire_ts = -1); int sync_rollback_savepoint__(ObTxDesc &tx, @@ -388,6 +389,7 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, const int64_t tx_seq_base, const int64_t expire_ts, const ObTxSEQ specified_from_scn, + const int64_t request_id, ObIArray &downstream_parts); void tx_post_terminate_(ObTxDesc &tx); int start_epoch_(ObTxDesc &tx); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index a6d2f5bc5..bce0f205f 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1114,6 +1114,7 @@ int ObTransService::rollback_to_local_implicit_savepoint_(ObTxDesc &tx, tx.seq_base_, expire_ts, from_scn, + 0, /*request_id, only used for request*/ downstream_parts))) { TRANS_LOG(WARN, "LS rollback savepoint fail", K(ret), K(savepoint), K(tx)); } else { @@ -1290,13 +1291,19 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, const int64_t tx_seq_base, const int64_t expire_ts, const ObTxSEQ specified_from_scn, + const int64_t request_id, ObIArray &downstream_parts) { int ret = OB_SUCCESS; int64_t retry_cnt = 0; bool blockable = expire_ts > 0; do { - ret = part_ctx->rollback_to_savepoint(op_sn, specified_from_scn, savepoint, tx_seq_base, downstream_parts); + ret = part_ctx->rollback_to_savepoint(op_sn, + specified_from_scn, + savepoint, + tx_seq_base, + request_id, + downstream_parts); if ((OB_NEED_RETRY == ret || OB_EAGAIN == ret) && blockable) { if (ObTimeUtility::current_time() >= expire_ts) { ret = OB_TIMEOUT; @@ -1523,6 +1530,7 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx, &tx, false,/*for transfer*/ ObTxSEQ::INVL(), + 0, /*request_id, only for rollback_to request*/ downstream_parts, -1/*non-blocking*/))) { if (common_retryable_error_(ret)) { @@ -1593,6 +1601,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, const ObTxDesc *tx, const bool for_transfer, const ObTxSEQ from_scn, + const int64_t request_id, ObIArray &downstream_parts, int64_t expire_ts) { @@ -1656,6 +1665,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, tx_seq_base, expire_ts, from_scn, + request_id, downstream_parts))) { TRANS_LOG(WARN, "LS rollback to savepoint fail", K(ret), K(tx_id), K(ls), K(op_sn), K(savepoint), KPC(ctx)); }