diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index 857f578109..8dd5071f22 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -106,6 +106,7 @@ ob_unittest_observer(test_create_clone_tenant_resource_pool test_create_clone_te ob_unittest_observer(test_tablet_autoinc_mgr test_tablet_autoinc_mgr.cpp) ob_unittest_observer(test_tenant_snapshot_service test_tenant_snapshot_service.cpp) ob_unittest_observer(test_callbacks_with_reverse_order test_callbacks_with_reverse_order.cpp) +ob_unittest_observer(test_transfer_tx_data test_transfer_with_smaller_tx_data.cpp) # TODO(muwei.ym): open later ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp) ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp) diff --git a/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp b/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp new file mode 100644 index 0000000000..309f292f6e --- /dev/null +++ b/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp @@ -0,0 +1,439 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#include +#define protected public +#define private public + +#include "env/ob_simple_cluster_test_base.h" +#include "rootserver/ob_tenant_balance_service.h" +#include "share/balance/ob_balance_job_table_operator.h" +#include "mittest/simple_server/env/ob_simple_server_helper.h" +#include "storage/tx_storage/ob_ls_service.h" +#include "storage/tx/ob_tx_loop_worker.h" +#include "storage/tx/ob_trans_part_ctx.h" + +namespace oceanbase +{ + +namespace storage +{ +int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 0; + +int ObTransferHandler::wait_src_ls_advance_weak_read_ts_( + const share::ObTransferTaskInfo &task_info, + ObTimeoutCtx &timeout_ctx) +{ + UNUSED(task_info); + UNUSED(timeout_ctx); + return OB_SUCCESS; +} +} + +namespace unittest +{ + +#define EXE_SQL(sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define EXE_SQL_FMT(...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, sql_proxy.write(sql.ptr(), affected_rows)); + +#define WRITE_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define WRITE_SQL_FMT_BY_CONN(conn, ...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define READ_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), read_res)); + + +class ObTransferWithSmallerStartSCN : public ObSimpleClusterTestBase +{ +public: + ObTransferWithSmallerStartSCN(): ObSimpleClusterTestBase("test_transfer_smaller_start_scn", "200G", "40G") {} + + void prepare_tenant_env() + { + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + int64_t affected_rows = 0; + ObSqlString sql; + sqlclient::ObISQLConnection *connection = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(connection)); + ASSERT_NE(nullptr, connection); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_trx_idle_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000"); + WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;"); + WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;"); + WRITE_SQL_BY_CONN(connection, "alter system set partition_balance_schedule_interval = '10s';"); + sleep(5); + } + + void create_test_tenant(uint64_t &tenant_id) + { + TRANS_LOG(INFO, "create_tenant start"); + ASSERT_EQ(OB_SUCCESS, create_tenant("tt1", "20G", "100G")); + fprintf(stdout, "finish sleep\n"); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(tenant_id)); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + TRANS_LOG(INFO, "create_tenant end", K(tenant_id)); + } + + int wait_balance_clean(uint64_t tenant_id) + { + int ret = OB_SUCCESS; + while (OB_SUCC(ret)) { + bool is_clean = false; + MTL_SWITCH(tenant_id) { + ObBalanceJob job; + int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP; + if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(tenant_id, + false, + *GCTX.sql_proxy_, + job, + start_time, + finish_time))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + is_clean = true; + } + } else { + ob_usleep(200 * 1000); + } + } + if (is_clean) { + int64_t transfer_task_count = 0; + if (OB_FAIL(SSH::g_select_int64(tenant_id, "select count(*) as val from __all_transfer_task", transfer_task_count))) { + } else if (transfer_task_count == 0) { + break; + } else { + ob_usleep(200 * 1000); + } + } + } + return ret; + } + + void get_tablet_info_with_table_name(const char *name, + int64_t &table_id, + int64_t &object_id, + int64_t &tablet_id, + int64_t &ls_id) + { + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affected_rows = 0; + + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("SELECT table_id, object_id, tablet_id, ls_id FROM oceanbase.DBA_OB_TABLE_LOCATIONS WHERE TABLE_NAME= '%s';", name)); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ASSERT_EQ(OB_SUCCESS, sql_proxy.read(res, sql.ptr())); + sqlclient::ObMySQLResult *result = res.get_result(); + ASSERT_NE(nullptr, result); + ASSERT_EQ(OB_SUCCESS, result->next()); + ASSERT_EQ(OB_SUCCESS, result->get_int("table_id", table_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("object_id", object_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("tablet_id", tablet_id)); + ASSERT_EQ(OB_SUCCESS, result->get_int("ls_id", ls_id)); + } + } + + int do_balance_inner_(uint64_t tenant_id) + { + int ret = OB_SUCCESS; + static std::mutex mutex; + mutex.lock(); + MTL_SWITCH(tenant_id) { + TRANS_LOG(INFO, "worker to do partition_balance"); + auto b_svr = MTL(rootserver::ObTenantBalanceService*); + b_svr->reset(); + int64_t job_cnt = 0; + int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP; + ObBalanceJob job; + if (OB_FAIL(b_svr->gather_stat_())) { + TRANS_LOG(WARN, "failed to gather stat", KR(ret)); + } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( + tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) { + if (OB_ENTRY_NOT_EXIST == ret) { + //NO JOB, need check current ls status + ret = OB_SUCCESS; + job_cnt = 0; + } else { + TRANS_LOG(WARN, "failed to get balance job", KR(ret), K(tenant_id)); + } + } else if (OB_FAIL(b_svr->try_finish_current_job_(job, job_cnt))) { + TRANS_LOG(WARN, "failed to finish current job", KR(ret), K(job)); + } + if (OB_SUCC(ret) && job_cnt == 0 && OB_FAIL(b_svr->partition_balance_(true))) { + TRANS_LOG(WARN, "failed to do partition balance", KR(ret)); + } + } + mutex.unlock(); + return ret; + } + + int do_balance(uint64_t tenant_id) + { + int ret = OB_SUCCESS; + if (OB_FAIL(do_balance_inner_(tenant_id))) { + } else if (OB_FAIL(do_balance_inner_(tenant_id))) { + } + return ret; + } + + ObLS *get_ls(const int64_t tenant_id, const ObLSID ls_id) + { + int ret = OB_SUCCESS; + ObLS *ls = nullptr; + MTL_SWITCH(tenant_id) + { + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService *); + OB_ASSERT(OB_SUCCESS == ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + OB_ASSERT(nullptr != (ls = ls_handle.get_ls())); + } + return ls; + } +}; + +TEST_F(ObTransferWithSmallerStartSCN, smaller_start_scn) +{ + ObSqlString sql; + int64_t affected_rows = 0; + + // ============================== Phase1. create tenant ============================== + TRANS_LOG(INFO, "create tenant start"); + uint64_t tenant_id = 0; + create_test_tenant(tenant_id); + TRANS_LOG(INFO, "create tenant end"); + + share::ObTenantSwitchGuard tenant_guard; + ASSERT_EQ(OB_SUCCESS, tenant_guard.switch_to(tenant_id)); + + prepare_tenant_env(); + + // ============================== Phase2. create new ls ============================== + ASSERT_EQ(0, SSH::create_ls(tenant_id, get_curr_observer().self_addr_)); + int64_t ls_count = 0; + ASSERT_EQ(0, SSH::g_select_int64(tenant_id, "select count(ls_id) as val from __all_ls where ls_id!=1", ls_count)); + ASSERT_EQ(2, ls_count); + + // ============================== Phase3. create new tables ============================== + common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); + TRANS_LOG(INFO, "create table qcc2 start"); + EXE_SQL("create table qcc1 (a int)"); + TRANS_LOG(INFO, "create_table qcc2 end"); + + TRANS_LOG(INFO, "create table qcc2 start"); + EXE_SQL("create table qcc2 (a int)"); + TRANS_LOG(INFO, "create_table qcc2 end"); + usleep(3 * 1000 * 1000); + + ObLSID loc1, loc2; + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1)); + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2)); + ASSERT_NE(loc1, loc2); + int64_t table1, table2; + int64_t object1, object2; + int64_t tablet1, tablet2; + int64_t ls1, ls2; + get_tablet_info_with_table_name("qcc1", table1, object1, tablet1, ls1); + get_tablet_info_with_table_name("qcc2", table2, object2, tablet2, ls2); + fprintf(stdout, "qcc is created successfully, loc1: %ld, loc2: %ld, table1: %ld, table2: %ld, tablet1: %ld, tablet2: %ld, ls1: %ld, ls2: %ld\n", + loc1.id(), loc2.id(), table1, table2, tablet1, tablet2, ls1, ls2); + + EXE_SQL("create tablegroup tg1 sharding='NONE';"); + + // ============================== Phase4. wait minor freeze to remove retain ctx ============================== + sqlclient::ObISQLConnection *sys_conn = nullptr; + common::ObMySQLProxy &sys_proxy = get_curr_simple_server().get_sql_proxy(); + ASSERT_EQ(OB_SUCCESS, sys_proxy.acquire(sys_conn)); + ASSERT_NE(nullptr, sys_conn); + + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant sys;"); + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_user;"); + WRITE_SQL_BY_CONN(sys_conn, "alter system minor freeze tenant all_meta;"); + sleep(5); + + int ret = OB_SUCCESS; + HEAP_VAR(ObMySQLProxy::MySQLResult, res_0) + { + common::sqlclient::ObMySQLResult *result = nullptr; + sql.assign( + "SELECT count(*) as cnt FROM oceanbase.__all_virtual_trans_stat where tenant_id = 1002 and ls_id = 1001;"); + int retry_times = 100; + int64_t cnt = 0; + + while (--retry_times >= 0) { + res_0.reuse(); + ASSERT_EQ(OB_SUCCESS, sys_conn->execute_read(OB_SYS_TENANT_ID, sql.ptr(), res_0)); + result = res_0.mysql_result(); + ASSERT_EQ(OB_SUCCESS, result->next()); + + ASSERT_EQ(OB_SUCCESS, result->get_int("cnt", cnt)); + if (0 == cnt) { + break; + } else { + fprintf(stdout, "waitting for tx ctx table mini merge to clear retain ctx ... \n"); + sleep(1); + } + } + ASSERT_EQ(0, cnt); + } + + // ============================== Phase5. start the user txn ============================== + sqlclient::ObISQLConnection *user_connection = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection)); + ASSERT_NE(nullptr, user_connection); + + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_trx_idle_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection, "set SESSION ob_query_timeout = 10000000000"); + + TRANS_LOG(INFO, "start the txn"); + WRITE_SQL_BY_CONN(user_connection, "begin;"); + WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc2 values(1);"); + // Step1: let the first ls logging + ASSERT_EQ(0, SSH::submit_redo(tenant_id, loc2)); + // Step2: sleep 5 seconds + usleep(5 * 1000 * 1000); + WRITE_SQL_FMT_BY_CONN(user_connection, "insert into qcc1 values(1);"); + // Step3: let the second ls logging + ASSERT_EQ(0, SSH::submit_redo(tenant_id, loc1)); + + ObTxLoopWorker *worker = MTL(ObTxLoopWorker *); + worker->scan_all_ls_(true, true); + usleep(1 * 1000 * 1000); + + // Step4: let the tx data table update upper info + ObLS *ls = get_ls(tenant_id, loc1); + storage::ObTxDataTable *tx_data_table = ls->get_tx_table()->get_tx_data_table(); + fprintf(stdout, "start update upper info the first time\n"); + TRANS_LOG(INFO, "start update upper info the first time"); + tx_data_table->update_calc_upper_info_(SCN::max_scn()); + uint64_t first_min_start_scn = tx_data_table->calc_upper_info_.min_start_scn_in_ctx_.val_; + fprintf(stdout, "end update upper info the first time, %lu\n", first_min_start_scn); + TRANS_LOG(INFO, "end update upper info the first time"); + + // ============================== Phase5.2. start the user txn2 ============================== + sqlclient::ObISQLConnection *user_connection2 = nullptr; + ASSERT_EQ(OB_SUCCESS, sql_proxy.acquire(user_connection2)); + ASSERT_NE(nullptr, user_connection2); + + WRITE_SQL_BY_CONN(user_connection2, "set SESSION ob_trx_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection2, "set SESSION ob_trx_idle_timeout = 10000000000"); + WRITE_SQL_BY_CONN(user_connection2, "set SESSION ob_query_timeout = 10000000000"); + + TRANS_LOG(INFO, "start the txn2"); + WRITE_SQL_BY_CONN(user_connection2, "begin;"); + WRITE_SQL_FMT_BY_CONN(user_connection2, "insert into qcc2 values(2);"); + // Step1: let the first ls logging + ASSERT_EQ(0, SSH::submit_redo(tenant_id, loc2)); + // Step2: sleep 5 seconds + usleep(5 * 1000 * 1000); + WRITE_SQL_FMT_BY_CONN(user_connection2, "insert into qcc1 values(2);"); + // Step3: let the second ls logging + ASSERT_EQ(0, SSH::submit_redo(tenant_id, loc1)); + + ObTransID tx_id; + ASSERT_EQ(0, SSH::find_tx(user_connection2, tx_id)); + + InjectTxFaultHelper inject_tx_fault_helper; + ASSERT_EQ(0, inject_tx_fault_helper.inject_tx_block(tenant_id, loc2, tx_id, ObTxLogType::TX_COMMIT_LOG)); + + std::thread th([user_connection2] () { + user_connection2->commit(); + }); + + usleep(1 * 1000 * 1000); + + // Step4: let the tx data table update upper info + share::SCN min_start_scn_in_tx_data; + min_start_scn_in_tx_data.set_max(); + bool unused; + fprintf(stdout, "start get min start in tx data table first time\n"); + TRANS_LOG(INFO, "start get min start in tx data table first time"); + tx_data_table->check_min_start_in_tx_data_(SCN::invalid_scn(), min_start_scn_in_tx_data, unused); + uint64_t first_min_start_scn_in_tx_data = min_start_scn_in_tx_data.val_; + fprintf(stdout, "end get min start in tx data table first time, %lu, %lu\n", min_start_scn_in_tx_data.val_, tx_id.get_id()); + TRANS_LOG(INFO, "end get min start in tx data table first time"); + + // ============================== Phase5. start the transfer ============================== + EXE_SQL("alter tablegroup tg1 add qcc1,qcc2;"); + usleep(1 * 1000 * 1000); + ASSERT_EQ(0, do_balance(tenant_id)); + int64_t begin_time = ObTimeUtility::current_time(); + while (true) { + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc1", loc1)); + ASSERT_EQ(0, SSH::select_table_loc(tenant_id, "qcc2", loc2)); + if (loc1 == loc2) { + fprintf(stdout, "succeed wait for balancer\n"); + break; + } else if (ObTimeUtility::current_time() - begin_time > 300 * 1000 * 1000) { + fprintf(stdout, "ERROR: fail to wait for balancer\n"); + break; + } else { + usleep(1 * 1000 * 1000); + fprintf(stdout, "wait for balancer\n"); + } + } + ASSERT_EQ(loc1, loc2); + + worker->scan_all_ls_(true, true); + usleep(1 * 1000 * 1000); + + fprintf(stdout, "start update upper info the second time\n"); + TRANS_LOG(INFO, "start update upper info the second time"); + tx_data_table->update_calc_upper_info_(SCN::max_scn()); + uint64_t second_min_start_scn = tx_data_table->calc_upper_info_.min_start_scn_in_ctx_.val_; + fprintf(stdout, "end update upper info the second time %lu\n", second_min_start_scn); + TRANS_LOG(INFO, "end update upper info the second time"); + + ASSERT_EQ(true, first_min_start_scn > second_min_start_scn); + + min_start_scn_in_tx_data.set_max(); + fprintf(stdout, "start get min start in tx data table second time\n"); + TRANS_LOG(INFO, "start get min start in tx data table second time"); + tx_data_table->check_min_start_in_tx_data_(SCN::invalid_scn(), min_start_scn_in_tx_data, unused); + uint64_t second_min_start_scn_in_tx_data = min_start_scn_in_tx_data.val_; + fprintf(stdout, "end get min start in tx data table second time, %lu\n", min_start_scn_in_tx_data.val_); + TRANS_LOG(INFO, "end get min start in tx data table second time"); + + ASSERT_EQ(true, first_min_start_scn > second_min_start_scn); + ASSERT_EQ(true, first_min_start_scn_in_tx_data > second_min_start_scn_in_tx_data); + + inject_tx_fault_helper.release(); + th.join(); +} + +} // namespace unittest +} // namespace oceanbase + +int main(int argc, char **argv) +{ + using namespace oceanbase::unittest; + + oceanbase::unittest::init_log_and_gtest(argc, argv); + OB_LOGGER.set_log_level("info"); + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} diff --git a/src/storage/ls/ob_ls_transfer_status.cpp b/src/storage/ls/ob_ls_transfer_status.cpp index 8264db7e42..8de06ef55a 100644 --- a/src/storage/ls/ob_ls_transfer_status.cpp +++ b/src/storage/ls/ob_ls_transfer_status.cpp @@ -46,7 +46,8 @@ void ObLSTransferStatus::reset() move_tx_scn_.reset(); } -void ObLSTransferStatus::reset_prepare_op() { +void ObLSTransferStatus::reset_prepare_op() +{ transfer_prepare_op_ = false; transfer_prepare_scn_.reset(); if (is_finished()) { @@ -54,7 +55,9 @@ void ObLSTransferStatus::reset_prepare_op() { transfer_task_id_ = 0; } } -void ObLSTransferStatus::reset_move_tx_op() { + +void ObLSTransferStatus::reset_move_tx_op() +{ move_tx_op_ = false; move_tx_scn_.reset(); if (is_finished()) { @@ -147,6 +150,7 @@ int ObLSTransferStatus::update_status_inner_(const transaction::ObTransID tx_id, if (!transfer_tx_id_.is_valid() || transfer_tx_id_ == tx_id) { if (NotifyType::ON_COMMIT == op_type || NotifyType::ON_ABORT == op_type) { if (ObTxDataSourceType::TRANSFER_DEST_PREPARE == mds_type) { + enable_upper_trans_calculation_(op_scn); reset_prepare_op(); } else if (ObTxDataSourceType::TRANSFER_MOVE_TX_CTX == mds_type) { reset_move_tx_op(); @@ -157,6 +161,7 @@ int ObLSTransferStatus::update_status_inner_(const transaction::ObTransID tx_id, if (ObTxDataSourceType::TRANSFER_DEST_PREPARE == mds_type) { transfer_prepare_op_ = true; transfer_prepare_scn_ = op_scn; + disable_upper_trans_calculation_(); } else if (ObTxDataSourceType::TRANSFER_MOVE_TX_CTX == mds_type) { move_tx_op_ = true; move_tx_scn_ = op_scn; @@ -184,12 +189,14 @@ int ObLSTransferStatus::replay_status_inner_(const transaction::ObTransID tx_id, if (ObTxDataSourceType::TRANSFER_DEST_PREPARE == mds_type) { if (!transfer_prepare_scn_.is_valid() || transfer_prepare_scn_ < op_scn) { if (NotifyType::ON_COMMIT == op_type || NotifyType::ON_ABORT == op_type) { + enable_upper_trans_calculation_(op_scn); reset_prepare_op(); } else { transfer_tx_id_ = tx_id; transfer_task_id_ = task_id; transfer_prepare_op_ = true; transfer_prepare_scn_ = op_scn; + disable_upper_trans_calculation_(); } } } else if (ObTxDataSourceType::TRANSFER_MOVE_TX_CTX == mds_type) { @@ -223,6 +230,50 @@ int ObLSTransferStatus::get_transfer_prepare_status( return ret; } +int ObLSTransferStatus::enable_upper_trans_calculation_(const share::SCN op_scn) +{ + int ret = OB_SUCCESS; + ObTxTableGuard guard; + ObTxDataTable *tx_data_table = nullptr; + if (OB_FAIL(ls_->get_tx_table_guard(guard))) { + TRANS_LOG(WARN, "failed to get tx table", K(ret)); + } else if (OB_UNLIKELY(!guard.is_valid())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "tx table guard is invalid", K(ret), K(guard)); + } else if (OB_ISNULL(tx_data_table = + guard.get_tx_table()->get_tx_data_table())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "tx data table in tx table is nullptr.", K(ret)); + } else { + tx_data_table->enable_upper_trans_calculation(op_scn); + TRANS_LOG(INFO, "enable upper trans calculation", KPC(ls_), K(guard), KPC(this)); + } + + return ret; +} + +int ObLSTransferStatus::disable_upper_trans_calculation_() +{ + int ret = OB_SUCCESS; + ObTxTableGuard guard; + ObTxDataTable *tx_data_table = nullptr; + + if (OB_FAIL(ls_->get_tx_table_guard(guard))) { + TRANS_LOG(WARN, "failed to get tx table", K(ret)); + } else if (OB_UNLIKELY(!guard.is_valid())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "tx table guard is invalid", K(ret), K(guard)); + } else if (OB_ISNULL(tx_data_table = + guard.get_tx_table()->get_tx_data_table())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "tx data table in tx table is nullptr.", K(ret)); + } else { + tx_data_table->disable_upper_trans_calculation(); + TRANS_LOG(INFO, "disable upper trans calculation", KPC(ls_), K(guard), KPC(this)); + } + + return ret; +} } } diff --git a/src/storage/ls/ob_ls_transfer_status.h b/src/storage/ls/ob_ls_transfer_status.h index cfdb7f9a0c..772de78726 100644 --- a/src/storage/ls/ob_ls_transfer_status.h +++ b/src/storage/ls/ob_ls_transfer_status.h @@ -55,6 +55,8 @@ private: const share::SCN op_scn, const transaction::NotifyType op_type, const transaction::ObTxDataSourceType mds_type); + int enable_upper_trans_calculation_(const share::SCN op_scn); + int disable_upper_trans_calculation_(); private: bool is_inited_; ObLS *ls_; diff --git a/src/storage/tx/ob_keep_alive_ls_handler.h b/src/storage/tx/ob_keep_alive_ls_handler.h index 197b2da9c6..e9414b38f0 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.h +++ b/src/storage/tx/ob_keep_alive_ls_handler.h @@ -46,11 +46,15 @@ public: public: ObKeepAliveLogBody() - : compat_bit_(1), min_start_scn_(), - min_start_status_(MinStartScnStatus::UNKOWN) + : compat_bit_(1), min_start_scn_(), + min_start_status_(MinStartScnStatus::UNKOWN) {} - ObKeepAliveLogBody(int64_t compat_bit, const share::SCN &min_start_scn, MinStartScnStatus min_status) - : compat_bit_(compat_bit), min_start_scn_(min_start_scn), min_start_status_(min_status) + ObKeepAliveLogBody(int64_t compat_bit, + const share::SCN &min_start_scn, + MinStartScnStatus min_status) + : compat_bit_(compat_bit), + min_start_scn_(min_start_scn), + min_start_status_(min_status) {} static int64_t get_max_serialize_size(); @@ -168,8 +172,11 @@ public: share::SCN get_rec_scn() { return share::SCN::max_scn(); } int flush(share::SCN &rec_scn) { return OB_SUCCESS;} - void get_min_start_scn(share::SCN &min_start_scn, share::SCN &keep_alive_scn, MinStartScnStatus &status); + void get_min_start_scn(share::SCN &min_start_scn, + share::SCN &keep_alive_scn, + MinStartScnStatus &status); void set_sys_ls_end_scn(const share::SCN &sys_ls_end_scn) { sys_ls_end_scn_.inc_update(sys_ls_end_scn);} + private: bool check_gts_(); int serialize_keep_alive_log_(const share::SCN &min_start_scn, MinStartScnStatus status); diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 86cb539b6b..91ec005895 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1481,7 +1481,6 @@ int ObPartTransCtx::check_rs_scheduler_is_alive_(bool &is_alive) int64_t trace_time = 0; int64_t cur_time = ObTimeUtility::current_time(); share::ObAliveServerTracer *server_tracer = NULL; - is_alive = true; if (OB_ISNULL(trans_service_)) { ret = OB_ERR_UNEXPECTED; @@ -9198,6 +9197,9 @@ int ObPartTransCtx::collect_tx_ctx(const ObLSID dest_ls_id, return ret; } +// NB: This function can report a retryable error because the outer while loop +// will ignore the error and continuously retry until it succeeds within the +// callback function. int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, const ObTxCtxMoveArg &arg, const bool is_new_created) @@ -9231,7 +9233,10 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, ret = OB_OP_NOT_ALLOW; TRANS_LOG(WARN, "tx ctx has end", KR(ret), KPC(this)); } - } else if (epoch_ != arg.epoch_ && exec_info_.next_log_entry_no_ == 0 && get_redo_log_no_() == 0 && busy_cbs_.is_empty()) { + } else if (epoch_ != arg.epoch_ // ctx created by itself + && exec_info_.next_log_entry_no_ == 0 // no log submitted + && get_redo_log_no_() == 0 // no log submitted + && busy_cbs_.is_empty()) { // no log submitting // promise tx log before move log if (exec_info_.state_ == ObTxState::INIT) { // promise redo log before move log @@ -9282,9 +9287,22 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, if (arg.last_seq_no_ > last_scn_) { last_scn_.atomic_store(arg.last_seq_no_); } - if (!ctx_tx_data_.get_start_log_ts().is_valid() || arg.tx_start_scn_ < ctx_tx_data_.get_start_log_ts()) { - // TODO fix start_scn back + + // start scn in dest ctx is not valid while start scn in previous dest + // ctx(has been released) or src ctx is valid, so we need change it + if ((!ctx_tx_data_.get_start_log_ts().is_valid() && + (arg.tx_start_scn_.is_valid())) + || + // start scn in dest ctx is valid and start scn in src ctx is smaller + // than it,, so we need change it + (ctx_tx_data_.get_start_log_ts().is_valid() && + arg.tx_start_scn_.is_valid() && + arg.tx_start_scn_ < ctx_tx_data_.get_start_log_ts())) { if (!ctx_tx_data_.is_read_only()) { + // for merging txn where the start_scn is smaller or refers to a + // previously existing txn, we need to replace it with the smallest + // start_scn to ensure the proper recycling mechanism is in place. + // Otherwise the upper trans version will be calculated incorrectly ctx_tx_data_.set_start_log_ts(arg.tx_start_scn_); } } @@ -9326,7 +9344,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, // log sequence move_tx --> transfer_in --> commit // so when recycle tx_data on dest_ls, we can see transfer in tablet, not to recycle tx_data which end_scn > transfer_scn if (OB_SUCC(ret) && exec_info_.state_ >= ObTxState::COMMIT) { - if (OB_FAIL(update_tx_data_end_scn_(move_tx_param.op_scn_, move_tx_param.transfer_scn_))) { + if (OB_FAIL(update_tx_data_start_and_end_scn_(arg.tx_start_scn_, + move_tx_param.op_scn_, + move_tx_param.transfer_scn_))) { TRANS_LOG(WARN, "update tx data failed", KR(ret), KPC(this)); } } @@ -9422,7 +9442,9 @@ bool ObPartTransCtx::is_exec_complete_without_lock(ObLSID ls_id, return is_complete; } -int ObPartTransCtx::update_tx_data_end_scn_(const SCN end_scn, const SCN transfer_scn) +int ObPartTransCtx::update_tx_data_start_and_end_scn_(const SCN start_scn, + const SCN end_scn, + const SCN transfer_scn) { int ret = OB_SUCCESS; ObTxTable *tx_table = NULL; @@ -9438,6 +9460,14 @@ int ObPartTransCtx::update_tx_data_end_scn_(const SCN end_scn, const SCN transfe TRANS_LOG(WARN, "copy tx data failed", KR(ret), KPC(this)); } else { ObTxData *tx_data = tmp_tx_data_guard.tx_data(); + if (start_scn.is_valid()) { + share::SCN current_start_scn = get_start_log_ts(); + if (current_start_scn.is_valid()) { + tx_data->start_scn_.atomic_store(MIN(start_scn, current_start_scn)); + } else { + tx_data->start_scn_.atomic_store(start_scn); + } + } tx_data->end_scn_.atomic_store(end_scn); if (OB_FAIL(tx_table->insert(tx_data))) { TRANS_LOG(WARN, "insert tx data failed", KR(ret), KPC(this)); diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 4ca39fd3cd..797070c87c 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -776,7 +776,9 @@ public: bool is_exec_complete_without_lock(ObLSID ls_id, int64_t epoch, int64_t transfer_epoch); private: int transfer_op_log_cb_(share::SCN op_scn, NotifyType op_type); - int update_tx_data_end_scn_(const share::SCN end_scn, const share::SCN transfer_scn); + int update_tx_data_start_and_end_scn_(const share::SCN start_scn, + const share::SCN end_scn, + const share::SCN transfer_scn); protected: virtual int post_msg_(const share::ObLSID&receiver, ObTxMsg &msg); diff --git a/src/storage/tx/ob_tx_loop_worker.cpp b/src/storage/tx/ob_tx_loop_worker.cpp index 64a941656a..c439922170 100644 --- a/src/storage/tx/ob_tx_loop_worker.cpp +++ b/src/storage/tx/ob_tx_loop_worker.cpp @@ -187,6 +187,21 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx) status = MinStartScnStatus::UNKOWN; } + // During the transfer, we should not update min_start_scn, otherwise we + // will ignore the ctx that has been transferred in. So we check whether + // transfer is going on there. + // + // TODO(handora.qc): while after we have checked the transfer and later + // submitted the log, the transfer may also happens during these two + // operations. So we need double check it in the log application/replay. + if(MinStartScnStatus::UNKOWN == status) { + // do nothing + } else if (cur_ls_ptr->get_transfer_status().get_transfer_prepare_enable()) { + TRANS_LOG(INFO, "ignore min start scn during transfer prepare enabled", + K(cur_ls_ptr->get_transfer_status()), K(status), K(min_start_scn)); + status = MinStartScnStatus::UNKOWN; + } + if (MinStartScnStatus::UNKOWN == status) { min_start_scn.reset(); } else if (MinStartScnStatus::NO_CTX == status) { diff --git a/src/storage/tx_table/ob_tx_data_table.cpp b/src/storage/tx_table/ob_tx_data_table.cpp index 273f4e649e..0ec5f39081 100644 --- a/src/storage/tx_table/ob_tx_data_table.cpp +++ b/src/storage/tx_table/ob_tx_data_table.cpp @@ -39,7 +39,7 @@ using namespace oceanbase::share; namespace storage { -int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 30 * 1000 * 1000; // 30 seconds +int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 15 * 1000 * 1000; // 15 seconds int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table) { @@ -71,6 +71,8 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table) memtable_mgr_ = static_cast(memtable_mgr_handle.get_memtable_mgr()); tx_ctx_table_ = tx_ctx_table; tablet_id_ = LS_TX_DATA_TABLET; + calc_upper_trans_is_disabled_ = false; + latest_transfer_scn_.reset(); is_inited_ = true; FLOG_INFO("tx data table init success", K(sizeof(ObTxData)), K(sizeof(ObTxDataLinkNode)), KPC(this)); @@ -182,6 +184,8 @@ void ObTxDataTable::reset() calc_upper_info_.reset(); calc_upper_trans_version_cache_.reset(); memtables_cache_.reuse(); + calc_upper_trans_is_disabled_ = false; + latest_transfer_scn_.reset(); is_started_ = false; is_inited_ = false; } @@ -205,7 +209,7 @@ int ObTxDataTable::offline() STORAGE_LOG(WARN, "clean memtables cache failed", KR(ret), KPC(this)); } else { is_started_ = false; - calc_upper_info_.reset(); + disable_upper_trans_calculation(); calc_upper_trans_version_cache_.reset(); } return ret; @@ -230,6 +234,8 @@ int ObTxDataTable::online() } else { // load tx data table succeed is_started_ = true; + calc_upper_trans_is_disabled_ = false; + latest_transfer_scn_.reset(); } return ret; @@ -609,6 +615,7 @@ int ObTxDataTable::check_need_update_memtables_cache_(bool &need_update) // cache already up to date, skip update need_update = false; } + return ret; } @@ -821,6 +828,8 @@ int ObTxDataTable::get_upper_trans_version_before_given_scn(const SCN sstable_en if (IS_NOT_INIT) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "The tx data table is not inited.", KR(ret)); + } else if (ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) { + skip_calc = true; } else if (true == (skip_calc = skip_this_sstable_end_scn_(sstable_end_scn))) { // there is a start_scn of running transactions is smaller than the sstable_end_scn } else { @@ -1024,11 +1033,14 @@ int ObTxDataTable::check_min_start_in_ctx_(const SCN &sstable_end_scn, { SpinRLockGuard lock_guard(calc_upper_info_.lock_); if (calc_upper_info_.min_start_scn_in_ctx_ <= sstable_end_scn || + (latest_transfer_scn_.is_valid() && + calc_upper_info_.keep_alive_scn_ < latest_transfer_scn_) || calc_upper_info_.keep_alive_scn_ >= max_decided_scn) { need_skip = true; } - if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_scn > calc_upper_info_.keep_alive_scn_) { + if (cur_ts - calc_upper_info_.update_ts_ > ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL && + max_decided_scn > calc_upper_info_.keep_alive_scn_) { need_update_info = true; } } @@ -1298,6 +1310,27 @@ int ObTxDataTable::get_start_tx_scn(SCN &start_tx_scn) return ret; } +void ObTxDataTable::disable_upper_trans_calculation() +{ + ATOMIC_STORE(&calc_upper_trans_is_disabled_, true); + calc_upper_trans_version_cache_.reset(); + SpinWLockGuard lock_guard(calc_upper_info_.lock_); + calc_upper_info_.reset(); +} + +void ObTxDataTable::enable_upper_trans_calculation(const share::SCN latest_transfer_scn) +{ + calc_upper_trans_version_cache_.reset(); + if (latest_transfer_scn_.is_valid()) { + latest_transfer_scn_ = SCN::max(latest_transfer_scn, latest_transfer_scn_); + } else { + latest_transfer_scn_ = latest_transfer_scn; + } + SpinWLockGuard lock_guard(calc_upper_info_.lock_); + calc_upper_info_.reset(); + ATOMIC_STORE(&calc_upper_trans_is_disabled_, false); +} + int ObTxDataTable::dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd) { int ret = OB_SUCCESS; diff --git a/src/storage/tx_table/ob_tx_data_table.h b/src/storage/tx_table/ob_tx_data_table.h index 54cdf23d75..e5c75911c7 100644 --- a/src/storage/tx_table/ob_tx_data_table.h +++ b/src/storage/tx_table/ob_tx_data_table.h @@ -122,6 +122,8 @@ public: // ObTxDataTable ObTxDataTable() : is_inited_(false), is_started_(false), + calc_upper_trans_is_disabled_(false), + latest_transfer_scn_(), ls_id_(), tablet_id_(0), arena_allocator_(), @@ -244,6 +246,8 @@ public: // getter and setter TxDataReadSchema &get_read_schema() { return read_schema_; }; share::ObLSID get_ls_id(); + void disable_upper_trans_calculation(); + void enable_upper_trans_calculation(const share::SCN latest_transfer_scn); private: virtual ObTxDataMemtableMgr *get_memtable_mgr_() { return memtable_mgr_; } @@ -326,6 +330,8 @@ private: static const int64_t LS_TX_DATA_SCHEMA_COLUMN_CNT = 5; bool is_inited_; bool is_started_; + bool calc_upper_trans_is_disabled_; + share::SCN latest_transfer_scn_; share::ObLSID ls_id_; ObTabletID tablet_id_; // Allocator to allocate ObTxData and ObUndoStatus