From 8f3fafe9f2bf32a861335f53a07289be2d1857a3 Mon Sep 17 00:00:00 2001 From: jw-guo Date: Thu, 21 Nov 2024 08:45:04 +0000 Subject: [PATCH] [FEAT MERGE] patch mysql xa from 425 to 435 --- deps/oblib/src/lib/trace/ob_trace_def.h | 8 + .../pl_non_reserved_keywords_mysql_mode.c | 2 + src/pl/parser/pl_parser_mysql_mode.y | 20 +- src/sql/engine/cmd/ob_tcl_executor.cpp | 8 + src/sql/engine/cmd/ob_xa_executor.cpp | 474 ++++++++++--- src/sql/engine/cmd/ob_xa_executor.h | 38 +- src/sql/executor/ob_cmd_executor.cpp | 4 +- src/sql/ob_sql_trans_control.cpp | 66 +- src/sql/ob_sql_utils.cpp | 6 + .../parser/non_reserved_keywords_mysql_mode.c | 1 + src/sql/parser/sql_parser_mysql_mode.l | 2 +- src/sql/parser/sql_parser_mysql_mode.y | 170 ++++- src/sql/resolver/cmd/ob_show_resolver.cpp | 47 ++ src/sql/resolver/cmd/ob_show_resolver.h | 2 + src/sql/resolver/ob_resolver.cpp | 1 + src/sql/resolver/ob_resolver_utils.cpp | 124 +++- src/sql/resolver/ob_resolver_utils.h | 7 +- src/sql/resolver/xa/ob_xa_commit_resolver.cpp | 28 +- src/sql/resolver/xa/ob_xa_end_resolver.cpp | 27 +- .../resolver/xa/ob_xa_prepare_resolver.cpp | 22 +- .../resolver/xa/ob_xa_rollback_resolver.cpp | 22 +- src/sql/resolver/xa/ob_xa_start_resolver.cpp | 28 +- src/storage/CMakeLists.txt | 2 + src/storage/tx/ob_mysql_xa_service.cpp | 642 ++++++++++++++++++ src/storage/tx/ob_trans_define_v4.cpp | 1 - src/storage/tx/ob_tx_api.cpp | 3 + src/storage/tx/ob_tx_free_route.cpp | 10 + src/storage/tx/ob_xa_ctx.cpp | 325 ++++++++- src/storage/tx/ob_xa_ctx.h | 25 +- src/storage/tx/ob_xa_define.h | 1 + src/storage/tx/ob_xa_inner_sql_client.cpp | 205 ++++++ src/storage/tx/ob_xa_inner_sql_client.h | 57 ++ .../tx/ob_xa_inner_table_gc_worker.cpp | 68 +- src/storage/tx/ob_xa_service.h | 44 ++ src/storage/tx/ob_xa_trans_event.cpp | 12 +- src/storage/tx/ob_xa_trans_event.h | 6 +- .../tx/ob_xa_trans_heartbeat_worker.cpp | 13 +- 37 files changed, 2310 insertions(+), 211 deletions(-) create mode 100644 src/storage/tx/ob_mysql_xa_service.cpp create mode 100644 src/storage/tx/ob_xa_inner_sql_client.cpp create mode 100644 src/storage/tx/ob_xa_inner_sql_client.h diff --git a/deps/oblib/src/lib/trace/ob_trace_def.h b/deps/oblib/src/lib/trace/ob_trace_def.h index 7ee50ef09..5e7a59822 100644 --- a/deps/oblib/src/lib/trace/ob_trace_def.h +++ b/deps/oblib/src/lib/trace/ob_trace_def.h @@ -68,6 +68,14 @@ FLT_DEF_SPAN(com_query_process, "com_query process") FLT_DEF_SPAN(end_transaction, "end of transaction") // for transaction end + // for xa transaction + FLT_DEF_SPAN(xa_start, "xa start") + FLT_DEF_SPAN(xa_end, "xa end") + FLT_DEF_SPAN(xa_prepare, "xa prepare") + FLT_DEF_SPAN(xa_commit, "xa commit") + FLT_DEF_SPAN(xa_rollback, "xa rollback") + // for xa transaction end + // for ps FLT_DEF_SPAN(ps_prepare, "prepare phase of ps protocol") FLT_DEF_SPAN(ps_execute, "execute phase of ps protocol") diff --git a/src/pl/parser/pl_non_reserved_keywords_mysql_mode.c b/src/pl/parser/pl_non_reserved_keywords_mysql_mode.c index f2ed1f239..131de8776 100644 --- a/src/pl/parser/pl_non_reserved_keywords_mysql_mode.c +++ b/src/pl/parser/pl_non_reserved_keywords_mysql_mode.c @@ -180,6 +180,8 @@ static const NonReservedKeyword Mysql_pl_none_reserved_keywords[] = {"submit", SUBMIT}, {"job", JOB}, {"cancel", CANCEL}, + {"xa", XA}, + {"recover", RECOVER}, }; const NonReservedKeyword *mysql_pl_non_reserved_keyword_lookup(const char *word) diff --git a/src/pl/parser/pl_parser_mysql_mode.y b/src/pl/parser/pl_parser_mysql_mode.y index ae5385b04..fe4f7c5f9 100644 --- a/src/pl/parser/pl_parser_mysql_mode.y +++ b/src/pl/parser/pl_parser_mysql_mode.y @@ -223,7 +223,7 @@ void obpl_mysql_wrap_get_user_var_into_subquery(ObParseCtx *parse_ctx, ParseNode DATA DEFINER END_KEY EXTEND FOLLOWS FOUND FUNCTION HANDLER INTERFACE INVOKER JSON LANGUAGE MESSAGE_TEXT MYSQL_ERRNO NATIONAL NEXT NO OF OPEN PACKAGE PRAGMA PRECEDES RECORD RETURNS ROW ROWTYPE SCHEMA_NAME SECURITY SUBCLASS_ORIGIN TABLE_NAME USER TYPE VALUE DATETIME TIMESTAMP TIME DATE YEAR - TEXT NCHAR NVARCHAR BOOL BOOLEAN ENUM BIT FIXED SIGNED ROLE SUBMIT CANCEL JOB + TEXT NCHAR NVARCHAR BOOL BOOLEAN ENUM BIT FIXED SIGNED ROLE SUBMIT CANCEL JOB XA RECOVER //-----------------------------non_reserved keyword end--------------------------------------------- %right END_KEY %left ELSE IF ELSEIF @@ -235,7 +235,7 @@ void obpl_mysql_wrap_get_user_var_into_subquery(ObParseCtx *parse_ctx, ParseNode %nonassoc AUTHID INTERFACE %nonassoc DECLARATION -%type sql_keyword +%type sql_keyword xa_keyword %type unreserved_keyword %type stmt_block stmt_list stmt outer_stmt sp_proc_outer_statement sp_proc_inner_statement sp_proc_independent_statement %type create_procedure_stmt sp_proc_stmt expr expr_list procedure_body default_expr @@ -401,6 +401,13 @@ sql_stmt_prefix: | LOAD { $$ = NULL; } ; +xa_keyword: + BEGIN_KEY { $$ = NULL; } + | SQL_KEYWORD { $$ = NULL; } + | END_KEY { $$ = NULL; } + | RECOVER { $$ = NULL; } +; + sql_stmt: sql_stmt_prefix /*sql stmt tail*/ { @@ -416,6 +423,13 @@ sql_stmt: do_parse_sql_stmt(sql_stmt, parse_ctx, @1.first_column, @1.last_column, 2, ';', END_P); malloc_non_terminal_node($$, parse_ctx->mem_pool_, T_SQL_STMT, 1, sql_stmt); } + | XA xa_keyword /*sql stmt tail*/ + { + //read sql query string直到读到token';'或者END_P + ParseNode *sql_stmt = NULL; + do_parse_sql_stmt(sql_stmt, parse_ctx, @1.first_column, @1.last_column, 2, ';', END_P); + malloc_non_terminal_node($$, parse_ctx->mem_pool_, T_SQL_STMT, 1, sql_stmt); + } | CREATE sql_keyword /*sql stmt tail*/ { //read sql query string直到读到token';'或者END_P @@ -800,6 +814,8 @@ unreserved_keyword: | BIT | FIXED | SIGNED + | XA + | RECOVER ; /***************************************************************************** diff --git a/src/sql/engine/cmd/ob_tcl_executor.cpp b/src/sql/engine/cmd/ob_tcl_executor.cpp index 816f094ca..bc4209bf5 100644 --- a/src/sql/engine/cmd/ob_tcl_executor.cpp +++ b/src/sql/engine/cmd/ob_tcl_executor.cpp @@ -48,6 +48,13 @@ int ObEndTransExecutor::end_trans(ObExecContext &ctx, ObEndTransStmt &stmt) LOG_ERROR("session ptr is null", K(ret)); } else if (my_session->is_in_transaction() && my_session->associated_xa()) { + if (lib::is_mysql_mode()) { + // mysql mode + ret = OB_TRANS_XA_RMFAIL; + LOG_WARN("the command cannot be executed in xa trans", K(ret), K(my_session->get_xid())); + ctx.set_need_disconnect(false); + } else { + // oracle mode #ifdef OB_BUILD_ORACLE_PL transaction::ObTxDesc *tx_desc = my_session->get_tx_desc(); const transaction::ObXATransID xid = my_session->get_xid(); @@ -92,6 +99,7 @@ int ObEndTransExecutor::end_trans(ObExecContext &ctx, ObEndTransStmt &stmt) } ctx.set_need_disconnect(false); #endif + } } else if (OB_FAIL(ObSqlTransControl::explicit_end_trans(ctx, stmt.get_is_rollback(), stmt.get_hint()))) { LOG_WARN("fail end trans", K(ret)); } diff --git a/src/sql/engine/cmd/ob_xa_executor.cpp b/src/sql/engine/cmd/ob_xa_executor.cpp index 308358f0e..75c2580ac 100644 --- a/src/sql/engine/cmd/ob_xa_executor.cpp +++ b/src/sql/engine/cmd/ob_xa_executor.cpp @@ -28,17 +28,379 @@ using namespace common; using namespace transaction; namespace sql { - -int ObXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) +int ObXaExecutorUtil::build_tx_param(ObSQLSessionInfo *session, transaction::ObTxParam ¶m) { - int ret = OB_NOT_SUPPORTED; - LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol start interface"); - UNUSED(ctx); - UNUSED(stmt); - // 暂时禁掉mysql模式下的xa调用 + int ret = OB_SUCCESS; + int64_t org_cluster_id = OB_INVALID_ORG_CLUSTER_ID; + if (OB_FAIL(get_org_cluster_id(session, org_cluster_id))) { + LOG_WARN("get original cluster id failed", K(ret)); + } else { + int64_t tx_timeout_us = 0; + bool is_read_only = session->get_tx_read_only(); + session->get_tx_timeout(tx_timeout_us); + param.timeout_us_ = tx_timeout_us; + param.lock_timeout_us_ = session->get_trx_lock_timeout(); + param.access_mode_ = is_read_only ? ObTxAccessMode::RD_ONLY : ObTxAccessMode::RW; + param.isolation_ = session->get_tx_isolation(); + param.cluster_id_ = org_cluster_id; + } return ret; } +int64_t ObXaExecutorUtil::get_query_timeout(ObSQLSessionInfo *session) +{ + int ret = OB_SUCCESS; + int64_t query_timeout = 10L * 1000 * 1000; // 10 seconds by default + if (NULL != session) { + if (OB_FAIL(session->get_query_timeout(query_timeout))) { + LOG_WARN("get query timeout failed", K(ret), K(query_timeout)); + query_timeout = 10L * 1000 * 1000; + } + } + return query_timeout; +} + +// for mysql xa start +int ObXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) +{ + int ret = OB_SUCCESS; + FLTSpanGuard(xa_start); + ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx); + ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); + ObXATransID xid; + ObTransID tx_id; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); + if (OB_ISNULL(plan_ctx) || OB_ISNULL(my_session)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid param", K(ret), K(plan_ctx), K(my_session)); + } else if (lib::is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected in oracle mode", K(ret)); + } else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) { + ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED; + LOG_WARN("not support tx free route for dblink trans"); + } else if (OB_FAIL(xid.set(stmt.get_gtrid_string(), + stmt.get_bqual_string(), + stmt.get_format_id()))) { + LOG_WARN("set xid failed", K(ret), K(stmt)); + } else if (my_session->get_in_transaction()) { + ObTxDesc *&tx_desc = my_session->get_tx_desc(); + ret = OB_TRANS_XA_OUTSIDE; + LOG_WARN("already start trans", K(ret), K(xid), K(tx_desc->tid()), + K(tx_desc->get_xid())); + } else { + ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock()); + ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock()); + const int64_t flags = stmt.get_flags(); + transaction::ObTxParam &tx_param = plan_ctx->get_trans_param(); + ObTxDesc *&tx_desc = my_session->get_tx_desc(); + if (OB_FAIL(ObXaExecutorUtil::build_tx_param(my_session, tx_param))) { + LOG_WARN("build tx param failed", K(ret)); + } else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_start_for_mysql(xid, + flags, my_session->get_sessid(), tx_param, tx_desc))) { + LOG_WARN("mysql xa start failed", K(ret), K(tx_param)); + my_session->get_trans_result().reset(); + my_session->reset_tx_variable(); + ctx.set_need_disconnect(false); + } else { + // associate xa with session + my_session->associate_xa(xid); + my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); + FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id()); + } + } + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_START_TOTAL_COUNT(); + XA_STAT_ADD_XA_START_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_START_FAIL_COUNT(); + } + LOG_INFO("mysql xa start", K(ret), K(xid), K(tx_id)); + return ret; +} + +// for mysql xa end +int ObXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) +{ + int ret = OB_SUCCESS; + FLTSpanGuard(xa_end); + ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); + ObXATransID xid; + ObTransID tx_id; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); + if (OB_ISNULL(my_session)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid param", K(ret), K(my_session)); + } else if (lib::is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected in oracle mode", K(ret)); + } else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) { + ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED; + LOG_WARN("not support tx free route for dblink trans"); + } else if (!my_session->get_in_transaction()) { + ret = OB_TRANS_XA_RMFAIL; + LOG_WARN("not in transaction", K(ret)); + } else if (OB_FAIL(xid.set(stmt.get_gtrid_string(), + stmt.get_bqual_string(), + stmt.get_format_id()))) { + LOG_WARN("set xid failed", K(ret), K(stmt)); + } else if (!xid.all_equal_to(my_session->get_xid())) { + ret = OB_TRANS_XA_NOTA; + LOG_WARN("unknown xid", K(ret), K(xid), K(my_session->get_xid())); + } else { + ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock()); + ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock()); + ObTxDesc *&tx_desc = my_session->get_tx_desc(); + FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id()); + if (OB_FAIL(MTL(transaction::ObXAService*)->xa_end_for_mysql(xid, tx_desc))) { + LOG_WARN("mysql xa end failed", K(ret), K(xid)); + } + } + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_END_TOTAL_COUNT(); + XA_STAT_ADD_XA_END_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_END_FAIL_COUNT(); + } + LOG_INFO("mysql xa end", K(ret), K(xid)); + return ret; +} + +// for mysql xa prepare +int ObXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) +{ + int ret = OB_SUCCESS; + FLTSpanGuard(xa_prepare); + ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); + ObXATransID xid; + ObTransID tx_id; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); + if (OB_ISNULL(my_session)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid param", K(ret), K(my_session)); + } else if (lib::is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected in oracle mode", K(ret)); + } else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) { + ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED; + LOG_WARN("not support tx free route for dblink trans"); + } else if (!my_session->get_in_transaction()) { + ret = OB_TRANS_XA_RMFAIL; + LOG_WARN("not in transaction", K(ret)); + } else if (OB_FAIL(xid.set(stmt.get_gtrid_string(), + stmt.get_bqual_string(), + stmt.get_format_id()))) { + LOG_WARN("set xid failed", K(ret), K(stmt)); + } else if (!xid.all_equal_to(my_session->get_xid())) { + ret = OB_TRANS_XA_NOTA; + LOG_WARN("unknown xid", K(ret), K(xid), K(my_session->get_xid())); + } else { + // in two cases, xa trans need exit + // case one, xa prepare succeeds + // case two, xa trans need rollback + const int64_t timeout_us = ObXaExecutorUtil::get_query_timeout(my_session); + bool need_exit = false; + ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock()); + ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock()); + ObTxDesc *&tx_desc = my_session->get_tx_desc(); + my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); + FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id()); + if (0 >= timeout_us) { + ret = OB_TRANS_STMT_TIMEOUT; + LOG_WARN("xa stmt timeout", K(ret), K(xid), K(timeout_us)); + } else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_prepare_for_mysql(xid, timeout_us, + tx_desc, need_exit))) { + LOG_WARN("mysql xa prepare failed", K(ret), K(xid)); + } + if (need_exit) { + my_session->get_trans_result().reset(); + my_session->reset_tx_variable(); + my_session->disassociate_xa(); + ctx.set_need_disconnect(false); + } + } + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_PREPARE_TOTAL_COUNT(); + XA_STAT_ADD_XA_PREPARE_TOTAL_USED_TIME(used_time_us); + if (OB_SUCCESS != ret && OB_TRANS_XA_RDONLY != ret) { + XA_STAT_ADD_XA_PREPARE_FAIL_COUNT(); + } + LOG_INFO("mysql xa prepare", K(ret), K(xid)); + return ret; +} + +// for mysql xa commit +int ObXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt) +{ + int ret = OB_SUCCESS; + FLTSpanGuard(xa_commit); + ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); + ObXATransID xid; + ObTransID tx_id; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); + if (OB_ISNULL(my_session)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid param", K(ret), K(my_session)); + } else if (lib::is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected in oracle mode", K(ret)); + } else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) { + ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED; + LOG_WARN("not support tx free route for dblink trans"); + } else if (OB_FAIL(xid.set(stmt.get_gtrid_string(), + stmt.get_bqual_string(), + stmt.get_format_id()))) { + LOG_WARN("set xid failed", K(ret), K(stmt)); + } else { + const int64_t timeout_us = ObXaExecutorUtil::get_query_timeout(my_session); + ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock()); + ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock()); + const int64_t flags = stmt.get_flags(); + if (0 >= timeout_us) { + ret = OB_TRANS_STMT_TIMEOUT; + LOG_WARN("xa stmt timeout", K(ret), K(xid), K(timeout_us)); + } else if (ObXAFlag::is_tmonephase(flags)) { + bool need_exit = false; + // one phase xa commit + if (!my_session->get_in_transaction()) { + ret = OB_TRANS_XA_RMFAIL; + LOG_WARN("no xa trans for one phase xa commit", K(ret), K(xid)); + } else if (my_session->get_xid().empty()) { + ret = OB_TRANS_XA_NOTA; + LOG_WARN("unknown xid", K(ret), K(xid)); + } else if (!xid.all_equal_to(my_session->get_xid())) { + ret = OB_TRANS_XA_RMFAIL; + LOG_WARN("unexpected xid", K(ret), K(xid), K(my_session->get_xid())); + } else { + ObTxDesc *&tx_desc = my_session->get_tx_desc(); + my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); + FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id()); + if (OB_FAIL(MTL(transaction::ObXAService*)->xa_commit_onephase_for_mysql(xid, + timeout_us, tx_desc, need_exit))) { + LOG_WARN("mysql xa commit failed", K(ret), K(xid)); + } + if (need_exit) { + my_session->get_trans_result().reset(); + my_session->reset_tx_variable(); + my_session->disassociate_xa(); + ctx.set_need_disconnect(false); + } + } + } else if (ObXAFlag::is_tmnoflags_for_mysql(flags)) { + // two phase xa commit + const bool is_rollback = false; + if (my_session->get_in_transaction()) { + if (my_session->get_xid().empty()) { + ret = OB_TRANS_XA_NOTA; + LOG_WARN("unknown xid", K(ret), K(xid)); + } else { + ret = OB_TRANS_XA_RMFAIL; + LOG_WARN("can not be executed in this state", K(ret), K(xid), K(my_session->get_xid())); + } + } else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_second_phase_twophase_for_mysql(xid, + timeout_us, is_rollback, tx_id))) { + LOG_WARN("mysql xa commit failed", K(ret), K(xid)); + } + my_session->get_raw_audit_record().trans_id_ = tx_id; + FLT_SET_TAG(trans_id, tx_id.get_id()); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected flags for mysql xa commit", K(ret), K(xid)); + } + } + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_COMMIT_TOTAL_COUNT(); + XA_STAT_ADD_XA_COMMIT_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_COMMIT_FAIL_COUNT(); + } + LOG_INFO("mysql xa commit", K(ret), K(xid)); + return ret; +} + +// for mysql xa rollback +int ObXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt) +{ + int ret = OB_SUCCESS; + FLTSpanGuard(xa_rollback); + ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); + ObXATransID xid; + ObTransID tx_id; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); + if (OB_ISNULL(my_session)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid param", K(ret), K(my_session)); + } else if (lib::is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected in oracle mode", K(ret)); + } else if (!my_session->is_inner() && my_session->is_txn_free_route_temp()) { + ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED; + LOG_WARN("not support tx free route for dblink trans"); + } else if (OB_FAIL(xid.set(stmt.get_gtrid_string(), + stmt.get_bqual_string(), + stmt.get_format_id()))) { + LOG_WARN("set xid failed", K(ret), K(stmt)); + } else { + bool need_exit = false; + const int64_t timeout_us = ObXaExecutorUtil::get_query_timeout(my_session); + ObSQLSessionInfo::LockGuard session_query_guard(my_session->get_query_lock()); + ObSQLSessionInfo::LockGuard data_lock_guard(my_session->get_thread_data_lock()); + if (0 >= timeout_us) { + ret = OB_TRANS_STMT_TIMEOUT; + LOG_WARN("xa stmt timeout", K(ret), K(xid), K(timeout_us)); + } else if (!my_session->get_in_transaction()) { + // try two phase xa rollback + const bool is_rollback = true; + if (OB_FAIL(MTL(transaction::ObXAService*)->xa_second_phase_twophase_for_mysql(xid, + timeout_us, is_rollback, tx_id))) { + LOG_WARN("mysql xa rollback failed", K(ret), K(xid)); + } + my_session->get_raw_audit_record().trans_id_ = tx_id; + FLT_SET_TAG(trans_id, tx_id.get_id()); + } else { + // try one phase xa rollback + ObTxDesc *&tx_desc = my_session->get_tx_desc(); + my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); + FLT_SET_TAG(trans_id, my_session->get_tx_id().get_id()); + if (my_session->get_xid().empty()) { + ret = OB_TRANS_XA_NOTA; + LOG_WARN("unknown xid", K(ret), K(xid)); + } else if (!xid.all_equal_to(my_session->get_xid())) { + ret = OB_TRANS_XA_RMFAIL; + LOG_WARN("unexpected xid", K(ret), K(xid), K(my_session->get_xid())); + } else if (OB_FAIL(MTL(transaction::ObXAService*)->xa_rollback_onephase_for_mysql(xid, + timeout_us, tx_desc, need_exit))) { + LOG_WARN("mysql xa rollback failed", K(ret), K(xid)); + } + if (need_exit) { + my_session->get_trans_result().reset(); + my_session->reset_tx_variable(); + my_session->disassociate_xa(); + ctx.set_need_disconnect(false); + } + } + } + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_ROLLBACK_TOTAL_COUNT(); + XA_STAT_ADD_XA_ROLLBACK_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_ROLLBACK_FAIL_COUNT(); + } + LOG_INFO("mysql xa rollback", K(ret), K(xid)); + return ret; +} + +// for oracle xa start int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) { int ret = OB_SUCCESS; @@ -71,7 +433,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) } else if (true == my_session->is_for_trigger_package()) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support xa start in trigger", K(ret), K(xid)); - } else if (OB_FAIL(get_org_cluster_id_(my_session, org_cluster_id))) { + } else if (OB_FAIL(ObXaExecutorUtil::get_org_cluster_id(my_session, org_cluster_id))) { } else if (OB_FAIL(my_session->get_tx_timeout(tx_timeout))) { LOG_ERROR("fail to get trans timeout ts", K(ret)); } else if (FALSE_IT(tenant_id = my_session->get_effective_tenant_id())) { @@ -131,7 +493,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) return ret; } -int ObPlXaStartExecutor::get_org_cluster_id_(ObSQLSessionInfo *session, int64_t &org_cluster_id) { +int ObXaExecutorUtil::get_org_cluster_id(ObSQLSessionInfo *session, int64_t &org_cluster_id) { int ret = OB_SUCCESS; if (OB_FAIL(session->get_ob_org_cluster_id(org_cluster_id))) { LOG_WARN("fail to get ob_org_cluster_id", K(ret)); @@ -152,45 +514,6 @@ int ObPlXaStartExecutor::get_org_cluster_id_(ObSQLSessionInfo *session, int64_t return ret; } -int ObXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) -{ - int ret = OB_NOT_SUPPORTED; - LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol end interface"); - UNUSED(ctx); - UNUSED(stmt); - // 暂时禁掉mysql模式下的xa调用 - return ret; - /* - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); - ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx(); - //storage::ObPartitionService *ps = nullptr; - - // if (OB_ISNULL(my_session) || OB_ISNULL(ps = task_exec_ctx.get_partition_service())) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("invalid param", K(ret), K(my_session)); - // mysql调用flags置为TMSUCCESS - } else if (OB_FAIL(ps->xa_end(stmt.get_xa_string(), - transaction::ObXAEndFlag::TMSUCCESS, - my_session->get_trans_desc()))) { - LOG_WARN("xa end failed", K(ret), K(stmt.get_xa_string())); - // 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚 - if (OB_TRANS_XA_RMFAIL != ret - && OB_SUCCESS != (tmp_ret = ObSqlTransControl::explicit_end_trans(ctx, true))) { - ret = tmp_ret; - LOG_WARN("explicit end trans failed", K(ret)); - } - } else { - my_session->reset_tx_variable(); - ctx.set_need_disconnect(false); - my_session->get_trans_desc().get_standalone_stmt_desc().reset(); - } - LOG_DEBUG("xa end execute", K(stmt.get_xa_string())); - return ret; - */ -} - int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) { int ret = OB_SUCCESS; @@ -263,49 +586,6 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) return ret; } -int ObXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) -{ - int ret = OB_NOT_SUPPORTED; - LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol prepare interface"); - UNUSED(ctx); - UNUSED(stmt); - // 暂时禁掉mysql模式下的xa调用 - return ret; - /* - int ret = OB_SUCCESS; - //int tmp_ret = OB_SUCCESS; - ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); - ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx(); - ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx); - //storage::ObPartitionService *ps = nullptr; - if (OB_ISNULL(my_session) || OB_ISNULL(plan_ctx) - // || OB_ISNULL(ps = task_exec_ctx.get_partition_service())) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("invalid param", K(ret), K(my_session), K(plan_ctx), K(ps)); - } else if (my_session->get_in_transaction()) { - ret = OB_TRANS_XA_OUTSIDE; - LOG_WARN("already start trans", K(ret)); - } else { - const uint64_t tenant_id = my_session->get_effective_tenant_id(); - int64_t timeout = plan_ctx->get_trans_timeout_timestamp(); - transaction::ObStartTransParam &start_trans_param = plan_ctx->get_start_trans_param(); - init_start_trans_param(my_session, task_exec_ctx, start_trans_param); - if (OB_FAIL(ps->xa_prepare(stmt.get_xa_string(), tenant_id, timeout))) { - LOG_WARN("xa prepare failed", K(ret), K(stmt.get_xa_string())); - // 如果是OB_TRANS_XA_RMFAIL错误那么由用户决定是否回滚 - // if (OB_TRANS_XA_RMFAIL != ret - // && OB_SUCCESS != (tmp_ret = ObSqlTransControl::explicit_end_trans(ctx, true))) { - // ret = tmp_ret; - // LOG_WARN("explicit end trans failed", K(ret)); - // } - } - } - - LOG_DEBUG("xa prepare execute", K(stmt.get_xa_string())); - return ret; - */ -} - int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) { int ret = OB_SUCCESS; @@ -350,18 +630,6 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) return ret; } -int ObXaEndTransExecutor::execute_(const ObString &xid, - const bool is_rollback, ObExecContext &ctx) -{ - int ret = OB_NOT_SUPPORTED; - LOG_USER_ERROR(OB_NOT_SUPPORTED, "XA protocol end trans interface"); - UNUSED(xid); - UNUSED(is_rollback); - UNUSED(ctx); - // 暂时禁掉mysql模式下的xa调用 - return ret; -} - int ObPlXaEndTransExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/cmd/ob_xa_executor.h b/src/sql/engine/cmd/ob_xa_executor.h index a50dc848f..bca01016d 100644 --- a/src/sql/engine/cmd/ob_xa_executor.h +++ b/src/sql/engine/cmd/ob_xa_executor.h @@ -23,6 +23,14 @@ namespace sql { class ObExecContext; class ObXaStartStmt; +class ObXaExecutorUtil +{ +public: + static int get_org_cluster_id(ObSQLSessionInfo *session, int64_t &org_cluster_id); + static int build_tx_param(ObSQLSessionInfo *session, transaction::ObTxParam ¶m); + static int64_t get_query_timeout(ObSQLSessionInfo *session); +}; + class ObXaStartExecutor { public: @@ -89,23 +97,25 @@ private: class ObXaCommitStmt; class ObXaRollBackStmt; -class ObXaEndTransExecutor + +class ObXaCommitExecutor { public: - ObXaEndTransExecutor() {} - ~ObXaEndTransExecutor() {} - int execute(ObExecContext &ctx, ObXaCommitStmt &stmt) - { - return execute_(stmt.get_xa_string(), false, ctx); - } - int execute(ObExecContext &ctx, ObXaRollBackStmt &stmt) - { - return execute_(stmt.get_xa_string(), true, ctx); - } + ObXaCommitExecutor() {} + ~ObXaCommitExecutor() {} + int execute(ObExecContext &ctx, ObXaCommitStmt &stmt); private: - int execute_(const common::ObString& xid, const bool is_rollback, - ObExecContext &ctx); - DISALLOW_COPY_AND_ASSIGN(ObXaEndTransExecutor); + DISALLOW_COPY_AND_ASSIGN(ObXaCommitExecutor); +}; + +class ObXaRollbackExecutor +{ +public: + ObXaRollbackExecutor() {} + ~ObXaRollbackExecutor() {} + int execute(ObExecContext &ctx, ObXaRollBackStmt &stmt); +private: + DISALLOW_COPY_AND_ASSIGN(ObXaRollbackExecutor); }; class ObPlXaEndTransExecutor diff --git a/src/sql/executor/ob_cmd_executor.cpp b/src/sql/executor/ob_cmd_executor.cpp index f0f1b0f96..3d42ff8f6 100644 --- a/src/sql/executor/ob_cmd_executor.cpp +++ b/src/sql/executor/ob_cmd_executor.cpp @@ -911,11 +911,11 @@ int ObCmdExecutor::execute(ObExecContext &ctx, ObICmd &cmd) break; } case stmt::T_XA_COMMIT: { - DEFINE_EXECUTE_CMD(ObXaCommitStmt, ObXaEndTransExecutor); + DEFINE_EXECUTE_CMD(ObXaCommitStmt, ObXaCommitExecutor); break; } case stmt::T_XA_ROLLBACK: { - DEFINE_EXECUTE_CMD(ObXaRollBackStmt, ObXaEndTransExecutor); + DEFINE_EXECUTE_CMD(ObXaRollBackStmt, ObXaRollbackExecutor); break; } case stmt::T_ALTER_DISKGROUP_ADD_DISK: { diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index cec3d8970..e3fe27574 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -377,13 +377,35 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause) // propagated to the trans start node need_abort_tx = true; } - // if XA-txn is on this server, we have acquired its ref, release ref - // and disassocate with session - if (tx_desc->is_xa_trans() && tx_desc->get_addr() == GCONF.self_addr_) { - ObTransService *txs = MTL(ObTransService*); - CK (OB_NOT_NULL(txs), session_id, tx_id); - OZ (txs->release_tx_ref(*tx_desc), session_id, tx_id); - session->get_tx_desc() = NULL; + if (tx_desc->is_xa_trans()) { + if (lib::is_mysql_mode()) { + // for mysql mode + if (OB_DEAD_LOCK == cause) { + need_abort_tx = true; + } else { + // do nothing + } + } else { + // for oracle mode + // in this case, OB_DEAD_LOCK is unexpected + if (OB_DEAD_LOCK == cause) { + // for deadlock + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected cause", KR(ret), K(cause)); + } else if (tx_desc->get_addr() == GCONF.self_addr_) { + // for xa loosely coupled mode and xa start resume in a new server + // (not the server xa start first) + // therefore, the temp session for free route can be in the server xa start first + // if XA-txn is on this server, we have acquired its ref, release ref + // and disassocate with session + ObTransService *txs = MTL(ObTransService*); + CK (OB_NOT_NULL(txs), session_id, tx_id); + OZ (txs->release_tx_ref(*tx_desc), session_id, tx_id); + session->get_tx_desc() = NULL; + } else { + // do nothing + } + } } } else if (tx_desc->is_xa_trans()) { const transaction::ObXATransID xid = session->get_xid(); @@ -391,17 +413,39 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause) ObXAService *xas = MTL(ObXAService *); CK (OB_NOT_NULL(xas)); if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) { - OZ (xas->handle_terminate_for_xa_branch(session->get_xid(), tx_desc, session->get_xa_end_timeout_seconds()), - xid, global_tx_type, session_id, tx_id); - // currently, tx_desc is NULL + if (lib::is_mysql_mode()) { + // for mysql mode + if (OB_DEAD_LOCK == cause) { + // for dead lock + need_abort_tx = true; + } else { + // for session termiante + OZ (xas->handle_terminate_for_mysql(session->get_xid(), tx_desc), + xid, global_tx_type, session_id, tx_id); + // currently, tx_desc is NULL + session->get_tx_desc() = NULL; + } + } else { + // for oracle mode + if (OB_DEAD_LOCK == cause) { + // for deadlock + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected cause", KR(ret), K(cause)); + } else { + OZ (xas->handle_terminate_for_xa_branch(session->get_xid(), tx_desc, session->get_xa_end_timeout_seconds()), + xid, global_tx_type, session_id, tx_id); + // currently, tx_desc is NULL + session->get_tx_desc() = NULL; + } + } } else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) { OZ (xas->rollback_for_dblink_trans(tx_desc), ret, xid, global_tx_type, tx_id); // currently, tx_desc is NULL + session->get_tx_desc() = NULL; } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected global trans type", K(ret), K(xid), K(global_tx_type), K(tx_id)); } - session->get_tx_desc() = NULL; } else { need_abort_tx = true; } diff --git a/src/sql/ob_sql_utils.cpp b/src/sql/ob_sql_utils.cpp index 22563f486..be2249820 100644 --- a/src/sql/ob_sql_utils.cpp +++ b/src/sql/ob_sql_utils.cpp @@ -1754,6 +1754,12 @@ bool ObSQLUtils::is_readonly_stmt(ParseResult &result) || T_SHOW_SEQUENCES == type || T_SHOW_ENGINE == type || T_SHOW_OPEN_TABLES == type + || T_XA_START == type + || T_XA_END == type + || T_XA_PREPARE == type + || T_XA_COMMIT == type + || T_XA_ROLLBACK == type + || T_XA_RECOVER == type || (T_SET_ROLE == type && lib::is_mysql_mode()) || T_SHOW_CREATE_USER == type) { ret = true; diff --git a/src/sql/parser/non_reserved_keywords_mysql_mode.c b/src/sql/parser/non_reserved_keywords_mysql_mode.c index 36889b6ed..b0717c14c 100644 --- a/src/sql/parser/non_reserved_keywords_mysql_mode.c +++ b/src/sql/parser/non_reserved_keywords_mysql_mode.c @@ -1047,6 +1047,7 @@ static const NonReservedKeyword Mysql_none_reserved_keywords[] = {"wrapper", WRAPPER}, {"x509", X509}, {"xa", XA}, + {"xid", XID}, {"xml", XML}, {"xor", XOR}, {"year", YEAR}, diff --git a/src/sql/parser/sql_parser_mysql_mode.l b/src/sql/parser/sql_parser_mysql_mode.l index 9b8dd1479..d64aced18 100644 --- a/src/sql/parser/sql_parser_mysql_mode.l +++ b/src/sql/parser/sql_parser_mysql_mode.l @@ -818,7 +818,7 @@ B'([01])*'|0B([01])+ { STORE_PARAM_NODE(); } else { yylval->node->sql_str_off_ = yylloc->first_column - 1; - return HEX_STRING_VALUE; + return BIN_STRING_VALUE; } } diff --git a/src/sql/parser/sql_parser_mysql_mode.y b/src/sql/parser/sql_parser_mysql_mode.y index 026b9c1ca..e8dd7f7c4 100644 --- a/src/sql/parser/sql_parser_mysql_mode.y +++ b/src/sql/parser/sql_parser_mysql_mode.y @@ -71,6 +71,7 @@ extern int easy_vsnprintf(char *buf, size_t size, const char *fmt, va_list args) %token CLIENT_VERSION %token MYSQL_DRIVER %token HEX_STRING_VALUE +%token BIN_STRING_VALUE %token REMAP_TABLE_NAME %token REMAP_DATABASE_TABLE_NAME %token OUTLINE_DEFAULT_TOKEN/*use for outline parser to just filter hint of query_sql*/ @@ -375,7 +376,7 @@ END_P SET_VAR DELIMITER WAIT WARNINGS WASH WEEK WEIGHT_STRING WHENEVER WORK WRAPPER WINDOW WEAK WITH_COLUMN_GROUP WITHOUT - X509 XA XML + X509 XA XID XML YEAR @@ -464,7 +465,7 @@ END_P SET_VAR DELIMITER %type parameterized_trim %type opt_with_consistent_snapshot opt_config_scope opt_index_keyname opt_full opt_mode_flag opt_extended opt_extended_or_full %type opt_priority opt_low_priority delete_option delete_option_list opt_delete_option_list -%type opt_work begin_stmt commit_stmt rollback_stmt opt_ignore opt_ignore_or_replace ignore_or_replace xa_begin_stmt xa_end_stmt xa_prepare_stmt xa_commit_stmt xa_rollback_stmt +%type opt_work begin_stmt commit_stmt rollback_stmt opt_ignore opt_ignore_or_replace ignore_or_replace xa_begin_stmt xa_end_stmt xa_prepare_stmt xa_commit_stmt xa_rollback_stmt xa_recover_stmt xa_xid opt_join_or_resume opt_suspend opt_one_phase opt_convert_xid %type alter_table_stmt alter_table_actions alter_table_action_list alter_table_action alter_column_option alter_index_option alter_constraint_option standalone_alter_action alter_partition_option opt_to alter_tablegroup_option opt_table opt_tablegroup_option_list alter_tg_partition_option alter_column_group_option %type tablegroup_option_list tablegroup_option alter_tablegroup_actions alter_tablegroup_action tablegroup_option_list_space_seperated %type opt_tg_partition_option tg_hash_partition_option tg_key_partition_option tg_range_partition_option tg_subpartition_option tg_list_partition_option @@ -507,7 +508,7 @@ END_P SET_VAR DELIMITER %type flashback_stmt purge_stmt opt_flashback_rename_table opt_flashback_rename_database opt_flashback_rename_tenant %type tenant_name_list opt_tenant_list tenant_list_tuple cache_type flush_scope opt_zone_list %type into_opt into_clause field_opt field_term field_term_list line_opt line_term line_term_list into_var_list into_var file_partition_opt file_opt file_option_list file_option file_size_const -%type string_list text_string string_val_list +%type string_list text_string string_val_list ulong_num %type balance_task_type opt_balance_task_type %type list_expr list_partition_element list_partition_expr list_partition_list list_partition_option opt_list_partition_list opt_list_subpartition_list list_subpartition_list list_subpartition_element drop_partition_name_list %type primary_zone_name change_tenant_name_or_tenant_id distribute_method distribute_method_list @@ -732,6 +733,7 @@ stmt: | xa_prepare_stmt { $$ = $1; check_question_mark($$, result); } | xa_commit_stmt { $$ = $1; check_question_mark($$, result); } | xa_rollback_stmt { $$ = $1; check_question_mark($$, result); } + | xa_recover_stmt { $$ = $1; check_question_mark($$, result); } | optimize_stmt { $$ = $1; check_question_mark($$, result); } | dump_memory_stmt { $$ = $1; check_question_mark($$, result); } | get_diagnostics_stmt { $$ = $1; question_mark_issue($$, result); } @@ -1074,6 +1076,17 @@ STRING_VALUE %prec LOWER_THAN_COMP $$->sql_str_off_ = $2->sql_str_off_; $$->is_forbid_parameter_ = $2->is_forbid_parameter_; } +| charset_introducer BIN_STRING_VALUE +{ + /* _utf8mb4 0x42 作为字符串处理 */ + malloc_non_terminal_node($$, result->malloc_pool_, T_VARCHAR, 1, $1); + $$->str_value_ = $2->str_value_; + $$->str_len_ = $2->str_len_; + $$->raw_text_ = $2->raw_text_; + $$->text_len_ = $2->text_len_; + $$->sql_str_off_ = $2->sql_str_off_; + $$->is_forbid_parameter_ = $2->is_forbid_parameter_; +} | STRING_VALUE string_val_list %prec LOWER_THAN_COMP { ParseNode *str_node = NULL; @@ -1261,6 +1274,11 @@ complex_string_literal { $$ = $1; } $$ = $1; $$->type_ = T_HEX_STRING; } +| BIN_STRING_VALUE +{ + $$ = $1; + $$->type_ = T_HEX_STRING; +} ; number_literal: @@ -6633,11 +6651,37 @@ STRING_VALUE $$ = $1; } | HEX_STRING_VALUE +{ + $$ = $1; +} +| BIN_STRING_VALUE { $$ = $1; }; +ulong_num: +INTNUM +{ + if(T_UINT64 == $1->type_) { + yyerror(&@1, result, "formatID should not greater than INT64_MAX\n"); + YYERROR; + } + $$ = $1; +} +| HEX_STRING_VALUE +{ + int err_no = 0; + uint64_t val = 0; + val = ob_strntoull($1->raw_text_ + 2, $1->text_len_ - 2, 16, NULL, &err_no); + if(val > INT64_MAX || ERANGE == errno) { + YYABORT; + } else { + $1->value_ = val; + $$ = $1; + } +}; + int_type_i: TINYINT { $$[0] = T_TINYINT; } | SMALLINT { $$[0] = T_SMALLINT; } @@ -16352,44 +16396,143 @@ BEGI opt_hint_value opt_work ******************************************************************************/ xa_begin_stmt: -XA START STRING_VALUE +XA START xa_xid opt_join_or_resume { - malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 1, $3); + malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 2, $3, $4); } -| XA BEGI STRING_VALUE +| XA BEGI xa_xid opt_join_or_resume { - malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 1, $3); + malloc_non_terminal_node($$, result->malloc_pool_, T_XA_START, 2, $3, $4); } ; xa_end_stmt: -XA END STRING_VALUE +XA END xa_xid opt_suspend { - malloc_non_terminal_node($$, result->malloc_pool_, T_XA_END, 1, $3); + malloc_non_terminal_node($$, result->malloc_pool_, T_XA_END, 2, $3, $4); } ; xa_prepare_stmt: -XA PREPARE STRING_VALUE +XA PREPARE xa_xid { malloc_non_terminal_node($$, result->malloc_pool_, T_XA_PREPARE, 1, $3); } ; xa_commit_stmt: -XA COMMIT STRING_VALUE +XA COMMIT xa_xid opt_one_phase { - malloc_non_terminal_node($$, result->malloc_pool_, T_XA_COMMIT, 1, $3); + malloc_non_terminal_node($$, result->malloc_pool_, T_XA_COMMIT, 2, $3, $4); } ; xa_rollback_stmt: -XA ROLLBACK STRING_VALUE +XA ROLLBACK xa_xid { malloc_non_terminal_node($$, result->malloc_pool_, T_XA_ROLLBACK, 1, $3); } ; +xa_recover_stmt: +XA RECOVER opt_convert_xid +{ + malloc_non_terminal_node($$, result->malloc_pool_, T_XA_RECOVER, 1, $3); +} +; + +xa_xid: +text_string +{ + if(64 < $1->str_len_) { + yyerror(&@1, result, "gtrid length should not greater than 64\n"); + YYERROR; + } + malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 1, $1); +} +| text_string ',' text_string +{ + if(64 < $1->str_len_) { + yyerror(&@1, result, "gtrid length should not greater than 64\n"); + YYERROR; + } + if(64 < $3->str_len_) { + yyerror(&@3, result, "bqual length should not greater than 64\n"); + YYERROR; + } + malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 2, $1, $3); +} +| text_string ',' text_string ',' ulong_num +{ + if(64 < $1->str_len_) { + yyerror(&@1, result, "gtrid length should not greater than 64\n"); + YYERROR; + } + if(64 < $3->str_len_) { + yyerror(&@3, result, "bqual length should not greater than 64\n"); + YYERROR; + } + malloc_non_terminal_node($$, result->malloc_pool_, T_LINK_NODE, 3, $1, $3, $5); +} + +opt_join_or_resume: +/* empty */ +{ + $$= NULL; +} +| JOIN +{ + malloc_terminal_node($$, result->malloc_pool_, T_INT); + $$->value_ = 0; + +} +| RESUME +{ + malloc_terminal_node($$, result->malloc_pool_, T_INT); + $$->value_ = 1; +} +; + +opt_suspend: +/* empty */ +{ + $$= NULL; +} +| SUSPEND +{ + malloc_terminal_node($$, result->malloc_pool_, T_INT); + $$->value_ = 0; +} +| SUSPEND FOR MIGRATE +{ + malloc_terminal_node($$, result->malloc_pool_, T_INT); + $$->value_ = 1; +} +; + +opt_one_phase: +/* empty */ +{ + $$= NULL; +} +| ONE PHASE +{ + malloc_terminal_node($$, result->malloc_pool_, T_INT); + $$->value_ = 0; +} +; + +opt_convert_xid: +/* empty */ +{ + $$= NULL; +} +| CONVERT XID +{ + malloc_terminal_node($$, result->malloc_pool_, T_INT); + $$->value_ = 0; +} + /***************************************************************************** * * commit grammer @@ -24462,6 +24605,7 @@ ACCESS_INFO | WRAPPER | X509 | XA +| XID | XML | YEAR | ZONE diff --git a/src/sql/resolver/cmd/ob_show_resolver.cpp b/src/sql/resolver/cmd/ob_show_resolver.cpp index 99b157490..8cb1dcb60 100644 --- a/src/sql/resolver/cmd/ob_show_resolver.cpp +++ b/src/sql/resolver/cmd/ob_show_resolver.cpp @@ -22,6 +22,7 @@ #include "observer/ob_server_struct.h" #include "sql/resolver/dcl/ob_grant_resolver.h" #include "observer/virtual_table/ob_tenant_all_tables.h" +#include "storage/tx/ob_xa_define.h" #include "share/schema/ob_schema_printer.h" using namespace oceanbase::common; using namespace oceanbase::share; @@ -158,6 +159,7 @@ int ObShowResolver::resolve(const ParseNode &parse_tree) && OB_UNLIKELY(parse_tree.type_ != T_SHOW_PROFILE) && OB_UNLIKELY(parse_tree.type_ != T_SHOW_PROCEDURE_CODE) && OB_UNLIKELY(parse_tree.type_ != T_SHOW_FUNCTION_CODE) + && OB_UNLIKELY(parse_tree.type_ != T_XA_RECOVER) && OB_UNLIKELY(parse_tree.type_ != T_SHOW_ENGINE) && OB_UNLIKELY(parse_tree.type_ != T_SHOW_OPEN_TABLES) && OB_UNLIKELY(parse_tree.type_ != T_SHOW_CREATE_USER) @@ -1780,6 +1782,41 @@ int ObShowResolver::resolve(const ParseNode &parse_tree) }(); break; } + case T_XA_RECOVER: { + if (is_oracle_mode) { + ret = OB_NOT_SUPPORTED; + LOG_USER_ERROR(OB_NOT_SUPPORTED, "xa recover in oracle mode is"); + } else if (OB_UNLIKELY(parse_tree.num_child_ != 1)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("parse tree is wrong", + K(ret), + K(parse_tree.num_child_), + K(parse_tree.children_)); + } else { + if(parse_tree.children_[0] == NULL) { + GEN_SQL_STEP_1(ObShowSqlSet::XA_RECOVER); + GEN_SQL_STEP_2(ObShowSqlSet::XA_RECOVER, + OB_SYS_DATABASE_NAME, + OB_ALL_VIRTUAL_GLOBAL_TRANSACTION_TNAME, + transaction::ObXATransState::PREPARED); + } else { + if(parse_tree.children_[0]->value_ != 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("parse tree is wrong", + K(ret), + K(parse_tree.num_child_), + K(parse_tree.children_)); + } else { + GEN_SQL_STEP_1(ObShowSqlSet::XA_RECOVER_CONVERT_XID); + GEN_SQL_STEP_2(ObShowSqlSet::XA_RECOVER_CONVERT_XID, + OB_SYS_DATABASE_NAME, + OB_ALL_VIRTUAL_GLOBAL_TRANSACTION_TNAME, + transaction::ObXATransState::PREPARED); + } + } + } + break; + } default: /* won't be here */ ret = OB_NOT_IMPLEMENT; @@ -3783,6 +3820,16 @@ DEFINE_SHOW_CLAUSE_SET(SHOW_OLAP_ASYNC_JOB_STATUS, SELECT T.job_name AS 'job_id', T.cowner AS 'schema_name', CASE WHEN T.state IS NULL THEN 'SUBMITTED' WHEN T.state = 'SCHEDULED' THEN 'RUNNING' WHEN T.state = 'KILLED' THEN 'CANCELLED' ELSE T.state END as 'status', NULL as fail_msg, T.start_date as 'create_time', T.gmt_modified as 'update_time', T.job_action as 'definition' FROM %s.%s T WHERE T.JOB_CLASS = 'OLAP_ASYNC_JOB_CLASS' AND T.TENANT_ID = %d AND T.JOB > 0 %s %s) %s", NULL, NULL); +DEFINE_SHOW_CLAUSE_SET(XA_RECOVER, + NULL, + "SELECT format_id as formatID, length(gtrid) as gtrid_length, length(bqual) as bqual_length, concat(gtrid,bqual) as data from %s.%s where state = %ld", + NULL, + NULL); +DEFINE_SHOW_CLAUSE_SET(XA_RECOVER_CONVERT_XID, + NULL, + "SELECT format_id as formatID, length(gtrid) as gtrid_length, length(bqual) as bqual_length, concat('0x',hex(concat(gtrid,bqual))) as data from %s.%s where state = %ld", + NULL, + NULL); DEFINE_SHOW_CLAUSE_SET(SHOW_CREATE_USER, NULL, "SELECT \"%.*s\" AS `CREATE USER for %.*s@%.*s` FROM DUAL", diff --git a/src/sql/resolver/cmd/ob_show_resolver.h b/src/sql/resolver/cmd/ob_show_resolver.h index d6b8b01fa..ae561092f 100644 --- a/src/sql/resolver/cmd/ob_show_resolver.h +++ b/src/sql/resolver/cmd/ob_show_resolver.h @@ -207,6 +207,8 @@ struct ObShowResolver::ObShowSqlSet DECLARE_SHOW_CLAUSE_SET(SHOW_TRIGGERS_LIKE); DECLARE_SHOW_CLAUSE_SET(SHOW_SEQUENCES); DECLARE_SHOW_CLAUSE_SET(SHOW_SEQUENCES_LIKE); + DECLARE_SHOW_CLAUSE_SET(XA_RECOVER); + DECLARE_SHOW_CLAUSE_SET(XA_RECOVER_CONVERT_XID); DECLARE_SHOW_CLAUSE_SET(SHOW_ENGINE); DECLARE_SHOW_CLAUSE_SET(SHOW_OPEN_TABLES); DECLARE_SHOW_CLAUSE_SET(SHOW_OLAP_ASYNC_JOB_STATUS); diff --git a/src/sql/resolver/ob_resolver.cpp b/src/sql/resolver/ob_resolver.cpp index 7dda269c7..dadf2f384 100644 --- a/src/sql/resolver/ob_resolver.cpp +++ b/src/sql/resolver/ob_resolver.cpp @@ -772,6 +772,7 @@ int ObResolver::resolve(IsPrepared if_prepared, const ParseNode &parse_tree, ObS case T_SHOW_OPEN_TABLES: case T_SHOW_SEQUENCES: case T_SHOW_OLAP_ASYNC_JOB_STATUS: + case T_XA_RECOVER: case T_SHOW_CHECK_TABLE: case T_SHOW_CREATE_USER: { REGISTER_STMT_RESOLVER(Show); diff --git a/src/sql/resolver/ob_resolver_utils.cpp b/src/sql/resolver/ob_resolver_utils.cpp index f9abf5fb2..9c121e7c5 100644 --- a/src/sql/resolver/ob_resolver_utils.cpp +++ b/src/sql/resolver/ob_resolver_utils.cpp @@ -7888,19 +7888,137 @@ int ObResolverUtils::resolve_string(const ParseNode *node, ObString &string) int ret = OB_SUCCESS; if (OB_UNLIKELY(NULL == node)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("node should not be null"); + LOG_WARN("node should not be null", K(ret)); } else if (OB_UNLIKELY(T_VARCHAR != node->type_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("node type is not T_VARCHAR", "type", get_type_name(node->type_)); + LOG_WARN("node type is not T_VARCHAR", "type", get_type_name(node->type_), K(ret)); } else if (OB_UNLIKELY(node->str_len_ < 0)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("empty string"); + LOG_WARN("empty string", K(ret)); } else { string = ObString(node->str_len_, node->str_value_); } return ret; } +int ObResolverUtils::resolve_xid(const ParseNode *node, common::ObString >rid_string, common::ObString &bqual_string, int64_t & format_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(NULL == node)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("node should not be null", K(ret)); + } else if (OB_UNLIKELY(T_LINK_NODE != node->type_ || 1 > node->num_child_ || 3 < node->num_child_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected type or unexpected child num when reslve xid", K(node->type_), K(node->num_child_), K(ret)); + } else if(1 <= node->num_child_ && OB_FAIL(ObResolverUtils::resolve_text(node->children_[0], gtrid_string))) { + LOG_WARN("resolve gtrid string fail", K(node->children_[0]), K(gtrid_string), K(ret)); + } else if(2 <= node->num_child_ && OB_FAIL(ObResolverUtils::resolve_text(node->children_[1], bqual_string))) { + LOG_WARN("resolve bqual string fail", K(node->children_[1]), K(bqual_string), K(ret)); + } else if(3 == node->num_child_ && OB_FAIL(ObResolverUtils::resolve_ulong(node->children_[2], format_id))) { + LOG_WARN("resolve format id fail", K(node->children_[2]), K(format_id), K(ret)); + } else { + // for mysql mode + // if format id is not specified, set format id to 1 by default + if (lib::is_mysql_mode() && 3 > node->num_child_) { + format_id = 1; + } + } + return ret; +} + +int ObResolverUtils::resolve_text(const ParseNode *node, ObString &string) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(NULL == node)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("node should not be null", K(ret)); + } else if (OB_UNLIKELY(T_VARCHAR != node->type_ && T_HEX_STRING != node->type_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("node type is not T_VARCHAR/T_HEX_STRING", "type", get_type_name(node->type_), K(ret)); + } else if (OB_UNLIKELY(node->str_len_ < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("empty string", K(ret)); + } else { + string = ObString(node->str_len_, node->str_value_); + } + return ret; +} + +int ObResolverUtils::resolve_ulong(const ParseNode *node, int64_t & format_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(NULL == node)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("node should not be null", K(ret)); + } else if (OB_UNLIKELY(T_INT != node->type_ && T_HEX_STRING != node->type_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("node type is not T_INT/T_HEX_STRING", "type", get_type_name(node->type_), K(ret)); + } else { + format_id = node->value_; + } + return ret; +} + +int ObResolverUtils::resolve_opt_join_or_resume(const ParseNode *node, int64_t & flag) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(NULL == node)) { + // do nothing + } else if (OB_UNLIKELY(T_INT != node->type_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected node type", K(node->type_), K(ret)); + } else { + if(node->value_ != 0 && node->value_ != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected val", K(node->value_), K(ret)); + } else { + ret = OB_TRANS_XA_INVAL; + LOG_WARN("not support start arguments", K(node->value_), K(ret)); + } + } + return ret; +} + +int ObResolverUtils::resolve_opt_suspend(const ParseNode *node, int64_t & flag) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(NULL == node)) { + // do nothing + } else if (OB_UNLIKELY(T_INT != node->type_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected node type", K(node->type_), K(ret)); + } else { + if(node->value_ != 0 && node->value_ != 1) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected val", K(node->value_), K(ret)); + } else { + // 目前这里进行里检查,可以不管直接赋值然后给执行时报错 + ret = OB_TRANS_XA_INVAL; + LOG_WARN("not support start arguments", K(node->value_), K(ret)); + } + } + return ret; +} + +int ObResolverUtils::resolve_opt_one_phase(const ParseNode *node, int64_t & flag) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(NULL == node)) { + // do nothing + } else if (OB_UNLIKELY(T_INT != node->type_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected node type", K(node->type_), K(ret)); + } else { + if(node->value_ != 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected val", K(node->value_), K(ret)); + } else { + flag = transaction::ObXAFlag::OBTMONEPHASE; + } + } + return ret; +} + // judge whether pdml stmt contain udf can parallel execute or not has two stage: // stage1:check has dml write stmt or read/write package var info in this funciton; // stage2:record udf has select stmt info, and when optimize this stmt, diff --git a/src/sql/resolver/ob_resolver_utils.h b/src/sql/resolver/ob_resolver_utils.h index 2bae5bd00..0372869ce 100644 --- a/src/sql/resolver/ob_resolver_utils.h +++ b/src/sql/resolver/ob_resolver_utils.h @@ -672,7 +672,12 @@ public: ParseNode *&func_udf); static int set_direction_by_mode(const ParseNode &sort_node, OrderItem &order_item); static int resolve_string(const ParseNode *node, common::ObString &string); - + static int resolve_xid(const ParseNode *node, common::ObString >rid_string, common::ObString &bqual_string, int64_t & format_id); + static int resolve_text(const ParseNode *node, common::ObString &string); + static int resolve_ulong(const ParseNode *node, int64_t & format_id); + static int resolve_opt_join_or_resume(const ParseNode *node, int64_t & flag); + static int resolve_opt_suspend(const ParseNode *node, int64_t & flag); + static int resolve_opt_one_phase(const ParseNode *node, int64_t & flag); // check some kind of the non-updatable view, which is forbidden for all dml statement: // mysql: // aggregate diff --git a/src/sql/resolver/xa/ob_xa_commit_resolver.cpp b/src/sql/resolver/xa/ob_xa_commit_resolver.cpp index d69902bda..6cd05c57c 100644 --- a/src/sql/resolver/xa/ob_xa_commit_resolver.cpp +++ b/src/sql/resolver/xa/ob_xa_commit_resolver.cpp @@ -35,19 +35,33 @@ int ObXaCommitResolver::resolve(const ParseNode &parse_node) { int ret = OB_SUCCESS; ObXaCommitStmt *xa_commit_stmt = NULL; - if (OB_UNLIKELY(T_XA_COMMIT != parse_node.type_ || 1 != parse_node.num_child_)) { + if (OB_UNLIKELY(T_XA_COMMIT != parse_node.type_ || 2 != parse_node.num_child_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_)); + LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret)); } else if (OB_UNLIKELY(NULL == (xa_commit_stmt = create_stmt()))) { ret = OB_SQL_RESOLVER_NO_MEMORY; LOG_WARN("failed to create xa end stmt", K(ret)); } else { - ObString xa_string; - if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) { - LOG_WARN("resolve string failed", K(ret)); + ObString gtrid_string; + ObString bqual_string; + int64_t format_id = -1; + int64_t flag = 0; + if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) { + LOG_WARN("resolve xid failed", K(ret)); + } else if (OB_FAIL(ObResolverUtils::resolve_opt_one_phase(parse_node.children_[1], flag))) { + LOG_WARN("resolve xa commit one phase failed", K(ret)); } else { - xa_commit_stmt->set_xa_string(xa_string); - LOG_DEBUG("xa commit resolver", K(xa_string)); + if(gtrid_string.length() <= 0) { + ret = OB_TRANS_XA_INVAL; + LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret)); + } else { + xa_commit_stmt->set_xa_string(gtrid_string, bqual_string); + if(format_id >= 0) { + xa_commit_stmt->set_format_id(format_id); + } + xa_commit_stmt->set_flags(flag); + LOG_DEBUG("xa commit resolver", K(gtrid_string), K(bqual_string), K(format_id)); + } } } return ret; diff --git a/src/sql/resolver/xa/ob_xa_end_resolver.cpp b/src/sql/resolver/xa/ob_xa_end_resolver.cpp index 9405fa162..99e452f91 100644 --- a/src/sql/resolver/xa/ob_xa_end_resolver.cpp +++ b/src/sql/resolver/xa/ob_xa_end_resolver.cpp @@ -34,19 +34,32 @@ int ObXaEndResolver::resolve(const ParseNode &parse_node) { int ret = OB_SUCCESS; ObXaEndStmt *xa_end_stmt = NULL; - if (OB_UNLIKELY(T_XA_END != parse_node.type_ || 1 != parse_node.num_child_)) { + if (OB_UNLIKELY(T_XA_END != parse_node.type_ || 2 != parse_node.num_child_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_)); + LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret)); } else if (OB_UNLIKELY(NULL == (xa_end_stmt = create_stmt()))) { ret = OB_SQL_RESOLVER_NO_MEMORY; LOG_WARN("failed to create xa end stmt", K(ret)); } else { - ObString xa_string; - if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) { - LOG_WARN("resolve string failed", K(ret)); + ObString gtrid_string; + ObString bqual_string; + int64_t format_id = -1; + int64_t flag = 0; + if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) { + LOG_WARN("resolve xid failed in xa end", K(ret)); + } else if (OB_FAIL(ObResolverUtils::resolve_opt_suspend(parse_node.children_[1], flag))) { + LOG_WARN("resolve xa end suspend failed", K(ret)); } else { - xa_end_stmt->set_xa_string(xa_string); - LOG_DEBUG("xa end resolver", K(xa_string)); + if(gtrid_string.length() <= 0) { + ret = OB_TRANS_XA_INVAL; + LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret)); + } else { + xa_end_stmt->set_xa_string(gtrid_string, bqual_string); + if(format_id >= 0) { + xa_end_stmt->set_format_id(format_id); + } + LOG_DEBUG("xa end resolver", K(gtrid_string), K(bqual_string), K(format_id), K(flag)); + } } } return ret; diff --git a/src/sql/resolver/xa/ob_xa_prepare_resolver.cpp b/src/sql/resolver/xa/ob_xa_prepare_resolver.cpp index bddc1c3f0..364050241 100644 --- a/src/sql/resolver/xa/ob_xa_prepare_resolver.cpp +++ b/src/sql/resolver/xa/ob_xa_prepare_resolver.cpp @@ -36,17 +36,27 @@ int ObXaPrepareResolver::resolve(const ParseNode &parse_node) ObXaPrepareStmt *xa_prepare_stmt = NULL; if (OB_UNLIKELY(T_XA_PREPARE != parse_node.type_ || 1 != parse_node.num_child_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_)); + LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret)); } else if (OB_UNLIKELY(NULL == (xa_prepare_stmt = create_stmt()))) { ret = OB_SQL_RESOLVER_NO_MEMORY; LOG_WARN("failed to create xa end stmt", K(ret)); } else { - ObString xa_string; - if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) { - LOG_WARN("resolve string failed", K(ret)); + ObString gtrid_string; + ObString bqual_string; + int64_t format_id = -1; + if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) { + LOG_WARN("resolve xid failed", K(ret)); } else { - xa_prepare_stmt->set_xa_string(xa_string); - LOG_DEBUG("xa prepare resolver", K(xa_string)); + if(gtrid_string.length() <= 0) { + ret = OB_TRANS_XA_INVAL; + LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret)); + } else { + xa_prepare_stmt->set_xa_string(gtrid_string, bqual_string); + if(format_id >= 0) { + xa_prepare_stmt->set_format_id(format_id); + } + } + LOG_DEBUG("xa prepare resolver", K(gtrid_string), K(bqual_string), K(format_id)); } } return ret; diff --git a/src/sql/resolver/xa/ob_xa_rollback_resolver.cpp b/src/sql/resolver/xa/ob_xa_rollback_resolver.cpp index d5a7b071b..dd5293793 100644 --- a/src/sql/resolver/xa/ob_xa_rollback_resolver.cpp +++ b/src/sql/resolver/xa/ob_xa_rollback_resolver.cpp @@ -36,17 +36,27 @@ int ObXaRollBackResolver::resolve(const ParseNode &parse_node) ObXaRollBackStmt *xa_rollback_stmt = NULL; if (OB_UNLIKELY(T_XA_ROLLBACK != parse_node.type_ || 1 != parse_node.num_child_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_)); + LOG_WARN("unexpected parse node", K(parse_node.type_), K(parse_node.num_child_), K(ret)); } else if (OB_UNLIKELY(NULL == (xa_rollback_stmt = create_stmt()))) { ret = OB_SQL_RESOLVER_NO_MEMORY; LOG_WARN("failed to create xa end stmt", K(ret)); } else { - ObString xa_string; - if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) { - LOG_WARN("resolve string failed", K(ret)); + ObString gtrid_string; + ObString bqual_string; + int64_t format_id = -1; + if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) { + LOG_WARN("resolve xid failed", K(ret)); } else { - xa_rollback_stmt->set_xa_string(xa_string); - LOG_DEBUG("xa rollback resolver", K(xa_string)); + if(gtrid_string.length() <= 0) { + ret = OB_TRANS_XA_INVAL; + LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret)); + } else { + xa_rollback_stmt->set_xa_string(gtrid_string, bqual_string); + if(format_id >= 0) { + xa_rollback_stmt->set_format_id(format_id); + } + } + LOG_DEBUG("xa rollback resolver",K(gtrid_string), K(bqual_string), K(format_id)); } } return ret; diff --git a/src/sql/resolver/xa/ob_xa_start_resolver.cpp b/src/sql/resolver/xa/ob_xa_start_resolver.cpp index 19bc0e97f..d0fb8dc3a 100644 --- a/src/sql/resolver/xa/ob_xa_start_resolver.cpp +++ b/src/sql/resolver/xa/ob_xa_start_resolver.cpp @@ -35,19 +35,33 @@ int ObXaStartResolver::resolve(const ParseNode &parse_node) { int ret = OB_SUCCESS; ObXaStartStmt *xa_start_stmt = NULL; - if (OB_UNLIKELY(T_XA_START != parse_node.type_ || 1 !=parse_node.num_child_)) { + if (OB_UNLIKELY(T_XA_START != parse_node.type_ || 2 != parse_node.num_child_)) { ret = OB_ERR_UNEXPECTED; - LOG_ERROR("unexpected val", K(parse_node.type_), K(parse_node.num_child_), K(ret)); + LOG_WARN("unexpected val", K(parse_node.type_), K(parse_node.num_child_), K(ret)); } else if (OB_UNLIKELY(NULL == (xa_start_stmt = create_stmt()))) { ret = OB_SQL_RESOLVER_NO_MEMORY; LOG_WARN("failed to create xa start stmt", K(ret)); } else { - ObString xa_string; - if (OB_FAIL(ObResolverUtils::resolve_string(parse_node.children_[0], xa_string))) { - LOG_WARN("resolve string failed", K(ret)); + ObString gtrid_string; + ObString bqual_string; + int64_t format_id = -1; + int64_t flag = 0; + if (OB_FAIL(ObResolverUtils::resolve_xid(parse_node.children_[0], gtrid_string, bqual_string, format_id))) { + LOG_WARN("resolve xid failed in xa start", K(ret)); + } else if (OB_FAIL(ObResolverUtils::resolve_opt_join_or_resume(parse_node.children_[1], flag))) { + LOG_WARN("resolve xa start join or resume failed", K(ret)); } else { - xa_start_stmt->set_xa_string(xa_string); - LOG_DEBUG("xa start resolver", K(xa_string)); + if(gtrid_string.length() <= 0) { + ret = OB_TRANS_XA_INVAL; + LOG_WARN("resolve xid failed, gtrid string can not be NULL", K(ret)); + } else { + xa_start_stmt->set_xa_string(gtrid_string, bqual_string); + if(format_id >= 0) { + xa_start_stmt->set_format_id(format_id); + } + xa_start_stmt->set_flags(flag); + LOG_DEBUG("xa start resolver", K(gtrid_string), K(bqual_string), K(format_id), K(flag)); + } } } diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index ed67a68c4..e297e1e57 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -485,6 +485,8 @@ ob_set_subtarget(ob_storage tx tx/ob_tx_free_route_state.cpp tx/ob_tx_free_route_rpc.cpp tx/ob_tx_free_route_msg.cpp + tx/ob_mysql_xa_service.cpp + tx/ob_xa_inner_sql_client.cpp ) ob_set_subtarget(ob_storage tx_storage diff --git a/src/storage/tx/ob_mysql_xa_service.cpp b/src/storage/tx/ob_mysql_xa_service.cpp new file mode 100644 index 000000000..26d4bb926 --- /dev/null +++ b/src/storage/tx/ob_mysql_xa_service.cpp @@ -0,0 +1,642 @@ +// Copyright (c) 2021 OceanBase +// OceanBase is licensed under Mulan PubL v2. +// You can use this software according to the terms and conditions of the Mulan PubL v2. +// You may obtain a copy of Mulan PubL v2 at: +// http://license.coscl.org.cn/MulanPubL-2.0 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PubL v2 for more details. +// +#include "storage/tx/ob_xa_service.h" +#include "storage/tx/ob_xa_ctx.h" +#include "storage/tx/ob_trans_service.h" +#include "lib/mysqlclient/ob_isql_client.h" +#include "lib/mysqlclient/ob_mysql_proxy.h" // ObMySQLProxy +#include "lib/mysqlclient/ob_mysql_result.h" +#include "share/inner_table/ob_inner_table_schema_constants.h" +#include "share/rc/ob_tenant_base.h" +#include "share/rc/ob_tenant_module_init_ctx.h" +#include "observer/ob_srv_network_frame.h" +#include "storage/tx/ob_xa_inner_sql_client.h" + +namespace oceanbase +{ + +using namespace share; +using namespace common; +using namespace common::sqlclient; + +namespace transaction +{ +int ObXAService::xa_start_for_mysql(const ObXATransID &xid, + const int64_t flags, + const uint32_t session_id, + const ObTxParam &tx_param, + ObTxDesc *&tx_desc) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(xid.empty()) || + OB_UNLIKELY(!xid.is_valid()) || + OB_UNLIKELY(0 > xid.get_format_id())) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid)); + } else if (ObXAFlag::OBTMNOFLAGS != flags) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid flags for mysql xa start", K(ret), K(xid)); + } else if (OB_FAIL(xa_start_for_mysql_(xid, session_id, tx_param, tx_desc))) { + TRANS_LOG(WARN, "mysql xa start failed", K(ret), K(xid)); + } else { + // do nothing + } + // set xa_start_addr for txn-free-route + if (OB_SUCC(ret) && OB_NOT_NULL(tx_desc)) { + tx_desc->set_xa_start_addr(GCONF.self_addr_); + } + TRANS_LOG(INFO, "mysql xa start", K(ret), K(xid)); + return ret; +} + +int ObXAService::xa_start_for_mysql_(const ObXATransID &xid, + const uint32_t session_id, + const ObTxParam &tx_param, + ObTxDesc *&tx_desc) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + const uint64_t tenant_id = MTL_ID(); + ObAddr sche_addr = GCTX.self_addr(); + bool alloc = true; + ObXACtx *xa_ctx = NULL; + if (tx_desc != NULL) { + MTL(ObTransService *)->release_tx(*tx_desc); + tx_desc = NULL; + } + if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id))) { + TRANS_LOG(WARN, "acquire trans failed", K(ret), K(tx_param)); + } else if (OB_FAIL(MTL(ObTransService *)->start_tx(*tx_desc, tx_param))) { + TRANS_LOG(WARN, "start trans failed", K(ret), KPC(tx_desc)); + MTL(ObTransService *)->release_tx(*tx_desc); + tx_desc = NULL; + } else { + const bool is_tightly_coupled = false; + const ObTransID trans_id = tx_desc->tid(); + if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(trans_id, alloc, xa_ctx))) { + TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(xid)); + } else if (!alloc) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected error", K(ret), K(xid)); + } else if (OB_FAIL(xa_ctx->init(xid, + trans_id, + tenant_id, + GCTX.self_addr(), + is_tightly_coupled, + this/*xa service*/, + &xa_ctx_mgr_, + &xa_rpc_, + &timer_))) { + TRANS_LOG(WARN, "xa ctx init failed", K(ret), K(xid), K(trans_id)); + } else if (OB_FAIL(xa_ctx->xa_start_for_mysql(xid, tx_desc))) { + TRANS_LOG(WARN, "xa ctx start failed", K(ret), K(xid), K(trans_id)); + } + if (OB_FAIL(ret)) { + if (OB_NOT_NULL(xa_ctx)) { + xa_ctx_mgr_.erase_xa_ctx(trans_id); + xa_ctx_mgr_.revert_xa_ctx(xa_ctx); + } + // since tx_desc is not set into xa ctx, release tx desc explicitly + if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc, + ObTxAbortCause::IMPLICIT_ROLLBACK))) { + TRANS_LOG(WARN, "abort transaction failed", K(tmp_ret), K(trans_id), K(xid)); + } + MTL(ObTransService *)->release_tx(*tx_desc); + tx_desc = NULL; + } else { + // for statistics + XA_STAT_ADD_XA_TRANS_START_COUNT(); + } + } + return ret; +} + +int ObXAService::xa_end_for_mysql(const ObXATransID &xid, + ObTxDesc *&tx_desc) +{ + int ret = OB_SUCCESS; + ObXACtx *xa_ctx = NULL; + if (OB_UNLIKELY(xid.empty()) || + OB_UNLIKELY(!xid.is_valid()) || + OB_UNLIKELY(0 > xid.get_format_id()) || + OB_UNLIKELY(NULL == tx_desc)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), KP(tx_desc)); + } else if (!tx_desc->is_xa_trans()) { + ret = OB_TRANS_XA_PROTO; + TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid)); + } else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid)); + } else if (!xid.all_equal_to(tx_desc->get_xid())) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "unknown xid", K(ret), K(xid)); + } else if (OB_FAIL(xa_ctx->xa_end_for_mysql(xid, tx_desc))) { + TRANS_LOG(WARN, "xa end failed", K(ret), K(xid)); + } else { + // do nothing + } + TRANS_LOG(INFO, "mysql xa end", K(ret), K(xid)); + return ret; +} + +int ObXAService::xa_prepare_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + ObTxDesc *&tx_desc, + bool &need_exit) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + bool is_read_only = false; + ObXACtx *xa_ctx = NULL; + share::ObLSID coord_id; + need_exit = false; + ObTransID tx_id; + // 1. check basic first, if fail, do not exit + if (OB_UNLIKELY(xid.empty()) || + OB_UNLIKELY(!xid.is_valid()) || + OB_UNLIKELY(0 > xid.get_format_id()) || + OB_UNLIKELY(NULL == tx_desc) || + OB_UNLIKELY(0 >= timeout_us)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us), KP(tx_desc)); + } else if (!tx_desc->is_xa_trans()) { + ret = OB_TRANS_XA_PROTO; + TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid)); + } else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid)); + } else { + tx_id = tx_desc->tid(); + } + // 2. pre xa prepare, persist xa record + if (OB_FAIL(ret)) { + } else if (OB_FAIL(xa_ctx->pre_xa_prepare_for_mysql(xid, tx_desc, need_exit, + is_read_only, coord_id))) { + TRANS_LOG(WARN, "pre xa prepare for mysql failed", K(ret), K(xid)); + } else if (OB_FAIL(MTL(ObXAService*)->insert_record_for_mysql(MTL_ID(), xid, tx_id, + coord_id, GCTX.self_addr(), is_read_only))) { + // if persist failed, abort this trans + TRANS_LOG(WARN, "insert xa record failed", K(ret), K(xid), K(coord_id)); + need_exit = true; + if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) { + // if duplicate, return DUPID and abort this trans + ret = OB_TRANS_XA_DUPID; + } + } else { + // do nothing + } + // 3. send sub prepare to coordinator + if (is_read_only) { + // read-only trans + // since ctx has exited, do nothing + } else if (OB_FAIL(ret)) { + // if fail and need exit (include dupid), abort the trans + if (need_exit) { + if (OB_SUCCESS != (tmp_ret = xa_ctx->handle_abort_for_mysql( + ObTxAbortCause::IMPLICIT_ROLLBACK))) { + TRANS_LOG(WARN, "handle abort failed", K(tmp_ret), K(xid)); + } + } + } else { + need_exit = true; + if (OB_FAIL(xa_ctx->xa_prepare_for_mysql(xid, timeout_us))) { + TRANS_LOG(WARN, "xa prepare for mysql failed", K(ret), K(xid), K(timeout_us)); + } else if (OB_FAIL(xa_ctx->wait_xa_prepare_for_mysql(xid, timeout_us))) { + TRANS_LOG(WARN, "wait xa prepare for mysql failed", K(ret), K(xid), K(timeout_us)); + } + } + if (NULL != xa_ctx && need_exit) { + xa_ctx_mgr_.revert_xa_ctx(xa_ctx); + tx_desc = NULL; + xa_ctx = NULL; + } + TRANS_LOG(INFO, "mysql xa prepare", K(ret), K(xid), K(need_exit)); + return ret; +} + +int ObXAService::xa_commit_onephase_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + ObTxDesc *&tx_desc, + bool &need_exit) +{ + int ret = OB_SUCCESS; + bool is_read_only = false; + ObXACtx *xa_ctx = NULL; + need_exit = false; + if (OB_UNLIKELY(xid.empty()) || + OB_UNLIKELY(!xid.is_valid()) || + OB_UNLIKELY(0 > xid.get_format_id()) || + OB_UNLIKELY(NULL == tx_desc) || + OB_UNLIKELY(0 >= timeout_us)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us), KP(tx_desc)); + } else if (!tx_desc->is_xa_trans()) { + ret = OB_TRANS_XA_PROTO; + TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid)); + } else if (!xid.all_equal_to(tx_desc->get_xid())) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "unknown xid", K(ret), K(xid)); + } else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid)); + } else { + const bool is_rollback = false; + ObTransID tx_id = tx_desc->tid(); + if (OB_FAIL(xa_ctx->one_phase_end_trans_for_mysql(xid, is_rollback, timeout_us, + tx_desc, need_exit))) { + TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid)); + } else { + need_exit = true; + if (OB_FAIL(xa_ctx->wait_one_phase_end_trans_for_mysql(is_rollback, timeout_us))) { + TRANS_LOG(WARN, "fail to wait one phase xa end trans", K(ret), K(xid)); + } else { + // do nothing + } + } + if (need_exit) { + xa_ctx_mgr_.revert_xa_ctx(xa_ctx); + xa_ctx = NULL; + tx_desc = NULL; + } + } + // for statistics + XA_STAT_ADD_XA_ONE_PHASE_COMMIT_TOTAL_COUNT(); + TRANS_LOG(INFO, "mysql one phase xa commit", K(ret), K(xid)); + return ret; +} + +int ObXAService::xa_second_phase_twophase_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + const bool is_rollback, + ObTransID &tx_id) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + const int64_t start_ts = ObTimeUtility::current_time(); + const uint64_t tenant_id = MTL_ID(); + ObXACtx *xa_ctx = NULL; + bool alloc = true; + bool is_read_only = false; + share::ObLSID coord_id; + ObXAInnerSQLClient inner_sql_client; + if (OB_UNLIKELY(xid.empty()) || + OB_UNLIKELY(!xid.is_valid()) || + OB_UNLIKELY(0 > xid.get_format_id()) || + OB_UNLIKELY(0 >= timeout_us)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us)); + } else if (OB_FAIL(inner_sql_client.start(MTL(ObTransService *)->get_mysql_proxy()))) { + TRANS_LOG(WARN, "xa inner sql client start failed", K(ret), K(xid)); + } else if (OB_FAIL(inner_sql_client.query_xa_coord_for_mysql(xid, coord_id, tx_id, + is_read_only))) { + if (OB_ITER_END == ret) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "unkonwn xid", K(tmp_ret), K(xid)); + } else { + TRANS_LOG(WARN, "query xa record for mysql failed", K(ret), K(xid), K(coord_id), + K(tx_id), K(is_read_only)); + } + } else if (is_read_only) { + // read only trans + if (OB_SUCCESS != (tmp_ret = inner_sql_client.delete_xa_branch_for_mysql(xid))) { + TRANS_LOG(WARN, "delete xa branch failed", K(tmp_ret), K(xid), K(tenant_id), K(tx_id)); + } else { + // do nothing + } + } else { + if (OB_UNLIKELY(!coord_id.is_valid()) || + OB_UNLIKELY(!tx_id.is_valid())) { + ret = OB_TRANS_XA_PROTO; + TRANS_LOG(WARN, "invalid record for mysql two phase xa commit", K(ret), K(xid), + K(coord_id), K(tx_id)); + } else if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(tx_id, alloc, xa_ctx))) { + TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(tx_id), K(xid), K(alloc), KP(xa_ctx)); + } else if (OB_ISNULL(xa_ctx)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "xa ctx is null", K(ret), K(xid), K(tx_id)); + } else if (false == alloc) { + ret = OB_TRANS_STMT_NEED_RETRY; + TRANS_LOG(WARN, "xa ctx already exist", K(ret), K(xid), K(tx_id)); + } else { + const int64_t request_id = start_ts; + const bool is_tightly_coupled = false; + if (OB_FAIL(xa_ctx->init(xid, + tx_id, + tenant_id, + GCTX.self_addr(), + is_tightly_coupled, + this, + &xa_ctx_mgr_, + &xa_rpc_, + &timer_))) { + TRANS_LOG(WARN, "xa ctx init failed", K(ret), K(xid), K(tx_id)); + } else if (OB_FAIL(xa_ctx->two_phase_end_trans(xid, coord_id, is_rollback, + timeout_us, request_id))) { + TRANS_LOG(WARN, "xa commit failed", K(ret), K(xid), K(tx_id)); + } else if (OB_FAIL(xa_ctx->wait_two_phase_end_trans(xid, is_rollback, timeout_us))) { + TRANS_LOG(WARN, "wait xa commit failed", K(ret), K(xid), K(tx_id)); + } else if (OB_SUCCESS != (tmp_ret = inner_sql_client.delete_xa_branch_for_mysql(xid))) { + TRANS_LOG(WARN, "delete xa branch failed", K(tmp_ret), K(xid), K(tx_id)); + } else { + // do nothing + } + xa_ctx_mgr_.revert_xa_ctx(xa_ctx); + } + } + if (inner_sql_client.has_started()) { + (void)inner_sql_client.end(); + } + if (is_rollback) { + TRANS_LOG(INFO, "mysql two phase xa rollback", K(ret), K(xid)); + } else { + TRANS_LOG(INFO, "mysql two phase xa commit", K(ret), K(xid)); + } + return ret; +} + +int ObXAService::xa_rollback_onephase_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + ObTxDesc *&tx_desc, + bool &need_exit) +{ + int ret = OB_SUCCESS; + bool is_read_only = false; + ObXACtx *xa_ctx = NULL; + need_exit = false; + if (OB_UNLIKELY(xid.empty()) || + OB_UNLIKELY(!xid.is_valid()) || + OB_UNLIKELY(0 > xid.get_format_id()) || + OB_UNLIKELY(NULL == tx_desc) || + OB_UNLIKELY(0 >= timeout_us)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid arguments", K(ret), K(xid), K(timeout_us), KP(tx_desc)); + } else if (!tx_desc->is_xa_trans()) { + ret = OB_TRANS_XA_PROTO; + TRANS_LOG(WARN, "Routine invoked in an improper context", K(ret), K(xid)); + } else if (!xid.all_equal_to(tx_desc->get_xid())) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "unknown xid", K(ret), K(xid)); + } else if (NULL == (xa_ctx = tx_desc->get_xa_ctx())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "transaction context is null", K(ret), K(xid)); + } else { + const bool is_rollback = true; + ObTransID tx_id = tx_desc->tid(); + if (OB_FAIL(xa_ctx->one_phase_end_trans_for_mysql(xid, is_rollback, timeout_us, + tx_desc, need_exit))) { + TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid)); + } else { + need_exit = true; + if (OB_FAIL(xa_ctx->wait_one_phase_end_trans_for_mysql(is_rollback, timeout_us))) { + TRANS_LOG(WARN, "fail to wait one phase xa end trans", K(ret), K(xid)); + } else { + // do nothing + } + } + if (need_exit) { + // if exit for xa rollback, return success + ret = OB_SUCCESS; + xa_ctx_mgr_.revert_xa_ctx(xa_ctx); + xa_ctx = NULL; + tx_desc = NULL; + } + } + TRANS_LOG(INFO, "mysql one phase xa rollback", K(ret), K(xid)); + return ret; +} + +#define INSERT_MYSQLXA_SQL "\ + insert into %s (tenant_id, gtrid, bqual, format_id, \ + trans_id, coordinator, scheduler_ip, scheduler_port, state, flag, is_readonly) \ + values (%lu, x'%.*s', x'%.*s', %ld, %ld, %ld, '%s', %d, %d, %ld, %d)" + +int ObXAService::insert_record_for_mysql(const uint64_t tenant_id, + const ObXATransID &xid, + const ObTransID &trans_id, + const share::ObLSID &coordinator, + const ObAddr &sche_addr, + const bool is_readonly) +{ + int ret = OB_SUCCESS; + ObMySQLProxy *mysql_proxy = MTL(ObTransService *)->get_mysql_proxy(); + ObSqlString sql; + int64_t affected_rows = 0; + char scheduler_ip_buf[128] = {0}; + sche_addr.ip_to_string(scheduler_ip_buf, 128); + char gtrid_str[128] = {0}; + int64_t gtrid_len = 0; + char bqual_str[128] = {0}; + int64_t bqual_len = 0; + const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); + int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); + + if (!is_valid_tenant_id(tenant_id) || xid.empty() || !trans_id.is_valid()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id), K(xid), K(trans_id)); + } else if (!is_readonly && !coordinator.is_valid()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(is_readonly), K(coordinator)); + } else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(), + xid.get_gtrid_str().length(), + gtrid_str, 128, gtrid_len))) { + TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(tenant_id), K(xid)); + } else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(), + xid.get_bqual_str().length(), + bqual_str, 128, bqual_len))) { + TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(tenant_id), K(xid)); + } else if (OB_FAIL(sql.assign_fmt(INSERT_MYSQLXA_SQL, + OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME, + tenant_id, + (int)gtrid_len, gtrid_str, + (int)bqual_len, bqual_str, + xid.get_format_id(), + trans_id.get_id(), + coordinator.id(), + scheduler_ip_buf, sche_addr.get_port(), + ObXATransState::PREPARED, (long)0, + is_readonly))) { + TRANS_LOG(WARN, "generate insert xa trans sql fail", K(ret), K(sql)); + } else if (OB_FAIL(mysql_proxy->write(exec_tenant_id, sql.ptr(), affected_rows))) { + TRANS_LOG(WARN, "execute insert record sql failed", KR(ret), K(exec_tenant_id), K(tenant_id)); + } else { + TRANS_LOG(INFO, "execute insert record sql success", K(exec_tenant_id), K(tenant_id), + K(sql), K(affected_rows)); + } + THIS_WORKER.set_timeout_ts(original_timeout_us); + return ret; +} + +int ObXAService::handle_terminate_for_mysql(const ObXATransID &xid, ObTxDesc *tx_desc) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + if (NULL == tx_desc || !tx_desc->is_valid() || !xid.is_valid() || 0 > xid.get_format_id()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), KP(tx_desc), K(xid)); + } else if (!xid.all_equal_to(tx_desc->get_xid())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected trans descriptor", K(ret), K(xid)); + } else { + ObXACtx *xa_ctx = tx_desc->get_xa_ctx(); + ObTransID tx_id = tx_desc->tid(); + TRANS_LOG(INFO, "start to terminate mysql xa trans", K(xid), K(tx_id)); + if (NULL == xa_ctx) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected xa ctx", K(ret), K(xid), K(tx_id)); + } else if (OB_FAIL(xa_ctx->handle_abort_for_mysql(ObTxAbortCause::SESSION_DISCONNECT))) { + TRANS_LOG(WARN, "handle terminate for mysql failed", K(ret), K(xid), K(tx_id)); + } + xa_ctx_mgr_.revert_xa_ctx(xa_ctx); + tx_desc = NULL; + } + XA_INNER_INCREMENT_TERMINATE_COUNT(); + return ret; +} + +#define GC_MYSQL_RECORD_SQL "delete from %s where tenant_id = %lu and \ + gtrid = x'%.*s' and \ + bqual = x'%.*s'" + +#define QUERY_MYSQL_LONG_PENDING_SQL "select HEX(gtrid) as gtrid, HEX(bqual) as bqual, \ + trans_id, is_readonly \ + from %s where tenant_id = %lu and \ + unix_timestamp(now()) - %ld > unix_timestamp(gmt_modified) limit 20" + +// 1. for read-only xa trans, if exceed gc threadhold, do not check trans ctx +// and clean record directly +// 2. for read-write xa trans, if exceed gc threadhold, check trans ctx. +// if trans ctx exists, do not clean record +int ObXAService::gc_record_for_mysql() +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + int64_t gc_threshold_second = GCONF._xa_gc_timeout / 1000000; // in seconds + const uint64_t tenant_id = MTL_ID(); + const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); + ObMySQLProxy *mysql_proxy = MTL(ObTransService *)->get_mysql_proxy(); + ObSqlString sql; + + THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObMySQLResult *result = NULL; + if (OB_FAIL(sql.append_fmt(QUERY_MYSQL_LONG_PENDING_SQL, + OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME, + tenant_id, + gc_threshold_second))) { + TRANS_LOG(WARN, "generate sql fail", K(ret), K(sql), K(tenant_id)); + } else if (OB_FAIL(mysql_proxy->read(res, exec_tenant_id, sql.ptr()))) { + TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "execute sql fail", K(ret), K(tenant_id), K(sql)); + } else { + int64_t count = 0; + while (OB_SUCC(ret) && OB_SUCC(result->next())) { + int64_t affected_rows = 0; + bool is_exist = false; + bool need_delete = false; + char gtrid_str[128] = {0}; + int64_t gtrid_len = 0; + char bqual_str[128] = {0}; + int64_t bqual_len = 0; + int64_t tx_id_value = 0; + bool is_readonly = false; + EXTRACT_STRBUF_FIELD_MYSQL(*result, "gtrid", gtrid_str, 128, gtrid_len); + EXTRACT_STRBUF_FIELD_MYSQL(*result, "bqual", bqual_str, 128, bqual_len); + EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t); + EXTRACT_BOOL_FIELD_MYSQL(*result, "is_readonly", is_readonly); + if (OB_FAIL(ret)) { + TRANS_LOG(WARN, "extract field failed", K(ret)); + } else if (is_readonly) { + need_delete = true; + } else { + if (OB_FAIL(check_trans_ctx(tx_id_value, is_exist))) { + // if fail, do not gc record + TRANS_LOG(WARN, "check trans ctx failed", K(ret), K(tx_id_value)); + } else if (!is_exist) { + need_delete = true; + } + } + if (need_delete) { + ObSqlString delete_sql; + if (OB_FAIL(delete_sql.assign_fmt(GC_MYSQL_RECORD_SQL, + OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME, + tenant_id, + (int)gtrid_len, gtrid_str, + (int)bqual_len, bqual_str))) { + TRANS_LOG(WARN, "generate query xa scheduler sql fail", K(ret), K(delete_sql)); + } else if (OB_SUCCESS != (tmp_ret = mysql_proxy->write(exec_tenant_id, + delete_sql.ptr(), + affected_rows))) { + TRANS_LOG(WARN, "execute gc record sql failed", KR(ret), K(delete_sql)); + } else { + TRANS_LOG(INFO, "gc record success", K(ret), K(tx_id_value), K(is_readonly), + K(affected_rows)); + count++; + } + } + } // end while + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + } + TRANS_LOG(INFO, "gc record for mysql", K(ret), K(count)); + } + } + + return ret; +} + +#define CHECK_TRANS_CTX_SQL "SELECT trans_id \ + FROM %s WHERE \ + trans_id = %ld" + +int ObXAService::check_trans_ctx(const int64_t tx_id_value, bool &is_exist) +{ + int ret = OB_SUCCESS; + ObMySQLProxy *mysql_proxy = NULL; + ObSqlString sql; + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObMySQLResult *result = NULL; + if (FALSE_IT(mysql_proxy = MTL(ObTransService *)->get_mysql_proxy())) { + } else if (OB_ISNULL(mysql_proxy)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "mysql_proxy is null", K(ret), KP(mysql_proxy)); + } else if (OB_FAIL(sql.assign_fmt(CHECK_TRANS_CTX_SQL, + OB_ALL_VIRTUAL_TRANS_STAT_TNAME, + tx_id_value))) { + TRANS_LOG(WARN, "generate sql fail", K(ret), K(sql)); + } else if (OB_FAIL(mysql_proxy->read(res, MTL_ID() , sql.ptr()))) { + TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "execute sql fail", K(ret), K(sql)); + } else if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + is_exist = false; + // return success + ret = OB_SUCCESS; + } else { + TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql)); + } + } else { + is_exist = true; + } + TRANS_LOG(INFO, "check trans ctx from virtual table", K(ret), K(tx_id_value), K(is_exist)); + } + return ret; +} + +} // transaction +} // oceanbase diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index 3891354c8..71ddfce45 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -571,7 +571,6 @@ void ObTxDesc::reset() FORCE_PRINT_TRACE(&tlog_, "[tx desc trace]"); } #endif - TRANS_LOG(DEBUG, "reset txdesc", KPC(this), K(lbt())); tenant_id_ = 0; cluster_id_ = -1; trace_info_.reset(); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index c0da64c40..664446355 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -2050,6 +2050,7 @@ int ObTransService::release_tx_ref(ObTxDesc &tx) int ObTransService::tx_sanity_check(ObTxDesc &tx) { + ObSpinLockGuard guard(tx.lock_); return tx_sanity_check_(tx); } @@ -2144,6 +2145,7 @@ int ObTransService::sql_stmt_start_hook(const ObXATransID &xid, ObGlobalTxType global_tx_type = tx.get_global_tx_type(xid); if (ObGlobalTxType::DBLINK_TRANS == global_tx_type && OB_TRANS_XA_BRANCH_FAIL == ret) { // if dblink trans, change errno (branch fail) to the errno of plain trans + ObSpinLockGuard guard(tx.lock_); if (OB_FAIL(tx_sanity_check_(tx))) { TRANS_LOG(WARN, "tx state insanity", K(ret), K(global_tx_type), K(xid)); } else { @@ -2175,6 +2177,7 @@ int ObTransService::sql_stmt_end_hook(const ObXATransID &xid, ObTxDesc &tx) ObGlobalTxType global_tx_type = tx.get_global_tx_type(xid); if (ObGlobalTxType::DBLINK_TRANS == global_tx_type && OB_TRANS_XA_BRANCH_FAIL == ret) { // if dblink trans, change errno (branch fail) to the errno of plain trans + ObSpinLockGuard guard(tx.lock_); if (OB_FAIL(tx_sanity_check_(tx))) { TRANS_LOG(WARN, "tx state insanity", K(ret), K(global_tx_type), K(xid)); } else { diff --git a/src/storage/tx/ob_tx_free_route.cpp b/src/storage/tx/ob_tx_free_route.cpp index 5ccfea515..ea21e618d 100644 --- a/src/storage/tx/ob_tx_free_route.cpp +++ b/src/storage/tx/ob_tx_free_route.cpp @@ -13,6 +13,7 @@ #include "ob_trans_service.h" #include "lib/utility/serialization.h" +#include "storage/tx/ob_xa_ctx.h" /* * The exception handle of txn state update with synchronization via proxy @@ -1131,6 +1132,15 @@ bool ObTransService::need_fallback_(ObTxDesc &tx, int64_t &total_size) } else if (tx.is_xa_trans() && tx.is_xa_tightly_couple()) { TRANS_LOG(TRACE, "need fallback for tightly coupled xa trans"); fallback = true; + } else if (tx.is_xa_trans() + && NULL != tx.get_xa_ctx() + && tx.get_xa_ctx()->is_mysql_mode() + && ObXATransState::ACTIVE != tx.get_xa_ctx()->get_state()) { + // To be compatible with mysql, trans can not be disassociated with session after xa end. + // However, any dml can not be executed after xa end. Therefore, we need stop tx free route. + // NOTE that the xa trans state is switched to IDLE from ACTIVE in xa end. + TRANS_LOG(TRACE, "need fallback for mysql xa trans"); + fallback = true; } else { total_size = OB_E(EventTable::EN_TX_FREE_ROUTE_STATE_SIZE, tx.tx_id_) tx.estimate_state_size(); if (total_size > MAX_STATE_SIZE) { diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 1d7bd7c40..e1c3ac434 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -101,6 +101,7 @@ void ObXACtx::reset() local_lock_level_ = -1; executing_xid_.reset(); need_stmt_lock_ = true; + is_mysql_mode_ = false; is_inited_ = false; } @@ -165,6 +166,9 @@ int ObXACtx::handle_timeout(const int64_t delay) if (is_exiting_) { ret = OB_TRANS_IS_EXITING; TRANS_LOG(WARN, "xa ctx is exiting", K(ret)); + } else if (is_mysql_mode_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected timeout task for mysql", K(ret), K(*this)); } else if (is_terminated_) { ret = OB_TRANS_IS_EXITING; TRANS_LOG(WARN, "xa trans has terminated", K(ret)); @@ -1828,6 +1832,16 @@ int ObXACtx::start_stmt(const ObXATransID &xid, const uint32_t session_id) } else if (is_exiting_) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "xa ctx is exiting", K(ret), K(*this)); + } else if (is_mysql_mode_) { + // only for mysql mode + if (OB_FAIL(MTL(ObTransService*)->tx_sanity_check(*tx_desc_))) { + TRANS_LOG(WARN, "tx state insanity", K(ret), K(xid), K(*this)); + } else if (ObXATransState::ACTIVE != xa_trans_state_) { + ret = OB_TRANS_XA_RMFAIL; + TRANS_LOG(WARN, "cannot be executed in this state", K(ret), K(xid), K(*this)); + } else { + // do nothing + } } else if (is_terminated_) { // NOTE that anohter error code maybe required for loosely coupled mode ret = OB_TRANS_XA_BRANCH_FAIL; @@ -2615,6 +2629,8 @@ int ObXACtx::try_heartbeat() const int64_t now = ObTimeUtility::current_time(); if (is_exiting_ || is_terminated_) { // do nothing + } else if (is_mysql_mode_) { + // do nothing for mysql mode } else if (original_sche_addr_ != GCTX.self_addr()) { // temproray scheduler, do nothing } else if (OB_ISNULL(xa_branch_info_) @@ -3288,6 +3304,313 @@ int ObXACtx::start_check_stmt_lock(const ObXATransID &xid) return ret; } -}//transaction +// only for mysql mode +// xa start +// @param [in] xid +// @param [in] tx_desc +int ObXACtx::xa_start_for_mysql(const ObXATransID &xid, + ObTxDesc *tx_desc) +{ + int ret = OB_SUCCESS; + const int64_t timeout_seconds = 60; + const int64_t flags = ObXAFlag::OBTMNOFLAGS; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(xid)); + } else if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid)); + } else if (is_exiting_) { + ret = OB_TRANS_IS_EXITING; + TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this)); + } else if (!xid.all_equal_to(xid_)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected xid", K(xid), K(xid_), K(*this)); + } else if (OB_FAIL(save_tx_desc_(tx_desc))) { + TRANS_LOG(WARN, "save trans desc failed", K(ret), K(*this)); + } else { + // no need register timeout task + xa_trans_state_ = ObXATransState::ACTIVE; + ++xa_branch_count_; + ++xa_ref_count_; + // set trans_desc members + tx_desc->set_xid(xid); + tx_desc->set_xa_ctx(this); + // set global trans type to xa trans + tx_desc->set_global_tx_type(ObGlobalTxType::XA_TRANS); + is_mysql_mode_ = true; + } + REC_TRACE_EXT(tlog_, xa_start, OB_Y(ret), OB_ID(ctx_ref), get_uref()); + TRANS_LOG(INFO, "xa start for mysql", K(ret), K(*this)); + return ret; +} + +// only for mysql mode +// xa end +// @param [in] xid +// @param [in] tx_desc +int ObXACtx::xa_end_for_mysql(const ObXATransID &xid, + ObTxDesc *tx_desc) +{ + int ret = OB_SUCCESS; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(xid)); + } else if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid)); + } else if (is_exiting_) { + ret = OB_TRANS_IS_EXITING; + TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this)); + } else if (!is_mysql_mode_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "not mysql mode", K(ret), K(xid), K(*this)); + } else if (!xid.all_equal_to(xid_)) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "xid not match", K(ret), K(xid)); + } else if (ObXATransState::ACTIVE != xa_trans_state_) { + ret = OB_TRANS_XA_RMFAIL; + TRANS_LOG(WARN, "not active state", K(ret), K(xid), K(*this)); + } else { + xa_trans_state_ = ObXATransState::IDLE; + } + REC_TRACE_EXT(tlog_, xa_end, OB_Y(ret), OB_ID(ctx_ref), get_uref()); + TRANS_LOG(INFO, "xa end for mysql", K(ret), K(*this)); + return ret; +} + +// only for mysql mode +// xa prepare +// @param [in] xid +// @param [in] tx_desc +// @param [out] need_exit +int ObXACtx::pre_xa_prepare_for_mysql(const ObXATransID &xid, + ObTxDesc *tx_desc, + bool &need_exit, + bool &is_read_only, + share::ObLSID &coord_id) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + need_exit = false; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(xid)); + } else if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid)); + } else if (is_exiting_) { + ret = OB_TRANS_IS_EXITING; + TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this)); + need_exit = true; + } else if (!is_mysql_mode_ || tx_desc_ != tx_desc) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected xa ctx", K(ret), K(xid), K(*this)); + } else if (!xid.all_equal_to(xid_)) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "unknown xid", K(ret), K(xid), K(*this)); + } else if (ObXATransState::IDLE != xa_trans_state_) { + ret = OB_TRANS_XA_RMFAIL; + TRANS_LOG(WARN, "not idle state", K(ret), K(xid), K(*this)); + } else if (OB_FAIL(MTL(ObTransService*)->tx_sanity_check(*tx_desc))) { + TRANS_LOG(WARN, "tx state insanity", K(ret), K(xid), K(*this)); + need_exit = true; + } else { + if (OB_FAIL(MTL(ObTransService*)->prepare_tx_coord(*tx_desc_, coord_id))) { + if (OB_ERR_READ_ONLY_TRANSACTION == ret) { + // read only tranaction, rewrite ret to OB_SUCCESS + XA_STAT_ADD_XA_READ_ONLY_TRANS_TOTAL_COUNT(); + TRANS_LOG(INFO, "xa is read only", K(ret), K(*this)); + is_read_only = true; + ret = OB_SUCCESS; + } else { + TRANS_LOG(WARN, "prepare tx coord failed", K(ret), K(*this)); + } + } else if (!coord_id.is_valid()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "invalid coordinator", K(ret), K_(xid), K(coord_id), K(*this)); + } + if (OB_FAIL(ret) || is_read_only) { + need_exit = true; + } + } + if (need_exit) { + if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_, + ObTxAbortCause::IMPLICIT_ROLLBACK))) { + TRANS_LOG(WARN, "abort tx failed", K(tmp_ret), K(*this)); + } + --xa_ref_count_; + set_terminated_(); + try_exit_(); + } + REC_TRACE_EXT(tlog_, xa_prepare, OB_Y(ret), OB_ID(ctx_ref), get_uref()); + TRANS_LOG(INFO, "pre xa prepare for mysql", K(ret), K(need_exit), K(*this)); + return ret; +} + +// only for mysql mode +// xa prepare +// if fail, need exit +// @param [in] xid +// @param [in] tx_desc +int ObXACtx::xa_prepare_for_mysql(const ObXATransID &xid, + int64_t timeout_us) +{ + int ret = OB_SUCCESS; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (OB_FAIL(drive_prepare_(xid, timeout_us))) { + TRANS_LOG(WARN, "drive prepare failed", K(ret), K(*this)); + } + if (OB_FAIL(ret)) { + --xa_ref_count_; + try_exit_(); + } + return ret; +} + +// only for mysql mode +// @param [in] xid +// @param [in] timeout_us +int ObXACtx::wait_xa_prepare_for_mysql(const ObXATransID &xid, + const int64_t timeout_us) +{ + int ret = OB_SUCCESS; + int result = OB_SUCCESS; + + if (OB_FAIL(end_trans_cb_.wait(timeout_us + 10000000, result)) || OB_FAIL(result)) { + TRANS_LOG(WARN, "wait trans prepare failed", K(ret), K(xid), K(*this)); + } + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (OB_SUCC(ret)) { + xa_trans_state_ = ObXATransState::PREPARED; + } + --xa_ref_count_; + try_exit_(); + return ret; +} + +// only for mysql mode +// @param [in] xid +// @param [in] is_rollback +// @param [in] timeout_us +int ObXACtx::one_phase_end_trans_for_mysql(const ObXATransID &xid, + const bool is_rollback, + const int64_t timeout_us, + ObTxDesc *tx_desc, + bool &need_exit) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + need_exit = false; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (OB_UNLIKELY(xid.empty()) || OB_ISNULL(tx_desc)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(xid)); + } else if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(xid)); + } else if (is_exiting_) { + ret = OB_TRANS_IS_EXITING; + TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(xid), K(*this)); + need_exit = true; + } else if (!is_mysql_mode_ || tx_desc_ != tx_desc) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "unexpected xa ctx", K(ret), K(xid), K(*this)); + } else if (!xid.all_equal_to(xid_)) { + ret = OB_TRANS_XA_NOTA; + TRANS_LOG(WARN, "unknown xid", K(ret), K(xid), K(*this)); + } else if (ObXATransState::IDLE != xa_trans_state_) { + ret = OB_TRANS_XA_RMFAIL; + TRANS_LOG(WARN, "not idle state", K(ret), K(xid), K(*this)); + } else { + if (OB_FAIL(MTL(ObTransService*)->tx_sanity_check(*tx_desc))) { + need_exit = true; + TRANS_LOG(WARN, "tx state insanity", K(ret), K(xid), K(*this)); + } else { + // in this, need exit + // case one, if fail, exit directly + // case two, if success, exit in wait interface + const int64_t now = ObTimeUtility::current_time(); + if (OB_FAIL(MTL(ObTransService*)->end_1pc_trans(*tx_desc_, &end_trans_cb_, is_rollback, + now + timeout_us))) { + TRANS_LOG(WARN, "end 1pc trans failed", K(ret), K(*this)); + need_exit = true; + } else { + if (is_rollback) { + xa_trans_state_ = ObXATransState::ROLLBACKING; + } else { + xa_trans_state_ = ObXATransState::COMMITTING; + } + } + } + is_xa_one_phase_ = true; + } + if (need_exit) { + if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_, + ObTxAbortCause::IMPLICIT_ROLLBACK))) { + TRANS_LOG(WARN, "abort tx failed", K(tmp_ret), K(*this)); + } + --xa_ref_count_; + set_terminated_(); + try_exit_(); + } + REC_TRACE_EXT(tlog_, xa_one_phase, OB_Y(ret), OB_ID(is_rollback), is_rollback, + OB_ID(ctx_ref), get_uref()); + return ret; +} + +int ObXACtx::wait_one_phase_end_trans_for_mysql(const bool is_rollback, const int64_t timeout_us) +{ + int ret = OB_SUCCESS; + int result = OB_SUCCESS; + + if (!is_rollback) { + if (OB_FAIL(end_trans_cb_.wait(timeout_us + 10 * 1000 * 1000, result)) || OB_FAIL(result)) { + TRANS_LOG(WARN, "wait sub2pc end failed", K(ret), K(is_rollback), K(*this)); + } + } + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (is_rollback) { + xa_trans_state_ = ObXATransState::ROLLBACKED; + } else if (OB_SUCCESS == ret) { + xa_trans_state_ = ObXATransState::COMMITTED; + } + --xa_ref_count_; + set_exiting_(); + TRANS_LOG(INFO, "wait one phase end trans", K(ret), K(is_rollback), K(*this)); + + return ret; +} + +int ObXACtx::handle_abort_for_mysql(int cause) +{ + int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); + if (!is_inited_) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "ObXACtx not inited", K(ret)); + } else if (!is_mysql_mode_ || NULL == tx_desc_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected xa ctx", K(ret), K(*this)); + } else if (ObXATransState::ACTIVE != xa_trans_state_ + && ObXATransState::IDLE != xa_trans_state_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected xa trans state", K(ret), K(*this)); + } else { + if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc_, cause))) { + TRANS_LOG(WARN, "abort tx failed", K(tmp_ret), K(*this)); + } + --xa_ref_count_; + set_terminated_(); + set_exiting_(); + } + return ret; +} + +}//transaction }//oceanbase diff --git a/src/storage/tx/ob_xa_ctx.h b/src/storage/tx/ob_xa_ctx.h index fb6f4823e..9d36f7268 100644 --- a/src/storage/tx/ob_xa_ctx.h +++ b/src/storage/tx/ob_xa_ctx.h @@ -146,6 +146,27 @@ public: ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); return xa_ref_count_; } +public: + // for mysql xa + int xa_start_for_mysql(const ObXATransID &xid, ObTxDesc *tx_desc); + int xa_end_for_mysql(const ObXATransID &xid, ObTxDesc *tx_desc); + int pre_xa_prepare_for_mysql(const ObXATransID &xid, + ObTxDesc *tx_desc, + bool &need_exit, + bool &is_read_only, + share::ObLSID &coord_id); + int xa_prepare_for_mysql(const ObXATransID &xid, const int64_t timeout_us); + int wait_xa_prepare_for_mysql(const ObXATransID &xid, const int64_t timeout_us); + int one_phase_end_trans_for_mysql(const ObXATransID &xid, + const bool is_rollback, + const int64_t timeout_us, + ObTxDesc *tx_desc, + bool &need_exit); + int wait_one_phase_end_trans_for_mysql(const bool is_rollback, + const int64_t timeout_us); + bool is_mysql_mode() const { return is_mysql_mode_; } + int handle_abort_for_mysql(int cause); + int32_t get_state() const { return xa_trans_state_; } public: // for 4.0 dblink int xa_start_for_dblink(const ObXATransID &xid, @@ -172,7 +193,8 @@ public: K_(xa_branch_count), K_(xa_ref_count), K_(lock_grant), K_(is_tightly_coupled), K_(lock_xid), K_(xa_stmt_info), K_(is_terminated), K_(executing_xid), "uref", get_uref(), - K_(has_tx_level_temp_table), K_(local_lock_level), K_(need_stmt_lock)); + K_(has_tx_level_temp_table), K_(local_lock_level), K_(need_stmt_lock), + K_(is_mysql_mode)); private: int register_timeout_task_(const int64_t interval_us); int unregister_timeout_task_(); @@ -337,6 +359,7 @@ private: // 4.2 if local_lock_level == 0, execute the normal global lock release processing int64_t local_lock_level_; bool need_stmt_lock_; + bool is_mysql_mode_; }; }//transaction diff --git a/src/storage/tx/ob_xa_define.h b/src/storage/tx/ob_xa_define.h index 0162677bc..98841724c 100644 --- a/src/storage/tx/ob_xa_define.h +++ b/src/storage/tx/ob_xa_define.h @@ -131,6 +131,7 @@ public: static bool contain_tmreadonly(const int64_t flag) { return flag & OBTMREADONLY; } static bool contain_tmserializable(const int64_t flag) { return flag & OBTMSERIALIZABLE; } static bool is_tmnoflags(const int64_t flag, const int64_t xa_req_type); + static bool is_tmnoflags_for_mysql(const int64_t flag) { return OBTMNOFLAGS == flag; } static bool contain_loosely(const int64_t flag) { return flag & OBLOOSELY; } static bool contain_tmjoin(const int64_t flag) { return flag & OBTMJOIN; } static bool is_tmjoin(const int64_t flag) { return flag == OBTMJOIN; } diff --git a/src/storage/tx/ob_xa_inner_sql_client.cpp b/src/storage/tx/ob_xa_inner_sql_client.cpp new file mode 100644 index 000000000..aa1848dd4 --- /dev/null +++ b/src/storage/tx/ob_xa_inner_sql_client.cpp @@ -0,0 +1,205 @@ +// Copyright (c) 2021 OceanBase +// OceanBase is licensed under Mulan PubL v2. +// You can use this software according to the terms and conditions of the Mulan PubL v2. +// You may obtain a copy of Mulan PubL v2 at: +// http://license.coscl.org.cn/MulanPubL-2.0 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PubL v2 for more details. +#include "ob_xa_inner_sql_client.h" + +namespace oceanbase +{ +using namespace common; +using namespace common::sqlclient; +namespace transaction +{ +ObXAInnerSQLClient::~ObXAInnerSQLClient() +{ + reset(); + close(); +} + +void ObXAInnerSQLClient::reset() +{ + start_ts_ = 0; + has_started_ = false; +} + +int ObXAInnerSQLClient::start(ObISQLClient *sql_client) +{ + int ret = OB_SUCCESS; + const int32_t group_id = 0; + if (OB_UNLIKELY(NULL == sql_client)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid arguments", K(ret), KP(sql_client)); + } else if (has_started_) { + // return success + ret = OB_SUCCESS; + } else { + const uint64_t exec_tenant_id = gen_meta_tenant_id(MTL_ID()); + if (OB_FAIL(connect(exec_tenant_id, group_id, sql_client))) { + TRANS_LOG(WARN, "failed to init", K(ret)); + } else { + start_ts_ = ObTimeUtility::current_time(); + has_started_ = true; + } + } + return ret; +} + +int ObXAInnerSQLClient::end() +{ + int ret = OB_SUCCESS; + start_ts_ = 0; + has_started_ = false; + close(); + return ret; +} + +#define QUERY_MYSQLXA_SQL "\ + SELECT coordinator, trans_id, is_readonly FROM %s WHERE \ + tenant_id = %lu AND gtrid = x'%.*s' AND bqual = x'%.*s' AND format_id = %ld" + +int ObXAInnerSQLClient::query_xa_coord_for_mysql(const ObXATransID &xid, + share::ObLSID &coord_id, + ObTransID &tx_id, + bool &is_readonly) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObMySQLResult *result = NULL; + char gtrid_str[128] = {0}; + int64_t gtrid_len = 0; + char bqual_str[128] = {0}; + int64_t bqual_len = 0; + const uint64_t tenant_id = MTL_ID(); + const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); + + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAInnerSqlStatGuard stat_guard(start_ts); + + if (!has_started_ || NULL == get_connection()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected xa inner sql client", K(ret), K(xid)); + } else if (xid.empty()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(xid)); + } else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(), + xid.get_gtrid_str().length(), + gtrid_str, + 128, + gtrid_len))) { + TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(xid)); + } else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(), + xid.get_bqual_str().length(), + bqual_str, + 128, + bqual_len))) { + TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(xid)); + } else if (OB_FAIL(sql.assign_fmt(QUERY_MYSQLXA_SQL, + OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME, + tenant_id, + (int)gtrid_len, gtrid_str, + (int)bqual_len, bqual_str, + xid.get_format_id()))) { + TRANS_LOG(WARN, "generate query coordinator sql fail", K(ret)); + } else if (OB_FAIL(read(res, exec_tenant_id, sql.ptr()))) { + TRANS_LOG(WARN, "execute sql read fail", KR(ret), K(exec_tenant_id), K(sql)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "execute sql fail", K(ret), K(sql)); + } else if (OB_FAIL(result->next())) { + if (OB_ITER_END != ret) { + TRANS_LOG(WARN, "iterate next result fail", K(ret), K(sql)); + } + } else { + int64_t tx_id_value = 0; + EXTRACT_INT_FIELD_MYSQL(*result, "trans_id", tx_id_value, int64_t); + EXTRACT_BOOL_FIELD_MYSQL(*result, "is_readonly", is_readonly); + tx_id = ObTransID(tx_id_value); + if (OB_FAIL(ret)) { + TRANS_LOG(WARN, "fail to extract field from result", K(ret)); + } else { + int64_t ls_id_value = 0; + EXTRACT_INT_FIELD_MYSQL(*result, "coordinator", ls_id_value, int64_t); + if (OB_FAIL(ret)) { + if (OB_ERR_NULL_VALUE == ret) { + // rewrite code, and caller would check validity of coordinator + ret = OB_SUCCESS; + } else { + TRANS_LOG(WARN, "fail to extract coordinator from result", K(ret), K(xid)); + } + } else { + coord_id = share::ObLSID(ls_id_value); + } + } + } + } + return ret; +} + + +#define DELETE_XA_TRANS_SQL "delete from %s where \ + tenant_id = %lu and gtrid = x'%.*s' and bqual = x'%.*s' and format_id = %ld" + +int ObXAInnerSQLClient::delete_xa_branch_for_mysql(const ObXATransID &xid) +{ + int ret = OB_SUCCESS; + ObSqlString sql; + int64_t affected_rows = 0; + char gtrid_str[128] = {0}; + int64_t gtrid_len = 0; + char bqual_str[128] = {0}; + int64_t bqual_len = 0; + const uint64_t tenant_id = MTL_ID(); + const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAInnerSqlStatGuard stat_guard(start_ts); + + if (!has_started_ || NULL == get_connection()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected xa inner sql client", K(ret), K(xid)); + } else if (xid.empty()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(xid)); + } else if (OB_FAIL(hex_print(xid.get_gtrid_str().ptr(), + xid.get_gtrid_str().length(), + gtrid_str, + 128, + gtrid_len))) { + TRANS_LOG(WARN, "fail to convert gtrid to hex", K(ret), K(xid)); + } else if (OB_FAIL(hex_print(xid.get_bqual_str().ptr(), + xid.get_bqual_str().length(), + bqual_str, + 128, + bqual_len))) { + TRANS_LOG(WARN, "fail to convert bqual to hex", K(ret), K(xid)); + } else { + if (OB_FAIL(sql.assign_fmt(DELETE_XA_TRANS_SQL, + OB_ALL_TENANT_GLOBAL_TRANSACTION_TNAME, + tenant_id, + (int)gtrid_len, gtrid_str, + (int)bqual_len, bqual_str, + xid.get_format_id()))) { + TRANS_LOG(WARN, "generate delete xa trans sql fail", K(ret), K(sql)); + } else if (OB_FAIL(write(exec_tenant_id, sql.ptr(), affected_rows))) { + TRANS_LOG(WARN, "execute delete xa trans sql fail", + KR(ret), K(exec_tenant_id), K(sql), K(affected_rows)); + } else if (OB_UNLIKELY(1 < affected_rows)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "delete multiple rows", K(xid), K(affected_rows), K(sql)); + } else if (OB_UNLIKELY(0 == affected_rows)) { + TRANS_LOG(WARN, "delete no rows", K(xid), K(sql)); + // return OB_SUCCESS + } else { + // do nothing + } + } + return ret; +} + +} // end namespace transaction +} // end namespace oceanbase diff --git a/src/storage/tx/ob_xa_inner_sql_client.h b/src/storage/tx/ob_xa_inner_sql_client.h new file mode 100644 index 000000000..218ec471e --- /dev/null +++ b/src/storage/tx/ob_xa_inner_sql_client.h @@ -0,0 +1,57 @@ +// Copyright (c) 2021 OceanBase +// OceanBase is licensed under Mulan PubL v2. +// You can use this software according to the terms and conditions of the Mulan PubL v2. +// You may obtain a copy of Mulan PubL v2 at: +// http://license.coscl.org.cn/MulanPubL-2.0 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +// See the Mulan PubL v2 for more details. + +#ifndef OCEANBASE_XA_INNER_SQL_CLIENT_H_ +#define OCEANBASE_XA_INNER_SQL_CLIENT_H_ + +#include "lib/string/ob_sql_string.h" +#include "lib/oblog/ob_log_module.h" +#include "lib/mysqlclient/ob_isql_client.h" +#include "lib/mysqlclient/ob_mysql_connection.h" +#include "lib/mysqlclient/ob_single_connection_proxy.h" +#include "storage/tx/ob_trans_define.h" + +namespace oceanbase +{ +namespace common +{ +namespace sqlclient +{ +class ObISQLConnection; +class ObISQLConnectionPool; +} +} +namespace transaction +{ +// the object of this class is not thread-safe +class ObXAInnerSQLClient: public ObSingleConnectionProxy +{ +public: + ObXAInnerSQLClient() : start_ts_(0), has_started_(false) {}; + ~ObXAInnerSQLClient(); + void reset(); +public: + int start(ObISQLClient *sql_client); + int end(); + bool has_started() const { return has_started_; } + int query_xa_coord_for_mysql(const ObXATransID &xid, + share::ObLSID &coord_id, + ObTransID &tx_id, + bool &is_readonly); + int delete_xa_branch_for_mysql(const ObXATransID &xid); +private: + int64_t start_ts_; + bool has_started_; +}; + +} // end namespace transaction +} // end namespace oceanbase + +#endif // OCEANBASE_XA_INNER_SQL_CLIENT_H_ diff --git a/src/storage/tx/ob_xa_inner_table_gc_worker.cpp b/src/storage/tx/ob_xa_inner_table_gc_worker.cpp index 35ad27304..32ed9664d 100644 --- a/src/storage/tx/ob_xa_inner_table_gc_worker.cpp +++ b/src/storage/tx/ob_xa_inner_table_gc_worker.cpp @@ -56,15 +56,20 @@ int ObXAInnerTableGCWorker::start() return ret; } +// 1. in mysql mode, only meta leader collect invalid records +// 2. no need gc for sys tenant temporarily void ObXAInnerTableGCWorker::run1() { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; + const ObLSID meta_ls_id(ObLSID::SYS_LS_ID); const uint64_t tenant_id = MTL_ID(); + const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); int64_t last_scan_ts = ObTimeUtil::current_time(); + bool has_decided_mode = false; bool is_oracle_mode = false; int64_t start_delay = ObRandom::rand(1, 100); - // use start delay avoid diffient thread do gc on the same time lib::set_thread_name("ObXAGCWorker"); for (int64_t i = 0; i < start_delay && !has_set_stop(); ++i) { @@ -76,27 +81,54 @@ void ObXAInnerTableGCWorker::run1() int64_t gc_interval = std::max(2 * max_gc_cost_time_, tmp_start_delay); gc_interval = std::min(gc_interval, gc_interval_upper_bound); - int64_t gc_cost_time = 0; - int64_t before_gc_ts = 0; - + if (OB_SUCCESS != (tmp_ret = ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id, + is_oracle_mode))) { + TRANS_LOG(WARN, "check oracle mode failed", K(ret)); + } else { + has_decided_mode = true; + } while (!has_set_stop()) { - before_gc_ts = ObTimeUtil::current_time(); - if (before_gc_ts - last_scan_ts > gc_interval) { - if (is_user_tenant(tenant_id) - && OB_SUCC(share::ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id, is_oracle_mode)) - && is_oracle_mode) { - if (OB_FAIL(xa_service_->gc_invalid_xa_record(tenant_id))) { - TRANS_LOG(WARN, "gc invalid xa record failed", K(ret), K(tenant_id), - K(gc_interval), K(last_scan_ts), K(before_gc_ts)); + if (is_user_tenant(tenant_id) && has_decided_mode) { + int64_t gc_cost_time = 0; + int64_t before_gc_ts = ObTimeUtil::current_time(); + if (before_gc_ts - last_scan_ts > gc_interval) { + if (is_oracle_mode) { + // oracle mode + if (OB_FAIL(xa_service_->gc_invalid_xa_record(tenant_id))) { + TRANS_LOG(WARN, "gc invalid xa record failed", K(ret), K(tenant_id), + K(gc_interval), K(last_scan_ts), K(before_gc_ts)); + } else { + // update last scan ts + last_scan_ts = ObTimeUtil::current_time(); + gc_cost_time = last_scan_ts - before_gc_ts; // compute gc cost + max_gc_cost_time_ = std::max(max_gc_cost_time_, gc_cost_time); + TRANS_LOG(INFO, "clean invalid records", K(tenant_id), K(ret), K(gc_cost_time)); + } } else { - // update last scan ts - last_scan_ts = ObTimeUtil::current_time(); - gc_cost_time = last_scan_ts - before_gc_ts; // compute gc cost - max_gc_cost_time_ = std::max(max_gc_cost_time_, gc_cost_time); - TRANS_LOG(INFO, "scan xa inner table for one round", K(tenant_id), K(ret), K(gc_cost_time)); + // mysql mode + common::ObAddr leader_addr; + if (OB_FAIL(MTL(ObTransService *)->get_location_adapter()->nonblock_get_leader( + GCONF.cluster_id, meta_tenant_id, meta_ls_id, leader_addr))) { + TRANS_LOG(WARN, "get leader failed", K(ret)); + } else if (GCTX.self_addr() == leader_addr) { + if (OB_FAIL(xa_service_->gc_record_for_mysql())) { + TRANS_LOG(WARN, "gc record failed", K(ret)); + } else { + // update last scan ts + last_scan_ts = ObTimeUtil::current_time(); + gc_cost_time = last_scan_ts - before_gc_ts; + max_gc_cost_time_ = std::max(max_gc_cost_time_, gc_cost_time); + TRANS_LOG(INFO, "clean invalid records", K(ret), K(gc_cost_time)); + } + } } + } + } else if (is_user_tenant(tenant_id) && !has_decided_mode) { + if (OB_SUCCESS != (tmp_ret = ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id, + is_oracle_mode))) { + TRANS_LOG(WARN, "check oracle mode failed", K(ret)); } else { - last_scan_ts = before_gc_ts; + has_decided_mode = true; } } //sleep 20 secnd whether gc succ or not diff --git a/src/storage/tx/ob_xa_service.h b/src/storage/tx/ob_xa_service.h index 2a096f1b9..43a01e0b6 100644 --- a/src/storage/tx/ob_xa_service.h +++ b/src/storage/tx/ob_xa_service.h @@ -139,6 +139,41 @@ public: int xa_prepare_for_original(const ObXATransID &xid, const ObTransID &trans_id, const int64_t timeout_seconds); +public: + // for mysql + int xa_start_for_mysql(const ObXATransID &xid, + const int64_t flags, + const uint32_t session_id, + const ObTxParam &tx_param, + ObTxDesc *&tx_desc); + int xa_end_for_mysql(const ObXATransID &xid, + ObTxDesc *&tx_desc); + int xa_prepare_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + ObTxDesc *&tx_desc, + bool &need_exit); + int xa_commit_onephase_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + ObTxDesc *&tx_desc, + bool &need_exit); + int xa_second_phase_twophase_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + const bool is_rollback, + ObTransID &tx_id); + int xa_rollback_onephase_for_mysql(const ObXATransID &xid, + const int64_t timeout_us, + ObTxDesc *&tx_desc, + bool &need_exit); + int insert_record_for_mysql(const uint64_t tenant_id, + const ObXATransID &xid, + const ObTransID &trans_id, + const share::ObLSID &coordinator, + const ObAddr &sche_addr, + const bool is_read_only); + int handle_terminate_for_mysql(const ObXATransID &xid, + ObTxDesc *tx_desc); + int gc_record_for_mysql(); + int check_trans_ctx(const int64_t tx_id_value, bool &is_exist); public: // for 4.0 dblink int xa_start_for_tm_promotion(const int64_t flags, @@ -322,6 +357,12 @@ private: const ObTransID &tx_id, const ObAddr &original_sche_addr, const int64_t timeout_us); +private: + // for mysql + int xa_start_for_mysql_(const ObXATransID &xid, + const uint32_t session_id, + const ObTxParam &tx_param, + ObTxDesc *&tx_desc); private: // for 4.0 dblink int xa_start_for_tm_promotion_(const int64_t flags, @@ -487,6 +528,9 @@ private: #define XA_INNER_INCREMENT_COMPENSATE_COUNT() \ { MTL(ObXAService*)->get_statistics().inc_compensate_record_count();} \ +#define XA_INNER_INCREMENT_TERMINATE_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_session_terminate_count();} \ + /////// statistics of dblink trans #define DBLINK_STAT_ADD_TRANS_COUNT() \ { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_count();} \ diff --git a/src/storage/tx/ob_xa_trans_event.cpp b/src/storage/tx/ob_xa_trans_event.cpp index 916304245..8fd8082b1 100644 --- a/src/storage/tx/ob_xa_trans_event.cpp +++ b/src/storage/tx/ob_xa_trans_event.cpp @@ -293,6 +293,11 @@ void ObXATransStatistics::inc_compensate_record_count() ATOMIC_INC(&xa_compensate_record_count_); } +void ObXATransStatistics::inc_session_terminate_count() +{ + ATOMIC_INC(&xa_terminate_count_); +} + void ObXATransStatistics::try_print_xa_statistics() { static const int64_t STAT_INTERVAL = 9000000; // 9 seconds @@ -351,9 +356,11 @@ void ObXATransStatistics::try_print_xa_statistics() } // for inner logic int64_t xa_compensate_record_count = ATOMIC_LOAD(&xa_compensate_record_count_); - if (0 != xa_compensate_record_count) { + int64_t xa_terminate_count = ATOMIC_LOAD(&xa_terminate_count_); + if (0 != xa_compensate_record_count || 0 != xa_terminate_count) { FLOG_INFO("xa statistics of inner logic", - "xa_compensate_record_count", xa_compensate_record_count); + "xa_compensate_record_count", xa_compensate_record_count, + "xa_terminate_count", xa_terminate_count); } // reset // NOTE that the active info should not be reset @@ -389,6 +396,7 @@ void ObXATransStatistics::try_print_xa_statistics() ATOMIC_STORE(&xa_inner_rpc_ten_ms_count_, 0); ATOMIC_STORE(&xa_inner_rpc_twenty_ms_count_, 0); ATOMIC_STORE(&xa_compensate_record_count_, 0); + ATOMIC_STORE(&xa_terminate_count_, 0); } // for active info int64_t active_xa_stmt_count = ATOMIC_LOAD(&active_xa_stmt_count_); diff --git a/src/storage/tx/ob_xa_trans_event.h b/src/storage/tx/ob_xa_trans_event.h index 0325355fa..ec5d2502d 100644 --- a/src/storage/tx/ob_xa_trans_event.h +++ b/src/storage/tx/ob_xa_trans_event.h @@ -33,7 +33,8 @@ public: xa_inner_sql_count_(0), xa_inner_sql_used_time_us_(0), xa_inner_rpc_count_(0), xa_inner_rpc_used_time_us_(0), xa_inner_sql_ten_ms_count_(0), xa_inner_sql_twenty_ms_count_(0), xa_inner_rpc_ten_ms_count_(0), xa_inner_rpc_twenty_ms_count_(0), - active_xa_stmt_count_(0), active_xa_ctx_count_(0), xa_compensate_record_count_(0) {} + active_xa_stmt_count_(0), active_xa_ctx_count_(0), xa_compensate_record_count_(0), + xa_terminate_count_(0) {} ~ObXATransStatistics() { destroy(); } int init(const uint64_t tenant_id); void reset(); @@ -111,6 +112,8 @@ public: void dec_active_xa_ctx_count(); // increment compensate record count void inc_compensate_record_count(); + // increment session terminate count + void inc_session_terminate_count(); public: void try_print_xa_statistics(); @@ -160,6 +163,7 @@ private: int64_t active_xa_ctx_count_; // inner logic int64_t xa_compensate_record_count_; + int64_t xa_terminate_count_; }; class ObDBLinkTransStatistics diff --git a/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp b/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp index e26be9454..296e34d89 100644 --- a/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp +++ b/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp @@ -19,6 +19,7 @@ namespace oceanbase { using namespace common; using namespace storage; +using namespace share; namespace transaction { @@ -59,18 +60,22 @@ void ObXATransHeartbeatWorker::run1() static const int64_t INTERVAL_US = 5 * 1000 * 1000; // 5s int64_t loop_count = 0; int64_t total_time = 0; - + const uint64_t tenant_id = MTL_ID(); lib::set_thread_name("ObXAHbWorker"); + while (!has_set_stop()) { - int64_t start_time = ObTimeUtility::current_time(); loop_count++; - // MTL(ObXAService *)->get_xa_statistics().print_statistics(start_time); MTL(ObXAService *)->try_print_statistics(); + bool is_oracle_mode = false; + int64_t start_time = ObTimeUtility::current_time(); + (void)ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(tenant_id, is_oracle_mode); + // currently, heartbeat is only for oracle user tenant + const bool need_heartbeat = is_oracle_mode && is_user_tenant(tenant_id); if (OB_UNLIKELY(!is_inited_)) { TRANS_LOG(WARN, "xa trans heartbeat not init"); ret = OB_NOT_INIT; - } else { + } else if (need_heartbeat) { if (OB_FAIL(xa_service_->xa_scheduler_hb_req())) { TRANS_LOG(WARN, "xa scheduler heartbeat failed", K(ret)); }