[replay] fix replay serial part log when txn recover scn after final serial log

This commit is contained in:
chinaxing 2024-01-17 07:43:09 +00:00 committed by ob-robot
parent 81611ca45a
commit 1e2b018bbf
13 changed files with 154 additions and 52 deletions

View File

@ -157,7 +157,8 @@ int ObIMvccCtx::register_row_replay_cb(
int ObIMvccCtx::register_table_lock_cb_(
ObLockMemtable *memtable,
ObMemCtxLockOpLinkNode *lock_op,
ObOBJLockCallback *&cb)
ObOBJLockCallback *&cb,
const share::SCN replay_scn)
{
int ret = OB_SUCCESS;
static ObFakeStoreRowKey tablelock_fake_rowkey("tbl", 3);
@ -171,6 +172,9 @@ int ObIMvccCtx::register_table_lock_cb_(
TRANS_LOG(WARN, "encode memtable key failed", K(ret));
} else {
cb->set(mt_key, lock_op);
if (replay_scn.is_valid()) {
cb->set_scn(replay_scn);
}
if (OB_FAIL(append_callback(cb))) {
TRANS_LOG(WARN, "append table lock callback failed", K(ret), K(*cb));
} else {
@ -216,10 +220,10 @@ int ObIMvccCtx::register_table_lock_replay_cb(
TRANS_LOG(WARN, "invalid argument", K(ret), K(memtable), K(lock_op));
} else if (OB_FAIL(register_table_lock_cb_(memtable,
lock_op,
cb))) {
cb,
scn))) {
TRANS_LOG(WARN, "register tablelock callback failed", K(ret), KPC(lock_op));
} else {
cb->set_scn(scn);
TRANS_LOG(DEBUG, "replay register table lock callback", K(*cb));
}
return ret;

View File

@ -225,7 +225,8 @@ private:
int register_table_lock_cb_(
ObLockMemtable *memtable,
ObMemCtxLockOpLinkNode *lock_op,
ObOBJLockCallback *&cb);
ObOBJLockCallback *&cb,
const share::SCN replay_scn = share::SCN::invalid_scn());
protected:
DISALLOW_COPY_AND_ASSIGN(ObIMvccCtx);
int alloc_type_;

View File

