fix savepoint is not synchronized between servers

This commit is contained in:
obdev 2023-03-20 17:23:49 +08:00 committed by ob-robot
parent ec988d25b2
commit c6bbafe473
13 changed files with 155 additions and 43 deletions

View File

@ -73,7 +73,7 @@ int ObCreateSavePointExecutor::execute(ObExecContext &ctx,
if (OB_ISNULL(session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(session));
} else if (OB_FAIL(ObSqlTransControl::create_savepoint(ctx, stmt.get_sp_name()))) {
} else if (OB_FAIL(ObSqlTransControl::create_savepoint(ctx, stmt.get_sp_name(), true))) {
LOG_WARN("fail create savepoint", K(ret), K(stmt.get_sp_name()));
} else if (!session->has_explicit_start_trans()) {
if (OB_FAIL(session->get_autocommit(ac))) {

View File

@ -722,7 +722,8 @@ int ObSqlTransControl::stmt_setup_savepoint_(ObSQLSessionInfo *session,
K(session->get_txn_free_route_ctx()), KPC(session)); \
}
int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx,
const ObString &sp_name)
const ObString &sp_name,
const bool user_create)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
@ -734,7 +735,7 @@ int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx,
OZ (acquire_tx_if_need_(txs, *session));
bool start_hook = false;
OZ(start_hook_if_need_(*session, txs, start_hook));
OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name, get_real_session_id(*session)), sp_name);
OZ (txs->create_explicit_savepoint(*session->get_tx_desc(), sp_name, get_real_session_id(*session), user_create), sp_name);
if (start_hook) {
int tmp_ret = txs->sql_stmt_end_hook(session->get_xid(), *session->get_tx_desc());
if (OB_SUCCESS != tmp_ret) {

View File

@ -220,7 +220,7 @@ public:
using namespace oceanbase::transaction;
return kill_tx(session, static_cast<int>(ObTxAbortCause::SESSION_DISCONNECT));
}
static int create_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name);
static int create_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name, const bool user_create = false);
static int rollback_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name);
static int release_savepoint(ObExecContext &exec_ctx, const common::ObString &sp_name);
static int xa_rollback_all_changes(ObExecContext &exec_ctx);

View File

