diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 7ed0ec1caa..e3baaaca60 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -640,7 +640,6 @@ int ObTransService::decide_tx_commit_info_(ObTxDesc &tx, ObTxPart *&coord) */ int ObTransService::prepare_tx_coord(ObTxDesc &tx, share::ObLSID &coord_id) { - // TODO: for xa int ret = OB_SUCCESS; tx.lock_.lock(); ObTxPart *coord = NULL; @@ -676,7 +675,6 @@ int ObTransService::prepare_tx(ObTxDesc &tx, tx.commit_expire_ts_ = now + timeout_us; tx.state_ = ObTxDesc::State::SUB_PREPARING; ObTxSubPrepareMsg prepare_msg; - // TODO, retry mechanism if (OB_FAIL(tx.commit_task_.init(&tx, this))) { TRANS_LOG(WARN, "fail to init timeout task", K(ret), K(tx)); } else if (OB_FAIL(register_commit_retry_task_(tx))) { @@ -730,7 +728,6 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id, { int ret = OB_SUCCESS; int64_t now = ObClockGenerator::getClock(); - // TODO, alloc tx desc from tx mgr ObTxDesc *tx = NULL; if (OB_FAIL(tx_desc_mgr_.alloc(tx))) { TRANS_LOG(WARN, "alloc tx fail", K(ret), KPC(this)); @@ -792,9 +789,7 @@ int ObTransService::build_tx_sub_commit_msg_(const ObTxDesc &tx, ObTxSubCommitMs msg.sender_ = share::SCHEDULER_LS; msg.xid_ = tx.xid_; msg.cluster_version_ = GET_MIN_CLUSTER_VERSION(); - // invalid msg.cluster_id_ = GCONF.cluster_id; - // TODO, a special request id msg.request_id_ = tx.op_sn_; return ret; } @@ -809,9 +804,7 @@ int ObTransService::build_tx_sub_rollback_msg_(const ObTxDesc &tx, ObTxSubRollba msg.sender_ = share::SCHEDULER_LS; msg.xid_ = tx.xid_; msg.cluster_version_ = GET_MIN_CLUSTER_VERSION(); - // invalid msg.cluster_id_ = GCONF.cluster_id; - // TODO, a special request id msg.request_id_ = tx.op_sn_; return ret; } @@ -2755,12 +2748,13 @@ int ObTransService::handle_sub_prepare_result(const ObTransID &tx_id, if (OB_FAIL(tx_desc_mgr_.get(tx_id, tx))) { TRANS_LOG(WARN, "cannot found tx by id", K(ret), K(tx_id), K(result)); } else { + bool need_cb = false; tx->lock_.lock(); - // TODO, check state - if (ObTxDesc::State::IN_TERMINATE > tx->state_) { + if (ObTxDesc::State::IN_TERMINATE >= tx->state_) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected tx state", K(ret), K_(tx->state), K(tx_id), K(result), KPC(tx)); + tx->print_trace_(); } else if (ObTxDesc::State::SUB_PREPARED == tx->state_) { TRANS_LOG(WARN, "tx has been prepared", K_(tx->state), K(tx_id), K(result), KPC(tx)); @@ -2772,10 +2766,11 @@ int ObTransService::handle_sub_prepare_result(const ObTransID &tx_id, TRANS_LOG(WARN, "unexpected tx state", K_(tx->state), K(tx_id), K(result), KPC(tx)); } else { + need_cb = true; ret = handle_sub_prepare_result_(*tx, result); } tx->lock_.unlock(); - tx->execute_commit_cb(); + if (need_cb) { tx->execute_commit_cb(); } } if (OB_NOT_NULL(tx)) { tx_desc_mgr_.revert(*tx); @@ -2956,6 +2951,7 @@ int ObTransService::handle_sub_commit_result(const ObTransID &tx_id, ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected trans desc", K(ret), K(tx_id), K(result)); } else { + bool need_cb = false; tx->lock_.lock(); // TODO, check state if (ObTxDesc::State::SUB_COMMITTING != tx->state_) { @@ -2968,10 +2964,11 @@ int ObTransService::handle_sub_commit_result(const ObTransID &tx_id, if (OB_TRANS_COMMITED == result) { final_result = OB_SUCCESS; } + need_cb = true; ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result); } tx->lock_.unlock(); - tx->execute_commit_cb(); + if (need_cb) { tx->execute_commit_cb(); } } if (OB_NOT_NULL(tx)) { tx_desc_mgr_.revert(*tx); @@ -3001,6 +2998,7 @@ int ObTransService::handle_sub_rollback_result(const ObTransID &tx_id, ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected trans desc", K(ret), K(tx_id), K(result)); } else { + bool need_cb = false; tx->lock_.lock(); // TODO, check state if (ObTxDesc::State::SUB_ROLLBACKING != tx->state_) { @@ -3013,10 +3011,11 @@ int ObTransService::handle_sub_rollback_result(const ObTransID &tx_id, if (OB_TRANS_KILLED == result) { final_result = OB_SUCCESS; } + need_cb = true; ret = handle_sub_end_tx_result_(*tx, is_rollback, final_result); } tx->lock_.unlock(); - tx->execute_commit_cb(); + if (need_cb) { tx->execute_commit_cb(); } } if (OB_NOT_NULL(tx)) { tx_desc_mgr_.revert(*tx); diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 0c91ea3162..9a01d5a2b3 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -164,9 +164,11 @@ int ObXACtx::handle_timeout(const int64_t delay) } else if (is_terminated_) { ret = OB_TRANS_IS_EXITING; TRANS_LOG(WARN, "xa trans has terminated", K(ret)); - } else if (ObXATransState::has_submitted(xa_trans_state_)) { + } else if (ObXATransState::has_submitted(xa_trans_state_) && !is_xa_one_phase_) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "xa trans has entered commit phase, unexpected", K(ret), K(*this)); + } else if (ObXATransState::has_submitted(xa_trans_state_)) { + // do nothing } else { timeout_task_.set_running(true); if (get_original_sche_addr() == GCONF.self_addr_) { @@ -237,7 +239,6 @@ int ObXACtx::check_terminated_() const return ret; } -//TODO, verify int ObXACtx::is_one_phase_end_trans_allowed_(const ObXATransID &xid, const bool is_rollback) { int ret = OB_SUCCESS; @@ -261,7 +262,6 @@ int ObXACtx::is_one_phase_end_trans_allowed_(const ObXATransID &xid, const bool if (info.xid_.all_equal_to(xid)) { if (ObXATransState::PREPARING == info.state_) { //in preparing state, the previous xa prepare req is still processing, need retry - //TODO, VERIFY prepared state ret = OB_EAGAIN; } else if (ObXATransState::PREPARED == info.state_) { ret = OB_TRANS_ROLLBACKED; @@ -872,27 +872,6 @@ int ObXACtx::process_xa_end(const obrpc::ObXAEndRPCRequest &req) return ret; } -int ObXACtx::tx_desc_copy_(const ObTxDesc &from_trans_desc, ObTxDesc &to_trans_desc) -{ - // todo tingshan - int ret = OB_SUCCESS; - UNUSED(from_trans_desc); - UNUSED(to_trans_desc); - // if (FALSE_IT(to_trans_desc.set_max_sql_no(from_trans_desc.get_sql_no()))) { - // } else if (FALSE_IT(to_trans_desc.set_stmt_min_sql_no(from_trans_desc.get_stmt_min_sql_no()))) { - // } else if (FALSE_IT(to_trans_desc.get_trans_param() = from_trans_desc.get_trans_param())) { - // } else if (OB_FAIL(to_trans_desc.merge_participants(from_trans_desc.get_participants()))) { - // TRANS_LOG(WARN, "merge participants failed", K(ret), K(from_trans_desc)); - // } else if (OB_FAIL(to_trans_desc.merge_participants_pla(from_trans_desc.get_participants_pla()))) { - // TRANS_LOG(WARN, "merge participants pla failed", K(ret), K(from_trans_desc)); - // } /*else if (OB_FAIL(to_trans_desc.set_trans_consistency_type( - // from_trans_desc.get_trans_param().get_consistency_type()))) { - // TRANS_LOG(WARN, "set consistency type failed", K(ret), K(from_trans_desc)); - // }*/ - - return ret; -} - // handle start stmt request in original scheduler // @param [in] req int ObXACtx::process_start_stmt(const obrpc::ObXAStartStmtRPCRequest &req) @@ -934,7 +913,6 @@ int ObXACtx::process_start_stmt(const obrpc::ObXAStartStmtRPCRequest &req) executing_xid_ = xid; } if (OB_FAIL(ret)) { - // TODO, verify it stmt_unlock_(xid); } } @@ -1006,7 +984,6 @@ int ObXACtx::process_end_stmt(const obrpc::ObXAEndStmtRPCRequest &req) // which should be considered successful at this time } else { const ObTxStmtInfo &stmt_info = req.get_stmt_info(); - // TODO, remove duplicate if (OB_FAIL(MTL(ObTransService*)->update_tx_with_stmt_info(stmt_info, tx_desc_))) { TRANS_LOG(WARN, "update tx desc with stmt info failed", K(ret), K(xid), K(*this)); } else if (OB_FAIL(update_xa_branch_hb_info_(xid))) { @@ -1532,7 +1509,6 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid, TRANS_LOG(WARN, "post xa start request failed", K(ret), K(xid), K(*this)); } else { if (OB_FAIL(cond.wait(wait_time, result)) || OB_FAIL(result)) { - // TODO, check loose couple mode but OB_TRANS_CTX_NOT_EXIST, check OB_TIMEOUT if (is_tightly_coupled_ && (OB_TRANS_XA_BRANCH_FAIL == ret || OB_TRANS_CTX_NOT_EXIST == ret)) { TRANS_LOG(INFO, "xa trans has terminated", K(ret), K(xid), K(result)); //rewrite code @@ -1549,8 +1525,6 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid, } if (OB_FAIL(ret)) { - // OB_TRANS_XA_BRANCH_FAIL, - // TODO, check whether need to delete inner table record if (OB_TRANS_XA_BRANCH_FAIL == ret) { set_terminated_(); } @@ -2774,9 +2748,7 @@ int ObXACtx::xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool uint64_t data_version = 0; if (OB_FAIL(MTL(ObTransService*)->prepare_tx_coord(*tx_desc_, coord))) { if (OB_ERR_READ_ONLY_TRANSACTION == ret) { - TRANS_LOG(WARN, "fail to prepare tx coord", K(ret), K(*this)); - // TODO, no need rewrite ret - // ret = OB_TRANS_XA_RDONLY; + TRANS_LOG(INFO, "xa is read only", K(ret), K(*this)); } else { TRANS_LOG(WARN, "fail to prepare tx coord", K(ret), K(*this)); } @@ -2793,7 +2765,6 @@ int ObXACtx::xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool insert_xa_pending_record(tenant_id_, xid, trans_id_, coord, original_sche_addr_))) { TRANS_LOG(WARN, "fail to insert xa trans record", K(ret), K(xid), K(coord), K(*this)); } else if (OB_FAIL(drive_prepare_(xid, timeout_us))) { - // TODO, sche ctx should provide interfaces to drive xa prepare, TRANS_LOG(WARN, "drive prepare failed", K(ret), K(*this)); } // if the last branch fails, need exit @@ -2855,7 +2826,6 @@ int ObXACtx::wait_xa_prepare(const ObXATransID &xid, const int64_t timeout_us) } } ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); - // TODO, handle result if (OB_SUCC(ret) || OB_ERR_READ_ONLY_TRANSACTION == ret) { xa_trans_state_ = ObXATransState::PREPARED; } @@ -3043,7 +3013,6 @@ int ObXACtx::check_trans_state_(const bool is_rollback, if (request_id_ == request_id) { ret = OB_EAGAIN; } else { - // todo lixinze:error or not if (is_xa_one_phase_) { ret = OB_TRANS_ROLLBACKED; } else { diff --git a/src/storage/tx/ob_xa_ctx.h b/src/storage/tx/ob_xa_ctx.h index 9881462aa8..1d05749d44 100644 --- a/src/storage/tx/ob_xa_ctx.h +++ b/src/storage/tx/ob_xa_ctx.h @@ -193,7 +193,6 @@ private: // for 4.0 int process_xa_start_tightly_(const obrpc::ObXAStartRPCRequest &req); int process_xa_start_loosely_(const obrpc::ObXAStartRPCRequest &req); - int tx_desc_copy_(const ObTxDesc &from_trans_desc, ObTxDesc &to_trans_desc); int xa_start_(const ObXATransID &xid, const int64_t flags, const int64_t timeout_seconds, diff --git a/src/storage/tx/ob_xa_define.h b/src/storage/tx/ob_xa_define.h index 9a2353e20e..c310a231a1 100644 --- a/src/storage/tx/ob_xa_define.h +++ b/src/storage/tx/ob_xa_define.h @@ -61,8 +61,10 @@ public: { return COMMITTING == state || ROLLBACKING == state + || PREPARING == state || ROLLBACKED == state - || COMMITTED == state; + || COMMITTED == state + || PREPARED == state; } static bool can_convert(const int32_t src_state, const int32_t dst_state); static const char* to_string(int32_t state) { diff --git a/src/storage/tx/ob_xa_service.cpp b/src/storage/tx/ob_xa_service.cpp index 1baf6a0a35..da4f68264a 100644 --- a/src/storage/tx/ob_xa_service.cpp +++ b/src/storage/tx/ob_xa_service.cpp @@ -195,12 +195,10 @@ int ObXAService::local_one_phase_xa_commit_(const ObXATransID &xid, bool alloc = false; share::ObLSID coordinator; int64_t end_flag = 0; - int64_t state = ObXATransState::NON_EXISTING; - // todo lixinze:修改内部表获取sql ObTransID moke_tx_id; if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(trans_id, alloc, xa_ctx))) { - if (OB_FAIL(query_xa_coord_from_tableone(MTL_ID(), xid, coordinator, moke_tx_id, state, end_flag))) { + if (OB_FAIL(query_xa_coord_from_tableone(MTL_ID(), xid, coordinator, moke_tx_id, end_flag))) { if (OB_ITER_END == ret) { ret = OB_TRANS_XA_NOTA; TRANS_LOG(WARN, "xid is not valid", K(ret), K(xid)); @@ -257,8 +255,8 @@ int ObXAService::revert_xa_ctx(ObXACtx *xa_ctx) #define INSERT_XA_STANDBY_TRANS_SQL "\ insert into %s (tenant_id, gtrid, bqual, format_id, \ - trans_id, coordinator, scheduler_ip, scheduler_port, state, flag) \ - values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d, %ld)" + trans_id, coordinator, scheduler_ip, scheduler_port, flag) \ + values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %ld)" void ObXAService::insert_record_for_standby(const uint64_t tenant_id, const ObXATransID &xid, @@ -300,7 +298,7 @@ void ObXAService::insert_record_for_standby(const uint64_t tenant_id, trans_id.get_id(), coordinator.id(), scheduler_ip_buf, sche_addr.get_port(), - ObXATransState::ACTIVE, (long)0))) { + (long)0))) { TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql)); } else if (OB_FAIL(mysql_proxy->write(exec_tenant_id, sql.ptr(), affected_rows))) { TRANS_LOG(WARN, "execute insert record sql failed", KR(ret), K(exec_tenant_id), K(tenant_id)); @@ -491,71 +489,6 @@ int ObXAService::insert_xa_record(ObISQLClient &client, return ret; } -#define QUERY_XA_STATE_FLAG_SQL "\ - SELECT state, flag FROM %s WHERE \ - tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld" - -int ObXAService::query_xa_state_and_flag(const uint64_t tenant_id, - const ObXATransID &xid, - int64_t &state, - int64_t &end_flag) -{ - int ret = OB_SUCCESS; - ObMySQLProxy *mysql_proxy = NULL; - ObSqlString sql; - SMART_VAR(ObMySQLProxy::MySQLResult, res) { - ObMySQLResult *result = NULL; - char gtrid_str[128] = {0}; - int64_t gtrid_len = 0; - char bqual_str[128] = {0}; - int64_t bqual_len = 0; - - const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); - int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); - - if (!is_valid_tenant_id(tenant_id) || xid.empty()) { - ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(xid)); - } else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) { - } else if (OB_ISNULL(mysql_proxy)) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy)); - } else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(), - xid.get_gtrid_str().length(), - gtrid_str, 128, gtrid_len))) { - TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid)); - } else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(), - xid.get_bqual_str().length(), - bqual_str, 128, bqual_len))) { - TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid)); - } else if (OB_FAIL(sql.assign_fmt(QUERY_XA_STATE_FLAG_SQL, - OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME, - tenant_id, - (int)gtrid_len, gtrid_str, - (int)bqual_len, bqual_str, - xid.get_format_id()))) { - TRANS_LOG(WARN, "generate query xa state flag fail", K(ret)); - } else if (OB_FAIL(mysql_proxy->read(res, exec_tenant_id, sql.ptr()))) { - TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(exec_tenant_id), K(tenant_id), K(sql)); - } else if (OB_ISNULL(result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(result->next())) { - if (OB_ITER_END != ret) { - TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql)); - } - } else { - EXTRACT_INT_FIELD_MYSQL(*result, "state", state, int64_t); - EXTRACT_INT_FIELD_MYSQL(*result, "flag", end_flag, int64_t); - } - - THIS_WORKER.set_timeout_ts(original_timeout_us); - } - - return ret; -} - #define DELETE_XA_TRANS_SQL "delete from %s where \ tenant_id = %lu and gtrid = x'%.*s' and bqual = x'%.*s' and format_id = %ld" @@ -1582,11 +1515,7 @@ int ObXAService::one_phase_xa_commit_(const ObXATransID &xid, } else if (coordinator.is_valid()) { ret = OB_TRANS_XA_PROTO; TRANS_LOG(WARN, "xa has entered the commit phase", K(ret), K(tx_id), K(xid), K(coordinator)); - } /*else if (ObXATransState::IDLE != state) { - ret = OB_TRANS_XA_PROTO; - TRANS_LOG(WARN, "xa trans one phase commit invoked in improper state", - K(ret), K(state), K(xid), K(tx_id)); - }*/ else if (sche_addr == GCTX.self_addr()) { + } else if (sche_addr == GCTX.self_addr()) { if (OB_FAIL(local_one_phase_xa_commit_(xid, tx_id, timeout_us, request_id, has_tx_level_temp_table))) { TRANS_LOG(WARN, "local one phase commit failed", K(ret), K(tx_id), K(xid)); } @@ -1600,7 +1529,6 @@ int ObXAService::one_phase_xa_commit_(const ObXATransID &xid, // xa_proto is returned when in tightly couple mode, there are // still multiple branches and one phase commit is triggered. // Under such condition, oracle would not delete inner table record - // TODO, maybe should use OB_SUCC(ret)??? if (OB_TRANS_XA_PROTO != ret) { const bool is_tightly = !ObXAFlag::contain_loosely(end_flag); int tmp_ret = OB_SUCCESS; @@ -1736,7 +1664,6 @@ int ObXAService::xa_rollback_local(const ObXATransID &xid, { int ret = OB_SUCCESS; int64_t end_flag = 0; - int64_t state = ObXATransState::NON_EXISTING; share::ObLSID coordinator; if (OB_UNLIKELY(!xid.is_valid()) @@ -1842,7 +1769,6 @@ int ObXAService::one_phase_xa_rollback_(const ObXATransID &xid, bool alloc = false; ObXACtx *xa_ctx = NULL; share::ObLSID coordinator; - int64_t state = ObXATransState::NON_EXISTING; // mock tmp_tx_id for query global transaction second time ObTransID tmp_tx_id; @@ -1852,7 +1778,6 @@ int ObXAService::one_phase_xa_rollback_(const ObXATransID &xid, xid, coordinator, tmp_tx_id, - state, end_flag))) { if (OB_ITER_END == tmp_ret) { ret = OB_SUCCESS; @@ -2184,12 +2109,10 @@ int ObXAService::xa_prepare(const ObXATransID &xid, } } - // TODO, // 1. tightly coupled, OB_ERR_READ_ONLY_TRANSACTION, delete lock and record // 2. tightly coupled, OB_TRANS_XA_RDONLY, delete record only // 3. loosely coupled, OB_ERR_READ_ONLY_TRANSACTION, delete record if (OB_TRANS_XA_RDONLY == ret) { - // TODO, verify delete_xa_record_state and ret(tmp_ret) if (OB_SUCCESS != (tmp_ret = delete_xa_record(tenant_id, xid))) { TRANS_LOG(WARN, "delete xa record failed", K(tmp_ret), K(tenant_id), K(xid)); } @@ -2350,54 +2273,6 @@ int ObXAService::remote_xa_prepare_(const ObXATransID &xid, return ret; } -#define QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL "\ - SELECT coordinator FROM %s \ - WHERE trans_id = %ld AND tenant_id = %lu" -int ObXAService::query_xa_coordinator_with_trans_id(const uint64_t tenant_id, - const ObTransID &trans_id, - share::ObLSID &coordinator) -{ - int ret = OB_SUCCESS; - ObMySQLProxy *mysql_proxy = NULL; - ObSqlString sql; - SMART_VAR(ObMySQLProxy::MySQLResult, res) { - ObMySQLResult *result = NULL; - int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); - if (!is_valid_tenant_id(tenant_id) || trans_id.is_valid()) { - ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(trans_id)); - } else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) { - } else if (OB_ISNULL(mysql_proxy)) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy)); - } else if (OB_FAIL(sql.assign_fmt(QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL, - OB_ALL_PENDING_TRANSACTION_TNAME, - trans_id.get_id(), - ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) { - TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret)); - } else if (OB_FAIL(mysql_proxy->read(res, tenant_id, sql.ptr()))) { - TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(tenant_id), K(sql)); - } else if (OB_ISNULL(result = res.get_result())) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql)); - } else if (OB_FAIL(result->next())) { - if (OB_ITER_END != ret) { - TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql)); - } - } else { - int64_t id = 0; - EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", id, int64_t); - coordinator = ObLSID(id); - if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "fail to extract field from result", K(ret)); - } - } - THIS_WORKER.set_timeout_ts(original_timeout_us); - } - return ret; -} - #define DELETE_XA_PENDING_RECORD_SQL "delete from %s where \ tenant_id = %lu and trans_id = %ld" // delete record from pending trans (table two) @@ -2510,8 +2385,8 @@ int ObXAService::query_xa_coordinator_with_xid(const uint64_t tenant_id, // for __all_pending_transaction #define INSERT_XA_PENDING_RECORD_SQL "\ insert into %s (tenant_id, gtrid, bqual, format_id, \ - trans_id, coordinator, scheduler_ip, scheduler_port, state) \ - values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d)" + trans_id, coordinator, scheduler_ip, scheduler_port) \ + values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d)" int ObXAService::insert_xa_pending_record(const uint64_t tenant_id, const ObXATransID &xid, @@ -2555,8 +2430,7 @@ int ObXAService::insert_xa_pending_record(const uint64_t tenant_id, (int)bqual_len, bqual_str, xid.get_format_id(), tx_id.get_id(), coordinator.id(), - scheduler_ip_buf, sche_addr.get_port(), - ObXATransState::PREPARING))) { + scheduler_ip_buf, sche_addr.get_port()))) { TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql)); } else if (OB_FAIL(mysql_proxy->write(tenant_id, sql.ptr(), affected_rows))) { TRANS_LOG(WARN, "execute insert xa trans sql fail", @@ -2578,7 +2452,7 @@ int ObXAService::insert_xa_pending_record(const uint64_t tenant_id, } #define QUERY_XA_COORDINATOR_TRANSID_SQL "\ - SELECT coordinator, trans_id, state, flag FROM %s WHERE \ + SELECT coordinator, trans_id, flag FROM %s WHERE \ tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld" // query coord from global transaction (table one) @@ -2586,7 +2460,6 @@ int ObXAService::query_xa_coord_from_tableone(const uint64_t tenant_id, const ObXATransID &xid, share::ObLSID &coordinator, ObTransID &trans_id, - int64_t &state, int64_t &end_flag) { int ret = OB_SUCCESS; @@ -2642,7 +2515,6 @@ int ObXAService::query_xa_coord_from_tableone(const uint64_t tenant_id, int64_t tx_id_value = 0; EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t); - EXTRACT_INT_FIELD_MYSQL(*result, "state", state, int64_t); EXTRACT_INT_FIELD_MYSQL(*result, "flag", end_flag, int64_t); trans_id = ObTransID(tx_id_value); @@ -2852,12 +2724,10 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid, ObTransID tx_id; bool record_in_tableone = true; int64_t end_flag = 0; - int64_t state = ObXATransState::NON_EXISTING; // only used for constructor bool is_tightly_coupled = true; - if (OB_FAIL(query_xa_coord_from_tableone(tenant_id, xid, coordinator, tx_id, - state, end_flag))) { + if (OB_FAIL(query_xa_coord_from_tableone(tenant_id, xid, coordinator, tx_id, end_flag))) { if (OB_ITER_END == ret) { TRANS_LOG(INFO, "record not exist in global transaction", K(ret), K(xid)); uint64_t data_version = 0; @@ -2883,7 +2753,6 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid, } else { TRANS_LOG(WARN, "fail to query trans id and coordinator", K(ret), K(xid), K(tx_id), K(coordinator)); - // TODO, check the TM action ret = OB_TRANS_XA_RETRY; } } else if (OB_UNLIKELY(!coordinator.is_valid())) { diff --git a/src/storage/tx/ob_xa_service.h b/src/storage/tx/ob_xa_service.h index e7f5609c19..63a82fbf6f 100644 --- a/src/storage/tx/ob_xa_service.h +++ b/src/storage/tx/ob_xa_service.h @@ -143,18 +143,11 @@ public: const ObTransID &trans_id, const common::ObAddr &sche_addr, const int64_t flag); - int query_xa_state_and_flag(const uint64_t tenant_id, - const ObXATransID &xid, - int64_t &state, - int64_t &end_flag); int delete_xa_record(const uint64_t tenant_id, const ObXATransID &xid); int delete_xa_branch(const uint64_t tenant_id, const ObXATransID &xid, const bool is_tightly_coupled); - int query_xa_coordinator_with_trans_id(const uint64_t tenant_id, - const ObTransID &trans_id, - share::ObLSID &coordinator); int delete_xa_pending_record(const uint64_t tenant_id, const ObTransID &tx_id); // query coord from tenant table global transaction @@ -162,7 +155,6 @@ public: const ObXATransID &xid, share::ObLSID &coordinator, ObTransID &trans_id, - int64_t &state, int64_t &end_flag); int query_xa_coordinator_with_xid(const uint64_t tenant_id, const ObXATransID &xid,