[xa][4.2/4.1] refine tx id in sql audit for xa stmt

This commit is contained in:
jw-guo
2023-08-03 08:42:14 +00:00
committed by ob-robot
parent 45afda53b5
commit 7f668dfa4a
4 changed files with 46 additions and 27 deletions

View File

@ -104,6 +104,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
} else { } else {
// associate xa with session // associate xa with session
my_session->associate_xa(xid); my_session->associate_xa(xid);
my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id();
} }
} }
LOG_INFO("xa start execute", K(stmt)); LOG_INFO("xa start execute", K(stmt));
@ -202,6 +203,7 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock()); ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
int64_t flags = stmt.get_flags(); int64_t flags = stmt.get_flags();
flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags; flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags;
my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id();
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end(xid, flags, if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end(xid, flags,
my_session->get_tx_desc()))) { my_session->get_tx_desc()))) {
LOG_WARN("xa end failed", K(ret), K(xid)); LOG_WARN("xa end failed", K(ret), K(xid));
@ -270,6 +272,7 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt)
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx(); ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx();
ObXATransID xid; ObXATransID xid;
ObTransID tx_id;
if (OB_ISNULL(my_session)) { if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
@ -286,15 +289,16 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt)
LOG_WARN("already start trans", K(ret)); LOG_WARN("already start trans", K(ret));
} else { } else {
int64_t timeout_seconds = my_session->get_xa_end_timeout_seconds(); int64_t timeout_seconds = my_session->get_xa_end_timeout_seconds();
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_prepare(xid, timeout_seconds))) { if (OB_FAIL(MTL(transaction::ObXAService*)->xa_prepare(xid, timeout_seconds, tx_id))) {
if (OB_TRANS_XA_RDONLY != ret) { if (OB_TRANS_XA_RDONLY != ret) {
LOG_WARN("xa prepare failed", K(ret), K(stmt)); LOG_WARN("xa prepare failed", K(ret), K(stmt));
} }
// TODO: 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚 // TODO: 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚
} }
my_session->get_raw_audit_record().trans_id_ = tx_id;
} }
LOG_INFO("xa prepare execute", K(stmt)); LOG_INFO("xa prepare execute", K(ret), K(stmt), K(xid), K(tx_id));
return ret; return ret;
} }
@ -396,6 +400,7 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt)
ObTxDesc *tx_desc = my_session->get_tx_desc(); ObTxDesc *tx_desc = my_session->get_tx_desc();
int64_t xa_timeout_seconds = my_session->get_xa_end_timeout_seconds(); int64_t xa_timeout_seconds = my_session->get_xa_end_timeout_seconds();
ObXATransID xid; ObXATransID xid;
ObTransID tx_id;
bool has_tx_level_temp_table = false; bool has_tx_level_temp_table = false;
if (!stmt.is_valid_oracle_xid()) { if (!stmt.is_valid_oracle_xid()) {
ret = OB_TRANS_XA_INVAL; ret = OB_TRANS_XA_INVAL;
@ -412,9 +417,10 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt)
LOG_WARN("already start trans", K(ret), K(tx_desc->tid())); LOG_WARN("already start trans", K(ret), K(tx_desc->tid()));
} else { } else {
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_commit(xid, stmt.get_flags(), if (OB_FAIL(MTL(transaction::ObXAService*)->xa_commit(xid, stmt.get_flags(),
xa_timeout_seconds, has_tx_level_temp_table))) { xa_timeout_seconds, has_tx_level_temp_table, tx_id))) {
LOG_WARN("xa commit failed", K(ret), K(xid)); LOG_WARN("xa commit failed", K(ret), K(xid));
} }
my_session->get_raw_audit_record().trans_id_ = tx_id;
} }
if (has_tx_level_temp_table) { if (has_tx_level_temp_table) {
int temp_ret = my_session->drop_temp_tables(false, true/*is_xa_trans*/); int temp_ret = my_session->drop_temp_tables(false, true/*is_xa_trans*/);
@ -422,7 +428,7 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt)
LOG_WARN("trx level temporary table clean failed", KR(temp_ret)); LOG_WARN("trx level temporary table clean failed", KR(temp_ret));
} }
} }
LOG_INFO("xa commit", K(ret), K(stmt), K(xid)); LOG_INFO("xa commit", K(ret), K(stmt), K(xid), K(tx_id));
return ret; return ret;
} }
@ -433,6 +439,7 @@ int ObPlXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt)
ObTxDesc *tx_desc = my_session->get_tx_desc(); ObTxDesc *tx_desc = my_session->get_tx_desc();
int64_t xa_timeout_seconds = my_session->get_xa_end_timeout_seconds(); int64_t xa_timeout_seconds = my_session->get_xa_end_timeout_seconds();
ObXATransID xid; ObXATransID xid;
ObTransID tx_id;
if (!stmt.is_valid_oracle_xid()) { if (!stmt.is_valid_oracle_xid()) {
ret = OB_TRANS_XA_INVAL; ret = OB_TRANS_XA_INVAL;
LOG_WARN("invalid xid for oracle mode", K(ret), K(stmt)); LOG_WARN("invalid xid for oracle mode", K(ret), K(stmt));
@ -447,11 +454,12 @@ int ObPlXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt)
ret = OB_TRANS_XA_RMFAIL; ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("already start trans", K(ret), K(tx_desc->tid())); LOG_WARN("already start trans", K(ret), K(tx_desc->tid()));
} else { } else {
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_rollback(xid, xa_timeout_seconds))) { if (OB_FAIL(MTL(transaction::ObXAService*)->xa_rollback(xid, xa_timeout_seconds, tx_id))) {
LOG_WARN("xa rollback failed", K(ret), K(xid)); LOG_WARN("xa rollback failed", K(ret), K(xid));
} }
my_session->get_raw_audit_record().trans_id_ = tx_id;
} }
LOG_INFO("xa rollback", K(ret), K(stmt), K(xid)); LOG_INFO("xa rollback", K(ret), K(stmt), K(xid), K(tx_id));
return ret; return ret;
} }

