[FIX] Using start_log_ts in keepalive log to check upper_trans_version calc

This commit is contained in:
ZenoWang
2022-11-17 08:40:28 +00:00
committed by wangzelin.wzl
parent f9b661406a
commit 48533588be
13 changed files with 186 additions and 87 deletions

View File

@ -607,6 +607,7 @@ public:
// ObDataCheckpoint interface:
DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int);
DELEGATE_WITH_RET(keep_alive_ls_handler_, get_min_start_scn, void);
// update tablet table store here do not using Macro because need lock ls and tablet
// update table store for tablet

View File

@ -453,6 +453,23 @@ int ObLSTxService::flush(int64_t rec_log_ts)
return ret;
}
int ObLSTxService::flush_ls_inner_tablet(const ObTabletID &tablet_id)
{
int ret = OB_SUCCESS;
if (!tablet_id.is_ls_inner_tablet()) {
TRANS_LOG(INFO, "not a ls inner tablet", KR(ret), K(tablet_id));
} else {
for (int i = 1; i < ObCommonCheckpointType::MAX_BASE_TYPE; i++) {
if (OB_NOT_NULL(common_checkpoints_[i]) && common_checkpoints_[i]->get_tablet_id() == tablet_id &&
OB_FAIL(common_checkpoints_[i]->flush(INT64_MAX, true))) {
TRANS_LOG(WARN, "obCommonCheckpoint flush failed", KR(ret), KP(common_checkpoints_[i]));
break;
}
}
}
return ret;
}
int ObLSTxService::get_common_checkpoint_info(
ObIArray<ObCommonCheckpointVTInfo> &common_checkpoint_array)
{

View File

@ -136,6 +136,7 @@ public:
int64_t get_rec_log_ts();
int flush(int64_t rec_log_ts);
int flush_ls_inner_tablet(const ObTabletID &tablet_id);
int get_common_checkpoint_info(
ObIArray<checkpoint::ObCommonCheckpointVTInfo> &common_checkpoint_array);

View File

@ -144,7 +144,7 @@ int ObKeepAliveLSHandler::on_success()
SpinWLockGuard guard(lock_);
durable_keep_alive_info_ = tmp_keep_alive_info_;
durable_keep_alive_info_.replace(tmp_keep_alive_info_);
stat_info_.stat_keepalive_info_ = durable_keep_alive_info_;
ATOMIC_STORE(&is_busy_,false);
@ -178,10 +178,11 @@ int ObKeepAliveLSHandler::replay(const void *buffer,
TRANS_LOG(WARN, "[Keep Alive] deserialize log body error", K(ret), K(nbytes), K(pos));
} else {
SpinWLockGuard guard(lock_);
durable_keep_alive_info_.log_ts_ = ts_ns;
durable_keep_alive_info_.lsn_ = lsn;
durable_keep_alive_info_.min_start_scn_ = log_body.get_min_start_scn();
durable_keep_alive_info_.min_start_status_ = log_body.get_min_start_status();
tmp_keep_alive_info_.log_ts_ = ts_ns;
tmp_keep_alive_info_.lsn_ = lsn;
tmp_keep_alive_info_.min_start_scn_ = log_body.get_min_start_scn();
tmp_keep_alive_info_.min_start_status_ = log_body.get_min_start_status();
durable_keep_alive_info_.replace(tmp_keep_alive_info_);
stat_info_.stat_keepalive_info_ = durable_keep_alive_info_;
}

View File

@ -80,6 +80,18 @@ struct KeepAliveLsInfo
min_start_status_ = MinStartScnStatus::UNKOWN;
}
void replace(KeepAliveLsInfo info)
{
log_ts_ = info.log_ts_;
lsn_ = info.lsn_;
if (info.min_start_status_ == MinStartScnStatus::NO_CTX
|| info.min_start_status_ == MinStartScnStatus::HAS_CTX) {
min_start_scn_ = info.min_start_scn_;
min_start_status_ = info.min_start_status_;
}
}
TO_STRING_KV(K(log_ts_), K(lsn_), K(min_start_scn_), K(min_start_status_));
};

View File

