From c6bbafe4731a268c427fd28fbc866fad1fba2fbc Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 20 Mar 2023 17:23:49 +0800 Subject: [PATCH] fix savepoint is not synchronized between servers --- src/sql/engine/cmd/ob_tcl_executor.cpp | 2 +- src/sql/ob_sql_trans_control.cpp | 5 ++- src/sql/ob_sql_trans_control.h | 2 +- src/storage/tx/ob_trans_define_v4.cpp | 38 +++++++++++++--- src/storage/tx/ob_trans_define_v4.h | 19 ++++++-- src/storage/tx/ob_trans_service_v4.cpp | 40 +++++++++++++++++ src/storage/tx/ob_trans_service_v4.h | 2 + src/storage/tx/ob_tx_api.cpp | 9 ++-- src/storage/tx/ob_tx_api.h | 5 ++- src/storage/tx/ob_xa_ctx.cpp | 44 +++++++++++++------ src/storage/tx/ob_xa_rpc.cpp | 15 ++++--- src/storage/tx/ob_xa_rpc.h | 15 ++++--- unittest/storage/tx/it/test_tx_free_route.cpp | 2 +- 13 files changed, 155 insertions(+), 43 deletions(-) diff --git a/src/sql/engine/cmd/ob_tcl_executor.cpp b/src/sql/engine/cmd/ob_tcl_executor.cpp index 7491e5be7..5c9604c4e 100644 --- a/src/sql/engine/cmd/ob_tcl_executor.cpp +++ b/src/sql/engine/cmd/ob_tcl_executor.cpp @@ -73,7 +73,7 @@ int ObCreateSavePointExecutor::execute(ObExecContext &ctx, if (OB_ISNULL(session)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid param", K(ret), K(session)); - } else if (OB_FAIL(ObSqlTransControl::create_savepoint(ctx, stmt.get_sp_name()))) { + } else if (OB_FAIL(ObSqlTransControl::create_savepoint(ctx, stmt.get_sp_name(), true))) { LOG_WARN("fail create savepoint", K(ret), K(stmt.get_sp_name())); } else if (!session->has_explicit_start_trans()) { if (OB_FAIL(session->get_autocommit(ac))) { diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index e022b6d28..5601370b8 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -722,7 +722,8 @@ int ObSqlTransControl::stmt_setup_savepoint_(ObSQLSessionInfo *session, K(session->get_txn_free_route_ctx()), KPC(session)); \ } int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx, - const ObString &sp_name) + const ObString &sp_name, + const bool user_create) { int ret = OB_SUCCESS; ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx); @@ -734,7 +735,7 @@ int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx, OZ (acquire_tx_if_need_(txs, *session)); bool start_hook = false; OZ(start_hook_if_need_(*session, txs, start_hook)); - OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name, get_real_session_id(*session)), sp_name); + OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name, get_real_session_id(*session), user_create), sp_name); if (start_hook) { int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc()); if (OB_SUCCESS != tmp_ret) { diff --git a/src/sql/ob_sql_trans_control.h b/src/sql/ob_sql_trans_control.h index 3f041a79d..abd39e1af 100644 --- a/src/sql/ob_sql_trans_control.h +++ b/src/sql/ob_sql_trans_control.h @@ -220,7 +220,7 @@ public: using namespace oceanbase::transaction; return kill_tx(session, static_cast(ObTxAbortCause::SESSION_DISCONNECT)); } - static int create_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name); + static int create_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name, const bool user_create = false); static int rollback_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name); static int release_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name); static int xa_rollback_all_changes(ObExecContext &exec_ctx); diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index a09ad7289..4514ccffb 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -54,7 +54,7 @@ ObTxIsolationLevel tx_isolation_from_str(const ObString &s) } ObTxSavePoint::ObTxSavePoint() - : type_(T::INVL), scn_(0), session_id_(0), name_() {} + : type_(T::INVL), scn_(0), session_id_(0), user_create_(false), name_() {} ObTxSavePoint::ObTxSavePoint(const ObTxSavePoint &a) { @@ -65,12 +65,12 @@ ObTxSavePoint &ObTxSavePoint::operator=(const ObTxSavePoint &a) { type_ = a.type_; scn_ = a.scn_; - session_id_ = a.session_id_; switch(type_) { case T::SAVEPOINT: case T::STASH: { name_ = a.name_; session_id_ = a.session_id_; + user_create_ = a.user_create_; break; } case T::SNAPSHOT: snapshot_ = a.snapshot_; break; @@ -79,6 +79,23 @@ ObTxSavePoint &ObTxSavePoint::operator=(const ObTxSavePoint &a) return *this; } +bool ObTxSavePoint::operator==(const ObTxSavePoint &a) const +{ + bool is_equal = false; + if (type_ == a.type_ && scn_== a.scn_) { + switch(type_) { + case T::SAVEPOINT: + case T::STASH: { + is_equal = name_ == a.name_ && session_id_ == a.session_id_ && user_create_ == a.user_create_; + break; + } + case T::SNAPSHOT: is_equal = snapshot_ == a.snapshot_; break; + default: break; + } + } + return is_equal; +} + ObTxSavePoint::~ObTxSavePoint() { release(); @@ -90,6 +107,7 @@ void ObTxSavePoint::release() snapshot_ = NULL; scn_ = 0; session_id_ = 0; + user_create_ = false; } void ObTxSavePoint::rollback() @@ -107,7 +125,7 @@ void ObTxSavePoint::init(ObTxReadSnapshot *snapshot) scn_ = snapshot->core_.scn_; } -int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t session_id, const bool stash) +int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t session_id, const bool user_create, const bool stash) { int ret = OB_SUCCESS; if (OB_FAIL(name_.assign(name))) { @@ -120,6 +138,7 @@ int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t sessio type_ = stash ? T::STASH : T::SAVEPOINT; scn_ = scn; session_id_ = session_id; + user_create_ = user_create; } return ret; } @@ -137,6 +156,7 @@ DEF_TO_STRING(ObTxSavePoint) J_COMMA(); J_KV(K_(scn)); J_KV(K_(session_id)); + J_KV(K_(user_create)); J_OBJ_END(); return pos; } @@ -169,6 +189,12 @@ OB_SERIALIZE_MEMBER(ObTxParam, access_mode_, isolation_, cluster_id_); +OB_SERIALIZE_MEMBER(ObTxSavePoint, + type_, + scn_, + session_id_, + user_create_, + name_); OB_SERIALIZE_MEMBER(ObTxInfo, tenant_id_, @@ -187,12 +213,14 @@ OB_SERIALIZE_MEMBER(ObTxInfo, expire_ts_, active_scn_, parts_, - session_id_); + session_id_, + savepoints_); OB_SERIALIZE_MEMBER(ObTxStmtInfo, tx_id_, op_sn_, parts_, - state_); + state_, + savepoints_); int ObTxDesc::trans_deep_copy(const ObTxDesc &x) { diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index fde882129..2d76d7494 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -261,6 +261,7 @@ private: and the session id is required to distinguish the savepoint for the multi-branch scenario of xa */ uint32_t session_id_; + bool user_create_; union { ObTxReadSnapshot *snapshot_; common::ObFixedLengthString<128> name_; @@ -270,14 +271,20 @@ public: ~ObTxSavePoint(); ObTxSavePoint(const ObTxSavePoint &s); ObTxSavePoint &operator=(const ObTxSavePoint &a); + bool operator==(const ObTxSavePoint &a) const; void release(); void rollback(); - int init(const int64_t scn, const ObString &name, const uint32_t session_id, const bool stash = false); + int init(const int64_t scn, + const ObString &name, + const uint32_t session_id, + const bool user_create, + const bool stash = false); void init(ObTxReadSnapshot *snapshot); bool is_savepoint() const { return type_ == T::SAVEPOINT || type_ == T::STASH; } bool is_snapshot() const { return type_ == T::SNAPSHOT; } bool is_stash() const { return type_ == T::STASH; } DECLARE_TO_STRING; + OB_UNIS_VERSION(1); }; typedef ObSEArray ObTxSavePointList; @@ -741,6 +748,7 @@ private: class ObTxInfo { friend class ObTransService; + friend class ObXACtx; OB_UNIS_VERSION(1); protected: uint64_t tenant_id_; @@ -761,6 +769,7 @@ protected: int64_t active_scn_; ObTxPartList parts_; uint32_t session_id_ = 0; + ObTxSavePointList savepoints_; public: TO_STRING_KV(K_(tenant_id), K_(session_id), @@ -776,7 +785,8 @@ public: K_(expire_ts), K_(parts), K_(cluster_id), - K_(cluster_version)); + K_(cluster_version), + K_(savepoints)); // TODO xa bool is_valid() const { return tx_id_.is_valid(); } const ObTransID &tid() const { return tx_id_; } @@ -785,17 +795,20 @@ public: class ObTxStmtInfo { friend class ObTransService; + friend class ObXACtx; OB_UNIS_VERSION(1); protected: ObTransID tx_id_; uint64_t op_sn_; ObTxPartList parts_; ObTxDesc::State state_; + ObTxSavePointList savepoints_; public: TO_STRING_KV(K_(tx_id), K_(op_sn), K_(parts), - K_(state)); + K_(state), + K_(savepoints)); // TODO xa bool is_valid() const { return tx_id_.is_valid(); } const ObTransID &tid() const { return tx_id_; } diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index a661e8101..3d68596a3 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -2352,6 +2352,10 @@ int ObTransService::recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx) tx_desc_mgr_.revert(*tx); tx = NULL; TRANS_LOG(WARN, "assgin parts fail", K(ret)); + } else if (tx->savepoints_.assign(tx_info.savepoints_)) { + tx_desc_mgr_.revert(*tx); + tx = NULL; + TRANS_LOG(WARN, "assgin savepoints fail", K(ret)); } else if (OB_FAIL(tx_desc_mgr_.add_with_txid(tx_info.tx_id_, *tx))) { tx_desc_mgr_.revert(*tx); tx = NULL; @@ -2387,6 +2391,8 @@ int ObTransService::get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info) tx.lock_.lock(); if (OB_FAIL(tx_info.parts_.assign(tx.parts_))) { TRANS_LOG(WARN, "assgin parts fail", K(ret), K(tx)); + } else if (OB_FAIL(assign_user_savepoint_(tx, tx_info.savepoints_))) { + TRANS_LOG(WARN, "assgin savepoint fail", K(ret), K(tx)); } else { tx_info.tenant_id_ = tx.tenant_id_; tx_info.cluster_id_ = tx.cluster_id_; @@ -2410,12 +2416,43 @@ int ObTransService::get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info) return ret; } +int ObTransService::assign_user_savepoint_(ObTxDesc &tx, ObTxSavePointList &savepoints) +{ + int ret = OB_SUCCESS; + ARRAY_FOREACH_N(tx.savepoints_, i, cnt) { + if (tx.savepoints_.at(i).user_create_) { + if (OB_FAIL(savepoints.push_back(tx.savepoints_.at(i)))) { + TRANS_LOG(WARN, "push back user create sp fail", K(ret), K(tx)); + } + } + } + return ret; +} + +int ObTransService::update_user_savepoint(ObTxDesc &tx, const ObTxSavePointList &savepoints) +{ + int ret = OB_SUCCESS; + int j = 0; + bool is_contain = false; + ARRAY_FOREACH_N(savepoints, i, cnt) { + for (j = 0, is_contain = false; jop_sn_ = tx_info.op_sn_; tx->state_ = tx_info.state_; tx->update_parts_(tx_info.parts_); + if (OB_FAIL(MTL(ObTransService *)->update_user_savepoint(*tx, tx_info.savepoints_))) { + TRANS_LOG(WARN, "update user sp fail", K(ret), K(*this), K(tx), K(tx_info)); + } tx->lock_.unlock(); return ret; } diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 614c6647f..dcd8900ca 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -140,6 +140,7 @@ int iterate_tx_scheduler_stat(ObTxSchedulerStatIterator &tx_scheduler_stat_iter) * recover transaction descriptor with tx info */ int recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx); +int update_user_savepoint(ObTxDesc &tx, const ObTxSavePointList &savepoints); int get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info); int get_tx_stmt_info(ObTxDesc &tx, ObTxStmtInfo &stmt_info); int update_tx_with_stmt_info(const ObTxStmtInfo &tx_info, ObTxDesc *&tx); @@ -292,6 +293,7 @@ int sub_end_tx_local_ls_(const ObTransID &tx_id, const ObXATransID &xid, const common::ObAddr &sender_addr, const bool is_rollback); +int assign_user_savepoint_(ObTxDesc &tx, ObTxSavePointList &savepoints); private: ObTxCtxMgr tx_ctx_mgr_; diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index d018ec0c5..9f35f30c5 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1143,7 +1143,10 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, return ret; } -int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id) +int ObTransService::create_explicit_savepoint(ObTxDesc &tx, + const ObString &savepoint, + const uint32_t session_id, + const bool user_create) { int ret = OB_SUCCESS; int64_t scn = 0; @@ -1151,7 +1154,7 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save tx.inc_op_sn(); scn = ObSequence::inc_and_get_max_seq_no(); ObTxSavePoint sp; - if (OB_SUCC(sp.init(scn, savepoint, session_id))) { + if (OB_SUCC(sp.init(scn, savepoint, session_id, user_create))) { if (OB_FAIL(tx.savepoints_.push_back(sp))) { TRANS_LOG(WARN, "push savepoint failed", K(ret)); } else if (!tx.tx_id_.is_valid() && OB_FAIL(tx_desc_mgr_.add(tx))) { @@ -1297,7 +1300,7 @@ int ObTransService::create_stash_savepoint(ObTxDesc &tx, const ObString &name) tx.inc_op_sn(); auto seq_no = ObSequence::inc_and_get_max_seq_no(); ObTxSavePoint sp; - if (OB_SUCC(sp.init(seq_no, name, 0, true))) { + if (OB_SUCC(sp.init(seq_no, name, 0, false, true))) { if (OB_FAIL(tx.savepoints_.push_back(sp))) { TRANS_LOG(WARN, "push savepoint failed", K(ret)); } diff --git a/src/storage/tx/ob_tx_api.h b/src/storage/tx/ob_tx_api.h index 63e7372cc..720c92580 100644 --- a/src/storage/tx/ob_tx_api.h +++ b/src/storage/tx/ob_tx_api.h @@ -334,7 +334,10 @@ int create_implicit_savepoint(ObTxDesc &tx, * OB_ERR_TOO_LONG_IDENT - if savepoint was longer than 128 characters * OB_ERR_TOO_MANY_SAVEPOINT - alive savepoint count out of limit (default 255) */ -int create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id); +int create_explicit_savepoint(ObTxDesc &tx, + const ObString &savepoint, + const uint32_t session_id, + const bool user_create); /** * rollback_to_implicit_savepoint - rollback to a implicit savepoint diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 2785e5eb0..c9c12faa4 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -730,9 +730,9 @@ int ObXACtx::process_xa_start_tightly_(const obrpc::ObXAStartRPCRequest &req) } else if (is_new_branch) { ++xa_branch_count_; } - if (OB_SUCC(ret) && req.need_response()) { + if (OB_SUCC(ret)) { SMART_VAR(ObXAStartRPCResponse, response) { - if (OB_FAIL(response.init(trans_id_, *tx_desc_))) { + if (OB_FAIL(response.init(trans_id_, *tx_desc_, req.is_first_branch()))) { TRANS_LOG(WARN, "init xa start response failed", K(ret)); } else if (OB_FAIL(xa_rpc_->xa_start_response(tenant_id_, sender, response, NULL))) { TRANS_LOG(WARN, "xa start response failed", K(ret)); @@ -773,7 +773,7 @@ int ObXACtx::process_xa_start_loosely_(const obrpc::ObXAStartRPCRequest &req) TRANS_LOG(WARN, "register xa trans timeout task failed", K(ret), K(xid), K(*this)); } else { SMART_VAR(ObXAStartRPCResponse, response) { - if (OB_FAIL(response.init(trans_id_, *tx_desc_))) { + if (OB_FAIL(response.init(trans_id_, *tx_desc_, req.is_first_branch()))) { TRANS_LOG(WARN, "init xa start response failed", K(ret)); } else if (OB_FAIL(xa_rpc_->xa_start_response(tenant_id_, sender, response, NULL))) { TRANS_LOG(WARN, "xa start response failed", K(ret)); @@ -790,9 +790,13 @@ int ObXACtx::process_xa_start_response(const obrpc::ObXAStartRPCResponse &resp) int ret = OB_SUCCESS; const ObTxInfo &tx_info = resp.get_tx_info(); // no need guard - if (NULL != tx_desc_) { + if (resp.is_first_branch() && NULL != tx_desc_) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "trans desc is NULL", K(ret)); + } else if (!resp.is_first_branch()) { + if (OB_FAIL(MTL(ObTransService *)->update_user_savepoint(*tx_desc_, tx_info.savepoints_))) { + TRANS_LOG(WARN, "update user sp fail", K(ret), K(*this), K(tx_info)); + } } else if (OB_FAIL(MTL(ObTransService *)->recover_tx(tx_info, tx_desc_))) { TRANS_LOG(WARN, "recover tx failed", K(ret), K(*this), K(tx_info)); } else { @@ -848,6 +852,11 @@ int ObXACtx::process_xa_end(const obrpc::ObXAEndRPCRequest &req) if (OB_FAIL(MTL(ObTransService*)->update_tx_with_stmt_info(stmt_info, tx_desc_))) { TRANS_LOG(WARN, "update tx desc with stmt info failed", K(ret), K(req), K(*this)); } + } else { + // if tightly coupled, need to update user savepoint + if (OB_FAIL(MTL(ObTransService *)->update_user_savepoint(*tx_desc_, stmt_info.savepoints_))) { + TRANS_LOG(WARN, "update user sp fail", K(ret), K(*this), K(req)); + } } } if (OB_TRANS_XA_BRANCH_FAIL == ret) { @@ -1357,6 +1366,7 @@ int ObXACtx::xa_start_remote_first(const ObXATransID &xid, } else { // set global trans type to xa trans tx_desc->set_global_tx_type(ObGlobalTxType::XA_TRANS); + TRANS_LOG(WARN, "xa start remote success", K(ret), K(xid), K(flags), K(*tx_desc), K(*this)); } TRANS_LOG(INFO, "xa start remote first", K(ret), K(xid), K(flags), K(*this)); return ret; @@ -1486,7 +1496,7 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid, int tmp_ret = OB_SUCCESS; int result = OB_SUCCESS; const int64_t now = ObTimeUtility::current_time(); - const bool need_response = true; // need tx desc + const bool is_first_branch = true; // need tx desc ObXAStartRPCRequest xa_start_request; obrpc::ObXARPCCB cb; ObTransCond cond; @@ -1503,7 +1513,7 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid, is_tightly_coupled_, timeout_seconds, flags, - need_response))) { + is_first_branch))) { TRANS_LOG(WARN, "init sync request failed", K(ret), K(*this)); } else if (OB_FAIL(cb.init(&cond))) { TRANS_LOG(WARN, "init cb failed", K(ret), K(xid), K(*this)); @@ -1565,12 +1575,12 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid, int tmp_ret = OB_SUCCESS; int result = OB_SUCCESS; const int64_t now = ObTimeUtility::current_time(); - const bool need_response = false; // no need tx desc + const bool is_first_branch = false; // no need tx desc ObXAStartRPCRequest xa_start_request; obrpc::ObXARPCCB cb; ObTransCond cond; const int64_t wait_time = (INT64_MAX / 2 ) - now; - // xa_sync_status_cond_.reset(); + xa_sync_status_cond_.reset(); if (OB_ISNULL(xa_rpc_) || OB_ISNULL(xa_service_)) { ret = OB_ERR_UNEXPECTED; @@ -1588,7 +1598,7 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid, is_tightly_coupled_, timeout_seconds, flags, - need_response))) { + is_first_branch))) { TRANS_LOG(WARN, "init sync request failed", K(ret), K(*this)); } else if (OB_FAIL(cb.init(&cond))) { TRANS_LOG(WARN, "init cb failed", K(ret), K(xid), K(*this)); @@ -1606,6 +1616,10 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid, } else { TRANS_LOG(WARN, "wait cond failed", K(ret), K(xid), K(result)); } + } else if (OB_FAIL(wait_xa_sync_status_(wait_time))) { + TRANS_LOG(WARN, "wait xa sync status failed", K(ret), K(xid)); + } else { + // do nothing } } @@ -1819,7 +1833,7 @@ int ObXACtx::create_xa_savepoint_if_need_(const ObXATransID &xid, const uint32_t found = true; TRANS_LOG(INFO, "find info", K(ret), K(xid), K(info)); if (info.is_first_stmt_) { - if (OB_FAIL(MTL(transaction::ObTransService *)->create_explicit_savepoint(*tx_desc_, PL_XA_IMPLICIT_SAVEPOINT, session_id))) { + if (OB_FAIL(MTL(transaction::ObTransService *)->create_explicit_savepoint(*tx_desc_, PL_XA_IMPLICIT_SAVEPOINT, session_id, false))) { TRANS_LOG(WARN, "create xa savepoint fail", K(ret), K(xid), K(session_id), K(*this)); } else { TRANS_LOG(INFO, "create pl xa savepoint success", K(ret), K(xid), K(session_id), K(*this)); @@ -2670,10 +2684,12 @@ int ObXACtx::xa_prepare(const ObXATransID &xid, const int64_t timeout_us) } else if (OB_FAIL(xa_prepare_(xid, timeout_us, need_exit))) { TRANS_LOG(WARN, "xa prepare failed", K(ret), K(xid), K(need_exit), K(*this)); if (need_exit) { - int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_, - ObTxAbortCause::IMPLICIT_ROLLBACK))) { - TRANS_LOG(WARN, "fail to stop transaction", K(tmp_ret), K(*this)); + if (OB_ERR_READ_ONLY_TRANSACTION != ret) { + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_, + ObTxAbortCause::IMPLICIT_ROLLBACK))) { + TRANS_LOG(WARN, "fail to stop transaction", K(tmp_ret), K(*this)); + } } set_terminated_(); set_exiting_(); diff --git a/src/storage/tx/ob_xa_rpc.cpp b/src/storage/tx/ob_xa_rpc.cpp index a06247d18..f7e81aaff 100644 --- a/src/storage/tx/ob_xa_rpc.cpp +++ b/src/storage/tx/ob_xa_rpc.cpp @@ -36,12 +36,13 @@ OB_SERIALIZE_MEMBER(ObXAStartRPCRequest, sender_, is_new_branch_, is_tightly_coupled_, - need_response_, + is_first_branch_, timeout_seconds_, flags_); OB_SERIALIZE_MEMBER(ObXAStartRPCResponse, tx_id_, - tx_info_); + tx_info_, + is_first_branch_); OB_SERIALIZE_MEMBER(ObXAEndRPCRequest, tx_id_, stmt_info_, @@ -162,7 +163,7 @@ int ObXAStartRPCRequest::init(const ObTransID &tx_id, const bool is_tightly_coupled, const int64_t timeout_seconds, const int64_t flags, - const bool need_response) + const bool is_first_branch) { int ret = OB_SUCCESS; if (!tx_id.is_valid() || @@ -178,7 +179,7 @@ int ObXAStartRPCRequest::init(const ObTransID &tx_id, sender_ = sender; is_new_branch_ = is_new_branch; is_tightly_coupled_ = is_tightly_coupled; - need_response_ = need_response; // false by default + is_first_branch_ = is_first_branch; // false by default timeout_seconds_ = timeout_seconds; flags_ = flags; } @@ -214,7 +215,7 @@ int ObXAStartP::process() if (xa_ctx->is_tightly_coupled() != is_tightly_coupled) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected tight couple flag", K(is_tightly_coupled), K(xid), K(trans_id)); - } else if (arg_.need_response() && OB_FAIL(xa_ctx->wait_xa_start_complete())) { + } else if (arg_.is_first_branch() && OB_FAIL(xa_ctx->wait_xa_start_complete())) { TRANS_LOG(WARN, "wait xa start complete failed", K(ret)); } else if (OB_FAIL(xa_ctx->process_xa_start(arg_))) { TRANS_LOG(WARN, "xa ctx remote pull failed", K(ret), K(trans_id), K(xid)); @@ -228,7 +229,8 @@ int ObXAStartP::process() } int ObXAStartRPCResponse::init(const ObTransID &tx_id, - ObTxDesc &tx_desc) + ObTxDesc &tx_desc, + const bool is_first_branch) { int ret = OB_SUCCESS; if (!tx_desc.is_valid()) { @@ -238,6 +240,7 @@ int ObXAStartRPCResponse::init(const ObTransID &tx_id, TRANS_LOG(WARN, "get tx info failed", KR(ret)); } else { tx_id_ = tx_id; + is_first_branch_ = is_first_branch; } return ret; } diff --git a/src/storage/tx/ob_xa_rpc.h b/src/storage/tx/ob_xa_rpc.h index 993e91c43..3f0218345 100644 --- a/src/storage/tx/ob_xa_rpc.h +++ b/src/storage/tx/ob_xa_rpc.h @@ -66,18 +66,18 @@ public: const bool is_tightly_coupled, const int64_t timeout_seconds, const int64_t flags, - const bool need_response); + const bool is_first_branch); const transaction::ObXATransID &get_xid() const { return xid_; } const transaction::ObTransID &get_tx_id() const { return tx_id_; } const common::ObAddr &get_sender() const { return sender_; } bool is_new_branch() const { return is_new_branch_; } bool is_tightly_coupled() const { return is_tightly_coupled_; } - bool need_response() const { return need_response_; } + bool is_first_branch() const { return is_first_branch_; } int64_t get_timeout_seconds() const { return timeout_seconds_; } int64_t get_flags() const { return flags_; } bool is_valid() const; TO_STRING_KV(K_(tx_id), K_(xid), K_(sender), K_(is_new_branch), K_(is_tightly_coupled), - K_(need_response), K_(timeout_seconds), K_(flags)); + K_(is_first_branch), K_(timeout_seconds), K_(flags)); private: transaction::ObTransID tx_id_; transaction::ObXATransID xid_; @@ -85,7 +85,7 @@ private: bool is_new_branch_; // for tightly-coupled xa branches bool is_tightly_coupled_; - bool need_response_; + bool is_first_branch_; int64_t timeout_seconds_; int64_t flags_; }; @@ -97,14 +97,17 @@ public: ObXAStartRPCResponse() {} ~ObXAStartRPCResponse() {} int init(const transaction::ObTransID &tx_id, - transaction::ObTxDesc &tx_desc); + transaction::ObTxDesc &tx_desc, + const bool is_first_branch); const transaction::ObTransID &get_tx_id() const { return tx_id_; } const transaction::ObTxInfo &get_tx_info() const { return tx_info_; } + bool is_first_branch() const { return is_first_branch_; } bool is_valid() const { return tx_id_.is_valid() && tx_info_.is_valid(); } - TO_STRING_KV(K_(tx_id), K_(tx_info)); + TO_STRING_KV(K_(tx_id), K_(tx_info), K_(is_first_branch)); private: transaction::ObTransID tx_id_; transaction::ObTxInfo tx_info_; + bool is_first_branch_; }; class ObXAEndRPCRequest diff --git a/unittest/storage/tx/it/test_tx_free_route.cpp b/unittest/storage/tx/it/test_tx_free_route.cpp index 85e84d775..36c821fea 100644 --- a/unittest/storage/tx/it/test_tx_free_route.cpp +++ b/unittest/storage/tx/it/test_tx_free_route.cpp @@ -495,7 +495,7 @@ int MockObServer::do_handle_(ObReq &req, ObResp &resp) tx_desc = NULL; break; case ObReq::T::SAVEPOINT: - ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0); + ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0, false); break; case ObReq::T::ROLLBACK_SAVEPOINT: ret = tx_node_.rollback_to_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 1 * 1000 * 1000, 0);