fix wrong operation between invalid scn using replay argument

This commit is contained in:
Handora 2024-10-12 04:01:27 +00:00 committed by ob-robot
parent 30047d08e0
commit ecbc194fc0

View File

@ -9903,14 +9903,18 @@ int ObPartTransCtx::do_transfer_out_tx_op(const SCN data_end_scn,
} else if (NotifyType::REGISTER_SUCC == op_type) {
// blocking active tx which start_scn <= data_end_scn
// when register modify memory state only
if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >= op_scn) {
if (is_replay
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= op_scn) {
// do nothing
} else {
sub_state_.set_transfer_blocking();
is_operated = true;
}
} else if (NotifyType::ON_REDO == op_type) {
if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >= op_scn) {
if (is_replay
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= op_scn) {
// do nothing
} else if (FALSE_IT(sub_state_.set_transfer_blocking())) {
} else if (FALSE_IT(exec_info_.is_transfer_blocking_ = true)) {
@ -9920,7 +9924,9 @@ int ObPartTransCtx::do_transfer_out_tx_op(const SCN data_end_scn,
is_operated = true;
}
} else if (NotifyType::ON_COMMIT == op_type) {
if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >= op_scn) {
if (is_replay
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= op_scn) {
// do nothing
} else {
// just for check
@ -9975,7 +9981,9 @@ int ObPartTransCtx::do_transfer_out_tx_op(const SCN data_end_scn,
// leader on_register fail to clean
sub_state_.clear_transfer_blocking();
exec_info_.is_transfer_blocking_ = false;
} else if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >= op_scn) {
} else if (is_replay
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= op_scn) {
// replay filter
} else {
sub_state_.clear_transfer_blocking();
@ -10170,7 +10178,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
ret = OB_NEED_RETRY;
TRANS_LOG(WARN, "has state log submitting need retry", KR(ret), K(trans_id_), K(sub_state_));
} else if (NotifyType::REGISTER_SUCC == move_tx_param.op_type_) {
if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
if (move_tx_param.is_replay_
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
// do nothing
} else if (exec_info_.state_ >= ObTxState::ABORT) {
// this ctx may be recycled soon
@ -10214,7 +10224,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
sub_state_.set_transfer_blocking();
}
} else if (NotifyType::ON_REDO == move_tx_param.op_type_) {
if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >=move_tx_param.op_scn_) {
if (move_tx_param.is_replay_
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
// do nothing
} else {
if (is_new_created && is_follower_()) {
@ -10230,7 +10242,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
}
} else if (NotifyType::ON_COMMIT == move_tx_param.op_type_) {
bool is_aborted = false;
if (exec_info_.max_applying_log_ts_.is_valid() && exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
if (move_tx_param.is_replay_
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
// do nothing
} else if (epoch_ == arg.epoch_ && // created by myself
OB_FAIL(check_is_aborted_in_tx_data_(trans_id_, is_aborted))) {
@ -10337,8 +10351,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param,
// leader register fail to clean
sub_state_.clear_transfer_blocking();
exec_info_.is_transfer_blocking_ = false;
} else if (exec_info_.max_applying_log_ts_.is_valid() &&
exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
} else if (move_tx_param.is_replay_
&& exec_info_.max_applying_log_ts_.is_valid()
&& exec_info_.max_applying_log_ts_ >= move_tx_param.op_scn_) {
// replay filter
} else {
sub_state_.clear_transfer_blocking();