Fix wait too long when create some subscription at one time.
This commit is contained in:
@ -1050,7 +1050,7 @@ static void knl_t_autovacuum_init(knl_t_autovacuum_context* autovacuum_cxt)
|
||||
static void KnlTApplyLauncherInit(knl_t_apply_launcher_context* applyLauncherCxt)
|
||||
{
|
||||
applyLauncherCxt->got_SIGHUP = false;
|
||||
applyLauncherCxt->got_SIGUSR2 = false;
|
||||
applyLauncherCxt->newWorkerRequest = false;
|
||||
applyLauncherCxt->got_SIGTERM = false;
|
||||
applyLauncherCxt->onCommitLauncherWakeup = false;
|
||||
applyLauncherCxt->applyLauncherShm = NULL;
|
||||
|
@ -59,6 +59,8 @@ static const int DEFAULT_NAPTIME_PER_CYCLE = 180000L;
|
||||
|
||||
static const int wal_retrieve_retry_interval = 5000;
|
||||
static const int PG_STAT_GET_SUBSCRIPTION_COLS = 7;
|
||||
static const int WAIT_SUB_WORKER_ATTACH_CYCLE = 50000L; /* 50ms */
|
||||
static const int WAIT_SUB_WORKER_ATTACH_TIMEOUT = 1000000L; /* 1s */
|
||||
|
||||
static void ApplyLauncherWakeup(void);
|
||||
static void logicalrep_launcher_onexit(int code, Datum arg);
|
||||
@ -175,6 +177,54 @@ static LogicalRepWorker *logicalrep_worker_find(Oid subid)
|
||||
return res;
|
||||
}
|
||||
|
||||
/*
|
||||
* We can't start another apply worker when another one is still
|
||||
* starting up (or failed while doing so), so just sleep for a bit
|
||||
* more; that worker will wake us up again as soon as it's ready.
|
||||
* We will only wait 1 seconds for this to happen however. Note that
|
||||
* failure to connect to a particular database is not a problem here,
|
||||
* because the worker removes itself from the startingWorker
|
||||
* pointer before trying to connect. Problems detected by the
|
||||
* The problems that may cause this code to fire are errors
|
||||
* in the earlier sections of ApplyWorkerMain, before the worker
|
||||
* removes the LogicalRepWorker from the startingWorker pointer.
|
||||
*/
|
||||
static void WaitForReplicationWorkerAttach()
|
||||
{
|
||||
int timeout = WAIT_SUB_WORKER_ATTACH_TIMEOUT;
|
||||
while (timeout > 0) {
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
||||
|
||||
if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker == NULL) {
|
||||
/* worker has started, we are done */
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
break;
|
||||
}
|
||||
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
|
||||
pg_usleep(WAIT_SUB_WORKER_ATTACH_CYCLE);
|
||||
timeout -= WAIT_SUB_WORKER_ATTACH_CYCLE;
|
||||
}
|
||||
|
||||
if (timeout <= 0) {
|
||||
/* worker took too long time to start */
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
|
||||
LogicalRepWorker *worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker;
|
||||
ereport(WARNING, (errmsg("Apply worker with sub id:%u took too long time to start, so canceled it",
|
||||
worker->subid)));
|
||||
worker->dbid = InvalidOid;
|
||||
worker->userid = InvalidOid;
|
||||
worker->subid = InvalidOid;
|
||||
worker->proc = NULL;
|
||||
worker->workerLaunchTime = 0;
|
||||
t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = NULL;
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Start new apply background worker.
|
||||
*/
|
||||
@ -226,6 +276,7 @@ static void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, O
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
|
||||
SendPostmasterSignal(PMSIGNAL_START_APPLY_WORKER);
|
||||
WaitForReplicationWorkerAttach();
|
||||
}
|
||||
|
||||
/*
|
||||
@ -380,7 +431,9 @@ static void logicalrep_worker_onexit(int code, Datum arg)
|
||||
{
|
||||
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
|
||||
logicalrep_worker_detach();
|
||||
ApplyLauncherWakeup();
|
||||
if (t_thrd.applylauncher_cxt.applyLauncherShm->applyLauncherPid != 0) {
|
||||
gs_signal_send(t_thrd.applylauncher_cxt.applyLauncherShm->applyLauncherPid, SIGUSR1);
|
||||
}
|
||||
}
|
||||
|
||||
/* SIGHUP: set flag to reload configuration at next convenient time */
|
||||
@ -395,18 +448,35 @@ static void logicalrepLauncherSighub(SIGNAL_ARGS)
|
||||
errno = saveErrno;
|
||||
}
|
||||
|
||||
/* SIGUSR2: a worker is up and running, or just finished, or failed to fork */
|
||||
/*
|
||||
* SIGUSR2: request to start a new worker, for CREATE/ALTER subscription.
|
||||
* we will loop pg_subscription and launch worker immediately.
|
||||
*/
|
||||
static void logicalrep_launcher_sigusr2(SIGNAL_ARGS)
|
||||
{
|
||||
int save_errno = errno;
|
||||
|
||||
t_thrd.applylauncher_cxt.got_SIGUSR2 = true;
|
||||
if (t_thrd.proc)
|
||||
t_thrd.applylauncher_cxt.newWorkerRequest = true;
|
||||
if (t_thrd.proc) {
|
||||
SetLatch(&t_thrd.proc->procLatch);
|
||||
}
|
||||
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
/*
|
||||
* SIGUSR1: a worker just finished, or failed to start.
|
||||
* set latch, but we may not try to launch worker immediately, cause we don't
|
||||
* want a abnormal worker restart too fast to cause too many error log.
|
||||
*/
|
||||
static void LogicalrepLauncherSigusr1(SIGNAL_ARGS)
|
||||
{
|
||||
int save_errno = errno;
|
||||
if (t_thrd.proc) {
|
||||
SetLatch(&t_thrd.proc->procLatch);
|
||||
}
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
/*
|
||||
* ApplyLauncherShmemSize
|
||||
@ -483,7 +553,6 @@ void ApplyLauncherMain()
|
||||
sigjmp_buf localSigjmpBuf;
|
||||
TimestampTz last_start_time = 0;
|
||||
char username[NAMEDATALEN];
|
||||
int nextLaunchSub = 0;
|
||||
|
||||
/* we are a postmaster subprocess now */
|
||||
IsUnderPostmaster = true;
|
||||
@ -525,7 +594,7 @@ void ApplyLauncherMain()
|
||||
gspqsignal(SIGALRM, handle_sig_alarm);
|
||||
|
||||
gspqsignal(SIGPIPE, SIG_IGN);
|
||||
gspqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
gspqsignal(SIGUSR1, LogicalrepLauncherSigusr1);
|
||||
gspqsignal(SIGUSR2, logicalrep_launcher_sigusr2);
|
||||
gspqsignal(SIGFPE, FloatExceptionHandler);
|
||||
gspqsignal(SIGCHLD, SIG_DFL);
|
||||
@ -588,60 +657,19 @@ void ApplyLauncherMain()
|
||||
MemoryContext oldctx;
|
||||
TimestampTz now;
|
||||
long wait_time = DEFAULT_NAPTIME_PER_CYCLE;
|
||||
bool canLaunch = true;
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
now = GetCurrentTimestamp();
|
||||
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
||||
if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker != NULL) {
|
||||
LogicalRepWorker *worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker;
|
||||
int waitTime = 1 * 1000;
|
||||
|
||||
/*
|
||||
* We can't start another apply worker when another one is still
|
||||
* starting up (or failed while doing so), so just sleep for a bit
|
||||
* more; that worker will wake us up again as soon as it's ready.
|
||||
* We will only wait 1 seconds (up to a maximum
|
||||
* of 60 seconds) for this to happen however. Note that failure
|
||||
* to connect to a particular database is not a problem here,
|
||||
* because the worker removes itself from the startingWorker
|
||||
* pointer before trying to connect. Problems detected by the
|
||||
* postmaster (like fork() failure) are also reported and handled
|
||||
* differently. The only problems that may cause this code to
|
||||
* fire are errors in the earlier sections of ApplyWorkerMain,
|
||||
* before the worker removes the LogicalRepWorker from the
|
||||
* startingWorker pointer.
|
||||
* Limit the start retry to once a wal_retrieve_retry_interval, but if it's a request from
|
||||
* CREATE/ALTER subscription, we will try to launch worker immediately.
|
||||
*/
|
||||
if (TimestampDifferenceExceeds(worker->workerLaunchTime, now, waitTime)) {
|
||||
LWLockRelease(LogicalRepWorkerLock);
|
||||
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
|
||||
/*
|
||||
* No other process can put a worker in starting mode, so if
|
||||
* startingWorker is still INVALID after exchanging our lock,
|
||||
* we assume it's the same one we saw above (so we don't
|
||||
* recheck the launch time).
|
||||
*/
|
||||
if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker != NULL) {
|
||||
worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker;
|
||||
ereport(WARNING, (errmsg("Apply worker with sub id:%u took too long "
|
||||
"time to start, so canceled it",
|
||||
worker->subid)));
|
||||
worker->dbid = InvalidOid;
|
||||
worker->userid = InvalidOid;
|
||||
worker->subid = InvalidOid;
|
||||
worker->proc = NULL;
|
||||
worker->workerLaunchTime = 0;
|
||||
t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = NULL;
|
||||
if (t_thrd.applylauncher_cxt.newWorkerRequest ||
|
||||
TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) {
|
||||
if (t_thrd.applylauncher_cxt.newWorkerRequest) {
|
||||
t_thrd.applylauncher_cxt.newWorkerRequest = false;
|
||||
}
|
||||
} else {
|
||||
canLaunch = false;
|
||||
}
|
||||
}
|
||||
LWLockRelease(LogicalRepWorkerLock); /* either shared or exclusive */
|
||||
|
||||
/* Limit the start retry to once a wal_retrieve_retry_interval */
|
||||
if (canLaunch && TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) {
|
||||
/* Use temporary context for the database list and worker info. */
|
||||
subctx = AllocSetContextCreate(TopMemoryContext, "Logical Replication Launcher sublist",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
@ -671,14 +699,12 @@ void ApplyLauncherMain()
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to launch the subscription worker one by one, to avoid launch an abnormal
|
||||
* subscription again and again and normal subscription become starve.
|
||||
* Try to launch the subscription worker one by one, we will wait at the end of
|
||||
* logicalrep_worker_launch, to make sure in the next loop, the previous worker
|
||||
* has started or aborted definitely.
|
||||
*/
|
||||
if (pendingSubList != NIL) {
|
||||
nextLaunchSub = nextLaunchSub % list_length(pendingSubList);
|
||||
Subscription *readyToLaunchSub = (Subscription*)list_nth(pendingSubList, nextLaunchSub);
|
||||
nextLaunchSub++;
|
||||
|
||||
foreach(lc, pendingSubList) {
|
||||
Subscription *readyToLaunchSub = (Subscription*)lfirst(lc);
|
||||
logicalrep_worker_launch(readyToLaunchSub->dbid, readyToLaunchSub->oid,
|
||||
readyToLaunchSub->name, readyToLaunchSub->owner);
|
||||
last_start_time = now;
|
||||
|
@ -3205,7 +3205,7 @@ typedef struct knl_t_lsc_context {
|
||||
typedef struct knl_t_apply_launcher_context {
|
||||
/* Flags set by signal handlers */
|
||||
volatile sig_atomic_t got_SIGHUP;
|
||||
volatile sig_atomic_t got_SIGUSR2;
|
||||
volatile sig_atomic_t newWorkerRequest;
|
||||
volatile sig_atomic_t got_SIGTERM;
|
||||
bool onCommitLauncherWakeup;
|
||||
ApplyLauncherShmStruct *applyLauncherShm;
|
||||
|
Reference in New Issue
Block a user