diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 26fb06efac..d030498a02 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -99,6 +99,7 @@ void ObXACtx::reset() has_tx_level_temp_table_ = false; local_lock_level_ = -1; executing_xid_.reset(); + need_stmt_lock_ = true; is_inited_ = false; } @@ -1778,6 +1779,18 @@ int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id) // NOTE that anohter error code maybe required for loosely coupled mode ret = OB_TRANS_XA_BRANCH_FAIL; TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(*this)); + } else if (!need_stmt_lock_) { + // only for dblink + if (!is_executing_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected stmt lock", K(xid), K(*this)); + } else if (xid.all_equal_to(executing_xid_)) { + local_lock_level_++; + TRANS_LOG(INFO, "acquire local lock repeatedly", K(xid), K_(local_lock_level), K(*this)); + // return OB_SUCCESS + } else { + TRANS_LOG(INFO, "no need stmt lock", K(xid), K(*this)); + } } else if (!is_tightly_coupled_) { // loosely coupled mode if (OB_FAIL(create_xa_savepoint_if_need_(xid, session_id))) { @@ -1938,6 +1951,23 @@ int ObXACtx::end_stmt(const ObXATransID &xid) } else if (OB_ISNULL(tx_desc_)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "trans descriptor is null", K(ret), K(xid), K(*this)); + } else if (!need_stmt_lock_) { + // only for dblink + if (!is_executing_ || local_lock_level_ < 0) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected lock lock", K(xid), K(*this)); + } else if (xid.all_equal_to(executing_xid_)) { + // for case of repeated start stmt, local_lock_level mush be greater than 0 + if (0 == local_lock_level_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected lock lock", K(xid), K(*this)); + } else { + local_lock_level_--; + TRANS_LOG(INFO, "release local lock", K(xid), K_(local_lock_level), K(*this)); + } + } else { + TRANS_LOG(INFO, "no need stmt lock", K(xid), K(*this)); + } } else if (!is_tightly_coupled_) { // do nothing } else if (OB_UNLIKELY(!is_executing_)) { @@ -3151,6 +3181,48 @@ int ObXACtx::check_trans_state_(const bool is_rollback, return ret; } +int ObXACtx::stop_check_stmt_lock(const ObXATransID &xid) +{ + int ret = OB_SUCCESS; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (!is_executing_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this)); + } else if (!xid.all_equal_to(executing_xid_)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this)); + } else if (!need_stmt_lock_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this)); + } else { + need_stmt_lock_ = false; + } + TRANS_LOG(INFO, "stop check stmt lock", K(ret), K(xid), K_(trans_id), K_(is_terminated), + K_(local_lock_level)); + return ret; +} + +int ObXACtx::start_check_stmt_lock(const ObXATransID &xid) +{ + int ret = OB_SUCCESS; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (!is_executing_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this)); + } else if (!xid.all_equal_to(executing_xid_)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this)); + } else if (need_stmt_lock_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this)); + } else { + need_stmt_lock_ = true; + } + TRANS_LOG(INFO, "start check stmt lock", K(ret), K(xid), K_(trans_id), K_(is_terminated), + K_(local_lock_level)); + return ret; +} + }//transaction }//oceanbase diff --git a/src/storage/tx/ob_xa_ctx.h b/src/storage/tx/ob_xa_ctx.h index 66bcdd519a..449727303b 100644 --- a/src/storage/tx/ob_xa_ctx.h +++ b/src/storage/tx/ob_xa_ctx.h @@ -161,6 +161,9 @@ public: int recover_tx_for_dblink_callback(ObTxDesc *&tx_desc); int revert_tx_for_dblink_callback(ObTxDesc *&tx_desc); bool has_tx_level_temp_table() { return has_tx_level_temp_table_; } + int start_check_stmt_lock(const ObXATransID &xid); + int stop_check_stmt_lock(const ObXATransID &xid); + OB_INLINE bool is_executing() const { return is_executing_; } TO_STRING_KV(K_(is_inited), K_(xid), K_(original_sche_addr), K_(is_exiting), K_(trans_id), K_(is_executing), K_(is_xa_end_trans), K_(tenant_id), @@ -168,7 +171,7 @@ public: K_(xa_branch_count), K_(xa_ref_count), K_(lock_grant), K_(is_tightly_coupled), K_(lock_xid), K_(xa_stmt_info), K_(is_terminated), K_(executing_xid), "uref", get_uref(), - K_(has_tx_level_temp_table), K_(local_lock_level)); + K_(has_tx_level_temp_table), K_(local_lock_level), K_(need_stmt_lock)); private: int register_timeout_task_(const int64_t interval_us); int unregister_timeout_task_(); @@ -327,6 +330,7 @@ private: // 4.1 if local_lock_level > 0, decrease the local_lock_level // 4.2 if local_lock_level == 0, execute the normal global lock release processing int64_t local_lock_level_; + bool need_stmt_lock_; }; }//transaction