@ -100,8 +100,12 @@ SCN ObITransCallback::get_scn() const
int ObITransCallback::before_append_cb(const bool is_replay)
{
int ret = before_append(is_replay);
if (OB_SUCC(ret)) {
int ret = OB_SUCCESS;
if (is_replay && !scn_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "scn is invalid for replay", K(ret), KPC(this));
} else if (OB_FAIL(before_append(is_replay))) {
} else {
need_submit_log_ = !is_replay;
}
return ret;
@ -219,7 +223,9 @@ void ObTransCallbackMgr::reset()
write_epoch_start_tid_ = 0;
need_merge_ = false;
for_replay_ = false;
has_branch_replayed_into_first_list_ = false;
serial_final_scn_.set_max();
serial_final_seq_no_.reset();
serial_sync_scn_.set_min();
callback_main_list_append_count_ = 0;
callback_slave_list_append_count_ = 0;
@ -345,6 +351,39 @@ int ObTransCallbackMgr::append(ObITransCallback *node)
if (seq_no.support_branch()) {
// NEW since version 4.3, select by branch
int slot = seq_no.get_branch() % MAX_CALLBACK_LIST_COUNT;
if (slot > 0
&& for_replay_
&& is_serial_final_()
&& OB_UNLIKELY(node->get_scn() <= serial_final_scn_)) {
// _NOTE_
// a log whose scn is at or before serial final, but which is replayed after the txn recovered from a point after serial final,
// is replayed into the first callback-list so that scn stays in ascending order within every callback list
// for example:
// serial final log scn = 100
// recovery point scn = 200
// log replaying with scn = 80
//
// Checksum calculation:
// this log has already been accumulated, it will not be required in all callback-lists
if (parallel_replay_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "parallel replay an serial log", K(ret), KPC(this));
ob_abort();
}
if (OB_UNLIKELY(!has_branch_replayed_into_first_list_)) {
// sanity check: the serial_final_seq_no must be set
// which will be used in replay `rollback branch savepoint` log
if (OB_UNLIKELY(!serial_final_seq_no_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "serial_final_seq_no is invalid", K(ret), KPC(this));
ob_abort();
}
ATOMIC_STORE(&has_branch_replayed_into_first_list_, true);
TRANS_LOG(INFO, "replay log before serial final when reach serial final",
KPC(this), KPC(get_trans_ctx()), KPC(node));
}
slot = 0;
}
if (slot == 0) {
// no parallel and no branch requirement
ret = callback_list_.append_callback(node, for_replay_, parallel_replay_, is_serial_final_());
@ -420,6 +459,19 @@ int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no,
} else if (callback_lists_) {
ret = callback_lists_[slot - 1].remove_callbacks_for_rollback_to(to_seq_no, from_seq_no, replay_scn);
} else { /*callback_lists_ is empty, no need do rollback */ }
// _NOTE_
// for a branch-level savepoint whose `to_seq_no` is at or before the serial final log, the branch may have been
// replayed into the first callback-list when recovering from an scn after the serial final log (see ObTransCallbackMgr::append)
// hence, we also need to try the rollback on the first callback-list
if (OB_SUCC(ret)
&& for_replay_
&& slot > 0
&& OB_UNLIKELY(has_branch_replayed_into_first_list_)
&& to_seq_no.get_seq() <= serial_final_seq_no_.get_seq()) {
ret = callback_list_.remove_callbacks_for_rollback_to(to_seq_no, from_seq_no, replay_scn);
TRANS_LOG(INFO, "replay branch savepoint cross serial final",
KPC(this), KPC(get_trans_ctx()), K(replay_scn), K(to_seq_no), K(from_seq_no));
}
}
} else { // before 4.3
ret = callback_list_.remove_callbacks_for_rollback_to(to_seq_no, from_seq_no, replay_scn);
@ -2169,9 +2221,11 @@ bool ObTransCallbackMgr::pending_log_size_too_large(const transaction::ObTxSEQ &
}
}
void ObTransCallbackMgr::set_parallel_logging(const share::SCN serial_final_scn)
// Record the serial-final point of the transaction on this callback manager.
// @param serial_final_scn     scn of the last serial log; stored into serial_final_scn_
// @param serial_final_seq_no  seq_no associated with the serial final log; stored into
//                             serial_final_seq_no_, which rollback_to() compares against
//                             `to_seq_no` when replaying a branch savepoint
// Both members are written through their atomic setters.
void ObTransCallbackMgr::set_parallel_logging(const share::SCN serial_final_scn,
const transaction::ObTxSEQ serial_final_seq_no)
{
serial_final_scn_.atomic_set(serial_final_scn);
serial_final_seq_no_.atomic_store(serial_final_seq_no);
}
void ObTransCallbackMgr::update_serial_sync_scn_(const share::SCN scn)

View File

