remove todo of xa

This commit is contained in:
obdev
2023-05-19 11:17:02 +00:00
committed by ob-robot
parent 1b5b7766a8
commit 24d447659a
6 changed files with 28 additions and 198 deletions

View File

@ -640,7 +640,6 @@ int ObTransService::decide_tx_commit_info_(ObTxDesc &tx, ObTxPart *&coord)
*/ */
int ObTransService::prepare_tx_coord(ObTxDesc &tx, share::ObLSID &coord_id) int ObTransService::prepare_tx_coord(ObTxDesc &tx, share::ObLSID &coord_id)
{ {
// TODO: for xa
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
tx.lock_.lock(); tx.lock_.lock();
ObTxPart *coord = NULL; ObTxPart *coord = NULL;
@ -676,7 +675,6 @@ int ObTransService::prepare_tx(ObTxDesc &tx,
tx.commit_expire_ts_ = now + timeout_us; tx.commit_expire_ts_ = now + timeout_us;
tx.state_ = ObTxDesc::State::SUB_PREPARING; tx.state_ = ObTxDesc::State::SUB_PREPARING;
ObTxSubPrepareMsg prepare_msg; ObTxSubPrepareMsg prepare_msg;
// TODO, retry mechanism
if (OB_FAIL(tx.commit_task_.init(&tx, this))) { if (OB_FAIL(tx.commit_task_.init(&tx, this))) {
TRANS_LOG(WARN, "fail to init timeout task", K(ret), K(tx)); TRANS_LOG(WARN, "fail to init timeout task", K(ret), K(tx));
} else if (OB_FAIL(register_commit_retry_task_(tx))) { } else if (OB_FAIL(register_commit_retry_task_(tx))) {
@ -730,7 +728,6 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id,
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t now = ObClockGenerator::getClock(); int64_t now = ObClockGenerator::getClock();
// TODO, alloc tx desc from tx mgr
ObTxDesc *tx = NULL; ObTxDesc *tx = NULL;
if (OB_FAIL(tx_desc_mgr_.alloc(tx))) { if (OB_FAIL(tx_desc_mgr_.alloc(tx))) {
TRANS_LOG(WARN, "alloc tx fail", K(ret), KPC(this)); TRANS_LOG(WARN, "alloc tx fail", K(ret), KPC(this));
@ -792,9 +789,7 @@ int ObTransService::build_tx_sub_commit_msg_(const ObTxDesc &tx, ObTxSubCommitMs
msg.sender_ = share::SCHEDULER_LS; msg.sender_ = share::SCHEDULER_LS;
msg.xid_ = tx.xid_; msg.xid_ = tx.xid_;
msg.cluster_version_ = GET_MIN_CLUSTER_VERSION(); msg.cluster_version_ = GET_MIN_CLUSTER_VERSION();
// invalid
msg.cluster_id_ = GCONF.cluster_id; msg.cluster_id_ = GCONF.cluster_id;
// TODO, a special request id
msg.request_id_ = tx.op_sn_; msg.request_id_ = tx.op_sn_;
return ret; return ret;
} }
@ -809,9 +804,7 @@ int ObTransService::build_tx_sub_rollback_msg_(const ObTxDesc &tx, ObTxSubRollba
msg.sender_ = share::SCHEDULER_LS; msg.sender_ = share::SCHEDULER_LS;
msg.xid_ = tx.xid_; msg.xid_ = tx.xid_;
msg.cluster_version_ = GET_MIN_CLUSTER_VERSION(); msg.cluster_version_ = GET_MIN_CLUSTER_VERSION();
// invalid
msg.cluster_id_ = GCONF.cluster_id; msg.cluster_id_ = GCONF.cluster_id;
// TODO, a special request id
msg.request_id_ = tx.op_sn_; msg.request_id_ = tx.op_sn_;
return ret; return ret;
} }
@ -2755,12 +2748,13 @@ int ObTransService::handle_sub_prepare_result(const ObTransID &tx_id,
if (OB_FAIL(tx_desc_mgr_.get(tx_id, tx))) { if (OB_FAIL(tx_desc_mgr_.get(tx_id, tx))) {
TRANS_LOG(WARN, "cannot found tx by id", K(ret), K(tx_id), K(result)); TRANS_LOG(WARN, "cannot found tx by id", K(ret), K(tx_id), K(result));
} else { } else {
bool need_cb = false;
tx->lock_.lock(); tx->lock_.lock();
// TODO, check state if (ObTxDesc::State::IN_TERMINATE >= tx->state_) {
if (ObTxDesc::State::IN_TERMINATE > tx->state_) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected tx state", K(ret), TRANS_LOG(WARN, "unexpected tx state", K(ret),
K_(tx->state), K(tx_id), K(result), KPC(tx)); K_(tx->state), K(tx_id), K(result), KPC(tx));
tx->print_trace_();
} else if (ObTxDesc::State::SUB_PREPARED == tx->state_) { } else if (ObTxDesc::State::SUB_PREPARED == tx->state_) {
TRANS_LOG(WARN, "tx has been prepared", K_(tx->state), TRANS_LOG(WARN, "tx has been prepared", K_(tx->state),
K(tx_id), K(result), KPC(tx)); K(tx_id), K(result), KPC(tx));
@ -2772,10 +2766,11 @@ int ObTransService::handle_sub_prepare_result(const ObTransID &tx_id,
TRANS_LOG(WARN, "unexpected tx state", K_(tx->state), TRANS_LOG(WARN, "unexpected tx state", K_(tx->state),
K(tx_id), K(result), KPC(tx)); K(tx_id), K(result), KPC(tx));
} else { } else {
need_cb = true;
ret = handle_sub_prepare_result_(*tx, result); ret = handle_sub_prepare_result_(*tx, result);
} }
tx->lock_.unlock(); tx->lock_.unlock();
tx->execute_commit_cb(); if (need_cb) { tx->execute_commit_cb(); }
} }
if (OB_NOT_NULL(tx)) { if (OB_NOT_NULL(tx)) {
tx_desc_mgr_.revert(*tx); tx_desc_mgr_.revert(*tx);
@ -2956,6 +2951,7 @@ int ObTransService::handle_sub_commit_result(const ObTransID &tx_id,
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected trans desc", K(ret), K(tx_id), K(result)); TRANS_LOG(WARN, "unexpected trans desc", K(ret), K(tx_id), K(result));
} else { } else {
bool need_cb = false;
tx->lock_.lock(); tx->lock_.lock();
// TODO, check state // TODO, check state
if (ObTxDesc::State::SUB_COMMITTING != tx->state_) { if (ObTxDesc::State::SUB_COMMITTING != tx->state_) {
@ -2968,10 +2964,11 @@ int ObTransService::handle_sub_commit_result(const ObTransID &tx_id,
if (OB_TRANS_COMMITED == result) { if (OB_TRANS_COMMITED == result) {
final_result = OB_SUCCESS; final_result = OB_SUCCESS;
} }
need_cb = true;
ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result); ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result);
} }
tx->lock_.unlock(); tx->lock_.unlock();
tx->execute_commit_cb(); if (need_cb) { tx->execute_commit_cb(); }
} }
if (OB_NOT_NULL(tx)) { if (OB_NOT_NULL(tx)) {
tx_desc_mgr_.revert(*tx); tx_desc_mgr_.revert(*tx);
@ -3001,6 +2998,7 @@ int ObTransService::handle_sub_rollback_result(const ObTransID &tx_id,
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected trans desc", K(ret), K(tx_id), K(result)); TRANS_LOG(WARN, "unexpected trans desc", K(ret), K(tx_id), K(result));
} else { } else {
bool need_cb = false;
tx->lock_.lock(); tx->lock_.lock();
// TODO, check state // TODO, check state
if (ObTxDesc::State::SUB_ROLLBACKING != tx->state_) { if (ObTxDesc::State::SUB_ROLLBACKING != tx->state_) {
@ -3013,10 +3011,11 @@ int ObTransService::handle_sub_rollback_result(const ObTransID &tx_id,
if (OB_TRANS_KILLED == result) { if (OB_TRANS_KILLED == result) {
final_result = OB_SUCCESS; final_result = OB_SUCCESS;
} }
need_cb = true;
ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result); ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result);
} }
tx->lock_.unlock(); tx->lock_.unlock();
tx->execute_commit_cb(); if (need_cb) { tx->execute_commit_cb(); }
} }
if (OB_NOT_NULL(tx)) { if (OB_NOT_NULL(tx)) {
tx_desc_mgr_.revert(*tx); tx_desc_mgr_.revert(*tx);

View File

@ -164,9 +164,11 @@ int ObXACtx::handle_timeout(const int64_t delay)
} else if (is_terminated_) { } else if (is_terminated_) {
ret = OB_TRANS_IS_EXITING; ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa trans has terminated", K(ret)); TRANS_LOG(WARN, "xa trans has terminated", K(ret));
} else if (ObXATransState::has_submitted(xa_trans_state_)) { } else if (ObXATransState::has_submitted(xa_trans_state_) && !is_xa_one_phase_) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa trans has entered commit phase, unexpected", K(ret), K(*this)); TRANS_LOG(WARN, "xa trans has entered commit phase, unexpected", K(ret), K(*this));
} else if (ObXATransState::has_submitted(xa_trans_state_)) {
// do nothing
} else { } else {
timeout_task_.set_running(true); timeout_task_.set_running(true);
if (get_original_sche_addr() == GCONF.self_addr_) { if (get_original_sche_addr() == GCONF.self_addr_) {
@ -237,7 +239,6 @@ int ObXACtx::check_terminated_() const
return ret; return ret;
} }
//TODO, verify
int ObXACtx::is_one_phase_end_trans_allowed_(const ObXATransID &xid, const bool is_rollback) int ObXACtx::is_one_phase_end_trans_allowed_(const ObXATransID &xid, const bool is_rollback)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -261,7 +262,6 @@ int ObXACtx::is_one_phase_end_trans_allowed_(const ObXATransID &xid, const bool
if (info.xid_.all_equal_to(xid)) { if (info.xid_.all_equal_to(xid)) {
if (ObXATransState::PREPARING == info.state_) { if (ObXATransState::PREPARING == info.state_) {
//in preparing state, the previous xa prepare req is still processing, need retry //in preparing state, the previous xa prepare req is still processing, need retry
//TODO, VERIFY prepared state
ret = OB_EAGAIN; ret = OB_EAGAIN;
} else if (ObXATransState::PREPARED == info.state_) { } else if (ObXATransState::PREPARED == info.state_) {
ret = OB_TRANS_ROLLBACKED; ret = OB_TRANS_ROLLBACKED;
@ -872,27 +872,6 @@ int ObXACtx::process_xa_end(const obrpc::ObXAEndRPCRequest &req)
return ret; return ret;
} }
int ObXACtx::tx_desc_copy_(const ObTxDesc &from_trans_desc, ObTxDesc &to_trans_desc)
{
// todo tingshan
int ret = OB_SUCCESS;
UNUSED(from_trans_desc);
UNUSED(to_trans_desc);
// if (FALSE_IT(to_trans_desc.set_max_sql_no(from_trans_desc.get_sql_no()))) {
// } else if (FALSE_IT(to_trans_desc.set_stmt_min_sql_no(from_trans_desc.get_stmt_min_sql_no()))) {
// } else if (FALSE_IT(to_trans_desc.get_trans_param() = from_trans_desc.get_trans_param())) {
// } else if (OB_FAIL(to_trans_desc.merge_participants(from_trans_desc.get_participants()))) {
// TRANS_LOG(WARN, "merge participants failed", K(ret), K(from_trans_desc));
// } else if (OB_FAIL(to_trans_desc.merge_participants_pla(from_trans_desc.get_participants_pla()))) {
// TRANS_LOG(WARN, "merge participants pla failed", K(ret), K(from_trans_desc));
// } /*else if (OB_FAIL(to_trans_desc.set_trans_consistency_type(
// from_trans_desc.get_trans_param().get_consistency_type()))) {
// TRANS_LOG(WARN, "set consistency type failed", K(ret), K(from_trans_desc));
// }*/
return ret;
}
// handle start stmt request in original scheduler // handle start stmt request in original scheduler
// @param [in] req // @param [in] req
int ObXACtx::process_start_stmt(const obrpc::ObXAStartStmtRPCRequest &req) int ObXACtx::process_start_stmt(const obrpc::ObXAStartStmtRPCRequest &req)
@ -934,7 +913,6 @@ int ObXACtx::process_start_stmt(const obrpc::ObXAStartStmtRPCRequest &req)
executing_xid_ = xid; executing_xid_ = xid;
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
// TODO, verify it
stmt_unlock_(xid); stmt_unlock_(xid);
} }
} }
@ -1006,7 +984,6 @@ int ObXACtx::process_end_stmt(const obrpc::ObXAEndStmtRPCRequest &req)
// which should be considered successful at this time // which should be considered successful at this time
} else { } else {
const ObTxStmtInfo &stmt_info = req.get_stmt_info(); const ObTxStmtInfo &stmt_info = req.get_stmt_info();
// TODO, remove duplicate
if (OB_FAIL(MTL(ObTransService*)->update_tx_with_stmt_info(stmt_info, tx_desc_))) { if (OB_FAIL(MTL(ObTransService*)->update_tx_with_stmt_info(stmt_info, tx_desc_))) {
TRANS_LOG(WARN, "update tx desc with stmt info failed", K(ret), K(xid), K(*this)); TRANS_LOG(WARN, "update tx desc with stmt info failed", K(ret), K(xid), K(*this));
} else if (OB_FAIL(update_xa_branch_hb_info_(xid))) { } else if (OB_FAIL(update_xa_branch_hb_info_(xid))) {
@ -1532,7 +1509,6 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid,
TRANS_LOG(WARN, "post xa start request failed", K(ret), K(xid), K(*this)); TRANS_LOG(WARN, "post xa start request failed", K(ret), K(xid), K(*this));
} else { } else {
if (OB_FAIL(cond.wait(wait_time, result)) || OB_FAIL(result)) { if (OB_FAIL(cond.wait(wait_time, result)) || OB_FAIL(result)) {
// TODO, check loose couple mode but OB_TRANS_CTX_NOT_EXIST, check OB_TIMEOUT
if (is_tightly_coupled_ && (OB_TRANS_XA_BRANCH_FAIL == ret || OB_TRANS_CTX_NOT_EXIST == ret)) { if (is_tightly_coupled_ && (OB_TRANS_XA_BRANCH_FAIL == ret || OB_TRANS_CTX_NOT_EXIST == ret)) {
TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(result)); TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(result));
//rewrite code //rewrite code
@ -1549,8 +1525,6 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid,
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
// OB_TRANS_XA_BRANCH_FAIL,
// TODO, check whether need to delete inner table record
if (OB_TRANS_XA_BRANCH_FAIL == ret) { if (OB_TRANS_XA_BRANCH_FAIL == ret) {
set_terminated_(); set_terminated_();
} }
@ -2774,9 +2748,7 @@ int ObXACtx::xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool
uint64_t data_version = 0; uint64_t data_version = 0;
if (OB_FAIL(MTL(ObTransService*)->prepare_tx_coord(*tx_desc_, coord))) { if (OB_FAIL(MTL(ObTransService*)->prepare_tx_coord(*tx_desc_, coord))) {
if (OB_ERR_READ_ONLY_TRANSACTION == ret) { if (OB_ERR_READ_ONLY_TRANSACTION == ret) {
TRANS_LOG(WARN, "fail to prepare tx coord", K(ret), K(*this)); TRANS_LOG(INFO, "xa is read only", K(ret), K(*this));
// TODO, no need rewrite ret
// ret = OB_TRANS_XA_RDONLY;
} else { } else {
TRANS_LOG(WARN, "fail to prepare tx coord", K(ret), K(*this)); TRANS_LOG(WARN, "fail to prepare tx coord", K(ret), K(*this));
} }
@ -2793,7 +2765,6 @@ int ObXACtx::xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool
insert_xa_pending_record(tenant_id_, xid, trans_id_, coord, original_sche_addr_))) { insert_xa_pending_record(tenant_id_, xid, trans_id_, coord, original_sche_addr_))) {
TRANS_LOG(WARN, "fail to insert xa trans record", K(ret), K(xid), K(coord), K(*this)); TRANS_LOG(WARN, "fail to insert xa trans record", K(ret), K(xid), K(coord), K(*this));
} else if (OB_FAIL(drive_prepare_(xid, timeout_us))) { } else if (OB_FAIL(drive_prepare_(xid, timeout_us))) {
// TODO, sche ctx should provide interfaces to drive xa prepare,
TRANS_LOG(WARN, "drive prepare failed", K(ret), K(*this)); TRANS_LOG(WARN, "drive prepare failed", K(ret), K(*this));
} }
// if the last branch fails, need exit // if the last branch fails, need exit
@ -2855,7 +2826,6 @@ int ObXACtx::wait_xa_prepare(const ObXATransID &xid, const int64_t timeout_us)
} }
} }
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
// TODO, handle result
if (OB_SUCC(ret) || OB_ERR_READ_ONLY_TRANSACTION == ret) { if (OB_SUCC(ret) || OB_ERR_READ_ONLY_TRANSACTION == ret) {
xa_trans_state_ = ObXATransState::PREPARED; xa_trans_state_ = ObXATransState::PREPARED;
} }
@ -3043,7 +3013,6 @@ int ObXACtx::check_trans_state_(const bool is_rollback,
if (request_id_ == request_id) { if (request_id_ == request_id) {
ret = OB_EAGAIN; ret = OB_EAGAIN;
} else { } else {
// todo lixinze:error or not
if (is_xa_one_phase_) { if (is_xa_one_phase_) {
ret = OB_TRANS_ROLLBACKED; ret = OB_TRANS_ROLLBACKED;
} else { } else {

View File

@ -193,7 +193,6 @@ private:
// for 4.0 // for 4.0
int process_xa_start_tightly_(const obrpc::ObXAStartRPCRequest &req); int process_xa_start_tightly_(const obrpc::ObXAStartRPCRequest &req);
int process_xa_start_loosely_(const obrpc::ObXAStartRPCRequest &req); int process_xa_start_loosely_(const obrpc::ObXAStartRPCRequest &req);
int tx_desc_copy_(const ObTxDesc &from_trans_desc, ObTxDesc &to_trans_desc);
int xa_start_(const ObXATransID &xid, int xa_start_(const ObXATransID &xid,
const int64_t flags, const int64_t flags,
const int64_t timeout_seconds, const int64_t timeout_seconds,

View File

@ -61,8 +61,10 @@ public:
{ {
return COMMITTING == state return COMMITTING == state
|| ROLLBACKING == state || ROLLBACKING == state
|| PREPARING == state
|| ROLLBACKED == state || ROLLBACKED == state
|| COMMITTED == state; || COMMITTED == state
|| PREPARED == state;
} }
static bool can_convert(const int32_t src_state, const int32_t dst_state); static bool can_convert(const int32_t src_state, const int32_t dst_state);
static const char* to_string(int32_t state) { static const char* to_string(int32_t state) {

View File

@ -195,12 +195,10 @@ int ObXAService::local_one_phase_xa_commit_(const ObXATransID &xid,
bool alloc = false; bool alloc = false;
share::ObLSID coordinator; share::ObLSID coordinator;
int64_t end_flag = 0; int64_t end_flag = 0;
int64_t state = ObXATransState::NON_EXISTING;
// todo lixinze:修改内部表获取sql
ObTransID moke_tx_id; ObTransID moke_tx_id;
if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(trans_id, alloc, xa_ctx))) { if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(trans_id, alloc, xa_ctx))) {
if (OB_FAIL(query_xa_coord_from_tableone(MTL_ID(), xid, coordinator, moke_tx_id, state, end_flag))) { if (OB_FAIL(query_xa_coord_from_tableone(MTL_ID(), xid, coordinator, moke_tx_id, end_flag))) {
if (OB_ITER_END == ret) { if (OB_ITER_END == ret) {
ret = OB_TRANS_XA_NOTA; ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "xid is not valid", K(ret), K(xid)); TRANS_LOG(WARN, "xid is not valid", K(ret), K(xid));
@ -257,8 +255,8 @@ int ObXAService::revert_xa_ctx(ObXACtx *xa_ctx)
#define INSERT_XA_STANDBY_TRANS_SQL "\ #define INSERT_XA_STANDBY_TRANS_SQL "\
insert into %s (tenant_id, gtrid, bqual, format_id, \ insert into %s (tenant_id, gtrid, bqual, format_id, \
trans_id, coordinator, scheduler_ip, scheduler_port, state, flag) \ trans_id, coordinator, scheduler_ip, scheduler_port, flag) \
values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d, %ld)" values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %ld)"
void ObXAService::insert_record_for_standby(const uint64_t tenant_id, void ObXAService::insert_record_for_standby(const uint64_t tenant_id,
const ObXATransID &xid, const ObXATransID &xid,
@ -300,7 +298,7 @@ void ObXAService::insert_record_for_standby(const uint64_t tenant_id,
trans_id.get_id(), trans_id.get_id(),
coordinator.id(), coordinator.id(),
scheduler_ip_buf, sche_addr.get_port(), scheduler_ip_buf, sche_addr.get_port(),
ObXATransState::ACTIVE, (long)0))) { (long)0))) {
TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql)); TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql));
} else if (OB_FAIL(mysql_proxy->write(exec_tenant_id, sql.ptr(), affected_rows))) { } else if (OB_FAIL(mysql_proxy->write(exec_tenant_id, sql.ptr(), affected_rows))) {
TRANS_LOG(WARN, "execute insert record sql failed", KR(ret), K(exec_tenant_id), K(tenant_id)); TRANS_LOG(WARN, "execute insert record sql failed", KR(ret), K(exec_tenant_id), K(tenant_id));
@ -491,71 +489,6 @@ int ObXAService::insert_xa_record(ObISQLClient &client,
return ret; return ret;
} }
#define QUERY_XA_STATE_FLAG_SQL "\
SELECT state, flag FROM %s WHERE \
tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld"
int ObXAService::query_xa_state_and_flag(const uint64_t tenant_id,
const ObXATransID &xid,
int64_t &state,
int64_t &end_flag)
{
int ret = OB_SUCCESS;
ObMySQLProxy *mysql_proxy = NULL;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
char gtrid_str[128] = {0};
int64_t gtrid_len = 0;
char bqual_str[128] = {0};
int64_t bqual_len = 0;
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
if (!is_valid_tenant_id(tenant_id) || xid.empty()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(xid));
} else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
} else if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
} else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
xid.get_gtrid_str().length(),
gtrid_str, 128, gtrid_len))) {
TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
xid.get_bqual_str().length(),
bqual_str, 128, bqual_len))) {
TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(sql.assign_fmt(QUERY_XA_STATE_FLAG_SQL,
OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME,
tenant_id,
(int)gtrid_len, gtrid_str,
(int)bqual_len, bqual_str,
xid.get_format_id()))) {
TRANS_LOG(WARN, "generate query xa state flag fail", K(ret));
} else if (OB_FAIL(mysql_proxy->read(res, exec_tenant_id, sql.ptr()))) {
TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(exec_tenant_id), K(tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
}
} else {
EXTRACT_INT_FIELD_MYSQL(*result, "state", state, int64_t);
EXTRACT_INT_FIELD_MYSQL(*result, "flag", end_flag, int64_t);
}
THIS_WORKER.set_timeout_ts(original_timeout_us);
}
return ret;
}
#define DELETE_XA_TRANS_SQL "delete from %s where \ #define DELETE_XA_TRANS_SQL "delete from %s where \
tenant_id = %lu and gtrid = x'%.*s' and bqual = x'%.*s' and format_id = %ld" tenant_id = %lu and gtrid = x'%.*s' and bqual = x'%.*s' and format_id = %ld"
@ -1582,11 +1515,7 @@ int ObXAService::one_phase_xa_commit_(const ObXATransID &xid,
} else if (coordinator.is_valid()) { } else if (coordinator.is_valid()) {
ret = OB_TRANS_XA_PROTO; ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "xa has entered the commit phase", K(ret), K(tx_id), K(xid), K(coordinator)); TRANS_LOG(WARN, "xa has entered the commit phase", K(ret), K(tx_id), K(xid), K(coordinator));
} /*else if (ObXATransState::IDLE != state) { } else if (sche_addr == GCTX.self_addr()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "xa trans one phase commit invoked in improper state",
K(ret), K(state), K(xid), K(tx_id));
}*/ else if (sche_addr == GCTX.self_addr()) {
if (OB_FAIL(local_one_phase_xa_commit_(xid, tx_id, timeout_us, request_id, has_tx_level_temp_table))) { if (OB_FAIL(local_one_phase_xa_commit_(xid, tx_id, timeout_us, request_id, has_tx_level_temp_table))) {
TRANS_LOG(WARN, "local one phase commit failed", K(ret), K(tx_id), K(xid)); TRANS_LOG(WARN, "local one phase commit failed", K(ret), K(tx_id), K(xid));
} }
@ -1600,7 +1529,6 @@ int ObXAService::one_phase_xa_commit_(const ObXATransID &xid,
// xa_proto is returned when in tightly couple mode, there are // xa_proto is returned when in tightly couple mode, there are
// still multiple branches and one phase commit is triggered. // still multiple branches and one phase commit is triggered.
// Under such condition, oracle would not delete inner table record // Under such condition, oracle would not delete inner table record
// TODO, maybe should use OB_SUCC(ret)???
if (OB_TRANS_XA_PROTO != ret) { if (OB_TRANS_XA_PROTO != ret) {
const bool is_tightly = !ObXAFlag::contain_loosely(end_flag); const bool is_tightly = !ObXAFlag::contain_loosely(end_flag);
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
@ -1736,7 +1664,6 @@ int ObXAService::xa_rollback_local(const ObXATransID &xid,
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t end_flag = 0; int64_t end_flag = 0;
int64_t state = ObXATransState::NON_EXISTING;
share::ObLSID coordinator; share::ObLSID coordinator;
if (OB_UNLIKELY(!xid.is_valid()) if (OB_UNLIKELY(!xid.is_valid())
@ -1842,7 +1769,6 @@ int ObXAService::one_phase_xa_rollback_(const ObXATransID &xid,
bool alloc = false; bool alloc = false;
ObXACtx *xa_ctx = NULL; ObXACtx *xa_ctx = NULL;
share::ObLSID coordinator; share::ObLSID coordinator;
int64_t state = ObXATransState::NON_EXISTING;
// mock tmp_tx_id for query global transaction second time // mock tmp_tx_id for query global transaction second time
ObTransID tmp_tx_id; ObTransID tmp_tx_id;
@ -1852,7 +1778,6 @@ int ObXAService::one_phase_xa_rollback_(const ObXATransID &xid,
xid, xid,
coordinator, coordinator,
tmp_tx_id, tmp_tx_id,
state,
end_flag))) { end_flag))) {
if (OB_ITER_END == tmp_ret) { if (OB_ITER_END == tmp_ret) {
ret = OB_SUCCESS; ret = OB_SUCCESS;
@ -2184,12 +2109,10 @@ int ObXAService::xa_prepare(const ObXATransID &xid,
} }
} }
// TODO,
// 1. tightly coupled, OB_ERR_READ_ONLY_TRANSACTION, delete lock and record // 1. tightly coupled, OB_ERR_READ_ONLY_TRANSACTION, delete lock and record
// 2. tightly coupled, OB_TRANS_XA_RDONLY, delete record only // 2. tightly coupled, OB_TRANS_XA_RDONLY, delete record only
// 3. loosely coupled, OB_ERR_READ_ONLY_TRANSACTION, delete record // 3. loosely coupled, OB_ERR_READ_ONLY_TRANSACTION, delete record
if (OB_TRANS_XA_RDONLY == ret) { if (OB_TRANS_XA_RDONLY == ret) {
// TODO, verify delete_xa_record_state and ret(tmp_ret)
if (OB_SUCCESS != (tmp_ret = delete_xa_record(tenant_id, xid))) { if (OB_SUCCESS != (tmp_ret = delete_xa_record(tenant_id, xid))) {
TRANS_LOG(WARN, "delete xa record failed", K(tmp_ret), K(tenant_id), K(xid)); TRANS_LOG(WARN, "delete xa record failed", K(tmp_ret), K(tenant_id), K(xid));
} }
@ -2350,54 +2273,6 @@ int ObXAService::remote_xa_prepare_(const ObXATransID &xid,
return ret; return ret;
} }
#define QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL "\
SELECT coordinator FROM %s \
WHERE trans_id = %ld AND tenant_id = %lu"
int ObXAService::query_xa_coordinator_with_trans_id(const uint64_t tenant_id,
const ObTransID &trans_id,
share::ObLSID &coordinator)
{
int ret = OB_SUCCESS;
ObMySQLProxy *mysql_proxy = NULL;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
if (!is_valid_tenant_id(tenant_id) || trans_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(trans_id));
} else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
} else if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
} else if (OB_FAIL(sql.assign_fmt(QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL,
OB_ALL_PENDING_TRANSACTION_TNAME,
trans_id.get_id(),
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) {
TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret));
} else if (OB_FAIL(mysql_proxy->read(res, tenant_id, sql.ptr()))) {
TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
}
} else {
int64_t id = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", id, int64_t);
coordinator = ObLSID(id);
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "fail to extract field from result", K(ret));
}
}
THIS_WORKER.set_timeout_ts(original_timeout_us);
}
return ret;
}
#define DELETE_XA_PENDING_RECORD_SQL "delete from %s where \ #define DELETE_XA_PENDING_RECORD_SQL "delete from %s where \
tenant_id = %lu and trans_id = %ld" tenant_id = %lu and trans_id = %ld"
// delete record from pending trans (table two) // delete record from pending trans (table two)
@ -2510,8 +2385,8 @@ int ObXAService::query_xa_coordinator_with_xid(const uint64_t tenant_id,
// for __all_pending_transaction // for __all_pending_transaction
#define INSERT_XA_PENDING_RECORD_SQL "\ #define INSERT_XA_PENDING_RECORD_SQL "\
insert into %s (tenant_id, gtrid, bqual, format_id, \ insert into %s (tenant_id, gtrid, bqual, format_id, \
trans_id, coordinator, scheduler_ip, scheduler_port, state) \ trans_id, coordinator, scheduler_ip, scheduler_port) \
values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d)" values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d)"
int ObXAService::insert_xa_pending_record(const uint64_t tenant_id, int ObXAService::insert_xa_pending_record(const uint64_t tenant_id,
const ObXATransID &xid, const ObXATransID &xid,
@ -2555,8 +2430,7 @@ int ObXAService::insert_xa_pending_record(const uint64_t tenant_id,
(int)bqual_len, bqual_str, (int)bqual_len, bqual_str,
xid.get_format_id(), xid.get_format_id(),
tx_id.get_id(), coordinator.id(), tx_id.get_id(), coordinator.id(),
scheduler_ip_buf, sche_addr.get_port(), scheduler_ip_buf, sche_addr.get_port()))) {
ObXATransState::PREPARING))) {
TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql)); TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql));
} else if (OB_FAIL(mysql_proxy->write(tenant_id, sql.ptr(), affected_rows))) { } else if (OB_FAIL(mysql_proxy->write(tenant_id, sql.ptr(), affected_rows))) {
TRANS_LOG(WARN, "execute insert xa trans sql fail", TRANS_LOG(WARN, "execute insert xa trans sql fail",
@ -2578,7 +2452,7 @@ int ObXAService::insert_xa_pending_record(const uint64_t tenant_id,
} }
#define QUERY_XA_COORDINATOR_TRANSID_SQL "\ #define QUERY_XA_COORDINATOR_TRANSID_SQL "\
SELECT coordinator, trans_id, state, flag FROM %s WHERE \ SELECT coordinator, trans_id, flag FROM %s WHERE \
tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld" tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld"
// query coord from global transaction (table one) // query coord from global transaction (table one)
@ -2586,7 +2460,6 @@ int ObXAService::query_xa_coord_from_tableone(const uint64_t tenant_id,
const ObXATransID &xid, const ObXATransID &xid,
share::ObLSID &coordinator, share::ObLSID &coordinator,
ObTransID &trans_id, ObTransID &trans_id,
int64_t &state,
int64_t &end_flag) int64_t &end_flag)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -2642,7 +2515,6 @@ int ObXAService::query_xa_coord_from_tableone(const uint64_t tenant_id,
int64_t tx_id_value = 0; int64_t tx_id_value = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t); EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t);
EXTRACT_INT_FIELD_MYSQL(*result, "state", state, int64_t);
EXTRACT_INT_FIELD_MYSQL(*result, "flag", end_flag, int64_t); EXTRACT_INT_FIELD_MYSQL(*result, "flag", end_flag, int64_t);
trans_id = ObTransID(tx_id_value); trans_id = ObTransID(tx_id_value);
@ -2852,12 +2724,10 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
ObTransID tx_id; ObTransID tx_id;
bool record_in_tableone = true; bool record_in_tableone = true;
int64_t end_flag = 0; int64_t end_flag = 0;
int64_t state = ObXATransState::NON_EXISTING;
// only used for constructor // only used for constructor
bool is_tightly_coupled = true; bool is_tightly_coupled = true;
if (OB_FAIL(query_xa_coord_from_tableone(tenant_id, xid, coordinator, tx_id, if (OB_FAIL(query_xa_coord_from_tableone(tenant_id, xid, coordinator, tx_id, end_flag))) {
state, end_flag))) {
if (OB_ITER_END == ret) { if (OB_ITER_END == ret) {
TRANS_LOG(INFO, "record not exist in global transaction", K(ret), K(xid)); TRANS_LOG(INFO, "record not exist in global transaction", K(ret), K(xid));
uint64_t data_version = 0; uint64_t data_version = 0;
@ -2883,7 +2753,6 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
} else { } else {
TRANS_LOG(WARN, "fail to query trans id and coordinator", K(ret), K(xid), TRANS_LOG(WARN, "fail to query trans id and coordinator", K(ret), K(xid),
K(tx_id), K(coordinator)); K(tx_id), K(coordinator));
// TODO, check the TM action
ret = OB_TRANS_XA_RETRY; ret = OB_TRANS_XA_RETRY;
} }
} else if (OB_UNLIKELY(!coordinator.is_valid())) { } else if (OB_UNLIKELY(!coordinator.is_valid())) {

View File

@ -143,18 +143,11 @@ public:
const ObTransID &trans_id, const ObTransID &trans_id,
const common::ObAddr &sche_addr, const common::ObAddr &sche_addr,
const int64_t flag); const int64_t flag);
int query_xa_state_and_flag(const uint64_t tenant_id,
const ObXATransID &xid,
int64_t &state,
int64_t &end_flag);
int delete_xa_record(const uint64_t tenant_id, int delete_xa_record(const uint64_t tenant_id,
const ObXATransID &xid); const ObXATransID &xid);
int delete_xa_branch(const uint64_t tenant_id, int delete_xa_branch(const uint64_t tenant_id,
const ObXATransID &xid, const ObXATransID &xid,
const bool is_tightly_coupled); const bool is_tightly_coupled);
int query_xa_coordinator_with_trans_id(const uint64_t tenant_id,
const ObTransID &trans_id,
share::ObLSID &coordinator);
int delete_xa_pending_record(const uint64_t tenant_id, int delete_xa_pending_record(const uint64_t tenant_id,
const ObTransID &tx_id); const ObTransID &tx_id);
// query coord from tenant table global transaction // query coord from tenant table global transaction
@ -162,7 +155,6 @@ public:
const ObXATransID &xid, const ObXATransID &xid,
share::ObLSID &coordinator, share::ObLSID &coordinator,
ObTransID &trans_id, ObTransID &trans_id,
int64_t &state,
int64_t &end_flag); int64_t &end_flag);
int query_xa_coordinator_with_xid(const uint64_t tenant_id, int query_xa_coordinator_with_xid(const uint64_t tenant_id,
const ObXATransID &xid, const ObXATransID &xid,