[FEAT MERGE] patch mysql xa from 425 to 435

This commit is contained in:
jw-guo 2024-11-21 08:45:04 +00:00 committed by ob-robot
parent e63b95a490
commit 8f3fafe9f2
37 changed files with 2310 additions and 211 deletions

View File

@ -68,6 +68,14 @@ FLT_DEF_SPAN(com_query_process, "com_query process")
FLT_DEF_SPAN(end_transaction, "end of transaction")
// for transaction end
// for xa transaction
FLT_DEF_SPAN(xa_start, "xa start")
FLT_DEF_SPAN(xa_end, "xa end")
FLT_DEF_SPAN(xa_prepare, "xa prepare")
FLT_DEF_SPAN(xa_commit, "xa commit")
FLT_DEF_SPAN(xa_rollback, "xa rollback")
// for xa transaction end
// for ps
FLT_DEF_SPAN(ps_prepare, "prepare phase of ps protocol")
FLT_DEF_SPAN(ps_execute, "execute phase of ps protocol")

View File

@ -180,6 +180,8 @@ static const NonReservedKeyword Mysql_pl_none_reserved_keywords[] =
{"submit", SUBMIT},
{"job", JOB},
{"cancel", CANCEL},
{"xa", XA},
{"recover", RECOVER},
};
const NonReservedKeyword *mysql_pl_non_reserved_keyword_lookup(const char *word)

View File

