[4.3] fix the from_seq of rollback-to-savpoint maybe too small
This commit is contained in:
@ -278,10 +278,9 @@ public:
|
|||||||
checksum_scn_(share::SCN::min_scn()),
|
checksum_scn_(share::SCN::min_scn()),
|
||||||
checksumer_(NULL),
|
checksumer_(NULL),
|
||||||
checksum_last_scn_(share::SCN::min_scn()) {}
|
checksum_last_scn_(share::SCN::min_scn()) {}
|
||||||
virtual bool cond_for_remove(ObITransCallback* callback) = 0;
|
virtual bool cond_for_remove(ObITransCallback* callback, int &ret) = 0;
|
||||||
void set_checksumer(const share::SCN checksum_scn,
|
void set_checksumer(const share::SCN checksum_scn,
|
||||||
TxChecksum *checksumer
|
TxChecksum *checksumer)
|
||||||
)
|
|
||||||
{
|
{
|
||||||
checksum_scn_ = checksum_scn;
|
checksum_scn_ = checksum_scn;
|
||||||
checksumer_ = checksumer;
|
checksumer_ = checksumer;
|
||||||
@ -307,7 +306,7 @@ public:
|
|||||||
TRANS_LOG(ERROR, "unexpected callback", KP(callback));
|
TRANS_LOG(ERROR, "unexpected callback", KP(callback));
|
||||||
} else if (callback->need_submit_log()) {
|
} else if (callback->need_submit_log()) {
|
||||||
// Case 1: callback has not been proposed to paxos
|
// Case 1: callback has not been proposed to paxos
|
||||||
if (cond_for_remove(callback)) {
|
if (cond_for_remove(callback, ret)) {
|
||||||
if (need_remove_data_ && OB_FAIL(callback->rollback_callback())) {
|
if (need_remove_data_ && OB_FAIL(callback->rollback_callback())) {
|
||||||
TRANS_LOG(WARN, "rollback callback failed", K(ret), K(*callback));
|
TRANS_LOG(WARN, "rollback callback failed", K(ret), K(*callback));
|
||||||
} else if (!need_remove_data_ && OB_FAIL(callback->checkpoint_callback())) {
|
} else if (!need_remove_data_ && OB_FAIL(callback->checkpoint_callback())) {
|
||||||
@ -315,11 +314,13 @@ public:
|
|||||||
} else {
|
} else {
|
||||||
need_remove_callback_ = true;
|
need_remove_callback_ = true;
|
||||||
}
|
}
|
||||||
|
} else if (OB_FAIL(ret)) {
|
||||||
|
// check ret
|
||||||
}
|
}
|
||||||
} else if (!callback->need_submit_log()) {
|
} else if (!callback->need_submit_log()) {
|
||||||
// Case 2: callback has submitted to log-service may not persistented
|
// Case 2: callback has submitted to log-service may not persistented
|
||||||
// we check removable in cond_for_remove_ ensure it is synced
|
// we check removable in cond_for_remove_ ensure it is synced
|
||||||
if (cond_for_remove(callback)) {
|
if (cond_for_remove(callback, ret)) {
|
||||||
if (checksumer_ && callback->get_scn() >= checksum_scn_
|
if (checksumer_ && callback->get_scn() >= checksum_scn_
|
||||||
&& OB_FAIL(callback->calc_checksum(checksum_scn_, checksumer_))) {
|
&& OB_FAIL(callback->calc_checksum(checksum_scn_, checksumer_))) {
|
||||||
TRANS_LOG(WARN, "calc checksum callback failed", K(ret), K(*callback));
|
TRANS_LOG(WARN, "calc checksum callback failed", K(ret), K(*callback));
|
||||||
@ -333,6 +334,8 @@ public:
|
|||||||
checksum_last_scn_ = callback->get_scn();
|
checksum_last_scn_ = callback->get_scn();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (OB_FAIL(ret)) {
|
||||||
|
// check ret
|
||||||
} else {
|
} else {
|
||||||
if (checksumer_) {
|
if (checksumer_) {
|
||||||
if (callback->get_scn() >= checksum_scn_
|
if (callback->get_scn() >= checksum_scn_
|
||||||
|
|||||||
@ -391,14 +391,20 @@ int ObTxCallbackList::remove_callbacks_for_rollback_to(const transaction::ObTxSE
|
|||||||
struct Functor final : public ObRemoveCallbacksWCondFunctor {
|
struct Functor final : public ObRemoveCallbacksWCondFunctor {
|
||||||
Functor(const share::SCN right_bound, const bool need_remove_data = true)
|
Functor(const share::SCN right_bound, const bool need_remove_data = true)
|
||||||
: ObRemoveCallbacksWCondFunctor(right_bound, need_remove_data) {}
|
: ObRemoveCallbacksWCondFunctor(right_bound, need_remove_data) {}
|
||||||
bool cond_for_remove(ObITransCallback *callback) {
|
bool cond_for_remove(ObITransCallback *callback, int &ret) {
|
||||||
transaction::ObTxSEQ dseq = callback->get_seq_no();
|
transaction::ObTxSEQ dseq = callback->get_seq_no();
|
||||||
// sanity check
|
bool match = false;
|
||||||
OB_ASSERT(to_seq_.support_branch() == dseq.support_branch());
|
if (to_seq_.get_branch() == 0 // match all branches
|
||||||
return (to_seq_.get_branch() == 0 // match all branches
|
|| to_seq_.get_branch() == dseq.get_branch()) { // match target branch
|
||||||
|| to_seq_.get_branch() == dseq.get_branch()) // match target branch
|
if (dseq.get_seq() >= from_seq_.get_seq()) {
|
||||||
&& dseq.get_seq() > to_seq_.get_seq() // exclusive
|
ret = OB_ERR_UNEXPECTED;
|
||||||
&& dseq.get_seq() < from_seq_.get_seq(); // inclusive
|
TRANS_LOG(ERROR, "found callback with seq_no larger than rollback from point",
|
||||||
|
K(ret), K(dseq), K_(from_seq), K_(to_seq), KPC(callback));
|
||||||
|
} else {
|
||||||
|
match = dseq.get_seq() > to_seq_.get_seq(); // exclusive
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return match;
|
||||||
}
|
}
|
||||||
transaction::ObTxSEQ to_seq_;
|
transaction::ObTxSEQ to_seq_;
|
||||||
transaction::ObTxSEQ from_seq_;
|
transaction::ObTxSEQ from_seq_;
|
||||||
|
|||||||
@ -7884,7 +7884,9 @@ int ObPartTransCtx::end_access()
|
|||||||
*
|
*
|
||||||
* @op_sn - operation sequence number, used to reject out of order msg
|
* @op_sn - operation sequence number, used to reject out of order msg
|
||||||
* @from_scn - the start position of rollback, inclusive
|
* @from_scn - the start position of rollback, inclusive
|
||||||
|
* generally not specified, and generated in callee
|
||||||
* @to_scn - the end position of rollback, exclusive
|
* @to_scn - the end position of rollback, exclusive
|
||||||
|
* @seq_base - the baseline of TxSEQ of current transaction
|
||||||
*
|
*
|
||||||
* savepoint may be created in these ways:
|
* savepoint may be created in these ways:
|
||||||
* 1) created at txn scheduler, named Global-Savepoint
|
* 1) created at txn scheduler, named Global-Savepoint
|
||||||
@ -7904,8 +7906,9 @@ int ObPartTransCtx::end_access()
|
|||||||
* when start_access was called
|
* when start_access was called
|
||||||
*/
|
*/
|
||||||
int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
|
int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
|
||||||
const ObTxSEQ from_scn,
|
ObTxSEQ from_scn,
|
||||||
const ObTxSEQ to_scn,
|
const ObTxSEQ to_scn,
|
||||||
|
const int64_t seq_base,
|
||||||
ObIArray<ObTxLSEpochPair> &downstream_parts)
|
ObIArray<ObTxLSEpochPair> &downstream_parts)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -7921,8 +7924,7 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
|
|||||||
} else if (!leader) {
|
} else if (!leader) {
|
||||||
ret = OB_NOT_MASTER;
|
ret = OB_NOT_MASTER;
|
||||||
}
|
}
|
||||||
TRANS_LOG(WARN, "rollback_to need retry because of logging", K(ret),
|
TRANS_LOG(WARN, "rollback_to need retry because of logging", K(ret), K(trans_id_), K(ls_id_), K(busy_cbs_.get_size()));
|
||||||
K(trans_id_), K(ls_id_), K(busy_cbs_.get_size()));
|
|
||||||
} else if (is_2pc_blocking()) {
|
} else if (is_2pc_blocking()) {
|
||||||
ret = OB_NEED_RETRY;
|
ret = OB_NEED_RETRY;
|
||||||
TRANS_LOG(WARN, "rollback_to need retry because of 2pc blocking", K(trans_id_), K(ls_id_), KP(this), K(ret));
|
TRANS_LOG(WARN, "rollback_to need retry because of 2pc blocking", K(trans_id_), K(ls_id_), KP(this), K(ret));
|
||||||
@ -7932,15 +7934,15 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
|
|||||||
} else if ((to_scn.get_branch() == 0) && pending_write_ > 0) {
|
} else if ((to_scn.get_branch() == 0) && pending_write_ > 0) {
|
||||||
// for branch savepoint rollback, pending_write !=0 almostly
|
// for branch savepoint rollback, pending_write !=0 almostly
|
||||||
ret = OB_NEED_RETRY;
|
ret = OB_NEED_RETRY;
|
||||||
TRANS_LOG(WARN, "has pending write, rollback blocked",
|
TRANS_LOG(WARN, "has pending write, rollback blocked", K(ret), K(to_scn), K(pending_write_), KPC(this));
|
||||||
K(ret), K(pending_write_), KPC(this));
|
|
||||||
} else if (last_scn_ <= to_scn) {
|
} else if (last_scn_ <= to_scn) {
|
||||||
TRANS_LOG(INFO, "rollback succeed trivially", K_(trans_id),
|
TRANS_LOG(INFO, "rollback succeed trivially", K_(trans_id), K_(ls_id), K(op_sn), K(to_scn), K_(last_scn));
|
||||||
K_(ls_id), K(op_sn), K(to_scn), K_(last_scn));
|
} else if (!from_scn.is_valid() &&
|
||||||
|
// generate from if not specified
|
||||||
|
FALSE_IT(from_scn = to_scn.clone_with_seq(ObSequence::inc_and_get_max_seq_no(), seq_base))) {
|
||||||
} else if (OB_FAIL(rollback_to_savepoint_(from_scn, to_scn, share::SCN::invalid_scn()))) {
|
} else if (OB_FAIL(rollback_to_savepoint_(from_scn, to_scn, share::SCN::invalid_scn()))) {
|
||||||
TRANS_LOG(WARN, "rollback_to_savepoint fail", K(ret),
|
TRANS_LOG(WARN, "rollback_to_savepoint fail", K(ret), K(from_scn), K(to_scn), K(op_sn), KPC(this));
|
||||||
K(from_scn), K(to_scn), K(op_sn), KPC(this));
|
} else if (to_scn.get_branch() == 0) {
|
||||||
} else if (to_scn.get_branch() == 0){
|
|
||||||
last_scn_ = to_scn;
|
last_scn_ = to_scn;
|
||||||
}
|
}
|
||||||
// must add downstream parts when return success
|
// must add downstream parts when return success
|
||||||
|
|||||||
@ -864,7 +864,11 @@ public:
|
|||||||
* end_access - end of txn protected resources access
|
* end_access - end of txn protected resources access
|
||||||
*/
|
*/
|
||||||
int end_access();
|
int end_access();
|
||||||
int rollback_to_savepoint(const int64_t op_sn, const ObTxSEQ from_scn, const ObTxSEQ to_scn, ObIArray<ObTxLSEpochPair> &downstream_parts);
|
int rollback_to_savepoint(const int64_t op_sn,
|
||||||
|
ObTxSEQ from_seq,
|
||||||
|
const ObTxSEQ to_seq,
|
||||||
|
const int64_t seq_base,
|
||||||
|
ObIArray<ObTxLSEpochPair> &downstream_parts);
|
||||||
bool is_xa_trans() const { return !exec_info_.xid_.empty(); }
|
bool is_xa_trans() const { return !exec_info_.xid_.empty(); }
|
||||||
bool is_transfer_deleted() const { return transfer_deleted_; }
|
bool is_transfer_deleted() const { return transfer_deleted_; }
|
||||||
int handle_tx_keepalive_response(const int64_t status);
|
int handle_tx_keepalive_response(const int64_t status);
|
||||||
|
|||||||
@ -1298,11 +1298,8 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t retry_cnt = 0;
|
int64_t retry_cnt = 0;
|
||||||
bool blockable = expire_ts > 0;
|
bool blockable = expire_ts > 0;
|
||||||
const int64_t seq_abs = ObSequence::inc_and_get_max_seq_no();
|
|
||||||
const ObTxSEQ from_scn = specified_from_scn.is_valid() ? specified_from_scn
|
|
||||||
: savepoint.clone_with_seq(seq_abs, tx_seq_base);
|
|
||||||
do {
|
do {
|
||||||
ret = part_ctx->rollback_to_savepoint(op_sn, from_scn, savepoint, downstream_parts);
|
ret = part_ctx->rollback_to_savepoint(op_sn, specified_from_scn, savepoint, tx_seq_base, downstream_parts);
|
||||||
if (OB_NEED_RETRY == ret && blockable) {
|
if (OB_NEED_RETRY == ret && blockable) {
|
||||||
if (ObTimeUtility::current_time() >= expire_ts) {
|
if (ObTimeUtility::current_time() >= expire_ts) {
|
||||||
ret = OB_TIMEOUT;
|
ret = OB_TIMEOUT;
|
||||||
|
|||||||
Reference in New Issue
Block a user