diff --git a/mittest/env/ob_simple_server_helper.cpp b/mittest/env/ob_simple_server_helper.cpp index 665cccc6d..a100aca74 100644 --- a/mittest/env/ob_simple_server_helper.cpp +++ b/mittest/env/ob_simple_server_helper.cpp @@ -745,7 +745,12 @@ int SimpleServerHelper::write(sqlclient::ObISQLConnection *conn, const char *sql return conn->execute_write(OB_SYS_TENANT_ID, sql, affected_rows); } -int InjectTxFaultHelper::submit_log(const char *buf, const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, const bool need_nonblock) +int InjectTxFaultHelper::submit_log(const char *buf, + const int64_t size, + const share::SCN &base_ts, + ObTxBaseLogCb *cb, + const bool need_nonblock, + const int64_t retry_timeout_us) { int ret = OB_SUCCESS; diff --git a/mittest/env/ob_simple_server_helper.h b/mittest/env/ob_simple_server_helper.h index 5964ac032..41d136355 100644 --- a/mittest/env/ob_simple_server_helper.h +++ b/mittest/env/ob_simple_server_helper.h @@ -95,7 +95,8 @@ public: const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, - const bool need_nonblock) override; + const bool need_nonblock, + const int64_t retry_timeout_us = 1000) override; private: transaction::ObLSTxCtxMgr *mgr_; hash::ObHashMap tx_injects_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 8b60bdc3b..3cf6db8d3 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -646,11 +646,29 @@ int ObPartTransCtx::handle_timeout(const int64_t delay) if (!is_follower_() && is_committing_()) { if (is_local_tx_()) { try_submit_next_log_(); + } else if (ObTxState::PREPARE > get_upstream_state() ) { + ObTxState next_state = (is_sub2pc() || exec_info_.is_dup_tx_) ? + ObTxState::REDO_COMPLETE : + ObTxState::PREPARE; + if (OB_FAIL(drive_self_2pc_phase(next_state))) { + TRANS_LOG(WARN, "drive to next phase failed", K(ret), K(*this), K(next_state)); + } else if (OB_FAIL(ObTxCycleTwoPhaseCommitter::handle_timeout())) { + TRANS_LOG(WARN, "handle 2pc timeout failed", KR(ret), KPC(this)); + } } else if (OB_FAIL(ObTxCycleTwoPhaseCommitter::handle_timeout())) { TRANS_LOG(WARN, "handle 2pc timeout failed", KR(ret), KPC(this)); } } + // retry submit abort log + if (!is_follower_() + && get_upstream_state() == ObTxState::ABORT + && get_upstream_state() != get_downstream_state()) { + if (OB_FAIL(compensate_abort_log_())) { + TRANS_LOG(WARN, "compensate abort log failed", KR(ret), KPC(this)); + } + } + // if not committing, abort txn if it was expired if (!is_follower_() && !is_committing_() && tx_expired) { if (OB_FAIL(abort_(OB_TRANS_TIMEOUT))) { @@ -2042,6 +2060,11 @@ int ObPartTransCtx::compensate_abort_log_() } else if(OB_FALSE_IT(sub_state_.set_force_abort())) { } else if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_ABORT_LOG))) { + int tmp_ret = OB_SUCCESS; + if (OB_EAGAIN == ret && OB_TMP_FAIL(restart_2pc_trans_timer_())) { + TRANS_LOG(WARN, "restart_2pc_trans_timer_ for submit abort log fail", + KR(ret), KR(tmp_ret), KPC(this)); + } TRANS_LOG(WARN, "submit abort log failed", KR(ret), K(*this)); } else { } @@ -3564,6 +3587,7 @@ int ObPartTransCtx::submit_commit_log_() int ObPartTransCtx::submit_abort_log_() { int ret = OB_SUCCESS; + set_upstream_state(ObTxState::ABORT); ObTxLogCb *log_cb = NULL; ObTxLogBlock log_block; const int64_t replay_hint = trans_id_.get_id(); @@ -3621,7 +3645,7 @@ int ObPartTransCtx::submit_abort_log_() } } else if (OB_FAIL(acquire_ctx_ref_())) { TRANS_LOG(ERROR, "acquire ctx ref failed", KR(ret), K(*this)); - } else if (OB_FAIL(submit_log_block_out_(log_block, SCN::min_scn(), log_cb, replay_hint, barrier))) { + } else if (OB_FAIL(submit_log_block_out_(log_block, SCN::min_scn(), log_cb, replay_hint, barrier, 50 * 1000))) { TRANS_LOG(WARN, "submit log to clog adapter failed", KR(ret), K(*this)); return_log_cb_(log_cb); log_cb = NULL; @@ -3876,7 +3900,8 @@ int ObPartTransCtx::submit_log_block_out_(ObTxLogBlock &log_block, const share::SCN &base_scn, ObTxLogCb *&log_cb, const int64_t replay_hint, - const logservice::ObReplayBarrierType barrier) + const logservice::ObReplayBarrierType barrier, + const int64_t retry_timeout_us) { int ret = OB_SUCCESS; bool is_2pc_state_log = false; @@ -3899,7 +3924,8 @@ int ObPartTransCtx::submit_log_block_out_(ObTxLogBlock &log_block, log_block.get_size(), base_scn, log_cb, - false))) { + true, + retry_timeout_us))) { busy_cbs_.add_last(log_cb); } } @@ -3933,8 +3959,19 @@ int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type) case ObTxLogType::TX_PREPARE_LOG: { // try generate prepare verison ret = generate_prepare_version_(); - - if (OB_SUCC(ret) && mt_ctx_.is_prepared()) { + if (get_upstream_state() < ObTxState::PREPARE) { + if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { + TRANS_LOG(WARN, "drive 2pc prepare phase failed", K(ret), K(*this)); + } + } else if (get_upstream_state() > ObTxState::PREPARE || + get_downstream_state() >= ObTxState::PREPARE) { + TRANS_LOG(INFO, "we need not submit prepare log after the prepare state", K(*this)); + } + if (OB_SUCC(ret) && + mt_ctx_.is_prepared() && + get_upstream_state() == ObTxState::PREPARE && + get_downstream_state() < ObTxState::PREPARE && + !is_2pc_logging()) { ret = submit_prepare_log_(); } break; @@ -7145,7 +7182,7 @@ int ObPartTransCtx::register_multi_data_source(const ObTxDataSourceType data_sou != (tmp_ret = submit_log_impl_(ObTxLogType::TX_MULTI_DATA_SOURCE_LOG))) { if (tmp_ret == OB_NOT_MASTER) { ret = OB_TRANS_NEED_ROLLBACK; - } else if (tmp_ret == OB_TX_NOLOGCB) { + } else if (tmp_ret == OB_TX_NOLOGCB || tmp_ret == OB_EAGAIN) { ret = OB_SUCCESS; if (register_flag.need_flush_redo_instantly_) { mds_cache_.set_need_retry_submit_mds(true); @@ -8426,8 +8463,13 @@ int ObPartTransCtx::do_local_commit_tx_() } } else if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_COMMIT_LOG))) { // log submitting will retry in handle_timeout - TRANS_LOG(WARN, "submit commit log fail, will retry later", KR(ret), KPC(this)); - ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(restart_2pc_trans_timer_())) { + TRANS_LOG(WARN, "restart_2pc_trans_timer_ error", KR(ret), KR(tmp_ret), KPC(this)); + ret = OB_EAGAIN; + } else { + ret = OB_SUCCESS; + } } return ret; diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 418af903a..f537904a6 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -497,7 +497,8 @@ private: const share::SCN &base_scn, ObTxLogCb *&log_cb, const int64_t replay_hint = 0, - const ObReplayBarrierType barrier = ObReplayBarrierType::NO_NEED_BARRIER); + const ObReplayBarrierType barrier = ObReplayBarrierType::NO_NEED_BARRIER, + const int64_t retry_timeout_us = 1000); int after_submit_log_(ObTxLogBlock &log_block, ObTxLogCb *log_cb, memtable::ObRedoLogSubmitHelper *redo_helper); diff --git a/src/storage/tx/ob_two_phase_downstream_committer.cpp b/src/storage/tx/ob_two_phase_downstream_committer.cpp index d5396be0f..43ada8fa9 100644 --- a/src/storage/tx/ob_two_phase_downstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_downstream_committer.cpp @@ -1214,13 +1214,8 @@ int ObTxCycleTwoPhaseCommitter::decide_2pc_log_type_(bool &need_submit, break; } case ObTxState::REDO_COMPLETE: { - if (is_sub2pc()) { - need_submit = true; - log_type = ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO; - } else { - ret = OB_TRANS_INVALID_STATE; - TRANS_LOG(ERROR, "invalid 2pc state", KR(ret), KPC(this)); - } + need_submit = true; + log_type = ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO; break; } case ObTxState::PREPARE: { diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index f9fbc0e09..331f0cbde 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1299,7 +1299,7 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, bool blockable = expire_ts > 0; do { ret = part_ctx->rollback_to_savepoint(op_sn, specified_from_scn, savepoint, tx_seq_base, downstream_parts); - if (OB_NEED_RETRY == ret && blockable) { + if ((OB_NEED_RETRY == ret || OB_EAGAIN == ret) && blockable) { if (ObTimeUtility::current_time() >= expire_ts) { ret = OB_TIMEOUT; TRANS_LOG(WARN, "can not retry rollback_to because of timeout", K(ret), K(retry_cnt)); @@ -1311,7 +1311,7 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, ob_usleep(50 * 1000); } } - } while (OB_NEED_RETRY == ret && blockable && !part_ctx->is_transfer_deleted()); + } while ((OB_NEED_RETRY == ret || OB_EAGAIN == ret) && blockable && !part_ctx->is_transfer_deleted()); #ifndef NDEBUG TRANS_LOG(INFO, "rollback to savepoint sync", K(ret), K(part_ctx->get_trans_id()), K(part_ctx->get_ls_id()), K(retry_cnt), diff --git a/src/storage/tx/ob_tx_log_adapter.cpp b/src/storage/tx/ob_tx_log_adapter.cpp index 170661776..5755367b2 100644 --- a/src/storage/tx/ob_tx_log_adapter.cpp +++ b/src/storage/tx/ob_tx_log_adapter.cpp @@ -120,37 +120,59 @@ int ObLSTxLogAdapter::submit_log(const char *buf, const int64_t size, const SCN &base_scn, ObTxBaseLogCb *cb, - const bool need_nonblock) + const bool need_nonblock, + const int64_t retry_timeout_us) { int ret = OB_SUCCESS; palf::LSN lsn; SCN scn; + int64_t cur_ts = ObTimeUtility::current_time(); + int64_t retry_cnt = 0; const bool is_big_log = (size > palf::MAX_NORMAL_LOG_BODY_SIZE); const bool allow_compression = true; - int64_t cur_ts = ObClockGenerator::getClock(); - if (NULL == buf || 0 >= size || OB_ISNULL(cb) || !base_scn.is_valid() || size > palf::MAX_LOG_BODY_SIZE || + if (NULL == buf || 0 >= size || OB_ISNULL(cb) || !base_scn.is_valid() + || retry_timeout_us < 0 || size > palf::MAX_LOG_BODY_SIZE || base_scn.convert_to_ts() > cur_ts + 86400000000L) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), KP(buf), K(size), K(base_scn), KP(cb)); } else if (OB_ISNULL(log_handler_) || !log_handler_->is_valid()) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), KP(log_handler_)); - } else if (is_big_log && OB_FAIL(log_handler_->append_big_log(buf, size, base_scn, need_nonblock, - allow_compression, cb, lsn, scn))) { - TRANS_LOG(WARN, "append big log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn), - K(need_nonblock), K(is_big_log)); - } else if (!is_big_log && OB_FAIL(log_handler_->append(buf, size, base_scn, need_nonblock, - allow_compression, cb, lsn, scn))) { - TRANS_LOG(WARN, "append log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn), - K(need_nonblock)); } else { - cb->set_base_ts(base_scn); - cb->set_lsn(lsn); - cb->set_log_ts(scn); - cb->set_submit_ts(cur_ts); - ObTransStatistic::get_instance().add_clog_submit_count(MTL_ID(), 1); - ObTransStatistic::get_instance().add_trans_log_total_size(MTL_ID(), size); + static const int64_t MAX_SLEEP_US = 100; + int64_t retry_cnt = 0; + int64_t sleep_us = 0; + const int64_t expire_us = cur_ts + retry_timeout_us; + do { + if (is_big_log && OB_FAIL(log_handler_->append_big_log(buf, size, base_scn, need_nonblock, + allow_compression, cb, lsn, scn))) { + TRANS_LOG(WARN, "append big log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn), + K(need_nonblock), K(is_big_log)); + } else if (!is_big_log && OB_FAIL(log_handler_->append(buf, size, base_scn, need_nonblock, + allow_compression, cb, lsn, scn))) { + TRANS_LOG(WARN, "append log to palf failed", K(ret), KP(log_handler_), KP(buf), K(size), K(base_scn), + K(need_nonblock)); + } else { + cb->set_base_ts(base_scn); + cb->set_lsn(lsn); + cb->set_log_ts(scn); + cb->set_submit_ts(cur_ts); + ObTransStatistic::get_instance().add_clog_submit_count(MTL_ID(), 1); + ObTransStatistic::get_instance().add_trans_log_total_size(MTL_ID(), size); + } + if (!need_nonblock) { + // retries are not needed in block mode. + break; + } else if (OB_EAGAIN == ret) { + retry_cnt++; + sleep_us = retry_cnt * 10; + sleep_us = sleep_us > MAX_SLEEP_US ? MAX_SLEEP_US : sleep_us; + ob_usleep(sleep_us); + cur_ts = ObTimeUtility::current_time(); + } + } while (OB_EAGAIN == ret && cur_ts < expire_us); + } TRANS_LOG(DEBUG, "ObLSTxLogAdapter::submit_ls_log", KR(ret), KP(cb)); diff --git a/src/storage/tx/ob_tx_log_adapter.h b/src/storage/tx/ob_tx_log_adapter.h index a2cfce4f2..9928e9053 100644 --- a/src/storage/tx/ob_tx_log_adapter.h +++ b/src/storage/tx/ob_tx_log_adapter.h @@ -63,7 +63,8 @@ public: const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, - const bool need_nonblock) = 0; + const bool need_nonblock, + const int64_t retry_timeout_us = 1000) = 0; virtual int get_role(bool &is_leader, int64_t &epoch) = 0; virtual int get_max_decided_scn(share::SCN &scn) = 0; @@ -108,7 +109,8 @@ public: const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, - const bool need_nonblock); + const bool need_nonblock, + const int64_t retry_timeout_us = 1000); int get_role(bool &is_leader, int64_t &epoch); int get_max_decided_scn(share::SCN &scn); int get_palf_committed_max_scn(share::SCN &scn) const; diff --git a/unittest/storage/tx/mock_utils/basic_fake_define.h b/unittest/storage/tx/mock_utils/basic_fake_define.h index 5dcd490c5..2ec02762f 100644 --- a/unittest/storage/tx/mock_utils/basic_fake_define.h +++ b/unittest/storage/tx/mock_utils/basic_fake_define.h @@ -497,7 +497,8 @@ public: const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, - const bool need_nonblock) + const bool need_nonblock, + const int64_t retry_timeout_us) { int ret = OB_SUCCESS; logservice::ObLogBaseHeader base_header; diff --git a/unittest/storage/tx/ob_mock_tx_log_adapter.cpp b/unittest/storage/tx/ob_mock_tx_log_adapter.cpp index f56c25e9c..29519416e 100644 --- a/unittest/storage/tx/ob_mock_tx_log_adapter.cpp +++ b/unittest/storage/tx/ob_mock_tx_log_adapter.cpp @@ -108,7 +108,8 @@ int MockTxLogAdapter::submit_log(const char *buf, const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, - const bool need_nonblock) + const bool need_nonblock, + const int64_t retry_timeout_us) { int ret = OB_SUCCESS; int64_t ts = 0; diff --git a/unittest/storage/tx/ob_mock_tx_log_adapter.h b/unittest/storage/tx/ob_mock_tx_log_adapter.h index a5f43e1f6..b583dfff4 100644 --- a/unittest/storage/tx/ob_mock_tx_log_adapter.h +++ b/unittest/storage/tx/ob_mock_tx_log_adapter.h @@ -68,7 +68,8 @@ public: const int64_t size, const share::SCN &base_ts, ObTxBaseLogCb *cb, - const bool need_block); + const bool need_block, + const int64_t retry_timeout_us = 1000); int get_role(bool &is_leader, int64_t &epoch); int get_max_decided_scn(share::SCN &scn) {