From d2584bccf27e85eced7b90c5b680f1312eb44d94 Mon Sep 17 00:00:00 2001 From: ZenoWang Date: Mon, 3 Jul 2023 02:12:09 +0000 Subject: [PATCH] [FIX] only update min_start_scn and keep_alive log scn when log is NO_CTX or HAS_CTX --- mittest/simple_server/CMakeLists.txt | 1 + .../test_keep_alive_min_start_scn.cpp | 215 ++++++++++++++++++ .../simple_server/test_replay_from_middle.cpp | 55 +++-- src/storage/tx/ob_keep_alive_ls_handler.cpp | 9 +- src/storage/tx/ob_keep_alive_ls_handler.h | 17 +- src/storage/tx_table/ob_tx_data_table.cpp | 48 ++-- src/storage/tx_table/ob_tx_data_table.h | 10 + 7 files changed, 303 insertions(+), 52 deletions(-) create mode 100644 mittest/simple_server/test_keep_alive_min_start_scn.cpp diff --git a/mittest/simple_server/CMakeLists.txt b/mittest/simple_server/CMakeLists.txt index 9774f2e63..ed819e8b5 100644 --- a/mittest/simple_server/CMakeLists.txt +++ b/mittest/simple_server/CMakeLists.txt @@ -74,6 +74,7 @@ ob_unittest_observer(test_lock_table_with_tx test_lock_table_with_tx.cpp) ob_unittest_observer(test_ob_detect_manager_in_simple_server test_ob_detect_manager_in_simple_server.cpp) ob_unittest_observer(test_transfer_lock_info_operator storage_ha/test_transfer_lock_info_operator.cpp) ob_unittest_observer(test_mds_recover test_mds_recover.cpp) +ob_unittest_observer(test_keep_alive_min_start_scn test_keep_alive_min_start_scn.cpp) #ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp) #ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp) ob_ha_unittest_observer(test_transfer_start_stage_restart_without_mds_flush storage_ha/test_transfer_start_stage_restart_without_mds_flush.cpp) diff --git a/mittest/simple_server/test_keep_alive_min_start_scn.cpp b/mittest/simple_server/test_keep_alive_min_start_scn.cpp new file mode 100644 index 000000000..ab3ef9d28 --- /dev/null +++ b/mittest/simple_server/test_keep_alive_min_start_scn.cpp @@ -0,0 +1,215 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include +#define USING_LOG_PREFIX STORAGE +#define protected public +#define private public + +#include "env/ob_simple_cluster_test_base.h" +#include "env/ob_fast_bootstrap.h" +#include "lib/mysqlclient/ob_mysql_result.h" +#include "storage/tx/ob_tx_loop_worker.h" +#include "storage/tx_storage/ob_ls_map.h" +#include "storage/tx_storage/ob_ls_service.h" + +namespace oceanbase +{ + +namespace storage +{ +int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 0; +} + +namespace unittest +{ + +using namespace oceanbase::transaction; +using namespace oceanbase::storage; + + + +class TestRunCtx +{ +public: + uint64_t tenant_id_ = 0; + int time_sec_ = 0; +}; + +TestRunCtx RunCtx; + +class ObTestKeepAliveMinStartSCN : public ObSimpleClusterTestBase +{ +public: + ObTestKeepAliveMinStartSCN() : ObSimpleClusterTestBase("test_keep_alive_min_start_scn_") {} + + void test_min_start_scn(); + void loop_check_start_scn(SCN &prev_min_start_scn, SCN &prev_keep_alive_scn); + + ObLS *get_ls(const int64_t tenant_id, const ObLSID ls_id) + { + int ret = OB_SUCCESS; + ObLS *ls = nullptr; + MTL_SWITCH(tenant_id) + { + ObLSIterator *ls_iter = nullptr; + ObLSHandle ls_handle; + ObLSService *ls_svr = MTL(ObLSService *); + OB_ASSERT(OB_SUCCESS == ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); + OB_ASSERT(nullptr != (ls = ls_handle.get_ls())); + } + return ls; + } +}; + +#define WRITE_SQL_BY_CONN(conn, sql_str) \ + ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define WRITE_SQL_FMT_BY_CONN(conn, ...) \ + ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \ + ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows)); + +#define DEF_VAL_FOR_SQL \ + int ret = OB_SUCCESS; \ + ObSqlString sql; \ + int64_t affected_rows = 0; \ + sqlclient::ObISQLConnection *connection = nullptr; + +void ObTestKeepAliveMinStartSCN::loop_check_start_scn(SCN &prev_min_start_scn, SCN &prev_keep_alive_scn) +{ + int ret = OB_SUCCESS; + MTL_SWITCH(RunCtx.tenant_id_) + { + ObLS *ls = get_ls(RunCtx.tenant_id_, ObLSID(1001)); + ObTxDataTable *tx_data_table = ls->get_tx_table()->get_tx_data_table(); + + // 每100毫秒循环一次,对应tx loop worker的单次循环interval,循环200次,对应20秒 + // 因为tx loop worker会15秒遍历一次上下文,略大于遍历间隔 + int retry_times = 200; + while (--retry_times >= 0) { + // 每次循环都更新tx data table中的min_start_scn + tx_data_table->update_calc_upper_info_(SCN::max_scn()); + + // 判断min_start_scn的大小关系,若出错,打印到stdout + if (prev_min_start_scn > tx_data_table->calc_upper_info_.min_start_scn_in_ctx_) { + fprintf(stdout, + "Incorrect min_start_scn in tx data table, prev_min_start_scn = %s, current_min_start_scn = %s\n", + to_cstring(prev_min_start_scn), + to_cstring(tx_data_table->calc_upper_info_.min_start_scn_in_ctx_)); + } + ASSERT_LE(prev_min_start_scn, tx_data_table->calc_upper_info_.min_start_scn_in_ctx_); + prev_min_start_scn = tx_data_table->calc_upper_info_.min_start_scn_in_ctx_; + ASSERT_LE(prev_keep_alive_scn, tx_data_table->calc_upper_info_.keep_alive_scn_); + prev_keep_alive_scn = tx_data_table->calc_upper_info_.keep_alive_scn_; + + ::usleep(ObTxLoopWorker::LOOP_INTERVAL); + } + } +} + +void ObTestKeepAliveMinStartSCN::test_min_start_scn() +{ + DEF_VAL_FOR_SQL + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().get_sql_proxy2().acquire(connection)); + WRITE_SQL_BY_CONN(connection, "set ob_trx_timeout = 6000000000"); + WRITE_SQL_BY_CONN(connection, "set ob_trx_idle_timeout = 6000000000"); + + int64_t current_ts = ObTimeUtil::current_time_ns(); + fprintf(stdout, "current ts : %ld\n", current_ts); + + SCN prev_min_start_scn = SCN::min_scn(); + SCN prev_keep_alive_scn = SCN::min_scn(); + + // 执行循环检查前,先sleep一段时间,确保tx_loop_worker成功做一次遍历,keep_alive日志中会成功写下一次NO_CTX, + // 在错误场景下,后续的UNKNOW日志会会修改keep_alive_handler中的keep_alive_scn,但是不会修改status + ::sleep(15); + + // 循环检查并使用keep_alive_handler中的信息更新tx data table中的数据,在错误场景下,min_start_scn会被错误推大 + loop_check_start_scn(prev_min_start_scn, prev_keep_alive_scn); + + // create test table + WRITE_SQL_BY_CONN(connection, "create table if not exists test.test_keep_alive_min_start_scn_t (c1 int, c2 int)"); + + // insert data and trigger redo log write + WRITE_SQL_BY_CONN(connection, "begin"); + WRITE_SQL_BY_CONN(connection, "insert into test.test_keep_alive_min_start_scn_t values(1,1)"); + + // 由于系统租户设置过_private_buffer_size,所有写入都会立即产生CLOG,事务也拥有了start_scn + // 在错误场景下,由于之前min_start_scn已经被错误推大,此时会出现min_start_scn回退 + loop_check_start_scn(prev_min_start_scn, prev_keep_alive_scn); + + WRITE_SQL_BY_CONN(connection, "commit"); + WRITE_SQL_BY_CONN(connection, "alter system minor freeze"); + + // 在事务提交后再做一次检查,确保整个过程中不会有min_start_scn回退 + loop_check_start_scn(prev_min_start_scn, prev_keep_alive_scn); +} + +TEST_F(ObTestKeepAliveMinStartSCN, observer_start) +{ + DEF_VAL_FOR_SQL + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().get_sql_proxy().acquire(connection)); + WRITE_SQL_BY_CONN(connection, "alter system set _private_buffer_size = '1B'"); + SERVER_LOG(INFO, "observer_start succ"); +} + +TEST_F(ObTestKeepAliveMinStartSCN, add_tenant) +{ + ASSERT_EQ(OB_SUCCESS, create_tenant()); + ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_)); + ASSERT_NE(0, RunCtx.tenant_id_); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); +} + +TEST_F(ObTestKeepAliveMinStartSCN, min_start_scn_test) +{ + test_min_start_scn(); +} + +TEST_F(ObTestKeepAliveMinStartSCN, end) +{ + if (RunCtx.time_sec_ > 0) { + ::sleep(RunCtx.time_sec_); + } +} + +} // end unittest +} // end oceanbase + + +int main(int argc, char **argv) +{ + int c = 0; + int time_sec = 0; + char *log_level = (char*)"INFO"; + while(EOF != (c = getopt(argc,argv,"t:l:"))) { + switch(c) { + case 't': + time_sec = atoi(optarg); + break; + case 'l': + log_level = optarg; + oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false; + break; + default: + break; + } + } + oceanbase::unittest::init_log_and_gtest(argc, argv); + OB_LOGGER.set_log_level(log_level); + + LOG_INFO("main>>>"); + oceanbase::unittest::RunCtx.time_sec_ = time_sec; + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/mittest/simple_server/test_replay_from_middle.cpp b/mittest/simple_server/test_replay_from_middle.cpp index 75b9fb06b..7be789775 100644 --- a/mittest/simple_server/test_replay_from_middle.cpp +++ b/mittest/simple_server/test_replay_from_middle.cpp @@ -418,10 +418,13 @@ TEST_F(ObReplayFromMiddleTest, observer_start) TEST_F(ObReplayFromMiddleTest, add_tenant) { // create tenant + DEF_VAL_FOR_SQL ASSERT_EQ(OB_SUCCESS, create_tenant()); ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_)); ASSERT_NE(0, RunCtx.tenant_id_); ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2()); + ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().get_sql_proxy2().acquire(connection)); + WRITE_SQL_BY_CONN(connection, "alter system set minor_compact_trigger = 16"); } TEST_F(ObReplayFromMiddleTest, basic_test) @@ -568,7 +571,9 @@ void ObReplayRestartTest::flush_tx_data(ObLS *ls) int64_t tail_after_flush = 0; ObTxDataMemtableMgr &mgr = ls->ls_tablet_svr_.tx_data_memtable_mgr_; ASSERT_EQ(OB_SUCCESS, mgr.get_memtable_range(head_before_flush, tail_before_flush)); - ASSERT_EQ(OB_SUCCESS, ls->ls_tx_svr_.flush_ls_inner_tablet(LS_TX_DATA_TABLET)); + int ret = ls->ls_tx_svr_.flush_ls_inner_tablet(LS_TX_DATA_TABLET); + ASSERT_TRUE(OB_SUCCESS == ret || OB_EAGAIN == ret); + while (--retry_times > 0) { ASSERT_EQ(OB_SUCCESS, mgr.get_memtable_range(head_after_flush, tail_after_flush)); if (head_after_flush > head_before_flush && tail_after_flush > tail_before_flush && @@ -580,7 +585,6 @@ void ObReplayRestartTest::flush_tx_data(ObLS *ls) } } ASSERT_GT(head_after_flush, head_before_flush); - ASSERT_GT(tail_after_flush, tail_before_flush); ASSERT_EQ(head_after_flush + 1, tail_after_flush); fprintf(stdout, "flush tx data done\n"); } @@ -638,31 +642,42 @@ namespace storage { void ObTxDataTable::update_calc_upper_info_(const SCN &max_decided_scn) { - UNUSED(max_decided_scn); int64_t cur_ts = common::ObTimeUtility::fast_current_time(); SpinWLockGuard lock_guard(calc_upper_info_.lock_); // recheck update condition and do update calc_upper_info + + /**********************************************************/ + //if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_scn> calc_upper_info_.keep_alive_scn_) { + /**********************************************************/ + SCN min_start_scn = SCN::min_scn(); SCN keep_alive_scn = SCN::min_scn(); MinStartScnStatus status; ls_->get_min_start_scn(min_start_scn, keep_alive_scn, status); - switch (status) { - case MinStartScnStatus::UNKOWN: - // do nothing - break; - case MinStartScnStatus::NO_CTX: - // use the last keep_alive_scn as min_start_scn - calc_upper_info_.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_; - calc_upper_info_.keep_alive_scn_ = keep_alive_scn; - calc_upper_info_.update_ts_ = cur_ts; - break; - case MinStartScnStatus::HAS_CTX: - calc_upper_info_.min_start_scn_in_ctx_ = min_start_scn; - calc_upper_info_.keep_alive_scn_ = keep_alive_scn; - calc_upper_info_.update_ts_ = cur_ts; - break; - default: - break; + if (MinStartScnStatus::UNKOWN == status) { + // do nothing + } else { + int ret = OB_SUCCESS; + CalcUpperInfo tmp_calc_upper_info; + tmp_calc_upper_info.keep_alive_scn_ = keep_alive_scn; + tmp_calc_upper_info.update_ts_ = cur_ts; + if (MinStartScnStatus::NO_CTX == status) { + // use the previous keep_alive_scn as min_start_scn + tmp_calc_upper_info.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_; + } else if (MinStartScnStatus::HAS_CTX == status) { + tmp_calc_upper_info.min_start_scn_in_ctx_ = min_start_scn; + } else { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "invalid min start scn status", K(min_start_scn), K(keep_alive_scn), K(status)); + } + + if (OB_FAIL(ret)) { + } else if (tmp_calc_upper_info.min_start_scn_in_ctx_ < calc_upper_info_.min_start_scn_in_ctx_) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "invalid min start scn", K(tmp_calc_upper_info), K(calc_upper_info_)); + } else { + calc_upper_info_ = tmp_calc_upper_info; + } } } diff --git a/src/storage/tx/ob_keep_alive_ls_handler.cpp b/src/storage/tx/ob_keep_alive_ls_handler.cpp index d194a5057..2abe652a8 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.cpp +++ b/src/storage/tx/ob_keep_alive_ls_handler.cpp @@ -128,7 +128,7 @@ int ObKeepAliveLSHandler::try_submit_log(const SCN &min_start_scn, MinStartScnSt TRANS_LOG(WARN, "[Keep Alive] submit keep alive log failed", K(ret), K(ls_id_)); } else { stat_info_.submit_succ_cnt += 1; - tmp_keep_alive_info_.scn_ = scn; + tmp_keep_alive_info_.loop_job_succ_scn_ = scn; tmp_keep_alive_info_.lsn_ = lsn; tmp_keep_alive_info_.min_start_status_ = min_start_status; tmp_keep_alive_info_.min_start_scn_ = min_start_scn; @@ -143,7 +143,6 @@ int ObKeepAliveLSHandler::try_submit_log(const SCN &min_start_scn, MinStartScnSt int ObKeepAliveLSHandler::on_success() { int ret = OB_SUCCESS; - SpinWLockGuard guard(lock_); durable_keep_alive_info_.replace(tmp_keep_alive_info_); @@ -180,7 +179,7 @@ int ObKeepAliveLSHandler::replay(const void *buffer, TRANS_LOG(WARN, "[Keep Alive] deserialize log body error", K(ret), K(nbytes), K(pos)); } else { SpinWLockGuard guard(lock_); - tmp_keep_alive_info_.scn_ = scn; + tmp_keep_alive_info_.loop_job_succ_scn_ = scn; tmp_keep_alive_info_.lsn_ = lsn; tmp_keep_alive_info_.min_start_scn_ = log_body.get_min_start_scn(); tmp_keep_alive_info_.min_start_status_ = log_body.get_min_start_status(); @@ -205,7 +204,7 @@ void ObKeepAliveLSHandler::print_stat_info() "Near_To_GTS_Cnt", stat_info_.near_to_gts_cnt, "Other_Error_Cnt", stat_info_.other_error_cnt, "Submit_Succ_Cnt", stat_info_.submit_succ_cnt, - "last_scn", to_cstring(stat_info_.stat_keepalive_info_.scn_), + "last_scn", to_cstring(stat_info_.stat_keepalive_info_.loop_job_succ_scn_), "last_lsn", stat_info_.stat_keepalive_info_.lsn_, "last_gts", last_gts_, "min_start_scn", to_cstring(stat_info_.stat_keepalive_info_.min_start_scn_), @@ -220,7 +219,7 @@ void ObKeepAliveLSHandler::get_min_start_scn(SCN &min_start_scn, SpinRLockGuard guard(lock_); min_start_scn = durable_keep_alive_info_.min_start_scn_; - keep_alive_scn = durable_keep_alive_info_.scn_; + keep_alive_scn = durable_keep_alive_info_.loop_job_succ_scn_; status = durable_keep_alive_info_.min_start_status_; } diff --git a/src/storage/tx/ob_keep_alive_ls_handler.h b/src/storage/tx/ob_keep_alive_ls_handler.h index ca9d986df..72959481f 100644 --- a/src/storage/tx/ob_keep_alive_ls_handler.h +++ b/src/storage/tx/ob_keep_alive_ls_handler.h @@ -67,14 +67,14 @@ private: struct KeepAliveLsInfo { - share::SCN scn_; + share::SCN loop_job_succ_scn_; palf::LSN lsn_; share::SCN min_start_scn_; MinStartScnStatus min_start_status_; void reset() { - scn_.reset(); + loop_job_succ_scn_.reset(); lsn_.reset(); min_start_scn_.reset(); min_start_status_ = MinStartScnStatus::UNKOWN; @@ -82,17 +82,18 @@ struct KeepAliveLsInfo void replace(KeepAliveLsInfo info) { - scn_ = info.scn_; - lsn_ = info.lsn_; - - if (info.min_start_status_ == MinStartScnStatus::NO_CTX - || info.min_start_status_ == MinStartScnStatus::HAS_CTX) { + /* We only update in the NO_CTX and HAS_CTX states here because most of the KEEP_ALIVE logs are in the UNKNOW state. + * If we update this in UNKNOW state, TX_DATA_TABLE will retrieve UNKNOW state in most cases, thus causing a + * prolonged inability to calculate the upper_trans_version of TX_DATA_TABLE */ + if (info.min_start_status_ == MinStartScnStatus::NO_CTX || info.min_start_status_ == MinStartScnStatus::HAS_CTX) { min_start_scn_ = info.min_start_scn_; min_start_status_ = info.min_start_status_; + loop_job_succ_scn_ = info.loop_job_succ_scn_; + lsn_ = info.lsn_; } } - TO_STRING_KV(K(scn_), K(lsn_), K(min_start_scn_), K(min_start_status_)); + TO_STRING_KV(K(loop_job_succ_scn_), K(lsn_), K(min_start_scn_), K(min_start_status_)); }; class ObLSKeepAliveStatInfo diff --git a/src/storage/tx_table/ob_tx_data_table.cpp b/src/storage/tx_table/ob_tx_data_table.cpp index e2ddd5e42..4506870d9 100755 --- a/src/storage/tx_table/ob_tx_data_table.cpp +++ b/src/storage/tx_table/ob_tx_data_table.cpp @@ -43,7 +43,7 @@ namespace storage #define TX_DATA_MEM_LEAK_DEBUG_CODE #endif - +int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 30 * 1000 * 1000; // 30 seconds int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table) { @@ -1029,29 +1029,39 @@ void ObTxDataTable::update_calc_upper_info_(const SCN &max_decided_scn) { int64_t cur_ts = common::ObTimeUtility::fast_current_time(); SpinWLockGuard lock_guard(calc_upper_info_.lock_); + // recheck update condition and do update calc_upper_info - if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_scn> calc_upper_info_.keep_alive_scn_) { + if (cur_ts - calc_upper_info_.update_ts_ > ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL && + max_decided_scn > calc_upper_info_.keep_alive_scn_) { SCN min_start_scn = SCN::min_scn(); SCN keep_alive_scn = SCN::min_scn(); MinStartScnStatus status; ls_->get_min_start_scn(min_start_scn, keep_alive_scn, status); - switch (status) { - case MinStartScnStatus::UNKOWN: - // do nothing - break; - case MinStartScnStatus::NO_CTX: - // use the last keep_alive_scn as min_start_scn - calc_upper_info_.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_; - calc_upper_info_.keep_alive_scn_ = keep_alive_scn; - calc_upper_info_.update_ts_ = cur_ts; - break; - case MinStartScnStatus::HAS_CTX: - calc_upper_info_.min_start_scn_in_ctx_ = min_start_scn; - calc_upper_info_.keep_alive_scn_ = keep_alive_scn; - calc_upper_info_.update_ts_ = cur_ts; - break; - default: - break; + + if (MinStartScnStatus::UNKOWN == status) { + // do nothing + } else { + int ret = OB_SUCCESS; + CalcUpperInfo tmp_calc_upper_info; + tmp_calc_upper_info.keep_alive_scn_ = keep_alive_scn; + tmp_calc_upper_info.update_ts_ = cur_ts; + if (MinStartScnStatus::NO_CTX == status) { + // use the previous keep_alive_scn as min_start_scn + tmp_calc_upper_info.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_; + } else if (MinStartScnStatus::HAS_CTX == status) { + tmp_calc_upper_info.min_start_scn_in_ctx_ = min_start_scn; + } else { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "invalid min start scn status", K(min_start_scn), K(keep_alive_scn), K(status)); + } + + if (OB_FAIL(ret)) { + } else if (tmp_calc_upper_info.min_start_scn_in_ctx_ < calc_upper_info_.min_start_scn_in_ctx_) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "invalid min start scn", K(tmp_calc_upper_info), K(calc_upper_info_)); + } else { + calc_upper_info_ = tmp_calc_upper_info; + } } } } diff --git a/src/storage/tx_table/ob_tx_data_table.h b/src/storage/tx_table/ob_tx_data_table.h index 1e4c64e22..3578956f0 100755 --- a/src/storage/tx_table/ob_tx_data_table.h +++ b/src/storage/tx_table/ob_tx_data_table.h @@ -91,6 +91,14 @@ public: update_ts_ = 0; } + CalcUpperInfo &operator= (const CalcUpperInfo &rhs) + { + min_start_scn_in_ctx_ = rhs.min_start_scn_in_ctx_; + keep_alive_scn_ = rhs.keep_alive_scn_; + update_ts_ = rhs.update_ts_; + return *this; + } + share::SCN min_start_scn_in_ctx_; share::SCN keep_alive_scn_; int64_t update_ts_; @@ -101,6 +109,8 @@ public: using SliceAllocator = ObSliceAlloc; + static int64_t UPDATE_CALC_UPPER_INFO_INTERVAL; + static const int64_t TX_DATA_MAX_CONCURRENCY = 32; // A tx data is 128 bytes, 128 * 262144 = 32MB static const int64_t SSTABLE_CACHE_MAX_RETAIN_CNT = 262144;