[master] savepoint rollback may rollback future data
This commit is contained in:
@ -7213,10 +7213,9 @@ int ObPartTransCtx::check_status_()
|
|||||||
* purpose:
|
* purpose:
|
||||||
* 1) verify transaction ctx is *writable*
|
* 1) verify transaction ctx is *writable*
|
||||||
* 2) acquire memtable ctx's ref
|
* 2) acquire memtable ctx's ref
|
||||||
* 3) remember data_scn
|
* 3) alloc data_scn
|
||||||
*/
|
*/
|
||||||
int ObPartTransCtx::start_access(const ObTxDesc &tx_desc,
|
int ObPartTransCtx::start_access(const ObTxDesc &tx_desc, ObTxSEQ &data_scn)
|
||||||
const ObTxSEQ data_scn)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
CtxLockGuard guard(lock_);
|
CtxLockGuard guard(lock_);
|
||||||
@ -7225,14 +7224,18 @@ int ObPartTransCtx::start_access(const ObTxDesc &tx_desc,
|
|||||||
ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL;
|
ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL;
|
||||||
TRANS_LOG(WARN, "stale access operation", K(ret),
|
TRANS_LOG(WARN, "stale access operation", K(ret),
|
||||||
K_(tx_desc.op_sn), K_(last_op_sn), KPC(this), K(tx_desc));
|
K_(tx_desc.op_sn), K_(last_op_sn), KPC(this), K(tx_desc));
|
||||||
} else if (FALSE_IT(++pending_write_)) {
|
} else {
|
||||||
} else if (FALSE_IT(last_scn_ = MAX(data_scn, last_scn_))) {
|
if (!data_scn.is_valid()) {
|
||||||
} else if (!first_scn_.is_valid() && FALSE_IT(first_scn_ = last_scn_)) {
|
data_scn = tx_desc.inc_and_get_tx_seq(0);
|
||||||
} else if (tx_desc.op_sn_ != last_op_sn_) {
|
}
|
||||||
|
++pending_write_;
|
||||||
|
last_scn_ = MAX(data_scn, last_scn_);
|
||||||
|
if (!first_scn_.is_valid()) {
|
||||||
|
first_scn_ = last_scn_;
|
||||||
|
}
|
||||||
|
if (tx_desc.op_sn_ != last_op_sn_) {
|
||||||
last_op_sn_ = tx_desc.op_sn_;
|
last_op_sn_ = tx_desc.op_sn_;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
mt_ctx_.inc_ref();
|
mt_ctx_.inc_ref();
|
||||||
mt_ctx_.acquire_callback_list();
|
mt_ctx_.acquire_callback_list();
|
||||||
}
|
}
|
||||||
@ -7318,14 +7321,13 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
|
|||||||
K(trans_id_), K(ls_id_), K(busy_cbs_.get_size()));
|
K(trans_id_), K(ls_id_), K(busy_cbs_.get_size()));
|
||||||
} else if (op_sn < last_op_sn_) {
|
} else if (op_sn < last_op_sn_) {
|
||||||
ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL;
|
ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL;
|
||||||
} else if (op_sn > last_op_sn_ && last_scn_ <= to_scn) {
|
} else if (FALSE_IT(last_op_sn_ = op_sn)) {
|
||||||
last_op_sn_ = op_sn;
|
} else if (pending_write_ > 0) {
|
||||||
TRANS_LOG(INFO, "rollback succeed trivially", K(op_sn), K(to_scn), K_(last_scn));
|
|
||||||
} else if (op_sn > last_op_sn_ && pending_write_ > 0) {
|
|
||||||
ret = OB_NEED_RETRY;
|
ret = OB_NEED_RETRY;
|
||||||
TRANS_LOG(WARN, "has pending write, rollback blocked",
|
TRANS_LOG(WARN, "has pending write, rollback blocked",
|
||||||
K(ret), K(pending_write_), KPC(this));
|
K(ret), K(pending_write_), KPC(this));
|
||||||
} else if (FALSE_IT(last_op_sn_ = op_sn)) {
|
} else if (last_scn_ <= to_scn) {
|
||||||
|
TRANS_LOG(INFO, "rollback succeed trivially", K(op_sn), K(to_scn), K_(last_scn));
|
||||||
} else if (OB_FAIL(rollback_to_savepoint_(from_scn, to_scn))) {
|
} else if (OB_FAIL(rollback_to_savepoint_(from_scn, to_scn))) {
|
||||||
TRANS_LOG(WARN, "rollback_to_savepoint fail", K(ret),
|
TRANS_LOG(WARN, "rollback_to_savepoint fail", K(ret),
|
||||||
K(from_scn), K(to_scn), K(op_sn), KPC(this));
|
K(from_scn), K(to_scn), K(op_sn), KPC(this));
|
||||||
|
|||||||
@ -733,8 +733,7 @@ public:
|
|||||||
* @data_seq: the sequence_no of current access
|
* @data_seq: the sequence_no of current access
|
||||||
* new created data will marked with this seq no
|
* new created data will marked with this seq no
|
||||||
*/
|
*/
|
||||||
int start_access(const ObTxDesc &tx_desc,
|
int start_access(const ObTxDesc &tx_desc, ObTxSEQ &data_seq);
|
||||||
const ObTxSEQ data_seq);
|
|
||||||
/*
|
/*
|
||||||
* end_access - end of txn protected resources access
|
* end_access - end of txn protected resources access
|
||||||
*/
|
*/
|
||||||
|
|||||||
@ -1079,7 +1079,7 @@ int ObTransService::get_write_store_ctx(ObTxDesc &tx,
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const share::ObLSID &ls_id = store_ctx.ls_id_;
|
const share::ObLSID &ls_id = store_ctx.ls_id_;
|
||||||
ObPartTransCtx *tx_ctx = NULL;
|
ObPartTransCtx *tx_ctx = NULL;
|
||||||
const ObTxSEQ data_scn = spec_seq_no.is_valid() ? spec_seq_no : tx.inc_and_get_tx_seq(0);
|
ObTxSEQ data_scn = spec_seq_no; // for LOB aux table, spec_seq_no is valid
|
||||||
ObTxSnapshot snap = snapshot.core_;
|
ObTxSnapshot snap = snapshot.core_;
|
||||||
ObTxTableGuard tx_table_guard;
|
ObTxTableGuard tx_table_guard;
|
||||||
bool access_started = false;
|
bool access_started = false;
|
||||||
|
|||||||
Reference in New Issue
Block a user