diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 0506285ec..76662c334 100644 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -143,6 +143,11 @@ static const int SHIFT_SPEED = 3; */ static volatile sig_atomic_t replication_active = false; +typedef struct { + bool replicationStarted; + bool messageReceiveNoTimeout; +} ReplicationCxt; + /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndShutdownHandler(SIGNAL_ARGS); @@ -150,8 +155,8 @@ static void WalSndQuickDieHandler(SIGNAL_ARGS); static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); -static void IdentifyCommand(Node* cmd_node, bool* replication_started, const char *cmd_string); -static bool HandleWalReplicationCommand(const char *cmd_string); +static void IdentifyCommand(Node* cmd_node, ReplicationCxt* repCxt, const char *cmd_string); +static void HandleWalReplicationCommand(const char *cmd_string, ReplicationCxt* repCxt); typedef void (*WalSndSendDataCallback)(void); static int WalSndLoop(WalSndSendDataCallback send_data); static void InitWalSnd(void); @@ -387,7 +392,12 @@ void CheckPMstateAndRecoveryInProgress(void) static void WalSndHandshake(void) { StringInfoData input_message; - bool replication_started = false; + ReplicationCxt repCxt; + + int rc; + rc = memset_s(&repCxt, sizeof(ReplicationCxt), 0, sizeof(ReplicationCxt)); + securec_check(rc, "\0", "\0"); + int sleeptime = 0; int timeout = 0; volatile WalSnd *walsnd = t_thrd.walsender_cxt.MyWalSnd; @@ -399,7 +409,7 @@ static void WalSndHandshake(void) else timeout = u_sess->attr.attr_storage.wal_sender_timeout; - while (!replication_started) { + while (!repCxt.replicationStarted) { int firstchar; WalSndSetState(WALSNDSTATE_STARTUP); @@ -468,8 +478,10 @@ static void WalSndHandshake(void) query_string = pq_getmsgstring(&input_message); pq_getmsgend(&input_message); - if (HandleWalReplicationCommand(query_string)) { - replication_started = true; + HandleWalReplicationCommand(query_string, &repCxt); + + if (repCxt.messageReceiveNoTimeout) { + timeout = 0; } } break; @@ -1840,7 +1852,7 @@ static bool cmdStringLengthCheck(const char* cmd_string) return true; } -static void IdentifyCommand(Node* cmd_node, bool* replication_started, const char *cmd_string){ +static void IdentifyCommand(Node* cmd_node, ReplicationCxt* repCxt, const char *cmd_string){ switch (cmd_node->type) { case T_IdentifySystemCmd: IdentifySystem(); @@ -1902,7 +1914,7 @@ static void IdentifyCommand(Node* cmd_node, bool* replication_started, const cha if (cmd->kind == REPLICATION_KIND_PHYSICAL) { StartReplication(cmd); /* break out of the loop */ - *replication_started = true; + repCxt->replicationStarted = true; } else { #ifdef ENABLE_MULTIPLE_NODES CheckPMstateAndRecoveryInProgress(); @@ -1916,6 +1928,11 @@ static void IdentifyCommand(Node* cmd_node, bool* replication_started, const cha AdvanceReplicationCmd *cmd = (AdvanceReplicationCmd *)cmd_node; if (cmd->kind == REPLICATION_KIND_LOGICAL) { AdvanceLogicalReplication(cmd); + /* + * This connection is used to notify primary to advance logical replication slot, + * and we don't want it to time out and disconnect. + */ + repCxt->messageReceiveNoTimeout = true; } break; } @@ -1939,9 +1956,8 @@ static void IdentifyCommand(Node* cmd_node, bool* replication_started, const cha /* * Execute an incoming replication command. */ -static bool HandleWalReplicationCommand(const char *cmd_string) +static void HandleWalReplicationCommand(const char *cmd_string, ReplicationCxt* repCxt) { - bool replication_started = false; int parse_rc; Node *cmd_node = NULL; MemoryContext cmd_context; @@ -1981,13 +1997,11 @@ static bool HandleWalReplicationCommand(const char *cmd_string) cmd_node = t_thrd.replgram_cxt.replication_parse_result; - IdentifyCommand(cmd_node, &replication_started, cmd_string); + IdentifyCommand(cmd_node, repCxt, cmd_string); /* done */ (void)MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); - - return replication_started; } /*