@ -223,7 +223,7 @@ void obpl_mysql_wrap_get_user_var_into_subquery(ObParseCtx *parse_ctx, ParseNode
DATA DEFINER END_KEY EXTEND FOLLOWS FOUND FUNCTION HANDLER INTERFACE INVOKER JSON LANGUAGE
MESSAGE_TEXT MYSQL_ERRNO NATIONAL NEXT NO OF OPEN PACKAGE PRAGMA PRECEDES RECORD RETURNS ROW ROWTYPE
SCHEMA_NAME SECURITY SUBCLASS_ORIGIN TABLE_NAME USER TYPE VALUE DATETIME TIMESTAMP TIME DATE YEAR
TEXT NCHAR NVARCHAR BOOL BOOLEAN ENUM BIT FIXED SIGNED ROLE SUBMIT CANCEL JOB
TEXT NCHAR NVARCHAR BOOL BOOLEAN ENUM BIT FIXED SIGNED ROLE SUBMIT CANCEL JOB XA RECOVER
//-----------------------------non_reserved keyword end---------------------------------------------
%right END_KEY
%left ELSE IF ELSEIF
@ -235,7 +235,7 @@ void obpl_mysql_wrap_get_user_var_into_subquery(ObParseCtx *parse_ctx, ParseNode
%nonassoc AUTHID INTERFACE
%nonassoc DECLARATION
%type <node> sql_keyword
%type <node> sql_keyword xa_keyword
%type <non_reserved_keyword> unreserved_keyword
%type <node> stmt_block stmt_list stmt outer_stmt sp_proc_outer_statement sp_proc_inner_statement sp_proc_independent_statement
%type <node> create_procedure_stmt sp_proc_stmt expr expr_list procedure_body default_expr
@ -401,6 +401,13 @@ sql_stmt_prefix:
| LOAD { $$ = NULL; }
;
xa_keyword:
BEGIN_KEY { $$ = NULL; }
| SQL_KEYWORD { $$ = NULL; }
| END_KEY { $$ = NULL; }
| RECOVER { $$ = NULL; }
;
sql_stmt:
sql_stmt_prefix /*sql stmt tail*/
{
@ -416,6 +423,13 @@ sql_stmt:
do_parse_sql_stmt(sql_stmt, parse_ctx, @1.first_column, @1.last_column, 2, ';', END_P);
malloc_non_terminal_node($$, parse_ctx->mem_pool_, T_SQL_STMT, 1, sql_stmt);
}
| XA xa_keyword /*sql stmt tail*/
{
//read sql query string直到读到token';'或者END_P
ParseNode *sql_stmt = NULL;
do_parse_sql_stmt(sql_stmt, parse_ctx, @1.first_column, @1.last_column, 2, ';', END_P);
malloc_non_terminal_node($$, parse_ctx->mem_pool_, T_SQL_STMT, 1, sql_stmt);
}
| CREATE sql_keyword /*sql stmt tail*/
{
//read sql query string直到读到token';'或者END_P
@ -800,6 +814,8 @@ unreserved_keyword:
| BIT
| FIXED
| SIGNED
| XA
| RECOVER
;
/*****************************************************************************

View File

@ -48,6 +48,13 @@ int ObEndTransExecutor::end_trans(ObExecContext &ctx, ObEndTransStmt &stmt)
LOG_ERROR("session ptr is null", K(ret));
} else if (my_session->is_in_transaction() &&
my_session->associated_xa()) {
if (lib::is_mysql_mode()) {
// mysql mode
ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("the command cannot be executed in xa trans", K(ret), K(my_session->get_xid()));
ctx.set_need_disconnect(false);
} else {
// oracle mode
#ifdef OB_BUILD_ORACLE_PL
transaction::ObTxDesc *tx_desc = my_session->get_tx_desc();
const transaction::ObXATransID xid = my_session->get_xid();
@ -92,6 +99,7 @@ int ObEndTransExecutor::end_trans(ObExecContext &ctx, ObEndTransStmt &stmt)
}
ctx.set_need_disconnect(false);
#endif
}
} else if (OB_FAIL(ObSqlTransControl::explicit_end_trans(ctx, stmt.get_is_rollback(), stmt.get_hint()))) {
LOG_WARN("fail end trans", K(ret));
}

View File

@ -28,17 +28,379 @@ using namespace common;
using namespace transaction;
namespace sql
{
int ObXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
int ObXaExecutorUtil::build_tx_param(ObSQLSessionInfo *session, transaction::ObTxParam &param)
{
int ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol start interface");
UNUSED(ctx);
UNUSED(stmt);
// 暂时禁掉mysql模式下的xa调用
int ret = OB_SUCCESS;
int64_t org_cluster_id = OB_INVALID_ORG_CLUSTER_ID;
if (OB_FAIL(get_org_cluster_id(session, org_cluster_id))) {
LOG_WARN("get original cluster id failed", K(ret));
} else {
int64_t tx_timeout_us = 0;
bool is_read_only = session->get_tx_read_only();
session->get_tx_timeout(tx_timeout_us);
param.timeout_us_ = tx_timeout_us;
param.lock_timeout_us_ = session->get_trx_lock_timeout();
param.access_mode_ = is_read_only ? ObTxAccessMode::RD_ONLY : ObTxAccessMode::RW;
param.isolation_ = session->get_tx_isolation();
param.cluster_id_ = org_cluster_id;
}
return ret;
}
int64_t ObXaExecutorUtil::get_query_timeout(ObSQLSessionInfo *session)
{
int ret = OB_SUCCESS;
int64_t query_timeout = 10L * 1000 * 1000; // 10 seconds by default
if (NULL != session) {
if (OB_FAIL(session->get_query_timeout(query_timeout))) {
LOG_WARN("get query timeout failed", K(ret), K(query_timeout));
query_timeout = 10L * 1000 * 1000;
}
}
return query_timeout;
}
// for mysql xa start
int ObXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
{
int ret = OB_SUCCESS;
FLTSpanGuard(xa_start);
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx);
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObXATransID xid;
ObTransID tx_id;
const int64_t start_ts = ObTimeUtility::current_time();
ObXAStmtGuard xa_stmt_guard(start_ts);
if (OB_ISNULL(plan_ctx) || OB_ISNULL(my_session)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid param", K(ret), K(plan_ctx), K(my_session));
} else if (lib::is_oracle_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected in oracle mode", K(ret));
} else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) {
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
LOG_WARN("not support tx free route for dblink trans");
} else if (OB_FAIL(xid.set(stmt.get_gtrid_string(),
stmt.get_bqual_string(),
stmt.get_format_id()))) {
LOG_WARN("set xid failed", K(ret), K(stmt));
} else if (my_session->get_in_transaction()) {
ObTxDesc *&tx_desc = my_session->get_tx_desc();
ret = OB_TRANS_XA_OUTSIDE;
LOG_WARN("already start trans", K(ret), K(xid), K(tx_desc->tid()),
K(tx_desc->get_xid()));
} else {
ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock());
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
const int64_t flags = stmt.get_flags();
transaction::ObTxParam &tx_param = plan_ctx->get_trans_param();
ObTxDesc *&tx_desc = my_session->get_tx_desc();
if (OB_FAIL(ObXaExecutorUtil::build_tx_param(my_session, tx_param))) {
LOG_WARN("build tx param failed", K(ret));
} else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_start_for_mysql(xid,
flags, my_session->get_sessid(), tx_param, tx_desc))) {
LOG_WARN("mysql xa start failed", K(ret), K(tx_param));
my_session->get_trans_result().reset();
my_session->reset_tx_variable();
ctx.set_need_disconnect(false);
} else {
// associate xa with session
my_session->associate_xa(xid);
my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id();
FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id());
}
}
// for statistics
const int64_t used_time_us = ObTimeUtility::current_time() - start_ts;
XA_STAT_ADD_XA_START_TOTAL_COUNT();
XA_STAT_ADD_XA_START_TOTAL_USED_TIME(used_time_us);
if (OB_FAIL(ret)) {
XA_STAT_ADD_XA_START_FAIL_COUNT();
}
LOG_INFO("mysql xa start", K(ret), K(xid), K(tx_id));
return ret;
}
// for mysql xa end
int ObXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
{
int ret = OB_SUCCESS;
FLTSpanGuard(xa_end);
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObXATransID xid;
ObTransID tx_id;
const int64_t start_ts = ObTimeUtility::current_time();
ObXAStmtGuard xa_stmt_guard(start_ts);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(my_session));
} else if (lib::is_oracle_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected in oracle mode", K(ret));
} else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) {
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
LOG_WARN("not support tx free route for dblink trans");
} else if (!my_session->get_in_transaction()) {
ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("not in transaction", K(ret));
} else if (OB_FAIL(xid.set(stmt.get_gtrid_string(),
stmt.get_bqual_string(),
stmt.get_format_id()))) {
LOG_WARN("set xid failed", K(ret), K(stmt));
} else if (!xid.all_equal_to(my_session->get_xid())) {
ret = OB_TRANS_XA_NOTA;
LOG_WARN("unknown xid", K(ret), K(xid), K(my_session->get_xid()));
} else {
ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock());
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
ObTxDesc *&tx_desc = my_session->get_tx_desc();
FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id());
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end_for_mysql(xid, tx_desc))) {
LOG_WARN("mysql xa end failed", K(ret), K(xid));
}
}
// for statistics
const int64_t used_time_us = ObTimeUtility::current_time() - start_ts;
XA_STAT_ADD_XA_END_TOTAL_COUNT();
XA_STAT_ADD_XA_END_TOTAL_USED_TIME(used_time_us);
if (OB_FAIL(ret)) {
XA_STAT_ADD_XA_END_FAIL_COUNT();
}
LOG_INFO("mysql xa end", K(ret), K(xid));
return ret;
}
// for mysql xa prepare
int ObXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt)
{
int ret = OB_SUCCESS;
FLTSpanGuard(xa_prepare);
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObXATransID xid;
ObTransID tx_id;
const int64_t start_ts = ObTimeUtility::current_time();
ObXAStmtGuard xa_stmt_guard(start_ts);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(my_session));
} else if (lib::is_oracle_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected in oracle mode", K(ret));
} else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) {
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
LOG_WARN("not support tx free route for dblink trans");
} else if (!my_session->get_in_transaction()) {
ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("not in transaction", K(ret));
} else if (OB_FAIL(xid.set(stmt.get_gtrid_string(),
stmt.get_bqual_string(),
stmt.get_format_id()))) {
LOG_WARN("set xid failed", K(ret), K(stmt));
} else if (!xid.all_equal_to(my_session->get_xid())) {
ret = OB_TRANS_XA_NOTA;
LOG_WARN("unknown xid", K(ret), K(xid), K(my_session->get_xid()));
} else {
// in two cases, xa trans need exit
// case one, xa prepare succeeds
// case two, xa trans need rollback
const int64_t timeout_us = ObXaExecutorUtil::get_query_timeout(my_session);
bool need_exit = false;
ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock());
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
ObTxDesc *&tx_desc = my_session->get_tx_desc();
my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id();
FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id());
if (0 >= timeout_us) {
ret = OB_TRANS_STMT_TIMEOUT;
LOG_WARN("xa stmt timeout", K(ret), K(xid), K(timeout_us));
} else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_prepare_for_mysql(xid, timeout_us,
tx_desc, need_exit))) {
LOG_WARN("mysql xa prepare failed", K(ret), K(xid));
}
if (need_exit) {
my_session->get_trans_result().reset();
my_session->reset_tx_variable();
my_session->disassociate_xa();
ctx.set_need_disconnect(false);
}
}
// for statistics
const int64_t used_time_us = ObTimeUtility::current_time() - start_ts;
XA_STAT_ADD_XA_PREPARE_TOTAL_COUNT();
XA_STAT_ADD_XA_PREPARE_TOTAL_USED_TIME(used_time_us);
if (OB_SUCCESS != ret && OB_TRANS_XA_RDONLY != ret) {
XA_STAT_ADD_XA_PREPARE_FAIL_COUNT();
}
LOG_INFO("mysql xa prepare", K(ret), K(xid));
return ret;
}
// for mysql xa commit
int ObXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt)
{
int ret = OB_SUCCESS;
FLTSpanGuard(xa_commit);
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObXATransID xid;
ObTransID tx_id;
const int64_t start_ts = ObTimeUtility::current_time();
ObXAStmtGuard xa_stmt_guard(start_ts);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(my_session));
} else if (lib::is_oracle_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected in oracle mode", K(ret));
} else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) {
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
LOG_WARN("not support tx free route for dblink trans");
} else if (OB_FAIL(xid.set(stmt.get_gtrid_string(),
stmt.get_bqual_string(),
stmt.get_format_id()))) {
LOG_WARN("set xid failed", K(ret), K(stmt));
} else {
const int64_t timeout_us = ObXaExecutorUtil::get_query_timeout(my_session);
ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock());
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
const int64_t flags = stmt.get_flags();
if (0 >= timeout_us) {
ret = OB_TRANS_STMT_TIMEOUT;
LOG_WARN("xa stmt timeout", K(ret), K(xid), K(timeout_us));
} else if (ObXAFlag::is_tmonephase(flags)) {
bool need_exit = false;
// one phase xa commit
if (!my_session->get_in_transaction()) {
ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("no xa trans for one phase xa commit", K(ret), K(xid));
} else if (my_session->get_xid().empty()) {
ret = OB_TRANS_XA_NOTA;
LOG_WARN("unknown xid", K(ret), K(xid));
} else if (!xid.all_equal_to(my_session->get_xid())) {
ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("unexpected xid", K(ret), K(xid), K(my_session->get_xid()));
} else {
ObTxDesc *&tx_desc = my_session->get_tx_desc();
my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id();
FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id());
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_commit_onephase_for_mysql(xid,
timeout_us, tx_desc, need_exit))) {
LOG_WARN("mysql xa commit failed", K(ret), K(xid));
}
if (need_exit) {
my_session->get_trans_result().reset();
my_session->reset_tx_variable();
my_session->disassociate_xa();
ctx.set_need_disconnect(false);
}
}
} else if (ObXAFlag::is_tmnoflags_for_mysql(flags)) {
// two phase xa commit
const bool is_rollback = false;
if (my_session->get_in_transaction()) {
if (my_session->get_xid().empty()) {
ret = OB_TRANS_XA_NOTA;
LOG_WARN("unknown xid", K(ret), K(xid));
} else {
ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("can not be executed in this state", K(ret), K(xid), K(my_session->get_xid()));
}
} else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_second_phase_twophase_for_mysql(xid,
timeout_us, is_rollback, tx_id))) {
LOG_WARN("mysql xa commit failed", K(ret), K(xid));
}
my_session->get_raw_audit_record().trans_id_ = tx_id;
FLT_SET_TAG(trans_id, tx_id.get_id());
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected flags for mysql xa commit", K(ret), K(xid));
}
}
// for statistics
const int64_t used_time_us = ObTimeUtility::current_time() - start_ts;
XA_STAT_ADD_XA_COMMIT_TOTAL_COUNT();
XA_STAT_ADD_XA_COMMIT_TOTAL_USED_TIME(used_time_us);
if (OB_FAIL(ret)) {
XA_STAT_ADD_XA_COMMIT_FAIL_COUNT();
}
LOG_INFO("mysql xa commit", K(ret), K(xid));
return ret;
}
// for mysql xa rollback
int ObXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt)
{
int ret = OB_SUCCESS;
FLTSpanGuard(xa_rollback);
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObXATransID xid;
ObTransID tx_id;
const int64_t start_ts = ObTimeUtility::current_time();
ObXAStmtGuard xa_stmt_guard(start_ts);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(my_session));
} else if (lib::is_oracle_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected in oracle mode", K(ret));
} else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) {
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
LOG_WARN("not support tx free route for dblink trans");
} else if (OB_FAIL(xid.set(stmt.get_gtrid_string(),
stmt.get_bqual_string(),
stmt.get_format_id()))) {
LOG_WARN("set xid failed", K(ret), K(stmt));
} else {
bool need_exit = false;
const int64_t timeout_us = ObXaExecutorUtil::get_query_timeout(my_session);
ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock());
ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock());
if (0 >= timeout_us) {
ret = OB_TRANS_STMT_TIMEOUT;
LOG_WARN("xa stmt timeout", K(ret), K(xid), K(timeout_us));
} else if (!my_session->get_in_transaction()) {
// try two phase xa rollback
const bool is_rollback = true;
if (OB_FAIL(MTL(transaction::ObXAService*)->xa_second_phase_twophase_for_mysql(xid,
timeout_us, is_rollback, tx_id))) {
LOG_WARN("mysql xa rollback failed", K(ret), K(xid));
}
my_session->get_raw_audit_record().trans_id_ = tx_id;
FLT_SET_TAG(trans_id, tx_id.get_id());
} else {
// try one phase xa rollback
ObTxDesc *&tx_desc = my_session->get_tx_desc();
my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id();
FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id());
if (my_session->get_xid().empty()) {
ret = OB_TRANS_XA_NOTA;
LOG_WARN("unknown xid", K(ret), K(xid));
} else if (!xid.all_equal_to(my_session->get_xid())) {
ret = OB_TRANS_XA_RMFAIL;
LOG_WARN("unexpected xid", K(ret), K(xid), K(my_session->get_xid()));
} else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_rollback_onephase_for_mysql(xid,
timeout_us, tx_desc, need_exit))) {
LOG_WARN("mysql xa rollback failed", K(ret), K(xid));
}
if (need_exit) {
my_session->get_trans_result().reset();
my_session->reset_tx_variable();
my_session->disassociate_xa();
ctx.set_need_disconnect(false);
}
}
}
// for statistics
const int64_t used_time_us = ObTimeUtility::current_time() - start_ts;
XA_STAT_ADD_XA_ROLLBACK_TOTAL_COUNT();
XA_STAT_ADD_XA_ROLLBACK_TOTAL_USED_TIME(used_time_us);
if (OB_FAIL(ret)) {
XA_STAT_ADD_XA_ROLLBACK_FAIL_COUNT();
}
LOG_INFO("mysql xa rollback", K(ret), K(xid));
return ret;
}
// for oracle xa start
int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
{
int ret = OB_SUCCESS;
@ -71,7 +433,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
} else if (true == my_session->is_for_trigger_package()) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("not support xa start in trigger", K(ret), K(xid));
} else if (OB_FAIL(get_org_cluster_id_(my_session, org_cluster_id))) {
} else if (OB_FAIL(ObXaExecutorUtil::get_org_cluster_id(my_session, org_cluster_id))) {
} else if (OB_FAIL(my_session->get_tx_timeout(tx_timeout))) {
LOG_ERROR("fail to get trans timeout ts", K(ret));
} else if (FALSE_IT(tenant_id = my_session->get_effective_tenant_id())) {
@ -131,7 +493,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
return ret;
}
int ObPlXaStartExecutor::get_org_cluster_id_(ObSQLSessionInfo *session, int64_t &org_cluster_id) {
int ObXaExecutorUtil::get_org_cluster_id(ObSQLSessionInfo *session, int64_t &org_cluster_id) {
int ret = OB_SUCCESS;
if (OB_FAIL(session->get_ob_org_cluster_id(org_cluster_id))) {
LOG_WARN("fail to get ob_org_cluster_id", K(ret));
@ -152,45 +514,6 @@ int ObPlXaStartExecutor::get_org_cluster_id_(ObSQLSessionInfo *session, int64_t
return ret;
}
int ObXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
{
int ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol end interface");
UNUSED(ctx);
UNUSED(stmt);
// 暂时禁掉mysql模式下的xa调用
return ret;
/*
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx();
//storage::ObPartitionService *ps = nullptr;
// if (OB_ISNULL(my_session) || OB_ISNULL(ps = task_exec_ctx.get_partition_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(my_session));
// mysql调用flags置为TMSUCCESS
} else if (OB_FAIL(ps->xa_end(stmt.get_xa_string(),
transaction::ObXAEndFlag::TMSUCCESS,
my_session->get_trans_desc()))) {
LOG_WARN("xa end failed", K(ret), K(stmt.get_xa_string()));
// 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚
if (OB_TRANS_XA_RMFAIL != ret
&& OB_SUCCESS != (tmp_ret = ObSqlTransControl::explicit_end_trans(ctx, true))) {
ret = tmp_ret;
LOG_WARN("explicit end trans failed", K(ret));
}
} else {
my_session->reset_tx_variable();
ctx.set_need_disconnect(false);
my_session->get_trans_desc().get_standalone_stmt_desc().reset();
}
LOG_DEBUG("xa end execute", K(stmt.get_xa_string()));
return ret;
*/
}
int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
{
int ret = OB_SUCCESS;
@ -263,49 +586,6 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt)
return ret;
}
int ObXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt)
{
int ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol prepare interface");
UNUSED(ctx);
UNUSED(stmt);
// 暂时禁掉mysql模式下的xa调用
return ret;
/*
int ret = OB_SUCCESS;
//int tmp_ret = OB_SUCCESS;
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx();
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx);
//storage::ObPartitionService *ps = nullptr;
if (OB_ISNULL(my_session) || OB_ISNULL(plan_ctx)
// || OB_ISNULL(ps = task_exec_ctx.get_partition_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(my_session), K(plan_ctx), K(ps));
} else if (my_session->get_in_transaction()) {
ret = OB_TRANS_XA_OUTSIDE;
LOG_WARN("already start trans", K(ret));
} else {
const uint64_t tenant_id = my_session->get_effective_tenant_id();
int64_t timeout = plan_ctx->get_trans_timeout_timestamp();
transaction::ObStartTransParam &start_trans_param = plan_ctx->get_start_trans_param();
init_start_trans_param(my_session, task_exec_ctx, start_trans_param);
if (OB_FAIL(ps->xa_prepare(stmt.get_xa_string(), tenant_id, timeout))) {
LOG_WARN("xa prepare failed", K(ret), K(stmt.get_xa_string()));
// 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚
// if (OB_TRANS_XA_RMFAIL != ret
// && OB_SUCCESS != (tmp_ret = ObSqlTransControl::explicit_end_trans(ctx, true))) {
// ret = tmp_ret;
// LOG_WARN("explicit end trans failed", K(ret));
// }
}
}
LOG_DEBUG("xa prepare execute", K(stmt.get_xa_string()));
return ret;
*/
}
int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt)
{
int ret = OB_SUCCESS;
@ -350,18 +630,6 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt)
return ret;
}
int ObXaEndTransExecutor::execute_(const ObString &xid,
const bool is_rollback, ObExecContext &ctx)
{
int ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol end trans interface");
UNUSED(xid);
UNUSED(is_rollback);
UNUSED(ctx);
// 暂时禁掉mysql模式下的xa调用
return ret;
}
int ObPlXaEndTransExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt)
{
int ret = OB_SUCCESS;

View File

@ -23,6 +23,14 @@ namespace sql
{
class ObExecContext;
class ObXaStartStmt;
class ObXaExecutorUtil
{
public:
static int get_org_cluster_id(ObSQLSessionInfo *session, int64_t &org_cluster_id);
static int build_tx_param(ObSQLSessionInfo *session, transaction::ObTxParam &param);
static int64_t get_query_timeout(ObSQLSessionInfo *session);
};
class ObXaStartExecutor
{
public:
@ -89,23 +97,25 @@ private:
class ObXaCommitStmt;
class ObXaRollBackStmt;
class ObXaEndTransExecutor
class ObXaCommitExecutor
{
public:
ObXaEndTransExecutor() {}
~ObXaEndTransExecutor() {}
int execute(ObExecContext &ctx, ObXaCommitStmt &stmt)
{
return execute_(stmt.get_xa_string(), false, ctx);
}
int execute(ObExecContext &ctx, ObXaRollBackStmt &stmt)
{
return execute_(stmt.get_xa_string(), true, ctx);
}
ObXaCommitExecutor() {}
~ObXaCommitExecutor() {}
int execute(ObExecContext &ctx, ObXaCommitStmt &stmt);
private:
int execute_(const common::ObString& xid, const bool is_rollback,
ObExecContext &ctx);
DISALLOW_COPY_AND_ASSIGN(ObXaEndTransExecutor);
DISALLOW_COPY_AND_ASSIGN(ObXaCommitExecutor);
};
class ObXaRollbackExecutor
{
public:
ObXaRollbackExecutor() {}
~ObXaRollbackExecutor() {}
int execute(ObExecContext &ctx, ObXaRollBackStmt &stmt);
private:
DISALLOW_COPY_AND_ASSIGN(ObXaRollbackExecutor);
};
class ObPlXaEndTransExecutor

View File

@ -911,11 +911,11 @@ int ObCmdExecutor::execute(ObExecContext &ctx, ObICmd &cmd)
break;
}
case stmt::T_XA_COMMIT: {
DEFINE_EXECUTE_CMD(ObXaCommitStmt, ObXaEndTransExecutor);
DEFINE_EXECUTE_CMD(ObXaCommitStmt, ObXaCommitExecutor);
break;
}
case stmt::T_XA_ROLLBACK: {
DEFINE_EXECUTE_CMD(ObXaRollBackStmt, ObXaEndTransExecutor);
DEFINE_EXECUTE_CMD(ObXaRollBackStmt, ObXaRollbackExecutor);
break;
}
case stmt::T_ALTER_DISKGROUP_ADD_DISK: {

View File

@ -377,13 +377,35 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause)
// propagated to the trans start node
need_abort_tx = true;
}
// if XA-txn is on this server, we have acquired its ref, release ref
// and disassocate with session
if (tx_desc->is_xa_trans() && tx_desc->get_addr() == GCONF.self_addr_) {
ObTransService *txs = MTL(ObTransService*);
CK (OB_NOT_NULL(txs), session_id, tx_id);
OZ (txs->release_tx_ref(*tx_desc), session_id, tx_id);
session->get_tx_desc() = NULL;
if (tx_desc->is_xa_trans()) {
if (lib::is_mysql_mode()) {
// for mysql mode
if (OB_DEAD_LOCK == cause) {
need_abort_tx = true;
} else {
// do nothing
}
} else {
// for oracle mode
// in this case, OB_DEAD_LOCK is unexpected
if (OB_DEAD_LOCK == cause) {
// for deadlock
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected cause", KR(ret), K(cause));
} else if (tx_desc->get_addr() == GCONF.self_addr_) {
// for xa loosely coupled mode and xa start resume in a new server
// (not the server xa start first)
// therefore, the temp session for free route can be in the server xa start first
// if XA-txn is on this server, we have acquired its ref, release ref
// and disassocate with session
ObTransService *txs = MTL(ObTransService*);
CK (OB_NOT_NULL(txs), session_id, tx_id);
OZ (txs->release_tx_ref(*tx_desc), session_id, tx_id);
session->get_tx_desc() = NULL;
} else {
// do nothing
}
}
}
} else if (tx_desc->is_xa_trans()) {
const transaction::ObXATransID xid = session->get_xid();
@ -391,17 +413,39 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause)
ObXAService *xas = MTL(ObXAService *);
CK (OB_NOT_NULL(xas));
if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) {
OZ (xas->handle_terminate_for_xa_branch(session->get_xid(), tx_desc, session->get_xa_end_timeout_seconds()),
xid, global_tx_type, session_id, tx_id);
// currently, tx_desc is NULL
if (lib::is_mysql_mode()) {
// for mysql mode
if (OB_DEAD_LOCK == cause) {
// for dead lock
need_abort_tx = true;
} else {
// for session termiante
OZ (xas->handle_terminate_for_mysql(session->get_xid(), tx_desc),
xid, global_tx_type, session_id, tx_id);
// currently, tx_desc is NULL
session->get_tx_desc() = NULL;
}
} else {
// for oracle mode
if (OB_DEAD_LOCK == cause) {
// for deadlock
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected cause", KR(ret), K(cause));
} else {
OZ (xas->handle_terminate_for_xa_branch(session->get_xid(), tx_desc, session->get_xa_end_timeout_seconds()),
xid, global_tx_type, session_id, tx_id);
// currently, tx_desc is NULL
session->get_tx_desc() = NULL;
}
}
} else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) {
OZ (xas->rollback_for_dblink_trans(tx_desc), ret, xid, global_tx_type, tx_id);
// currently, tx_desc is NULL
session->get_tx_desc() = NULL;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected global trans type", K(ret), K(xid), K(global_tx_type), K(tx_id));
}
session->get_tx_desc() = NULL;
} else {
need_abort_tx = true;
}