@ -197,7 +197,9 @@ public:
write_epoch_start_tid_(0),
need_merge_(false),
for_replay_(false),
has_branch_replayed_into_first_list_(false),
serial_final_scn_(share::SCN::max_scn()),
serial_final_seq_no_(),
serial_sync_scn_(share::SCN::min_scn()),
callback_main_list_append_count_(0),
callback_slave_list_append_count_(0),
@ -274,7 +276,8 @@ public:
int get_log_guard(const transaction::ObTxSEQ &write_seq,
ObCallbackListLogGuard &log_guard,
int &cb_list_idx);
void set_parallel_logging(const share::SCN serial_final_scn);
void set_parallel_logging(const share::SCN serial_final_scn,
const transaction::ObTxSEQ serial_final_seq_no);
void set_skip_checksum_calc();
bool skip_checksum_calc() const { return ATOMIC_LOAD(&skip_checksum_); }
void merge_multi_callback_lists();
@ -325,6 +328,7 @@ public:
transaction::ObPartTransCtx *get_trans_ctx() const;
TO_STRING_KV(KP(this),
K_(serial_final_scn),
K_(serial_final_seq_no),
K_(serial_sync_scn),
KP_(callback_lists),
K_(need_merge),
@ -387,8 +391,17 @@ private:
// On Leader, if no write after takeover, merge is also not required
bool need_merge_;
bool for_replay_;
// the last serial logg's scn
// used to mark that some branch callbacks were replayed into the first callback list;
// by default they are replayed into their own callback list, selected by
// hashing on branch id.
// this can happen when the txn recovers from a point after the serial final log:
// branch callbacks before (or equal to) the serial final scn are put
// into the first list to make handling easier (ensuring each callback list is
// `append only` after the serial final state).
bool has_branch_replayed_into_first_list_;
// the last serial log's scn
share::SCN serial_final_scn_;
transaction::ObTxSEQ serial_final_seq_no_;
// currently synced serial log's scn
// when serial_sync_scn_ == serial_final_scn_
// it means the serial logging or serial replay has been finished

View File

@ -100,11 +100,21 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback,
TRANS_LOG(WARN, "before_append_cb failed", K(ret), KPC(callback));
} else {
const bool repos_lc = !for_replay && (log_cursor_ == &head_);
ObITransCallback *append_pos = NULL;
if (!for_replay || parallel_replay || serial_final || !parallel_start_pos_) {
(void)get_tail()->append(callback);
append_pos = get_tail();
} else {
parallel_start_pos_->get_prev()->append(callback);
append_pos = parallel_start_pos_->get_prev();
}
// for replay, do sanity check: scn is incremental
if (for_replay
&& append_pos != &head_ // the head with scn max
&& append_pos->get_scn() > callback->get_scn()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "replay callback scn out of order", K(ret), KPC(callback), KPC(this));
ob_abort();
}
append_pos->append(callback);
// start parallel replay, remember the position
if (for_replay && parallel_replay && !serial_final && !parallel_start_pos_) {
ATOMIC_STORE(&parallel_start_pos_, get_tail());

View File

@ -417,8 +417,9 @@ public:
int remove_callback_for_uncommited_txn(const memtable::ObMemtableSet *memtable_set);
int rollback(const transaction::ObTxSEQ seq_no, const transaction::ObTxSEQ from_seq_no,
const share::SCN replay_scn);
void set_parallel_logging(const share::SCN serial_final_scn) {
trans_mgr_.set_parallel_logging(serial_final_scn);
// Forward the serial final scn and the serial final seq_no, unchanged,
// to trans_mgr_.set_parallel_logging().
void set_parallel_logging(const share::SCN serial_final_scn,
const transaction::ObTxSEQ serial_final_seq_no) {
trans_mgr_.set_parallel_logging(serial_final_scn, serial_final_seq_no);
}
void set_skip_checksum_calc() {
trans_mgr_.set_skip_checksum_calc();

View File

@ -1201,6 +1201,7 @@ void ObMemtableMutatorIterator::reset()
row_header_.reset();
row_.reset();
table_lock_.reset();
row_seq_no_.reset();
}
int ObMemtableMutatorIterator::deserialize(const char *buf, const int64_t data_len, int64_t &pos,
@ -1260,6 +1261,8 @@ int ObMemtableMutatorIterator::iterate_next_row(ObEncryptRowBuf &decrypt_buf,
encrypt_info, unused_need_extract_encrypt_meta, encrypt_meta,
unused_encrypt_stat_map, ObTransRowFlag::is_big_row(meta_.get_flags())))) {
TRANS_LOG(WARN, "deserialize mutator row fail", K(ret));
} else {
row_seq_no_ = row_.seq_no_;
}
break;
}
@ -1268,6 +1271,8 @@ int ObMemtableMutatorIterator::iterate_next_row(ObEncryptRowBuf &decrypt_buf,
if (OB_FAIL(
table_lock_.deserialize(buf_.get_data(), buf_.get_limit(), buf_.get_position()))) {
TRANS_LOG(WARN, "deserialize table lock fail", K(ret));
} else {
row_seq_no_ = table_lock_.seq_no_;
}
break;
}
@ -1291,6 +1296,5 @@ const ObMemtableMutatorRow &ObMemtableMutatorIterator::get_mutator_row() { retur
const ObMutatorTableLock &ObMemtableMutatorIterator::get_table_lock_row() { return table_lock_; }
}//namespace memtable
}//namespace oceanbase

