diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp
index 3b13a10887..5974235cb0 100644
--- a/src/storage/tx/ob_xa_ctx.cpp
+++ b/src/storage/tx/ob_xa_ctx.cpp
@@ -2785,6 +2785,9 @@ int ObXACtx::xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool
                                                      affected_rows)) || 0 == affected_rows) {
       TRANS_LOG(WARN, "fail to update xa trans record", K(ret), K(xid), K(coord), K(affected_rows),
           K(*this));
+    } else if (CLUSTER_VERSION_4_1_0_0 && OB_FAIL(MTL(ObXAService*)->insert_xa_pending_record(tenant_id_, xid, trans_id_,
+            coord, original_sche_addr_))) {
+      TRANS_LOG(WARN, "fail to insert xa trans record", K(ret), K(xid), K(coord), K(*this));
     } else if (OB_FAIL(drive_prepare_(xid, timeout_us))) {
       // TODO, sche ctx should provide interfaces to drive xa prepare,
       TRANS_LOG(WARN, "drive prepare failed", K(ret), K(*this));
diff --git a/src/storage/tx/ob_xa_service.cpp b/src/storage/tx/ob_xa_service.cpp
index eb7b35932f..bfa415309d 100644
--- a/src/storage/tx/ob_xa_service.cpp
+++ b/src/storage/tx/ob_xa_service.cpp
@@ -1638,9 +1638,22 @@ int ObXAService::xa_rollback(const ObXATransID &xid,
                                      tx_id,
                                      end_flag))) {
       if (OB_ITER_END == ret) {
-        // For xa rollback, if the internal table does not exist,
-        // it is considered to have been rolled back.
-        ret = OB_SUCCESS;
+        if (CLUSTER_VERSION_4_1_0_0) { // NOTE(review): looks like a bare constant, i.e. always true - confirm a cluster-version comparison was not intended
+          if (OB_FAIL(query_xa_coordinator_with_xid(tenant_id, xid, tx_id, coordinator))) {
+            if (OB_ITER_END == ret) {
+              ret = OB_SUCCESS;
+            } else {
+              TRANS_LOG(WARN, "fail to query xa coordinator from pending transaction", K(ret), K(xid), K(tenant_id));
+            }
+          } else if (OB_FAIL(xa_rollback_for_pending_trans_(xid, tx_id, timeout_us, tenant_id, request_id,
+                  !ObXAFlag::contain_loosely(end_flag), coordinator))) {
+            TRANS_LOG(WARN, "fail to rollback xa trans for pending transaction", K(ret), K(xid), K(tenant_id));
+          } else {
+            TRANS_LOG(INFO, "rollback xa trans for pending transaction success", K(ret), K(xid), K(tenant_id));
+          }
+        } else {
+          ret = OB_SUCCESS;
+        }
       } else {
         TRANS_LOG(WARN, "fail to query xa scheduler and trans id from global transaction", K(ret), K(xid), K(tenant_id));
       }
@@ -1661,6 +1674,63 @@ int ObXAService::xa_rollback(const ObXATransID &xid,
   return ret;
 }
 
+// Rolls back an xa trans located through its pending-transaction record (tx_id / coord).
+// Gets the xa ctx for tx_id (initializing it when alloc is still true after the lookup), issues
+// the two-phase rollback and waits for it, then deletes the pending record on success.
+// OB_TRANS_ROLLBACKED from two_phase_end_trans is mapped to OB_SUCCESS; a failed delete is only logged.
+// NOTE(review): no release of xa_ctx is visible in this function - confirm who returns it to xa_ctx_mgr_.
+int ObXAService::xa_rollback_for_pending_trans_(const ObXATransID &xid,
+                                                const ObTransID &tx_id,
+                                                const int64_t timeout_us,
+                                                const uint64_t tenant_id,
+                                                const int64_t request_id,
+                                                const bool is_tightly_coupled,
+                                                const share::ObLSID &coord)
+{
+  int ret = OB_SUCCESS;
+  ObXACtx *xa_ctx = NULL;
+  bool alloc = true;
+  if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(tx_id, alloc, xa_ctx))) {
+    TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(tx_id), K(xid), K(alloc), KP(xa_ctx));
+  } else if (OB_ISNULL(xa_ctx)) {
+    ret = OB_ERR_UNEXPECTED;
+    TRANS_LOG(WARN, "xa ctx is null", K(ret), K(xid), K(tx_id));
+  } else {
+    if (alloc && OB_FAIL(xa_ctx->init(xid,
+                                      tx_id,
+                                      tenant_id,
+                                      GCTX.self_addr(),
+                                      is_tightly_coupled,
+                                      this,
+                                      &xa_ctx_mgr_,
+                                      &xa_rpc_,
+                                      &timer_))) {
+      TRANS_LOG(WARN, "xa ctx init failed", K(ret), K(xid), K(tx_id));
+    } else {
+      if (OB_FAIL(xa_ctx->two_phase_end_trans(xid, coord, true/*is_rollback*/, timeout_us, request_id))) {
+        if (OB_TRANS_ROLLBACKED != ret) {
+          TRANS_LOG(WARN, "xa rollback failed", K(ret), K(xid), K(tx_id));
+        } else {
+          ret = OB_SUCCESS;
+        }
+      } else if (OB_FAIL(xa_ctx->wait_two_phase_end_trans(xid, true/*is_rollback*/, timeout_us))) {
+        TRANS_LOG(WARN, "wait xa rollback failed", K(ret), K(xid), K(tx_id));
+      } else {
+        // do nothing
+      }
+      if (OB_SUCC(ret)) {
+        int tmp_ret = OB_SUCCESS;
+        if (OB_SUCCESS != (tmp_ret = delete_xa_pending_record(tenant_id, tx_id))) {
+          TRANS_LOG(WARN, "fail to delete xa record from pending trans", K(tmp_ret), K(xid), K(tx_id));
+        }
+        TRANS_LOG(INFO, "xa rollback success", K(ret), K(xid), K(tx_id));
+      }
+    }
+  }
+
+  return ret;
+}
+
 int ObXAService::xa_rollback_local(const ObXATransID &xid,
                                    const ObTransID &tx_id,
                                    const int64_t timeout_us,
@@ -2282,6 +2352,240 @@ int ObXAService::remote_xa_prepare_(const ObXATransID &xid,
   return ret;
 }
 
+#define QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL "\
+  SELECT coordinator FROM %s \
+  WHERE trans_id = %ld AND tenant_id = %lu"
+// query coord from pending trans (table two) by trans id
+// returns OB_INVALID_ARGUMENT for an invalid tenant_id/trans_id, and OB_ITER_END (not logged as a
+// warning) when no row matches
+int ObXAService::query_xa_coordinator_with_trans_id(const uint64_t tenant_id,
+                                                    const ObTransID &trans_id,
+                                                    share::ObLSID &coordinator)
+{
+  int ret = OB_SUCCESS;
+  ObMySQLProxy *mysql_proxy = NULL;
+  ObSqlString sql;
+  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
+    ObMySQLResult *result = NULL;
+    int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
+    THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
+    if (!is_valid_tenant_id(tenant_id) || !trans_id.is_valid()) {
+      ret = OB_INVALID_ARGUMENT;
+      TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(trans_id));
+    } else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
+    } else if (OB_ISNULL(mysql_proxy)) {
+      ret = OB_ERR_UNEXPECTED;
+      TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
+    } else if (OB_FAIL(sql.assign_fmt(QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL,
+                                      OB_ALL_PENDING_TRANSACTION_TNAME,
+                                      trans_id.get_id(),
+                                      ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) {
+      TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret));
+    } else if (OB_FAIL(mysql_proxy->read(res, tenant_id, sql.ptr()))) {
+      TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(tenant_id), K(sql));
+    } else if (OB_ISNULL(result = res.get_result())) {
+      ret = OB_ERR_UNEXPECTED;
+      TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql));
+    } else if (OB_FAIL(result->next())) {
+      if (OB_ITER_END != ret) {
+        TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
+      }
+    } else {
+      int64_t id = 0;
+      EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", id, int64_t);
+      coordinator = ObLSID(id);
+      if (OB_FAIL(ret)) {
+        TRANS_LOG(WARN, "fail to extract field from result", K(ret));
+      }
+    }
+    THIS_WORKER.set_timeout_ts(original_timeout_us);
+  }
+  return ret;
+}
+
+#define DELETE_XA_PENDING_RECORD_SQL "delete from %s where \
+  tenant_id = %lu and trans_id = %ld"
+// delete record from pending trans (table two)
+// zero affected rows is logged and still returns OB_SUCCESS; more than one is OB_ERR_UNEXPECTED
+int ObXAService::delete_xa_pending_record(const uint64_t tenant_id,
+                                          const ObTransID &tx_id)
+{
+  int ret = OB_SUCCESS;
+  ObMySQLProxy *mysql_proxy = NULL;
+  ObSqlString sql;
+  int64_t affected_rows = 0;
+  int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
+  THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
+  if (!is_valid_tenant_id(tenant_id) || !tx_id.is_valid()) {
+    ret = OB_INVALID_ARGUMENT;
+    TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(tx_id));
+  } else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
+  } else if (OB_ISNULL(mysql_proxy)) {
+    ret = OB_ERR_UNEXPECTED;
+    TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
+  } else if (OB_FAIL(sql.assign_fmt(DELETE_XA_PENDING_RECORD_SQL,
+                                    OB_ALL_PENDING_TRANSACTION_TNAME,
+                                    ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
+                                    tx_id.get_id()))) {
+    TRANS_LOG(WARN, "generate delete xa trans sql fail", K(ret), K(sql));
+  } else if (OB_FAIL(mysql_proxy->write(tenant_id, sql.ptr(), affected_rows))) {
+    TRANS_LOG(WARN, "execute delete xa trans sql fail",
+              KR(ret), K(tenant_id), K(sql), K(affected_rows));
+  } else if (OB_UNLIKELY(affected_rows > 1)) {
+    ret = OB_ERR_UNEXPECTED; // more than one row was deleted
+    TRANS_LOG(ERROR, "update xa trans sql affects multiple rows", K(tenant_id), K(affected_rows), K(sql));
+  } else if (OB_UNLIKELY(affected_rows == 0)) {
+    // no row was deleted; possibly a duplicate xid, treat it as a successful rollback
+    TRANS_LOG(WARN, "update xa trans sql affects no rows", K(tenant_id), K(sql));
+  } else {
+    TRANS_LOG(INFO, "delete xa record", K(tenant_id), "lbt", lbt());
+  }
+  THIS_WORKER.set_timeout_ts(original_timeout_us);
+  TRANS_LOG(INFO, "delete xa record", K(ret), K(tx_id));
+  return ret;
+}
+
+#define QUERY_XA_COORDINATOR_WITH_XID_SQL "\
+  SELECT trans_id, coordinator FROM %s \
+  WHERE gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld AND tenant_id = %lu limit 1"
+// query coord from pending trans (table two)
+// fills trans_id and coordinator from the matched row (limit 1); OB_ITER_END when no row matches
+int ObXAService::query_xa_coordinator_with_xid(const uint64_t tenant_id,
+                                               const ObXATransID &xid,
+                                               ObTransID &trans_id,
+                                               share::ObLSID &coordinator)
+{
+  int ret = OB_SUCCESS;
+  char gtrid_str[128] = {0};
+  int64_t gtrid_len = 0;
+  char bqual_str[128] = {0};
+  int64_t bqual_len = 0;
+  ObMySQLProxy *mysql_proxy = NULL;
+  ObSqlString sql;
+  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
+    ObMySQLResult *result = NULL;
+    int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
+    THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
+    if (!is_valid_tenant_id(tenant_id) || !xid.is_valid()) {
+      ret = OB_INVALID_ARGUMENT;
+      TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(xid));
+    } else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
+    } else if (OB_ISNULL(mysql_proxy)) {
+      ret = OB_ERR_UNEXPECTED;
+      TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
+    } else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
+                                 xid.get_gtrid_str().length(),
+                                 gtrid_str, 128, gtrid_len))) {
+      TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid));
+    } else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
+                                 xid.get_bqual_str().length(),
+                                 bqual_str, 128, bqual_len))) {
+      TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid));
+    } else if (OB_FAIL(sql.assign_fmt(QUERY_XA_COORDINATOR_WITH_XID_SQL,
+                                      OB_ALL_PENDING_TRANSACTION_TNAME,
+                                      (int)gtrid_len, gtrid_str,
+                                      (int)bqual_len, bqual_str,
+                                      xid.get_format_id(),
+                                      ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) {
+      TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret));
+    } else if (OB_FAIL(mysql_proxy->read(res, tenant_id, sql.ptr()))) {
+      TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(tenant_id), K(sql));
+    } else if (OB_ISNULL(result = res.get_result())) {
+      ret = OB_ERR_UNEXPECTED;
+      TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql));
+    } else if (OB_FAIL(result->next())) {
+      if (OB_ITER_END != ret) {
+        TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
+      }
+    } else {
+      int64_t ls_id_value = 0;
+      int64_t tx_id_value = 0;
+      EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", ls_id_value, int64_t);
+      EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t);
+      coordinator = ObLSID(ls_id_value);
+      trans_id = ObTransID(tx_id_value);
+      if (OB_FAIL(ret)) {
+        TRANS_LOG(WARN, "fail to extract field from result", K(ret));
+      }
+    }
+    THIS_WORKER.set_timeout_ts(original_timeout_us);
+  }
+  TRANS_LOG(INFO, "get trans id and coordinator from pending trans", K(ret), K(trans_id), K(coordinator));
+  return ret;
+}
+
+// for __all_pending_transaction
+#define INSERT_XA_PENDING_RECORD_SQL "\
+  insert into %s (tenant_id, gtrid, bqual, format_id, \
+  trans_id, coordinator, scheduler_ip, scheduler_port, state) \
+  values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d)"
+
+// insert one record (state PREPARING) into pending trans (table two)
+// OB_ERR_PRIMARY_KEY_DUPLICATE from the write is converted to OB_TRANS_XA_DUPID
+int ObXAService::insert_xa_pending_record(const uint64_t tenant_id,
+                                          const ObXATransID &xid,
+                                          const ObTransID &tx_id,
+                                          const share::ObLSID &coordinator,
+                                          const common::ObAddr &sche_addr)
+{
+  int ret = OB_SUCCESS;
+  int64_t affected_rows = 0;
+  char scheduler_ip_buf[128] = {0};
+  sche_addr.ip_to_string(scheduler_ip_buf, 128);
+  char gtrid_str[128] = {0};
+  int64_t gtrid_len = 0;
+  char bqual_str[128] = {0};
+  int64_t bqual_len = 0;
+  ObMySQLProxy *mysql_proxy = NULL;
+  ObSqlString sql;
+
+  int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
+  THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
+
+  if (!is_valid_tenant_id(tenant_id) || xid.empty() || !tx_id.is_valid()) {
+    ret = OB_INVALID_ARGUMENT;
+    TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(xid), K(tx_id));
+  } else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
+  } else if (OB_ISNULL(mysql_proxy)) {
+    ret = OB_ERR_UNEXPECTED;
+    TRANS_LOG(WARN, "mysql proxy is null", K(ret), KP(mysql_proxy));
+  } else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
+                               xid.get_gtrid_str().length(),
+                               gtrid_str, 128, gtrid_len))) {
+    TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid));
+  } else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
+                               xid.get_bqual_str().length(),
+                               bqual_str, 128, bqual_len))) {
+    TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid));
+  } else if (OB_FAIL(sql.assign_fmt(INSERT_XA_PENDING_RECORD_SQL,
+                                    OB_ALL_PENDING_TRANSACTION_TNAME,
+                                    ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
+                                    (int)gtrid_len, gtrid_str,
+                                    (int)bqual_len, bqual_str,
+                                    xid.get_format_id(),
+                                    tx_id.get_id(), coordinator.id(),
+                                    scheduler_ip_buf, sche_addr.get_port(),
+                                    ObXATransState::PREPARING))) {
+    TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql));
+  } else if (OB_FAIL(mysql_proxy->write(tenant_id, sql.ptr(), affected_rows))) {
+    TRANS_LOG(WARN, "execute insert xa trans sql fail",
+              KR(ret), K(tenant_id), K(sql), K(affected_rows));
+    if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
+      ret = OB_TRANS_XA_DUPID;
+    }
+  } else if (OB_UNLIKELY(affected_rows != 1)) {
+    ret = OB_ERR_UNEXPECTED;
+    TRANS_LOG(ERROR, "unexpected error, insert sql affects multiple rows or no rows",
+        K(ret), K(tenant_id), K(affected_rows), K(sql));
+  } else {
+    TRANS_LOG(INFO, "insert xa record", K(tenant_id), K(xid), K(tx_id));
+  }
+
+  THIS_WORKER.set_timeout_ts(original_timeout_us);
+
+  return ret;
+}
+
 #define QUERY_XA_COORDINATOR_TRANSID_SQL "\
   SELECT coordinator, trans_id, state, flag FROM %s WHERE \
   tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld"
