From 478d5d29de9593bddff604eec8b625b712657a8d Mon Sep 17 00:00:00 2001 From: ZenoWang Date: Tue, 27 Aug 2024 16:23:19 +0000 Subject: [PATCH] fix test_transfer_tx_data --- .../test_transfer_with_smaller_tx_data.cpp | 11 +++++--- src/storage/ls/ob_ls.h | 1 + src/storage/ls/ob_ls_transfer_status.cpp | 3 ++- src/storage/tx/ob_keep_alive_ls_handler.cpp | 11 ++++++++ src/storage/tx/ob_keep_alive_ls_handler.h | 1 + src/storage/tx_table/ob_tx_data_table.cpp | 7 ------ src/storage/tx_table/ob_tx_data_table.h | 2 -- src/storage/tx_table/ob_tx_table.cpp | 25 ++++++++++++++++++- src/storage/tx_table/ob_tx_table.h | 5 ++++ 9 files changed, 51 insertions(+), 15 deletions(-) diff --git a/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp b/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp index b5b079ca1..707e4d3de 100644 --- a/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp +++ b/mittest/simple_server/test_transfer_with_smaller_tx_data.cpp @@ -30,6 +30,7 @@ namespace storage { int64_t ObTxTable::UPDATE_MIN_START_SCN_INTERVAL = 0; + int ObTransferHandler::wait_src_ls_advance_weak_read_ts_( const share::ObTransferTaskInfo &task_info, ObTimeoutCtx &timeout_ctx) @@ -82,7 +83,6 @@ public: WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000"); WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;"); WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;"); - WRITE_SQL_BY_CONN(connection, "alter system set partition_balance_schedule_interval = '10s';"); sleep(5); } @@ -164,12 +164,15 @@ public: MTL_SWITCH(tenant_id) { TRANS_LOG(INFO, "worker to do partition_balance"); auto b_svr = MTL(rootserver::ObTenantBalanceService*); + b_svr->stop(); b_svr->reset(); int64_t job_cnt = 0; int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP; ObBalanceJob job; if (OB_FAIL(b_svr->gather_stat_())) { TRANS_LOG(WARN, "failed to gather stat", KR(ret)); + } else if (OB_FAIL(b_svr->gather_ls_status_stat(tenant_id, b_svr->ls_array_))) { + TRANS_LOG(WARN, "failed to gather ls stat", KR(ret)); } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) { if (OB_ENTRY_NOT_EXIST == ret) { @@ -408,7 +411,7 @@ TEST_F(ObTransferWithSmallerStartSCN, smaller_start_scn) fprintf(stdout, "end update upper info the second time %lu\n", second_min_start_scn); TRANS_LOG(INFO, "end update upper info the second time"); - ASSERT_EQ(true, first_min_start_scn > second_min_start_scn); + ASSERT_GT(first_min_start_scn, second_min_start_scn); min_start_scn_in_tx_data.set_max(); fprintf(stdout, "start get min start in tx data table second time\n"); @@ -418,8 +421,8 @@ TEST_F(ObTransferWithSmallerStartSCN, smaller_start_scn) fprintf(stdout, "end get min start in tx data table second time, %lu\n", min_start_scn_in_tx_data.val_); TRANS_LOG(INFO, "end get min start in tx data table second time"); - ASSERT_EQ(true, first_min_start_scn > second_min_start_scn); - ASSERT_EQ(true, first_min_start_scn_in_tx_data > second_min_start_scn_in_tx_data); + ASSERT_GT(first_min_start_scn, second_min_start_scn); + ASSERT_GT(first_min_start_scn_in_tx_data, second_min_start_scn_in_tx_data); inject_tx_fault_helper.release(); th.join(); diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 3f93d1c64..403093325 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -926,6 +926,7 @@ public: // ObDataCheckpoint interface: DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int); DELEGATE_WITH_RET(keep_alive_ls_handler_, get_min_start_scn, void); + DELEGATE_WITH_RET(keep_alive_ls_handler_, clear_keep_alive_smaller_scn_info, void); // update tablet table store here do not using Macro because need lock ls and tablet // update table store for tablet diff --git a/src/storage/ls/ob_ls_transfer_status.cpp b/src/storage/ls/ob_ls_transfer_status.cpp index 95df82e60..e673bf1ae 100644 --- a/src/storage/ls/ob_ls_transfer_status.cpp +++ b/src/storage/ls/ob_ls_transfer_status.cpp @@ -245,7 +245,8 @@ int ObLSTransferStatus::enable_upper_trans_calculation_(const share::SCN op_scn) ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "tx data table in tx table is nullptr.", K(ret)); } else { - tx_table->enable_upper_trans_calculation(op_scn); + (void)ls_->clear_keep_alive_smaller_scn_info(); + (void)tx_table->enable_upper_trans_calculation(op_scn); TRANS_LOG(INFO, "enable upper trans calculation", KPC(ls_), K(guard), KPC(this)); } diff --git a/src/storage/tx/ob_keep_alive_ls_handler.cpp b/src/storage/tx/ob_keep_alive_ls_handler.cpp index f61c6c31c..b6ea4d199 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.cpp +++ b/src/storage/tx/ob_keep_alive_ls_handler.cpp @@ -14,6 +14,8 @@ #include "logservice/ob_log_handler.h" #include "storage/tx/ob_ts_mgr.h" +#define USING_LOG_PREFIX TRANS + namespace oceanbase { @@ -94,6 +96,14 @@ void ObKeepAliveLSHandler::reset() stat_info_.reset(); } +void ObKeepAliveLSHandler::clear_keep_alive_smaller_scn_info() +{ + SpinWLockGuard guard(lock_); + FLOG_INFO("[Keep Alive] clear keep alive ls info", K(ls_id_), K(tmp_keep_alive_info_), K(durable_keep_alive_info_)); + tmp_keep_alive_info_.reset(); + durable_keep_alive_info_.reset(); +} + int ObKeepAliveLSHandler::try_submit_log(const SCN &min_start_scn, MinStartScnStatus min_start_status) { int ret = OB_SUCCESS; @@ -150,6 +160,7 @@ int ObKeepAliveLSHandler::on_success() durable_keep_alive_info_.replace(tmp_keep_alive_info_); stat_info_.stat_keepalive_info_ = durable_keep_alive_info_; + ATOMIC_STORE(&is_busy_,false); return ret; diff --git a/src/storage/tx/ob_keep_alive_ls_handler.h b/src/storage/tx/ob_keep_alive_ls_handler.h index e9414b38f..bed743249 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.h +++ b/src/storage/tx/ob_keep_alive_ls_handler.h @@ -154,6 +154,7 @@ public: void reset(); int try_submit_log(const share::SCN &min_start_scn, MinStartScnStatus status); + void clear_keep_alive_smaller_scn_info(); void print_stat_info(); public: diff --git a/src/storage/tx_table/ob_tx_data_table.cpp b/src/storage/tx_table/ob_tx_data_table.cpp index 297c08431..eb017eeb5 100644 --- a/src/storage/tx_table/ob_tx_data_table.cpp +++ b/src/storage/tx_table/ob_tx_data_table.cpp @@ -69,7 +69,6 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table) memtable_mgr_ = static_cast(memtable_mgr_handle.get_memtable_mgr()); tx_ctx_table_ = tx_ctx_table; tablet_id_ = LS_TX_DATA_TABLET; - calc_upper_trans_is_disabled_ = false; latest_transfer_scn_.reset(); is_inited_ = true; @@ -181,7 +180,6 @@ void ObTxDataTable::reset() tx_ctx_table_ = nullptr; calc_upper_trans_version_cache_.reset(); memtables_cache_.reuse(); - calc_upper_trans_is_disabled_ = false; latest_transfer_scn_.reset(); is_started_ = false; is_inited_ = false; @@ -236,7 +234,6 @@ int ObTxDataTable::online() calc_upper_trans_version_cache_.reset(); } latest_transfer_scn_.reset(); - ATOMIC_STORE(&calc_upper_trans_is_disabled_, false); is_started_ = true; } @@ -774,8 +771,6 @@ int ObTxDataTable::get_upper_trans_version_before_given_scn(const SCN sstable_en if (IS_NOT_INIT) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "The tx data table is not inited.", KR(ret)); - } else if (ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) { - skip_calc = true; } else if (true == (skip_calc = skip_this_sstable_end_scn_(sstable_end_scn))) { // there is a start_scn of running transactions is smaller than the sstable_end_scn } else { @@ -1234,7 +1229,6 @@ int ObTxDataTable::get_start_tx_scn(SCN &start_tx_scn) void ObTxDataTable::disable_upper_trans_calculation() { - ATOMIC_STORE(&calc_upper_trans_is_disabled_, true); { TCWLockGuard lock_guard(calc_upper_trans_version_cache_.lock_); calc_upper_trans_version_cache_.reset(); @@ -1252,7 +1246,6 @@ void ObTxDataTable::enable_upper_trans_calculation(const share::SCN latest_trans } else { latest_transfer_scn_ = latest_transfer_scn; } - ATOMIC_STORE(&calc_upper_trans_is_disabled_, false); } int ObTxDataTable::dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd) diff --git a/src/storage/tx_table/ob_tx_data_table.h b/src/storage/tx_table/ob_tx_data_table.h index aa49f6e29..c7b097590 100644 --- a/src/storage/tx_table/ob_tx_data_table.h +++ b/src/storage/tx_table/ob_tx_data_table.h @@ -108,7 +108,6 @@ public: // ObTxDataTable ObTxDataTable() : is_inited_(false), is_started_(false), - calc_upper_trans_is_disabled_(false), latest_transfer_scn_(), ls_id_(), tablet_id_(0), @@ -316,7 +315,6 @@ private: static const int64_t LS_TX_DATA_SCHEMA_COLUMN_CNT = 5; bool is_inited_; bool is_started_; - bool calc_upper_trans_is_disabled_; share::SCN latest_transfer_scn_; share::ObLSID ls_id_; ObTabletID tablet_id_; diff --git a/src/storage/tx_table/ob_tx_table.cpp b/src/storage/tx_table/ob_tx_table.cpp index 1eb52de0c..d3b258877 100644 --- a/src/storage/tx_table/ob_tx_table.cpp +++ b/src/storage/tx_table/ob_tx_table.cpp @@ -66,6 +66,7 @@ int ObTxTable::init(ObLS *ls) read_tx_data_table_cnt_ = 0; recycle_scn_cache_.reset(); LOG_INFO("init tx table successfully", K(ret), K(ls->get_ls_id())); + calc_upper_trans_is_disabled_ = false; is_inited_ = true; } if (OB_FAIL(ret)) { @@ -182,6 +183,7 @@ int ObTxTable::online() recycle_scn_cache_.reset(); (void)reset_ctx_min_start_scn_info_(); ATOMIC_STORE(&state_, ObTxTable::ONLINE); + ATOMIC_STORE(&calc_upper_trans_is_disabled_, false); LOG_INFO("tx table online succeed", K(ls_id_), KPC(this)); } @@ -1000,6 +1002,12 @@ int ObTxTable::get_uncommitted_tx_min_start_scn(share::SCN &min_start_scn, share void ObTxTable::update_min_start_scn_info(const SCN &max_decided_scn) { + if (true == ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) { + // quit updating if calculate upper trans versions disabled + STORAGE_LOG(INFO, "skip update min start scn", K(max_decided_scn), KPC(this)); + return; + } + int64_t cur_ts = ObClockGenerator::getClock(); SpinWLockGuard lock_guard(ctx_min_start_scn_info_.lock_); @@ -1037,23 +1045,38 @@ void ObTxTable::update_min_start_scn_info(const SCN &max_decided_scn) } } } + + STORAGE_LOG(INFO, "finish update min start scn", K(max_decided_scn), K(ctx_min_start_scn_info_)); } int ObTxTable::get_upper_trans_version_before_given_scn(const SCN sstable_end_scn, SCN &upper_trans_version) { - return tx_data_table_.get_upper_trans_version_before_given_scn(sstable_end_scn, upper_trans_version); + int ret = OB_SUCCESS; + if (ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) { + // cannot calculate upper trans version right now + if (REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL)) { + STORAGE_LOG(INFO, "calc upper trans version is disabled", K(calc_upper_trans_is_disabled_), K(sstable_end_scn)); + } + } else { + ret = tx_data_table_.get_upper_trans_version_before_given_scn(sstable_end_scn, upper_trans_version); + } + return ret; } void ObTxTable::disable_upper_trans_calculation() { + ATOMIC_STORE(&calc_upper_trans_is_disabled_, true); (void)tx_data_table_.disable_upper_trans_calculation(); reset_ctx_min_start_scn_info_(); + FLOG_INFO("disable upper trans version calculation", KPC(this)); } void ObTxTable::enable_upper_trans_calculation(const share::SCN latest_transfer_scn) { reset_ctx_min_start_scn_info_(); (void)tx_data_table_.enable_upper_trans_calculation(latest_transfer_scn); + ATOMIC_STORE(&calc_upper_trans_is_disabled_, false); + FLOG_INFO("enable upper trans version calculation", KPC(this)); } int ObTxTable::get_start_tx_scn(SCN &start_tx_scn) diff --git a/src/storage/tx_table/ob_tx_table.h b/src/storage/tx_table/ob_tx_table.h index 6376f7b5b..f30756b58 100644 --- a/src/storage/tx_table/ob_tx_table.h +++ b/src/storage/tx_table/ob_tx_table.h @@ -102,6 +102,7 @@ public: public: ObTxTable() : is_inited_(false), + calc_upper_trans_is_disabled_(false), epoch_(INVALID_READ_EPOCH), state_(OFFLINE), ls_id_(), @@ -115,6 +116,7 @@ public: ObTxTable(ObTxDataTable &tx_data_table) : is_inited_(false), + calc_upper_trans_is_disabled_(false), epoch_(INVALID_READ_EPOCH), state_(OFFLINE), ls_id_(), @@ -310,7 +312,9 @@ public: void enable_upper_trans_calculation(const share::SCN latest_transfer_scn); TO_STRING_KV(KP(this), + K_(ls_id), K_(is_inited), + K_(calc_upper_trans_is_disabled), K_(epoch), "state", get_state_string(state_), KP_(ls), @@ -368,6 +372,7 @@ private: static const int64_t LS_TX_CTX_SCHEMA_ROWKEY_CNT = 1; static const int64_t LS_TX_CTX_SCHEMA_COLUMN_CNT = 3; bool is_inited_; + bool calc_upper_trans_is_disabled_; int64_t epoch_ CACHE_ALIGNED; TxTableState state_ CACHE_ALIGNED; share::ObLSID ls_id_;