diff --git a/src/sql/engine/cmd/ob_xa_executor.cpp b/src/sql/engine/cmd/ob_xa_executor.cpp index 43142ca20..5b390d282 100644 --- a/src/sql/engine/cmd/ob_xa_executor.cpp +++ b/src/sql/engine/cmd/ob_xa_executor.cpp @@ -189,6 +189,12 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) } else if (!my_session->get_in_transaction()) { ret = OB_TRANS_XA_PROTO; LOG_WARN("not in a trans", K(ret)); + } else if (my_session->get_xid().empty()) { + ret = OB_TRANS_XA_PROTO; + LOG_WARN("not in xa trans", K(ret)); + } else if (!xid.all_equal_to(my_session->get_xid())) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "xid not match", K(ret), K(xid)); } else { int64_t flags = stmt.get_flags(); flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags; diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 882661fe8..1125fc9a4 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -497,6 +497,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx) OZ (acquire_tx_if_need_(txs, *session)); OZ (stmt_sanity_check_(session, plan, plan_ctx)); OZ (txs->sql_stmt_start_hook(session->get_xid(), *session->get_tx_desc(), session->get_sessid())); + bool start_hook = OB_SUCC(ret) && !session->get_xid().empty() ? true : false; if (OB_SUCC(ret) && txs->get_tx_elr_util().check_and_update_tx_elr_info(*session->get_tx_desc())) { LOG_WARN("check and update tx elr info", K(ret), KPC(session->get_tx_desc())); @@ -531,6 +532,12 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx) if (plan->is_contain_oracle_trx_level_temporary_table()) { OX (tx_desc->set_with_temporary_table()); } + if (OB_FAIL(ret) && start_hook) { + int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc()); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("call sql stmt end hook fail", K(tmp_ret)); + } + } bool print_log = false; #ifndef NDEBUG print_log = true; @@ -701,7 +708,28 @@ int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx, CHECK_TXN_FREE_ROUTE_ALLOWED(); OZ (get_tx_service(session, txs)); OZ (acquire_tx_if_need_(txs, *session)); - OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name), sp_name); + bool start_hook = false; + OZ(start_hook_if_need_(*session, txs, start_hook)); + OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name, !session->get_xid().empty() ? session->get_sessid() : 0), sp_name); + if (start_hook) { + int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc()); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("call sql stmt end hook fail", K(tmp_ret)); + ret = COVER_SUCC(tmp_ret); + } + } + return ret; +} + +int ObSqlTransControl::start_hook_if_need_(ObSQLSessionInfo &session, + transaction::ObTransService *txs, + bool &start_hook) +{ + int ret = OB_SUCCESS; + if (!session.get_tx_desc()->is_shadow() && !session.has_start_stmt() && + OB_SUCC(txs->sql_stmt_start_hook(session.get_xid(), *session.get_tx_desc(), session.get_sessid()))) { + start_hook = true; + } return ret; } @@ -720,7 +748,16 @@ int ObSqlTransControl::rollback_savepoint(ObExecContext &exec_ctx, OZ (get_tx_service(session, txs)); OZ (acquire_tx_if_need_(txs, *session)); OX (stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session)); - OZ (txs->rollback_to_explicit_savepoint(*session->get_tx_desc(), sp_name, stmt_expire_ts), sp_name); + bool start_hook = false; + OZ(start_hook_if_need_(*session, txs, start_hook)); + OZ (txs->rollback_to_explicit_savepoint(*session->get_tx_desc(), sp_name, stmt_expire_ts, !session->get_xid().empty() ? session->get_sessid() : 0), sp_name); + if (start_hook) { + int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc()); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("call sql stmt end hook fail", K(tmp_ret)); + ret = COVER_SUCC(tmp_ret); + } + } return ret; } @@ -735,7 +772,16 @@ int ObSqlTransControl::release_savepoint(ObExecContext &exec_ctx, CHECK_TXN_FREE_ROUTE_ALLOWED(); OZ (get_tx_service(session, txs), *session); OZ (acquire_tx_if_need_(txs, *session)); - OZ (txs->release_explicit_savepoint(*session->get_tx_desc(), sp_name), *session, sp_name); + bool start_hook = false; + OZ(start_hook_if_need_(*session, txs, start_hook)); + OZ (txs->release_explicit_savepoint(*session->get_tx_desc(), sp_name, !session->get_xid().empty() ? session->get_sessid() : 0), *session, sp_name); + if (start_hook) { + int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc()); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("call sql stmt end hook fail", K(tmp_ret)); + ret = COVER_SUCC(tmp_ret); + } + } return ret; } diff --git a/src/sql/ob_sql_trans_control.h b/src/sql/ob_sql_trans_control.h index 0dff7c58d..ab2a327b2 100644 --- a/src/sql/ob_sql_trans_control.h +++ b/src/sql/ob_sql_trans_control.h @@ -247,6 +247,9 @@ private: const ObSQLSessionInfo &session); static int inc_session_ref(const ObSQLSessionInfo *session); static int acquire_tx_if_need_(transaction::ObTransService *txs, ObSQLSessionInfo &session); + static int start_hook_if_need_(ObSQLSessionInfo &session, + transaction::ObTransService *txs, + bool &start_hook); public: /* * create a savepoint without name diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index 15209e792..1395e129d 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -54,7 +54,7 @@ ObTxIsolationLevel tx_isolation_from_str(const ObString &s) } ObTxSavePoint::ObTxSavePoint() - : type_(T::INVL), scn_(0), name_() {} + : type_(T::INVL), scn_(0), session_id_(0), name_() {} ObTxSavePoint::ObTxSavePoint(const ObTxSavePoint &a) { @@ -65,9 +65,14 @@ ObTxSavePoint &ObTxSavePoint::operator=(const ObTxSavePoint &a) { type_ = a.type_; scn_ = a.scn_; + session_id_ = a.session_id_; switch(type_) { case T::SAVEPOINT: - case T::STASH: name_ = a.name_; break; + case T::STASH: { + name_ = a.name_; + session_id_ = a.session_id_; + break; + } case T::SNAPSHOT: snapshot_ = a.snapshot_; break; default: break; } @@ -84,6 +89,7 @@ void ObTxSavePoint::release() type_ = T::INVL; snapshot_ = NULL; scn_ = 0; + session_id_ = 0; } void ObTxSavePoint::rollback() @@ -101,7 +107,7 @@ void ObTxSavePoint::init(ObTxReadSnapshot *snapshot) scn_ = snapshot->core_.scn_; } -int ObTxSavePoint::init(int64_t scn, const ObString &name, const bool stash) +int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t session_id, const bool stash) { int ret = OB_SUCCESS; if (OB_FAIL(name_.assign(name))) { @@ -113,6 +119,7 @@ int ObTxSavePoint::init(int64_t scn, const ObString &name, const bool stash) } else { type_ = stash ? T::STASH : T::SAVEPOINT; scn_ = scn; + session_id_ = session_id; } return ret; } @@ -129,6 +136,7 @@ DEF_TO_STRING(ObTxSavePoint) } J_COMMA(); J_KV(K_(scn)); + J_KV(K_(session_id)); J_OBJ_END(); return pos; } diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index 8c2536df8..d99001088 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -257,6 +257,10 @@ class ObTxSavePoint private: enum class T { INVL= 0, SAVEPOINT= 1, SNAPSHOT= 2, STASH= 3 } type_; int64_t scn_; + /* The savepoint should be unique to the session, + and the session id is required to distinguish the + savepoint for the multi-branch scenario of xa */ + uint32_t session_id_; union { ObTxReadSnapshot *snapshot_; common::ObFixedLengthString<128> name_; @@ -268,7 +272,7 @@ public: ObTxSavePoint &operator=(const ObTxSavePoint &a); void release(); void rollback(); - int init(const int64_t scn, const ObString &name, const bool stash = false); + int init(const int64_t scn, const ObString &name, const uint32_t session_id, const bool stash = false); void init(ObTxReadSnapshot *snapshot); bool is_savepoint() const { return type_ == T::SAVEPOINT || type_ == T::STASH; } bool is_snapshot() const { return type_ == T::SNAPSHOT; } @@ -518,6 +522,7 @@ public: K_(flags_.BLOCK), K_(flags_.REPLICA), K_(can_elr), + K_(savepoints), K_(cflict_txs), K_(abort_cause), K_(commit_expire_ts), @@ -543,6 +548,7 @@ public: ObTxAccessMode get_access_mode() const { return access_mode_; } bool is_rdonly() const { return access_mode_ == ObTxAccessMode::RD_ONLY; } bool is_clean() const { return parts_.empty(); } + bool is_shadow() const { return flags_.SHADOW_; } bool is_explicit() const { return flags_.EXPLICIT_; } void set_with_temporary_table() { flags_.WITH_TEMP_TABLE_ = true; } bool with_temporary_table() const { return flags_.WITH_TEMP_TABLE_; } diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index bef007bad..2868794b2 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -2324,7 +2324,7 @@ int ObTransService::recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx) tx->tenant_id_ = tx_info.tenant_id_; tx->cluster_id_ = tx_info.cluster_id_; tx->cluster_version_ = tx_info.cluster_version_; - tx->addr_ = tx_info.addr_; + tx->addr_ = tx_info.addr_; /*origin scheduler addr*/ tx->tx_id_ = tx_info.tx_id_; tx->isolation_ = tx_info.isolation_; tx->access_mode_ = tx_info.access_mode_; diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 469561e0c..5d37e5f2e 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1135,7 +1135,7 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, return ret; } -int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint) +int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id) { int ret = OB_SUCCESS; int64_t scn = 0; @@ -1143,7 +1143,7 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save tx.inc_op_sn(); scn = ObSequence::inc_and_get_max_seq_no(); ObTxSavePoint sp; - if (OB_SUCC(sp.init(scn, savepoint))) { + if (OB_SUCC(sp.init(scn, savepoint, session_id))) { if (OB_FAIL(tx.savepoints_.push_back(sp))) { TRANS_LOG(WARN, "push savepoint failed", K(ret)); } else if (!tx.tx_id_.is_valid() && OB_FAIL(tx_desc_mgr_.add(tx))) { @@ -1154,7 +1154,7 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save ARRAY_FOREACH_X(tx.savepoints_, i, cnt, i != cnt - 1) { ObTxSavePoint &it = tx.savepoints_.at(cnt - 2 - i); if (it.is_stash()) { break; } - if (it.is_savepoint() && it.name_ == savepoint) { + if (it.is_savepoint() && it.name_ == savepoint && it.session_id_ == session_id) { TRANS_LOG(TRACE, "move savepoint", K(savepoint), "from", it.scn_, "to", scn, K(tx)); it.release(); break; // assume only one if exist @@ -1180,7 +1180,8 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save // 2. invalidate savepoint and snapshot after the savepoint Node. int ObTransService::rollback_to_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, - const int64_t expire_ts) + const int64_t expire_ts, + const uint32_t session_id) { int ret = OB_SUCCESS; auto start_ts = ObTimeUtility::current_time(); @@ -1192,7 +1193,7 @@ int ObTransService::rollback_to_explicit_savepoint(ObTxDesc &tx, const ObTxSavePoint &it = tx.savepoints_.at(cnt - 1 - i); TRANS_LOG(TRACE, "sp iterate:", K(it)); if (it.is_stash()) { break; } - if (it.is_savepoint() && it.name_ == savepoint) { + if (it.is_savepoint() && it.name_ == savepoint && it.session_id_ == session_id) { sp_scn = it.scn_; break; } @@ -1242,7 +1243,7 @@ int ObTransService::rollback_to_explicit_savepoint(ObTxDesc &tx, // impl note // registered snapshot keep valid -int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint) +int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id) { int ret = OB_SUCCESS; bool hit = false; @@ -1252,7 +1253,7 @@ int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &sav tx.inc_op_sn(); ARRAY_FOREACH_N(tx.savepoints_, i, cnt) { ObTxSavePoint &it = tx.savepoints_.at(cnt - 1 - i); - if (it.is_savepoint() && it.name_ == savepoint) { + if (it.is_savepoint() && it.name_ == savepoint && it.session_id_ == session_id) { hit = true; sp_id = it.scn_; break; @@ -1269,7 +1270,7 @@ int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &sav it.release(); } } - TRANS_LOG(TRACE, "release savepoint", K(savepoint), K(sp_id), K(tx)); + TRANS_LOG(TRACE, "release savepoint", K(savepoint), K(sp_id), K(session_id), K(tx)); } } tx.state_change_flags_.EXTRA_CHANGED_ = true; @@ -1288,7 +1289,7 @@ int ObTransService::create_stash_savepoint(ObTxDesc &tx, const ObString &name) tx.inc_op_sn(); auto seq_no = ObSequence::inc_and_get_max_seq_no(); ObTxSavePoint sp; - if (OB_SUCC(sp.init(seq_no, name, true))) { + if (OB_SUCC(sp.init(seq_no, name, 0, true))) { if (OB_FAIL(tx.savepoints_.push_back(sp))) { TRANS_LOG(WARN, "push savepoint failed", K(ret)); } @@ -1760,7 +1761,7 @@ int ObTransService::sql_stmt_start_hook(const ObXATransID &xid, ObTxDesc &tx, co TRANS_LOG(WARN, "register tx fail", K(ret), K_(tx.tx_id), K(xid), KP(&tx)); } else { registed = true; } } - if (OB_SUCC(ret) && OB_FAIL(MTL(ObXAService*)->start_stmt(xid, tx))) { + if (OB_SUCC(ret) && OB_FAIL(MTL(ObXAService*)->start_stmt(xid, session_id, tx))) { TRANS_LOG(WARN, "xa trans start stmt failed", K(ret), K_(tx.xid), K(xid)); ObGlobalTxType global_tx_type = tx.get_global_tx_type(xid); if (ObGlobalTxType::DBLINK_TRANS == global_tx_type && OB_TRANS_XA_BRANCH_FAIL == ret) { diff --git a/src/storage/tx/ob_tx_api.h b/src/storage/tx/ob_tx_api.h index b037f7024..fb7ba4390 100644 --- a/src/storage/tx/ob_tx_api.h +++ b/src/storage/tx/ob_tx_api.h @@ -326,12 +326,15 @@ int create_implicit_savepoint(ObTxDesc &tx, * which hold the savepoint * @savepoint: the name of savepoint to be created * + * @session_id: the session id to which the savepoint + * belongs, used for xa + * * Return: * OB_SUCCESS - OK * OB_ERR_TOO_LONG_IDENT - if savepoint was longer than 128 characters * OB_ERR_TOO_MANY_SAVEPOINT - alive savepoint count out of limit (default 255) */ -int create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint); +int create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id); /** * rollback_to_implicit_savepoint - rollback to a implicit savepoint @@ -386,7 +389,8 @@ int rollback_to_implicit_savepoint(ObTxDesc &tx, */ int rollback_to_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, - const int64_t expire_ts); + const int64_t expire_ts, + const uint32_t session_id); /** * release_explicit_savepoint - release savepoint @@ -400,7 +404,7 @@ int rollback_to_explicit_savepoint(ObTxDesc &tx, * OB_SUCCESS - OK * OB_SAVEPOINT_NOT_EXIST - savepoint not found */ -int release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint); +int release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id); // ------------------------------------------------------------------ // savepoints stash diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index ac6ecb1bb..5b346653e 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -80,6 +80,7 @@ void ObXACtx::reset() if (OB_NOT_NULL(xa_branch_info_)) { xa_branch_info_->reset(); } + xa_stmt_info_.reset(); is_terminated_ = false; tlog_.reset(); need_print_trace_log_ = true; @@ -986,6 +987,9 @@ int ObXACtx::process_end_stmt(const obrpc::ObXAEndStmtRPCRequest &req) } else if (OB_FAIL(check_for_execution_(xid, false))) { // include branch fail TRANS_LOG(WARN, "check for execution failed", K(ret), K(xid), K(*this)); + } else if (lock_xid_.empty()) { + // The rpc timeout may trigger repeated unlocking operations, + // which should be considered successful at this time } else { const ObTxStmtInfo &stmt_info = req.get_stmt_info(); // TODO, remove duplicate @@ -1376,6 +1380,8 @@ int ObXACtx::xa_start_(const ObXATransID &xid, TRANS_LOG(WARN, "update branch info failed", K(ret), K(*this)); } else if (OB_FAIL(save_tx_desc_(tx_desc))) { TRANS_LOG(WARN, "save trans desc failed", K(ret), K(*this)); + } else if (OB_FAIL(update_xa_stmt_info_(xid))) { + TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this)); } else if (OB_FAIL(register_xa_timeout_task_())) { TRANS_LOG(WARN, "register xa timeout task failed", K(ret), K(*this)); } else { @@ -1420,6 +1426,8 @@ int ObXACtx::xa_start_local_(const ObXATransID &xid, timeout_seconds, flags))) { TRANS_LOG(WARN, "update xa branch info failed", K(ret), K(xid), K(*this)); + } else if (OB_FAIL(update_xa_stmt_info_(xid))) { + TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this)); } else if (OB_FAIL(register_xa_timeout_task_())) { TRANS_LOG(WARN, "register xa timeout task failed", K(ret), K(xid), K(*this)); } else { @@ -1439,6 +1447,30 @@ int ObXACtx::xa_start_local_(const ObXATransID &xid, return ret; } +int ObXACtx::update_xa_stmt_info_(const ObXATransID &xid) +{ + int ret = OB_SUCCESS; + bool found = false; + if (!xa_stmt_info_.empty()) { + for (int64_t i = 0; !found && i < xa_stmt_info_.count(); ++i) { + ObXAStmtInfo &info = xa_stmt_info_.at(i); + if (info.xid_.all_equal_to(xid)) { + found = true; + break; + } + } + } + if (!found) { + ObXAStmtInfo stmt_info(xid); + if (OB_FAIL(xa_stmt_info_.push_back(stmt_info))) { + TRANS_LOG(WARN, "add stmt info failed", K(ret), K(stmt_info), K(*this)); + } + } else { + TRANS_LOG(WARN, "xa stmt info already exists ", K(ret), K(found), K(xid), K(*this)); + } + return ret; +} + // xa start in non-original scheduler // case 1: loosely coupled, xa start join // case 2: tightly coupled, xa start join @@ -1504,6 +1536,8 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid, if (OB_TRANS_XA_BRANCH_FAIL == ret) { set_terminated_(); } + } else if (OB_FAIL(update_xa_stmt_info_(xid))) { + TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this)); } else { // xa_ref_count_ is added only when success is returned ++xa_ref_count_; @@ -1579,6 +1613,8 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid, if (OB_TRANS_XA_BRANCH_FAIL == ret) { set_terminated_(); } + } else if (OB_FAIL(update_xa_stmt_info_(xid))) { + TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this)); } else { // xa_ref_count_ is increased only when success is returned ++xa_ref_count_; @@ -1655,6 +1691,9 @@ int ObXACtx::xa_end(const ObXATransID &xid, } } } + if (OB_SUCC(ret) && OB_FAIL(remove_xa_stmt_info_(xid))) { + TRANS_LOG(WARN, "remove xa stmt info failed", K(ret), K(xid), K(*this)); + } --xa_ref_count_; // if fail, force terminate @@ -1679,11 +1718,32 @@ int ObXACtx::xa_end(const ObXATransID &xid, return ret; } +int ObXACtx::remove_xa_stmt_info_(const ObXATransID &xid) +{ + int ret = OB_SUCCESS; + bool found = false; + if (!xa_stmt_info_.empty()) { + for (int64_t i = 0; !found && i < xa_stmt_info_.count(); ++i) { + ObXAStmtInfo &info = xa_stmt_info_.at(i); + if (info.xid_.all_equal_to(xid)) { + found = true; + xa_stmt_info_.remove(i); + break; + } + } + } + if (!found) { + ret = OB_ERR_UNEXPECTED; + } + TRANS_LOG(INFO, "remove xa stmt info", K(ret), K(found), K(xid), K(*this)); + return ret; +} + // start stmt // if tightly coupled mode, acquire lock and get tx info from original scheduler // if loosely coupled mode, do nothing // @param [in] xid, this is from session -int ObXACtx::start_stmt(const ObXATransID &xid) +int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id) { int ret = OB_SUCCESS; const bool is_original = (GCTX.self_addr() == original_sche_addr_); @@ -1709,6 +1769,8 @@ int ObXACtx::start_stmt(const ObXATransID &xid) if (is_executing_) { ret = OB_TRANS_STMT_NEED_RETRY; TRANS_LOG(INFO, "another branch is executing stmt, try again", K(ret), K(*this)); + } else if (OB_FAIL(create_xa_savepoint_if_need_(xid, session_id))) { + TRANS_LOG(WARN, "check xa savepoint fail", K(ret), K(xid), K(session_id), K(*this)); } else { // this flag indicates that a branch is executing normal stmt is_executing_ = true; @@ -1745,6 +1807,32 @@ int ObXACtx::start_stmt(const ObXATransID &xid) return ret; } +int ObXACtx::create_xa_savepoint_if_need_(const ObXATransID &xid, const uint32_t session_id) +{ + int ret = OB_SUCCESS; + bool found = false; + if (!xa_stmt_info_.empty()) { + for (int64_t i = 0; !found && i < xa_stmt_info_.count(); ++i) { + ObXAStmtInfo &info = xa_stmt_info_.at(i); + if (info.xid_.all_equal_to(xid)) { + found = true; + TRANS_LOG(INFO, "find info", K(ret), K(xid), K(info)); + if (info.is_first_stmt_) { + if (OB_FAIL(MTL(transaction::ObTransService *)->create_explicit_savepoint(*tx_desc_, PL_XA_IMPLICIT_SAVEPOINT, session_id))) { + TRANS_LOG(WARN, "create xa savepoint fail", K(ret), K(xid), K(session_id), K(*this)); + } + info.is_first_stmt_ = false; + } + break; + } + } + } + if (!found) { + ret = OB_ERR_UNEXPECTED; + } + return ret; +} + int ObXACtx::start_stmt_local_(const ObXATransID &xid) { int ret = OB_SUCCESS; diff --git a/src/storage/tx/ob_xa_ctx.h b/src/storage/tx/ob_xa_ctx.h index 43df258a2..c79fc999f 100644 --- a/src/storage/tx/ob_xa_ctx.h +++ b/src/storage/tx/ob_xa_ctx.h @@ -105,7 +105,7 @@ public: const int64_t timeout_seconds, ObTxDesc *&tx_desc); int xa_end(const ObXATransID &xid, const int64_t flags, ObTxDesc *&tx_desc); - int start_stmt(const ObXATransID &xid); + int start_stmt(const ObXATransID &xid, const uint32_t session_id); int wait_start_stmt(); int end_stmt(const ObXATransID &xid); const ObXATransID &get_executing_xid() const { return executing_xid_; } @@ -166,8 +166,8 @@ public: K_(trans_id), K_(is_executing), K_(is_xa_end_trans), K_(tenant_id), K_(is_xa_readonly), K_(xa_trans_state), K_(is_xa_one_phase), K_(xa_branch_count), K_(xa_ref_count), K_(lock_grant), - K_(is_tightly_coupled), K_(lock_xid), K_(is_terminated), - K_(executing_xid), "uref", get_uref(), + K_(is_tightly_coupled), K_(lock_xid), K_(xa_stmt_info), + K_(is_terminated), K_(executing_xid), "uref", get_uref(), K_(has_tx_level_temp_table)); private: int register_timeout_task_(const int64_t interval_us); @@ -246,6 +246,10 @@ private: int check_trans_state_(const bool is_rollback, const int64_t request_id, const bool is_xa_one_phase); + int update_xa_stmt_info_(const ObXATransID &xid); + int create_xa_savepoint_if_need_(const ObXATransID &xid, + const uint32_t session_id); + int remove_xa_stmt_info_(const ObXATransID &xid); private: // for 4.0 dblink int get_dblink_client_(const common::sqlclient::DblinkDriverProto dblink_type, @@ -290,6 +294,7 @@ private: bool is_tightly_coupled_; ObXATransID lock_xid_; ObXABranchInfoArray *xa_branch_info_; + ObXAStmtInfoArray xa_stmt_info_; bool is_terminated_; common::ObTraceEventRecorder tlog_; bool need_print_trace_log_; diff --git a/src/storage/tx/ob_xa_define.h b/src/storage/tx/ob_xa_define.h index 9254c1372..9a2353e20 100644 --- a/src/storage/tx/ob_xa_define.h +++ b/src/storage/tx/ob_xa_define.h @@ -33,6 +33,8 @@ extern const int64_t XA_INNER_TABLE_TIMEOUT; extern const bool ENABLE_NEW_XA; +static const ObString PL_XA_IMPLICIT_SAVEPOINT = "__PL_XA_IMPLICIT_SAVEPOINT"; + class ObXATransState { public: @@ -229,7 +231,18 @@ struct ObXABranchInfo int64_t end_flag_; }; +struct ObXAStmtInfo +{ + ObXAStmtInfo() : xid_(), is_first_stmt_(true) {} + ObXAStmtInfo(const ObXATransID xid) : xid_(xid), is_first_stmt_(true) {} + ~ObXAStmtInfo() {} + TO_STRING_KV(K_(xid), K_(is_first_stmt)); + ObXATransID xid_; + bool is_first_stmt_; +}; + typedef common::ObSEArray ObXABranchInfoArray; +typedef common::ObSEArray ObXAStmtInfoArray; class ObXATimeoutTask : public ObITimeoutTask { diff --git a/src/storage/tx/ob_xa_service.cpp b/src/storage/tx/ob_xa_service.cpp index 36b48832c..2408f08a3 100644 --- a/src/storage/tx/ob_xa_service.cpp +++ b/src/storage/tx/ob_xa_service.cpp @@ -1378,11 +1378,6 @@ int ObXAService::xa_end(const ObXATransID &xid, } else if (!tx_desc->is_xa_trans()) { ret = OB_TRANS_XA_PROTO; TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid)); - } else if (!xid.gtrid_equal_to(tx_desc->get_xid())) { - // tx->get_xid().gtrid != xid.gtrid - // oracle returns 0 - ret = OB_TRANS_XA_NOTA; - TRANS_LOG(WARN, "xid not match", K(ret), K(xid)); } else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid)); @@ -1402,7 +1397,7 @@ int ObXAService::xa_end(const ObXATransID &xid, return ret; } -int ObXAService::start_stmt(const ObXATransID &xid, ObTxDesc &tx_desc) +int ObXAService::start_stmt(const ObXATransID &xid, const uint32_t session_id, ObTxDesc &tx_desc) { int ret = OB_SUCCESS; const ObTransID &tx_id = tx_desc.tid(); @@ -1414,7 +1409,7 @@ int ObXAService::start_stmt(const ObXATransID &xid, ObTxDesc &tx_desc) if (NULL == xa_ctx) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected trans descriptor", K(ret), K(tx_id), K(xid)); - } else if (OB_FAIL(xa_ctx->start_stmt(xid))) { + } else if (OB_FAIL(xa_ctx->start_stmt(xid, session_id))) { TRANS_LOG(WARN, "xa trans start stmt failed", K(ret), K(tx_id), K(xid)); } else if (OB_FAIL(xa_ctx->wait_start_stmt())) { TRANS_LOG(WARN, "fail to wait start stmt", K(ret), K(tx_id), K(xid)); @@ -2195,7 +2190,7 @@ int ObXAService::xa_rollback_all_changes(const ObXATransID &xid, ObTxDesc *&tx_d ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), KP(tx_desc), K(xid), K(stmt_expired_time)); } else { - if (OB_FAIL(start_stmt(xid, *tx_desc))) { + if (OB_FAIL(start_stmt(xid, 0/*unused session id*/, *tx_desc))) { TRANS_LOG(WARN, "xa start stmt fail", K(ret), K(tx_desc), K(xid)); } else if (OB_FAIL(MTL(transaction::ObTransService *)->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_expired_time, NULL))) { TRANS_LOG(WARN, "do savepoint rollback error", K(ret), K(tx_desc)); diff --git a/src/storage/tx/ob_xa_service.h b/src/storage/tx/ob_xa_service.h index 7c2dcf7b9..03535623d 100644 --- a/src/storage/tx/ob_xa_service.h +++ b/src/storage/tx/ob_xa_service.h @@ -113,7 +113,7 @@ public: //for rpc use int get_xa_ctx(const ObTransID &trans_id, bool &alloc, ObXACtx *&xa_ctx); int revert_xa_ctx(ObXACtx *xa_ctx); - int start_stmt(const ObXATransID &xid, ObTxDesc &tx_desc); + int start_stmt(const ObXATransID &xid, const uint32_t session_id, ObTxDesc &tx_desc); int end_stmt(const ObXATransID &xid, ObTxDesc &tx_desc); int handle_terminate_for_xa_branch(const ObXATransID &xid, ObTxDesc *tx_desc, diff --git a/unittest/storage/tx/it/test_tx_free_route.cpp b/unittest/storage/tx/it/test_tx_free_route.cpp index 1a8018ebf..85e84d775 100644 --- a/unittest/storage/tx/it/test_tx_free_route.cpp +++ b/unittest/storage/tx/it/test_tx_free_route.cpp @@ -495,13 +495,13 @@ int MockObServer::do_handle_(ObReq &req, ObResp &resp) tx_desc = NULL; break; case ObReq::T::SAVEPOINT: - ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_)); + ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0); break; case ObReq::T::ROLLBACK_SAVEPOINT: - ret = tx_node_.rollback_to_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 1 * 1000 * 1000); + ret = tx_node_.rollback_to_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 1 * 1000 * 1000, 0); break; case ObReq::T::RELEASE_SAVEPOINT: - ret = tx_node_.release_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_)); + ret = tx_node_.release_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0); break; case ObReq::T::READ: if (req.is_serializable_isolation_) {