[4.1][4.2] refine set autocommit in dblink trans

This commit is contained in:
jw-guo 2023-08-14 08:18:44 +00:00 committed by ob-robot
parent 3a6bc5bba3
commit 2defdd7296
3 changed files with 70 additions and 11 deletions

View File

@ -227,6 +227,12 @@ int ObVariableSetExecutor::execute(ObExecContext &ctx, ObVariableSetStmt &stmt)
LOG_WARN("fail to process session autocommit", K(ret), K(ret_ac));
if (OB_ERR_WRONG_VALUE_FOR_VAR == ret_ac) {
ret = ret_ac;
} else if (OB_OP_NOT_ALLOW == ret_ac) {
ret = ret_ac;
} else if (OB_TRANS_XA_ERR_COMMIT == ret_ac) {
ret = ret_ac;
} else if (OB_ERR_UNEXPECTED == ret_ac) {
ret = ret_ac;
}
} else {}
} else {}
@ -942,8 +948,45 @@ int ObVariableSetExecutor::process_session_autocommit_hook(ObExecContext &exec_c
LOG_USER_ERROR(OB_ERR_WRONG_VALUE_FOR_VAR, (int)strlen(OB_SV_AUTOCOMMIT), OB_SV_AUTOCOMMIT,
(int)strlen(autocommit_str), autocommit_str);
} else {
// in xa trans or dblink trans
if (in_trans && my_session->associated_xa()) {
const transaction::ObXATransID xid = my_session->get_xid();
transaction::ObTxDesc *tx_desc = my_session->get_tx_desc();
const transaction::ObGlobalTxType global_tx_type = tx_desc->get_global_tx_type(xid);
// not allow to set autocommit to on
if (false == orig_ac && 1 == autocommit) {
ret = OB_TRANS_XA_ERR_COMMIT;
LOG_WARN("not allow to set autocommit on in xa trans", K(ret), K(xid));
} else if (true == orig_ac && 1 == autocommit) {
if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) {
// do nothing
} else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) {
// in dblink trans, this case is not posssible
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected case for dblink trans", K(ret), K(xid));
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected global trans type", K(ret), K(xid), K(global_tx_type));
}
} else {
// in xa trans
if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) {
LOG_INFO("set autocommit off in xa trans", K(ret), K(xid));
// in dblink trans
} else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) {
if (my_session->need_restore_auto_commit()) {
ret = OB_OP_NOT_ALLOW;
LOG_WARN("not allow to set autocomit off", K(ret), K(xid));
} else {
LOG_INFO("set autocommit off in dblink trans", K(ret), K(xid));
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexpected global trans type", K(ret), K(xid), K(global_tx_type));
}
}
// skip commit txn if this is txn free route temporary node
if (false == orig_ac && true == in_trans && 1 == autocommit && !my_session->is_txn_free_route_temp()) {
} else if (false == orig_ac && true == in_trans && 1 == autocommit && !my_session->is_txn_free_route_temp()) {
if (OB_FAIL(ObSqlTransControl::implicit_end_trans(exec_ctx, false))) {
LOG_WARN("fail implicit commit trans", K(ret));
}

View File

@ -250,12 +250,32 @@ int ObResultSet::on_cmd_execute()
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid inner state", K(cmd_));
} else if (cmd_->cause_implicit_commit()) {
// not allow implicit commit in xa trans
if (my_session_.associated_xa()) {
ret = OB_TRANS_XA_ERR_COMMIT;
if (my_session_.is_in_transaction() && my_session_.associated_xa()) {
int tmp_ret = OB_SUCCESS;
transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc();
const transaction::ObXATransID xid = my_session_.get_xid();
const transaction::ObGlobalTxType global_tx_type = tx_desc->get_global_tx_type(xid);
if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) {
// commit is not allowed in xa trans
ret = OB_TRANS_XA_ERR_COMMIT;
LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), K(xid), K(global_tx_type),
KPC(tx_desc));
} else if (transaction::ObGlobalTxType::DBLINK_TRANS == global_tx_type) {
transaction::ObTransID tx_id;
if (OB_FAIL(ObTMService::tm_commit(get_exec_context(), tx_id))) {
LOG_WARN("fail to do commit for dblink trans", K(ret), K(tx_id), K(xid),
K(global_tx_type));
}
my_session_.restore_auto_commit();
const bool force_disconnect = false;
if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = my_session_.get_dblink_context().clean_dblink_conn(force_disconnect)))) {
LOG_WARN("dblink transaction failed to release dblink connections", K(tmp_ret), K(tx_id), K(xid));
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected global trans type", K(ret), K(xid), K(global_tx_type), KPC(tx_desc));
}
get_exec_context().set_need_disconnect(false);
const transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc();
LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), KPC(tx_desc));
} else {
// commit current open transaction, synchronously
if (OB_FAIL(ObSqlTransControl::implicit_end_trans(get_exec_context(), false))) {
@ -1089,11 +1109,6 @@ int ObResultSet::init_cmd_exec_context(ObExecContext &exec_ctx)
ret = OB_NOT_INIT;
LOG_WARN("cmd or ctx is NULL", K(ret), K(cmd_), K(plan_ctx));
ret = OB_ERR_UNEXPECTED;
} else if ((ObStmt::is_ddl_stmt(stmt_type_, true) || ObStmt::is_dcl_stmt(stmt_type_))
&& stmt::T_VARIABLE_SET != stmt_type_ && my_session_.associated_xa()) {
ret = OB_TRANS_XA_ERR_COMMIT;
const transaction::ObTxDesc *tx_desc = my_session_.get_tx_desc();
LOG_WARN("COMMIT is not allowed in a xa trans", K(ret), KPC(tx_desc));
} else if (OB_ISNULL(buf = get_mem_pool().alloc(sizeof(ObNewRow)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc memory", K(sizeof(ObNewRow)), K(ret));

View File

@ -715,6 +715,7 @@ public:
}
}
void set_restore_auto_commit() { restore_auto_commit_ = true; }
bool need_restore_auto_commit() const { return restore_auto_commit_; }
void reset_show_warnings_buf() { show_warnings_buf_.reset(); }
ObPrivSet get_user_priv_set() const { return user_priv_set_; }
ObPrivSet get_db_priv_set() const { return db_priv_set_; }