View File

@ -1754,6 +1754,12 @@ bool ObSQLUtils::is_readonly_stmt(ParseResult &result)
|| T_SHOW_SEQUENCES == type
|| T_SHOW_ENGINE == type
|| T_SHOW_OPEN_TABLES == type
|| T_XA_START == type
|| T_XA_END == type
|| T_XA_PREPARE == type
|| T_XA_COMMIT == type
|| T_XA_ROLLBACK == type
|| T_XA_RECOVER == type
|| (T_SET_ROLE == type && lib::is_mysql_mode())
|| T_SHOW_CREATE_USER == type) {
ret = true;

View File

@ -1047,6 +1047,7 @@ static const NonReservedKeyword Mysql_none_reserved_keywords[] =
{"wrapper", WRAPPER},
{"x509", X509},
{"xa", XA},
{"xid", XID},
{"xml", XML},
{"xor", XOR},
{"year", YEAR},

View File

@ -818,7 +818,7 @@ B'([01])*'|0B([01])+ {
STORE_PARAM_NODE();
} else {
yylval->node->sql_str_off_ = yylloc->first_column - 1;
return HEX_STRING_VALUE;
return BIN_STRING_VALUE;
}
}

View File

@ -71,6 +71,7 @@ extern int easy_vsnprintf(char *buf, size_t size, const char *fmt, va_list args)
%token <node> CLIENT_VERSION
%token <node> MYSQL_DRIVER
%token <node> HEX_STRING_VALUE
%token <node> BIN_STRING_VALUE
%token <node> REMAP_TABLE_NAME
%token <node> REMAP_DATABASE_TABLE_NAME
%token <node> OUTLINE_DEFAULT_TOKEN/*use for outline parser to just filter hint of query_sql*/
@ -375,7 +376,7 @@ END_P SET_VAR DELIMITER
WAIT WARNINGS WASH WEEK WEIGHT_STRING WHENEVER WORK WRAPPER WINDOW WEAK WITH_COLUMN_GROUP WITHOUT
X509 XA XML
X509 XA XID XML
YEAR
@ -464,7 +465,7 @@ END_P SET_VAR DELIMITER
%type <node> parameterized_trim
%type <ival> opt_with_consistent_snapshot opt_config_scope opt_index_keyname opt_full opt_mode_flag opt_extended opt_extended_or_full
%type <node> opt_priority opt_low_priority delete_option delete_option_list opt_delete_option_list
%type <node> opt_work begin_stmt commit_stmt rollback_stmt opt_ignore opt_ignore_or_replace ignore_or_replace xa_begin_stmt xa_end_stmt xa_prepare_stmt xa_commit_stmt xa_rollback_stmt
%type <node> opt_work begin_stmt commit_stmt rollback_stmt opt_ignore opt_ignore_or_replace ignore_or_replace xa_begin_stmt xa_end_stmt xa_prepare_stmt xa_commit_stmt xa_rollback_stmt xa_recover_stmt xa_xid opt_join_or_resume opt_suspend opt_one_phase opt_convert_xid
%type <node> alter_table_stmt alter_table_actions alter_table_action_list alter_table_action alter_column_option alter_index_option alter_constraint_option standalone_alter_action alter_partition_option opt_to alter_tablegroup_option opt_table opt_tablegroup_option_list alter_tg_partition_option alter_column_group_option
%type <node> tablegroup_option_list tablegroup_option alter_tablegroup_actions alter_tablegroup_action tablegroup_option_list_space_seperated
%type <node> opt_tg_partition_option tg_hash_partition_option tg_key_partition_option tg_range_partition_option tg_subpartition_option tg_list_partition_option
@ -507,7 +508,7 @@ END_P SET_VAR DELIMITER
%type <node> flashback_stmt purge_stmt opt_flashback_rename_table opt_flashback_rename_database opt_flashback_rename_tenant
%type <node> tenant_name_list opt_tenant_list tenant_list_tuple cache_type flush_scope opt_zone_list
%type <node> into_opt into_clause field_opt field_term field_term_list line_opt line_term line_term_list into_var_list into_var file_partition_opt file_opt file_option_list file_option file_size_const
%type <node> string_list text_string string_val_list
%type <node> string_list text_string string_val_list ulong_num
%type <node> balance_task_type opt_balance_task_type
%type <node> list_expr list_partition_element list_partition_expr list_partition_list list_partition_option opt_list_partition_list opt_list_subpartition_list list_subpartition_list list_subpartition_element drop_partition_name_list
%type <node> primary_zone_name change_tenant_name_or_tenant_id distribute_method distribute_method_list
@ -732,6 +733,7 @@ stmt:
| xa_prepare_stmt { $$ = $1; check_question_mark($$, result); }
| xa_commit_stmt { $$ = $1; check_question_mark($$, result); }
| xa_rollback_stmt { $$ = $1; check_question_mark($$, result); }
| xa_recover_stmt { $$ = $1; check_question_mark($$, result); }
| optimize_stmt { $$ = $1; check_question_mark($$, result); }
| dump_memory_stmt { $$ = $1; check_question_mark($$, result); }
| get_diagnostics_stmt { $$ = $1; question_mark_issue($$, result); }
@ -1074,6 +1076,17 @@ STRING_VALUE %prec LOWER_THAN_COMP
$$->sql_str_off_ = $2->sql_str_off_;
$$->is_forbid_parameter_ = $2->is_forbid_parameter_;
}
| charset_introducer BIN_STRING_VALUE
{
/* _utf8mb4 0x42 作为字符串处理 */
malloc_non_terminal_node($$, result->malloc_pool_, T_VARCHAR, 1, $1);
$$->str_value_ = $2->str_value_;
$$->str_len_ = $2->str_len_;
$$->raw_text_ = $2->raw_text_;
$$->text_len_ = $2->text_len_;
$$->sql_str_off_ = $2->sql_str_off_;
$$->is_forbid_parameter_ = $2->is_forbid_parameter_;
}
| STRING_VALUE string_val_list %prec LOWER_THAN_COMP
{
ParseNode *str_node = NULL;
@ -1261,6 +1274,11 @@ complex_string_literal { $$ = $1; }
$$ = $1;
$$->type_ = T_HEX_STRING;
}
| BIN_STRING_VALUE
{
$$ = $1;
$$->type_ = T_HEX_STRING;
}
;
number_literal:
@ -6633,11 +6651,37 @@ STRING_VALUE
$$ = $1;
}
| HEX_STRING_VALUE
{
$$ = $1;
}
| BIN_STRING_VALUE
{
$$ = $1;
};
ulong_num:
INTNUM
{
if(T_UINT64 == $1->type_) {
yyerror(&@1, result, "formatID should not greater than INT64_MAX\n");
YYERROR;
}
$$ = $1;
}
| HEX_STRING_VALUE
{
int err_no = 0;
uint64_t val = 0;
val = ob_strntoull($1->raw_text_ + 2, $1->text_len_ - 2, 16, NULL, &err_no);
if(val > INT64_MAX || ERANGE == errno) {
YYABORT;
} else {
$1->value_ = val;
$$ = $1;
}
};
int_type_i:
TINYINT { $$[0] = T_TINYINT; }
| SMALLINT { $$[0] = T_SMALLINT; }
@ -16352,44 +16396,143 @@ BEGI opt_hint_value opt_work
******************************************************************************/
xa_begin_stmt:
XA START STRING_VALUE
XA START xa_xid opt_join_or_resume
{
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 1, $3);
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 2, $3, $4);
}
| XA BEGI STRING_VALUE
| XA BEGI xa_xid opt_join_or_resume
{
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 1, $3);
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 2, $3, $4);
}
;
xa_end_stmt:
XA END STRING_VALUE
XA END xa_xid opt_suspend
{
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_END, 1, $3);
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_END, 2, $3, $4);
}
;
xa_prepare_stmt:
XA PREPARE STRING_VALUE
XA PREPARE xa_xid
{
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_PREPARE, 1, $3);
}
;
xa_commit_stmt:
XA COMMIT STRING_VALUE
XA COMMIT xa_xid opt_one_phase
{
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_COMMIT, 1, $3);
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_COMMIT, 2, $3, $4);
}
;
xa_rollback_stmt:
XA ROLLBACK STRING_VALUE
XA ROLLBACK xa_xid
{
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_ROLLBACK, 1, $3);
}
;
xa_recover_stmt:
XA RECOVER opt_convert_xid
{
malloc_non_terminal_node($$, result->malloc_pool_, T_XA_RECOVER, 1, $3);
}
;
xa_xid:
text_string
{
if(64 < $1->str_len_) {
yyerror(&@1, result, "gtrid length should not greater than 64\n");
YYERROR;
}
malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 1, $1);
}
| text_string ',' text_string
{
if(64 < $1->str_len_) {
yyerror(&@1, result, "gtrid length should not greater than 64\n");
YYERROR;
}
if(64 < $3->str_len_) {
yyerror(&@3, result, "bqual length should not greater than 64\n");
YYERROR;
}
malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 2, $1, $3);
}
| text_string ',' text_string ',' ulong_num
{
if(64 < $1->str_len_) {
yyerror(&@1, result, "gtrid length should not greater than 64\n");
YYERROR;
}
if(64 < $3->str_len_) {
yyerror(&@3, result, "bqual length should not greater than 64\n");
YYERROR;
}
malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 3, $1, $3, $5);
}
opt_join_or_resume:
/* empty */
{
$$= NULL;
}
| JOIN
{
malloc_terminal_node($$, result->malloc_pool_, T_INT);
$$->value_ = 0;
}
| RESUME
{
malloc_terminal_node($$, result->malloc_pool_, T_INT);
$$->value_ = 1;
}
;
opt_suspend:
/* empty */
{
$$= NULL;
}
| SUSPEND
{
malloc_terminal_node($$, result->malloc_pool_, T_INT);
$$->value_ = 0;
}
| SUSPEND FOR MIGRATE
{
malloc_terminal_node($$, result->malloc_pool_, T_INT);
$$->value_ = 1;
}
;
opt_one_phase:
/* empty */
{
$$= NULL;
}
| ONE PHASE
{
malloc_terminal_node($$, result->malloc_pool_, T_INT);
$$->value_ = 0;
}
;
opt_convert_xid:
/* empty */
{
$$= NULL;
}
| CONVERT XID
{
malloc_terminal_node($$, result->malloc_pool_, T_INT);
$$->value_ = 0;
}
/*****************************************************************************
*
* commit grammer
@ -24462,6 +24605,7 @@ ACCESS_INFO
| WRAPPER
| X509
| XA
| XID
| XML
| YEAR
| ZONE

View File

@ -22,6 +22,7 @@
#include "observer/ob_server_struct.h"
#include "sql/resolver/dcl/ob_grant_resolver.h"
#include "observer/virtual_table/ob_tenant_all_tables.h"
#include "storage/tx/ob_xa_define.h"
#include "share/schema/ob_schema_printer.h"
using namespace oceanbase::common;
using namespace oceanbase::share;
@ -158,6 +159,7 @@ int ObShowResolver::resolve(const ParseNode &parse_tree)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_PROFILE)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_PROCEDURE_CODE)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_FUNCTION_CODE)
&& OB_UNLIKELY(parse_tree.type_ != T_XA_RECOVER)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_ENGINE)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_OPEN_TABLES)
&& OB_UNLIKELY(parse_tree.type_ != T_SHOW_CREATE_USER)
@ -1780,6 +1782,41 @@ int ObShowResolver::resolve(const ParseNode &parse_tree)
}();
break;
}
case T_XA_RECOVER: {
if (is_oracle_mode) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "xa recover in oracle mode is");
} else if (OB_UNLIKELY(parse_tree.num_child_ != 1)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parse tree is wrong",
K(ret),
K(parse_tree.num_child_),
K(parse_tree.children_));
} else {
if(parse_tree.children_[0] == NULL) {
GEN_SQL_STEP_1(ObShowSqlSet::XA_RECOVER);
GEN_SQL_STEP_2(ObShowSqlSet::XA_RECOVER,
OB_SYS_DATABASE_NAME,
OB_ALL_VIRTUAL_GLOBAL_TRANSACTION_TNAME,
transaction::ObXATransState::PREPARED);
} else {
if(parse_tree.children_[0]->value_ != 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("parse tree is wrong",
K(ret),
K(parse_tree.num_child_),
K(parse_tree.children_));
} else {
GEN_SQL_STEP_1(ObShowSqlSet::XA_RECOVER_CONVERT_XID);
GEN_SQL_STEP_2(ObShowSqlSet::XA_RECOVER_CONVERT_XID,
OB_SYS_DATABASE_NAME,
OB_ALL_VIRTUAL_GLOBAL_TRANSACTION_TNAME,
transaction::ObXATransState::PREPARED);
}
}
}
break;
}
default:
/* won't be here */
ret = OB_NOT_IMPLEMENT;
@ -3783,6 +3820,16 @@ DEFINE_SHOW_CLAUSE_SET(SHOW_OLAP_ASYNC_JOB_STATUS,
SELECT T.job_name AS 'job_id', T.cowner AS 'schema_name', CASE WHEN T.state IS NULL THEN 'SUBMITTED' WHEN T.state = 'SCHEDULED' THEN 'RUNNING' WHEN T.state = 'KILLED' THEN 'CANCELLED' ELSE T.state END as 'status', NULL as fail_msg, T.start_date as 'create_time', T.gmt_modified as 'update_time', T.job_action as 'definition' FROM %s.%s T WHERE T.JOB_CLASS = 'OLAP_ASYNC_JOB_CLASS' AND T.TENANT_ID = %d AND T.JOB > 0 %s %s) %s",
NULL,
NULL);
DEFINE_SHOW_CLAUSE_SET(XA_RECOVER,
NULL,
"SELECT format_id as formatID, length(gtrid) as gtrid_length, length(bqual) as bqual_length, concat(gtrid,bqual) as data from %s.%s where state = %ld",
NULL,
NULL);
DEFINE_SHOW_CLAUSE_SET(XA_RECOVER_CONVERT_XID,
NULL,
"SELECT format_id as formatID, length(gtrid) as gtrid_length, length(bqual) as bqual_length, concat('0x',hex(concat(gtrid,bqual))) as data from %s.%s where state = %ld",
NULL,
NULL);
DEFINE_SHOW_CLAUSE_SET(SHOW_CREATE_USER,
NULL,
"SELECT \"%.*s\" AS `CREATE USER for %.*s@%.*s` FROM DUAL",

View File

@ -207,6 +207,8 @@ struct ObShowResolver::ObShowSqlSet
DECLARE_SHOW_CLAUSE_SET(SHOW_TRIGGERS_LIKE);
DECLARE_SHOW_CLAUSE_SET(SHOW_SEQUENCES);
DECLARE_SHOW_CLAUSE_SET(SHOW_SEQUENCES_LIKE);
DECLARE_SHOW_CLAUSE_SET(XA_RECOVER);
DECLARE_SHOW_CLAUSE_SET(XA_RECOVER_CONVERT_XID);
DECLARE_SHOW_CLAUSE_SET(SHOW_ENGINE);
DECLARE_SHOW_CLAUSE_SET(SHOW_OPEN_TABLES);
DECLARE_SHOW_CLAUSE_SET(SHOW_OLAP_ASYNC_JOB_STATUS);

View File

@ -772,6 +772,7 @@ int ObResolver::resolve(IsPrepared if_prepared, const ParseNode &parse_tree, ObS
case T_SHOW_OPEN_TABLES:
case T_SHOW_SEQUENCES:
case T_SHOW_OLAP_ASYNC_JOB_STATUS:
case T_XA_RECOVER:
case T_SHOW_CHECK_TABLE:
case T_SHOW_CREATE_USER: {
REGISTER_STMT_RESOLVER(Show);

View File

@ -7888,19 +7888,137 @@ int ObResolverUtils::resolve_string(const ParseNode *node, ObString &string)
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL == node)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("node should not be null");
LOG_WARN("node should not be null", K(ret));
} else if (OB_UNLIKELY(T_VARCHAR != node->type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("node type is not T_VARCHAR", "type", get_type_name(node->type_));
LOG_WARN("node type is not T_VARCHAR", "type", get_type_name(node->type_), K(ret));
} else if (OB_UNLIKELY(node->str_len_ < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("empty string");
LOG_WARN("empty string", K(ret));
} else {
string = ObString(node->str_len_, node->str_value_);
}
return ret;
}
int ObResolverUtils::resolve_xid(const ParseNode *node, common::ObString &gtrid_string, common::ObString &bqual_string, int64_t & format_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL == node)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("node should not be null", K(ret));
} else if (OB_UNLIKELY(T_LINK_NODE != node->type_ || 1 > node->num_child_ || 3 < node->num_child_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected type or unexpected child num when reslve xid", K(node->type_), K(node->num_child_), K(ret));
} else if(1 <= node->num_child_ && OB_FAIL(ObResolverUtils::resolve_text(node->children_[0], gtrid_string))) {
LOG_WARN("resolve gtrid string fail", K(node->children_[0]), K(gtrid_string), K(ret));
} else if(2 <= node->num_child_ && OB_FAIL(ObResolverUtils::resolve_text(node->children_[1], bqual_string))) {
LOG_WARN("resolve bqual string fail", K(node->children_[1]), K(bqual_string), K(ret));
} else if(3 == node->num_child_ && OB_FAIL(ObResolverUtils::resolve_ulong(node->children_[2], format_id))) {
LOG_WARN("resolve format id fail", K(node->children_[2]), K(format_id), K(ret));
} else {
// for mysql mode
// if format id is not specified, set format id to 1 by default
if (lib::is_mysql_mode() && 3 > node->num_child_) {
format_id = 1;
}
}
return ret;
}
int ObResolverUtils::resolve_text(const ParseNode *node, ObString &string)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL == node)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("node should not be null", K(ret));
} else if (OB_UNLIKELY(T_VARCHAR != node->type_ && T_HEX_STRING != node->type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("node type is not T_VARCHAR/T_HEX_STRING", "type", get_type_name(node->type_), K(ret));
} else if (OB_UNLIKELY(node->str_len_ < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("empty string", K(ret));
} else {
string = ObString(node->str_len_, node->str_value_);
}
return ret;
}
int ObResolverUtils::resolve_ulong(const ParseNode *node, int64_t & format_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL == node)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("node should not be null", K(ret));
} else if (OB_UNLIKELY(T_INT != node->type_ && T_HEX_STRING != node->type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("node type is not T_INT/T_HEX_STRING", "type", get_type_name(node->type_), K(ret));
} else {
format_id = node->value_;
}
return ret;
}
int ObResolverUtils::resolve_opt_join_or_resume(const ParseNode *node, int64_t & flag)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL == node)) {
// do nothing
} else if (OB_UNLIKELY(T_INT != node->type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected node type", K(node->type_), K(ret));
} else {
if(node->value_ != 0 && node->value_ != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected val", K(node->value_), K(ret));
} else {
ret = OB_TRANS_XA_INVAL;
LOG_WARN("not support start arguments", K(node->value_), K(ret));
}
}
return ret;
}
int ObResolverUtils::resolve_opt_suspend(const ParseNode *node, int64_t & flag)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL == node)) {
// do nothing
} else if (OB_UNLIKELY(T_INT != node->type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected node type", K(node->type_), K(ret));
} else {
if(node->value_ != 0 && node->value_ != 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected val", K(node->value_), K(ret));
} else {
// 目前这里进行里检查,可以不管直接赋值然后给执行时报错
ret = OB_TRANS_XA_INVAL;
LOG_WARN("not support start arguments", K(node->value_), K(ret));
}
}
return ret;
}
int ObResolverUtils::resolve_opt_one_phase(const ParseNode *node, int64_t & flag)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(NULL == node)) {
// do nothing
} else if (OB_UNLIKELY(T_INT != node->type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected node type", K(node->type_), K(ret));
} else {
if(node->value_ != 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected val", K(node->value_), K(ret));
} else {
flag = transaction::ObXAFlag::OBTMONEPHASE;
}
}
return ret;
}
// judge whether pdml stmt contain udf can parallel execute or not has two stage:
// stage1:check has dml write stmt or read/write package var info in this funciton;
// stage2:record udf has select stmt info, and when optimize this stmt,

