fix the upgrade problem after deleting the pending transaction

This commit is contained in:
obdev
2023-04-27 15:12:06 +00:00
committed by ob-robot
parent 271e8aa047
commit e8270d33f4
3 changed files with 336 additions and 5 deletions

View File

@ -2785,6 +2785,9 @@ int ObXACtx::xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool
affected_rows)) || 0 == affected_rows) {
TRANS_LOG(WARN, "fail to update xa trans record", K(ret), K(xid), K(coord),
K(affected_rows), K(*this));
} else if (CLUSTER_VERSION_4_1_0_0 && OB_FAIL(MTL(ObXAService*)->insert_xa_pending_record(tenant_id_, xid, trans_id_,
coord, original_sche_addr_))) {
TRANS_LOG(WARN, "fail to insert xa trans record", K(ret), K(xid), K(coord), K(*this));
} else if (OB_FAIL(drive_prepare_(xid, timeout_us))) {
// TODO, sche ctx should provide interfaces to drive xa prepare,
TRANS_LOG(WARN, "drive prepare failed", K(ret), K(*this));

View File

@ -1638,9 +1638,22 @@ int ObXAService::xa_rollback(const ObXATransID &xid,
tx_id,
end_flag))) {
if (OB_ITER_END == ret) {
// For xa rollback, if the internal table does not exist,
// it is considered to have been rolled back.
ret = OB_SUCCESS;
if (CLUSTER_VERSION_4_1_0_0) {
if (OB_FAIL(query_xa_coordinator_with_xid(tenant_id, xid, tx_id, coordinator))) {
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
} else {
TRANS_LOG(WARN, "fail to query xa coordinator from pending transaction", K(ret), K(xid), K(tenant_id));
}
} else if (OB_FAIL(xa_rollback_for_pending_trans_(xid, tx_id, timeout_us, tenant_id, request_id,
!ObXAFlag::contain_loosely(end_flag), coordinator))) {
TRANS_LOG(WARN, "fail to rollback xa trans for pending transaction", K(ret), K(xid), K(tenant_id));
} else {
TRANS_LOG(INFO, "rollback xa trans for pending transaction success", K(ret), K(xid), K(tenant_id));
}
} else {
ret = OB_SUCCESS;
}
} else {
TRANS_LOG(WARN, "fail to query xa scheduler and trans id from global transaction", K(ret), K(xid), K(tenant_id));
}
@ -1661,6 +1674,58 @@ int ObXAService::xa_rollback(const ObXATransID &xid,
return ret;
}
int ObXAService::xa_rollback_for_pending_trans_(const ObXATransID &xid,
const ObTransID &tx_id,
const int64_t timeout_us,
const uint64_t tenant_id,
const int64_t request_id,
const bool is_tightly_coupled,
const share::ObLSID &coord)
{
int ret = OB_SUCCESS;
ObXACtx *xa_ctx = NULL;
bool alloc = true;
if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(tx_id, alloc, xa_ctx))) {
TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(tx_id), K(xid), K(alloc), KP(xa_ctx));
} else if (OB_ISNULL(xa_ctx)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa ctx is null", K(ret), K(xid), K(tx_id));
} else {
if (alloc && OB_FAIL(xa_ctx->init(xid,
tx_id,
tenant_id,
GCTX.self_addr(),
is_tightly_coupled,
this,
&xa_ctx_mgr_,
&xa_rpc_,
&timer_))) {
TRANS_LOG(WARN, "xa ctx init failed", K(ret), K(xid), K(tx_id));
} else {
if (OB_FAIL(xa_ctx->two_phase_end_trans(xid, coord, true/*is_rollback*/, timeout_us, request_id))) {
if (OB_TRANS_ROLLBACKED != ret) {
TRANS_LOG(WARN, "xa rollback failed", K(ret), K(xid), K(tx_id));
} else {
ret = OB_SUCCESS;
}
} else if (OB_FAIL(xa_ctx->wait_two_phase_end_trans(xid, true/*is_rollback*/, timeout_us))) {
TRANS_LOG(WARN, "wait xa rollback failed", K(ret), K(xid), K(tx_id));
} else {
// do nothing
}
if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = delete_xa_pending_record(tenant_id, tx_id))) {
TRANS_LOG(WARN, "fail to delete xa record from pending trans", K(ret), K(xid), K(tx_id));
}
TRANS_LOG(INFO, "xa rollback success", K(ret), K(xid), K(tx_id));
}
}
}
return ret;
}
int ObXAService::xa_rollback_local(const ObXATransID &xid,
const ObTransID &tx_id,
const int64_t timeout_us,
@ -2282,6 +2347,233 @@ int ObXAService::remote_xa_prepare_(const ObXATransID &xid,
return ret;
}
#define QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL "\
SELECT coordinator FROM %s \
WHERE trans_id = %ld AND tenant_id = %lu"
int ObXAService::query_xa_coordinator_with_trans_id(const uint64_t tenant_id,
const ObTransID &trans_id,
share::ObLSID &coordinator)
{
int ret = OB_SUCCESS;
ObMySQLProxy *mysql_proxy = NULL;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
if (!is_valid_tenant_id(tenant_id) || trans_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(trans_id));
} else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
} else if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
} else if (OB_FAIL(sql.assign_fmt(QUERY_XA_COORDINATOR_WITH_TRANS_ID_SQL,
OB_ALL_PENDING_TRANSACTION_TNAME,
trans_id.get_id(),
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) {
TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret));
} else if (OB_FAIL(mysql_proxy->read(res, tenant_id, sql.ptr()))) {
TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
}
} else {
int64_t id = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", id, int64_t);
coordinator = ObLSID(id);
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "fail to extract field from result", K(ret));
}
}
THIS_WORKER.set_timeout_ts(original_timeout_us);
}
return ret;
}
#define DELETE_XA_PENDING_RECORD_SQL "delete from %s where \
tenant_id = %lu and trans_id = %ld"
// delete record from pending trans (table two)
int ObXAService::delete_xa_pending_record(const uint64_t tenant_id,
const ObTransID &tx_id)
{
int ret = OB_SUCCESS;
ObMySQLProxy *mysql_proxy = NULL;
ObSqlString sql;
int64_t affected_rows = 0;
int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
if (!is_valid_tenant_id(tenant_id) || !tx_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(tx_id));
} else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
} else if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
} else if (OB_FAIL(sql.assign_fmt(DELETE_XA_PENDING_RECORD_SQL,
OB_ALL_PENDING_TRANSACTION_TNAME,
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
tx_id.get_id()))) {
TRANS_LOG(WARN, "generate delete xa trans sql fail", K(ret), K(sql));
} else if (OB_FAIL(mysql_proxy->write(tenant_id, sql.ptr(), affected_rows))) {
TRANS_LOG(WARN, "execute delete xa trans sql fail",
KR(ret), K(tenant_id), K(sql), K(affected_rows));
} else if (OB_UNLIKELY(affected_rows > 1)) {
ret = OB_ERR_UNEXPECTED; // 删除了多行
TRANS_LOG(ERROR, "update xa trans sql affects multiple rows", K(tenant_id), K(affected_rows), K(sql));
} else if (OB_UNLIKELY(affected_rows == 0)) {
// 没有删除行,可能有重复xid,作为回滚成功处理
TRANS_LOG(WARN, "update xa trans sql affects no rows", K(tenant_id), K(sql));
} else {
TRANS_LOG(INFO, "delete xa record", K(tenant_id), "lbt", lbt());
}
THIS_WORKER.set_timeout_ts(original_timeout_us);
TRANS_LOG(INFO, "delete xa record", K(ret), K(tx_id));
return ret;
}
#define QUERY_XA_COORDINATOR_WITH_XID_SQL "\
SELECT trans_id, coordinator FROM %s \
WHERE gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld AND tenant_id = %lu limit 1"
// query coord from pending trans (table two)
int ObXAService::query_xa_coordinator_with_xid(const uint64_t tenant_id,
const ObXATransID &xid,
ObTransID &trans_id,
share::ObLSID &coordinator)
{
int ret = OB_SUCCESS;
char gtrid_str[128] = {0};
int64_t gtrid_len = 0;
char bqual_str[128] = {0};
int64_t bqual_len = 0;
ObMySQLProxy *mysql_proxy = NULL;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
if (!is_valid_tenant_id(tenant_id) || !xid.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(trans_id));
} else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
} else if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
} else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
xid.get_gtrid_str().length(),
gtrid_str, 128, gtrid_len))) {
TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
xid.get_bqual_str().length(),
bqual_str, 128, bqual_len))) {
TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(sql.assign_fmt(QUERY_XA_COORDINATOR_WITH_XID_SQL,
OB_ALL_PENDING_TRANSACTION_TNAME,
(int)gtrid_len, gtrid_str,
(int)bqual_len, bqual_str,
xid.get_format_id(),
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)))) {
TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret));
} else if (OB_FAIL(mysql_proxy->read(res, tenant_id, sql.ptr()))) {
TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
}
} else {
int64_t ls_id_value = 0;
int64_t tx_id_value = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", ls_id_value, int64_t);
EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t);
coordinator = ObLSID(ls_id_value);
trans_id = ObTransID(tx_id_value);
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "fail to extract field from result", K(ret));
}
}
THIS_WORKER.set_timeout_ts(original_timeout_us);
}
TRANS_LOG(INFO, "get trans id and coordinator from pending trans", K(ret), K(trans_id), K(coordinator));
return ret;
}
// for __all_pending_transaction
#define INSERT_XA_PENDING_RECORD_SQL "\
insert into %s (tenant_id, gtrid, bqual, format_id, \
trans_id, coordinator, scheduler_ip, scheduler_port, state) \
values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d)"
int ObXAService::insert_xa_pending_record(const uint64_t tenant_id,
const ObXATransID &xid,
const ObTransID &tx_id,
const share::ObLSID &coordinator,
const common::ObAddr &sche_addr)
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
char scheduler_ip_buf[128] = {0};
sche_addr.ip_to_string(scheduler_ip_buf, 128);
char gtrid_str[128] = {0};
int64_t gtrid_len = 0;
char bqual_str[128] = {0};
int64_t bqual_len = 0;
ObMySQLProxy *mysql_proxy = NULL;
ObSqlString sql;
int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
if (!is_valid_tenant_id(tenant_id) || xid.empty() || !tx_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(xid), K(tx_id));
} else if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
} else if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "mysql proxy is null", K(ret), KP(mysql_proxy));
} else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
xid.get_gtrid_str().length(),
gtrid_str, 128, gtrid_len))) {
TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
xid.get_bqual_str().length(),
bqual_str, 128, bqual_len))) {
TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(sql.assign_fmt(INSERT_XA_PENDING_RECORD_SQL,
OB_ALL_PENDING_TRANSACTION_TNAME,
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
(int)gtrid_len, gtrid_str,
(int)bqual_len, bqual_str,
xid.get_format_id(),
tx_id.get_id(), coordinator.id(),
scheduler_ip_buf, sche_addr.get_port(),
ObXATransState::PREPARING))) {
TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql));
} else if (OB_FAIL(mysql_proxy->write(tenant_id, sql.ptr(), affected_rows))) {
TRANS_LOG(WARN, "execute insert xa trans sql fail",
KR(ret), K(tenant_id), K(sql), K(affected_rows));
if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
ret = OB_TRANS_XA_DUPID;
}
} else if (OB_UNLIKELY(affected_rows != 1)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected error, insert sql affects multiple rows or no rows",
K(ret), K(tenant_id), K(affected_rows), K(sql));
} else {
TRANS_LOG(INFO, "insert xa record", K(tenant_id), K(xid), K(tx_id));
}
THIS_WORKER.set_timeout_ts(original_timeout_us);
return ret;
}
#define QUERY_XA_COORDINATOR_TRANSID_SQL "\
SELECT coordinator, trans_id, state, flag FROM %s WHERE \
tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld"
@ -2555,6 +2847,7 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
bool alloc = true;
share::ObLSID coordinator;
ObTransID tx_id;
bool record_in_tableone = true;
int64_t end_flag = 0;
int64_t state = ObXATransState::NON_EXISTING;
// only used for constructor
@ -2563,14 +2856,28 @@ int ObXAService::two_phase_xa_commit_(const ObXATransID &xid,
if (OB_FAIL(query_xa_coord_from_tableone(tenant_id, xid, coordinator, tx_id,
state, end_flag))) {
if (OB_ITER_END == ret) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "xa is not valid", K(ret), K(xid), K(tx_id), K(coordinator));
TRANS_LOG(INFO, "record not exist in global transaction", K(ret), K(xid));
if (CLUSTER_VERSION_4_1_0_0) {
ret = OB_SUCCESS;
record_in_tableone = false;
}
} else {
TRANS_LOG(WARN, "fail to qeery record from global transaction", K(ret), K(xid));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (!record_in_tableone && OB_FAIL(query_xa_coordinator_with_xid(tenant_id, xid,
tx_id, coordinator))) {
if (OB_ITER_END == ret) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "xa is not valid", K(ret), K(xid), K(tx_id), K(coordinator));
} else {
TRANS_LOG(WARN, "fail to query trans id and coordinator", K(ret), K(xid),
K(tx_id), K(coordinator));
// TODO, check the TM action
ret = OB_TRANS_XA_RETRY;
}
} else if (OB_UNLIKELY(!coordinator.is_valid())) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "invalid coordinator when xa two phase commit/rollback", K(ret), K(xid), K(coordinator), K(tx_id));

