From f11a591adaf5ce1613596514bd804d0d8270f89a Mon Sep 17 00:00:00 2001 From: "arcoalien@qq.com" Date: Mon, 10 Jul 2023 15:18:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=86=99=E8=BD=AC=E5=8F=91?= =?UTF-8?q?=E5=BC=80=E5=90=AF=E5=90=8E=E6=89=A7=E8=A1=8C=E5=87=BD=E6=95=B0?= =?UTF-8?q?=E5=86=85=E9=83=A8=E6=9C=89DDL=E5=A4=87=E6=9C=BA=E6=B2=A1?= =?UTF-8?q?=E6=9C=89=E6=8B=A6=E6=88=AA=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gausskernel/runtime/executor/spi.cpp | 8 ++++++++ .../storage/access/transam/xact.cpp | 4 ++++ .../storage/replication/libpqsw.cpp | 20 +++++++++++++++++-- src/include/replication/libpqsw.h | 1 + 4 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/gausskernel/runtime/executor/spi.cpp b/src/gausskernel/runtime/executor/spi.cpp index 68e7bd010..6c76256e8 100644 --- a/src/gausskernel/runtime/executor/spi.cpp +++ b/src/gausskernel/runtime/executor/spi.cpp @@ -47,6 +47,7 @@ #include "utils/elog.h" #include "commands/sqladvisor.h" #include "distributelayer/streamMain.h" +#include "replication/libpqsw.h" #ifdef ENABLE_MOT #include "storage/mot/jit_exec.h" @@ -2492,6 +2493,13 @@ void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan, parse_query_fun foreach (list_item, raw_parsetree_list) { Node *parsetree = (Node *)lfirst(list_item); CachedPlanSource *plansource = NULL; + +#ifndef ENABLE_MULTIPLE_NODES + if (g_instance.attr.attr_sql.enableRemoteExcute) { + libpqsw_check_ddl_on_primary(CreateCommandTag(parsetree)); + } +#endif + #ifdef ENABLE_MULTIPLE_NODES if (IS_PGXC_COORDINATOR && PointerIsValid(query_string_locationlist) && list_length(query_string_locationlist) > 1) { diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index bc2850766..852e0b3e9 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -4022,6 +4022,10 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback) } #endif + if (SS_STANDBY_MODE_WITH_REMOTE_EXECUTE) { + libpqsw_disconnect(); + } + s->savepointList = NULL; TwoPhaseCommit = false; diff --git a/src/gausskernel/storage/replication/libpqsw.cpp b/src/gausskernel/storage/replication/libpqsw.cpp index 6e238d7b6..8d138e48f 100644 --- a/src/gausskernel/storage/replication/libpqsw.cpp +++ b/src/gausskernel/storage/replication/libpqsw.cpp @@ -257,7 +257,9 @@ static bool libpqsw_receive(bool transfer = true) if (u_sess->attr.attr_common.enable_full_encryption && *(conn->inBuffer) == 'Z') { get_redirect_manager()->state.client_enable_ce = true; } else { - transfer_func(conn->inBuffer, conn->inEnd); + if (get_sw_cxt()->streamConn->xactStatus != PQTRANS_INERROR) { + transfer_func(conn->inBuffer, conn->inEnd); + } } } @@ -1054,6 +1056,12 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con libpqsw_inner_excute_pbe(true, true); } } + + if (get_sw_cxt()->streamConn->xactStatus == PQTRANS_INERROR) { + libpqsw_disconnect(); + ereport(ERROR, (errmsg("The primary node report error when last request was transferred to it!"))); + } + // because we are not skip Q message process, so send_ready_for_query will be true after transfer. // but after transter, master will send Z message for front, so we not need to this flag. if (get_redirect_manager()->state.client_enable_ce || libpqsw_end_command(commandTag) || @@ -1114,7 +1122,7 @@ bool libpqsw_fetch_command(const char* commandTag) // is special commandTag need forbid redirect bool libpqsw_special_command(const char* commandTag) { - return commandTag != NULL && strcmp(commandTag, "LOCK TABLE") == 0; + return commandTag != NULL && (strcmp(commandTag, "LOCK TABLE") == 0 || strcmp(commandTag, "TRUNCATE TABLE") == 0); } // is special commandTag need forbid redirect @@ -1404,3 +1412,11 @@ bool libpqsw_send_pbe(const char* buffer, size_t buffer_size) conn->asyncStatus = PGASYNC_BUSY; return result; } + +void libpqsw_check_ddl_on_primary(const char* commandTag) +{ + if (commandTag != NULL && t_thrd.role == SW_SENDER && IsTransactionInProgressState() && + (set_command_type_by_commandTag(commandTag) == CMD_DDL || libpqsw_special_command(commandTag))) { + ereport(ERROR, (errmsg("The multi-write feature doesn't support DDL within transaction or function!"))); + } +} diff --git a/src/include/replication/libpqsw.h b/src/include/replication/libpqsw.h index a97362d52..b2b9bcb69 100644 --- a/src/include/replication/libpqsw.h +++ b/src/include/replication/libpqsw.h @@ -85,6 +85,7 @@ bool libpqsw_only_localrun(); void libpqsw_create_conn(); void libpqsw_trace_q_msg(const char* commandTag, const char* queryString); void libpqsw_disconnect(void); +void libpqsw_check_ddl_on_primary(const char* commandTag); #ifdef _cplusplus }