[xa][4.x] acquire data lock before set tx desc in session for xa
This commit is contained in:
@ -90,6 +90,7 @@ int ObTMService::tm_rm_start(ObExecContext &exec_ctx,
|
|||||||
// step 2, promote or start trans in current session
|
// step 2, promote or start trans in current session
|
||||||
if (OB_SUCCESS != ret) {
|
if (OB_SUCCESS != ret) {
|
||||||
} else {
|
} else {
|
||||||
|
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
|
||||||
const int64_t timeout_seconds = my_session->get_xa_end_timeout_seconds();
|
const int64_t timeout_seconds = my_session->get_xa_end_timeout_seconds();
|
||||||
//if (need_start || need_promote) {
|
//if (need_start || need_promote) {
|
||||||
// if (OB_FAIL(ObXAService::generate_xid(xid))) {
|
// if (OB_FAIL(ObXAService::generate_xid(xid))) {
|
||||||
@ -160,6 +161,7 @@ int ObTMService::tm_commit(ObExecContext &exec_ctx,
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected param", K(ret), KP(xa_service), KP(my_session), KP(tx_desc));
|
LOG_WARN("unexpected param", K(ret), KP(xa_service), KP(my_session), KP(tx_desc));
|
||||||
} else {
|
} else {
|
||||||
|
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
|
||||||
tx_id = tx_desc->tid();
|
tx_id = tx_desc->tid();
|
||||||
if (OB_FAIL(xa_service->commit_for_dblink_trans(tx_desc))) {
|
if (OB_FAIL(xa_service->commit_for_dblink_trans(tx_desc))) {
|
||||||
LOG_WARN("fail to commit for dblink trans", K(ret));
|
LOG_WARN("fail to commit for dblink trans", K(ret));
|
||||||
@ -193,6 +195,7 @@ int ObTMService::tm_rollback(ObExecContext &exec_ctx,
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected param", K(ret), KP(xa_service), KP(my_session), KP(tx_desc));
|
LOG_WARN("unexpected param", K(ret), KP(xa_service), KP(my_session), KP(tx_desc));
|
||||||
} else {
|
} else {
|
||||||
|
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
|
||||||
tx_id = tx_desc->tid();
|
tx_id = tx_desc->tid();
|
||||||
if (OB_FAIL(xa_service->rollback_for_dblink_trans(tx_desc))) {
|
if (OB_FAIL(xa_service->rollback_for_dblink_trans(tx_desc))) {
|
||||||
LOG_WARN("fail to rollback for dblink trans", K(ret));
|
LOG_WARN("fail to rollback for dblink trans", K(ret));
|
||||||
@ -227,6 +230,7 @@ int ObTMService::recover_tx_for_callback(const ObTransID &tx_id,
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected session", K(ret), K(tx_id), K(tx_desc->tid()));
|
LOG_WARN("unexpected session", K(ret), K(tx_id), K(tx_desc->tid()));
|
||||||
} else {
|
} else {
|
||||||
|
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
|
||||||
if (OB_FAIL(xa_service->recover_tx_for_dblink_callback(tx_id, tx_desc))) {
|
if (OB_FAIL(xa_service->recover_tx_for_dblink_callback(tx_id, tx_desc))) {
|
||||||
LOG_WARN("fail to recover tx for dblink callback", K(ret), K(tx_id));
|
LOG_WARN("fail to recover tx for dblink callback", K(ret), K(tx_id));
|
||||||
} else if (NULL == tx_desc || tx_desc->get_xid().empty()) {
|
} else if (NULL == tx_desc || tx_desc->get_xid().empty()) {
|
||||||
@ -254,6 +258,7 @@ int ObTMService::revert_tx_for_callback(ObExecContext &exec_ctx)
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected session", K(ret), K(tx_desc->tid()));
|
LOG_WARN("unexpected session", K(ret), K(tx_desc->tid()));
|
||||||
} else {
|
} else {
|
||||||
|
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
|
||||||
tx_id = tx_desc->tid();
|
tx_id = tx_desc->tid();
|
||||||
if (OB_FAIL(xa_service->revert_tx_for_dblink_callback(tx_desc))) {
|
if (OB_FAIL(xa_service->revert_tx_for_dblink_callback(tx_desc))) {
|
||||||
LOG_WARN("fail to recover tx for dblink callback", K(ret), K(tx_id));
|
LOG_WARN("fail to recover tx for dblink callback", K(ret), K(tx_id));
|
||||||
|
|||||||
@ -71,6 +71,8 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
|
|||||||
LOG_ERROR("fail to get trans timeout ts", K(ret));
|
LOG_ERROR("fail to get trans timeout ts", K(ret));
|
||||||
} else if (FALSE_IT(tenant_id = my_session->get_effective_tenant_id())) {
|
} else if (FALSE_IT(tenant_id = my_session->get_effective_tenant_id())) {
|
||||||
} else {
|
} else {
|
||||||
|
ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock());
|
||||||
|
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
|
||||||
const int64_t flags = stmt.get_flags();
|
const int64_t flags = stmt.get_flags();
|
||||||
transaction::ObTxParam &tx_param = plan_ctx->get_trans_param();
|
transaction::ObTxParam &tx_param = plan_ctx->get_trans_param();
|
||||||
const bool is_readonly = ObXAFlag::contain_tmreadonly(flags);
|
const bool is_readonly = ObXAFlag::contain_tmreadonly(flags);
|
||||||
@ -196,6 +198,8 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
|
|||||||
ret = OB_TRANS_XA_NOTA;
|
ret = OB_TRANS_XA_NOTA;
|
||||||
TRANS_LOG(WARN, "xid not match", K(ret), K(xid));
|
TRANS_LOG(WARN, "xid not match", K(ret), K(xid));
|
||||||
} else {
|
} else {
|
||||||
|
ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock());
|
||||||
|
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
|
||||||
int64_t flags = stmt.get_flags();
|
int64_t flags = stmt.get_flags();
|
||||||
flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags;
|
flags = my_session->has_tx_level_temp_table() ? (flags | ObXAFlag::TEMPTABLE) : flags;
|
||||||
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end(xid, flags,
|
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end(xid, flags,
|
||||||
|
|||||||
@ -678,9 +678,11 @@ int ObXAService::recover_tx_for_dblink_callback(const ObTransID &tx_id,
|
|||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
if (OB_SUCCESS != ret) {
|
||||||
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
|
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
TRANS_LOG(INFO, "recover tx for dblink callback", K(ret), K(tx_id));
|
TRANS_LOG(INFO, "recover tx for dblink callback", K(ret), K(tx_id));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -698,12 +700,15 @@ int ObXAService::revert_tx_for_dblink_callback(ObTxDesc *&tx_desc)
|
|||||||
if (NULL == xa_ctx) {
|
if (NULL == xa_ctx) {
|
||||||
ret = ret = OB_ERR_UNEXPECTED;
|
ret = ret = OB_ERR_UNEXPECTED;
|
||||||
TRANS_LOG(WARN, "xa transaction context is null", K(ret), K(tx_id));
|
TRANS_LOG(WARN, "xa transaction context is null", K(ret), K(tx_id));
|
||||||
} else if (OB_FAIL(xa_ctx->revert_tx_for_dblink_callback(tx_desc))) {
|
} else {
|
||||||
|
if (OB_FAIL(xa_ctx->revert_tx_for_dblink_callback(tx_desc))) {
|
||||||
TRANS_LOG(WARN, "fail to revert tx for dblink callback", K(ret), K(tx_id));
|
TRANS_LOG(WARN, "fail to revert tx for dblink callback", K(ret), K(tx_id));
|
||||||
} else {
|
} else {
|
||||||
// NOTE that tx_desc is null currently
|
// NOTE that tx_desc is null currently
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
TRANS_LOG(INFO, "revert tx for dblink callback", K(ret), K(tx_id));
|
TRANS_LOG(INFO, "revert tx for dblink callback", K(ret), K(tx_id));
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
Reference in New Issue
Block a user