View File

@ -672,7 +672,12 @@ public:
ParseNode *&func_udf);
static int set_direction_by_mode(const ParseNode &sort_node, OrderItem &order_item);
static int resolve_string(const ParseNode *node, common::ObString &string);
static int resolve_xid(const ParseNode *node, common::ObString &gtrid_string, common::ObString &bqual_string, int64_t & format_id);
static int resolve_text(const ParseNode *node, common::ObString &string);
static int resolve_ulong(const ParseNode *node, int64_t & format_id);
static int resolve_opt_join_or_resume(const ParseNode *node, int64_t & flag);
static int resolve_opt_suspend(const ParseNode *node, int64_t & flag);
static int resolve_opt_one_phase(const ParseNode *node, int64_t & flag);
// check some kind of the non-updatable view, which is forbidden for all dml statement:
// mysql:
// aggregate

View File

@ -35,19 +35,33 @@ int ObXaCommitResolver::resolve(const ParseNode &parse_node)
{
int ret = OB_SUCCESS;
ObXaCommitStmt *xa_commit_stmt = NULL;
if (OB_UNLIKELY(T_XA_COMMIT != parse_node.type_ || 1 != parse_node.num_child_)) {
if (OB_UNLIKELY(T_XA_COMMIT != parse_node.type_ || 2 != parse_node.num_child_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_));
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret));
} else if (OB_UNLIKELY(NULL == (xa_commit_stmt = create_stmt<ObXaCommitStmt>()))) {
ret = OB_SQL_RESOLVER_NO_MEMORY;
LOG_WARN("failed to create xa end stmt", K(ret));
} else {
ObString xa_string;
if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) {
LOG_WARN("resolve string failed", K(ret));
ObString gtrid_string;
ObString bqual_string;
int64_t format_id = -1;
int64_t flag = 0;
if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) {
LOG_WARN("resolve xid failed", K(ret));
} else if (OB_FAIL(ObResolverUtils::resolve_opt_one_phase(parse_node.children_[1], flag))) {
LOG_WARN("resolve xa commit one phase failed", K(ret));
} else {
xa_commit_stmt->set_xa_string(xa_string);
LOG_DEBUG("xa commit resolver", K(xa_string));
if(gtrid_string.length() <= 0) {
ret = OB_TRANS_XA_INVAL;
LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret));
} else {
xa_commit_stmt->set_xa_string(gtrid_string, bqual_string);
if(format_id >= 0) {
xa_commit_stmt->set_format_id(format_id);
}
xa_commit_stmt->set_flags(flag);
LOG_DEBUG("xa commit resolver", K(gtrid_string), K(bqual_string), K(format_id));
}
}
}
return ret;

View File

@ -34,19 +34,32 @@ int ObXaEndResolver::resolve(const ParseNode &parse_node)
{
int ret = OB_SUCCESS;
ObXaEndStmt *xa_end_stmt = NULL;
if (OB_UNLIKELY(T_XA_END != parse_node.type_ || 1 != parse_node.num_child_)) {
if (OB_UNLIKELY(T_XA_END != parse_node.type_ || 2 != parse_node.num_child_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_));
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret));
} else if (OB_UNLIKELY(NULL == (xa_end_stmt = create_stmt<ObXaEndStmt>()))) {
ret = OB_SQL_RESOLVER_NO_MEMORY;
LOG_WARN("failed to create xa end stmt", K(ret));
} else {
ObString xa_string;
if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) {
LOG_WARN("resolve string failed", K(ret));
ObString gtrid_string;
ObString bqual_string;
int64_t format_id = -1;
int64_t flag = 0;
if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) {
LOG_WARN("resolve xid failed in xa end", K(ret));
} else if (OB_FAIL(ObResolverUtils::resolve_opt_suspend(parse_node.children_[1], flag))) {
LOG_WARN("resolve xa end suspend failed", K(ret));
} else {
xa_end_stmt->set_xa_string(xa_string);
LOG_DEBUG("xa end resolver", K(xa_string));
if(gtrid_string.length() <= 0) {
ret = OB_TRANS_XA_INVAL;
LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret));
} else {
xa_end_stmt->set_xa_string(gtrid_string, bqual_string);
if(format_id >= 0) {
xa_end_stmt->set_format_id(format_id);
}
LOG_DEBUG("xa end resolver", K(gtrid_string), K(bqual_string), K(format_id), K(flag));
}
}
}
return ret;

View File

@ -36,17 +36,27 @@ int ObXaPrepareResolver::resolve(const ParseNode &parse_node)
ObXaPrepareStmt *xa_prepare_stmt = NULL;
if (OB_UNLIKELY(T_XA_PREPARE != parse_node.type_ || 1 != parse_node.num_child_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_));
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret));
} else if (OB_UNLIKELY(NULL == (xa_prepare_stmt = create_stmt<ObXaPrepareStmt>()))) {
ret = OB_SQL_RESOLVER_NO_MEMORY;
LOG_WARN("failed to create xa end stmt", K(ret));
} else {
ObString xa_string;
if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) {
LOG_WARN("resolve string failed", K(ret));
ObString gtrid_string;
ObString bqual_string;
int64_t format_id = -1;
if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) {
LOG_WARN("resolve xid failed", K(ret));
} else {
xa_prepare_stmt->set_xa_string(xa_string);
LOG_DEBUG("xa prepare resolver", K(xa_string));
if(gtrid_string.length() <= 0) {
ret = OB_TRANS_XA_INVAL;
LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret));
} else {
xa_prepare_stmt->set_xa_string(gtrid_string, bqual_string);
if(format_id >= 0) {
xa_prepare_stmt->set_format_id(format_id);
}
}
LOG_DEBUG("xa prepare resolver", K(gtrid_string), K(bqual_string), K(format_id));
}
}
return ret;

View File

@ -36,17 +36,27 @@ int ObXaRollBackResolver::resolve(const ParseNode &parse_node)
ObXaRollBackStmt *xa_rollback_stmt = NULL;
if (OB_UNLIKELY(T_XA_ROLLBACK != parse_node.type_ || 1 != parse_node.num_child_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_));
LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret));
} else if (OB_UNLIKELY(NULL == (xa_rollback_stmt = create_stmt<ObXaRollBackStmt>()))) {
ret = OB_SQL_RESOLVER_NO_MEMORY;
LOG_WARN("failed to create xa end stmt", K(ret));
} else {
ObString xa_string;
if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) {
LOG_WARN("resolve string failed", K(ret));
ObString gtrid_string;
ObString bqual_string;
int64_t format_id = -1;
if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) {
LOG_WARN("resolve xid failed", K(ret));
} else {
xa_rollback_stmt->set_xa_string(xa_string);
LOG_DEBUG("xa rollback resolver", K(xa_string));
if(gtrid_string.length() <= 0) {
ret = OB_TRANS_XA_INVAL;
LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret));
} else {
xa_rollback_stmt->set_xa_string(gtrid_string, bqual_string);
if(format_id >= 0) {
xa_rollback_stmt->set_format_id(format_id);
}
}
LOG_DEBUG("xa rollback resolver",K(gtrid_string), K(bqual_string), K(format_id));
}
}
return ret;

View File

@ -35,19 +35,33 @@ int ObXaStartResolver::resolve(const ParseNode &parse_node)
{
int ret = OB_SUCCESS;
ObXaStartStmt *xa_start_stmt = NULL;
if (OB_UNLIKELY(T_XA_START != parse_node.type_ || 1 !=parse_node.num_child_)) {
if (OB_UNLIKELY(T_XA_START != parse_node.type_ || 2 != parse_node.num_child_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected val", K(parse_node.type_), K(parse_node.num_child_), K(ret));
LOG_WARN("unexpected val", K(parse_node.type_), K(parse_node.num_child_), K(ret));
} else if (OB_UNLIKELY(NULL == (xa_start_stmt = create_stmt<ObXaStartStmt>()))) {
ret = OB_SQL_RESOLVER_NO_MEMORY;
LOG_WARN("failed to create xa start stmt", K(ret));
} else {
ObString xa_string;
if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) {
LOG_WARN("resolve string failed", K(ret));
ObString gtrid_string;
ObString bqual_string;
int64_t format_id = -1;
int64_t flag = 0;
if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) {
LOG_WARN("resolve xid failed in xa start", K(ret));
} else if (OB_FAIL(ObResolverUtils::resolve_opt_join_or_resume(parse_node.children_[1], flag))) {
LOG_WARN("resolve xa start join or resume failed", K(ret));
} else {
xa_start_stmt->set_xa_string(xa_string);
LOG_DEBUG("xa start resolver", K(xa_string));
if(gtrid_string.length() <= 0) {
ret = OB_TRANS_XA_INVAL;
LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret));
} else {
xa_start_stmt->set_xa_string(gtrid_string, bqual_string);
if(format_id >= 0) {
xa_start_stmt->set_format_id(format_id);
}
xa_start_stmt->set_flags(flag);
LOG_DEBUG("xa start resolver", K(gtrid_string), K(bqual_string), K(format_id), K(flag));
}
}
}