@@ -2555,6 +2859,7 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
   bool alloc = true;
   share::ObLSID coordinator;
   ObTransID tx_id;
+  bool record_in_tableone = true;
   int64_t end_flag = 0;
   int64_t state = ObXATransState::NON_EXISTING;
   // only used for constructor
@@ -2563,14 +2868,28 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
   if (OB_FAIL(query_xa_coord_from_tableone(tenant_id, xid, coordinator, tx_id,
                                            state, end_flag))) {
     if (OB_ITER_END == ret) {
-      ret = OB_TRANS_XA_NOTA;
-      TRANS_LOG(WARN, "xa is not valid", K(ret), K(xid), K(tx_id), K(coordinator));
+      TRANS_LOG(INFO, "record not exist in global transaction", K(ret), K(xid));
+      if (CLUSTER_VERSION_4_1_0_0) {
+        ret = OB_SUCCESS;
+        record_in_tableone = false;
+      }
     } else {
       TRANS_LOG(WARN, "fail to qeery record from global transaction", K(ret), K(xid));
     }
   }
   if (OB_FAIL(ret)) {
     // do nothing
+  } else if (!record_in_tableone && OB_FAIL(query_xa_coordinator_with_xid(tenant_id, xid,
+          tx_id, coordinator))) {
+    if (OB_ITER_END == ret) {
+      ret = OB_TRANS_XA_NOTA;
+      TRANS_LOG(WARN, "xa is not valid", K(ret), K(xid), K(tx_id), K(coordinator));
+    } else {
+      TRANS_LOG(WARN, "fail to query trans id and coordinator", K(ret), K(xid),
+          K(tx_id), K(coordinator));
+      // TODO, check the TM action
+      ret = OB_TRANS_XA_RETRY;
+    }
   } else if (OB_UNLIKELY(!coordinator.is_valid())) {
     ret = OB_TRANS_XA_PROTO;
     TRANS_LOG(WARN, "invalid coordinator when xa two phase commit/rollback", K(ret), K(xid), K(coordinator), K(tx_id));
diff --git a/src/storage/tx/ob_xa_service.h b/src/storage/tx/ob_xa_service.h
index b84297fc4f..e7f5609c19 100644
--- a/src/storage/tx/ob_xa_service.h
+++ b/src/storage/tx/ob_xa_service.h
@@ -152,6 +152,11 @@ public:
   int delete_xa_branch(const uint64_t tenant_id,
                        const ObXATransID &xid,
                        const bool is_tightly_coupled);
+  int query_xa_coordinator_with_trans_id(const uint64_t tenant_id,
+                                         const ObTransID &trans_id,
+                                         share::ObLSID &coordinator);
+  int delete_xa_pending_record(const uint64_t tenant_id,
+                               const ObTransID &tx_id);
   // query coord from tenant table global transaction
   int query_xa_coord_from_tableone(const uint64_t tenant_id,
                                    const ObXATransID &xid,
@@ -159,6 +164,15 @@ public:
                                    ObTransID &trans_id,
                                    int64_t &state,
                                    int64_t &end_flag);
+  int query_xa_coordinator_with_xid(const uint64_t tenant_id,
+                                    const ObXATransID &xid,
+                                    ObTransID &trans_id,
+                                    share::ObLSID &coordinator);
+  int insert_xa_pending_record(const uint64_t tenant_id,
+                               const ObXATransID &xid,
+                               const ObTransID &trans_id,
+                               const share::ObLSID &coordinator,
+                               const common::ObAddr &sche_addr);
   int query_sche_and_coord(const uint64_t tenant_id,
                            const ObXATransID &xid,
                            ObAddr &scheduler_addr,
@@ -226,6 +240,13 @@ private:
                            const int64_t timeout_us,
                            const int64_t request_id,
                            bool &has_tx_level_temp_table);
+  int xa_rollback_for_pending_trans_(const ObXATransID &xid,
+                                     const ObTransID &tx_id,
+                                     const int64_t timeout_us,
+                                     const uint64_t tenant_id,
+                                     const int64_t request_id,
+                                     const bool is_tightly_coupled,
+                                     const share::ObLSID &coord);
   int two_phase_xa_rollback_(const ObXATransID &xid,
                              const ObTransID &tx_id,
                              const int64_t timeout_us,