View File

@ -335,8 +335,8 @@ public:
const ObMutatorRowHeader &get_row_head();
const ObMemtableMutatorRow &get_mutator_row();
const ObMutatorTableLock &get_table_lock_row();
TO_STRING_KV(K_(meta),K(buf_.get_position()),K(buf_.get_limit()));
// seq_no of the row most recently deserialized by iterate_next_row():
// taken from row_.seq_no_ for a mutator row or table_lock_.seq_no_ for a table lock;
// cleared by reset().
transaction::ObTxSEQ get_row_seq_no() const { return row_seq_no_; }
TO_STRING_KV(K_(meta), K_(row_seq_no), K(buf_.get_position()),K(buf_.get_limit()));
private:
private:
@ -345,7 +345,7 @@ private:
ObMutatorRowHeader row_header_;
ObMemtableMutatorRow row_;
ObMutatorTableLock table_lock_;
transaction::ObTxSEQ row_seq_no_;
DISALLOW_COPY_AND_ASSIGN(ObMemtableMutatorIterator);
};
}//namespace memtable

View File

@ -1906,7 +1906,7 @@ int ObPartTransCtx::serial_submit_redo_after_write_()
ret = submitter.serial_submit(should_switch);
if (should_switch && submitter.get_submitted_cnt() > 0) {
const share::SCN serial_final_scn = submitter.get_submitted_scn();
switch_to_parallel_logging_(serial_final_scn);
switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_);
TRANS_LOG(INFO, "**switch to parallel logging**",
K_(ls_id), K_(trans_id),
K(serial_final_scn),
@ -4760,7 +4760,8 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log,
const SCN &timestamp,
const int64_t &part_log_no,
const bool is_tx_log_queue,
const bool serial_final)
const bool serial_final,
const ObTxSEQ &max_seq_no)
{
int ret = OB_SUCCESS;
common::ObTimeGuard timeguard("replay_redo_in_ctx", 10 * 1000);
@ -4805,7 +4806,7 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log,
usleep(50_ms);
ob_abort();
} else if (!exec_info_.serial_final_scn_.is_valid()) {
switch_to_parallel_logging_(timestamp);
switch_to_parallel_logging_(timestamp, max_seq_no);
}
}
}
@ -9636,7 +9637,8 @@ inline bool ObPartTransCtx::is_support_parallel_replay_() const
return cluster_version_accurate_ && cluster_version_ >= CLUSTER_VERSION_4_3_0_0;
}
inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn)
inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn,
const ObTxSEQ max_seq_no)
{
// when start replaying serial final redo log or submitted serial final redo log
// switch the Tx's logging mode to parallel logging
@ -9651,13 +9653,19 @@ inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_
// if a rollback to savepoint targets a point before this one,
// replay of that rollback-savepoint-log must use a pre-barrier to
// wait for the serial replay parts to finish
exec_info_.serial_final_seq_no_ = exec_info_.max_submitted_seq_no_;
mt_ctx_.set_parallel_logging(serial_final_scn);
if (OB_UNLIKELY(!max_seq_no.is_valid())) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "max seq_no of serial final log is invalid",
K(serial_final_scn), K(max_seq_no), KPC(this));
print_trace_log_();
ob_abort();
}
exec_info_.serial_final_seq_no_ = max_seq_no;
mt_ctx_.set_parallel_logging(serial_final_scn, max_seq_no);
}
inline void ObPartTransCtx::recovery_parallel_logging_()
{
mt_ctx_.set_parallel_logging(exec_info_.serial_final_scn_);
mt_ctx_.set_parallel_logging(exec_info_.serial_final_scn_, exec_info_.serial_final_seq_no_);
if (exec_info_.max_applied_log_ts_ >= exec_info_.serial_final_scn_) {
// the serial final log has been synced or replayed
// notify callback_mgr serial part is finished

View File

@ -379,7 +379,8 @@ public:
const share::SCN &timestamp,
const int64_t &part_log_no,
const bool is_tx_log_queue,
const bool serial_final);
const bool serial_final,
const ObTxSEQ &max_seq_no);
int replay_rollback_to(const ObTxRollbackToLog &log,
const palf::LSN &offset,
const share::SCN &timestamp,
@ -513,7 +514,8 @@ private:
memtable::ObRedoLogSubmitHelper &helper,
const logservice::ObReplayBarrierType &barrier);
bool should_switch_to_parallel_logging_();
void switch_to_parallel_logging_(const share::SCN serial_final_scn);
void switch_to_parallel_logging_(const share::SCN serial_final_scn,
const ObTxSEQ max_seq_no);
bool has_replay_serial_final_() const;
void recovery_parallel_logging_();
int check_can_submit_redo_();

