fix test_transfer_tx_data
This commit is contained in:
parent
c3c60dd20c
commit
478d5d29de
@ -30,6 +30,7 @@ namespace storage
|
||||
{
|
||||
int64_t ObTxTable::UPDATE_MIN_START_SCN_INTERVAL = 0;
|
||||
|
||||
|
||||
int ObTransferHandler::wait_src_ls_advance_weak_read_ts_(
|
||||
const share::ObTransferTaskInfo &task_info,
|
||||
ObTimeoutCtx &timeout_ctx)
|
||||
@ -82,7 +83,6 @@ public:
|
||||
WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000");
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;");
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;");
|
||||
WRITE_SQL_BY_CONN(connection, "alter system set partition_balance_schedule_interval = '10s';");
|
||||
sleep(5);
|
||||
}
|
||||
|
||||
@ -164,12 +164,15 @@ public:
|
||||
MTL_SWITCH(tenant_id) {
|
||||
TRANS_LOG(INFO, "worker to do partition_balance");
|
||||
auto b_svr = MTL(rootserver::ObTenantBalanceService*);
|
||||
b_svr->stop();
|
||||
b_svr->reset();
|
||||
int64_t job_cnt = 0;
|
||||
int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP;
|
||||
ObBalanceJob job;
|
||||
if (OB_FAIL(b_svr->gather_stat_())) {
|
||||
TRANS_LOG(WARN, "failed to gather stat", KR(ret));
|
||||
} else if (OB_FAIL(b_svr->gather_ls_status_stat(tenant_id, b_svr->ls_array_))) {
|
||||
TRANS_LOG(WARN, "failed to gather ls stat", KR(ret));
|
||||
} else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
|
||||
tenant_id, false, *GCTX.sql_proxy_, job, start_time, finish_time))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
@ -408,7 +411,7 @@ TEST_F(ObTransferWithSmallerStartSCN, smaller_start_scn)
|
||||
fprintf(stdout, "end update upper info the second time %lu\n", second_min_start_scn);
|
||||
TRANS_LOG(INFO, "end update upper info the second time");
|
||||
|
||||
ASSERT_EQ(true, first_min_start_scn > second_min_start_scn);
|
||||
ASSERT_GT(first_min_start_scn, second_min_start_scn);
|
||||
|
||||
min_start_scn_in_tx_data.set_max();
|
||||
fprintf(stdout, "start get min start in tx data table second time\n");
|
||||
@ -418,8 +421,8 @@ TEST_F(ObTransferWithSmallerStartSCN, smaller_start_scn)
|
||||
fprintf(stdout, "end get min start in tx data table second time, %lu\n", min_start_scn_in_tx_data.val_);
|
||||
TRANS_LOG(INFO, "end get min start in tx data table second time");
|
||||
|
||||
ASSERT_EQ(true, first_min_start_scn > second_min_start_scn);
|
||||
ASSERT_EQ(true, first_min_start_scn_in_tx_data > second_min_start_scn_in_tx_data);
|
||||
ASSERT_GT(first_min_start_scn, second_min_start_scn);
|
||||
ASSERT_GT(first_min_start_scn_in_tx_data, second_min_start_scn_in_tx_data);
|
||||
|
||||
inject_tx_fault_helper.release();
|
||||
th.join();
|
||||
|
@ -926,6 +926,7 @@ public:
|
||||
// ObDataCheckpoint interface:
|
||||
DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int);
|
||||
DELEGATE_WITH_RET(keep_alive_ls_handler_, get_min_start_scn, void);
|
||||
DELEGATE_WITH_RET(keep_alive_ls_handler_, clear_keep_alive_smaller_scn_info, void);
|
||||
|
||||
// update tablet table store here do not using Macro because need lock ls and tablet
|
||||
// update table store for tablet
|
||||
|
@ -245,7 +245,8 @@ int ObLSTransferStatus::enable_upper_trans_calculation_(const share::SCN op_scn)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "tx data table in tx table is nullptr.", K(ret));
|
||||
} else {
|
||||
tx_table->enable_upper_trans_calculation(op_scn);
|
||||
(void)ls_->clear_keep_alive_smaller_scn_info();
|
||||
(void)tx_table->enable_upper_trans_calculation(op_scn);
|
||||
TRANS_LOG(INFO, "enable upper trans calculation", KPC(ls_), K(guard), KPC(this));
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,8 @@
|
||||
#include "logservice/ob_log_handler.h"
|
||||
#include "storage/tx/ob_ts_mgr.h"
|
||||
|
||||
#define USING_LOG_PREFIX TRANS
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
|
||||
@ -94,6 +96,14 @@ void ObKeepAliveLSHandler::reset()
|
||||
stat_info_.reset();
|
||||
}
|
||||
|
||||
void ObKeepAliveLSHandler::clear_keep_alive_smaller_scn_info()
|
||||
{
|
||||
SpinWLockGuard guard(lock_);
|
||||
FLOG_INFO("[Keep Alive] clear keep alive ls info", K(ls_id_), K(tmp_keep_alive_info_), K(durable_keep_alive_info_));
|
||||
tmp_keep_alive_info_.reset();
|
||||
durable_keep_alive_info_.reset();
|
||||
}
|
||||
|
||||
int ObKeepAliveLSHandler::try_submit_log(const SCN &min_start_scn, MinStartScnStatus min_start_status)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -150,6 +160,7 @@ int ObKeepAliveLSHandler::on_success()
|
||||
durable_keep_alive_info_.replace(tmp_keep_alive_info_);
|
||||
stat_info_.stat_keepalive_info_ = durable_keep_alive_info_;
|
||||
|
||||
|
||||
ATOMIC_STORE(&is_busy_,false);
|
||||
|
||||
return ret;
|
||||
|
@ -154,6 +154,7 @@ public:
|
||||
void reset();
|
||||
|
||||
int try_submit_log(const share::SCN &min_start_scn, MinStartScnStatus status);
|
||||
void clear_keep_alive_smaller_scn_info();
|
||||
void print_stat_info();
|
||||
public:
|
||||
|
||||
|
@ -69,7 +69,6 @@ int ObTxDataTable::init(ObLS *ls, ObTxCtxTable *tx_ctx_table)
|
||||
memtable_mgr_ = static_cast<ObTxDataMemtableMgr *>(memtable_mgr_handle.get_memtable_mgr());
|
||||
tx_ctx_table_ = tx_ctx_table;
|
||||
tablet_id_ = LS_TX_DATA_TABLET;
|
||||
calc_upper_trans_is_disabled_ = false;
|
||||
latest_transfer_scn_.reset();
|
||||
|
||||
is_inited_ = true;
|
||||
@ -181,7 +180,6 @@ void ObTxDataTable::reset()
|
||||
tx_ctx_table_ = nullptr;
|
||||
calc_upper_trans_version_cache_.reset();
|
||||
memtables_cache_.reuse();
|
||||
calc_upper_trans_is_disabled_ = false;
|
||||
latest_transfer_scn_.reset();
|
||||
is_started_ = false;
|
||||
is_inited_ = false;
|
||||
@ -236,7 +234,6 @@ int ObTxDataTable::online()
|
||||
calc_upper_trans_version_cache_.reset();
|
||||
}
|
||||
latest_transfer_scn_.reset();
|
||||
ATOMIC_STORE(&calc_upper_trans_is_disabled_, false);
|
||||
is_started_ = true;
|
||||
}
|
||||
|
||||
@ -774,8 +771,6 @@ int ObTxDataTable::get_upper_trans_version_before_given_scn(const SCN sstable_en
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "The tx data table is not inited.", KR(ret));
|
||||
} else if (ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) {
|
||||
skip_calc = true;
|
||||
} else if (true == (skip_calc = skip_this_sstable_end_scn_(sstable_end_scn))) {
|
||||
// there is a start_scn of running transactions is smaller than the sstable_end_scn
|
||||
} else {
|
||||
@ -1234,7 +1229,6 @@ int ObTxDataTable::get_start_tx_scn(SCN &start_tx_scn)
|
||||
|
||||
void ObTxDataTable::disable_upper_trans_calculation()
|
||||
{
|
||||
ATOMIC_STORE(&calc_upper_trans_is_disabled_, true);
|
||||
{
|
||||
TCWLockGuard lock_guard(calc_upper_trans_version_cache_.lock_);
|
||||
calc_upper_trans_version_cache_.reset();
|
||||
@ -1252,7 +1246,6 @@ void ObTxDataTable::enable_upper_trans_calculation(const share::SCN latest_trans
|
||||
} else {
|
||||
latest_transfer_scn_ = latest_transfer_scn;
|
||||
}
|
||||
ATOMIC_STORE(&calc_upper_trans_is_disabled_, false);
|
||||
}
|
||||
|
||||
int ObTxDataTable::dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd)
|
||||
|
@ -108,7 +108,6 @@ public: // ObTxDataTable
|
||||
ObTxDataTable()
|
||||
: is_inited_(false),
|
||||
is_started_(false),
|
||||
calc_upper_trans_is_disabled_(false),
|
||||
latest_transfer_scn_(),
|
||||
ls_id_(),
|
||||
tablet_id_(0),
|
||||
@ -316,7 +315,6 @@ private:
|
||||
static const int64_t LS_TX_DATA_SCHEMA_COLUMN_CNT = 5;
|
||||
bool is_inited_;
|
||||
bool is_started_;
|
||||
bool calc_upper_trans_is_disabled_;
|
||||
share::SCN latest_transfer_scn_;
|
||||
share::ObLSID ls_id_;
|
||||
ObTabletID tablet_id_;
|
||||
|
@ -66,6 +66,7 @@ int ObTxTable::init(ObLS *ls)
|
||||
read_tx_data_table_cnt_ = 0;
|
||||
recycle_scn_cache_.reset();
|
||||
LOG_INFO("init tx table successfully", K(ret), K(ls->get_ls_id()));
|
||||
calc_upper_trans_is_disabled_ = false;
|
||||
is_inited_ = true;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
@ -182,6 +183,7 @@ int ObTxTable::online()
|
||||
recycle_scn_cache_.reset();
|
||||
(void)reset_ctx_min_start_scn_info_();
|
||||
ATOMIC_STORE(&state_, ObTxTable::ONLINE);
|
||||
ATOMIC_STORE(&calc_upper_trans_is_disabled_, false);
|
||||
LOG_INFO("tx table online succeed", K(ls_id_), KPC(this));
|
||||
}
|
||||
|
||||
@ -1000,6 +1002,12 @@ int ObTxTable::get_uncommitted_tx_min_start_scn(share::SCN &min_start_scn, share
|
||||
|
||||
void ObTxTable::update_min_start_scn_info(const SCN &max_decided_scn)
|
||||
{
|
||||
if (true == ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) {
|
||||
// quit updating if calculate upper trans versions disabled
|
||||
STORAGE_LOG(INFO, "skip update min start scn", K(max_decided_scn), KPC(this));
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t cur_ts = ObClockGenerator::getClock();
|
||||
SpinWLockGuard lock_guard(ctx_min_start_scn_info_.lock_);
|
||||
|
||||
@ -1037,23 +1045,38 @@ void ObTxTable::update_min_start_scn_info(const SCN &max_decided_scn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
STORAGE_LOG(INFO, "finish update min start scn", K(max_decided_scn), K(ctx_min_start_scn_info_));
|
||||
}
|
||||
|
||||
int ObTxTable::get_upper_trans_version_before_given_scn(const SCN sstable_end_scn, SCN &upper_trans_version)
|
||||
{
|
||||
return tx_data_table_.get_upper_trans_version_before_given_scn(sstable_end_scn, upper_trans_version);
|
||||
int ret = OB_SUCCESS;
|
||||
if (ATOMIC_LOAD(&calc_upper_trans_is_disabled_)) {
|
||||
// cannot calculate upper trans version right now
|
||||
if (REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL)) {
|
||||
STORAGE_LOG(INFO, "calc upper trans version is disabled", K(calc_upper_trans_is_disabled_), K(sstable_end_scn));
|
||||
}
|
||||
} else {
|
||||
ret = tx_data_table_.get_upper_trans_version_before_given_scn(sstable_end_scn, upper_trans_version);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTxTable::disable_upper_trans_calculation()
|
||||
{
|
||||
ATOMIC_STORE(&calc_upper_trans_is_disabled_, true);
|
||||
(void)tx_data_table_.disable_upper_trans_calculation();
|
||||
reset_ctx_min_start_scn_info_();
|
||||
FLOG_INFO("disable upper trans version calculation", KPC(this));
|
||||
}
|
||||
|
||||
void ObTxTable::enable_upper_trans_calculation(const share::SCN latest_transfer_scn)
|
||||
{
|
||||
reset_ctx_min_start_scn_info_();
|
||||
(void)tx_data_table_.enable_upper_trans_calculation(latest_transfer_scn);
|
||||
ATOMIC_STORE(&calc_upper_trans_is_disabled_, false);
|
||||
FLOG_INFO("enable upper trans version calculation", KPC(this));
|
||||
}
|
||||
|
||||
int ObTxTable::get_start_tx_scn(SCN &start_tx_scn)
|
||||
|
@ -102,6 +102,7 @@ public:
|
||||
public:
|
||||
ObTxTable()
|
||||
: is_inited_(false),
|
||||
calc_upper_trans_is_disabled_(false),
|
||||
epoch_(INVALID_READ_EPOCH),
|
||||
state_(OFFLINE),
|
||||
ls_id_(),
|
||||
@ -115,6 +116,7 @@ public:
|
||||
|
||||
ObTxTable(ObTxDataTable &tx_data_table)
|
||||
: is_inited_(false),
|
||||
calc_upper_trans_is_disabled_(false),
|
||||
epoch_(INVALID_READ_EPOCH),
|
||||
state_(OFFLINE),
|
||||
ls_id_(),
|
||||
@ -310,7 +312,9 @@ public:
|
||||
void enable_upper_trans_calculation(const share::SCN latest_transfer_scn);
|
||||
|
||||
TO_STRING_KV(KP(this),
|
||||
K_(ls_id),
|
||||
K_(is_inited),
|
||||
K_(calc_upper_trans_is_disabled),
|
||||
K_(epoch),
|
||||
"state", get_state_string(state_),
|
||||
KP_(ls),
|
||||
@ -368,6 +372,7 @@ private:
|
||||
static const int64_t LS_TX_CTX_SCHEMA_ROWKEY_CNT = 1;
|
||||
static const int64_t LS_TX_CTX_SCHEMA_COLUMN_CNT = 3;
|
||||
bool is_inited_;
|
||||
bool calc_upper_trans_is_disabled_;
|
||||
int64_t epoch_ CACHE_ALIGNED;
|
||||
TxTableState state_ CACHE_ALIGNED;
|
||||
share::ObLSID ls_id_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user