@ -1007,8 +1007,13 @@ int ObLSTxCtxMgr::check_scheduler_status(int64_t &min_start_scn, MinStartScnStat
if (OB_FAIL(ls_tx_ctx_map_.for_each(functor))) {
TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
} else {
min_start_scn = functor.get_min_start_scn();
status = functor.get_min_start_status();
if (0 == ls_tx_ctx_map_.count()) {
min_start_scn = OB_INVALID_TIMESTAMP;
status = MinStartScnStatus::NO_CTX;
} else {
min_start_scn = functor.get_min_start_scn();
status = functor.get_min_start_status();
}
}
return ret;

View File

@ -166,7 +166,7 @@ int ObTxLoopWorker::scan_all_ls_(bool can_tx_gc, bool can_gc_retain_ctx)
do_tx_gc_(cur_ls_ptr, min_start_scn, status);
}
if(MinStartScnStatus::UNKOWN != status) {
if(MinStartScnStatus::UNKOWN == status) {
// do nothing
} else if (OB_TMP_FAIL(cur_ls_ptr->get_log_handler()->get_role(role, proposal_id))) {
TRANS_LOG(WARN, "get role failed", K(tmp_ret), K(cur_ls_ptr->get_ls_id()));

View File

@ -343,12 +343,6 @@ int ObTxCtxTable::check_with_tx_data(const transaction::ObTransID tx_id, ObITxDa
return ret;
}
int ObTxCtxTable::get_min_start_log_ts(int64_t &min_start_log_ts)
{
int ret = get_ls_tx_ctx_mgr()->get_min_start_log_ts(min_start_log_ts);
return ret;
}
int ObTxCtxTable::dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd)
{
int ret = OB_SUCCESS;

View File

@ -113,8 +113,6 @@ public:
int check_with_tx_data(const transaction::ObTransID tx_id, ObITxDataCheckFunctor &fn);
int get_min_start_log_ts(int64_t &min_start_log_ts);
int dump_single_tx_data_2_text(const int64_t tx_id_int, FILE *fd);
public:

View File

@ -176,13 +176,13 @@ void ObTxDataTable::stop()
void ObTxDataTable::reset()
{
min_start_log_ts_in_ctx_ = 0;
last_update_min_start_log_ts_ = 0;
tablet_id_ = 0;
ls_ = nullptr;
ls_tablet_svr_ = nullptr;
memtable_mgr_ = nullptr;
tx_ctx_table_ = nullptr;
calc_upper_info_.reset();
calc_upper_trans_version_cache_.reset();
memtables_cache_.reuse();
slice_allocator_.purge_extra_cached_block(0);
is_started_ = false;
@ -205,8 +205,7 @@ int ObTxDataTable::offline()
} else if (OB_FAIL(clean_memtables_cache_())) {
STORAGE_LOG(WARN, "clean memtables cache failed", KR(ret), KPC(this));
} else {
min_start_log_ts_in_ctx_ = 0;
last_update_min_start_log_ts_ = 0;
calc_upper_info_.reset();
calc_upper_trans_version_cache_.reset();
}
return ret;
@ -854,6 +853,7 @@ int ObTxDataTable::get_upper_trans_version_before_given_log_ts(const int64_t sst
K(upper_trans_version));
}
}
return ret;
}
@ -978,52 +978,108 @@ int ObTxDataTable::DEBUG_calc_with_row_iter_(ObStoreRowIterator *row_iter,
return ret;
}
// This function is implemented for two reasons : accuracy and performance of upper_trans_version
// calculation.
//
// 1. Accuracy :
// If there are some running transactions with start_log_ts which is less than or equal to
// sstable_end_log_ts, we skip this upper_trans_version calculation because it is currently
// undetermined.
//
// 2. Performance :
// If there are some commited transactions with start_log_ts which is less than or equal to
// sstable_end_log_ts stil in tx data memtable, we skip this upper_trans_version calculation because
// calculating upper_trans_version in tx data memtable is very slow.
bool ObTxDataTable::skip_this_sstable_end_log_ts_(int64_t sstable_end_log_ts)
bool ObTxDataTable::skip_this_sstable_end_log_ts_(const int64_t sstable_end_log_ts)
{
int ret = OB_SUCCESS;
bool need_skip = false;
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
int64_t tmp_update_ts = ATOMIC_LOAD(&last_update_min_start_log_ts_);
int64_t min_start_ts_in_tx_data_memtable = INT64_MAX;
int64_t max_decided_log_ts = 0;
// make sure the max decided log ts is greater than sstable_end_log_ts
if (OB_FAIL(ls_->get_max_decided_log_ts_ns(max_decided_log_ts))) {
STORAGE_LOG(WARN, "get max decided log ts failed", KR(ret), "ls_id", get_ls_id().id());
} else if (max_decided_log_ts < sstable_end_log_ts) {
need_skip = true;
STORAGE_LOG(INFO, "skip calc upper trans version once", K(max_decided_log_ts), K(sstable_end_log_ts));
STORAGE_LOG(WARN, "get max decided log ts failed", KR(ret), "ls_id", get_ls_id().id());
}
// If the min_start_log_ts_in_ctx has not been updated for more than 30 seconds,
// check if the min_start_log_ts_in_ctx is larger than sstable_end_log_ts
if (need_skip) {
} else if (cur_ts - tmp_update_ts > 30_s
&& tmp_update_ts == ATOMIC_CAS(&last_update_min_start_log_ts_, tmp_update_ts, cur_ts)) {
// update last_update_min_start_log_ts
if (OB_FAIL(tx_ctx_table_->get_min_start_log_ts(min_start_log_ts_in_ctx_))) {
STORAGE_LOG(WARN, "get min start log ts from tx ctx table failed.", KR(ret));
} else if (OB_FAIL(check_min_start_in_ctx_(sstable_end_log_ts, max_decided_log_ts, need_skip))) {
need_skip = true;
STORAGE_LOG(WARN, "check min start in ctx failed", KR(ret), KP(this), K(sstable_end_log_ts));
}
if (need_skip) {
} else if (OB_FAIL(check_min_start_in_tx_data_(sstable_end_log_ts, min_start_ts_in_tx_data_memtable, need_skip))) {
need_skip = true;
STORAGE_LOG(WARN, "check min start in tx data failed", KR(ret), KP(this), K(sstable_end_log_ts));
}
if (!need_skip) {
STORAGE_LOG(INFO,
"do calculate upper trans version.",
K(need_skip),
K(sstable_end_log_ts),
K(max_decided_log_ts),
K(calc_upper_info_),
K(min_start_ts_in_tx_data_memtable));
}
return need_skip;
}
int ObTxDataTable::check_min_start_in_ctx_(const int64_t sstable_end_log_ts,
const int64_t max_decided_log_ts,
bool &need_skip)
{
int ret = OB_SUCCESS;
bool need_update_info = false;
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
{
SpinRLockGuard lock_guard(calc_upper_info_.lock_);
if (calc_upper_info_.min_start_scn_in_ctx_ <= sstable_end_log_ts ||
calc_upper_info_.keep_alive_scn_ >= max_decided_log_ts) {
need_skip = true;
}
if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_log_ts > calc_upper_info_.keep_alive_scn_) {
need_update_info = true;
}
}
if (need_skip) {
} else if (OB_FAIL(ret)) {
need_skip = true;
} else if (min_start_log_ts_in_ctx_ <= sstable_end_log_ts) {
// skip this sstable end log ts calculation
need_skip = true;
} else if (OB_FAIL(update_memtables_cache())) {
if (need_update_info) {
update_calc_upper_info_(max_decided_log_ts);
}
return ret;
}
void ObTxDataTable::update_calc_upper_info_(const int64_t max_decided_log_ts)
{
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
SpinWLockGuard lock_guard(calc_upper_info_.lock_);
// recheck update condition and do update calc_upper_info
if (cur_ts - calc_upper_info_.update_ts_ > 30_s && max_decided_log_ts > calc_upper_info_.keep_alive_scn_) {
int64_t min_start_scn = 0;
int64_t keep_alive_scn = 0;
MinStartScnStatus status;
ls_->get_min_start_scn(min_start_scn, keep_alive_scn, status);
switch (status) {
case MinStartScnStatus::UNKOWN:
// do nothing
break;
case MinStartScnStatus::NO_CTX:
// use the last keep_alive_scn as min_start_scn
calc_upper_info_.min_start_scn_in_ctx_ = calc_upper_info_.keep_alive_scn_;
calc_upper_info_.keep_alive_scn_ = keep_alive_scn;
calc_upper_info_.update_ts_ = cur_ts;
break;
case MinStartScnStatus::HAS_CTX:
calc_upper_info_.min_start_scn_in_ctx_ = min_start_scn;
calc_upper_info_.keep_alive_scn_ = keep_alive_scn;
calc_upper_info_.update_ts_ = cur_ts;
break;
default:
break;
}
}
}
int ObTxDataTable::check_min_start_in_tx_data_(const int64_t sstable_end_log_ts,
int64_t &min_start_ts_in_tx_data_memtable,
bool &need_skip)
{
int ret = OB_SUCCESS;
if (OB_FAIL(update_memtables_cache())) {
STORAGE_LOG(WARN, "update memtables fail.", KR(ret));
// something wrong happend, skip calculation
need_skip = true;
@ -1035,15 +1091,17 @@ bool ObTxDataTable::skip_this_sstable_end_log_ts_(int64_t sstable_end_log_ts)
tx_data_memtable = nullptr;
if (OB_FAIL(memtable_handles.at(i).get_tx_data_memtable(tx_data_memtable))) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "get tx data memtable from table handles fail.", KR(ret), KP(this),
K(tablet_id_), K(memtable_handles.at(i)));
STORAGE_LOG(ERROR,
"get tx data memtable from table handles fail.",
KR(ret),
KP(this),
K(tablet_id_),
K(memtable_handles.at(i)));
} else if (OB_ISNULL(tx_data_memtable)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "tx data memtable is nullptr.", KR(ret), KP(this), K(tablet_id_),
K(memtable_handles.at(i)));
} else if (FALSE_IT(min_start_ts_in_tx_data_memtable
= std::min(min_start_ts_in_tx_data_memtable,
tx_data_memtable->get_min_start_log_ts()))) {
STORAGE_LOG(ERROR, "tx data memtable is nullptr.", KR(ret), KP(this), K(tablet_id_), K(memtable_handles.at(i)));
} else if (FALSE_IT(min_start_ts_in_tx_data_memtable =
std::min(min_start_ts_in_tx_data_memtable, tx_data_memtable->get_min_start_log_ts()))) {
} else if (sstable_end_log_ts >= min_start_ts_in_tx_data_memtable) {
// there is a min_start_log_ts in tx_data_memtable less than sstable_end_log_ts, skip this
// calculation
@ -1051,17 +1109,9 @@ bool ObTxDataTable::skip_this_sstable_end_log_ts_(int64_t sstable_end_log_ts)
break;
}
}
if (!need_skip) {
STORAGE_LOG(INFO, "check skip upper trans version calculation finish.", K(need_skip),
K(sstable_end_log_ts), K(min_start_log_ts_in_ctx_),
K(min_start_ts_in_tx_data_memtable));
}
}
return need_skip;
return ret;
}
int ObTxDataTable::update_cache_if_needed_(bool &skip_calc)
@ -1072,19 +1122,21 @@ int ObTxDataTable::update_cache_if_needed_(bool &skip_calc)
if (OB_FAIL(ls_tablet_svr_->get_tablet(LS_TX_DATA_TABLET, tablet_handle))) {
STORAGE_LOG(WARN, "get tablet from ls tablet service failed.", KR(ret));
} else {
ObITable *table
= tablet_handle.get_obj()->get_table_store().get_minor_sstables().get_boundary_table(
true /*is_last*/);
ObITable *table =
tablet_handle.get_obj()->get_table_store().get_minor_sstables().get_boundary_table(true /*is_last*/);
if (nullptr == table) {
skip_calc = true;
} else if (!calc_upper_trans_version_cache_.is_inited_
|| calc_upper_trans_version_cache_.cache_version_ < table->get_end_log_ts()) {
} else if (!calc_upper_trans_version_cache_.is_inited_ ||
calc_upper_trans_version_cache_.cache_version_ < table->get_end_log_ts()) {
ret = update_calc_upper_trans_version_cache_(table);
} else if (calc_upper_trans_version_cache_.cache_version_ > table->get_end_log_ts()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "the end log ts of the latest sstable is unexpected smaller", KR(ret),
KPC(tablet_handle.get_obj()), KPC(table));
STORAGE_LOG(ERROR,
"the end log ts of the latest sstable is unexpected smaller",
KR(ret),
KPC(tablet_handle.get_obj()),
KPC(table));
}
}
@ -1119,8 +1171,7 @@ int ObTxDataTable::update_calc_upper_trans_version_cache_(ObITable *table)
return ret;
}
int ObTxDataTable::calc_upper_trans_version_(const int64_t sstable_end_log_ts,
int64_t &upper_trans_version)
int ObTxDataTable::calc_upper_trans_version_(const int64_t sstable_end_log_ts, int64_t &upper_trans_version)
{
int ret = OB_SUCCESS;
@ -1210,7 +1261,7 @@ int ObTxDataTable::get_start_tx_scn(int64_t &start_tx_scn)
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "tablet is nullptr.", KR(ret), KP(this), K(tablet_id_));
} else if (OB_ISNULL(oldest_minor_sstable =
(ObSSTable *)tablet->get_table_store().get_minor_sstables().get_boundary_table(false /*is_last*/))) {
(ObSSTable *)tablet->get_table_store().get_minor_sstables().get_boundary_table(false /*is_last*/))) {
start_tx_scn = INT64_MAX;
STORAGE_LOG(INFO, "this logstream do not have tx data sstable", K(start_tx_scn), K(get_ls_id()), KPC(tablet));
} else if (FALSE_IT(start_tx_scn = oldest_minor_sstable->get_filled_tx_scn())) {

View File

@ -76,6 +76,25 @@ public:
}
};
struct CalcUpperInfo
{
CalcUpperInfo() : min_start_scn_in_ctx_(0), keep_alive_scn_(0), update_ts_(0) {}
void reset()
{
min_start_scn_in_ctx_ = 0;
keep_alive_scn_ = 0;
update_ts_ = 0;
}
int64_t min_start_scn_in_ctx_;
int64_t keep_alive_scn_;
int64_t update_ts_;
common::SpinRWLock lock_;
TO_STRING_KV(K(min_start_scn_in_ctx_), K(keep_alive_scn_), K(update_ts_));
};
using SliceAllocator = ObSliceAlloc;
static const int64_t TX_DATA_MAX_CONCURRENCY = 32;
@ -104,8 +123,6 @@ public: // ObTxDataTable
ObTxDataTable()
: is_inited_(false),
is_started_(false),
min_start_log_ts_in_ctx_(0),
last_update_min_start_log_ts_(0),
tablet_id_(0),
mem_attr_(),
slice_allocator_(),
@ -115,6 +132,8 @@ public: // ObTxDataTable
memtable_mgr_(nullptr),
tx_ctx_table_(nullptr),
read_schema_(),
calc_upper_info_(),
calc_upper_trans_version_cache_(),
memtables_cache_() {}
~ObTxDataTable() {}
@ -215,9 +234,8 @@ public: // ObTxDataTable
TO_STRING_KV(KP(this),
K_(is_inited),
K_(is_started),
K_(min_start_log_ts_in_ctx),
K_(last_update_min_start_log_ts),
K_(tablet_id),
K_(calc_upper_info),
KP_(ls),
KP_(ls_tablet_svr),
KP_(memtable_mgr),
@ -285,7 +303,11 @@ private:
int DEBUG_calc_with_row_iter_(ObStoreRowIterator *row_iter,
const int64_t sstable_end_log_ts,
int64_t &tmp_upper_trans_version);
bool skip_this_sstable_end_log_ts_(int64_t sstable_end_log_ts);
bool skip_this_sstable_end_log_ts_(const int64_t sstable_end_log_ts);
int check_min_start_in_ctx_(const int64_t sstable_end_log_ts, const int64_t max_decided_log_ts, bool &need_skip);
int check_min_start_in_tx_data_(const int64_t sstable_end_log_ts,
int64_t &min_start_ts_in_tx_data_memtable,
bool &need_skip);
void print_alloc_size_for_test_();
@ -293,7 +315,7 @@ private:
void free_undo_status_list_(ObUndoStatusNode *node_ptr);
void clean_sstable_cache_task_(int64_t cache_keeped_time);
void update_calc_upper_info_(const int64_t max_decided_log_ts);
void TEST_print_alloc_size_()
{
@ -317,8 +339,6 @@ private:
static const int64_t LS_TX_DATA_SCHEMA_COLUMN_CNT = 5;
bool is_inited_;
bool is_started_;
int64_t min_start_log_ts_in_ctx_;
int64_t last_update_min_start_log_ts_;
ObTabletID tablet_id_;
ObMemAttr mem_attr_;
// Allocator to allocate ObTxData and ObUndoStatus
@ -331,6 +351,7 @@ private:
ObTxDataMemtableMgr *memtable_mgr_;
ObTxCtxTable *tx_ctx_table_;
TxDataReadSchema read_schema_;
CalcUpperInfo calc_upper_info_;
CalcUpperTransVersionCache calc_upper_trans_version_cache_;
MemtableHandlesCache memtables_cache_;
}; // tx_table

View File

@ -300,7 +300,6 @@ private:
const int64_t read_epoch,
const bool need_log_error,
int &ret);
private:
static const int64_t LS_TX_CTX_SCHEMA_VERSION = 0;
static const int64_t LS_TX_CTX_SCHEMA_ROWKEY_CNT = 1;

View File

@ -513,7 +513,6 @@ int ObTxDataMemtableScanIterator::periodical_get_next_commit_version_(ObCommitVe
node.start_log_ts_ = tx_data->start_log_ts_;
// use cur_max_commit_version_ to keep the commit versions monotonically increasing
node.commit_version_ = cur_max_commit_version_;
// STORAGE_LOG(INFO, "GENGLI ", K(iter_cnt), K(PERIODICAL_SELECT_INTERVAL_NS), K(node));
tx_data = nullptr;
} else if (nullptr == cur_node_) {
ret = OB_ITER_END;