fix and optimize rec scn

This commit is contained in:
Handora 2024-09-18 08:18:54 +00:00 committed by ob-robot
parent f00f696510
commit 69e9a4e0de
3 changed files with 69 additions and 53 deletions

View File

@@ -531,8 +531,11 @@ int ObPartTransCtx::trans_clear_()
// What's more, we need not care about the retain tx_ctx, because it has
// already met the durability requirement and is just used for multi-source
// data.
share::SCN rec_log_ts = get_rec_log_ts_() == share::SCN::max_scn() ?
ctx_tx_data_.get_end_log_ts() : get_rec_log_ts_();
if (is_ctx_table_merged_
&& OB_FAIL(ls_tx_ctx_mgr_->update_aggre_log_ts_wo_lock(get_rec_log_ts_()))) {
&& OB_FAIL(ls_tx_ctx_mgr_->update_aggre_log_ts_wo_lock(rec_log_ts))) {
TRANS_LOG(ERROR, "update aggre log ts wo lock failed", KR(ret), "context", *this);
} else {
ret = mt_ctx_.trans_clear();
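The first hunk changes what gets aggregated during trans_clear_: when get_rec_log_ts_() returns SCN::max_scn() (the ctx has nothing left that needs checkpointing), the transaction's end_log_ts is aggregated instead, so max_scn() never leaks into the LS-level aggregate. A minimal sketch of that selection, with a hypothetical helper name rather than the committed code:

// Hypothetical helper illustrating the fallback introduced above: an SCN of
// max_scn() means "nothing pending", so report the txn's end_log_ts to the
// aggregated rec_scn instead.
share::SCN choose_rec_scn_for_clear(const share::SCN &rec_scn,
                                    const share::SCN &end_log_ts)
{
  return (rec_scn == share::SCN::max_scn()) ? end_log_ts : rec_scn;
}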
@@ -1814,21 +1817,23 @@ int ObPartTransCtx::serialize_tx_ctx_to_buffer(ObTxLocalBuffer &buffer, int64_t
const SCN ObPartTransCtx::get_rec_log_ts() const
{
CtxLockGuard guard(lock_);
return get_rec_log_ts_();
}
const SCN ObPartTransCtx::get_rec_log_ts_() const
{
SCN log_ts = SCN::max_scn();
share::SCN log_ts = SCN::max_scn();
share::SCN rec_log_ts = rec_log_ts_.atomic_load();
share::SCN prev_rec_log_ts = prev_rec_log_ts_.atomic_load();
// Before the checkpoint of the tx ctx table succeeds, we should still use
// the prev_log_ts. And after it is successfully checkpointed, we can use the
// new rec_log_ts if it exists
if (prev_rec_log_ts_.is_valid()) {
log_ts = prev_rec_log_ts_;
} else if (rec_log_ts_.is_valid()) {
log_ts = rec_log_ts_;
if (prev_rec_log_ts.is_valid()) {
log_ts = prev_rec_log_ts;
} else if (rec_log_ts.is_valid()) {
log_ts = rec_log_ts;
}
TRANS_LOG(DEBUG, "part ctx get rec log ts", K(*this), K(log_ts));
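The rewritten getter is a load-once pattern: rec_log_ts_ and prev_rec_log_ts_ are each read exactly once via atomic_load into local snapshots, and the validity checks run against those snapshots, so a concurrent on_tx_ctx_table_flushed() (which invalidates prev_rec_log_ts_) cannot slip in between the is_valid() check and the read. A toy version of the same pattern, using plain integers instead of the real SCN class:

#include <atomic>
#include <cstdint>

static constexpr uint64_t INVALID_SCN = 0;
static constexpr uint64_t MAX_SCN = UINT64_MAX;

// Read each slot exactly once; every later decision uses the local copy, so a
// concurrent reset of prev_rec_scn cannot invalidate the value between the
// check and the use.
uint64_t get_rec_scn_snapshot(const std::atomic<uint64_t> &prev_rec_scn,
                              const std::atomic<uint64_t> &rec_scn)
{
  const uint64_t prev = prev_rec_scn.load(std::memory_order_acquire);
  const uint64_t cur = rec_scn.load(std::memory_order_acquire);
  if (prev != INVALID_SCN) {
    return prev;   // ctx table checkpoint not yet confirmed: keep the old bound
  } else if (cur != INVALID_SCN) {
    return cur;    // checkpoint confirmed: the fresh rec_scn is authoritative
  }
  return MAX_SCN;  // nothing to protect
}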
@@ -1841,7 +1846,7 @@ int ObPartTransCtx::on_tx_ctx_table_flushed()
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
// To mark that the checkpoint has succeeded, we reset the prev_rec_log_ts
prev_rec_log_ts_.reset();
prev_rec_log_ts_.atomic_store(share::SCN::invalid_scn());
return ret;
}
@@ -5023,10 +5028,6 @@ int ObPartTransCtx::check_replay_avaliable_(const palf::LSN &offset,
if (need_replay && !create_ctx_scn_.is_valid()) {
create_ctx_scn_ = timestamp;
}
if (need_replay) {
update_rec_log_ts_(true/*for_replay*/, timestamp);
}
}
return ret;
@@ -5084,6 +5085,8 @@ int ObPartTransCtx::push_replayed_log_ts(const SCN log_ts_ns,
exec_info_.next_log_entry_no_ = log_entry_no + 1;
}
update_rec_log_ts_(true/*for_replay*/, log_ts_ns);
if (OB_SUCC(ret)) {
if (big_segment_info_.segment_buf_.is_completed()
&& big_segment_info_.unsynced_segment_part_cbs_.count() > 0) {
@@ -5279,7 +5282,7 @@ void ObPartTransCtx::force_no_need_replay_checksum_(const bool parallel_replay,
if (ATOMIC_LOAD(&exec_info_.need_checksum_)) {
TRANS_LOG(INFO, "set skip calc checksum", K_(trans_id), K_(ls_id), KP(this), K(parallel_replay), K(log_ts));
if (parallel_replay) {
update_rec_log_ts_(true, log_ts);
update_rec_log_ts_(true/*for_replay*/, log_ts);
}
ATOMIC_STORE(&exec_info_.need_checksum_, false);
mt_ctx_.set_skip_checksum_calc();
@@ -5385,27 +5388,6 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log,
ret = correct_cluster_version_(redo_log.get_cluster_version());
}
// if we need to calc the checksum, we must not recycle redo log ts before they are replayed,
// otherwise the checksum scn in the TxCtx checkpoint will lag behind the recovery scn
// and after the restart the txn's checksum verification will be skipped
//
// example:
//
// Log sequence of the Txn is: 1 -> 2 -> 3 -> 4
// where 1, 4 are in queue 0 (aka tx-log-queue), 2 in queue 2 and 3 in queue 3
// because of parallel replaying, assume queue 0 has replayed 4 while queues 2 and 3 have not
// replayed 2, 3 yet; if at this moment a checkpoint is issued, the checksum
// calculated for queue 2 and queue 3 will miss the data of logs 2 and 3
// after the checkpoint, once 2, 3 are replayed, the system will recycle logs 1-4,
// and after a restart, recovery starts from the log queue after 4, so 2, 3 will not be replayed
// finally the checksum of queues 2, 3 does not include log sequences 2, 3
//
// the cons of this choice is that after a restart the log recycle position
// will be older, which causes more checkpoints of the TxCtx
//
if (OB_SUCC(ret) && !is_tx_log_queue && exec_info_.need_checksum_) {
update_rec_log_ts_(true, timestamp);
}
// if this is serial final redo log
// change the logging to parallel logging
if (OB_SUCC(ret) && serial_final) {
@@ -6878,6 +6860,37 @@ int ObPartTransCtx::check_with_tx_data(ObITxDataCheckFunctor &fn)
return ret;
}
int ObPartTransCtx::update_rec_log_ts_for_parallel_replay(const SCN &rec_scn)
{
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
// NB: If we need to calculate the checksum, we cannot allow the checkpoint to
// recycle the logs before their checksum has been computed. Otherwise, we may
// not be able to calculate the checksum for the concurrently replayed portion
// of the logs after a restart.
//
// Let's see an example:
//
// The log sequence of the Txn is: 1 -> 2 -> 3 -> 4
//
// 1 and 4 are in queue 0 (aka the tx-log-queue), 2 is in queue 2 and 3 is
// in queue 3 because of the parallel replay. Assume queue 0 has replayed 4,
// while queues 2 and 3 have not replayed 2 and 3 yet. If a checkpoint is
// issued at this moment, the checksum calculation for queues 2 and 3 will
// miss the portion covered by logs 2 and 3. And if we do not update the
// rec_scn, then after 2 and 3 have been replayed, the checkpoint will later
// recycle logs 1-4, and after a restart the checksum of 2 and 3 can never be
// calculated.
//
// The cost of this choice is that after a restart the log recycle position
// will be somewhat older, which causes more checkpoints of the tx ctx table.
//
if (exec_info_.need_checksum_) {
update_rec_log_ts_(true/*for_replay*/, rec_scn);
}
return ret;
}
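The 1 -> 2 -> 3 -> 4 scenario from the comment can be walked through with a small self-contained toy (plain integers standing in for SCNs, hypothetical names, not the committed code): without the rec_scn update the recycle point follows the tx-log-queue past the unreplayed parallel logs; with the decreasing update it stays pinned before them.

#include <algorithm>
#include <cstdint>
#include <iostream>

int main()
{
  // Txn logs 1..4: 1 and 4 replay in the tx-log-queue (already done up to 4),
  // 2 and 3 replay concurrently in other queues and are still pending.
  const uint64_t replayed_tx_log_queue = 4;
  const uint64_t pending_parallel[] = {2, 3};  // checksum still needs these

  // Without reporting a rec_scn for the parallel logs, the ctx contributes no
  // lower bound and the recycle point can advance up to what queue 0 replayed.
  const uint64_t recycle_without_update = replayed_tx_log_queue;

  // With the decreasing update, the rec_scn is the oldest pending parallel
  // log, so nothing at or beyond it may be recycled before it is checksummed.
  uint64_t rec_scn = UINT64_MAX;
  for (const uint64_t scn : pending_parallel) {
    rec_scn = std::min(rec_scn, scn);
  }
  const uint64_t recycle_with_update = rec_scn - 1;

  std::cout << "recycle point without update: " << recycle_without_update
            << " (logs 2 and 3 are gone after restart, checksum lost)\n"
            << "recycle point with update:    " << recycle_with_update
            << " (logs 2 and 3 survive until their checksum is computed)\n";
  return 0;
}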
int ObPartTransCtx::update_rec_log_ts_(bool for_replay, const SCN &rec_log_ts)
{
int ret = OB_SUCCESS;
@@ -6889,9 +6902,9 @@ int ObPartTransCtx::update_rec_log_ts_(bool for_replay, const SCN &rec_log_ts)
if (for_replay) {
// the follower may replay redo logs in parallel, so we must do a decreasing update (take the minimum)
if (!rec_log_ts_.is_valid()) {
rec_log_ts_ = rec_log_ts;
rec_log_ts_.atomic_store(rec_log_ts);
} else if (rec_log_ts_ > rec_log_ts){
rec_log_ts_ = rec_log_ts;
rec_log_ts_.atomic_store(rec_log_ts);
}
} else {
if (!rec_log_ts_.is_valid()) {
@@ -6907,7 +6920,7 @@ int ObPartTransCtx::update_rec_log_ts_(bool for_replay, const SCN &rec_log_ts)
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected null ptr", K(*this));
} else {
rec_log_ts_ = log_cb->get_log_ts();
rec_log_ts_.atomic_store(log_cb->get_log_ts());
}
} else {
// this branch may be reached if the log cbs are empty
@@ -6917,9 +6930,9 @@ int ObPartTransCtx::update_rec_log_ts_(bool for_replay, const SCN &rec_log_ts)
if (min_big_segment_rec_scn.is_valid() && !rec_log_ts_.is_valid()) {
rec_log_ts_ = min_big_segment_rec_scn;
rec_log_ts_.atomic_store(min_big_segment_rec_scn);
} else if (min_big_segment_rec_scn.is_valid() && rec_log_ts_.is_valid()) {
rec_log_ts_ = share::SCN::min(min_big_segment_rec_scn, rec_log_ts_);
rec_log_ts_.atomic_store(share::SCN::min(min_big_segment_rec_scn, rec_log_ts_));
}
return ret;
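The for_replay branch above is a decreasing-only ("dec") update: under parallel replay the log SCNs may arrive out of order, so rec_log_ts_ only ever moves toward an older SCN, and every write now goes through atomic_store so lock-free readers using atomic_load observe a well-defined value. A minimal sketch of the pattern with a toy atomic slot, assuming (as in the surrounding code) that writers are serialized by the ctx lock so a plain load/store pair is enough:

#include <atomic>
#include <cstdint>

static constexpr uint64_t INVALID_SCN = 0;

// Adopt the new SCN only if the slot is empty or the new SCN is older.
// Writers are assumed to be serialized externally (the ctx lock), so no
// compare-and-swap loop is needed; the atomics are for concurrent readers.
void update_rec_scn_for_replay(std::atomic<uint64_t> &rec_scn, const uint64_t new_scn)
{
  const uint64_t cur = rec_scn.load(std::memory_order_acquire);
  if (cur == INVALID_SCN || cur > new_scn) {
    rec_scn.store(new_scn, std::memory_order_release);
  }
}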
@@ -6936,18 +6949,17 @@ int ObPartTransCtx::refresh_rec_log_ts_()
if (!prev_rec_log_ts_.is_valid()) {
// We should remember the rec_log_ts before the tx ctx table is successfully
// checkpointed
prev_rec_log_ts_ = rec_log_ts_;
prev_rec_log_ts_.atomic_store(rec_log_ts_);
if (is_follower_()) {
// Case 1: As follower, the replay is indeed in order, while we cannot
// simply reset it because the replay is not atomic, and we may be in the
// middle stage where the replay is currently on-going. So the necessary
// state before the log ts that is being replayed may not be contained, and
// we need to replay from the on-going log ts.
if (exec_info_.max_applied_log_ts_ != exec_info_.max_applying_log_ts_) {
rec_log_ts_ = exec_info_.max_applying_log_ts_;
} else if (busy_cbs_.is_empty()) {
rec_log_ts_.reset();
// Case 1: As follower, the replay may involve both serial and concurrent
// replay. Regardless of how the replay occurs, we update the rec_scn only
// after the replay is completed. Therefore, as long as we ensure that the
// txn state covered by the rec_scn is complete before updating the rec_scn
// (indicated by ObTxReplayExecutor::finish_replay_), it is safe to clear
// the rec_scn during refresh_rec_scn.
if (busy_cbs_.is_empty()) {
rec_log_ts_.atomic_store(share::SCN::invalid_scn());
} else {
// Case 1.1: As follower, there may also exist logs which are proposed
// but not yet committed because of the current leader switch mechanism
@@ -6959,7 +6971,7 @@ int ObPartTransCtx::refresh_rec_log_ts_()
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected null ptr", K(*this));
} else {
rec_log_ts_ = log_cb->get_log_ts();
rec_log_ts_.atomic_store(log_cb->get_log_ts());
}
}
} else {
@@ -6968,19 +6980,18 @@ int ObPartTransCtx::refresh_rec_log_ts_()
// rec_log_ts if it exists or reset it if not, because all logs of the txn with
// log ts in front of the FCL must be contained in the checkpoint.
if (busy_cbs_.is_empty()) {
rec_log_ts_.reset();
rec_log_ts_.atomic_store(share::SCN::invalid_scn());
} else {
const ObTxLogCb *log_cb = busy_cbs_.get_first();
if (OB_ISNULL(log_cb)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected null ptr", K(*this));
} else {
rec_log_ts_ = log_cb->get_log_ts();
rec_log_ts_.atomic_store(log_cb->get_log_ts());
}
}
}
} else {
// TODO(handora.qc): change to ERROR or enable the exception
TRANS_LOG(WARN, "we should not allow concurrent merge of tx ctx table", K(*this));
}
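Taken together with on_tx_ctx_table_flushed() above, the lifecycle is: refresh_rec_log_ts_() remembers the current rec_scn in prev_rec_log_ts_ and re-seeds rec_log_ts_ (from the busy log callbacks, or clears it); the tx ctx table flush then persists everything up to that remembered point; on success, on_tx_ctx_table_flushed() invalidates prev_rec_log_ts_, after which get_rec_log_ts_() answers from the new rec_log_ts_. A compressed toy of that flow (plain integers, hypothetical names, not the committed code):

#include <atomic>
#include <cstdint>

static constexpr uint64_t INVALID_SCN = 0;
static constexpr uint64_t MAX_SCN = UINT64_MAX;

struct ToyTxCtx {
  std::atomic<uint64_t> rec_scn{INVALID_SCN};
  std::atomic<uint64_t> prev_rec_scn{INVALID_SCN};

  // Step 1: before the checkpoint, remember the current rec_scn; the real
  // code then re-seeds rec_scn from in-flight log callbacks, here it is
  // simply cleared for brevity.
  void refresh_rec_scn() {
    if (prev_rec_scn.load() == INVALID_SCN) {
      prev_rec_scn.store(rec_scn.load());
      rec_scn.store(INVALID_SCN);
    }
  }

  // Until the flush succeeds, readers keep answering with the remembered
  // value so no log it protects can be recycled.
  uint64_t get_rec_scn() const {
    const uint64_t prev = prev_rec_scn.load();
    const uint64_t cur = rec_scn.load();
    return prev != INVALID_SCN ? prev : (cur != INVALID_SCN ? cur : MAX_SCN);
  }

  // Step 2: the tx ctx table flush succeeded; drop the remembered value so
  // the fresh rec_scn becomes authoritative.
  void on_flushed() { prev_rec_scn.store(INVALID_SCN); }
};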

View File

@@ -252,6 +252,7 @@ public:
int check_with_tx_data(ObITxDataCheckFunctor &fn);
const share::SCN get_rec_log_ts() const;
int on_tx_ctx_table_flushed();
int update_rec_log_ts_for_parallel_replay(const SCN &rec_log_ts);
int64_t get_applying_log_ts() const;
int64_t get_pending_log_size() { return mt_ctx_.get_pending_log_size(); }

View File

@@ -362,6 +362,10 @@ void ObTxReplayExecutor::finish_replay_(const int retcode)
if (OB_SUCCESS == retcode) {
ctx_->push_replayed_log_ts(log_ts_ns_, lsn_, replaying_log_entry_no_);
}
} else {
if (OB_SUCCESS == retcode) {
ctx_->update_rec_log_ts_for_parallel_replay(log_ts_ns_);
}
}
if (OB_SUCCESS != retcode) {
ctx_->print_trace_log();