fix xa savepoint

This commit is contained in:
obdev 2023-02-17 09:12:43 +00:00 committed by ob-robot
parent 9cfd298d40
commit 897c5ae533
14 changed files with 212 additions and 37 deletions

View File

@ -189,6 +189,12 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
} else if (!my_session->get_in_transaction()) {
ret = OB_TRANS_XA_PROTO;
LOG_WARN("not in a trans", K(ret));
} else if (my_session->get_xid().empty()) {
ret = OB_TRANS_XA_PROTO;
LOG_WARN("not in xa trans", K(ret));
} else if (!xid.all_equal_to(my_session->get_xid())) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "xid not match", K(ret), K(xid));
} else {
int64_t flags = stmt.get_flags();
flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags;

View File

@ -497,6 +497,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
OZ (acquire_tx_if_need_(txs, *session));
OZ (stmt_sanity_check_(session, plan, plan_ctx));
OZ (txs->sql_stmt_start_hook(session->get_xid(), *session->get_tx_desc(), session->get_sessid()));
bool start_hook = OB_SUCC(ret) && !session->get_xid().empty() ? true : false;
if (OB_SUCC(ret)
&& txs->get_tx_elr_util().check_and_update_tx_elr_info(*session->get_tx_desc())) {
LOG_WARN("check and update tx elr info", K(ret), KPC(session->get_tx_desc()));
@ -531,6 +532,12 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
if (plan->is_contain_oracle_trx_level_temporary_table()) {
OX (tx_desc->set_with_temporary_table());
}
if (OB_FAIL(ret) && start_hook) {
int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc());
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("call sql stmt end hook fail", K(tmp_ret));
}
}
bool print_log = false;
#ifndef NDEBUG
print_log = true;
@ -701,7 +708,28 @@ int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx,
CHECK_TXN_FREE_ROUTE_ALLOWED();
OZ (get_tx_service(session, txs));
OZ (acquire_tx_if_need_(txs, *session));
OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name), sp_name);
bool start_hook = false;
OZ(start_hook_if_need_(*session, txs, start_hook));
OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name, !session->get_xid().empty() ? session->get_sessid() : 0), sp_name);
if (start_hook) {
int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc());
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("call sql stmt end hook fail", K(tmp_ret));
ret = COVER_SUCC(tmp_ret);
}
}
return ret;
}
int ObSqlTransControl::start_hook_if_need_(ObSQLSessionInfo &session,
transaction::ObTransService *txs,
bool &start_hook)
{
int ret = OB_SUCCESS;
if (!session.get_tx_desc()->is_shadow() && !session.has_start_stmt() &&
OB_SUCC(txs->sql_stmt_start_hook(session.get_xid(), *session.get_tx_desc(), session.get_sessid()))) {
start_hook = true;
}
return ret;
}
@ -720,7 +748,16 @@ int ObSqlTransControl::rollback_savepoint(ObExecContext &exec_ctx,
OZ (get_tx_service(session, txs));
OZ (acquire_tx_if_need_(txs, *session));
OX (stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session));
OZ (txs->rollback_to_explicit_savepoint(*session->get_tx_desc(), sp_name, stmt_expire_ts), sp_name);
bool start_hook = false;
OZ(start_hook_if_need_(*session, txs, start_hook));
OZ (txs->rollback_to_explicit_savepoint(*session->get_tx_desc(), sp_name, stmt_expire_ts, !session->get_xid().empty() ? session->get_sessid() : 0), sp_name);
if (start_hook) {
int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc());
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("call sql stmt end hook fail", K(tmp_ret));
ret = COVER_SUCC(tmp_ret);
}
}
return ret;
}
@ -735,7 +772,16 @@ int ObSqlTransControl::release_savepoint(ObExecContext &exec_ctx,
CHECK_TXN_FREE_ROUTE_ALLOWED();
OZ (get_tx_service(session, txs), *session);
OZ (acquire_tx_if_need_(txs, *session));
OZ (txs->release_explicit_savepoint(*session->get_tx_desc(), sp_name), *session, sp_name);
bool start_hook = false;
OZ(start_hook_if_need_(*session, txs, start_hook));
OZ (txs->release_explicit_savepoint(*session->get_tx_desc(), sp_name, !session->get_xid().empty() ? session->get_sessid() : 0), *session, sp_name);
if (start_hook) {
int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc());
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("call sql stmt end hook fail", K(tmp_ret));
ret = COVER_SUCC(tmp_ret);
}
}
return ret;
}

