!1077 修复备机超大事物解码无法正常推进问题

Merge pull request !1077 from maxiang/master
This commit is contained in:
opengauss-bot
2021-07-12 09:34:53 +00:00
committed by Gitee

View File

@ -143,6 +143,11 @@ static const int SHIFT_SPEED = 3;
*/
static volatile sig_atomic_t replication_active = false;
typedef struct {
bool replicationStarted;
bool messageReceiveNoTimeout;
} ReplicationCxt;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndShutdownHandler(SIGNAL_ARGS);
@ -150,8 +155,8 @@ static void WalSndQuickDieHandler(SIGNAL_ARGS);
static void WalSndXLogSendHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
static void IdentifyCommand(Node* cmd_node, bool* replication_started, const char *cmd_string);
static bool HandleWalReplicationCommand(const char *cmd_string);
static void IdentifyCommand(Node* cmd_node, ReplicationCxt* repCxt, const char *cmd_string);
static void HandleWalReplicationCommand(const char *cmd_string, ReplicationCxt* repCxt);
typedef void (*WalSndSendDataCallback)(void);
static int WalSndLoop(WalSndSendDataCallback send_data);
static void InitWalSnd(void);
@ -387,7 +392,12 @@ void CheckPMstateAndRecoveryInProgress(void)
static void WalSndHandshake(void)
{
StringInfoData input_message;
bool replication_started = false;
ReplicationCxt repCxt;
int rc;
rc = memset_s(&repCxt, sizeof(ReplicationCxt), 0, sizeof(ReplicationCxt));
securec_check(rc, "\0", "\0");
int sleeptime = 0;
int timeout = 0;
volatile WalSnd *walsnd = t_thrd.walsender_cxt.MyWalSnd;
@ -399,7 +409,7 @@ static void WalSndHandshake(void)
else
timeout = u_sess->attr.attr_storage.wal_sender_timeout;
while (!replication_started) {
while (!repCxt.replicationStarted) {
int firstchar;
WalSndSetState(WALSNDSTATE_STARTUP);
@ -468,8 +478,10 @@ static void WalSndHandshake(void)
query_string = pq_getmsgstring(&input_message);
pq_getmsgend(&input_message);
if (HandleWalReplicationCommand(query_string)) {
replication_started = true;
HandleWalReplicationCommand(query_string, &repCxt);
if (repCxt.messageReceiveNoTimeout) {
timeout = 0;
}
} break;
@ -1840,7 +1852,7 @@ static bool cmdStringLengthCheck(const char* cmd_string)
return true;
}
static void IdentifyCommand(Node* cmd_node, bool* replication_started, const char *cmd_string){
static void IdentifyCommand(Node* cmd_node, ReplicationCxt* repCxt, const char *cmd_string){
switch (cmd_node->type) {
case T_IdentifySystemCmd:
IdentifySystem();
@ -1902,7 +1914,7 @@ static void IdentifyCommand(Node* cmd_node, bool* replication_started, const cha
if (cmd->kind == REPLICATION_KIND_PHYSICAL) {
StartReplication(cmd);
/* break out of the loop */
*replication_started = true;
repCxt->replicationStarted = true;
} else {
#ifdef ENABLE_MULTIPLE_NODES
CheckPMstateAndRecoveryInProgress();
@ -1916,6 +1928,11 @@ static void IdentifyCommand(Node* cmd_node, bool* replication_started, const cha
AdvanceReplicationCmd *cmd = (AdvanceReplicationCmd *)cmd_node;
if (cmd->kind == REPLICATION_KIND_LOGICAL) {
AdvanceLogicalReplication(cmd);
/*
* This connection is used to notify primary to advance logical replication slot,
* and we don't want it to time out and disconnect.
*/
repCxt->messageReceiveNoTimeout = true;
}
break;
}
@ -1939,9 +1956,8 @@ static void IdentifyCommand(Node* cmd_node, bool* replication_started, const cha
/*
* Execute an incoming replication command.
*/
static bool HandleWalReplicationCommand(const char *cmd_string)
static void HandleWalReplicationCommand(const char *cmd_string, ReplicationCxt* repCxt)
{
bool replication_started = false;
int parse_rc;
Node *cmd_node = NULL;
MemoryContext cmd_context;
@ -1981,13 +1997,11 @@ static bool HandleWalReplicationCommand(const char *cmd_string)
cmd_node = t_thrd.replgram_cxt.replication_parse_result;
IdentifyCommand(cmd_node, &replication_started, cmd_string);
IdentifyCommand(cmd_node, repCxt, cmd_string);
/* done */
(void)MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
return replication_started;
}
/*