From d8b89ceea53c21aa481f70a4679d31ee6345a993 Mon Sep 17 00:00:00 2001 From: xue_meng_en <1836611252@qq.com> Date: Sat, 16 Apr 2022 16:27:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=91=E5=B8=83=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E6=AD=BB=E9=94=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gausskernel/process/tcop/postgres.cpp | 3 +++ .../storage/replication/logical/worker.cpp | 26 +++++++++---------- src/include/replication/logicalworker.h | 1 + 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index cfe867e63..020659a7b 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -6114,6 +6114,9 @@ void ProcessInterrupts(void) /* The logical replication launcher can be stopped at any time. */ proc_exit(0); + } else if (IsLogicalWorker()) { + ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating logical replication worker due to administrator command"))); #endif } else if (IsTxnSnapCapturerProcess()) { ereport(FATAL, diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp index f91d90ef3..4bbfc62b6 100644 --- a/src/gausskernel/storage/replication/logical/worker.cpp +++ b/src/gausskernel/storage/replication/logical/worker.cpp @@ -121,18 +121,6 @@ static void LogicalrepWorkerSighub(SIGNAL_ARGS) t_thrd.applyworker_cxt.got_SIGHUP = true; } -/* SIGTERM: time to die */ -static void LogicalrepWorkerSigterm(SIGNAL_ARGS) -{ - int saveErrno = errno; - - t_thrd.applyworker_cxt.got_SIGTERM = true; - if (t_thrd.proc) - SetLatch(&t_thrd.proc->procLatch); - - errno = saveErrno; -} - /* * Make sure that we started local transaction. * @@ -1068,13 +1056,15 @@ static void ApplyLoop(void) /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); - while (!t_thrd.applyworker_cxt.got_SIGTERM) { + for (;;) { MemoryContextSwitchTo(t_thrd.applyworker_cxt.messageContext); int len; char *buf = NULL; unsigned char type; + CHECK_FOR_INTERRUPTS(); + /* Wait a while for data to arrive */ if ((WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { StringInfoData s; @@ -1371,7 +1361,7 @@ void ApplyWorkerMain() */ gspqsignal(SIGHUP, LogicalrepWorkerSighub); gspqsignal(SIGINT, StatementCancelHandler); - gspqsignal(SIGTERM, LogicalrepWorkerSigterm); + gspqsignal(SIGTERM, die); gspqsignal(SIGQUIT, quickdie); gspqsignal(SIGALRM, handle_sig_alarm); @@ -1727,3 +1717,11 @@ static void UpdateConninfo(char* standbysInfo) ereport(LOG, (errmsg("Update conninfo successfully, new conninfo %s.", standbysInfo))); } + +/* + * Is current process a logical replication worker? + */ +bool IsLogicalWorker(void) +{ + return t_thrd.applyworker_cxt.curWorker != NULL; +} diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index ad8509d9b..0ef5135e6 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -13,5 +13,6 @@ #define LOGICALWORKER_H extern void ApplyWorkerMain(); +extern bool IsLogicalWorker(void); #endif /* LOGICALWORKER_H */