fix bugs in 2pc of xa
This commit is contained in:
		@ -5078,7 +5078,7 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts)
 | 
				
			|||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this));
 | 
					        TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this));
 | 
				
			||||||
        if (has_persisted_log_()) {
 | 
					        if (has_persisted_log_()) {
 | 
				
			||||||
          if (ObPartTransAction::COMMIT == part_trans_action_ || get_upstream_state() >= ObTxState::PREPARE) {
 | 
					          if (ObPartTransAction::COMMIT == part_trans_action_ || get_upstream_state() >= ObTxState::REDO_COMPLETE) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            TRANS_LOG(WARN, "abort self instantly with a tx_commit request", K(contain_table_lock),
 | 
					            TRANS_LOG(WARN, "abort self instantly with a tx_commit request", K(contain_table_lock),
 | 
				
			||||||
                      KPC(this));
 | 
					                      KPC(this));
 | 
				
			||||||
@ -6101,7 +6101,6 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts,
 | 
				
			|||||||
  } else if (OB_UNLIKELY(is_2pc_logging_())) {
 | 
					  } else if (OB_UNLIKELY(is_2pc_logging_())) {
 | 
				
			||||||
    TRANS_LOG(WARN, "tx is 2pc logging", KPC(this));
 | 
					    TRANS_LOG(WARN, "tx is 2pc logging", KPC(this));
 | 
				
			||||||
  } else if (ObTxState::INIT != upstream_state_) {
 | 
					  } else if (ObTxState::INIT != upstream_state_) {
 | 
				
			||||||
    // TODO, consider prepare state
 | 
					 | 
				
			||||||
    if (is_prepared_sub2pc()) {
 | 
					    if (is_prepared_sub2pc()) {
 | 
				
			||||||
      if (OB_FAIL(post_tx_sub_prepare_resp_(OB_SUCCESS))) {
 | 
					      if (OB_FAIL(post_tx_sub_prepare_resp_(OB_SUCCESS))) {
 | 
				
			||||||
        TRANS_LOG(WARN, "fail to post sub prepare response", K(ret), KPC(this));
 | 
					        TRANS_LOG(WARN, "fail to post sub prepare response", K(ret), KPC(this));
 | 
				
			||||||
@ -6114,6 +6113,12 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts,
 | 
				
			|||||||
  } else if (OB_UNLIKELY(pending_write_)) {
 | 
					  } else if (OB_UNLIKELY(pending_write_)) {
 | 
				
			||||||
    ret = OB_ERR_UNEXPECTED;
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
    TRANS_LOG(WARN, "access in progress", K(ret), K_(pending_write), KPC(this));
 | 
					    TRANS_LOG(WARN, "access in progress", K(ret), K_(pending_write), KPC(this));
 | 
				
			||||||
 | 
					  } else if (sub_state_.is_force_abort()) {
 | 
				
			||||||
 | 
					    if (OB_FAIL(compensate_abort_log_())) {
 | 
				
			||||||
 | 
					      TRANS_LOG(WARN, "compensate abort log failed", K(ret), KPC(this));
 | 
				
			||||||
 | 
					    } else {
 | 
				
			||||||
 | 
					      ret = OB_TRANS_KILLED;
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
  } else if (OB_FAIL(set_2pc_participants_(parts))) {
 | 
					  } else if (OB_FAIL(set_2pc_participants_(parts))) {
 | 
				
			||||||
    TRANS_LOG(WARN, "set participants failed", K(ret), KPC(this));
 | 
					    TRANS_LOG(WARN, "set participants failed", K(ret), KPC(this));
 | 
				
			||||||
  } else if (OB_FAIL(set_2pc_request_id_(request_id))) {
 | 
					  } else if (OB_FAIL(set_2pc_request_id_(request_id))) {
 | 
				
			||||||
@ -6162,7 +6167,7 @@ int ObPartTransCtx::sub_end_tx(const int64_t &request_id,
 | 
				
			|||||||
  } else if (xid.empty() || xid != exec_info_.xid_) {
 | 
					  } else if (xid.empty() || xid != exec_info_.xid_) {
 | 
				
			||||||
    ret = OB_INVALID_ARGUMENT;
 | 
					    ret = OB_INVALID_ARGUMENT;
 | 
				
			||||||
    TRANS_LOG(WARN, "invalid argument", K(ret), K(xid), KPC(this));
 | 
					    TRANS_LOG(WARN, "invalid argument", K(ret), K(xid), KPC(this));
 | 
				
			||||||
  } else if (!is_sub2pc()) {
 | 
					  } else if (!is_sub2pc() && !is_rollback) {
 | 
				
			||||||
    ret = OB_ERR_UNEXPECTED;
 | 
					    ret = OB_ERR_UNEXPECTED;
 | 
				
			||||||
    TRANS_LOG(WARN, "unexpected trans ctx", KR(ret), KPC(this));
 | 
					    TRANS_LOG(WARN, "unexpected trans ctx", KR(ret), KPC(this));
 | 
				
			||||||
  } else if (OB_UNLIKELY(is_follower_())) {
 | 
					  } else if (OB_UNLIKELY(is_follower_())) {
 | 
				
			||||||
 | 
				
			|||||||
@ -279,7 +279,7 @@ int ObTxCycleTwoPhaseCommitter::continue_execution(const bool is_rollback)
 | 
				
			|||||||
  const ObTxState state = get_downstream_state();
 | 
					  const ObTxState state = get_downstream_state();
 | 
				
			||||||
  bool no_need_submit_log = false;
 | 
					  bool no_need_submit_log = false;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (ObTxState::REDO_COMPLETE != get_upstream_state()) {
 | 
					  if (ObTxState::REDO_COMPLETE != get_upstream_state() && !is_rollback) {
 | 
				
			||||||
    TRANS_LOG(INFO, "already in second phase", K(ret), K(*this));
 | 
					    TRANS_LOG(INFO, "already in second phase", K(ret), K(*this));
 | 
				
			||||||
  } else {
 | 
					  } else {
 | 
				
			||||||
    if (is_rollback) {
 | 
					    if (is_rollback) {
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user