View File

@ -247,6 +247,9 @@ private:
const ObSQLSessionInfo &session);
static int inc_session_ref(const ObSQLSessionInfo *session);
static int acquire_tx_if_need_(transaction::ObTransService *txs, ObSQLSessionInfo &session);
static int start_hook_if_need_(ObSQLSessionInfo &session,
transaction::ObTransService *txs,
bool &start_hook);
public:
/*
* create a savepoint without name

View File

@ -54,7 +54,7 @@ ObTxIsolationLevel tx_isolation_from_str(const ObString &s)
}
ObTxSavePoint::ObTxSavePoint()
: type_(T::INVL), scn_(0), name_() {}
: type_(T::INVL), scn_(0), session_id_(0), name_() {}
ObTxSavePoint::ObTxSavePoint(const ObTxSavePoint &a)
{
@ -65,9 +65,14 @@ ObTxSavePoint &ObTxSavePoint::operator=(const ObTxSavePoint &a)
{
type_ = a.type_;
scn_ = a.scn_;
session_id_ = a.session_id_;
switch(type_) {
case T::SAVEPOINT:
case T::STASH: name_ = a.name_; break;
case T::STASH: {
name_ = a.name_;
session_id_ = a.session_id_;
break;
}
case T::SNAPSHOT: snapshot_ = a.snapshot_; break;
default: break;
}
@ -84,6 +89,7 @@ void ObTxSavePoint::release()
type_ = T::INVL;
snapshot_ = NULL;
scn_ = 0;
session_id_ = 0;
}
void ObTxSavePoint::rollback()
@ -101,7 +107,7 @@ void ObTxSavePoint::init(ObTxReadSnapshot *snapshot)
scn_ = snapshot->core_.scn_;
}
int ObTxSavePoint::init(int64_t scn, const ObString &name, const bool stash)
int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t session_id, const bool stash)
{
int ret = OB_SUCCESS;
if (OB_FAIL(name_.assign(name))) {
@ -113,6 +119,7 @@ int ObTxSavePoint::init(int64_t scn, const ObString &name, const bool stash)
} else {
type_ = stash ? T::STASH : T::SAVEPOINT;
scn_ = scn;
session_id_ = session_id;
}
return ret;
}
@ -129,6 +136,7 @@ DEF_TO_STRING(ObTxSavePoint)
}
J_COMMA();
J_KV(K_(scn));
J_KV(K_(session_id));
J_OBJ_END();
return pos;
}

View File

@ -257,6 +257,10 @@ class ObTxSavePoint
private:
enum class T { INVL= 0, SAVEPOINT= 1, SNAPSHOT= 2, STASH= 3 } type_;
int64_t scn_;
/* The savepoint should be unique to the session,
and the session id is required to distinguish the
savepoint for the multi-branch scenario of xa */
uint32_t session_id_;
union {
ObTxReadSnapshot *snapshot_;
common::ObFixedLengthString<128> name_;
@ -268,7 +272,7 @@ public:
ObTxSavePoint &operator=(const ObTxSavePoint &a);
void release();
void rollback();
int init(const int64_t scn, const ObString &name, const bool stash = false);
int init(const int64_t scn, const ObString &name, const uint32_t session_id, const bool stash = false);
void init(ObTxReadSnapshot *snapshot);
bool is_savepoint() const { return type_ == T::SAVEPOINT || type_ == T::STASH; }
bool is_snapshot() const { return type_ == T::SNAPSHOT; }
@ -518,6 +522,7 @@ public:
K_(flags_.BLOCK),
K_(flags_.REPLICA),
K_(can_elr),
K_(savepoints),
K_(cflict_txs),
K_(abort_cause),
K_(commit_expire_ts),
@ -543,6 +548,7 @@ public:
ObTxAccessMode get_access_mode() const { return access_mode_; }
bool is_rdonly() const { return access_mode_ == ObTxAccessMode::RD_ONLY; }
bool is_clean() const { return parts_.empty(); }
bool is_shadow() const { return flags_.SHADOW_; }
bool is_explicit() const { return flags_.EXPLICIT_; }
void set_with_temporary_table() { flags_.WITH_TEMP_TABLE_ = true; }
bool with_temporary_table() const { return flags_.WITH_TEMP_TABLE_; }

View File

@ -2324,7 +2324,7 @@ int ObTransService::recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx)
tx->tenant_id_ = tx_info.tenant_id_;
tx->cluster_id_ = tx_info.cluster_id_;
tx->cluster_version_ = tx_info.cluster_version_;
tx->addr_ = tx_info.addr_;
tx->addr_ = tx_info.addr_; /*origin scheduler addr*/
tx->tx_id_ = tx_info.tx_id_;
tx->isolation_ = tx_info.isolation_;
tx->access_mode_ = tx_info.access_mode_;