View File

@ -557,7 +557,8 @@ int ObXAService::commit_for_dblink_trans(ObTxDesc *&tx_desc)
// if an error is returned in this phase, set this error to ret // if an error is returned in this phase, set this error to ret
// step 3.1, two phase xa commit/rollback for local branch // step 3.1, two phase xa commit/rollback for local branch
if (need_rollback) { if (need_rollback) {
if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds))) { ObTransID unused_tx_id;
if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds, unused_tx_id))) {
TRANS_LOG(WARN, "xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id)); TRANS_LOG(WARN, "xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id));
ret = tmp_ret; ret = tmp_ret;
} else { } else {
@ -569,8 +570,9 @@ int ObXAService::commit_for_dblink_trans(ObTxDesc *&tx_desc)
if (is_readonly_local_branch) { if (is_readonly_local_branch) {
// do nothing // do nothing
} else { } else {
ObTransID unused_tx_id;
if (OB_SUCCESS != (tmp_ret = xa_commit(xid, ObXAFlag::TMNOFLAGS, timeout_seconds, if (OB_SUCCESS != (tmp_ret = xa_commit(xid, ObXAFlag::TMNOFLAGS, timeout_seconds,
has_tx_level_temp_table))) { has_tx_level_temp_table, unused_tx_id))) {
TRANS_LOG(WARN, "xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id), TRANS_LOG(WARN, "xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id),
K(has_tx_level_temp_table)); K(has_tx_level_temp_table));
ret = tmp_ret; ret = tmp_ret;
@ -636,6 +638,7 @@ int ObXAService::rollback_for_dblink_trans(ObTxDesc *&tx_desc)
} else { } else {
const int64_t timeout_seconds = 60; const int64_t timeout_seconds = 60;
ObDBLinkClientArray &client_array = xa_ctx->get_dblink_client_array(); ObDBLinkClientArray &client_array = xa_ctx->get_dblink_client_array();
ObTransID unused_tx_id;
// step 1, xa end for each participant // step 1, xa end for each participant
// step 1.1, xa end for each dblink branch // step 1.1, xa end for each dblink branch
for (int i = 0; i < client_array.count(); i++) { for (int i = 0; i < client_array.count(); i++) {
@ -653,7 +656,7 @@ int ObXAService::rollback_for_dblink_trans(ObTxDesc *&tx_desc)
} }
// step 2, xa rollback for each participant // step 2, xa rollback for each participant
// step 2.1, xa rollback for local branch // step 2.1, xa rollback for local branch
if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds))) { if (OB_SUCCESS != (tmp_ret = xa_rollback(xid, timeout_seconds, unused_tx_id))) {
TRANS_LOG(WARN,"xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id)); TRANS_LOG(WARN,"xa rollback for local failed", K(tmp_ret), K(xid), K(tx_id));
ret = tmp_ret; ret = tmp_ret;
} }

View File

@ -1457,7 +1457,8 @@ int ObXAService::end_stmt(const ObXATransID &xid, ObTxDesc &tx_desc)
int ObXAService::xa_commit(const ObXATransID &xid, int ObXAService::xa_commit(const ObXATransID &xid,
const int64_t flags, const int64_t flags,
const int64_t xa_timeout_seconds, const int64_t xa_timeout_seconds,
bool &has_tx_level_temp_table) bool &has_tx_level_temp_table,
ObTransID &tx_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
@ -1474,14 +1475,16 @@ int ObXAService::xa_commit(const ObXATransID &xid,
} else { } else {
const int64_t timeout_us = xa_timeout_seconds * 1000000; const int64_t timeout_us = xa_timeout_seconds * 1000000;
if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_COMMIT)) { if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_COMMIT)) {
if (OB_FAIL(two_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table))) { if (OB_FAIL(two_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table,
tx_id))) {
TRANS_LOG(WARN, "two phase xa commit failed", K(ret), K(xid)); TRANS_LOG(WARN, "two phase xa commit failed", K(ret), K(xid));
xa_statistics_.inc_failure_xa_2pc_commit(); xa_statistics_.inc_failure_xa_2pc_commit();
} else { } else {
xa_statistics_.inc_success_xa_2pc_commit(); xa_statistics_.inc_success_xa_2pc_commit();
} }
} else if (ObXAFlag::is_tmonephase(flags)) { } else if (ObXAFlag::is_tmonephase(flags)) {
if (OB_FAIL(one_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table))) { if (OB_FAIL(one_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table,
tx_id))) {
TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid)); TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid));
xa_statistics_.inc_failure_xa_1pc_commit(); xa_statistics_.inc_failure_xa_1pc_commit();
} else { } else {
@ -1493,19 +1496,19 @@ int ObXAService::xa_commit(const ObXATransID &xid,
} }
} }
TRANS_LOG(INFO, "xa commit", K(ret), K(xid), K(flags), K(xa_timeout_seconds)); TRANS_LOG(INFO, "xa commit", K(ret), K(xid), K(tx_id), K(flags), K(xa_timeout_seconds));
return ret; return ret;
} }
int ObXAService::one_phase_xa_commit_(const ObXATransID &xid, int ObXAService::one_phase_xa_commit_(const ObXATransID &xid,
const int64_t timeout_us, const int64_t timeout_us,
const int64_t request_id, const int64_t request_id,
bool &has_tx_level_temp_table) bool &has_tx_level_temp_table,
ObTransID &tx_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID(); const uint64_t tenant_id = MTL_ID();
ObAddr sche_addr; ObAddr sche_addr;
ObTransID tx_id;
const bool is_rollback = false; const bool is_rollback = false;
int64_t end_flag = 0; int64_t end_flag = 0;
share::ObLSID coordinator; share::ObLSID coordinator;
@ -1579,12 +1582,12 @@ int ObXAService::one_phase_xa_commit_(const ObXATransID &xid,
} }
int ObXAService::xa_rollback(const ObXATransID &xid, int ObXAService::xa_rollback(const ObXATransID &xid,
const int64_t xa_timeout_seconds) const int64_t xa_timeout_seconds,
ObTransID &tx_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID(); const uint64_t tenant_id = MTL_ID();
int64_t end_flag = 0; int64_t end_flag = 0;
ObTransID tx_id;
ObAddr sche_addr; ObAddr sche_addr;
share::ObLSID coordinator; share::ObLSID coordinator;
const int64_t timeout_us = xa_timeout_seconds * 1000000; const int64_t timeout_us = xa_timeout_seconds * 1000000;
@ -2090,12 +2093,12 @@ int ObXAService::xa_rollback_all_changes(const ObXATransID &xid, ObTxDesc *&tx_d
// xa prepare // xa prepare
int ObXAService::xa_prepare(const ObXATransID &xid, int ObXAService::xa_prepare(const ObXATransID &xid,
const int64_t timeout_seconds) const int64_t timeout_seconds,
ObTransID &tx_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
common::ObAddr sche_addr; common::ObAddr sche_addr;
ObTransID tx_id;
bool is_tightly_coupled = true; bool is_tightly_coupled = true;
int64_t end_flag = 0; int64_t end_flag = 0;
const uint64_t tenant_id = MTL_ID(); const uint64_t tenant_id = MTL_ID();
@ -2163,7 +2166,7 @@ int ObXAService::xa_prepare(const ObXATransID &xid,
ret = OB_TRANS_XA_RDONLY; ret = OB_TRANS_XA_RDONLY;
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret) && OB_TRANS_XA_RDONLY != ret) {
xa_statistics_.inc_failure_xa_prepare(); xa_statistics_.inc_failure_xa_prepare();
} else { } else {
xa_statistics_.inc_success_xa_prepare(); xa_statistics_.inc_success_xa_prepare();
@ -2757,7 +2760,8 @@ int ObXAService::update_coord(const uint64_t tenant_id,
int ObXAService::two_phase_xa_commit_(const ObXATransID &xid, int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
const int64_t timeout_us, const int64_t timeout_us,
const int64_t request_id, const int64_t request_id,
bool &has_tx_level_temp_table) bool &has_tx_level_temp_table,
ObTransID &tx_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
@ -2765,7 +2769,6 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
ObXACtx *xa_ctx = NULL; ObXACtx *xa_ctx = NULL;
bool alloc = true; bool alloc = true;
share::ObLSID coordinator; share::ObLSID coordinator;
ObTransID tx_id;
bool record_in_tableone = true; bool record_in_tableone = true;
int64_t end_flag = 0; int64_t end_flag = 0;
// only used for constructor // only used for constructor

View File

@ -70,15 +70,18 @@ public:
int xa_commit(const ObXATransID &xid, int xa_commit(const ObXATransID &xid,
const int64_t flags, const int64_t flags,
const int64_t xa_timeout_seconds, const int64_t xa_timeout_seconds,
bool &has_tx_level_temp_table); bool &has_tx_level_temp_table,
ObTransID &tx_id);
int xa_rollback(const ObXATransID &xid, int xa_rollback(const ObXATransID &xid,
const int64_t xa_timeout_seconds); const int64_t xa_timeout_seconds,
ObTransID &tx_id);
int xa_rollback_local(const ObXATransID &xid, int xa_rollback_local(const ObXATransID &xid,
const ObTransID &tx_id, const ObTransID &tx_id,
const int64_t timeout_us, const int64_t timeout_us,
const int64_t request_id); const int64_t request_id);
int xa_prepare(const ObXATransID &xid, int xa_prepare(const ObXATransID &xid,
const int64_t timeout_seconds); const int64_t timeout_seconds,
ObTransID &tx_id);
int local_xa_prepare(const ObXATransID &xid, int local_xa_prepare(const ObXATransID &xid,
const ObTransID &trans_id, const ObTransID &trans_id,
const int64_t timeout_us); const int64_t timeout_us);
@ -209,7 +212,8 @@ private:
int one_phase_xa_commit_(const ObXATransID &xid, int one_phase_xa_commit_(const ObXATransID &xid,
const int64_t timeout_us, const int64_t timeout_us,
const int64_t request_id, const int64_t request_id,
bool &has_tx_level_temp_table); bool &has_tx_level_temp_table,
ObTransID &tx_id);
int xa_rollback_local_(const ObXATransID &xid, int xa_rollback_local_(const ObXATransID &xid,
const ObTransID &tx_id, const ObTransID &tx_id,
const int64_t timeout_us, const int64_t timeout_us,
@ -232,7 +236,8 @@ private:
int two_phase_xa_commit_(const ObXATransID &xid, int two_phase_xa_commit_(const ObXATransID &xid,
const int64_t timeout_us, const int64_t timeout_us,
const int64_t request_id, const int64_t request_id,
bool &has_tx_level_temp_table); bool &has_tx_level_temp_table,
ObTransID &tx_id);
int xa_rollback_for_pending_trans_(const ObXATransID &xid, int xa_rollback_for_pending_trans_(const ObXATransID &xid,
const ObTransID &tx_id, const ObTransID &tx_id,
const int64_t timeout_us, const int64_t timeout_us,