diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index cfe867e63..020659a7b 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -6114,6 +6114,9 @@ void ProcessInterrupts(void) /* The logical replication launcher can be stopped at any time. */ proc_exit(0); + } else if (IsLogicalWorker()) { + ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating logical replication worker due to administrator command"))); #endif } else if (IsTxnSnapCapturerProcess()) { ereport(FATAL, diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp index f91d90ef3..4bbfc62b6 100644 --- a/src/gausskernel/storage/replication/logical/worker.cpp +++ b/src/gausskernel/storage/replication/logical/worker.cpp @@ -121,18 +121,6 @@ static void LogicalrepWorkerSighub(SIGNAL_ARGS) t_thrd.applyworker_cxt.got_SIGHUP = true; } -/* SIGTERM: time to die */ -static void LogicalrepWorkerSigterm(SIGNAL_ARGS) -{ - int saveErrno = errno; - - t_thrd.applyworker_cxt.got_SIGTERM = true; - if (t_thrd.proc) - SetLatch(&t_thrd.proc->procLatch); - - errno = saveErrno; -} - /* * Make sure that we started local transaction. * @@ -1068,13 +1056,15 @@ static void ApplyLoop(void) /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); - while (!t_thrd.applyworker_cxt.got_SIGTERM) { + for (;;) { MemoryContextSwitchTo(t_thrd.applyworker_cxt.messageContext); int len; char *buf = NULL; unsigned char type; + CHECK_FOR_INTERRUPTS(); + /* Wait a while for data to arrive */ if ((WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { StringInfoData s; @@ -1371,7 +1361,7 @@ void ApplyWorkerMain() */ gspqsignal(SIGHUP, LogicalrepWorkerSighub); gspqsignal(SIGINT, StatementCancelHandler); - gspqsignal(SIGTERM, LogicalrepWorkerSigterm); + gspqsignal(SIGTERM, die); gspqsignal(SIGQUIT, quickdie); gspqsignal(SIGALRM, handle_sig_alarm); @@ -1727,3 +1717,11 @@ static void UpdateConninfo(char* standbysInfo) ereport(LOG, (errmsg("Update conninfo successfully, new conninfo %s.", standbysInfo))); } + +/* + * Is current process a logical replication worker? + */ +bool IsLogicalWorker(void) +{ + return t_thrd.applyworker_cxt.curWorker != NULL; +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index ad8509d9b..0ef5135e6 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -13,5 +13,6 @@ #define LOGICALWORKER_H extern void ApplyWorkerMain(); +extern bool IsLogicalWorker(void); #endif /* LOGICALWORKER_H */