[FIX] only update min_start_scn and keep_alive log scn when log is NO_CTX or HAS_CTX
This commit is contained in:
parent
fc374c7ce0
commit
d2584bccf2
@ -74,6 +74,7 @@ ob_unittest_observer(test_lock_table_with_tx test_lock_table_with_tx.cpp)
|
||||
ob_unittest_observer(test_ob_detect_manager_in_simple_server test_ob_detect_manager_in_simple_server.cpp)
|
||||
ob_unittest_observer(test_transfer_lock_info_operator storage_ha/test_transfer_lock_info_operator.cpp)
|
||||
ob_unittest_observer(test_mds_recover test_mds_recover.cpp)
|
||||
ob_unittest_observer(test_keep_alive_min_start_scn test_keep_alive_min_start_scn.cpp)
|
||||
#ob_ha_unittest_observer(test_transfer_handler storage_ha/test_transfer_handler.cpp)
|
||||
#ob_ha_unittest_observer(test_transfer_and_restart_basic storage_ha/test_transfer_and_restart_basic.cpp)
|
||||
ob_ha_unittest_observer(test_transfer_start_stage_restart_without_mds_flush storage_ha/test_transfer_start_stage_restart_without_mds_flush.cpp)
|
||||
|
215
mittest/simple_server/test_keep_alive_min_start_scn.cpp
Normal file
215
mittest/simple_server/test_keep_alive_min_start_scn.cpp
Normal file
@ -0,0 +1,215 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
#define protected public
|
||||
#define private public
|
||||
|
||||
#include "env/ob_simple_cluster_test_base.h"
|
||||
#include "env/ob_fast_bootstrap.h"
|
||||
#include "lib/mysqlclient/ob_mysql_result.h"
|
||||
#include "storage/tx/ob_tx_loop_worker.h"
|
||||
#include "storage/tx_storage/ob_ls_map.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
|
||||
namespace storage
|
||||
{
|
||||
int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 0;
|
||||
}
|
||||
|
||||
namespace unittest
|
||||
{
|
||||
|
||||
using namespace oceanbase::transaction;
|
||||
using namespace oceanbase::storage;
|
||||
|
||||
|
||||
|
||||
class TestRunCtx
|
||||
{
|
||||
public:
|
||||
uint64_t tenant_id_ = 0;
|
||||
int time_sec_ = 0;
|
||||
};
|
||||
|
||||
TestRunCtx RunCtx;
|
||||
|
||||
class ObTestKeepAliveMinStartSCN : public ObSimpleClusterTestBase
|
||||
{
|
||||
public:
|
||||
ObTestKeepAliveMinStartSCN() : ObSimpleClusterTestBase("test_keep_alive_min_start_scn_") {}
|
||||
|
||||
void test_min_start_scn();
|
||||
void loop_check_start_scn(SCN &prev_min_start_scn, SCN &prev_keep_alive_scn);
|
||||
|
||||
ObLS *get_ls(const int64_t tenant_id, const ObLSID ls_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLS *ls = nullptr;
|
||||
MTL_SWITCH(tenant_id)
|
||||
{
|
||||
ObLSIterator *ls_iter = nullptr;
|
||||
ObLSHandle ls_handle;
|
||||
ObLSService *ls_svr = MTL(ObLSService *);
|
||||
OB_ASSERT(OB_SUCCESS == ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
|
||||
OB_ASSERT(nullptr != (ls = ls_handle.get_ls()));
|
||||
}
|
||||
return ls;
|
||||
}
|
||||
};
|
||||
|
||||
#define WRITE_SQL_BY_CONN(conn, sql_str) \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign(sql_str)); \
|
||||
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
||||
|
||||
#define WRITE_SQL_FMT_BY_CONN(conn, ...) \
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt(__VA_ARGS__)); \
|
||||
ASSERT_EQ(OB_SUCCESS, conn->execute_write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
||||
|
||||
#define DEF_VAL_FOR_SQL \
|
||||
int ret = OB_SUCCESS; \
|
||||
ObSqlString sql; \
|
||||
int64_t affected_rows = 0; \
|
||||
sqlclient::ObISQLConnection *connection = nullptr;
|
||||
|
||||
void ObTestKeepAliveMinStartSCN::loop_check_start_scn(SCN &prev_min_start_scn, SCN &prev_keep_alive_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
MTL_SWITCH(RunCtx.tenant_id_)
|
||||
{
|
||||
ObLS *ls = get_ls(RunCtx.tenant_id_, ObLSID(1001));
|
||||
ObTxDataTable *tx_data_table = ls->get_tx_table()->get_tx_data_table();
|
||||
|
||||
// 每100毫秒循环一次,对应tx loop worker的单次循环interval,循环200次,对应20秒
|
||||
// 因为tx loop worker会15秒遍历一次上下文,略大于遍历间隔
|
||||
int retry_times = 200;
|
||||
while (--retry_times >= 0) {
|
||||
// 每次循环都更新tx data table中的min_start_scn
|
||||
tx_data_table->update_calc_upper_info_(SCN::max_scn());
|
||||
|
||||
// 判断min_start_scn的大小关系,若出错,打印到stdout
|
||||
if (prev_min_start_scn > tx_data_table->calc_upper_info_.min_start_scn_in_ctx_) {
|
||||
fprintf(stdout,
|
||||
"Incorrect min_start_scn in tx data table, prev_min_start_scn = %s, current_min_start_scn = %s\n",
|
||||
to_cstring(prev_min_start_scn),
|
||||
to_cstring(tx_data_table->calc_upper_info_.min_start_scn_in_ctx_));
|
||||
}
|
||||
ASSERT_LE(prev_min_start_scn, tx_data_table->calc_upper_info_.min_start_scn_in_ctx_);
|
||||
prev_min_start_scn = tx_data_table->calc_upper_info_.min_start_scn_in_ctx_;
|
||||
ASSERT_LE(prev_keep_alive_scn, tx_data_table->calc_upper_info_.keep_alive_scn_);
|
||||
prev_keep_alive_scn = tx_data_table->calc_upper_info_.keep_alive_scn_;
|
||||
|
||||
::usleep(ObTxLoopWorker::LOOP_INTERVAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ObTestKeepAliveMinStartSCN::test_min_start_scn()
|
||||
{
|
||||
DEF_VAL_FOR_SQL
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().get_sql_proxy2().acquire(connection));
|
||||
WRITE_SQL_BY_CONN(connection, "set ob_trx_timeout = 6000000000");
|
||||
WRITE_SQL_BY_CONN(connection, "set ob_trx_idle_timeout = 6000000000");
|
||||
|
||||
int64_t current_ts = ObTimeUtil::current_time_ns();
|
||||
fprintf(stdout, "current ts : %ld\n", current_ts);
|
||||
|
||||
SCN prev_min_start_scn = SCN::min_scn();
|
||||
SCN prev_keep_alive_scn = SCN::min_scn();
|
||||
|
||||
// 执行循环检查前,先sleep一段时间,确保tx_loop_worker成功做一次遍历,keep_alive日志中会成功写下一次NO_CTX,
|
||||
// 在错误场景下,后续的UNKNOW日志会会修改keep_alive_handler中的keep_alive_scn,但是不会修改status
|
||||
::sleep(15);
|
||||
|
||||
// 循环检查并使用keep_alive_handler中的信息更新tx data table中的数据,在错误场景下,min_start_scn会被错误推大
|
||||
loop_check_start_scn(prev_min_start_scn, prev_keep_alive_scn);
|
||||
|
||||
// create test table
|
||||
WRITE_SQL_BY_CONN(connection, "create table if not exists test.test_keep_alive_min_start_scn_t (c1 int, c2 int)");
|
||||
|
||||
// insert data and trigger redo log write
|
||||
WRITE_SQL_BY_CONN(connection, "begin");
|
||||
WRITE_SQL_BY_CONN(connection, "insert into test.test_keep_alive_min_start_scn_t values(1,1)");
|
||||
|
||||
// 由于系统租户设置过_private_buffer_size,所有写入都会立即产生CLOG,事务也拥有了start_scn
|
||||
// 在错误场景下,由于之前min_start_scn已经被错误推大,此时会出现min_start_scn回退
|
||||
loop_check_start_scn(prev_min_start_scn, prev_keep_alive_scn);
|
||||
|
||||
WRITE_SQL_BY_CONN(connection, "commit");
|
||||
WRITE_SQL_BY_CONN(connection, "alter system minor freeze");
|
||||
|
||||
// 在事务提交后再做一次检查,确保整个过程中不会有min_start_scn回退
|
||||
loop_check_start_scn(prev_min_start_scn, prev_keep_alive_scn);
|
||||
}
|
||||
|
||||
TEST_F(ObTestKeepAliveMinStartSCN, observer_start)
|
||||
{
|
||||
DEF_VAL_FOR_SQL
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().get_sql_proxy().acquire(connection));
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set _private_buffer_size = '1B'");
|
||||
SERVER_LOG(INFO, "observer_start succ");
|
||||
}
|
||||
|
||||
TEST_F(ObTestKeepAliveMinStartSCN, add_tenant)
|
||||
{
|
||||
ASSERT_EQ(OB_SUCCESS, create_tenant());
|
||||
ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_));
|
||||
ASSERT_NE(0, RunCtx.tenant_id_);
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
}
|
||||
|
||||
TEST_F(ObTestKeepAliveMinStartSCN, min_start_scn_test)
|
||||
{
|
||||
test_min_start_scn();
|
||||
}
|
||||
|
||||
TEST_F(ObTestKeepAliveMinStartSCN, end)
|
||||
{
|
||||
if (RunCtx.time_sec_ > 0) {
|
||||
::sleep(RunCtx.time_sec_);
|
||||
}
|
||||
}
|
||||
|
||||
} // end unittest
|
||||
} // end oceanbase
|
||||
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
int c = 0;
|
||||
int time_sec = 0;
|
||||
char *log_level = (char*)"INFO";
|
||||
while(EOF != (c = getopt(argc,argv,"t:l:"))) {
|
||||
switch(c) {
|
||||
case 't':
|
||||
time_sec = atoi(optarg);
|
||||
break;
|
||||
case 'l':
|
||||
log_level = optarg;
|
||||
oceanbase::unittest::ObSimpleClusterTestBase::enable_env_warn_log_ = false;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
oceanbase::unittest::init_log_and_gtest(argc, argv);
|
||||
OB_LOGGER.set_log_level(log_level);
|
||||
|
||||
LOG_INFO("main>>>");
|
||||
oceanbase::unittest::RunCtx.time_sec_ = time_sec;
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
@ -418,10 +418,13 @@ TEST_F(ObReplayFromMiddleTest, observer_start)
|
||||
TEST_F(ObReplayFromMiddleTest, add_tenant)
|
||||
{
|
||||
// create tenant
|
||||
DEF_VAL_FOR_SQL
|
||||
ASSERT_EQ(OB_SUCCESS, create_tenant());
|
||||
ASSERT_EQ(OB_SUCCESS, get_tenant_id(RunCtx.tenant_id_));
|
||||
ASSERT_NE(0, RunCtx.tenant_id_);
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().init_sql_proxy2());
|
||||
ASSERT_EQ(OB_SUCCESS, get_curr_simple_server().get_sql_proxy2().acquire(connection));
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set minor_compact_trigger = 16");
|
||||
}
|
||||
|
||||
TEST_F(ObReplayFromMiddleTest, basic_test)
|
||||
@ -568,7 +571,9 @@ void ObReplayRestartTest::flush_tx_data(ObLS *ls)
|
||||
int64_t tail_after_flush = 0;
|
||||
ObTxDataMemtableMgr &mgr = ls->ls_tablet_svr_.tx_data_memtable_mgr_;
|
||||
ASSERT_EQ(OB_SUCCESS, mgr.get_memtable_range(head_before_flush, tail_before_flush));
|
||||
ASSERT_EQ(OB_SUCCESS, ls->ls_tx_svr_.flush_ls_inner_tablet(LS_TX_DATA_TABLET));
|
||||
int ret = ls->ls_tx_svr_.flush_ls_inner_tablet(LS_TX_DATA_TABLET);
|
||||
ASSERT_TRUE(OB_SUCCESS == ret || OB_EAGAIN == ret);
|
||||
|
||||
while (--retry_times > 0) {
|
||||
ASSERT_EQ(OB_SUCCESS, mgr.get_memtable_range(head_after_flush, tail_after_flush));
|
||||
if (head_after_flush > head_before_flush && tail_after_flush > tail_before_flush &&
|
||||
@ -580,7 +585,6 @@ void ObReplayRestartTest::flush_tx_data(ObLS *ls)
|
||||
}
|
||||
}
|
||||
ASSERT_GT(head_after_flush, head_before_flush);
|
||||
ASSERT_GT(tail_after_flush, tail_before_flush);
|
||||
ASSERT_EQ(head_after_flush + 1, tail_after_flush);
|
||||
fprintf(stdout, "flush tx data done\n");
|
||||
}
|
||||
@ -638,31 +642,42 @@ namespace storage {
|
||||
|
||||
void ObTxDataTable::update_calc_upper_info_(const SCN &max_decided_scn)
|
||||
{
|
||||
UNUSED(max_decided_scn);
|
||||
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
|
||||
SpinWLockGuard lock_guard(calc_upper_info_.lock_);
|
||||
// recheck update condition and do update calc_upper_info
|
||||
|
||||
/**********************************************************/
|
||||
//if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_scn> calc_upper_info_.keep_alive_scn_) {
|
||||
/**********************************************************/
|
||||
|
||||
SCN min_start_scn = SCN::min_scn();
|
||||
SCN keep_alive_scn = SCN::min_scn();
|
||||
MinStartScnStatus status;
|
||||
ls_->get_min_start_scn(min_start_scn, keep_alive_scn, status);
|
||||
switch (status) {
|
||||
case MinStartScnStatus::UNKOWN:
|
||||
// do nothing
|
||||
break;
|
||||
case MinStartScnStatus::NO_CTX:
|
||||
// use the last keep_alive_scn as min_start_scn
|
||||
calc_upper_info_.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_;
|
||||
calc_upper_info_.keep_alive_scn_ = keep_alive_scn;
|
||||
calc_upper_info_.update_ts_ = cur_ts;
|
||||
break;
|
||||
case MinStartScnStatus::HAS_CTX:
|
||||
calc_upper_info_.min_start_scn_in_ctx_ = min_start_scn;
|
||||
calc_upper_info_.keep_alive_scn_ = keep_alive_scn;
|
||||
calc_upper_info_.update_ts_ = cur_ts;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
if (MinStartScnStatus::UNKOWN == status) {
|
||||
// do nothing
|
||||
} else {
|
||||
int ret = OB_SUCCESS;
|
||||
CalcUpperInfo tmp_calc_upper_info;
|
||||
tmp_calc_upper_info.keep_alive_scn_ = keep_alive_scn;
|
||||
tmp_calc_upper_info.update_ts_ = cur_ts;
|
||||
if (MinStartScnStatus::NO_CTX == status) {
|
||||
// use the previous keep_alive_scn as min_start_scn
|
||||
tmp_calc_upper_info.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_;
|
||||
} else if (MinStartScnStatus::HAS_CTX == status) {
|
||||
tmp_calc_upper_info.min_start_scn_in_ctx_ = min_start_scn;
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid min start scn status", K(min_start_scn), K(keep_alive_scn), K(status));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (tmp_calc_upper_info.min_start_scn_in_ctx_ < calc_upper_info_.min_start_scn_in_ctx_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid min start scn", K(tmp_calc_upper_info), K(calc_upper_info_));
|
||||
} else {
|
||||
calc_upper_info_ = tmp_calc_upper_info;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,7 +128,7 @@ int ObKeepAliveLSHandler::try_submit_log(const SCN &min_start_scn, MinStartScnSt
|
||||
TRANS_LOG(WARN, "[Keep Alive] submit keep alive log failed", K(ret), K(ls_id_));
|
||||
} else {
|
||||
stat_info_.submit_succ_cnt += 1;
|
||||
tmp_keep_alive_info_.scn_ = scn;
|
||||
tmp_keep_alive_info_.loop_job_succ_scn_ = scn;
|
||||
tmp_keep_alive_info_.lsn_ = lsn;
|
||||
tmp_keep_alive_info_.min_start_status_ = min_start_status;
|
||||
tmp_keep_alive_info_.min_start_scn_ = min_start_scn;
|
||||
@ -143,7 +143,6 @@ int ObKeepAliveLSHandler::try_submit_log(const SCN &min_start_scn, MinStartScnSt
|
||||
int ObKeepAliveLSHandler::on_success()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
SpinWLockGuard guard(lock_);
|
||||
|
||||
durable_keep_alive_info_.replace(tmp_keep_alive_info_);
|
||||
@ -180,7 +179,7 @@ int ObKeepAliveLSHandler::replay(const void *buffer,
|
||||
TRANS_LOG(WARN, "[Keep Alive] deserialize log body error", K(ret), K(nbytes), K(pos));
|
||||
} else {
|
||||
SpinWLockGuard guard(lock_);
|
||||
tmp_keep_alive_info_.scn_ = scn;
|
||||
tmp_keep_alive_info_.loop_job_succ_scn_ = scn;
|
||||
tmp_keep_alive_info_.lsn_ = lsn;
|
||||
tmp_keep_alive_info_.min_start_scn_ = log_body.get_min_start_scn();
|
||||
tmp_keep_alive_info_.min_start_status_ = log_body.get_min_start_status();
|
||||
@ -205,7 +204,7 @@ void ObKeepAliveLSHandler::print_stat_info()
|
||||
"Near_To_GTS_Cnt", stat_info_.near_to_gts_cnt,
|
||||
"Other_Error_Cnt", stat_info_.other_error_cnt,
|
||||
"Submit_Succ_Cnt", stat_info_.submit_succ_cnt,
|
||||
"last_scn", to_cstring(stat_info_.stat_keepalive_info_.scn_),
|
||||
"last_scn", to_cstring(stat_info_.stat_keepalive_info_.loop_job_succ_scn_),
|
||||
"last_lsn", stat_info_.stat_keepalive_info_.lsn_,
|
||||
"last_gts", last_gts_,
|
||||
"min_start_scn", to_cstring(stat_info_.stat_keepalive_info_.min_start_scn_),
|
||||
@ -220,7 +219,7 @@ void ObKeepAliveLSHandler::get_min_start_scn(SCN &min_start_scn,
|
||||
SpinRLockGuard guard(lock_);
|
||||
|
||||
min_start_scn = durable_keep_alive_info_.min_start_scn_;
|
||||
keep_alive_scn = durable_keep_alive_info_.scn_;
|
||||
keep_alive_scn = durable_keep_alive_info_.loop_job_succ_scn_;
|
||||
status = durable_keep_alive_info_.min_start_status_;
|
||||
}
|
||||
|
||||
|
@ -67,14 +67,14 @@ private:
|
||||
|
||||
struct KeepAliveLsInfo
|
||||
{
|
||||
share::SCN scn_;
|
||||
share::SCN loop_job_succ_scn_;
|
||||
palf::LSN lsn_;
|
||||
share::SCN min_start_scn_;
|
||||
MinStartScnStatus min_start_status_;
|
||||
|
||||
void reset()
|
||||
{
|
||||
scn_.reset();
|
||||
loop_job_succ_scn_.reset();
|
||||
lsn_.reset();
|
||||
min_start_scn_.reset();
|
||||
min_start_status_ = MinStartScnStatus::UNKOWN;
|
||||
@ -82,17 +82,18 @@ struct KeepAliveLsInfo
|
||||
|
||||
void replace(KeepAliveLsInfo info)
|
||||
{
|
||||
scn_ = info.scn_;
|
||||
lsn_ = info.lsn_;
|
||||
|
||||
if (info.min_start_status_ == MinStartScnStatus::NO_CTX
|
||||
|| info.min_start_status_ == MinStartScnStatus::HAS_CTX) {
|
||||
/* We only update in the NO_CTX and HAS_CTX states here because most of the KEEP_ALIVE logs are in the UNKNOW state.
|
||||
* If we update this in UNKNOW state, TX_DATA_TABLE will retrieve UNKNOW state in most cases, thus causing a
|
||||
* prolonged inability to calculate the upper_trans_version of TX_DATA_TABLE */
|
||||
if (info.min_start_status_ == MinStartScnStatus::NO_CTX || info.min_start_status_ == MinStartScnStatus::HAS_CTX) {
|
||||
min_start_scn_ = info.min_start_scn_;
|
||||
min_start_status_ = info.min_start_status_;
|
||||
loop_job_succ_scn_ = info.loop_job_succ_scn_;
|
||||
lsn_ = info.lsn_;
|
||||
}
|
||||
}
|
||||
|
||||
TO_STRING_KV(K(scn_), K(lsn_), K(min_start_scn_), K(min_start_status_));
|
||||
TO_STRING_KV(K(loop_job_succ_scn_), K(lsn_), K(min_start_scn_), K(min_start_status_));
|
||||
};
|
||||
|
||||
class ObLSKeepAliveStatInfo
|
||||
|
@ -43,7 +43,7 @@ namespace storage
|
||||
#define TX_DATA_MEM_LEAK_DEBUG_CODE
|
||||
#endif
|
||||
|
||||
|
||||
int64_t ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL = 30 * 1000 * 1000; // 30 seconds
|
||||
|
||||
int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
|
||||
{
|
||||
@ -1029,29 +1029,39 @@ void ObTxDataTable::update_calc_upper_info_(const SCN &max_decided_scn)
|
||||
{
|
||||
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
|
||||
SpinWLockGuard lock_guard(calc_upper_info_.lock_);
|
||||
|
||||
// recheck update condition and do update calc_upper_info
|
||||
if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_scn> calc_upper_info_.keep_alive_scn_) {
|
||||
if (cur_ts - calc_upper_info_.update_ts_ > ObTxDataTable::UPDATE_CALC_UPPER_INFO_INTERVAL &&
|
||||
max_decided_scn > calc_upper_info_.keep_alive_scn_) {
|
||||
SCN min_start_scn = SCN::min_scn();
|
||||
SCN keep_alive_scn = SCN::min_scn();
|
||||
MinStartScnStatus status;
|
||||
ls_->get_min_start_scn(min_start_scn, keep_alive_scn, status);
|
||||
switch (status) {
|
||||
case MinStartScnStatus::UNKOWN:
|
||||
// do nothing
|
||||
break;
|
||||
case MinStartScnStatus::NO_CTX:
|
||||
// use the last keep_alive_scn as min_start_scn
|
||||
calc_upper_info_.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_;
|
||||
calc_upper_info_.keep_alive_scn_ = keep_alive_scn;
|
||||
calc_upper_info_.update_ts_ = cur_ts;
|
||||
break;
|
||||
case MinStartScnStatus::HAS_CTX:
|
||||
calc_upper_info_.min_start_scn_in_ctx_ = min_start_scn;
|
||||
calc_upper_info_.keep_alive_scn_ = keep_alive_scn;
|
||||
calc_upper_info_.update_ts_ = cur_ts;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
||||
if (MinStartScnStatus::UNKOWN == status) {
|
||||
// do nothing
|
||||
} else {
|
||||
int ret = OB_SUCCESS;
|
||||
CalcUpperInfo tmp_calc_upper_info;
|
||||
tmp_calc_upper_info.keep_alive_scn_ = keep_alive_scn;
|
||||
tmp_calc_upper_info.update_ts_ = cur_ts;
|
||||
if (MinStartScnStatus::NO_CTX == status) {
|
||||
// use the previous keep_alive_scn as min_start_scn
|
||||
tmp_calc_upper_info.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_;
|
||||
} else if (MinStartScnStatus::HAS_CTX == status) {
|
||||
tmp_calc_upper_info.min_start_scn_in_ctx_ = min_start_scn;
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid min start scn status", K(min_start_scn), K(keep_alive_scn), K(status));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (tmp_calc_upper_info.min_start_scn_in_ctx_ < calc_upper_info_.min_start_scn_in_ctx_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "invalid min start scn", K(tmp_calc_upper_info), K(calc_upper_info_));
|
||||
} else {
|
||||
calc_upper_info_ = tmp_calc_upper_info;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -91,6 +91,14 @@ public:
|
||||
update_ts_ = 0;
|
||||
}
|
||||
|
||||
CalcUpperInfo &operator= (const CalcUpperInfo &rhs)
|
||||
{
|
||||
min_start_scn_in_ctx_ = rhs.min_start_scn_in_ctx_;
|
||||
keep_alive_scn_ = rhs.keep_alive_scn_;
|
||||
update_ts_ = rhs.update_ts_;
|
||||
return *this;
|
||||
}
|
||||
|
||||
share::SCN min_start_scn_in_ctx_;
|
||||
share::SCN keep_alive_scn_;
|
||||
int64_t update_ts_;
|
||||
@ -101,6 +109,8 @@ public:
|
||||
|
||||
using SliceAllocator = ObSliceAlloc;
|
||||
|
||||
static int64_t UPDATE_CALC_UPPER_INFO_INTERVAL;
|
||||
|
||||
static const int64_t TX_DATA_MAX_CONCURRENCY = 32;
|
||||
// A tx data is 128 bytes, 128 * 262144 = 32MB
|
||||
static const int64_t SSTABLE_CACHE_MAX_RETAIN_CNT = 262144;
|
||||
|
Loading…
x
Reference in New Issue
Block a user