[dblink][4.2/4.3] add interfaces to support select in dblink trans

This commit is contained in:
jw-guo
2023-10-18 07:39:57 +00:00
committed by ob-robot
parent 7de3b4472a
commit 1ff26def43
2 changed files with 77 additions and 1 deletions

View File

@ -99,6 +99,7 @@ void ObXACtx::reset()
has_tx_level_temp_table_ = false;
local_lock_level_ = -1;
executing_xid_.reset();
need_stmt_lock_ = true;
is_inited_ = false;
}
@ -1778,6 +1779,18 @@ int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id)
// NOTE that anohter error code maybe required for loosely coupled mode
ret = OB_TRANS_XA_BRANCH_FAIL;
TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(*this));
} else if (!need_stmt_lock_) {
// only for dblink
if (!is_executing_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected stmt lock", K(xid), K(*this));
} else if (xid.all_equal_to(executing_xid_)) {
local_lock_level_++;
TRANS_LOG(INFO, "acquire local lock repeatedly", K(xid), K_(local_lock_level), K(*this));
// return OB_SUCCESS
} else {
TRANS_LOG(INFO, "no need stmt lock", K(xid), K(*this));
}
} else if (!is_tightly_coupled_) {
// loosely coupled mode
if (OB_FAIL(create_xa_savepoint_if_need_(xid, session_id))) {
@ -1938,6 +1951,23 @@ int ObXACtx::end_stmt(const ObXATransID &xid)
} else if (OB_ISNULL(tx_desc_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "trans descriptor is null", K(ret), K(xid), K(*this));
} else if (!need_stmt_lock_) {
// only for dblink
if (!is_executing_ || local_lock_level_ < 0) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected lock lock", K(xid), K(*this));
} else if (xid.all_equal_to(executing_xid_)) {
// for case of repeated start stmt, local_lock_level mush be greater than 0
if (0 == local_lock_level_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected lock lock", K(xid), K(*this));
} else {
local_lock_level_--;
TRANS_LOG(INFO, "release local lock", K(xid), K_(local_lock_level), K(*this));
}
} else {
TRANS_LOG(INFO, "no need stmt lock", K(xid), K(*this));
}
} else if (!is_tightly_coupled_) {
// do nothing
} else if (OB_UNLIKELY(!is_executing_)) {
@ -3151,6 +3181,48 @@ int ObXACtx::check_trans_state_(const bool is_rollback,
return ret;
}
int ObXACtx::stop_check_stmt_lock(const ObXATransID &xid)
{
int ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (!is_executing_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this));
} else if (!xid.all_equal_to(executing_xid_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this));
} else if (!need_stmt_lock_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this));
} else {
need_stmt_lock_ = false;
}
TRANS_LOG(INFO, "stop check stmt lock", K(ret), K(xid), K_(trans_id), K_(is_terminated),
K_(local_lock_level));
return ret;
}
int ObXACtx::start_check_stmt_lock(const ObXATransID &xid)
{
int ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (!is_executing_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this));
} else if (!xid.all_equal_to(executing_xid_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this));
} else if (need_stmt_lock_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(INFO, "unexpected stmt lock", K(ret), K(xid), K(*this));
} else {
need_stmt_lock_ = true;
}
TRANS_LOG(INFO, "start check stmt lock", K(ret), K(xid), K_(trans_id), K_(is_terminated),
K_(local_lock_level));
return ret;
}
}//transaction
}//oceanbase

View File

@ -161,6 +161,9 @@ public:
int recover_tx_for_dblink_callback(ObTxDesc *&tx_desc);
int revert_tx_for_dblink_callback(ObTxDesc *&tx_desc);
bool has_tx_level_temp_table() { return has_tx_level_temp_table_; }
int start_check_stmt_lock(const ObXATransID &xid);
int stop_check_stmt_lock(const ObXATransID &xid);
OB_INLINE bool is_executing() const { return is_executing_; }
TO_STRING_KV(K_(is_inited), K_(xid), K_(original_sche_addr), K_(is_exiting),
K_(trans_id), K_(is_executing), K_(is_xa_end_trans), K_(tenant_id),
@ -168,7 +171,7 @@ public:
K_(xa_branch_count), K_(xa_ref_count), K_(lock_grant),
K_(is_tightly_coupled), K_(lock_xid), K_(xa_stmt_info),
K_(is_terminated), K_(executing_xid), "uref", get_uref(),
K_(has_tx_level_temp_table), K_(local_lock_level));
K_(has_tx_level_temp_table), K_(local_lock_level), K_(need_stmt_lock));
private:
int register_timeout_task_(const int64_t interval_us);
int unregister_timeout_task_();
@ -327,6 +330,7 @@ private:
// 4.1 if local_lock_level > 0, decrease the local_lock_level
// 4.2 if local_lock_level == 0, execute the normal global lock release processing
int64_t local_lock_level_;
bool need_stmt_lock_;
};
}//transaction