From 2defdd72963e9a91b1550eb10867f84ecd1cedaf Mon Sep 17 00:00:00 2001 From: jw-guo Date: Mon, 14 Aug 2023 08:18:44 +0000 Subject: [PATCH] [4.1][4.2] refine set autocommit in dblink trans --- .../engine/cmd/ob_variable_set_executor.cpp | 45 ++++++++++++++++++- src/sql/ob_result_set.cpp | 35 ++++++++++----- src/sql/session/ob_sql_session_info.h | 1 + 3 files changed, 70 insertions(+), 11 deletions(-) diff --git a/src/sql/engine/cmd/ob_variable_set_executor.cpp b/src/sql/engine/cmd/ob_variable_set_executor.cpp index f9310b77c..18f06ea2b 100644 --- a/src/sql/engine/cmd/ob_variable_set_executor.cpp +++ b/src/sql/engine/cmd/ob_variable_set_executor.cpp @@ -227,6 +227,12 @@ int ObVariableSetExecutor::execute(ObExecContext &ctx, ObVariableSetStmt &stmt) LOG_WARN("fail to process session autocommit", K(ret), K(ret_ac)); if (OB_ERR_WRONG_VALUE_FOR_VAR == ret_ac) { ret = ret_ac; + } else if (OB_OP_NOT_ALLOW == ret_ac) { + ret = ret_ac; + } else if (OB_TRANS_XA_ERR_COMMIT == ret_ac) { + ret = ret_ac; + } else if (OB_ERR_UNEXPECTED == ret_ac) { + ret = ret_ac; } } else {} } else {} @@ -942,8 +948,45 @@ int ObVariableSetExecutor::process_session_autocommit_hook(ObExecContext &exec_c LOG_USER_ERROR(OB_ERR_WRONG_VALUE_FOR_VAR, (int)strlen(OB_SV_AUTOCOMMIT), OB_SV_AUTOCOMMIT, (int)strlen(autocommit_str), autocommit_str); } else { + // in xa trans or dblink trans + if (in_trans && my_session->associated_xa()) { + const transaction::ObXATransID xid = my_session->get_xid(); + transaction::ObTxDesc *tx_desc = my_session->get_tx_desc(); + const transaction::ObGlobalTxType global_tx_type = tx_desc->get_global_tx_type(xid); + // not allow to set autocommit to on + if (false == orig_ac && 1 == autocommit) { + ret = OB_TRANS_XA_ERR_COMMIT; + LOG_WARN("not allow to set autocommit on in xa trans", K(ret), K(xid)); + } else if (true == orig_ac && 1 == autocommit) { + if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) { + // do nothing + } else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) { + // in dblink trans, this case is not posssible + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected case for dblink trans", K(ret), K(xid)); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected global trans type", K(ret), K(xid), K(global_tx_type)); + } + } else { + // in xa trans + if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) { + LOG_INFO("set autocommit off in xa trans", K(ret), K(xid)); + // in dblink trans + } else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) { + if (my_session->need_restore_auto_commit()) { + ret = OB_OP_NOT_ALLOW; + LOG_WARN("not allow to set autocomit off", K(ret), K(xid)); + } else { + LOG_INFO("set autocommit off in dblink trans", K(ret), K(xid)); + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected global trans type", K(ret), K(xid), K(global_tx_type)); + } + } // skip commit txn if this is txn free route temporary node - if (false == orig_ac && true == in_trans && 1 == autocommit && !my_session->is_txn_free_route_temp()) { + } else if (false == orig_ac && true == in_trans && 1 == autocommit && !my_session->is_txn_free_route_temp()) { if (OB_FAIL(ObSqlTransControl::implicit_end_trans(exec_ctx, false))) { LOG_WARN("fail implicit commit trans", K(ret)); } diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index e0f7bf444..6c872b10a 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -250,12 +250,32 @@ int ObResultSet::on_cmd_execute() ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid inner state", K(cmd_)); } else if (cmd_->cause_implicit_commit()) { - // not allow implicit commit in xa trans - if (my_session_.associated_xa()) { - ret = OB_TRANS_XA_ERR_COMMIT; + if (my_session_.is_in_transaction() && my_session_.associated_xa()) { + int tmp_ret = OB_SUCCESS; + transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc(); + const transaction::ObXATransID xid = my_session_.get_xid(); + const transaction::ObGlobalTxType global_tx_type = tx_desc->get_global_tx_type(xid); + if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) { + // commit is not allowed in xa trans + ret = OB_TRANS_XA_ERR_COMMIT; + LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), K(xid), K(global_tx_type), + KPC(tx_desc)); + } else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) { + transaction::ObTransID tx_id; + if (OB_FAIL(ObTMService::tm_commit(get_exec_context(), tx_id))) { + LOG_WARN("fail to do commit for dblink trans", K(ret), K(tx_id), K(xid), + K(global_tx_type)); + } + my_session_.restore_auto_commit(); + const bool force_disconnect = false; + if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = my_session_.get_dblink_context().clean_dblink_conn(force_disconnect)))) { + LOG_WARN("dblink transaction failed to release dblink connections", K(tmp_ret), K(tx_id), K(xid)); + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected global trans type", K(ret), K(xid), K(global_tx_type), KPC(tx_desc)); + } get_exec_context().set_need_disconnect(false); - const transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc(); - LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), KPC(tx_desc)); } else { // commit current open transaction, synchronously if (OB_FAIL(ObSqlTransControl::implicit_end_trans(get_exec_context(), false))) { @@ -1089,11 +1109,6 @@ int ObResultSet::init_cmd_exec_context(ObExecContext &exec_ctx) ret = OB_NOT_INIT; LOG_WARN("cmd or ctx is NULL", K(ret), K(cmd_), K(plan_ctx)); ret = OB_ERR_UNEXPECTED; - } else if ((ObStmt::is_ddl_stmt(stmt_type_, true) || ObStmt::is_dcl_stmt(stmt_type_)) - && stmt::T_VARIABLE_SET != stmt_type_ && my_session_.associated_xa()) { - ret = OB_TRANS_XA_ERR_COMMIT; - const transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc(); - LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), KPC(tx_desc)); } else if (OB_ISNULL(buf = get_mem_pool().alloc(sizeof(ObNewRow)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory", K(sizeof(ObNewRow)), K(ret)); diff --git a/src/sql/session/ob_sql_session_info.h b/src/sql/session/ob_sql_session_info.h index bcd04b524..07cdb0e47 100644 --- a/src/sql/session/ob_sql_session_info.h +++ b/src/sql/session/ob_sql_session_info.h @@ -715,6 +715,7 @@ public: } } void set_restore_auto_commit() { restore_auto_commit_ = true; } + bool need_restore_auto_commit() const { return restore_auto_commit_; } void reset_show_warnings_buf() { show_warnings_buf_.reset(); } ObPrivSet get_user_priv_set() const { return user_priv_set_; } ObPrivSet get_db_priv_set() const { return db_priv_set_; }