diff --git a/src/storage/memtable/mvcc/ob_tx_callback_functor.h b/src/storage/memtable/mvcc/ob_tx_callback_functor.h index 18a194bef3..48982eb1f4 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_functor.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_functor.h @@ -278,10 +278,9 @@ public: checksum_scn_(share::SCN::min_scn()), checksumer_(NULL), checksum_last_scn_(share::SCN::min_scn()) {} - virtual bool cond_for_remove(ObITransCallback* callback) = 0; + virtual bool cond_for_remove(ObITransCallback* callback, int &ret) = 0; void set_checksumer(const share::SCN checksum_scn, - TxChecksum *checksumer - ) + TxChecksum *checksumer) { checksum_scn_ = checksum_scn; checksumer_ = checksumer; @@ -307,7 +306,7 @@ public: TRANS_LOG(ERROR, "unexpected callback", KP(callback)); } else if (callback->need_submit_log()) { // Case 1: callback has not been proposed to paxos - if (cond_for_remove(callback)) { + if (cond_for_remove(callback, ret)) { if (need_remove_data_ && OB_FAIL(callback->rollback_callback())) { TRANS_LOG(WARN, "rollback callback failed", K(ret), K(*callback)); } else if (!need_remove_data_ && OB_FAIL(callback->checkpoint_callback())) { @@ -315,11 +314,13 @@ public: } else { need_remove_callback_ = true; } + } else if (OB_FAIL(ret)) { + // check ret } } else if (!callback->need_submit_log()) { // Case 2: callback has submitted to log-service may not persistented // we check removable in cond_for_remove_ ensure it is synced - if (cond_for_remove(callback)) { + if (cond_for_remove(callback, ret)) { if (checksumer_ && callback->get_scn() >= checksum_scn_ && OB_FAIL(callback->calc_checksum(checksum_scn_, checksumer_))) { TRANS_LOG(WARN, "calc checksum callback failed", K(ret), K(*callback)); @@ -333,6 +334,8 @@ public: checksum_last_scn_ = callback->get_scn(); } } + } else if (OB_FAIL(ret)) { + // check ret } else { if (checksumer_) { if (callback->get_scn() >= checksum_scn_ diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp index 63dce8291d..2542091412 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.cpp +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.cpp @@ -391,14 +391,20 @@ int ObTxCallbackList::remove_callbacks_for_rollback_to(const transaction::ObTxSE struct Functor final : public ObRemoveCallbacksWCondFunctor { Functor(const share::SCN right_bound, const bool need_remove_data = true) : ObRemoveCallbacksWCondFunctor(right_bound, need_remove_data) {} - bool cond_for_remove(ObITransCallback *callback) { + bool cond_for_remove(ObITransCallback *callback, int &ret) { transaction::ObTxSEQ dseq = callback->get_seq_no(); - // sanity check - OB_ASSERT(to_seq_.support_branch() == dseq.support_branch()); - return (to_seq_.get_branch() == 0 // match all branches - || to_seq_.get_branch() == dseq.get_branch()) // match target branch - && dseq.get_seq() > to_seq_.get_seq() // exclusive - && dseq.get_seq() < from_seq_.get_seq(); // inclusive + bool match = false; + if (to_seq_.get_branch() == 0 // match all branches + || to_seq_.get_branch() == dseq.get_branch()) { // match target branch + if (dseq.get_seq() >= from_seq_.get_seq()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "found callback with seq_no larger than rollback from point", + K(ret), K(dseq), K_(from_seq), K_(to_seq), KPC(callback)); + } else { + match = dseq.get_seq() > to_seq_.get_seq(); // exclusive + } + } + return match; } transaction::ObTxSEQ to_seq_; transaction::ObTxSEQ from_seq_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 49ef01d873..5134a1516f 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -7884,8 +7884,10 @@ int ObPartTransCtx::end_access() * * @op_sn - operation sequence number, used to reject out of order msg * @from_scn - the start position of rollback, inclusive + * generally not specified, and generated in callee * @to_scn - the end position of rollback, exclusive - * + * @seq_base - the baseline of TxSEQ of current transaction + * * savepoint may be created in these ways: * 1) created at txn scheduler, named Global-Savepoint * 2) created at txn participant server, named Local-Savepoint @@ -7904,8 +7906,9 @@ int ObPartTransCtx::end_access() * when start_access was called */ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, - const ObTxSEQ from_scn, + ObTxSEQ from_scn, const ObTxSEQ to_scn, + const int64_t seq_base, ObIArray &downstream_parts) { int ret = OB_SUCCESS; @@ -7921,8 +7924,7 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, } else if (!leader) { ret = OB_NOT_MASTER; } - TRANS_LOG(WARN, "rollback_to need retry because of logging", K(ret), - K(trans_id_), K(ls_id_), K(busy_cbs_.get_size())); + TRANS_LOG(WARN, "rollback_to need retry because of logging", K(ret), K(trans_id_), K(ls_id_), K(busy_cbs_.get_size())); } else if (is_2pc_blocking()) { ret = OB_NEED_RETRY; TRANS_LOG(WARN, "rollback_to need retry because of 2pc blocking", K(trans_id_), K(ls_id_), KP(this), K(ret)); @@ -7932,15 +7934,15 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, } else if ((to_scn.get_branch() == 0) && pending_write_ > 0) { // for branch savepoint rollback, pending_write !=0 almostly ret = OB_NEED_RETRY; - TRANS_LOG(WARN, "has pending write, rollback blocked", - K(ret), K(pending_write_), KPC(this)); + TRANS_LOG(WARN, "has pending write, rollback blocked", K(ret), K(to_scn), K(pending_write_), KPC(this)); } else if (last_scn_ <= to_scn) { - TRANS_LOG(INFO, "rollback succeed trivially", K_(trans_id), - K_(ls_id), K(op_sn), K(to_scn), K_(last_scn)); + TRANS_LOG(INFO, "rollback succeed trivially", K_(trans_id), K_(ls_id), K(op_sn), K(to_scn), K_(last_scn)); + } else if (!from_scn.is_valid() && + // generate from if not specified + FALSE_IT(from_scn = to_scn.clone_with_seq(ObSequence::inc_and_get_max_seq_no(), seq_base))) { } else if (OB_FAIL(rollback_to_savepoint_(from_scn, to_scn, share::SCN::invalid_scn()))) { - TRANS_LOG(WARN, "rollback_to_savepoint fail", K(ret), - K(from_scn), K(to_scn), K(op_sn), KPC(this)); - } else if (to_scn.get_branch() == 0){ + TRANS_LOG(WARN, "rollback_to_savepoint fail", K(ret), K(from_scn), K(to_scn), K(op_sn), KPC(this)); + } else if (to_scn.get_branch() == 0) { last_scn_ = to_scn; } // must add downstream parts when return success diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index f578cae1cf..9d8b86fd32 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -864,7 +864,11 @@ public: * end_access - end of txn protected resources access */ int end_access(); - int rollback_to_savepoint(const int64_t op_sn, const ObTxSEQ from_scn, const ObTxSEQ to_scn, ObIArray &downstream_parts); + int rollback_to_savepoint(const int64_t op_sn, + ObTxSEQ from_seq, + const ObTxSEQ to_seq, + const int64_t seq_base, + ObIArray &downstream_parts); bool is_xa_trans() const { return !exec_info_.xid_.empty(); } bool is_transfer_deleted() const { return transfer_deleted_; } int handle_tx_keepalive_response(const int64_t status); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index d1d6f8d588..dde88da911 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1298,11 +1298,8 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, int ret = OB_SUCCESS; int64_t retry_cnt = 0; bool blockable = expire_ts > 0; - const int64_t seq_abs = ObSequence::inc_and_get_max_seq_no(); - const ObTxSEQ from_scn = specified_from_scn.is_valid() ? specified_from_scn - : savepoint.clone_with_seq(seq_abs, tx_seq_base); do { - ret = part_ctx->rollback_to_savepoint(op_sn, from_scn, savepoint, downstream_parts); + ret = part_ctx->rollback_to_savepoint(op_sn, specified_from_scn, savepoint, tx_seq_base, downstream_parts); if (OB_NEED_RETRY == ret && blockable) { if (ObTimeUtility::current_time() >= expire_ts) { ret = OB_TIMEOUT;