diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp index 7246ce252..073e4f1d6 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp @@ -157,7 +157,8 @@ int ObIMvccCtx::register_row_replay_cb( int ObIMvccCtx::register_table_lock_cb_( ObLockMemtable *memtable, ObMemCtxLockOpLinkNode *lock_op, - ObOBJLockCallback *&cb) + ObOBJLockCallback *&cb, + const share::SCN replay_scn) { int ret = OB_SUCCESS; static ObFakeStoreRowKey tablelock_fake_rowkey("tbl", 3); @@ -171,6 +172,9 @@ int ObIMvccCtx::register_table_lock_cb_( TRANS_LOG(WARN, "encode memtable key failed", K(ret)); } else { cb->set(mt_key, lock_op); + if (replay_scn.is_valid()) { + cb->set_scn(replay_scn); + } if (OB_FAIL(append_callback(cb))) { TRANS_LOG(WARN, "append table lock callback failed", K(ret), K(*cb)); } else { @@ -216,10 +220,10 @@ int ObIMvccCtx::register_table_lock_replay_cb( TRANS_LOG(WARN, "invalid argument", K(ret), K(memtable), K(lock_op)); } else if (OB_FAIL(register_table_lock_cb_(memtable, lock_op, - cb))) { + cb, + scn))) { TRANS_LOG(WARN, "register tablelock callback failed", K(ret), KPC(lock_op)); } else { - cb->set_scn(scn); TRANS_LOG(DEBUG, "replay register table lock callback", K(*cb)); } return ret; diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_ctx.h index c352c632d..28acd6b2c 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.h @@ -225,7 +225,8 @@ private: int register_table_lock_cb_( ObLockMemtable *memtable, ObMemCtxLockOpLinkNode *lock_op, - ObOBJLockCallback *&cb); + ObOBJLockCallback *&cb, + const share::SCN replay_scn = share::SCN::invalid_scn()); protected: DISALLOW_COPY_AND_ASSIGN(ObIMvccCtx); int alloc_type_; diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index 68d87eab2..d286bd3a4 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -100,8 +100,12 @@ SCN ObITransCallback::get_scn() const int ObITransCallback::before_append_cb(const bool is_replay) { - int ret = before_append(is_replay); - if (OB_SUCC(ret)) { + int ret = OB_SUCCESS; + if (is_replay && !scn_.is_valid()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "scn is invalid for replay", K(ret), KPC(this)); + } else if (OB_FAIL(before_append(is_replay))) { + } else { need_submit_log_ = !is_replay; } return ret; @@ -219,7 +223,9 @@ void ObTransCallbackMgr::reset() write_epoch_start_tid_ = 0; need_merge_ = false; for_replay_ = false; + has_branch_replayed_into_first_list_ = false; serial_final_scn_.set_max(); + serial_final_seq_no_.reset(); serial_sync_scn_.set_min(); callback_main_list_append_count_ = 0; callback_slave_list_append_count_ = 0; @@ -345,6 +351,39 @@ int ObTransCallbackMgr::append(ObITransCallback *node) if (seq_no.support_branch()) { // NEW since version 4.3, select by branch int slot = seq_no.get_branch() % MAX_CALLBACK_LIST_COUNT; + if (slot > 0 + && for_replay_ + && is_serial_final_() + && OB_UNLIKELY(node->get_scn() <= serial_final_scn_)) { + // _NOTE_ + // for log with scn before serial final and replayed after txn recovery from point after serial final + // it's replayed into first callback-list to keep the scn is in asc order for all callback list + // for example: + // serial final log scn = 100 + // recovery point scn = 200 + // log replaying with scn = 80 + // + // Checksum calculation: + // this log has been accumulated, it will not be required in all calback-list + if (parallel_replay_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "parallel replay an serial log", K(ret), KPC(this)); + ob_abort(); + } + if (OB_UNLIKELY(!has_branch_replayed_into_first_list_)) { + // sanity check: the serial_final_seq_no must be set + // which will be used in replay `rollback branch savepoint` log + if (OB_UNLIKELY(!serial_final_seq_no_.is_valid())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "serial_final_seq_no is invalid", K(ret), KPC(this)); + ob_abort(); + } + ATOMIC_STORE(&has_branch_replayed_into_first_list_, true); + TRANS_LOG(INFO, "replay log before serial final when reach serial final", + KPC(this), KPC(get_trans_ctx()), KPC(node)); + } + slot = 0; + } if (slot == 0) { // no parallel and no branch requirement ret = callback_list_.append_callback(node, for_replay_, parallel_replay_, is_serial_final_()); @@ -420,6 +459,19 @@ int ObTransCallbackMgr::rollback_to(const ObTxSEQ to_seq_no, } else if (callback_lists_) { ret = callback_lists_[slot - 1].remove_callbacks_for_rollback_to(to_seq_no, from_seq_no, replay_scn); } else { /*callback_lists_ is empty, no need do rollback */ } + // _NOTE_ + // if branch level savepoint with `to_seq_no` before serial_final log, the branch maybe replayed + // into first callback-list when recovery with scn after serial final log (see ObTransCallbackMgr::append) + // hence, we need try rollback on it + if (OB_SUCC(ret) + && for_replay_ + && slot > 0 + && OB_UNLIKELY(has_branch_replayed_into_first_list_) + && to_seq_no.get_seq() <= serial_final_seq_no_.get_seq()) { + ret = callback_list_.remove_callbacks_for_rollback_to(to_seq_no, from_seq_no, replay_scn); + TRANS_LOG(INFO, "replay branch savepoint cross serial final", + KPC(this), KPC(get_trans_ctx()), K(replay_scn), K(to_seq_no), K(from_seq_no)); + } } } else { // before 4.3 ret = callback_list_.remove_callbacks_for_rollback_to(to_seq_no, from_seq_no, replay_scn); @@ -2169,9 +2221,11 @@ bool ObTransCallbackMgr::pending_log_size_too_large(const transaction::ObTxSEQ & } } -void ObTransCallbackMgr::set_parallel_logging(const share::SCN serial_final_scn) +void ObTransCallbackMgr::set_parallel_logging(const share::SCN serial_final_scn, + const transaction::ObTxSEQ serial_final_seq_no) { serial_final_scn_.atomic_set(serial_final_scn); + serial_final_seq_no_.atomic_store(serial_final_seq_no); } void ObTransCallbackMgr::update_serial_sync_scn_(const share::SCN scn) diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index c3e1ac25b..fa1411d56 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -197,7 +197,9 @@ public: write_epoch_start_tid_(0), need_merge_(false), for_replay_(false), + has_branch_replayed_into_first_list_(false), serial_final_scn_(share::SCN::max_scn()), + serial_final_seq_no_(), serial_sync_scn_(share::SCN::min_scn()), callback_main_list_append_count_(0), callback_slave_list_append_count_(0), @@ -274,7 +276,8 @@ public: int get_log_guard(const transaction::ObTxSEQ &write_seq, ObCallbackListLogGuard &log_guard, int &cb_list_idx); - void set_parallel_logging(const share::SCN serial_final_scn); + void set_parallel_logging(const share::SCN serial_final_scn, + const transaction::ObTxSEQ serial_final_seq_no); void set_skip_checksum_calc(); bool skip_checksum_calc() const { return ATOMIC_LOAD(&skip_checksum_); } void merge_multi_callback_lists(); @@ -325,6 +328,7 @@ public: transaction::ObPartTransCtx *get_trans_ctx() const; TO_STRING_KV(KP(this), K_(serial_final_scn), + K_(serial_final_seq_no), K_(serial_sync_scn), KP_(callback_lists), K_(need_merge), @@ -387,8 +391,17 @@ private: // On Leader, if no write after takeover, merge is also not required bool need_merge_; bool for_replay_; - // the last serial logg's scn + // used to mark that some branch callback replayed in first callback list + // actually, by default they were replayed into its own callback list by + // hash on branch id. + // this can happened when txn recovery from a point after serial final log + // and branch callback before (or equals to) serial final scn will be put + // into the first list for easy to handle (ensure each callback list will + // be `appended only` after serial final state). + bool has_branch_replayed_into_first_list_; + // the last serial log's scn share::SCN serial_final_scn_; + transaction::ObTxSEQ serial_final_seq_no_; // currently synced serial log's scn // when serial_sync_scn_ == serial_final_scn_ // it means the serial logging or serial replay has been finished diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index 953746a91..3ab017fe1 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -100,11 +100,21 @@ int ObTxCallbackList::append_callback(ObITransCallback *callback, TRANS_LOG(WARN, "before_append_cb failed", K(ret), KPC(callback)); } else { const bool repos_lc = !for_replay && (log_cursor_ == &head_); + ObITransCallback *append_pos = NULL; if (!for_replay || parallel_replay || serial_final || !parallel_start_pos_) { - (void)get_tail()->append(callback); + append_pos = get_tail(); } else { - parallel_start_pos_->get_prev()->append(callback); + append_pos = parallel_start_pos_->get_prev(); } + // for replay, do sanity check: scn is incremental + if (for_replay + && append_pos != &head_ // the head with scn max + && append_pos->get_scn() > callback->get_scn()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "replay callback scn out of order", K(ret), KPC(callback), KPC(this)); + ob_abort(); + } + append_pos->append(callback); // start parallel replay, remember the position if (for_replay && parallel_replay && !serial_final && !parallel_start_pos_) { ATOMIC_STORE(¶llel_start_pos_, get_tail()); diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index 36d357424..a0e5f332f 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ b/src/storage/memtable/ob_memtable_context.h @@ -417,8 +417,9 @@ public: int remove_callback_for_uncommited_txn(const memtable::ObMemtableSet *memtable_set); int rollback(const transaction::ObTxSEQ seq_no, const transaction::ObTxSEQ from_seq_no, const share::SCN replay_scn); - void set_parallel_logging(const share::SCN serial_final_scn) { - trans_mgr_.set_parallel_logging(serial_final_scn); + void set_parallel_logging(const share::SCN serial_final_scn, + const transaction::ObTxSEQ serial_final_seq_no) { + trans_mgr_.set_parallel_logging(serial_final_scn, serial_final_seq_no); } void set_skip_checksum_calc() { trans_mgr_.set_skip_checksum_calc(); diff --git a/src/storage/memtable/ob_memtable_mutator.cpp b/src/storage/memtable/ob_memtable_mutator.cpp index bbab8d249..3bd881687 100644 --- a/src/storage/memtable/ob_memtable_mutator.cpp +++ b/src/storage/memtable/ob_memtable_mutator.cpp @@ -1201,6 +1201,7 @@ void ObMemtableMutatorIterator::reset() row_header_.reset(); row_.reset(); table_lock_.reset(); + row_seq_no_.reset(); } int ObMemtableMutatorIterator::deserialize(const char *buf, const int64_t data_len, int64_t &pos, @@ -1260,6 +1261,8 @@ int ObMemtableMutatorIterator::iterate_next_row(ObEncryptRowBuf &decrypt_buf, encrypt_info, unused_need_extract_encrypt_meta, encrypt_meta, unused_encrypt_stat_map, ObTransRowFlag::is_big_row(meta_.get_flags())))) { TRANS_LOG(WARN, "deserialize mutator row fail", K(ret)); + } else { + row_seq_no_ = row_.seq_no_; } break; } @@ -1268,6 +1271,8 @@ int ObMemtableMutatorIterator::iterate_next_row(ObEncryptRowBuf &decrypt_buf, if (OB_FAIL( table_lock_.deserialize(buf_.get_data(), buf_.get_limit(), buf_.get_position()))) { TRANS_LOG(WARN, "deserialize table lock fail", K(ret)); + } else { + row_seq_no_ = table_lock_.seq_no_; } break; } @@ -1291,6 +1296,5 @@ const ObMemtableMutatorRow &ObMemtableMutatorIterator::get_mutator_row() { retur const ObMutatorTableLock &ObMemtableMutatorIterator::get_table_lock_row() { return table_lock_; } - }//namespace memtable }//namespace oceanbase diff --git a/src/storage/memtable/ob_memtable_mutator.h b/src/storage/memtable/ob_memtable_mutator.h index 3f019867e..0f4b56dc6 100644 --- a/src/storage/memtable/ob_memtable_mutator.h +++ b/src/storage/memtable/ob_memtable_mutator.h @@ -335,8 +335,8 @@ public: const ObMutatorRowHeader &get_row_head(); const ObMemtableMutatorRow &get_mutator_row(); const ObMutatorTableLock &get_table_lock_row(); - - TO_STRING_KV(K_(meta),K(buf_.get_position()),K(buf_.get_limit())); + transaction::ObTxSEQ get_row_seq_no() const { return row_seq_no_; } + TO_STRING_KV(K_(meta), K_(row_seq_no), K(buf_.get_position()),K(buf_.get_limit())); private: private: @@ -345,7 +345,7 @@ private: ObMutatorRowHeader row_header_; ObMemtableMutatorRow row_; ObMutatorTableLock table_lock_; - + transaction::ObTxSEQ row_seq_no_; DISALLOW_COPY_AND_ASSIGN(ObMemtableMutatorIterator); }; }//namespace memtable diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 91ec00589..8a47b3797 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1906,7 +1906,7 @@ int ObPartTransCtx::serial_submit_redo_after_write_() ret = submitter.serial_submit(should_switch); if (should_switch && submitter.get_submitted_cnt() > 0) { const share::SCN serial_final_scn = submitter.get_submitted_scn(); - switch_to_parallel_logging_(serial_final_scn); + switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_); TRANS_LOG(INFO, "**switch to parallel logging**", K_(ls_id), K_(trans_id), K(serial_final_scn), @@ -4760,7 +4760,8 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log, const SCN ×tamp, const int64_t &part_log_no, const bool is_tx_log_queue, - const bool serial_final) + const bool serial_final, + const ObTxSEQ &max_seq_no) { int ret = OB_SUCCESS; common::ObTimeGuard timeguard("replay_redo_in_ctx", 10 * 1000); @@ -4805,7 +4806,7 @@ int ObPartTransCtx::replay_redo_in_ctx(const ObTxRedoLog &redo_log, usleep(50_ms); ob_abort(); } else if (!exec_info_.serial_final_scn_.is_valid()) { - switch_to_parallel_logging_(timestamp); + switch_to_parallel_logging_(timestamp, max_seq_no); } } } @@ -9636,7 +9637,8 @@ inline bool ObPartTransCtx::is_support_parallel_replay_() const return cluster_version_accurate_ && cluster_version_ >= CLUSTER_VERSION_4_3_0_0; } -inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn) +inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_final_scn, + const ObTxSEQ max_seq_no) { // when start replaying serial final redo log or submitted serial final redo log // switch the Tx's logging mode to parallel logging @@ -9651,13 +9653,19 @@ inline void ObPartTransCtx::switch_to_parallel_logging_(const share::SCN serial_ // if an rollback to savepoint before this point, which means // replay of this rollback-savepoint-log must pre-berrier to // wait serial replay parts finished - exec_info_.serial_final_seq_no_ = exec_info_.max_submitted_seq_no_; - mt_ctx_.set_parallel_logging(serial_final_scn); + if (OB_UNLIKELY(!max_seq_no.is_valid())) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "max seq_no of serial final log is invalid", + K(serial_final_scn), K(max_seq_no), KPC(this)); + print_trace_log_(); + ob_abort(); + } + exec_info_.serial_final_seq_no_ = max_seq_no; + mt_ctx_.set_parallel_logging(serial_final_scn, max_seq_no); } inline void ObPartTransCtx::recovery_parallel_logging_() { - mt_ctx_.set_parallel_logging(exec_info_.serial_final_scn_); + mt_ctx_.set_parallel_logging(exec_info_.serial_final_scn_, exec_info_.serial_final_seq_no_); if (exec_info_.max_applied_log_ts_ >= exec_info_.serial_final_scn_) { // the serial final log has been synced or replayed // notify callback_mgr serial part is finished diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 797070c87..740d19498 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -379,7 +379,8 @@ public: const share::SCN ×tamp, const int64_t &part_log_no, const bool is_tx_log_queue, - const bool serial_final); + const bool serial_final, + const ObTxSEQ &max_seq_no); int replay_rollback_to(const ObTxRollbackToLog &log, const palf::LSN &offset, const share::SCN ×tamp, @@ -513,7 +514,8 @@ private: memtable::ObRedoLogSubmitHelper &helper, const logservice::ObReplayBarrierType &barrier); bool should_switch_to_parallel_logging_(); - void switch_to_parallel_logging_(const share::SCN serial_final_scn); + void switch_to_parallel_logging_(const share::SCN serial_final_scn, + const ObTxSEQ max_seq_no); bool has_replay_serial_final_() const; void recovery_parallel_logging_(); int check_can_submit_redo_(); diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index b6e6ac16e..6bf8fec5e 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -361,32 +361,28 @@ int ObTxReplayExecutor::replay_redo_() int tmp_ret = OB_SUCCESS; ObTxRedoLogTempRef temp_ref; ObTxRedoLog redo_log(temp_ref); + const bool serial_final = log_block_.get_header().is_serial_final(); + ObTxSEQ max_seq_no; if (is_tx_log_replay_queue()) { tx_part_log_no_ += 1; // redo is compound with tx log, mark part_log_no is required } + if (OB_ISNULL(ls_)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "[Replay Tx] ls should not be null", K(ret), K(ls_)); } else if (OB_FAIL(log_block_.deserialize_log_body(redo_log))) { TRANS_LOG(WARN, "[Replay Tx] deserialize log body error", K(ret), K(redo_log), K(lsn_), K(log_ts_ns_)); - } else if (OB_FAIL(replay_redo_in_memtable_(redo_log))) { + } else if (OB_FAIL(replay_redo_in_memtable_(redo_log, serial_final, max_seq_no))) { TRANS_LOG(WARN, "[Replay Tx] replay redo in memtable error", K(ret), K(lsn_), K(log_ts_ns_)); } else if (OB_FAIL(ctx_->replay_redo_in_ctx(redo_log, lsn_, log_ts_ns_, tx_part_log_no_, is_tx_log_replay_queue(), - log_block_.get_header().is_serial_final()))) { + serial_final, + max_seq_no))) { TRANS_LOG(WARN, "[Replay Tx] replay redo in tx_ctx error", K(ret), K(lsn_), K(log_ts_ns_)); - // } else if (first_created_ctx_ && redo_log.get_log_no() > 0) { - // // replay a commited tx in recovery process - // ctx_->force_no_need_replay_checksum(); - // TRANS_LOG( - // WARN, - // "[Replay Tx] Don't replay from first redo log and Part_ctx is not existed in tx ctx table", - // K(first_created_ctx_), K(redo_log.get_log_no())); - // ctx_->supplement_undo_actions_if_exist(); } if (OB_SUCC(ret) && OB_TMP_FAIL(mt_ctx_->remove_callbacks_for_fast_commit(replay_queue_, share::SCN::minus(log_ts_ns_, 1)))) { TRANS_LOG(WARN, "[Replay Tx] remove callbacks for fast commit", K(ret), K(tmp_ret), @@ -563,7 +559,7 @@ int ObTxReplayExecutor::replay_clear_() return ret; } -int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo) +int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo, const bool serial_final, ObTxSEQ &max_seq_no) { int ret = OB_SUCCESS; // ObMemtable *cur_mem = nullptr; @@ -579,6 +575,8 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo) ObCLogEncryptInfo encrypt_info; encrypt_info.init(); + max_seq_no.reset(); + if (OB_ISNULL(mmi_ptr_)) { if (nullptr == (mmi_ptr_ = static_cast( @@ -625,8 +623,18 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo) K(row_head.tablet_id_), KP(ls_), K(log_ts_ns_), K(tx_part_log_no_), KPC(ctx_)); } - } else { - // do nothing + } else if (OB_UNLIKELY(serial_final)) { + // because the seq no in one log-entry is not in order + // must iterator all to pick the max value + const ObTxSEQ seq_no = mmi_ptr_->get_row_seq_no(); + if (OB_UNLIKELY(!seq_no.is_valid())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "seq no is invalid in mutator row", K(seq_no), KPC(this)); + ob_abort(); + } + if (seq_no.get_seq() > max_seq_no.get_seq()) { + max_seq_no = seq_no; + } } } } @@ -779,22 +787,18 @@ int ObTxReplayExecutor::replay_row_(storage::ObStoreCtx &store_ctx, TRANS_LOG(WARN, "[Replay Tx] this is not a ObMemtable", K(ret), KP(mem_ptr), KPC(mem_ptr), KP(mmi_ptr)); } else if (FALSE_IT(timeguard.click("get_memtable"))) { + // _NOTE_: + // set max_end_scn before repaly_row to ensure memtable will not be released + // before current log replay success + } else if (OB_FAIL(data_mem_ptr->set_max_end_scn(log_ts_ns_))) { + TRANS_LOG(WARN, "[Replay Tx] set memtable max end log ts failed", K(ret), KP(data_mem_ptr)); } else if (OB_FAIL(data_mem_ptr->replay_row(store_ctx, log_ts_ns_, mmi_ptr))) { TRANS_LOG(WARN, "[Replay Tx] replay row error", K(ret)); - } else if (OB_FAIL(data_mem_ptr->set_max_end_scn(log_ts_ns_))) { // for freeze log_ts , may be - TRANS_LOG(WARN, "[Replay Tx] set memtable max end log ts failed", K(ret), KP(data_mem_ptr)); } else if (OB_FAIL(data_mem_ptr->set_rec_scn(log_ts_ns_))) { TRANS_LOG(WARN, "[Replay Tx] set rec_log_ts error", K(ret), KPC(data_mem_ptr)); } timeguard.click("replay_finish"); - if (OB_FAIL(ret) && ret != OB_NO_NEED_UPDATE) { - // We need rollback all callbacks of this log to avoid replay a row - // in a freeze memtable which has a smaller end ts than this log. - // - // The rollback operation must hold write_ref to make memtable stay in memory. - mt_ctx_->rollback_redo_callbacks(replay_queue_, log_ts_ns_); - } return ret; } diff --git a/src/storage/tx/ob_tx_replay_executor.h b/src/storage/tx/ob_tx_replay_executor.h index f65e822a7..2b17f9227 100644 --- a/src/storage/tx/ob_tx_replay_executor.h +++ b/src/storage/tx/ob_tx_replay_executor.h @@ -128,9 +128,9 @@ private: int replay_multi_source_data_(); int replay_record_(); bool is_tx_log_replay_queue() const { return replay_queue_ == 0; } - int replay_redo_in_memtable_(ObTxRedoLog &redo); + int replay_redo_in_memtable_(ObTxRedoLog &redo, const bool serial_final, ObTxSEQ &max_seq_no); virtual int replay_one_row_in_memtable_(memtable::ObMutatorRowHeader& row_head, - memtable::ObMemtableMutatorIterator *mmi_ptr); + memtable::ObMemtableMutatorIterator *mmi_ptr); int prepare_memtable_replay_(storage::ObStorageTableGuard &w_guard, memtable::ObIMemtable *&mem_ptr); int replay_row_(storage::ObStoreCtx &store_ctx, diff --git a/unittest/storage/tx/test_trans_callback_mgr_fill_redo.cpp b/unittest/storage/tx/test_trans_callback_mgr_fill_redo.cpp index beeb3f72b..770d48e0b 100644 --- a/unittest/storage/tx/test_trans_callback_mgr_fill_redo.cpp +++ b/unittest/storage/tx/test_trans_callback_mgr_fill_redo.cpp @@ -136,9 +136,10 @@ public: if (t) { share::SCN scn; scn.convert_for_tx(1231231231); - callback_mgr_.set_parallel_logging(scn); + transaction::ObTxSEQ seq(1000, 0); + callback_mgr_.set_parallel_logging(scn, seq); } else { - callback_mgr_.set_parallel_logging(share::SCN::max_scn()); + callback_mgr_.set_parallel_logging(share::SCN::max_scn(), transaction::ObTxSEQ::INVL()); } } ObRedoLogSubmitHelper helper_;