View File

@ -1135,7 +1135,7 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
return ret;
}
int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint)
int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id)
{
int ret = OB_SUCCESS;
int64_t scn = 0;
@ -1143,7 +1143,7 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save
tx.inc_op_sn();
scn = ObSequence::inc_and_get_max_seq_no();
ObTxSavePoint sp;
if (OB_SUCC(sp.init(scn, savepoint))) {
if (OB_SUCC(sp.init(scn, savepoint, session_id))) {
if (OB_FAIL(tx.savepoints_.push_back(sp))) {
TRANS_LOG(WARN, "push savepoint failed", K(ret));
} else if (!tx.tx_id_.is_valid() && OB_FAIL(tx_desc_mgr_.add(tx))) {
@ -1154,7 +1154,7 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save
ARRAY_FOREACH_X(tx.savepoints_, i, cnt, i != cnt - 1) {
ObTxSavePoint &it = tx.savepoints_.at(cnt - 2 - i);
if (it.is_stash()) { break; }
if (it.is_savepoint() && it.name_ == savepoint) {
if (it.is_savepoint() && it.name_ == savepoint && it.session_id_ == session_id) {
TRANS_LOG(TRACE, "move savepoint", K(savepoint), "from", it.scn_, "to", scn, K(tx));
it.release();
break; // assume only one if exist
@ -1180,7 +1180,8 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save
// 2. invalidate savepoint and snapshot after the savepoint Node.
int ObTransService::rollback_to_explicit_savepoint(ObTxDesc &tx,
const ObString &savepoint,
const int64_t expire_ts)
const int64_t expire_ts,
const uint32_t session_id)
{
int ret = OB_SUCCESS;
auto start_ts = ObTimeUtility::current_time();
@ -1192,7 +1193,7 @@ int ObTransService::rollback_to_explicit_savepoint(ObTxDesc &tx,
const ObTxSavePoint &it = tx.savepoints_.at(cnt - 1 - i);
TRANS_LOG(TRACE, "sp iterate:", K(it));
if (it.is_stash()) { break; }
if (it.is_savepoint() && it.name_ == savepoint) {
if (it.is_savepoint() && it.name_ == savepoint && it.session_id_ == session_id) {
sp_scn = it.scn_;
break;
}
@ -1242,7 +1243,7 @@ int ObTransService::rollback_to_explicit_savepoint(ObTxDesc &tx,
// impl note
// registered snapshot keep valid
int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint)
int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id)
{
int ret = OB_SUCCESS;
bool hit = false;
@ -1252,7 +1253,7 @@ int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &sav
tx.inc_op_sn();
ARRAY_FOREACH_N(tx.savepoints_, i, cnt) {
ObTxSavePoint &it = tx.savepoints_.at(cnt - 1 - i);
if (it.is_savepoint() && it.name_ == savepoint) {
if (it.is_savepoint() && it.name_ == savepoint && it.session_id_ == session_id) {
hit = true;
sp_id = it.scn_;
break;
@ -1269,7 +1270,7 @@ int ObTransService::release_explicit_savepoint(ObTxDesc &tx, const ObString &sav
it.release();
}
}
TRANS_LOG(TRACE, "release savepoint", K(savepoint), K(sp_id), K(tx));
TRANS_LOG(TRACE, "release savepoint", K(savepoint), K(sp_id), K(session_id), K(tx));
}
}
tx.state_change_flags_.EXTRA_CHANGED_ = true;
@ -1288,7 +1289,7 @@ int ObTransService::create_stash_savepoint(ObTxDesc &tx, const ObString &name)
tx.inc_op_sn();
auto seq_no = ObSequence::inc_and_get_max_seq_no();
ObTxSavePoint sp;
if (OB_SUCC(sp.init(seq_no, name, true))) {
if (OB_SUCC(sp.init(seq_no, name, 0, true))) {
if (OB_FAIL(tx.savepoints_.push_back(sp))) {
TRANS_LOG(WARN, "push savepoint failed", K(ret));
}
@ -1760,7 +1761,7 @@ int ObTransService::sql_stmt_start_hook(const ObXATransID &xid, ObTxDesc &tx, co
TRANS_LOG(WARN, "register tx fail", K(ret), K_(tx.tx_id), K(xid), KP(&tx));
} else { registed = true; }
}
if (OB_SUCC(ret) && OB_FAIL(MTL(ObXAService*)->start_stmt(xid, tx))) {
if (OB_SUCC(ret) && OB_FAIL(MTL(ObXAService*)->start_stmt(xid, session_id, tx))) {
TRANS_LOG(WARN, "xa trans start stmt failed", K(ret), K_(tx.xid), K(xid));
ObGlobalTxType global_tx_type = tx.get_global_tx_type(xid);
if (ObGlobalTxType::DBLINK_TRANS == global_tx_type && OB_TRANS_XA_BRANCH_FAIL == ret) {

View File

@ -326,12 +326,15 @@ int create_implicit_savepoint(ObTxDesc &tx,
* which hold the savepoint
* @savepoint: the name of savepoint to be created
*
* @session_id: the session id to which the savepoint
* belongs, used for xa
*
* Return:
* OB_SUCCESS - OK
* OB_ERR_TOO_LONG_IDENT - if savepoint was longer than 128 characters
* OB_ERR_TOO_MANY_SAVEPOINT - alive savepoint count out of limit (default 255)
*/
int create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint);
int create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id);
/**
* rollback_to_implicit_savepoint - rollback to a implicit savepoint
@ -386,7 +389,8 @@ int rollback_to_implicit_savepoint(ObTxDesc &tx,
*/
int rollback_to_explicit_savepoint(ObTxDesc &tx,
const ObString &savepoint,
const int64_t expire_ts);
const int64_t expire_ts,
const uint32_t session_id);
/**
* release_explicit_savepoint - release savepoint
@ -400,7 +404,7 @@ int rollback_to_explicit_savepoint(ObTxDesc &tx,
* OB_SUCCESS - OK
* OB_SAVEPOINT_NOT_EXIST - savepoint not found
*/
int release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint);
int release_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id);
// ------------------------------------------------------------------
// savepoints stash

View File

@ -80,6 +80,7 @@ void ObXACtx::reset()
if (OB_NOT_NULL(xa_branch_info_)) {
xa_branch_info_->reset();
}
xa_stmt_info_.reset();
is_terminated_ = false;
tlog_.reset();
need_print_trace_log_ = true;
@ -986,6 +987,9 @@ int ObXACtx::process_end_stmt(const obrpc::ObXAEndStmtRPCRequest &req)
} else if (OB_FAIL(check_for_execution_(xid, false))) {
// include branch fail
TRANS_LOG(WARN, "check for execution failed", K(ret), K(xid), K(*this));
} else if (lock_xid_.empty()) {
// The rpc timeout may trigger repeated unlocking operations,
// which should be considered successful at this time
} else {
const ObTxStmtInfo &stmt_info = req.get_stmt_info();
// TODO, remove duplicate
@ -1376,6 +1380,8 @@ int ObXACtx::xa_start_(const ObXATransID &xid,
TRANS_LOG(WARN, "update branch info failed", K(ret), K(*this));
} else if (OB_FAIL(save_tx_desc_(tx_desc))) {
TRANS_LOG(WARN, "save trans desc failed", K(ret), K(*this));
} else if (OB_FAIL(update_xa_stmt_info_(xid))) {
TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this));
} else if (OB_FAIL(register_xa_timeout_task_())) {
TRANS_LOG(WARN, "register xa timeout task failed", K(ret), K(*this));
} else {
@ -1420,6 +1426,8 @@ int ObXACtx::xa_start_local_(const ObXATransID &xid,
timeout_seconds,
flags))) {
TRANS_LOG(WARN, "update xa branch info failed", K(ret), K(xid), K(*this));
} else if (OB_FAIL(update_xa_stmt_info_(xid))) {
TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this));
} else if (OB_FAIL(register_xa_timeout_task_())) {
TRANS_LOG(WARN, "register xa timeout task failed", K(ret), K(xid), K(*this));
} else {
@ -1439,6 +1447,30 @@ int ObXACtx::xa_start_local_(const ObXATransID &xid,
return ret;
}
int ObXACtx::update_xa_stmt_info_(const ObXATransID &xid)
{
int ret = OB_SUCCESS;
bool found = false;
if (!xa_stmt_info_.empty()) {
for (int64_t i = 0; !found && i < xa_stmt_info_.count(); ++i) {
ObXAStmtInfo &info = xa_stmt_info_.at(i);
if (info.xid_.all_equal_to(xid)) {
found = true;
break;
}
}
}
if (!found) {
ObXAStmtInfo stmt_info(xid);
if (OB_FAIL(xa_stmt_info_.push_back(stmt_info))) {
TRANS_LOG(WARN, "add stmt info failed", K(ret), K(stmt_info), K(*this));
}
} else {
TRANS_LOG(WARN, "xa stmt info already exists ", K(ret), K(found), K(xid), K(*this));
}
return ret;
}
// xa start in non-original scheduler
// case 1: loosely coupled, xa start join
// case 2: tightly coupled, xa start join
@ -1504,6 +1536,8 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid,
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
set_terminated_();
}
} else if (OB_FAIL(update_xa_stmt_info_(xid))) {
TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this));
} else {
// xa_ref_count_ is added only when success is returned
++xa_ref_count_;
@ -1579,6 +1613,8 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid,
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
set_terminated_();
}
} else if (OB_FAIL(update_xa_stmt_info_(xid))) {
TRANS_LOG(WARN, "update xa stmt info failed", K(ret), K(xid), K(*this));
} else {
// xa_ref_count_ is increased only when success is returned
++xa_ref_count_;
@ -1655,6 +1691,9 @@ int ObXACtx::xa_end(const ObXATransID &xid,
}
}
}
if (OB_SUCC(ret) && OB_FAIL(remove_xa_stmt_info_(xid))) {
TRANS_LOG(WARN, "remove xa stmt info failed", K(ret), K(xid), K(*this));
}
--xa_ref_count_;
// if fail, force terminate
@ -1679,11 +1718,32 @@ int ObXACtx::xa_end(const ObXATransID &xid,
return ret;
}
int ObXACtx::remove_xa_stmt_info_(const ObXATransID &xid)
{
int ret = OB_SUCCESS;
bool found = false;
if (!xa_stmt_info_.empty()) {
for (int64_t i = 0; !found && i < xa_stmt_info_.count(); ++i) {
ObXAStmtInfo &info = xa_stmt_info_.at(i);
if (info.xid_.all_equal_to(xid)) {
found = true;
xa_stmt_info_.remove(i);
break;
}
}
}
if (!found) {
ret = OB_ERR_UNEXPECTED;
}
TRANS_LOG(INFO, "remove xa stmt info", K(ret), K(found), K(xid), K(*this));
return ret;
}
// start stmt
// if tightly coupled mode, acquire lock and get tx info from original scheduler
// if loosely coupled mode, do nothing
// @param [in] xid, this is from session
int ObXACtx::start_stmt(const ObXATransID &xid)
int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id)
{
int ret = OB_SUCCESS;
const bool is_original = (GCTX.self_addr() == original_sche_addr_);
@ -1709,6 +1769,8 @@ int ObXACtx::start_stmt(const ObXATransID &xid)
if (is_executing_) {
ret = OB_TRANS_STMT_NEED_RETRY;
TRANS_LOG(INFO, "another branch is executing stmt, try again", K(ret), K(*this));
} else if (OB_FAIL(create_xa_savepoint_if_need_(xid, session_id))) {
TRANS_LOG(WARN, "check xa savepoint fail", K(ret), K(xid), K(session_id), K(*this));
} else {
// this flag indicates that a branch is executing normal stmt
is_executing_ = true;
@ -1745,6 +1807,32 @@ int ObXACtx::start_stmt(const ObXATransID &xid)
return ret;
}
int ObXACtx::create_xa_savepoint_if_need_(const ObXATransID &xid, const uint32_t session_id)
{
int ret = OB_SUCCESS;
bool found = false;
if (!xa_stmt_info_.empty()) {
for (int64_t i = 0; !found && i < xa_stmt_info_.count(); ++i) {
ObXAStmtInfo &info = xa_stmt_info_.at(i);
if (info.xid_.all_equal_to(xid)) {
found = true;
TRANS_LOG(INFO, "find info", K(ret), K(xid), K(info));
if (info.is_first_stmt_) {
if (OB_FAIL(MTL(transaction::ObTransService *)->create_explicit_savepoint(*tx_desc_, PL_XA_IMPLICIT_SAVEPOINT, session_id))) {
TRANS_LOG(WARN, "create xa savepoint fail", K(ret), K(xid), K(session_id), K(*this));
}
info.is_first_stmt_ = false;
}
break;
}
}
}
if (!found) {
ret = OB_ERR_UNEXPECTED;
}
return ret;
}
int ObXACtx::start_stmt_local_(const ObXATransID &xid)
{
int ret = OB_SUCCESS;

View File

@ -105,7 +105,7 @@ public:
const int64_t timeout_seconds,
ObTxDesc *&tx_desc);
int xa_end(const ObXATransID &xid, const int64_t flags, ObTxDesc *&tx_desc);
int start_stmt(const ObXATransID &xid);
int start_stmt(const ObXATransID &xid, const uint32_t session_id);
int wait_start_stmt();
int end_stmt(const ObXATransID &xid);
const ObXATransID &get_executing_xid() const { return executing_xid_; }
@ -166,8 +166,8 @@ public:
K_(trans_id), K_(is_executing), K_(is_xa_end_trans), K_(tenant_id),
K_(is_xa_readonly), K_(xa_trans_state), K_(is_xa_one_phase),
K_(xa_branch_count), K_(xa_ref_count), K_(lock_grant),
K_(is_tightly_coupled), K_(lock_xid), K_(is_terminated),
K_(executing_xid), "uref", get_uref(),
K_(is_tightly_coupled), K_(lock_xid), K_(xa_stmt_info),
K_(is_terminated), K_(executing_xid), "uref", get_uref(),
K_(has_tx_level_temp_table));
private:
int register_timeout_task_(const int64_t interval_us);
@ -246,6 +246,10 @@ private:
int check_trans_state_(const bool is_rollback,
const int64_t request_id,
const bool is_xa_one_phase);
int update_xa_stmt_info_(const ObXATransID &xid);
int create_xa_savepoint_if_need_(const ObXATransID &xid,
const uint32_t session_id);
int remove_xa_stmt_info_(const ObXATransID &xid);
private:
// for 4.0 dblink
int get_dblink_client_(const common::sqlclient::DblinkDriverProto dblink_type,
@ -290,6 +294,7 @@ private:
bool is_tightly_coupled_;
ObXATransID lock_xid_;
ObXABranchInfoArray *xa_branch_info_;
ObXAStmtInfoArray xa_stmt_info_;
bool is_terminated_;
common::ObTraceEventRecorder tlog_;
bool need_print_trace_log_;

View File

@ -33,6 +33,8 @@ extern const int64_t XA_INNER_TABLE_TIMEOUT;
extern const bool ENABLE_NEW_XA;
static const ObString PL_XA_IMPLICIT_SAVEPOINT = "__PL_XA_IMPLICIT_SAVEPOINT";
class ObXATransState
{
public:
@ -229,7 +231,18 @@ struct ObXABranchInfo
int64_t end_flag_;
};
struct ObXAStmtInfo
{
ObXAStmtInfo() : xid_(), is_first_stmt_(true) {}
ObXAStmtInfo(const ObXATransID xid) : xid_(xid), is_first_stmt_(true) {}
~ObXAStmtInfo() {}
TO_STRING_KV(K_(xid), K_(is_first_stmt));
ObXATransID xid_;
bool is_first_stmt_;
};
typedef common::ObSEArray<ObXABranchInfo, 4> ObXABranchInfoArray;
typedef common::ObSEArray<ObXAStmtInfo, 1> ObXAStmtInfoArray;
class ObXATimeoutTask : public ObITimeoutTask
{

View File

@ -1378,11 +1378,6 @@ int ObXAService::xa_end(const ObXATransID &xid,
} else if (!tx_desc->is_xa_trans()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid));
} else if (!xid.gtrid_equal_to(tx_desc->get_xid())) {
// tx->get_xid().gtrid != xid.gtrid
// oracle returns 0
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "xid not match", K(ret), K(xid));
} else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid));
@ -1402,7 +1397,7 @@ int ObXAService::xa_end(const ObXATransID &xid,
return ret;
}
int ObXAService::start_stmt(const ObXATransID &xid, ObTxDesc &tx_desc)
int ObXAService::start_stmt(const ObXATransID &xid, const uint32_t session_id, ObTxDesc &tx_desc)
{
int ret = OB_SUCCESS;
const ObTransID &tx_id = tx_desc.tid();
@ -1414,7 +1409,7 @@ int ObXAService::start_stmt(const ObXATransID &xid, ObTxDesc &tx_desc)
if (NULL == xa_ctx) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected trans descriptor", K(ret), K(tx_id), K(xid));
} else if (OB_FAIL(xa_ctx->start_stmt(xid))) {
} else if (OB_FAIL(xa_ctx->start_stmt(xid, session_id))) {
TRANS_LOG(WARN, "xa trans start stmt failed", K(ret), K(tx_id), K(xid));
} else if (OB_FAIL(xa_ctx->wait_start_stmt())) {
TRANS_LOG(WARN, "fail to wait start stmt", K(ret), K(tx_id), K(xid));
@ -2195,7 +2190,7 @@ int ObXAService::xa_rollback_all_changes(const ObXATransID &xid, ObTxDesc *&tx_d
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), KP(tx_desc), K(xid), K(stmt_expired_time));
} else {
if (OB_FAIL(start_stmt(xid, *tx_desc))) {
if (OB_FAIL(start_stmt(xid, 0/*unused session id*/, *tx_desc))) {
TRANS_LOG(WARN, "xa start stmt fail", K(ret), K(tx_desc), K(xid));
} else if (OB_FAIL(MTL(transaction::ObTransService *)->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_expired_time, NULL))) {
TRANS_LOG(WARN, "do savepoint rollback error", K(ret), K(tx_desc));

View File

@ -113,7 +113,7 @@ public:
//for rpc use
int get_xa_ctx(const ObTransID &trans_id, bool &alloc, ObXACtx *&xa_ctx);
int revert_xa_ctx(ObXACtx *xa_ctx);
int start_stmt(const ObXATransID &xid, ObTxDesc &tx_desc);
int start_stmt(const ObXATransID &xid, const uint32_t session_id, ObTxDesc &tx_desc);
int end_stmt(const ObXATransID &xid, ObTxDesc &tx_desc);
int handle_terminate_for_xa_branch(const ObXATransID &xid,
ObTxDesc *tx_desc,

View File

@ -495,13 +495,13 @@ int MockObServer::do_handle_(ObReq &req, ObResp &resp)
tx_desc = NULL;
break;
case ObReq::T::SAVEPOINT:
ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_));
ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0);
break;
case ObReq::T::ROLLBACK_SAVEPOINT:
ret = tx_node_.rollback_to_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 1 * 1000 * 1000);
ret = tx_node_.rollback_to_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 1 * 1000 * 1000, 0);
break;
case ObReq::T::RELEASE_SAVEPOINT:
ret = tx_node_.release_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_));
ret = tx_node_.release_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0);
break;
case ObReq::T::READ:
if (req.is_serializable_isolation_) {