View File

@ -361,32 +361,28 @@ int ObTxReplayExecutor::replay_redo_()
int tmp_ret = OB_SUCCESS;
ObTxRedoLogTempRef temp_ref;
ObTxRedoLog redo_log(temp_ref);
const bool serial_final = log_block_.get_header().is_serial_final();
ObTxSEQ max_seq_no;
if (is_tx_log_replay_queue()) {
tx_part_log_no_ += 1; // redo is compound with tx log, mark part_log_no is required
}
if (OB_ISNULL(ls_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "[Replay Tx] ls should not be null", K(ret), K(ls_));
} else if (OB_FAIL(log_block_.deserialize_log_body(redo_log))) {
TRANS_LOG(WARN, "[Replay Tx] deserialize log body error", K(ret), K(redo_log), K(lsn_),
K(log_ts_ns_));
} else if (OB_FAIL(replay_redo_in_memtable_(redo_log))) {
} else if (OB_FAIL(replay_redo_in_memtable_(redo_log, serial_final, max_seq_no))) {
TRANS_LOG(WARN, "[Replay Tx] replay redo in memtable error", K(ret), K(lsn_), K(log_ts_ns_));
} else if (OB_FAIL(ctx_->replay_redo_in_ctx(redo_log,
lsn_,
log_ts_ns_,
tx_part_log_no_,
is_tx_log_replay_queue(),
log_block_.get_header().is_serial_final()))) {
serial_final,
max_seq_no))) {
TRANS_LOG(WARN, "[Replay Tx] replay redo in tx_ctx error", K(ret), K(lsn_), K(log_ts_ns_));
// } else if (first_created_ctx_ && redo_log.get_log_no() > 0) {
// // replay a commited tx in recovery process
// ctx_->force_no_need_replay_checksum();
// TRANS_LOG(
// WARN,
// "[Replay Tx] Don't replay from first redo log and Part_ctx is not existed in tx ctx table",
// K(first_created_ctx_), K(redo_log.get_log_no()));
// ctx_->supplement_undo_actions_if_exist();
}
if (OB_SUCC(ret) && OB_TMP_FAIL(mt_ctx_->remove_callbacks_for_fast_commit(replay_queue_, share::SCN::minus(log_ts_ns_, 1)))) {
TRANS_LOG(WARN, "[Replay Tx] remove callbacks for fast commit", K(ret), K(tmp_ret),
@ -563,7 +559,7 @@ int ObTxReplayExecutor::replay_clear_()
return ret;
}
int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo)
int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo, const bool serial_final, ObTxSEQ &max_seq_no)
{
int ret = OB_SUCCESS;
// ObMemtable *cur_mem = nullptr;
@ -579,6 +575,8 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo)
ObCLogEncryptInfo encrypt_info;
encrypt_info.init();
max_seq_no.reset();
if (OB_ISNULL(mmi_ptr_)) {
if (nullptr
== (mmi_ptr_ = static_cast<ObMemtableMutatorIterator *>(
@ -625,8 +623,18 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo)
K(row_head.tablet_id_), KP(ls_), K(log_ts_ns_), K(tx_part_log_no_),
KPC(ctx_));
}
} else {
// do nothing
} else if (OB_UNLIKELY(serial_final)) {
// the seq no within one log-entry is not guaranteed to be in order,
// so every row must be iterated to pick the max value
const ObTxSEQ seq_no = mmi_ptr_->get_row_seq_no();
if (OB_UNLIKELY(!seq_no.is_valid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "seq no is invalid in mutator row", K(seq_no), KPC(this));
ob_abort();
}
if (seq_no.get_seq() > max_seq_no.get_seq()) {
max_seq_no = seq_no;
}
}
}
}
@ -779,22 +787,18 @@ int ObTxReplayExecutor::replay_row_(storage::ObStoreCtx &store_ctx,
TRANS_LOG(WARN, "[Replay Tx] this is not a ObMemtable", K(ret), KP(mem_ptr), KPC(mem_ptr),
KP(mmi_ptr));
} else if (FALSE_IT(timeguard.click("get_memtable"))) {
// _NOTE_:
// set max_end_scn before replay_row to ensure the memtable will not be released
// before the current log has been replayed successfully
} else if (OB_FAIL(data_mem_ptr->set_max_end_scn(log_ts_ns_))) {
TRANS_LOG(WARN, "[Replay Tx] set memtable max end log ts failed", K(ret), KP(data_mem_ptr));
} else if (OB_FAIL(data_mem_ptr->replay_row(store_ctx, log_ts_ns_, mmi_ptr))) {
TRANS_LOG(WARN, "[Replay Tx] replay row error", K(ret));
} else if (OB_FAIL(data_mem_ptr->set_max_end_scn(log_ts_ns_))) { // for freeze log_ts , may be
TRANS_LOG(WARN, "[Replay Tx] set memtable max end log ts failed", K(ret), KP(data_mem_ptr));
} else if (OB_FAIL(data_mem_ptr->set_rec_scn(log_ts_ns_))) {
TRANS_LOG(WARN, "[Replay Tx] set rec_log_ts error", K(ret), KPC(data_mem_ptr));
}
timeguard.click("replay_finish");
if (OB_FAIL(ret) && ret != OB_NO_NEED_UPDATE) {
// We need rollback all callbacks of this log to avoid replay a row
// in a freeze memtable which has a smaller end ts than this log.
//
// The rollback operation must hold write_ref to make memtable stay in memory.
mt_ctx_->rollback_redo_callbacks(replay_queue_, log_ts_ns_);
}
return ret;
}

