From d466bedceacfa648f7cbb54fa8600be4831ae82f Mon Sep 17 00:00:00 2001 From: cchen676 Date: Sat, 10 Aug 2024 17:42:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=A4=87=E6=9C=BA=E5=BC=80?= =?UTF-8?q?=E5=90=AF=E5=86=99=E8=BD=AC=E5=8F=91=E5=90=8E=E7=9A=84=E4=B8=80?= =?UTF-8?q?=E4=BA=9B=E4=B9=8B=E5=89=8D=E8=AE=BE=E8=AE=A1=E6=97=B6=E6=B2=A1?= =?UTF-8?q?=E8=80=83=E8=99=91=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/backend/catalog/pg_proc.cpp | 2 +- .../backend/client_logic/client_logic.cpp | 10 ++++----- src/common/backend/parser/parse_utilcmd.cpp | 2 +- src/common/pl/plpgsql/src/pl_exec.cpp | 5 +++++ .../optimizer/commands/prepare.cpp | 10 +++++++++ src/gausskernel/optimizer/plan/planner.cpp | 6 ++++++ .../storage/replication/libpqsw.cpp | 21 ++++++++++++++----- src/include/replication/libpqsw.h | 13 +++++++++++- 8 files changed, 56 insertions(+), 13 deletions(-) diff --git a/src/common/backend/catalog/pg_proc.cpp b/src/common/backend/catalog/pg_proc.cpp index 946275a75..f748e3dcb 100644 --- a/src/common/backend/catalog/pg_proc.cpp +++ b/src/common/backend/catalog/pg_proc.cpp @@ -1234,7 +1234,7 @@ ObjectAddress ProcedureCreate(const char* procedureName, Oid procNamespace, Oid * But when we are in inplace-upgrade, we can create function with polymorphic return type */ if (!u_sess->attr.attr_common.enable_full_encryption && !u_sess->attr.attr_common.IsInplaceUpgrade && - (fullEncryptedInParam || fullEncryptedOutParam || is_enc_type(returnType))) { + (fullEncryptedInParam || fullEncryptedOutParam || is_enc_type(returnType)) && t_thrd.role != SW_SENDER) { ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION), errmsg("cannot create function"), errdetail("function does not support full encrypted type parameter when client encryption is disabled."))); } diff --git a/src/common/backend/client_logic/client_logic.cpp b/src/common/backend/client_logic/client_logic.cpp index 631ba9c24..97a940246 100755 --- a/src/common/backend/client_logic/client_logic.cpp +++ b/src/common/backend/client_logic/client_logic.cpp @@ -289,7 +289,7 @@ void insert_gs_sec_encrypted_column_tuple(CeHeapInfo *ce_heap_info, Relation rel #ifdef ENABLE_MULTIPLE_NODES if (IS_MAIN_COORDINATOR && !u_sess->attr.attr_common.enable_full_encryption) { #else - if (!u_sess->attr.attr_common.enable_full_encryption) { + if (!u_sess->attr.attr_common.enable_full_encryption && t_thrd.role != SW_SENDER) { #endif ereport(ERROR, (errcode(ERRCODE_OPERATE_NOT_SUPPORTED), @@ -633,7 +633,7 @@ static bool process_column_settings_flush_args(Oid column_key_id, const char *co #ifdef ENABLE_MULTIPLE_NODES if (IS_MAIN_COORDINATOR && !u_sess->attr.attr_common.enable_full_encryption) { #else - if (!u_sess->attr.attr_common.enable_full_encryption) { + if (!u_sess->attr.attr_common.enable_full_encryption && t_thrd.role != SW_SENDER) { #endif ereport(ERROR, (errcode(ERRCODE_OPERATE_NOT_SUPPORTED), @@ -894,7 +894,7 @@ int drop_global_settings(DropStmt *stmt) #ifdef ENABLE_MULTIPLE_NODES if (IS_MAIN_COORDINATOR && !u_sess->attr.attr_common.enable_full_encryption) { #else - if (!u_sess->attr.attr_common.enable_full_encryption) { + if (!u_sess->attr.attr_common.enable_full_encryption && t_thrd.role != SW_SENDER) { #endif ereport(ERROR, (errcode(ERRCODE_OPERATE_NOT_SUPPORTED), @@ -961,7 +961,7 @@ int drop_column_settings(DropStmt *stmt) #ifdef ENABLE_MULTIPLE_NODES if (IS_MAIN_COORDINATOR && !u_sess->attr.attr_common.enable_full_encryption) { #else - if (!u_sess->attr.attr_common.enable_full_encryption) { + if (!u_sess->attr.attr_common.enable_full_encryption && t_thrd.role != SW_SENDER) { #endif ereport(ERROR, (errcode(ERRCODE_OPERATE_NOT_SUPPORTED), @@ -1065,7 +1065,7 @@ void remove_encrypted_col_by_id(Oid id) #ifdef ENABLE_MULTIPLE_NODES if (IS_MAIN_COORDINATOR && !u_sess->attr.attr_common.enable_full_encryption) { #else - if (!u_sess->attr.attr_common.enable_full_encryption) { + if (!u_sess->attr.attr_common.enable_full_encryption && t_thrd.role != SW_SENDER) { #endif ereport(ERROR, (errcode(ERRCODE_OPERATE_NOT_SUPPORTED), diff --git a/src/common/backend/parser/parse_utilcmd.cpp b/src/common/backend/parser/parse_utilcmd.cpp index 773f66ec6..fd47ffa33 100644 --- a/src/common/backend/parser/parse_utilcmd.cpp +++ b/src/common/backend/parser/parse_utilcmd.cpp @@ -1324,7 +1324,7 @@ static void transformColumnDefinition(CreateStmtContext* cxt, ColumnDef* column, #ifdef ENABLE_MULTIPLE_NODES if (IS_MAIN_COORDINATOR && !u_sess->attr.attr_common.enable_full_encryption) { #else - if (!u_sess->attr.attr_common.enable_full_encryption) { + if (!u_sess->attr.attr_common.enable_full_encryption && t_thrd.role != SW_SENDER) { #endif ereport(ERROR, (errcode(ERRCODE_OPERATE_NOT_SUPPORTED), diff --git a/src/common/pl/plpgsql/src/pl_exec.cpp b/src/common/pl/plpgsql/src/pl_exec.cpp index c4acf9807..9a25a826a 100644 --- a/src/common/pl/plpgsql/src/pl_exec.cpp +++ b/src/common/pl/plpgsql/src/pl_exec.cpp @@ -1317,6 +1317,11 @@ Datum plpgsql_exec_autonm_function(PLpgSQL_function* func, } #endif + if (SS_STANDBY_MODE) { + ereport(ERROR, + (errmodule(MOD_PLSQL), errmsg("SS Standby node does not support invoking autonomous transactions."))); + } + #ifndef ENABLE_MULTIPLE_NODES if (plcallstack.prev != NULL && u_sess->attr.attr_sql.sql_compatibility == A_FORMAT && COMPAT_CURSOR) { PLpgSQL_execstate* estate_tmp = (PLpgSQL_execstate*)(plcallstack.prev->elem); diff --git a/src/gausskernel/optimizer/commands/prepare.cpp b/src/gausskernel/optimizer/commands/prepare.cpp index cf965ed58..d9e173997 100755 --- a/src/gausskernel/optimizer/commands/prepare.cpp +++ b/src/gausskernel/optimizer/commands/prepare.cpp @@ -333,6 +333,11 @@ void PrepareQuery(PrepareStmt* stmt, const char* queryString) */ StorePreparedStatement(stmt->name, plansource, true); + if (ENABLE_REMOTE_EXECUTE) { + const char* commandTag = CreateCommandTag(stmt->query); + (void)libpqsw_process_query_message(commandTag, query_list, queryString, false, false); + } + #ifdef ENABLE_MOT // Try MOT JIT code generation only after the plan source is saved. if ((plansource->storageEngineType == SE_TYPE_MOT || plansource->storageEngineType == SE_TYPE_UNSPECIFIED) && @@ -381,6 +386,11 @@ void ExecuteQuery(ExecuteStmt* stmt, IntoClause* intoClause, const char* querySt ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("EXECUTE does not support variable-result cached plans"))); + if (ENABLE_REMOTE_EXECUTE && + libpqsw_process_query_message(psrc->commandTag, psrc->query_list, queryString, false, false)) { + return; + } + /* Evaluate parameters, if any */ if (entry->plansource->num_params > 0) { /* diff --git a/src/gausskernel/optimizer/plan/planner.cpp b/src/gausskernel/optimizer/plan/planner.cpp index 7e8d3bcc8..781a7720c 100755 --- a/src/gausskernel/optimizer/plan/planner.cpp +++ b/src/gausskernel/optimizer/plan/planner.cpp @@ -443,6 +443,12 @@ bool queryIsReadOnly(Query* query) case CMD_INSERT: case CMD_DELETE: case CMD_MERGE: { + if (SS_STANDBY_MODE_WITH_REMOTE_EXECUTE && query->utilityStmt != NULL && + (query->utilityStmt->type == T_PrepareStmt || query->utilityStmt->type == T_ExecuteStmt || + query->utilityStmt->type == T_DeallocateStmt)) { + return true; + } + if (SS_STANDBY_MODE_WITH_REMOTE_EXECUTE && get_redirect_manager()->state.transaction) { get_redirect_manager()->ss_standby_state |= SS_STANDBY_REQ_WRITE_REDIRECT; } diff --git a/src/gausskernel/storage/replication/libpqsw.cpp b/src/gausskernel/storage/replication/libpqsw.cpp index f5b39aa14..a5da8c317 100644 --- a/src/gausskernel/storage/replication/libpqsw.cpp +++ b/src/gausskernel/storage/replication/libpqsw.cpp @@ -189,6 +189,7 @@ void RedirectMessageManager::push_message(int qtype, StringInfo msg, bool need_s last_message = qtype; copyStringInfo(cur_msg->pbe_stack_msgs[cur_msg->cur_pos], msg); cur_msg->cur_pos ++; + libpqsw_trace("[PUSHED MSG] %c: msg:%s, pos:%d", qtype, msg->data, cur_msg->cur_pos - 1); MemoryContextSwitchTo(old); } @@ -590,6 +591,11 @@ void libpqsw_set_set_command(bool set_command) get_redirect_manager()->state.set_command = set_command; } +static bool libpqsw_prepare_command(const char* commandTag) +{ + return commandTag != NULL && (strcmp(commandTag, "PREPARE") == 0 || strcmp(commandTag, "DEALLOCATE") == 0); +} + /* * wrapper remote excute for extend query (PBE) */ @@ -924,7 +930,8 @@ static bool libpqsw_need_localexec_forSimpleQuery(const char *commandTag, List * redirect_manager->ss_standby_state |= SS_STANDBY_REQ_SAVEPOINT; } else if (query_list != NIL) { /* Don't support DDL with in transaction */ - if (set_command_type_by_commandTag(commandTag) == CMD_DDL || libpqsw_special_command(commandTag)) { + if ((set_command_type_by_commandTag(commandTag) == CMD_DDL && !libpqsw_prepare_command(commandTag)) || + libpqsw_special_command(commandTag)) { if (libpqsw_fetch_command(commandTag)) { get_redirect_manager()->ss_standby_state |= SS_STANDBY_REQ_WRITE_REDIRECT; return ret; @@ -1068,10 +1075,14 @@ bool libpqsw_process_message(int qtype, StringInfo msg) ready_to_excute = redirect_manager->push_message(qtype, msg, false, RT_NORMAL); if (ready_to_excute) { - libpqsw_inner_excute_pbe(true, true); - libpqsw_set_batch(false); - libpqsw_set_redirect(false); - libpqsw_set_set_command(false); + if (qtype != 'S') { + libpqsw_inner_excute_pbe(false, false); + } else if (qtype == 'S') { + libpqsw_inner_excute_pbe(true, true); + libpqsw_set_batch(false); + libpqsw_set_redirect(false); + libpqsw_set_set_command(false); + } } /* for begin in pbe and in trxn */ diff --git a/src/include/replication/libpqsw.h b/src/include/replication/libpqsw.h index ea9530d49..7eb440216 100644 --- a/src/include/replication/libpqsw.h +++ b/src/include/replication/libpqsw.h @@ -131,9 +131,10 @@ typedef struct { } RedirectState; // the max len =(PBEPBEDS) == 8, 20 is enough -#define PBE_MESSAGE_STACK (20) +#define PBE_MESSAGE_STACK (25) #define PBE_MESSAGE_MERGE_ID (PBE_MESSAGE_STACK - 1) #define PBE_MAX_SET_BLOCK (10) +#define PBE_MESSAGE_MAX_CUR_FOR_PBE (PBE_MESSAGE_STACK - 6) enum RedirectType { RT_NORMAL, //transfer to standby RT_TXN_STATUS, @@ -201,6 +202,11 @@ public: return list_length(messages) == PBE_MAX_SET_BLOCK; } + int curpos_of_message() + { + return ((RedirectMessage *)llast(messages))->cur_pos; + } + // is pre last message S or Q bool pre_last_message() { @@ -278,6 +284,11 @@ public: if (qtype == 'S' || qtype == 'Q') { return state.already_connected || messages_manager.lots_of_message(); } + + if (qtype == 'E' && messages_manager.curpos_of_message() > PBE_MESSAGE_MAX_CUR_FOR_PBE) { + return true; + } + return false; }