@ -54,7 +54,7 @@ ObTxIsolationLevel tx_isolation_from_str(const ObString &s)
}
ObTxSavePoint::ObTxSavePoint()
: type_(T::INVL), scn_(0), session_id_(0), name_() {}
: type_(T::INVL), scn_(0), session_id_(0), user_create_(false), name_() {}
ObTxSavePoint::ObTxSavePoint(const ObTxSavePoint &a)
{
@ -65,12 +65,12 @@ ObTxSavePoint &ObTxSavePoint::operator=(const ObTxSavePoint &a)
{
type_ = a.type_;
scn_ = a.scn_;
session_id_ = a.session_id_;
switch(type_) {
case T::SAVEPOINT:
case T::STASH: {
name_ = a.name_;
session_id_ = a.session_id_;
user_create_ = a.user_create_;
break;
}
case T::SNAPSHOT: snapshot_ = a.snapshot_; break;
@ -79,6 +79,23 @@ ObTxSavePoint &ObTxSavePoint::operator=(const ObTxSavePoint &a)
return *this;
}
bool ObTxSavePoint::operator==(const ObTxSavePoint &a) const
{
bool is_equal = false;
if (type_ == a.type_ && scn_== a.scn_) {
switch(type_) {
case T::SAVEPOINT:
case T::STASH: {
is_equal = name_ == a.name_ && session_id_ == a.session_id_ && user_create_ == a.user_create_;
break;
}
case T::SNAPSHOT: is_equal = snapshot_ == a.snapshot_; break;
default: break;
}
}
return is_equal;
}
ObTxSavePoint::~ObTxSavePoint()
{
release();
@ -90,6 +107,7 @@ void ObTxSavePoint::release()
snapshot_ = NULL;
scn_ = 0;
session_id_ = 0;
user_create_ = false;
}
void ObTxSavePoint::rollback()
@ -107,7 +125,7 @@ void ObTxSavePoint::init(ObTxReadSnapshot *snapshot)
scn_ = snapshot->core_.scn_;
}
int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t session_id, const bool stash)
int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t session_id, const bool user_create, const bool stash)
{
int ret = OB_SUCCESS;
if (OB_FAIL(name_.assign(name))) {
@ -120,6 +138,7 @@ int ObTxSavePoint::init(int64_t scn, const ObString &name, const uint32_t sessio
type_ = stash ? T::STASH : T::SAVEPOINT;
scn_ = scn;
session_id_ = session_id;
user_create_ = user_create;
}
return ret;
}
@ -137,6 +156,7 @@ DEF_TO_STRING(ObTxSavePoint)
J_COMMA();
J_KV(K_(scn));
J_KV(K_(session_id));
J_KV(K_(user_create));
J_OBJ_END();
return pos;
}
@ -169,6 +189,12 @@ OB_SERIALIZE_MEMBER(ObTxParam,
access_mode_,
isolation_,
cluster_id_);
OB_SERIALIZE_MEMBER(ObTxSavePoint,
type_,
scn_,
session_id_,
user_create_,
name_);
OB_SERIALIZE_MEMBER(ObTxInfo,
tenant_id_,
@ -187,12 +213,14 @@ OB_SERIALIZE_MEMBER(ObTxInfo,
expire_ts_,
active_scn_,
parts_,
session_id_);
session_id_,
savepoints_);
OB_SERIALIZE_MEMBER(ObTxStmtInfo,
tx_id_,
op_sn_,
parts_,
state_);
state_,
savepoints_);
int ObTxDesc::trans_deep_copy(const ObTxDesc &x)
{

View File

@ -261,6 +261,7 @@ private:
and the session id is required to distinguish the
savepoint for the multi-branch scenario of xa */
uint32_t session_id_;
bool user_create_;
union {
ObTxReadSnapshot *snapshot_;
common::ObFixedLengthString<128> name_;
@ -270,14 +271,20 @@ public:
~ObTxSavePoint();
ObTxSavePoint(const ObTxSavePoint &s);
ObTxSavePoint &operator=(const ObTxSavePoint &a);
bool operator==(const ObTxSavePoint &a) const;
void release();
void rollback();
int init(const int64_t scn, const ObString &name, const uint32_t session_id, const bool stash = false);
int init(const int64_t scn,
const ObString &name,
const uint32_t session_id,
const bool user_create,
const bool stash = false);
void init(ObTxReadSnapshot *snapshot);
bool is_savepoint() const { return type_ == T::SAVEPOINT || type_ == T::STASH; }
bool is_snapshot() const { return type_ == T::SNAPSHOT; }
bool is_stash() const { return type_ == T::STASH; }
DECLARE_TO_STRING;
OB_UNIS_VERSION(1);
};
typedef ObSEArray<ObTxSavePoint, 4> ObTxSavePointList;
@ -741,6 +748,7 @@ private:
class ObTxInfo
{
friend class ObTransService;
friend class ObXACtx;
OB_UNIS_VERSION(1);
protected:
uint64_t tenant_id_;
@ -761,6 +769,7 @@ protected:
int64_t active_scn_;
ObTxPartList parts_;
uint32_t session_id_ = 0;
ObTxSavePointList savepoints_;
public:
TO_STRING_KV(K_(tenant_id),
K_(session_id),
@ -776,7 +785,8 @@ public:
K_(expire_ts),
K_(parts),
K_(cluster_id),
K_(cluster_version));
K_(cluster_version),
K_(savepoints));
// TODO xa
bool is_valid() const { return tx_id_.is_valid(); }
const ObTransID &tid() const { return tx_id_; }
@ -785,17 +795,20 @@ public:
class ObTxStmtInfo
{
friend class ObTransService;
friend class ObXACtx;
OB_UNIS_VERSION(1);
protected:
ObTransID tx_id_;
uint64_t op_sn_;
ObTxPartList parts_;
ObTxDesc::State state_;
ObTxSavePointList savepoints_;
public:
TO_STRING_KV(K_(tx_id),
K_(op_sn),
K_(parts),
K_(state));
K_(state),
K_(savepoints));
// TODO xa
bool is_valid() const { return tx_id_.is_valid(); }
const ObTransID &tid() const { return tx_id_; }

View File

@ -2352,6 +2352,10 @@ int ObTransService::recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx)
tx_desc_mgr_.revert(*tx);
tx = NULL;
TRANS_LOG(WARN, "assgin parts fail", K(ret));
} else if (tx->savepoints_.assign(tx_info.savepoints_)) {
tx_desc_mgr_.revert(*tx);
tx = NULL;
TRANS_LOG(WARN, "assgin savepoints fail", K(ret));
} else if (OB_FAIL(tx_desc_mgr_.add_with_txid(tx_info.tx_id_, *tx))) {
tx_desc_mgr_.revert(*tx);
tx = NULL;
@ -2387,6 +2391,8 @@ int ObTransService::get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info)
tx.lock_.lock();
if (OB_FAIL(tx_info.parts_.assign(tx.parts_))) {
TRANS_LOG(WARN, "assgin parts fail", K(ret), K(tx));
} else if (OB_FAIL(assign_user_savepoint_(tx, tx_info.savepoints_))) {
TRANS_LOG(WARN, "assgin savepoint fail", K(ret), K(tx));
} else {
tx_info.tenant_id_ = tx.tenant_id_;
tx_info.cluster_id_ = tx.cluster_id_;
@ -2410,12 +2416,43 @@ int ObTransService::get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info)
return ret;
}
int ObTransService::assign_user_savepoint_(ObTxDesc &tx, ObTxSavePointList &savepoints)
{
int ret = OB_SUCCESS;
ARRAY_FOREACH_N(tx.savepoints_, i, cnt) {
if (tx.savepoints_.at(i).user_create_) {
if (OB_FAIL(savepoints.push_back(tx.savepoints_.at(i)))) {
TRANS_LOG(WARN, "push back user create sp fail", K(ret), K(tx));
}
}
}
return ret;
}
int ObTransService::update_user_savepoint(ObTxDesc &tx, const ObTxSavePointList &savepoints)
{
int ret = OB_SUCCESS;
int j = 0;
bool is_contain = false;
ARRAY_FOREACH_N(savepoints, i, cnt) {
for (j = 0, is_contain = false; j<tx.savepoints_.count() && !is_contain; j++) {
is_contain = savepoints.at(i) == tx.savepoints_.at(j);
}
if (!is_contain && OB_FAIL(tx.savepoints_.push_back(savepoints.at(i)))) {
TRANS_LOG(WARN, "push back user sp fail", K(ret));
}
}
return ret;
}
int ObTransService::get_tx_stmt_info(ObTxDesc &tx, ObTxStmtInfo &stmt_info)
{
int ret = OB_SUCCESS;
tx.lock_.lock();
if (OB_FAIL(stmt_info.parts_.assign(tx.parts_))) {
TRANS_LOG(WARN, "assgin parts fail", K(ret), K(tx));
} else if (OB_FAIL(assign_user_savepoint_(tx, stmt_info.savepoints_))) {
TRANS_LOG(WARN, "assgin savepoint fail", K(ret), K(tx));
} else {
stmt_info.tx_id_ = tx.tx_id_;
stmt_info.op_sn_ = tx.op_sn_;
@ -2432,6 +2469,9 @@ int ObTransService::update_tx_with_stmt_info(const ObTxStmtInfo &tx_info, ObTxDe
tx->op_sn_ = tx_info.op_sn_;
tx->state_ = tx_info.state_;
tx->update_parts_(tx_info.parts_);
if (OB_FAIL(MTL(ObTransService *)->update_user_savepoint(*tx, tx_info.savepoints_))) {
TRANS_LOG(WARN, "update user sp fail", K(ret), K(*this), K(tx), K(tx_info));
}
tx->lock_.unlock();
return ret;
}

View File

@ -140,6 +140,7 @@ int iterate_tx_scheduler_stat(ObTxSchedulerStatIterator &tx_scheduler_stat_iter)
* recover transaction descriptor with tx info
*/
int recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx);
int update_user_savepoint(ObTxDesc &tx, const ObTxSavePointList &savepoints);
int get_tx_info(ObTxDesc &tx, ObTxInfo &tx_info);
int get_tx_stmt_info(ObTxDesc &tx, ObTxStmtInfo &stmt_info);
int update_tx_with_stmt_info(const ObTxStmtInfo &tx_info, ObTxDesc *&tx);
@ -292,6 +293,7 @@ int sub_end_tx_local_ls_(const ObTransID &tx_id,
const ObXATransID &xid,
const common::ObAddr &sender_addr,
const bool is_rollback);
int assign_user_savepoint_(ObTxDesc &tx, ObTxSavePointList &savepoints);
private:
ObTxCtxMgr tx_ctx_mgr_;

View File

@ -1143,7 +1143,10 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx,
return ret;
}
int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id)
int ObTransService::create_explicit_savepoint(ObTxDesc &tx,
const ObString &savepoint,
const uint32_t session_id,
const bool user_create)
{
int ret = OB_SUCCESS;
int64_t scn = 0;
@ -1151,7 +1154,7 @@ int ObTransService::create_explicit_savepoint(ObTxDesc &tx, const ObString &save
tx.inc_op_sn();
scn = ObSequence::inc_and_get_max_seq_no();
ObTxSavePoint sp;
if (OB_SUCC(sp.init(scn, savepoint, session_id))) {
if (OB_SUCC(sp.init(scn, savepoint, session_id, user_create))) {
if (OB_FAIL(tx.savepoints_.push_back(sp))) {
TRANS_LOG(WARN, "push savepoint failed", K(ret));
} else if (!tx.tx_id_.is_valid() && OB_FAIL(tx_desc_mgr_.add(tx))) {
@ -1297,7 +1300,7 @@ int ObTransService::create_stash_savepoint(ObTxDesc &tx, const ObString &name)
tx.inc_op_sn();
auto seq_no = ObSequence::inc_and_get_max_seq_no();
ObTxSavePoint sp;
if (OB_SUCC(sp.init(seq_no, name, 0, true))) {
if (OB_SUCC(sp.init(seq_no, name, 0, false, true))) {
if (OB_FAIL(tx.savepoints_.push_back(sp))) {
TRANS_LOG(WARN, "push savepoint failed", K(ret));
}

View File

@ -334,7 +334,10 @@ int create_implicit_savepoint(ObTxDesc &tx,
* OB_ERR_TOO_LONG_IDENT - if savepoint was longer than 128 characters
* OB_ERR_TOO_MANY_SAVEPOINT - alive savepoint count out of limit (default 255)
*/
int create_explicit_savepoint(ObTxDesc &tx, const ObString &savepoint, const uint32_t session_id);
int create_explicit_savepoint(ObTxDesc &tx,
const ObString &savepoint,
const uint32_t session_id,
const bool user_create);
/**
* rollback_to_implicit_savepoint - rollback to a implicit savepoint

View File

@ -730,9 +730,9 @@ int ObXACtx::process_xa_start_tightly_(const obrpc::ObXAStartRPCRequest &req)
} else if (is_new_branch) {
++xa_branch_count_;
}
if (OB_SUCC(ret) && req.need_response()) {
if (OB_SUCC(ret)) {
SMART_VAR(ObXAStartRPCResponse, response) {
if (OB_FAIL(response.init(trans_id_, *tx_desc_))) {
if (OB_FAIL(response.init(trans_id_, *tx_desc_, req.is_first_branch()))) {
TRANS_LOG(WARN, "init xa start response failed", K(ret));
} else if (OB_FAIL(xa_rpc_->xa_start_response(tenant_id_, sender, response, NULL))) {
TRANS_LOG(WARN, "xa start response failed", K(ret));
@ -773,7 +773,7 @@ int ObXACtx::process_xa_start_loosely_(const obrpc::ObXAStartRPCRequest &req)
TRANS_LOG(WARN, "register xa trans timeout task failed", K(ret), K(xid), K(*this));
} else {
SMART_VAR(ObXAStartRPCResponse, response) {
if (OB_FAIL(response.init(trans_id_, *tx_desc_))) {
if (OB_FAIL(response.init(trans_id_, *tx_desc_, req.is_first_branch()))) {
TRANS_LOG(WARN, "init xa start response failed", K(ret));
} else if (OB_FAIL(xa_rpc_->xa_start_response(tenant_id_, sender, response, NULL))) {
TRANS_LOG(WARN, "xa start response failed", K(ret));
@ -790,9 +790,13 @@ int ObXACtx::process_xa_start_response(const obrpc::ObXAStartRPCResponse &resp)
int ret = OB_SUCCESS;
const ObTxInfo &tx_info = resp.get_tx_info();
// no need guard
if (NULL != tx_desc_) {
if (resp.is_first_branch() && NULL != tx_desc_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "trans desc is NULL", K(ret));
} else if (!resp.is_first_branch()) {
if (OB_FAIL(MTL(ObTransService *)->update_user_savepoint(*tx_desc_, tx_info.savepoints_))) {
TRANS_LOG(WARN, "update user sp fail", K(ret), K(*this), K(tx_info));
}
} else if (OB_FAIL(MTL(ObTransService *)->recover_tx(tx_info, tx_desc_))) {
TRANS_LOG(WARN, "recover tx failed", K(ret), K(*this), K(tx_info));
} else {
@ -848,6 +852,11 @@ int ObXACtx::process_xa_end(const obrpc::ObXAEndRPCRequest &req)
if (OB_FAIL(MTL(ObTransService*)->update_tx_with_stmt_info(stmt_info, tx_desc_))) {
TRANS_LOG(WARN, "update tx desc with stmt info failed", K(ret), K(req), K(*this));
}
} else {
// if tightly coupled, need to update user savepoint
if (OB_FAIL(MTL(ObTransService *)->update_user_savepoint(*tx_desc_, stmt_info.savepoints_))) {
TRANS_LOG(WARN, "update user sp fail", K(ret), K(*this), K(req));
}
}
}
if (OB_TRANS_XA_BRANCH_FAIL == ret) {
@ -1357,6 +1366,7 @@ int ObXACtx::xa_start_remote_first(const ObXATransID &xid,
} else {
// set global trans type to xa trans
tx_desc->set_global_tx_type(ObGlobalTxType::XA_TRANS);
TRANS_LOG(WARN, "xa start remote success", K(ret), K(xid), K(flags), K(*tx_desc), K(*this));
}
TRANS_LOG(INFO, "xa start remote first", K(ret), K(xid), K(flags), K(*this));
return ret;
@ -1486,7 +1496,7 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid,
int tmp_ret = OB_SUCCESS;
int result = OB_SUCCESS;
const int64_t now = ObTimeUtility::current_time();
const bool need_response = true; // need tx desc
const bool is_first_branch = true; // need tx desc
ObXAStartRPCRequest xa_start_request;
obrpc::ObXARPCCB<obrpc::OB_XA_START_REQ> cb;
ObTransCond cond;
@ -1503,7 +1513,7 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid,
is_tightly_coupled_,
timeout_seconds,
flags,
need_response))) {
is_first_branch))) {
TRANS_LOG(WARN, "init sync request failed", K(ret), K(*this));
} else if (OB_FAIL(cb.init(&cond))) {
TRANS_LOG(WARN, "init cb failed", K(ret), K(xid), K(*this));
@ -1565,12 +1575,12 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid,
int tmp_ret = OB_SUCCESS;
int result = OB_SUCCESS;
const int64_t now = ObTimeUtility::current_time();
const bool need_response = false; // no need tx desc
const bool is_first_branch = false; // no need tx desc
ObXAStartRPCRequest xa_start_request;
obrpc::ObXARPCCB<obrpc::OB_XA_START_REQ> cb;
ObTransCond cond;
const int64_t wait_time = (INT64_MAX / 2 ) - now;
// xa_sync_status_cond_.reset();
xa_sync_status_cond_.reset();
if (OB_ISNULL(xa_rpc_) || OB_ISNULL(xa_service_)) {
ret = OB_ERR_UNEXPECTED;
@ -1588,7 +1598,7 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid,
is_tightly_coupled_,
timeout_seconds,
flags,
need_response))) {
is_first_branch))) {
TRANS_LOG(WARN, "init sync request failed", K(ret), K(*this));
} else if (OB_FAIL(cb.init(&cond))) {
TRANS_LOG(WARN, "init cb failed", K(ret), K(xid), K(*this));
@ -1606,6 +1616,10 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid,
} else {
TRANS_LOG(WARN, "wait cond failed", K(ret), K(xid), K(result));
}
} else if (OB_FAIL(wait_xa_sync_status_(wait_time))) {
TRANS_LOG(WARN, "wait xa sync status failed", K(ret), K(xid));
} else {
// do nothing
}
}
@ -1819,7 +1833,7 @@ int ObXACtx::create_xa_savepoint_if_need_(const ObXATransID &xid, const uint32_t
found = true;
TRANS_LOG(INFO, "find info", K(ret), K(xid), K(info));
if (info.is_first_stmt_) {
if (OB_FAIL(MTL(transaction::ObTransService *)->create_explicit_savepoint(*tx_desc_, PL_XA_IMPLICIT_SAVEPOINT, session_id))) {
if (OB_FAIL(MTL(transaction::ObTransService *)->create_explicit_savepoint(*tx_desc_, PL_XA_IMPLICIT_SAVEPOINT, session_id, false))) {
TRANS_LOG(WARN, "create xa savepoint fail", K(ret), K(xid), K(session_id), K(*this));
} else {
TRANS_LOG(INFO, "create pl xa savepoint success", K(ret), K(xid), K(session_id), K(*this));
@ -2670,10 +2684,12 @@ int ObXACtx::xa_prepare(const ObXATransID &xid, const int64_t timeout_us)
} else if (OB_FAIL(xa_prepare_(xid, timeout_us, need_exit))) {
TRANS_LOG(WARN, "xa prepare failed", K(ret), K(xid), K(need_exit), K(*this));
if (need_exit) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_,
ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "fail to stop transaction", K(tmp_ret), K(*this));
if (OB_ERR_READ_ONLY_TRANSACTION != ret) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_,
ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "fail to stop transaction", K(tmp_ret), K(*this));
}
}
set_terminated_();
set_exiting_();

View File

@ -36,12 +36,13 @@ OB_SERIALIZE_MEMBER(ObXAStartRPCRequest,
sender_,
is_new_branch_,
is_tightly_coupled_,
need_response_,
is_first_branch_,
timeout_seconds_,
flags_);
OB_SERIALIZE_MEMBER(ObXAStartRPCResponse,
tx_id_,
tx_info_);
tx_info_,
is_first_branch_);
OB_SERIALIZE_MEMBER(ObXAEndRPCRequest,
tx_id_,
stmt_info_,
@ -162,7 +163,7 @@ int ObXAStartRPCRequest::init(const ObTransID &tx_id,
const bool is_tightly_coupled,
const int64_t timeout_seconds,
const int64_t flags,
const bool need_response)
const bool is_first_branch)
{
int ret = OB_SUCCESS;
if (!tx_id.is_valid() ||
@ -178,7 +179,7 @@ int ObXAStartRPCRequest::init(const ObTransID &tx_id,
sender_ = sender;
is_new_branch_ = is_new_branch;
is_tightly_coupled_ = is_tightly_coupled;
need_response_ = need_response; // false by default
is_first_branch_ = is_first_branch; // false by default
timeout_seconds_ = timeout_seconds;
flags_ = flags;
}
@ -214,7 +215,7 @@ int ObXAStartP::process()
if (xa_ctx->is_tightly_coupled() != is_tightly_coupled) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected tight couple flag", K(is_tightly_coupled), K(xid), K(trans_id));
} else if (arg_.need_response() && OB_FAIL(xa_ctx->wait_xa_start_complete())) {
} else if (arg_.is_first_branch() && OB_FAIL(xa_ctx->wait_xa_start_complete())) {
TRANS_LOG(WARN, "wait xa start complete failed", K(ret));
} else if (OB_FAIL(xa_ctx->process_xa_start(arg_))) {
TRANS_LOG(WARN, "xa ctx remote pull failed", K(ret), K(trans_id), K(xid));
@ -228,7 +229,8 @@ int ObXAStartP::process()
}
int ObXAStartRPCResponse::init(const ObTransID &tx_id,
ObTxDesc &tx_desc)
ObTxDesc &tx_desc,
const bool is_first_branch)
{
int ret = OB_SUCCESS;
if (!tx_desc.is_valid()) {
@ -238,6 +240,7 @@ int ObXAStartRPCResponse::init(const ObTransID &tx_id,
TRANS_LOG(WARN, "get tx info failed", KR(ret));
} else {
tx_id_ = tx_id;
is_first_branch_ = is_first_branch;
}
return ret;
}

View File

@ -66,18 +66,18 @@ public:
const bool is_tightly_coupled,
const int64_t timeout_seconds,
const int64_t flags,
const bool need_response);
const bool is_first_branch);
const transaction::ObXATransID &get_xid() const { return xid_; }
const transaction::ObTransID &get_tx_id() const { return tx_id_; }
const common::ObAddr &get_sender() const { return sender_; }
bool is_new_branch() const { return is_new_branch_; }
bool is_tightly_coupled() const { return is_tightly_coupled_; }
bool need_response() const { return need_response_; }
bool is_first_branch() const { return is_first_branch_; }
int64_t get_timeout_seconds() const { return timeout_seconds_; }
int64_t get_flags() const { return flags_; }
bool is_valid() const;
TO_STRING_KV(K_(tx_id), K_(xid), K_(sender), K_(is_new_branch), K_(is_tightly_coupled),
K_(need_response), K_(timeout_seconds), K_(flags));
K_(is_first_branch), K_(timeout_seconds), K_(flags));
private:
transaction::ObTransID tx_id_;
transaction::ObXATransID xid_;
@ -85,7 +85,7 @@ private:
bool is_new_branch_;
// for tightly-coupled xa branches
bool is_tightly_coupled_;
bool need_response_;
bool is_first_branch_;
int64_t timeout_seconds_;
int64_t flags_;
};
@ -97,14 +97,17 @@ public:
ObXAStartRPCResponse() {}
~ObXAStartRPCResponse() {}
int init(const transaction::ObTransID &tx_id,
transaction::ObTxDesc &tx_desc);
transaction::ObTxDesc &tx_desc,
const bool is_first_branch);
const transaction::ObTransID &get_tx_id() const { return tx_id_; }
const transaction::ObTxInfo &get_tx_info() const { return tx_info_; }
bool is_first_branch() const { return is_first_branch_; }
bool is_valid() const { return tx_id_.is_valid() && tx_info_.is_valid(); }
TO_STRING_KV(K_(tx_id), K_(tx_info));
TO_STRING_KV(K_(tx_id), K_(tx_info), K_(is_first_branch));
private:
transaction::ObTransID tx_id_;
transaction::ObTxInfo tx_info_;
bool is_first_branch_;
};
class ObXAEndRPCRequest

View File

@ -495,7 +495,7 @@ int MockObServer::do_handle_(ObReq &req, ObResp &resp)
tx_desc = NULL;
break;
case ObReq::T::SAVEPOINT:
ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0);
ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0, false);
break;
case ObReq::T::ROLLBACK_SAVEPOINT:
ret = tx_node_.rollback_to_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 1 * 1000 * 1000, 0);