View File

@ -152,6 +152,11 @@ public:
int delete_xa_branch(const uint64_t tenant_id,
const ObXATransID &xid,
const bool is_tightly_coupled);
int query_xa_coordinator_with_trans_id(const uint64_t tenant_id,
const ObTransID &trans_id,
share::ObLSID &coordinator);
int delete_xa_pending_record(const uint64_t tenant_id,
const ObTransID &tx_id);
// query coord from tenant table global transaction
int query_xa_coord_from_tableone(const uint64_t tenant_id,
const ObXATransID &xid,
@ -159,6 +164,15 @@ public:
ObTransID &trans_id,
int64_t &state,
int64_t &end_flag);
int query_xa_coordinator_with_xid(const uint64_t tenant_id,
const ObXATransID &xid,
ObTransID &trans_id,
share::ObLSID &coordinator);
int insert_xa_pending_record(const uint64_t tenant_id,
const ObXATransID &xid,
const ObTransID &trans_id,
const share::ObLSID &coordinator,
const common::ObAddr &sche_addr);
int query_sche_and_coord(const uint64_t tenant_id,
const ObXATransID &xid,
ObAddr &scheduler_addr,
@ -226,6 +240,13 @@ private:
const int64_t timeout_us,
const int64_t request_id,
bool &has_tx_level_temp_table);
int xa_rollback_for_pending_trans_(const ObXATransID &xid,
const ObTransID &tx_id,
const int64_t timeout_us,
const uint64_t tenant_id,
const int64_t request_id,
const bool is_tightly_coupled,
const share::ObLSID &coord);
int two_phase_xa_rollback_(const ObXATransID &xid,
const ObTransID &tx_id,
const int64_t timeout_us,