diff --git a/src/sql/engine/cmd/ob_xa_executor.cpp b/src/sql/engine/cmd/ob_xa_executor.cpp index 382fc0fc74..7481345276 100644 --- a/src/sql/engine/cmd/ob_xa_executor.cpp +++ b/src/sql/engine/cmd/ob_xa_executor.cpp @@ -104,6 +104,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) } else { // associate xa with session my_session->associate_xa(xid); + my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); } } LOG_INFO("xa start execute", K(stmt)); @@ -202,6 +203,7 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock()); int64_t flags = stmt.get_flags(); flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags; + my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end(xid, flags, my_session->get_tx_desc()))) { LOG_WARN("xa end failed", K(ret), K(xid)); @@ -270,6 +272,7 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx(); ObXATransID xid; + ObTransID tx_id; if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; @@ -286,15 +289,16 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) LOG_WARN("already start trans", K(ret)); } else { int64_t timeout_seconds = my_session->get_xa_end_timeout_seconds(); - if (OB_FAIL(MTL(transaction::ObXAService*)->xa_prepare(xid, timeout_seconds))) { + if (OB_FAIL(MTL(transaction::ObXAService*)->xa_prepare(xid, timeout_seconds, tx_id))) { if (OB_TRANS_XA_RDONLY != ret) { LOG_WARN("xa prepare failed", K(ret), K(stmt)); } // TODO: 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚 } + my_session->get_raw_audit_record().trans_id_ = tx_id; } - LOG_INFO("xa prepare execute", K(stmt)); + LOG_INFO("xa prepare execute", K(ret), K(stmt), K(xid), K(tx_id)); return ret; } @@ -396,6 +400,7 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt) ObTxDesc *tx_desc = my_session->get_tx_desc(); int64_t xa_timeout_seconds = my_session->get_xa_end_timeout_seconds(); ObXATransID xid; + ObTransID tx_id; bool has_tx_level_temp_table = false; if (!stmt.is_valid_oracle_xid()) { ret = OB_TRANS_XA_INVAL; @@ -412,9 +417,10 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt) LOG_WARN("already start trans", K(ret), K(tx_desc->tid())); } else { if (OB_FAIL(MTL(transaction::ObXAService*)->xa_commit(xid, stmt.get_flags(), - xa_timeout_seconds, has_tx_level_temp_table))) { + xa_timeout_seconds, has_tx_level_temp_table, tx_id))) { LOG_WARN("xa commit failed", K(ret), K(xid)); } + my_session->get_raw_audit_record().trans_id_ = tx_id; } if (has_tx_level_temp_table) { int temp_ret = my_session->drop_temp_tables(false, true/*is_xa_trans*/); @@ -422,7 +428,7 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt) LOG_WARN("trx level temporary table clean failed", KR(temp_ret)); } } - LOG_INFO("xa commit", K(ret), K(stmt), K(xid)); + LOG_INFO("xa commit", K(ret), K(stmt), K(xid), K(tx_id)); return ret; } @@ -433,6 +439,7 @@ int ObPlXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt) ObTxDesc *tx_desc = my_session->get_tx_desc(); int64_t xa_timeout_seconds = my_session->get_xa_end_timeout_seconds(); ObXATransID xid; + ObTransID tx_id; if (!stmt.is_valid_oracle_xid()) { ret = OB_TRANS_XA_INVAL; LOG_WARN("invalid xid for oracle mode", K(ret), K(stmt)); @@ -447,11 +454,12 @@ int ObPlXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt) ret = OB_TRANS_XA_RMFAIL; LOG_WARN("already start trans", K(ret), K(tx_desc->tid())); } else { - if (OB_FAIL(MTL(transaction::ObXAService*)->xa_rollback(xid, xa_timeout_seconds))) { + if (OB_FAIL(MTL(transaction::ObXAService*)->xa_rollback(xid, xa_timeout_seconds, tx_id))) { LOG_WARN("xa rollback failed", K(ret), K(xid)); } + my_session->get_raw_audit_record().trans_id_ = tx_id; } - LOG_INFO("xa rollback", K(ret), K(stmt), K(xid)); + LOG_INFO("xa rollback", K(ret), K(stmt), K(xid), K(tx_id)); return ret; } diff --git a/src/storage/tx/ob_xa_dblink_service.cpp b/src/storage/tx/ob_xa_dblink_service.cpp index b42e3c21b9..cbcea5205b 100644 --- a/src/storage/tx/ob_xa_dblink_service.cpp +++ b/src/storage/tx/ob_xa_dblink_service.cpp @@ -557,7 +557,8 @@ int ObXAService::commit_for_dblink_trans(ObTxDesc *&tx_desc) // if an error is returned in this phase, set this error to ret // step 3.1, two phase xa commit/rollback for local branch if (need_rollback) { - if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds))) { + ObTransID unused_tx_id; + if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds, unused_tx_id))) { TRANS_LOG(WARN, "xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id)); ret = tmp_ret; } else { @@ -569,8 +570,9 @@ int ObXAService::commit_for_dblink_trans(ObTxDesc *&tx_desc) if (is_readonly_local_branch) { // do nothing } else { + ObTransID unused_tx_id; if (OB_SUCCESS != (tmp_ret = xa_commit(xid, ObXAFlag::TMNOFLAGS, timeout_seconds, - has_tx_level_temp_table))) { + has_tx_level_temp_table, unused_tx_id))) { TRANS_LOG(WARN, "xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id), K(has_tx_level_temp_table)); ret = tmp_ret; @@ -636,6 +638,7 @@ int ObXAService::rollback_for_dblink_trans(ObTxDesc *&tx_desc) } else { const int64_t timeout_seconds = 60; ObDBLinkClientArray &client_array = xa_ctx->get_dblink_client_array(); + ObTransID unused_tx_id; // step 1, xa end for each participant // step 1.1, xa end for each dblink branch for (int i = 0; i < client_array.count(); i++) { @@ -653,7 +656,7 @@ int ObXAService::rollback_for_dblink_trans(ObTxDesc *&tx_desc) } // step 2, xa rollback for each participant // step 2.1, xa rollback for local branch - if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds))) { + if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds, unused_tx_id))) { TRANS_LOG(WARN,"xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id)); ret = tmp_ret; } diff --git a/src/storage/tx/ob_xa_service.cpp b/src/storage/tx/ob_xa_service.cpp index 350d2ea346..10c300c91c 100644 --- a/src/storage/tx/ob_xa_service.cpp +++ b/src/storage/tx/ob_xa_service.cpp @@ -1457,7 +1457,8 @@ int ObXAService::end_stmt(const ObXATransID &xid, ObTxDesc &tx_desc) int ObXAService::xa_commit(const ObXATransID &xid, const int64_t flags, const int64_t xa_timeout_seconds, - bool &has_tx_level_temp_table) + bool &has_tx_level_temp_table, + ObTransID &tx_id) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -1474,14 +1475,16 @@ int ObXAService::xa_commit(const ObXATransID &xid, } else { const int64_t timeout_us = xa_timeout_seconds * 1000000; if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_COMMIT)) { - if (OB_FAIL(two_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table))) { + if (OB_FAIL(two_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table, + tx_id))) { TRANS_LOG(WARN, "two phase xa commit failed", K(ret), K(xid)); xa_statistics_.inc_failure_xa_2pc_commit(); } else { xa_statistics_.inc_success_xa_2pc_commit(); } } else if (ObXAFlag::is_tmonephase(flags)) { - if (OB_FAIL(one_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table))) { + if (OB_FAIL(one_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table, + tx_id))) { TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid)); xa_statistics_.inc_failure_xa_1pc_commit(); } else { @@ -1493,19 +1496,19 @@ int ObXAService::xa_commit(const ObXATransID &xid, } } - TRANS_LOG(INFO, "xa commit", K(ret), K(xid), K(flags), K(xa_timeout_seconds)); + TRANS_LOG(INFO, "xa commit", K(ret), K(xid), K(tx_id), K(flags), K(xa_timeout_seconds)); return ret; } int ObXAService::one_phase_xa_commit_(const ObXATransID &xid, const int64_t timeout_us, const int64_t request_id, - bool &has_tx_level_temp_table) + bool &has_tx_level_temp_table, + ObTransID &tx_id) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); ObAddr sche_addr; - ObTransID tx_id; const bool is_rollback = false; int64_t end_flag = 0; share::ObLSID coordinator; @@ -1579,12 +1582,12 @@ int ObXAService::one_phase_xa_commit_(const ObXATransID &xid, } int ObXAService::xa_rollback(const ObXATransID &xid, - const int64_t xa_timeout_seconds) + const int64_t xa_timeout_seconds, + ObTransID &tx_id) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); int64_t end_flag = 0; - ObTransID tx_id; ObAddr sche_addr; share::ObLSID coordinator; const int64_t timeout_us = xa_timeout_seconds * 1000000; @@ -2090,12 +2093,12 @@ int ObXAService::xa_rollback_all_changes(const ObXATransID &xid, ObTxDesc *&tx_d // xa prepare int ObXAService::xa_prepare(const ObXATransID &xid, - const int64_t timeout_seconds) + const int64_t timeout_seconds, + ObTransID &tx_id) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; common::ObAddr sche_addr; - ObTransID tx_id; bool is_tightly_coupled = true; int64_t end_flag = 0; const uint64_t tenant_id = MTL_ID(); @@ -2163,7 +2166,7 @@ int ObXAService::xa_prepare(const ObXATransID &xid, ret = OB_TRANS_XA_RDONLY; } - if (OB_FAIL(ret)) { + if (OB_FAIL(ret) && OB_TRANS_XA_RDONLY != ret) { xa_statistics_.inc_failure_xa_prepare(); } else { xa_statistics_.inc_success_xa_prepare(); @@ -2757,7 +2760,8 @@ int ObXAService::update_coord(const uint64_t tenant_id, int ObXAService::two_phase_xa_commit_(const ObXATransID &xid, const int64_t timeout_us, const int64_t request_id, - bool &has_tx_level_temp_table) + bool &has_tx_level_temp_table, + ObTransID &tx_id) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -2765,7 +2769,6 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid, ObXACtx *xa_ctx = NULL; bool alloc = true; share::ObLSID coordinator; - ObTransID tx_id; bool record_in_tableone = true; int64_t end_flag = 0; // only used for constructor diff --git a/src/storage/tx/ob_xa_service.h b/src/storage/tx/ob_xa_service.h index 77e9f90da4..a6adfc2b93 100644 --- a/src/storage/tx/ob_xa_service.h +++ b/src/storage/tx/ob_xa_service.h @@ -70,15 +70,18 @@ public: int xa_commit(const ObXATransID &xid, const int64_t flags, const int64_t xa_timeout_seconds, - bool &has_tx_level_temp_table); + bool &has_tx_level_temp_table, + ObTransID &tx_id); int xa_rollback(const ObXATransID &xid, - const int64_t xa_timeout_seconds); + const int64_t xa_timeout_seconds, + ObTransID &tx_id); int xa_rollback_local(const ObXATransID &xid, const ObTransID &tx_id, const int64_t timeout_us, const int64_t request_id); int xa_prepare(const ObXATransID &xid, - const int64_t timeout_seconds); + const int64_t timeout_seconds, + ObTransID &tx_id); int local_xa_prepare(const ObXATransID &xid, const ObTransID &trans_id, const int64_t timeout_us); @@ -209,7 +212,8 @@ private: int one_phase_xa_commit_(const ObXATransID &xid, const int64_t timeout_us, const int64_t request_id, - bool &has_tx_level_temp_table); + bool &has_tx_level_temp_table, + ObTransID &tx_id); int xa_rollback_local_(const ObXATransID &xid, const ObTransID &tx_id, const int64_t timeout_us, @@ -232,7 +236,8 @@ private: int two_phase_xa_commit_(const ObXATransID &xid, const int64_t timeout_us, const int64_t request_id, - bool &has_tx_level_temp_table); + bool &has_tx_level_temp_table, + ObTransID &tx_id); int xa_rollback_for_pending_trans_(const ObXATransID &xid, const ObTransID &tx_id, const int64_t timeout_us,