View File

@ -485,6 +485,8 @@ ob_set_subtarget(ob_storage tx
tx/ob_tx_free_route_state.cpp
tx/ob_tx_free_route_rpc.cpp
tx/ob_tx_free_route_msg.cpp
tx/ob_mysql_xa_service.cpp
tx/ob_xa_inner_sql_client.cpp
)
ob_set_subtarget(ob_storage tx_storage

View File

@ -0,0 +1,642 @@
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
//
#include "storage/tx/ob_xa_service.h"
#include "storage/tx/ob_xa_ctx.h"
#include "storage/tx/ob_trans_service.h"
#include "lib/mysqlclient/ob_isql_client.h"
#include "lib/mysqlclient/ob_mysql_proxy.h" // ObMySQLProxy
#include "lib/mysqlclient/ob_mysql_result.h"
#include "share/inner_table/ob_inner_table_schema_constants.h"
#include "share/rc/ob_tenant_base.h"
#include "share/rc/ob_tenant_module_init_ctx.h"
#include "observer/ob_srv_network_frame.h"
#include "storage/tx/ob_xa_inner_sql_client.h"
namespace oceanbase
{
using namespace share;
using namespace common;
using namespace common::sqlclient;
namespace transaction
{
int ObXAService::xa_start_for_mysql(const ObXATransID &xid,
const int64_t flags,
const uint32_t session_id,
const ObTxParam &tx_param,
ObTxDesc *&tx_desc)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(xid.empty()) ||
OB_UNLIKELY(!xid.is_valid()) ||
OB_UNLIKELY(0 > xid.get_format_id())) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid));
} else if (ObXAFlag::OBTMNOFLAGS != flags) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid flags for mysql xa start", K(ret), K(xid));
} else if (OB_FAIL(xa_start_for_mysql_(xid, session_id, tx_param, tx_desc))) {
TRANS_LOG(WARN, "mysql xa start failed", K(ret), K(xid));
} else {
// do nothing
}
// set xa_start_addr for txn-free-route
if (OB_SUCC(ret) && OB_NOT_NULL(tx_desc)) {
tx_desc->set_xa_start_addr(GCONF.self_addr_);
}
TRANS_LOG(INFO, "mysql xa start", K(ret), K(xid));
return ret;
}
int ObXAService::xa_start_for_mysql_(const ObXATransID &xid,
const uint32_t session_id,
const ObTxParam &tx_param,
ObTxDesc *&tx_desc)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
ObAddr sche_addr = GCTX.self_addr();
bool alloc = true;
ObXACtx *xa_ctx = NULL;
if (tx_desc != NULL) {
MTL(ObTransService *)->release_tx(*tx_desc);
tx_desc = NULL;
}
if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id))) {
TRANS_LOG(WARN, "acquire trans failed", K(ret), K(tx_param));
} else if (OB_FAIL(MTL(ObTransService *)->start_tx(*tx_desc, tx_param))) {
TRANS_LOG(WARN, "start trans failed", K(ret), KPC(tx_desc));
MTL(ObTransService *)->release_tx(*tx_desc);
tx_desc = NULL;
} else {
const bool is_tightly_coupled = false;
const ObTransID trans_id = tx_desc->tid();
if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(trans_id, alloc, xa_ctx))) {
TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(xid));
} else if (!alloc) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected error", K(ret), K(xid));
} else if (OB_FAIL(xa_ctx->init(xid,
trans_id,
tenant_id,
GCTX.self_addr(),
is_tightly_coupled,
this/*xa service*/,
&xa_ctx_mgr_,
&xa_rpc_,
&timer_))) {
TRANS_LOG(WARN, "xa ctx init failed", K(ret), K(xid), K(trans_id));
} else if (OB_FAIL(xa_ctx->xa_start_for_mysql(xid, tx_desc))) {
TRANS_LOG(WARN, "xa ctx start failed", K(ret), K(xid), K(trans_id));
}
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(xa_ctx)) {
xa_ctx_mgr_.erase_xa_ctx(trans_id);
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
// since tx_desc is not set into xa ctx, release tx desc explicitly
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc,
ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "abort transaction failed", K(tmp_ret), K(trans_id), K(xid));
}
MTL(ObTransService *)->release_tx(*tx_desc);
tx_desc = NULL;
} else {
// for statistics
XA_STAT_ADD_XA_TRANS_START_COUNT();
}
}
return ret;
}
int ObXAService::xa_end_for_mysql(const ObXATransID &xid,
ObTxDesc *&tx_desc)
{
int ret = OB_SUCCESS;
ObXACtx *xa_ctx = NULL;
if (OB_UNLIKELY(xid.empty()) ||
OB_UNLIKELY(!xid.is_valid()) ||
OB_UNLIKELY(0 > xid.get_format_id()) ||
OB_UNLIKELY(NULL == tx_desc)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), KP(tx_desc));
} else if (!tx_desc->is_xa_trans()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid));
} else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid));
} else if (!xid.all_equal_to(tx_desc->get_xid())) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "unknown xid", K(ret), K(xid));
} else if (OB_FAIL(xa_ctx->xa_end_for_mysql(xid, tx_desc))) {
TRANS_LOG(WARN, "xa end failed", K(ret), K(xid));
} else {
// do nothing
}
TRANS_LOG(INFO, "mysql xa end", K(ret), K(xid));
return ret;
}
int ObXAService::xa_prepare_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
ObTxDesc *&tx_desc,
bool &need_exit)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
bool is_read_only = false;
ObXACtx *xa_ctx = NULL;
share::ObLSID coord_id;
need_exit = false;
ObTransID tx_id;
// 1. check basic first, if fail, do not exit
if (OB_UNLIKELY(xid.empty()) ||
OB_UNLIKELY(!xid.is_valid()) ||
OB_UNLIKELY(0 > xid.get_format_id()) ||
OB_UNLIKELY(NULL == tx_desc) ||
OB_UNLIKELY(0 >= timeout_us)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us), KP(tx_desc));
} else if (!tx_desc->is_xa_trans()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid));
} else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid));
} else {
tx_id = tx_desc->tid();
}
// 2. pre xa prepare, persist xa record
if (OB_FAIL(ret)) {
} else if (OB_FAIL(xa_ctx->pre_xa_prepare_for_mysql(xid, tx_desc, need_exit,
is_read_only, coord_id))) {
TRANS_LOG(WARN, "pre xa prepare for mysql failed", K(ret), K(xid));
} else if (OB_FAIL(MTL(ObXAService*)->insert_record_for_mysql(MTL_ID(), xid, tx_id,
coord_id, GCTX.self_addr(), is_read_only))) {
// if persist failed, abort this trans
TRANS_LOG(WARN, "insert xa record failed", K(ret), K(xid), K(coord_id));
need_exit = true;
if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) {
// if duplicate, return DUPID and abort this trans
ret = OB_TRANS_XA_DUPID;
}
} else {
// do nothing
}
// 3. send sub prepare to coordinator
if (is_read_only) {
// read-only trans
// since ctx has exited, do nothing
} else if (OB_FAIL(ret)) {
// if fail and need exit (include dupid), abort the trans
if (need_exit) {
if (OB_SUCCESS != (tmp_ret = xa_ctx->handle_abort_for_mysql(
ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "handle abort failed", K(tmp_ret), K(xid));
}
}
} else {
need_exit = true;
if (OB_FAIL(xa_ctx->xa_prepare_for_mysql(xid, timeout_us))) {
TRANS_LOG(WARN, "xa prepare for mysql failed", K(ret), K(xid), K(timeout_us));
} else if (OB_FAIL(xa_ctx->wait_xa_prepare_for_mysql(xid, timeout_us))) {
TRANS_LOG(WARN, "wait xa prepare for mysql failed", K(ret), K(xid), K(timeout_us));
}
}
if (NULL != xa_ctx && need_exit) {
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
tx_desc = NULL;
xa_ctx = NULL;
}
TRANS_LOG(INFO, "mysql xa prepare", K(ret), K(xid), K(need_exit));
return ret;
}
int ObXAService::xa_commit_onephase_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
ObTxDesc *&tx_desc,
bool &need_exit)
{
int ret = OB_SUCCESS;
bool is_read_only = false;
ObXACtx *xa_ctx = NULL;
need_exit = false;
if (OB_UNLIKELY(xid.empty()) ||
OB_UNLIKELY(!xid.is_valid()) ||
OB_UNLIKELY(0 > xid.get_format_id()) ||
OB_UNLIKELY(NULL == tx_desc) ||
OB_UNLIKELY(0 >= timeout_us)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us), KP(tx_desc));
} else if (!tx_desc->is_xa_trans()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid));
} else if (!xid.all_equal_to(tx_desc->get_xid())) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "unknown xid", K(ret), K(xid));
} else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid));
} else {
const bool is_rollback = false;
ObTransID tx_id = tx_desc->tid();
if (OB_FAIL(xa_ctx->one_phase_end_trans_for_mysql(xid, is_rollback, timeout_us,
tx_desc, need_exit))) {
TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid));
} else {
need_exit = true;
if (OB_FAIL(xa_ctx->wait_one_phase_end_trans_for_mysql(is_rollback, timeout_us))) {
TRANS_LOG(WARN, "fail to wait one phase xa end trans", K(ret), K(xid));
} else {
// do nothing
}
}
if (need_exit) {
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
xa_ctx = NULL;
tx_desc = NULL;
}
}
// for statistics
XA_STAT_ADD_XA_ONE_PHASE_COMMIT_TOTAL_COUNT();
TRANS_LOG(INFO, "mysql one phase xa commit", K(ret), K(xid));
return ret;
}
int ObXAService::xa_second_phase_twophase_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
const bool is_rollback,
ObTransID &tx_id)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const int64_t start_ts = ObTimeUtility::current_time();
const uint64_t tenant_id = MTL_ID();
ObXACtx *xa_ctx = NULL;
bool alloc = true;
bool is_read_only = false;
share::ObLSID coord_id;
ObXAInnerSQLClient inner_sql_client;
if (OB_UNLIKELY(xid.empty()) ||
OB_UNLIKELY(!xid.is_valid()) ||
OB_UNLIKELY(0 > xid.get_format_id()) ||
OB_UNLIKELY(0 >= timeout_us)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us));
} else if (OB_FAIL(inner_sql_client.start(MTL(ObTransService *)->get_mysql_proxy()))) {
TRANS_LOG(WARN, "xa inner sql client start failed", K(ret), K(xid));
} else if (OB_FAIL(inner_sql_client.query_xa_coord_for_mysql(xid, coord_id, tx_id,
is_read_only))) {
if (OB_ITER_END == ret) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "unkonwn xid", K(tmp_ret), K(xid));
} else {
TRANS_LOG(WARN, "query xa record for mysql failed", K(ret), K(xid), K(coord_id),
K(tx_id), K(is_read_only));
}
} else if (is_read_only) {
// read only trans
if (OB_SUCCESS != (tmp_ret = inner_sql_client.delete_xa_branch_for_mysql(xid))) {
TRANS_LOG(WARN, "delete xa branch failed", K(tmp_ret), K(xid), K(tenant_id), K(tx_id));
} else {
// do nothing
}
} else {
if (OB_UNLIKELY(!coord_id.is_valid()) ||
OB_UNLIKELY(!tx_id.is_valid())) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "invalid record for mysql two phase xa commit", K(ret), K(xid),
K(coord_id), K(tx_id));
} else if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(tx_id, alloc, xa_ctx))) {
TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(tx_id), K(xid), K(alloc), KP(xa_ctx));
} else if (OB_ISNULL(xa_ctx)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa ctx is null", K(ret), K(xid), K(tx_id));
} else if (false == alloc) {
ret = OB_TRANS_STMT_NEED_RETRY;
TRANS_LOG(WARN, "xa ctx already exist", K(ret), K(xid), K(tx_id));
} else {
const int64_t request_id = start_ts;
const bool is_tightly_coupled = false;
if (OB_FAIL(xa_ctx->init(xid,
tx_id,
tenant_id,
GCTX.self_addr(),
is_tightly_coupled,
this,
&xa_ctx_mgr_,
&xa_rpc_,
&timer_))) {
TRANS_LOG(WARN, "xa ctx init failed", K(ret), K(xid), K(tx_id));
} else if (OB_FAIL(xa_ctx->two_phase_end_trans(xid, coord_id, is_rollback,
timeout_us, request_id))) {
TRANS_LOG(WARN, "xa commit failed", K(ret), K(xid), K(tx_id));
} else if (OB_FAIL(xa_ctx->wait_two_phase_end_trans(xid, is_rollback, timeout_us))) {
TRANS_LOG(WARN, "wait xa commit failed", K(ret), K(xid), K(tx_id));
} else if (OB_SUCCESS != (tmp_ret = inner_sql_client.delete_xa_branch_for_mysql(xid))) {
TRANS_LOG(WARN, "delete xa branch failed", K(tmp_ret), K(xid), K(tx_id));
} else {
// do nothing
}
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
}
}
if (inner_sql_client.has_started()) {
(void)inner_sql_client.end();
}
if (is_rollback) {
TRANS_LOG(INFO, "mysql two phase xa rollback", K(ret), K(xid));
} else {
TRANS_LOG(INFO, "mysql two phase xa commit", K(ret), K(xid));
}
return ret;
}
int ObXAService::xa_rollback_onephase_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
ObTxDesc *&tx_desc,
bool &need_exit)
{
int ret = OB_SUCCESS;
bool is_read_only = false;
ObXACtx *xa_ctx = NULL;
need_exit = false;
if (OB_UNLIKELY(xid.empty()) ||
OB_UNLIKELY(!xid.is_valid()) ||
OB_UNLIKELY(0 > xid.get_format_id()) ||
OB_UNLIKELY(NULL == tx_desc) ||
OB_UNLIKELY(0 >= timeout_us)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us), KP(tx_desc));
} else if (!tx_desc->is_xa_trans()) {
ret = OB_TRANS_XA_PROTO;
TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid));
} else if (!xid.all_equal_to(tx_desc->get_xid())) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "unknown xid", K(ret), K(xid));
} else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid));
} else {
const bool is_rollback = true;
ObTransID tx_id = tx_desc->tid();
if (OB_FAIL(xa_ctx->one_phase_end_trans_for_mysql(xid, is_rollback, timeout_us,
tx_desc, need_exit))) {
TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid));
} else {
need_exit = true;
if (OB_FAIL(xa_ctx->wait_one_phase_end_trans_for_mysql(is_rollback, timeout_us))) {
TRANS_LOG(WARN, "fail to wait one phase xa end trans", K(ret), K(xid));
} else {
// do nothing
}
}
if (need_exit) {
// if exit for xa rollback, return success
ret = OB_SUCCESS;
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
xa_ctx = NULL;
tx_desc = NULL;
}
}
TRANS_LOG(INFO, "mysql one phase xa rollback", K(ret), K(xid));
return ret;
}
#define INSERT_MYSQLXA_SQL "\
insert into %s (tenant_id, gtrid, bqual, format_id, \
trans_id, coordinator, scheduler_ip, scheduler_port, state, flag, is_readonly) \
values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d, %ld, %d)"
int ObXAService::insert_record_for_mysql(const uint64_t tenant_id,
const ObXATransID &xid,
const ObTransID &trans_id,
const share::ObLSID &coordinator,
const ObAddr &sche_addr,
const bool is_readonly)
{
int ret = OB_SUCCESS;
ObMySQLProxy *mysql_proxy = MTL(ObTransService *)->get_mysql_proxy();
ObSqlString sql;
int64_t affected_rows = 0;
char scheduler_ip_buf[128] = {0};
sche_addr.ip_to_string(scheduler_ip_buf, 128);
char gtrid_str[128] = {0};
int64_t gtrid_len = 0;
char bqual_str[128] = {0};
int64_t bqual_len = 0;
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
int64_t original_timeout_us = THIS_WORKER.get_timeout_ts();
const int64_t start_ts = ObTimeUtility::current_time();
THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT);
ObXAInnerSqlStatGuard stat_guard(start_ts);
if (!is_valid_tenant_id(tenant_id) || xid.empty() || !trans_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(xid), K(trans_id));
} else if (!is_readonly && !coordinator.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(is_readonly), K(coordinator));
} else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
xid.get_gtrid_str().length(),
gtrid_str, 128, gtrid_len))) {
TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
xid.get_bqual_str().length(),
bqual_str, 128, bqual_len))) {
TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid));
} else if (OB_FAIL(sql.assign_fmt(INSERT_MYSQLXA_SQL,
OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME,
tenant_id,
(int)gtrid_len, gtrid_str,
(int)bqual_len, bqual_str,
xid.get_format_id(),
trans_id.get_id(),
coordinator.id(),
scheduler_ip_buf, sche_addr.get_port(),
ObXATransState::PREPARED, (long)0,
is_readonly))) {
TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql));
} else if (OB_FAIL(mysql_proxy->write(exec_tenant_id, sql.ptr(), affected_rows))) {
TRANS_LOG(WARN, "execute insert record sql failed", KR(ret), K(exec_tenant_id), K(tenant_id));
} else {
TRANS_LOG(INFO, "execute insert record sql success", K(exec_tenant_id), K(tenant_id),
K(sql), K(affected_rows));
}
THIS_WORKER.set_timeout_ts(original_timeout_us);
return ret;
}
int ObXAService::handle_terminate_for_mysql(const ObXATransID &xid, ObTxDesc *tx_desc)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (NULL == tx_desc || !tx_desc->is_valid() || !xid.is_valid() || 0 > xid.get_format_id()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), KP(tx_desc), K(xid));
} else if (!xid.all_equal_to(tx_desc->get_xid())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected trans descriptor", K(ret), K(xid));
} else {
ObXACtx *xa_ctx = tx_desc->get_xa_ctx();
ObTransID tx_id = tx_desc->tid();
TRANS_LOG(INFO, "start to terminate mysql xa trans", K(xid), K(tx_id));
if (NULL == xa_ctx) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected xa ctx", K(ret), K(xid), K(tx_id));
} else if (OB_FAIL(xa_ctx->handle_abort_for_mysql(ObTxAbortCause::SESSION_DISCONNECT))) {
TRANS_LOG(WARN, "handle terminate for mysql failed", K(ret), K(xid), K(tx_id));
}
xa_ctx_mgr_.revert_xa_ctx(xa_ctx);
tx_desc = NULL;
}
XA_INNER_INCREMENT_TERMINATE_COUNT();
return ret;
}
#define GC_MYSQL_RECORD_SQL "delete from %s where tenant_id = %lu and \
gtrid = x'%.*s' and \
bqual = x'%.*s'"
#define QUERY_MYSQL_LONG_PENDING_SQL "select HEX(gtrid) as gtrid, HEX(bqual) as bqual, \
trans_id, is_readonly \
from %s where tenant_id = %lu and \
unix_timestamp(now()) - %ld > unix_timestamp(gmt_modified) limit 20"
// 1. for read-only xa trans, if exceed gc threadhold, do not check trans ctx
// and clean record directly
// 2. for read-write xa trans, if exceed gc threadhold, check trans ctx.
// if trans ctx exists, do not clean record
int ObXAService::gc_record_for_mysql()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t gc_threshold_second = GCONF._xa_gc_timeout / 1000000; // in seconds
const uint64_t tenant_id = MTL_ID();
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
ObMySQLProxy *mysql_proxy = MTL(ObTransService *)->get_mysql_proxy();
ObSqlString sql;
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT);
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
if (OB_FAIL(sql.append_fmt(QUERY_MYSQL_LONG_PENDING_SQL,
OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME,
tenant_id,
gc_threshold_second))) {
TRANS_LOG(WARN, "generate sql fail", K(ret), K(sql), K(tenant_id));
} else if (OB_FAIL(mysql_proxy->read(res, exec_tenant_id, sql.ptr()))) {
TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql));
} else {
int64_t count = 0;
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
int64_t affected_rows = 0;
bool is_exist = false;
bool need_delete = false;
char gtrid_str[128] = {0};
int64_t gtrid_len = 0;
char bqual_str[128] = {0};
int64_t bqual_len = 0;
int64_t tx_id_value = 0;
bool is_readonly = false;
EXTRACT_STRBUF_FIELD_MYSQL(*result, "gtrid", gtrid_str, 128, gtrid_len);
EXTRACT_STRBUF_FIELD_MYSQL(*result, "bqual", bqual_str, 128, bqual_len);
EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t);
EXTRACT_BOOL_FIELD_MYSQL(*result, "is_readonly", is_readonly);
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "extract field failed", K(ret));
} else if (is_readonly) {
need_delete = true;
} else {
if (OB_FAIL(check_trans_ctx(tx_id_value, is_exist))) {
// if fail, do not gc record
TRANS_LOG(WARN, "check trans ctx failed", K(ret), K(tx_id_value));
} else if (!is_exist) {
need_delete = true;
}
}
if (need_delete) {
ObSqlString delete_sql;
if (OB_FAIL(delete_sql.assign_fmt(GC_MYSQL_RECORD_SQL,
OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME,
tenant_id,
(int)gtrid_len, gtrid_str,
(int)bqual_len, bqual_str))) {
TRANS_LOG(WARN, "generate query xa scheduler sql fail", K(ret), K(delete_sql));
} else if (OB_SUCCESS != (tmp_ret = mysql_proxy->write(exec_tenant_id,
delete_sql.ptr(),
affected_rows))) {
TRANS_LOG(WARN, "execute gc record sql failed", KR(ret), K(delete_sql));
} else {
TRANS_LOG(INFO, "gc record success", K(ret), K(tx_id_value), K(is_readonly),
K(affected_rows));
count++;
}
}
} // end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
TRANS_LOG(INFO, "gc record for mysql", K(ret), K(count));
}
}
return ret;
}
#define CHECK_TRANS_CTX_SQL "SELECT trans_id \
FROM %s WHERE \
trans_id = %ld"
int ObXAService::check_trans_ctx(const int64_t tx_id_value, bool &is_exist)
{
int ret = OB_SUCCESS;
ObMySQLProxy *mysql_proxy = NULL;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) {
} else if (OB_ISNULL(mysql_proxy)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy));
} else if (OB_FAIL(sql.assign_fmt(CHECK_TRANS_CTX_SQL,
OB_ALL_VIRTUAL_TRANS_STAT_TNAME,
tx_id_value))) {
TRANS_LOG(WARN, "generate sql fail", K(ret), K(sql));
} else if (OB_FAIL(mysql_proxy->read(res, MTL_ID() , sql.ptr()))) {
TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "execute sql fail", K(ret), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END == ret) {
is_exist = false;
// return success
ret = OB_SUCCESS;
} else {
TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
}
} else {
is_exist = true;
}
TRANS_LOG(INFO, "check trans ctx from virtual table", K(ret), K(tx_id_value), K(is_exist));
}
return ret;
}
} // transaction
} // oceanbase