View File

@ -128,9 +128,9 @@ private:
int replay_multi_source_data_();
int replay_record_();
bool is_tx_log_replay_queue() const { return replay_queue_ == 0; }
int replay_redo_in_memtable_(ObTxRedoLog &redo);
int replay_redo_in_memtable_(ObTxRedoLog &redo, const bool serial_final, ObTxSEQ &max_seq_no);
virtual int replay_one_row_in_memtable_(memtable::ObMutatorRowHeader& row_head,
memtable::ObMemtableMutatorIterator *mmi_ptr);
memtable::ObMemtableMutatorIterator *mmi_ptr);
int prepare_memtable_replay_(storage::ObStorageTableGuard &w_guard,
memtable::ObIMemtable *&mem_ptr);
int replay_row_(storage::ObStoreCtx &store_ctx,

View File

@ -136,9 +136,10 @@ public:
if (t) {
share::SCN scn;
scn.convert_for_tx(1231231231);
callback_mgr_.set_parallel_logging(scn);
transaction::ObTxSEQ seq(1000, 0);
callback_mgr_.set_parallel_logging(scn, seq);
} else {
callback_mgr_.set_parallel_logging(share::SCN::max_scn());
callback_mgr_.set_parallel_logging(share::SCN::max_scn(), transaction::ObTxSEQ::INVL());
}
}
ObRedoLogSubmitHelper helper_;