diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index b573e917c..277e9e491 100644 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -476,6 +476,7 @@ void WalReceiverMain(void) case WALRCV_STOPPED: SpinLockRelease(&walrcv->mutex); ereport(WARNING, (errmsg("walreceiver requested to stop when starting up."))); + KillWalRcvWriter(); proc_exit(1); break; @@ -755,44 +756,19 @@ static bool WalRecCheckTimeOut(TimestampTz nowtime, TimestampTz last_recv_timest } /* - * Mark us as STOPPED in shared memory at exit. + * Mark us as STOPPED in proc at exit. */ static void WalRcvDie(int code, Datum arg) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; - ThreadId writerPid; - int i = 1; /* * Shutdown WalRcvWriter thread, clear the data receive buffer. * Ensure that all WAL records received are flushed to disk. */ - SpinLockAcquire(&walrcv->mutex); - writerPid = walrcv->writerPid; - SpinLockRelease(&walrcv->mutex); - - if (writerPid != 0) - (void)gs_signal_send(writerPid, SIGTERM); - - ereport(LOG, (errmsg("waiting walrcvwriter: %lu terminate", writerPid))); - - while (writerPid) { - pg_usleep(10000L); // sleep 0.01s - - SpinLockAcquire(&walrcv->mutex); - writerPid = walrcv->writerPid; - SpinLockRelease(&walrcv->mutex); - - if ((writerPid != 0) && (i % 2000 == 0)) { - if (gs_signal_send(writerPid, SIGTERM) != 0) { - ereport(WARNING, (errmsg("walrcvwriter:%lu may be terminated", writerPid))); - break; - } - i = 1; - } - i++; - } + KillWalRcvWriter(); + /* we have to set REDO_FINISH_STATUS_LOCAL to false here, or there will be problems in this case: extremRTO is on, and DN received force finish signal, if cleanup is blocked, the force finish signal will be ignored! diff --git a/src/gausskernel/storage/replication/walreceiverfuncs.cpp b/src/gausskernel/storage/replication/walreceiverfuncs.cpp index 78672a4b7..66b5f1be5 100644 --- a/src/gausskernel/storage/replication/walreceiverfuncs.cpp +++ b/src/gausskernel/storage/replication/walreceiverfuncs.cpp @@ -330,6 +330,37 @@ static void set_rcv_slot_name(const char *slotname) return; } +void KillWalRcvWriter(void) +{ + volatile WalRcvData *walRcv = t_thrd.walreceiverfuncs_cxt.WalRcv; + ThreadId writerPid; + int i = 1; + /* + * Shutdown WalRcvWriter thread. + */ + SpinLockAcquire(&walRcv->mutex); + writerPid = walRcv->writerPid; + SpinLockRelease(&walRcv->mutex); + if (writerPid != 0) { + (void)gs_signal_send(writerPid, SIGTERM); + } + ereport(LOG, (errmsg("waiting walrcvwriter: %lu terminate", writerPid))); + while (writerPid) { + pg_usleep(10000L); /* sleep 0.01s */ + SpinLockAcquire(&walRcv->mutex); + writerPid = walRcv->writerPid; + SpinLockRelease(&walRcv->mutex); + if ((writerPid != 0) && (i % 2000 == 0)) { + if (gs_signal_send(writerPid, SIGTERM) != 0) { + ereport(WARNING, (errmsg("walrcvwriter:%lu may be terminated", writerPid))); + break; + } + i = 1; + } + i++; + } +} + /* * Stop walreceiver (if running) and wait for it to die. * Executed by the Startup process. diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 23a89f0fa..f196475a8 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -223,6 +223,7 @@ extern void WSDataRcvCheck(char* data_buf, Size nbytes); /* prototypes for functions in walreceiverfuncs.c */ extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); +extern void KillWalRcvWriter(void); extern void ShutdownWalRcv(void); extern bool WalRcvInProgress(void); extern void connect_dn_str(char* conninfo, int replIndex);