View File

@ -571,7 +571,6 @@ void ObTxDesc::reset()
FORCE_PRINT_TRACE(&tlog_, "[tx desc trace]");
}
#endif
TRANS_LOG(DEBUG, "reset txdesc", KPC(this), K(lbt()));
tenant_id_ = 0;
cluster_id_ = -1;
trace_info_.reset();

View File

@ -2050,6 +2050,7 @@ int ObTransService::release_tx_ref(ObTxDesc &tx)
int ObTransService::tx_sanity_check(ObTxDesc &tx)
{
ObSpinLockGuard guard(tx.lock_);
return tx_sanity_check_(tx);
}
@ -2144,6 +2145,7 @@ int ObTransService::sql_stmt_start_hook(const ObXATransID &xid,
ObGlobalTxType global_tx_type = tx.get_global_tx_type(xid);
if (ObGlobalTxType::DBLINK_TRANS == global_tx_type && OB_TRANS_XA_BRANCH_FAIL == ret) {
// if dblink trans, change errno (branch fail) to the errno of plain trans
ObSpinLockGuard guard(tx.lock_);
if (OB_FAIL(tx_sanity_check_(tx))) {
TRANS_LOG(WARN, "tx state insanity", K(ret), K(global_tx_type), K(xid));
} else {
@ -2175,6 +2177,7 @@ int ObTransService::sql_stmt_end_hook(const ObXATransID &xid, ObTxDesc &tx)
ObGlobalTxType global_tx_type = tx.get_global_tx_type(xid);
if (ObGlobalTxType::DBLINK_TRANS == global_tx_type && OB_TRANS_XA_BRANCH_FAIL == ret) {
// if dblink trans, change errno (branch fail) to the errno of plain trans
ObSpinLockGuard guard(tx.lock_);
if (OB_FAIL(tx_sanity_check_(tx))) {
TRANS_LOG(WARN, "tx state insanity", K(ret), K(global_tx_type), K(xid));
} else {

View File

@ -13,6 +13,7 @@
#include "ob_trans_service.h"
#include "lib/utility/serialization.h"
#include "storage/tx/ob_xa_ctx.h"
/*
* The exception handle of txn state update with synchronization via proxy
@ -1131,6 +1132,15 @@ bool ObTransService::need_fallback_(ObTxDesc &tx, int64_t &total_size)
} else if (tx.is_xa_trans() && tx.is_xa_tightly_couple()) {
TRANS_LOG(TRACE, "need fallback for tightly coupled xa trans");
fallback = true;
} else if (tx.is_xa_trans()
&& NULL != tx.get_xa_ctx()
&& tx.get_xa_ctx()->is_mysql_mode()
&& ObXATransState::ACTIVE != tx.get_xa_ctx()->get_state()) {
// To be compatible with mysql, trans can not be disassociated with session after xa end.
// However, any dml can not be executed after xa end. Therefore, we need stop tx free route.
// NOTE that the xa trans state is switched to IDLE from ACTIVE in xa end.
TRANS_LOG(TRACE, "need fallback for mysql xa trans");
fallback = true;
} else {
total_size = OB_E(EventTable::EN_TX_FREE_ROUTE_STATE_SIZE, tx.tx_id_) tx.estimate_state_size();
if (total_size > MAX_STATE_SIZE) {

View File

@ -101,6 +101,7 @@ void ObXACtx::reset()
local_lock_level_ = -1;
executing_xid_.reset();
need_stmt_lock_ = true;
is_mysql_mode_ = false;
is_inited_ = false;
}
@ -165,6 +166,9 @@ int ObXACtx::handle_timeout(const int64_t delay)
if (is_exiting_) {
ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa ctx is exiting", K(ret));
} else if (is_mysql_mode_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected timeout task for mysql", K(ret), K(*this));
} else if (is_terminated_) {
ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa trans has terminated", K(ret));
@ -1828,6 +1832,16 @@ int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id)
} else if (is_exiting_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "xa ctx is exiting", K(ret), K(*this));
} else if (is_mysql_mode_) {
// only for mysql mode
if (OB_FAIL(MTL(ObTransService*)->tx_sanity_check(*tx_desc_))) {
TRANS_LOG(WARN, "tx state insanity", K(ret), K(xid), K(*this));
} else if (ObXATransState::ACTIVE != xa_trans_state_) {
ret = OB_TRANS_XA_RMFAIL;
TRANS_LOG(WARN, "cannot be executed in this state", K(ret), K(xid), K(*this));
} else {
// do nothing
}
} else if (is_terminated_) {
// NOTE that anohter error code maybe required for loosely coupled mode
ret = OB_TRANS_XA_BRANCH_FAIL;
@ -2615,6 +2629,8 @@ int ObXACtx::try_heartbeat()
const int64_t now = ObTimeUtility::current_time();
if (is_exiting_ || is_terminated_) {
// do nothing
} else if (is_mysql_mode_) {
// do nothing for mysql mode
} else if (original_sche_addr_ != GCTX.self_addr()) {
// temproray scheduler, do nothing
} else if (OB_ISNULL(xa_branch_info_)
@ -3288,6 +3304,313 @@ int ObXACtx::start_check_stmt_lock(const ObXATransID &xid)
return ret;
}
}//transaction
// only for mysql mode
// xa start
// @param [in] xid
// @param [in] tx_desc
int ObXACtx::xa_start_for_mysql(const ObXATransID &xid,
ObTxDesc *tx_desc)
{
int ret = OB_SUCCESS;
const int64_t timeout_seconds = 60;
const int64_t flags = ObXAFlag::OBTMNOFLAGS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid));
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid));
} else if (is_exiting_) {
ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this));
} else if (!xid.all_equal_to(xid_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected xid", K(xid), K(xid_), K(*this));
} else if (OB_FAIL(save_tx_desc_(tx_desc))) {
TRANS_LOG(WARN, "save trans desc failed", K(ret), K(*this));
} else {
// no need register timeout task
xa_trans_state_ = ObXATransState::ACTIVE;
++xa_branch_count_;
++xa_ref_count_;
// set trans_desc members
tx_desc->set_xid(xid);
tx_desc->set_xa_ctx(this);
// set global trans type to xa trans
tx_desc->set_global_tx_type(ObGlobalTxType::XA_TRANS);
is_mysql_mode_ = true;
}
REC_TRACE_EXT(tlog_, xa_start, OB_Y(ret), OB_ID(ctx_ref), get_uref());
TRANS_LOG(INFO, "xa start for mysql", K(ret), K(*this));
return ret;
}
// only for mysql mode
// xa end
// @param [in] xid
// @param [in] tx_desc
int ObXACtx::xa_end_for_mysql(const ObXATransID &xid,
ObTxDesc *tx_desc)
{
int ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid));
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid));
} else if (is_exiting_) {
ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this));
} else if (!is_mysql_mode_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "not mysql mode", K(ret), K(xid), K(*this));
} else if (!xid.all_equal_to(xid_)) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "xid not match", K(ret), K(xid));
} else if (ObXATransState::ACTIVE != xa_trans_state_) {
ret = OB_TRANS_XA_RMFAIL;
TRANS_LOG(WARN, "not active state", K(ret), K(xid), K(*this));
} else {
xa_trans_state_ = ObXATransState::IDLE;
}
REC_TRACE_EXT(tlog_, xa_end, OB_Y(ret), OB_ID(ctx_ref), get_uref());
TRANS_LOG(INFO, "xa end for mysql", K(ret), K(*this));
return ret;
}
// only for mysql mode
// xa prepare
// @param [in] xid
// @param [in] tx_desc
// @param [out] need_exit
int ObXACtx::pre_xa_prepare_for_mysql(const ObXATransID &xid,
ObTxDesc *tx_desc,
bool &need_exit,
bool &is_read_only,
share::ObLSID &coord_id)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
need_exit = false;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid));
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid));
} else if (is_exiting_) {
ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this));
need_exit = true;
} else if (!is_mysql_mode_ || tx_desc_ != tx_desc) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected xa ctx", K(ret), K(xid), K(*this));
} else if (!xid.all_equal_to(xid_)) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "unknown xid", K(ret), K(xid), K(*this));
} else if (ObXATransState::IDLE != xa_trans_state_) {
ret = OB_TRANS_XA_RMFAIL;
TRANS_LOG(WARN, "not idle state", K(ret), K(xid), K(*this));
} else if (OB_FAIL(MTL(ObTransService*)->tx_sanity_check(*tx_desc))) {
TRANS_LOG(WARN, "tx state insanity", K(ret), K(xid), K(*this));
need_exit = true;
} else {
if (OB_FAIL(MTL(ObTransService*)->prepare_tx_coord(*tx_desc_, coord_id))) {
if (OB_ERR_READ_ONLY_TRANSACTION == ret) {
// read only tranaction, rewrite ret to OB_SUCCESS
XA_STAT_ADD_XA_READ_ONLY_TRANS_TOTAL_COUNT();
TRANS_LOG(INFO, "xa is read only", K(ret), K(*this));
is_read_only = true;
ret = OB_SUCCESS;
} else {
TRANS_LOG(WARN, "prepare tx coord failed", K(ret), K(*this));
}
} else if (!coord_id.is_valid()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "invalid coordinator", K(ret), K_(xid), K(coord_id), K(*this));
}
if (OB_FAIL(ret) || is_read_only) {
need_exit = true;
}
}
if (need_exit) {
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_,
ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "abort tx failed", K(tmp_ret), K(*this));
}
--xa_ref_count_;
set_terminated_();
try_exit_();
}
REC_TRACE_EXT(tlog_, xa_prepare, OB_Y(ret), OB_ID(ctx_ref), get_uref());
TRANS_LOG(INFO, "pre xa prepare for mysql", K(ret), K(need_exit), K(*this));
return ret;
}
// only for mysql mode
// xa prepare
// if fail, need exit
// @param [in] xid
// @param [in] tx_desc
int ObXACtx::xa_prepare_for_mysql(const ObXATransID &xid,
int64_t timeout_us)
{
int ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_FAIL(drive_prepare_(xid, timeout_us))) {
TRANS_LOG(WARN, "drive prepare failed", K(ret), K(*this));
}
if (OB_FAIL(ret)) {
--xa_ref_count_;
try_exit_();
}
return ret;
}
// only for mysql mode
// @param [in] xid
// @param [in] timeout_us
int ObXACtx::wait_xa_prepare_for_mysql(const ObXATransID &xid,
const int64_t timeout_us)
{
int ret = OB_SUCCESS;
int result = OB_SUCCESS;
if (OB_FAIL(end_trans_cb_.wait(timeout_us + 10000000, result)) || OB_FAIL(result)) {
TRANS_LOG(WARN, "wait trans prepare failed", K(ret), K(xid), K(*this));
}
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_SUCC(ret)) {
xa_trans_state_ = ObXATransState::PREPARED;
}
--xa_ref_count_;
try_exit_();
return ret;
}
// only for mysql mode
// @param [in] xid
// @param [in] is_rollback
// @param [in] timeout_us
int ObXACtx::one_phase_end_trans_for_mysql(const ObXATransID &xid,
const bool is_rollback,
const int64_t timeout_us,
ObTxDesc *tx_desc,
bool &need_exit)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
need_exit = false;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid));
} else if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid));
} else if (is_exiting_) {
ret = OB_TRANS_IS_EXITING;
TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this));
need_exit = true;
} else if (!is_mysql_mode_ || tx_desc_ != tx_desc) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected xa ctx", K(ret), K(xid), K(*this));
} else if (!xid.all_equal_to(xid_)) {
ret = OB_TRANS_XA_NOTA;
TRANS_LOG(WARN, "unknown xid", K(ret), K(xid), K(*this));
} else if (ObXATransState::IDLE != xa_trans_state_) {
ret = OB_TRANS_XA_RMFAIL;
TRANS_LOG(WARN, "not idle state", K(ret), K(xid), K(*this));
} else {
if (OB_FAIL(MTL(ObTransService*)->tx_sanity_check(*tx_desc))) {
need_exit = true;
TRANS_LOG(WARN, "tx state insanity", K(ret), K(xid), K(*this));
} else {
// in this, need exit
// case one, if fail, exit directly
// case two, if success, exit in wait interface
const int64_t now = ObTimeUtility::current_time();
if (OB_FAIL(MTL(ObTransService*)->end_1pc_trans(*tx_desc_, &end_trans_cb_, is_rollback,
now + timeout_us))) {
TRANS_LOG(WARN, "end 1pc trans failed", K(ret), K(*this));
need_exit = true;
} else {
if (is_rollback) {
xa_trans_state_ = ObXATransState::ROLLBACKING;
} else {
xa_trans_state_ = ObXATransState::COMMITTING;
}
}
}
is_xa_one_phase_ = true;
}
if (need_exit) {
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_,
ObTxAbortCause::IMPLICIT_ROLLBACK))) {
TRANS_LOG(WARN, "abort tx failed", K(tmp_ret), K(*this));
}
--xa_ref_count_;
set_terminated_();
try_exit_();
}
REC_TRACE_EXT(tlog_, xa_one_phase, OB_Y(ret), OB_ID(is_rollback), is_rollback,
OB_ID(ctx_ref), get_uref());
return ret;
}
int ObXACtx::wait_one_phase_end_trans_for_mysql(const bool is_rollback, const int64_t timeout_us)
{
int ret = OB_SUCCESS;
int result = OB_SUCCESS;
if (!is_rollback) {
if (OB_FAIL(end_trans_cb_.wait(timeout_us + 10 * 1000 * 1000, result)) || OB_FAIL(result)) {
TRANS_LOG(WARN, "wait sub2pc end failed", K(ret), K(is_rollback), K(*this));
}
}
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (is_rollback) {
xa_trans_state_ = ObXATransState::ROLLBACKED;
} else if (OB_SUCCESS == ret) {
xa_trans_state_ = ObXATransState::COMMITTED;
}
--xa_ref_count_;
set_exiting_();
TRANS_LOG(INFO, "wait one phase end trans", K(ret), K(is_rollback), K(*this));
return ret;
}
int ObXACtx::handle_abort_for_mysql(int cause)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
if (!is_inited_) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "ObXACtx not inited", K(ret));
} else if (!is_mysql_mode_ || NULL == tx_desc_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected xa ctx", K(ret), K(*this));
} else if (ObXATransState::ACTIVE != xa_trans_state_
&& ObXATransState::IDLE != xa_trans_state_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected xa trans state", K(ret), K(*this));
} else {
if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_, cause))) {
TRANS_LOG(WARN, "abort tx failed", K(tmp_ret), K(*this));
}
--xa_ref_count_;
set_terminated_();
set_exiting_();
}
return ret;
}
}//transaction
}//oceanbase

View File

@ -146,6 +146,27 @@ public:
ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK);
return xa_ref_count_;
}
public:
// for mysql xa
int xa_start_for_mysql(const ObXATransID &xid, ObTxDesc *tx_desc);
int xa_end_for_mysql(const ObXATransID &xid, ObTxDesc *tx_desc);
int pre_xa_prepare_for_mysql(const ObXATransID &xid,
ObTxDesc *tx_desc,
bool &need_exit,
bool &is_read_only,
share::ObLSID &coord_id);
int xa_prepare_for_mysql(const ObXATransID &xid, const int64_t timeout_us);
int wait_xa_prepare_for_mysql(const ObXATransID &xid, const int64_t timeout_us);
int one_phase_end_trans_for_mysql(const ObXATransID &xid,
const bool is_rollback,
const int64_t timeout_us,
ObTxDesc *tx_desc,
bool &need_exit);
int wait_one_phase_end_trans_for_mysql(const bool is_rollback,
const int64_t timeout_us);
bool is_mysql_mode() const { return is_mysql_mode_; }
int handle_abort_for_mysql(int cause);
int32_t get_state() const { return xa_trans_state_; }
public:
// for 4.0 dblink
int xa_start_for_dblink(const ObXATransID &xid,
@ -172,7 +193,8 @@ public:
K_(xa_branch_count), K_(xa_ref_count), K_(lock_grant),
K_(is_tightly_coupled), K_(lock_xid), K_(xa_stmt_info),
K_(is_terminated), K_(executing_xid), "uref", get_uref(),
K_(has_tx_level_temp_table), K_(local_lock_level), K_(need_stmt_lock));
K_(has_tx_level_temp_table), K_(local_lock_level), K_(need_stmt_lock),
K_(is_mysql_mode));
private:
int register_timeout_task_(const int64_t interval_us);
int unregister_timeout_task_();
@ -337,6 +359,7 @@ private:
// 4.2 if local_lock_level == 0, execute the normal global lock release processing
int64_t local_lock_level_;
bool need_stmt_lock_;
bool is_mysql_mode_;
};
}//transaction

View File

@ -131,6 +131,7 @@ public:
static bool contain_tmreadonly(const int64_t flag) { return flag & OBTMREADONLY; }
static bool contain_tmserializable(const int64_t flag) { return flag & OBTMSERIALIZABLE; }
static bool is_tmnoflags(const int64_t flag, const int64_t xa_req_type);
static bool is_tmnoflags_for_mysql(const int64_t flag) { return OBTMNOFLAGS == flag; }
static bool contain_loosely(const int64_t flag) { return flag & OBLOOSELY; }
static bool contain_tmjoin(const int64_t flag) { return flag & OBTMJOIN; }
static bool is_tmjoin(const int64_t flag) { return flag == OBTMJOIN; }

View File

@ -0,0 +1,205 @@
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#include "ob_xa_inner_sql_client.h"
namespace oceanbase
{
using namespace common;
using namespace common::sqlclient;
namespace transaction
{
ObXAInnerSQLClient::~ObXAInnerSQLClient()
{
reset();
close();
}
void ObXAInnerSQLClient::reset()
{
start_ts_ = 0;
has_started_ = false;
}
int ObXAInnerSQLClient::start(ObISQLClient *sql_client)
{
int ret = OB_SUCCESS;
const int32_t group_id = 0;
if (OB_UNLIKELY(NULL == sql_client)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid arguments", K(ret), KP(sql_client));
} else if (has_started_) {
// return success
ret = OB_SUCCESS;
} else {
const uint64_t exec_tenant_id = gen_meta_tenant_id(MTL_ID());
if (OB_FAIL(connect(exec_tenant_id, group_id, sql_client))) {
TRANS_LOG(WARN, "failed to init", K(ret));
} else {
start_ts_ = ObTimeUtility::current_time();
has_started_ = true;
}
}
return ret;
}
int ObXAInnerSQLClient::end()
{
int ret = OB_SUCCESS;
start_ts_ = 0;
has_started_ = false;
close();
return ret;
}
#define QUERY_MYSQLXA_SQL "\
SELECT coordinator, trans_id, is_readonly FROM %s WHERE \
tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld"
int ObXAInnerSQLClient::query_xa_coord_for_mysql(const ObXATransID &xid,
share::ObLSID &coord_id,
ObTransID &tx_id,
bool &is_readonly)
{
int ret = OB_SUCCESS;
ObSqlString sql;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
ObMySQLResult *result = NULL;
char gtrid_str[128] = {0};
int64_t gtrid_len = 0;
char bqual_str[128] = {0};
int64_t bqual_len = 0;
const uint64_t tenant_id = MTL_ID();
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
const int64_t start_ts = ObTimeUtility::current_time();
ObXAInnerSqlStatGuard stat_guard(start_ts);
if (!has_started_ || NULL == get_connection()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected xa inner sql client", K(ret), K(xid));
} else if (xid.empty()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid));
} else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
xid.get_gtrid_str().length(),
gtrid_str,
128,
gtrid_len))) {
TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(xid));
} else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
xid.get_bqual_str().length(),
bqual_str,
128,
bqual_len))) {
TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(xid));
} else if (OB_FAIL(sql.assign_fmt(QUERY_MYSQLXA_SQL,
OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME,
tenant_id,
(int)gtrid_len, gtrid_str,
(int)bqual_len, bqual_str,
xid.get_format_id()))) {
TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret));
} else if (OB_FAIL(read(res, exec_tenant_id, sql.ptr()))) {
TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(exec_tenant_id), K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "execute sql fail", K(ret), K(sql));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql));
}
} else {
int64_t tx_id_value = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t);
EXTRACT_BOOL_FIELD_MYSQL(*result, "is_readonly", is_readonly);
tx_id = ObTransID(tx_id_value);
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "fail to extract field from result", K(ret));
} else {
int64_t ls_id_value = 0;
EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", ls_id_value, int64_t);
if (OB_FAIL(ret)) {
if (OB_ERR_NULL_VALUE == ret) {
// rewrite code, and caller would check validity of coordinator
ret = OB_SUCCESS;
} else {
TRANS_LOG(WARN, "fail to extract coordinator from result", K(ret), K(xid));
}
} else {
coord_id = share::ObLSID(ls_id_value);
}
}
}
}
return ret;
}
#define DELETE_XA_TRANS_SQL "delete from %s where \
tenant_id = %lu and gtrid = x'%.*s' and bqual = x'%.*s' and format_id = %ld"
int ObXAInnerSQLClient::delete_xa_branch_for_mysql(const ObXATransID &xid)
{
int ret = OB_SUCCESS;
ObSqlString sql;
int64_t affected_rows = 0;
char gtrid_str[128] = {0};
int64_t gtrid_len = 0;
char bqual_str[128] = {0};
int64_t bqual_len = 0;
const uint64_t tenant_id = MTL_ID();
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
const int64_t start_ts = ObTimeUtility::current_time();
ObXAInnerSqlStatGuard stat_guard(start_ts);
if (!has_started_ || NULL == get_connection()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "unexpected xa inner sql client", K(ret), K(xid));
} else if (xid.empty()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid argument", K(ret), K(xid));
} else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(),
xid.get_gtrid_str().length(),
gtrid_str,
128,
gtrid_len))) {
TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(xid));
} else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(),
xid.get_bqual_str().length(),
bqual_str,
128,
bqual_len))) {
TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(xid));
} else {
if (OB_FAIL(sql.assign_fmt(DELETE_XA_TRANS_SQL,
OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME,
tenant_id,
(int)gtrid_len, gtrid_str,
(int)bqual_len, bqual_str,
xid.get_format_id()))) {
TRANS_LOG(WARN, "generate delete xa trans sql fail", K(ret), K(sql));
} else if (OB_FAIL(write(exec_tenant_id, sql.ptr(), affected_rows))) {
TRANS_LOG(WARN, "execute delete xa trans sql fail",
KR(ret), K(exec_tenant_id), K(sql), K(affected_rows));
} else if (OB_UNLIKELY(1 < affected_rows)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "delete multiple rows", K(xid), K(affected_rows), K(sql));
} else if (OB_UNLIKELY(0 == affected_rows)) {
TRANS_LOG(WARN, "delete no rows", K(xid), K(sql));
// return OB_SUCCESS
} else {
// do nothing
}
}
return ret;
}
} // end namespace transaction
} // end namespace oceanbase

View File

@ -0,0 +1,57 @@
// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#ifndef OCEANBASE_XA_INNER_SQL_CLIENT_H_
#define OCEANBASE_XA_INNER_SQL_CLIENT_H_
#include "lib/string/ob_sql_string.h"
#include "lib/oblog/ob_log_module.h"
#include "lib/mysqlclient/ob_isql_client.h"
#include "lib/mysqlclient/ob_mysql_connection.h"
#include "lib/mysqlclient/ob_single_connection_proxy.h"
#include "storage/tx/ob_trans_define.h"
namespace oceanbase
{
namespace common
{
namespace sqlclient
{
class ObISQLConnection;
class ObISQLConnectionPool;
}
}
namespace transaction
{
// the object of this class is not thread-safe
class ObXAInnerSQLClient: public ObSingleConnectionProxy
{
public:
ObXAInnerSQLClient() : start_ts_(0), has_started_(false) {};
~ObXAInnerSQLClient();
void reset();
public:
int start(ObISQLClient *sql_client);
int end();
bool has_started() const { return has_started_; }
int query_xa_coord_for_mysql(const ObXATransID &xid,
share::ObLSID &coord_id,
ObTransID &tx_id,
bool &is_readonly);
int delete_xa_branch_for_mysql(const ObXATransID &xid);
private:
int64_t start_ts_;
bool has_started_;
};
} // end namespace transaction
} // end namespace oceanbase
#endif // OCEANBASE_XA_INNER_SQL_CLIENT_H_

View File

@ -56,15 +56,20 @@ int ObXAInnerTableGCWorker::start()
return ret;
}
// 1. in mysql mode, only meta leader collect invalid records
// 2. no need gc for sys tenant temporarily
void ObXAInnerTableGCWorker::run1()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const ObLSID meta_ls_id(ObLSID::SYS_LS_ID);
const uint64_t tenant_id = MTL_ID();
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
int64_t last_scan_ts = ObTimeUtil::current_time();
bool has_decided_mode = false;
bool is_oracle_mode = false;
int64_t start_delay = ObRandom::rand(1, 100);
// use start delay avoid diffient thread do gc on the same time
lib::set_thread_name("ObXAGCWorker");
for (int64_t i = 0; i < start_delay && !has_set_stop(); ++i) {
@ -76,27 +81,54 @@ void ObXAInnerTableGCWorker::run1()
int64_t gc_interval = std::max(2 * max_gc_cost_time_, tmp_start_delay);
gc_interval = std::min(gc_interval, gc_interval_upper_bound);
int64_t gc_cost_time = 0;
int64_t before_gc_ts = 0;
if (OB_SUCCESS != (tmp_ret = ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id,
is_oracle_mode))) {
TRANS_LOG(WARN, "check oracle mode failed", K(ret));
} else {
has_decided_mode = true;
}
while (!has_set_stop()) {
before_gc_ts = ObTimeUtil::current_time();
if (before_gc_ts - last_scan_ts > gc_interval) {
if (is_user_tenant(tenant_id)
&& OB_SUCC(share::ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id, is_oracle_mode))
&& is_oracle_mode) {
if (OB_FAIL(xa_service_->gc_invalid_xa_record(tenant_id))) {
TRANS_LOG(WARN, "gc invalid xa record failed", K(ret), K(tenant_id),
K(gc_interval), K(last_scan_ts), K(before_gc_ts));
if (is_user_tenant(tenant_id) && has_decided_mode) {
int64_t gc_cost_time = 0;
int64_t before_gc_ts = ObTimeUtil::current_time();
if (before_gc_ts - last_scan_ts > gc_interval) {
if (is_oracle_mode) {
// oracle mode
if (OB_FAIL(xa_service_->gc_invalid_xa_record(tenant_id))) {
TRANS_LOG(WARN, "gc invalid xa record failed", K(ret), K(tenant_id),
K(gc_interval), K(last_scan_ts), K(before_gc_ts));
} else {
// update last scan ts
last_scan_ts = ObTimeUtil::current_time();
gc_cost_time = last_scan_ts - before_gc_ts; // compute gc cost
max_gc_cost_time_ = std::max(max_gc_cost_time_, gc_cost_time);
TRANS_LOG(INFO, "clean invalid records", K(tenant_id), K(ret), K(gc_cost_time));
}
} else {
// update last scan ts
last_scan_ts = ObTimeUtil::current_time();
gc_cost_time = last_scan_ts - before_gc_ts; // compute gc cost
max_gc_cost_time_ = std::max(max_gc_cost_time_, gc_cost_time);
TRANS_LOG(INFO, "scan xa inner table for one round", K(tenant_id), K(ret), K(gc_cost_time));
// mysql mode
common::ObAddr leader_addr;
if (OB_FAIL(MTL(ObTransService *)->get_location_adapter()->nonblock_get_leader(
GCONF.cluster_id, meta_tenant_id, meta_ls_id, leader_addr))) {
TRANS_LOG(WARN, "get leader failed", K(ret));
} else if (GCTX.self_addr() == leader_addr) {
if (OB_FAIL(xa_service_->gc_record_for_mysql())) {
TRANS_LOG(WARN, "gc record failed", K(ret));
} else {
// update last scan ts
last_scan_ts = ObTimeUtil::current_time();
gc_cost_time = last_scan_ts - before_gc_ts;
max_gc_cost_time_ = std::max(max_gc_cost_time_, gc_cost_time);
TRANS_LOG(INFO, "clean invalid records", K(ret), K(gc_cost_time));
}
}
}
}
} else if (is_user_tenant(tenant_id) && !has_decided_mode) {
if (OB_SUCCESS != (tmp_ret = ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id,
is_oracle_mode))) {
TRANS_LOG(WARN, "check oracle mode failed", K(ret));
} else {
last_scan_ts = before_gc_ts;
has_decided_mode = true;
}
}
//sleep 20 secnd whether gc succ or not

View File

@ -139,6 +139,41 @@ public:
int xa_prepare_for_original(const ObXATransID &xid,
const ObTransID &trans_id,
const int64_t timeout_seconds);
public:
// for mysql
int xa_start_for_mysql(const ObXATransID &xid,
const int64_t flags,
const uint32_t session_id,
const ObTxParam &tx_param,
ObTxDesc *&tx_desc);
int xa_end_for_mysql(const ObXATransID &xid,
ObTxDesc *&tx_desc);
int xa_prepare_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
ObTxDesc *&tx_desc,
bool &need_exit);
int xa_commit_onephase_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
ObTxDesc *&tx_desc,
bool &need_exit);
int xa_second_phase_twophase_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
const bool is_rollback,
ObTransID &tx_id);
int xa_rollback_onephase_for_mysql(const ObXATransID &xid,
const int64_t timeout_us,
ObTxDesc *&tx_desc,
bool &need_exit);
int insert_record_for_mysql(const uint64_t tenant_id,
const ObXATransID &xid,
const ObTransID &trans_id,
const share::ObLSID &coordinator,
const ObAddr &sche_addr,
const bool is_read_only);
int handle_terminate_for_mysql(const ObXATransID &xid,
ObTxDesc *tx_desc);
int gc_record_for_mysql();
int check_trans_ctx(const int64_t tx_id_value, bool &is_exist);
public:
// for 4.0 dblink
int xa_start_for_tm_promotion(const int64_t flags,
@ -322,6 +357,12 @@ private:
const ObTransID &tx_id,
const ObAddr &original_sche_addr,
const int64_t timeout_us);
private:
// for mysql
int xa_start_for_mysql_(const ObXATransID &xid,
const uint32_t session_id,
const ObTxParam &tx_param,
ObTxDesc *&tx_desc);
private:
// for 4.0 dblink
int xa_start_for_tm_promotion_(const int64_t flags,
@ -487,6 +528,9 @@ private:
#define XA_INNER_INCREMENT_COMPENSATE_COUNT() \
{ MTL(ObXAService*)->get_statistics().inc_compensate_record_count();} \
#define XA_INNER_INCREMENT_TERMINATE_COUNT() \
{ MTL(ObXAService*)->get_statistics().inc_session_terminate_count();} \
/////// statistics of dblink trans
#define DBLINK_STAT_ADD_TRANS_COUNT() \
{ MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_count();} \

View File

@ -293,6 +293,11 @@ void ObXATransStatistics::inc_compensate_record_count()
ATOMIC_INC(&xa_compensate_record_count_);
}
void ObXATransStatistics::inc_session_terminate_count()
{
ATOMIC_INC(&xa_terminate_count_);
}
void ObXATransStatistics::try_print_xa_statistics()
{
static const int64_t STAT_INTERVAL = 9000000; // 9 seconds
@ -351,9 +356,11 @@ void ObXATransStatistics::try_print_xa_statistics()
}
// for inner logic
int64_t xa_compensate_record_count = ATOMIC_LOAD(&xa_compensate_record_count_);
if (0 != xa_compensate_record_count) {
int64_t xa_terminate_count = ATOMIC_LOAD(&xa_terminate_count_);
if (0 != xa_compensate_record_count || 0 != xa_terminate_count) {
FLOG_INFO("xa statistics of inner logic",
"xa_compensate_record_count", xa_compensate_record_count);
"xa_compensate_record_count", xa_compensate_record_count,
"xa_terminate_count", xa_terminate_count);
}
// reset
// NOTE that the active info should not be reset
@ -389,6 +396,7 @@ void ObXATransStatistics::try_print_xa_statistics()
ATOMIC_STORE(&xa_inner_rpc_ten_ms_count_, 0);
ATOMIC_STORE(&xa_inner_rpc_twenty_ms_count_, 0);
ATOMIC_STORE(&xa_compensate_record_count_, 0);
ATOMIC_STORE(&xa_terminate_count_, 0);
}
// for active info
int64_t active_xa_stmt_count = ATOMIC_LOAD(&active_xa_stmt_count_);

View File

@ -33,7 +33,8 @@ public:
xa_inner_sql_count_(0), xa_inner_sql_used_time_us_(0), xa_inner_rpc_count_(0), xa_inner_rpc_used_time_us_(0),
xa_inner_sql_ten_ms_count_(0), xa_inner_sql_twenty_ms_count_(0),
xa_inner_rpc_ten_ms_count_(0), xa_inner_rpc_twenty_ms_count_(0),
active_xa_stmt_count_(0), active_xa_ctx_count_(0), xa_compensate_record_count_(0) {}
active_xa_stmt_count_(0), active_xa_ctx_count_(0), xa_compensate_record_count_(0),
xa_terminate_count_(0) {}
~ObXATransStatistics() { destroy(); }
int init(const uint64_t tenant_id);
void reset();
@ -111,6 +112,8 @@ public:
void dec_active_xa_ctx_count();
// increment compensate record count
void inc_compensate_record_count();
// increment session terminate count
void inc_session_terminate_count();
public:
void try_print_xa_statistics();
@ -160,6 +163,7 @@ private:
int64_t active_xa_ctx_count_;
// inner logic
int64_t xa_compensate_record_count_;
int64_t xa_terminate_count_;
};
class ObDBLinkTransStatistics

View File

@ -19,6 +19,7 @@ namespace oceanbase
{
using namespace common;
using namespace storage;
using namespace share;
namespace transaction
{
@ -59,18 +60,22 @@ void ObXATransHeartbeatWorker::run1()
static const int64_t INTERVAL_US = 5 * 1000 * 1000; // 5s
int64_t loop_count = 0;
int64_t total_time = 0;
const uint64_t tenant_id = MTL_ID();
lib::set_thread_name("ObXAHbWorker");
while (!has_set_stop()) {
int64_t start_time = ObTimeUtility::current_time();
loop_count++;
// MTL(ObXAService *)->get_xa_statistics().print_statistics(start_time);
MTL(ObXAService *)->try_print_statistics();
bool is_oracle_mode = false;
int64_t start_time = ObTimeUtility::current_time();
(void)ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id, is_oracle_mode);
// currently, heartbeat is only for oracle user tenant
const bool need_heartbeat = is_oracle_mode && is_user_tenant(tenant_id);
if (OB_UNLIKELY(!is_inited_)) {
TRANS_LOG(WARN, "xa trans heartbeat not init");
ret = OB_NOT_INIT;
} else {
} else if (need_heartbeat) {
if (OB_FAIL(xa_service_->xa_scheduler_hb_req())) {
TRANS_LOG(WARN, "xa scheduler heartbeat failed", K(ret));
}