From a4a0147952ca70fc6a0c4951f8057e9bb494a082 Mon Sep 17 00:00:00 2001 From: TotaJ Date: Mon, 14 Mar 2022 22:09:53 +0800 Subject: [PATCH] Fix wait too long when create some subscription at one time. --- .../process/threadpool/knl_thread.cpp | 2 +- .../storage/replication/logical/launcher.cpp | 150 ++++++++++-------- src/include/knl/knl_thread.h | 2 +- 3 files changed, 90 insertions(+), 64 deletions(-) diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 673d25104..4fccea596 100644 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1050,7 +1050,7 @@ static void knl_t_autovacuum_init(knl_t_autovacuum_context* autovacuum_cxt) static void KnlTApplyLauncherInit(knl_t_apply_launcher_context* applyLauncherCxt) { applyLauncherCxt->got_SIGHUP = false; - applyLauncherCxt->got_SIGUSR2 = false; + applyLauncherCxt->newWorkerRequest = false; applyLauncherCxt->got_SIGTERM = false; applyLauncherCxt->onCommitLauncherWakeup = false; applyLauncherCxt->applyLauncherShm = NULL; diff --git a/src/gausskernel/storage/replication/logical/launcher.cpp b/src/gausskernel/storage/replication/logical/launcher.cpp index 9303c9f08..2a531feea 100644 --- a/src/gausskernel/storage/replication/logical/launcher.cpp +++ b/src/gausskernel/storage/replication/logical/launcher.cpp @@ -59,6 +59,8 @@ static const int DEFAULT_NAPTIME_PER_CYCLE = 180000L; static const int wal_retrieve_retry_interval = 5000; static const int PG_STAT_GET_SUBSCRIPTION_COLS = 7; +static const int WAIT_SUB_WORKER_ATTACH_CYCLE = 50000L; /* 50ms */ +static const int WAIT_SUB_WORKER_ATTACH_TIMEOUT = 1000000L; /* 1s */ static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); @@ -175,6 +177,54 @@ static LogicalRepWorker *logicalrep_worker_find(Oid subid) return res; } +/* + * We can't start another apply worker when another one is still + * starting up (or failed 
while doing so), so just sleep for a bit + * more; that worker will wake us up again as soon as it's ready. + * We will only wait 1 second for this to happen however. Note that + * failure to connect to a particular database is not a problem here, + * because the worker removes itself from the startingWorker + * pointer before trying to connect. The problems that may cause this + * code to fire are errors in the earlier sections of ApplyWorkerMain, + * before the worker removes the LogicalRepWorker from the + * startingWorker pointer. + */ +static void WaitForReplicationWorkerAttach() +{ + int timeout = WAIT_SUB_WORKER_ATTACH_TIMEOUT; + while (timeout > 0) { + CHECK_FOR_INTERRUPTS(); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker == NULL) { + /* worker has started, we are done */ + LWLockRelease(LogicalRepWorkerLock); + return; + } + + LWLockRelease(LogicalRepWorkerLock); + + pg_usleep(WAIT_SUB_WORKER_ATTACH_CYCLE); + timeout -= WAIT_SUB_WORKER_ATTACH_CYCLE; + } + + /* Timed out. Recheck under the exclusive lock: the worker may have attached during the last sleep. */ + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + LogicalRepWorker *worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker; + if (worker != NULL) { + ereport(WARNING, (errmsg("Apply worker with sub id:%u took too long time to start, so canceled it", + worker->subid))); + worker->dbid = InvalidOid; + worker->userid = InvalidOid; + worker->subid = InvalidOid; + worker->proc = NULL; + worker->workerLaunchTime = 0; + t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = NULL; + } + LWLockRelease(LogicalRepWorkerLock); +} + + /* * Start new apply background worker. 
*/ @@ -226,6 +276,7 @@ static void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, O LWLockRelease(LogicalRepWorkerLock); SendPostmasterSignal(PMSIGNAL_START_APPLY_WORKER); + WaitForReplicationWorkerAttach(); } /* @@ -380,7 +431,9 @@ static void logicalrep_worker_onexit(int code, Datum arg) { (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); logicalrep_worker_detach(); - ApplyLauncherWakeup(); + if (t_thrd.applylauncher_cxt.applyLauncherShm->applyLauncherPid != 0) { + gs_signal_send(t_thrd.applylauncher_cxt.applyLauncherShm->applyLauncherPid, SIGUSR1); + } } /* SIGHUP: set flag to reload configuration at next convenient time */ @@ -395,18 +448,35 @@ static void logicalrepLauncherSighub(SIGNAL_ARGS) errno = saveErrno; } -/* SIGUSR2: a worker is up and running, or just finished, or failed to fork */ +/* + * SIGUSR2: request to start a new worker, for CREATE/ALTER subscription. + * We will loop over pg_subscription and launch the workers immediately. + */ static void logicalrep_launcher_sigusr2(SIGNAL_ARGS) { int save_errno = errno; - t_thrd.applylauncher_cxt.got_SIGUSR2 = true; - if (t_thrd.proc) + t_thrd.applylauncher_cxt.newWorkerRequest = true; + if (t_thrd.proc) { SetLatch(&t_thrd.proc->procLatch); + } errno = save_errno; } +/* + * SIGUSR1: a worker just finished, or failed to start. + * Set the latch, but we may not try to launch a worker immediately, because we + * don't want an abnormal worker to restart so fast that it floods the error log. 
+ */ +static void LogicalrepLauncherSigusr1(SIGNAL_ARGS) +{ + int save_errno = errno; + if (t_thrd.proc) { + SetLatch(&t_thrd.proc->procLatch); + } + errno = save_errno; +} /* * ApplyLauncherShmemSize @@ -483,7 +553,6 @@ void ApplyLauncherMain() sigjmp_buf localSigjmpBuf; TimestampTz last_start_time = 0; char username[NAMEDATALEN]; - int nextLaunchSub = 0; /* we are a postmaster subprocess now */ IsUnderPostmaster = true; @@ -525,7 +594,7 @@ void ApplyLauncherMain() gspqsignal(SIGALRM, handle_sig_alarm); gspqsignal(SIGPIPE, SIG_IGN); - gspqsignal(SIGUSR1, procsignal_sigusr1_handler); + gspqsignal(SIGUSR1, LogicalrepLauncherSigusr1); gspqsignal(SIGUSR2, logicalrep_launcher_sigusr2); gspqsignal(SIGFPE, FloatExceptionHandler); gspqsignal(SIGCHLD, SIG_DFL); @@ -588,60 +657,19 @@ void ApplyLauncherMain() MemoryContext oldctx; TimestampTz now; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; - bool canLaunch = true; CHECK_FOR_INTERRUPTS(); now = GetCurrentTimestamp(); - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker != NULL) { - LogicalRepWorker *worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker; - int waitTime = 1 * 1000; - - /* - * We can't start another apply worker when another one is still - * starting up (or failed while doing so), so just sleep for a bit - * more; that worker will wake us up again as soon as it's ready. - * We will only wait 1 seconds (up to a maximum - * of 60 seconds) for this to happen however. Note that failure - * to connect to a particular database is not a problem here, - * because the worker removes itself from the startingWorker - * pointer before trying to connect. Problems detected by the - * postmaster (like fork() failure) are also reported and handled - * differently. 
The only problems that may cause this code to - * fire are errors in the earlier sections of ApplyWorkerMain, - * before the worker removes the LogicalRepWorker from the - * startingWorker pointer. - */ - if (TimestampDifferenceExceeds(worker->workerLaunchTime, now, waitTime)) { - LWLockRelease(LogicalRepWorkerLock); - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - /* - * No other process can put a worker in starting mode, so if - * startingWorker is still INVALID after exchanging our lock, - * we assume it's the same one we saw above (so we don't - * recheck the launch time). - */ - if (t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker != NULL) { - worker = t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker; - ereport(WARNING, (errmsg("Apply worker with sub id:%u took too long " - "time to start, so canceled it", - worker->subid))); - worker->dbid = InvalidOid; - worker->userid = InvalidOid; - worker->subid = InvalidOid; - worker->proc = NULL; - worker->workerLaunchTime = 0; - t_thrd.applylauncher_cxt.applyLauncherShm->startingWorker = NULL; - } - } else { - canLaunch = false; + /* + * Limit the start retry to once a wal_retrieve_retry_interval, but if it's a request from + * CREATE/ALTER subscription, we will try to launch worker immediately. + */ + if (t_thrd.applylauncher_cxt.newWorkerRequest || + TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) { + if (t_thrd.applylauncher_cxt.newWorkerRequest) { + t_thrd.applylauncher_cxt.newWorkerRequest = false; } - } - LWLockRelease(LogicalRepWorkerLock); /* either shared or exclusive */ - - /* Limit the start retry to once a wal_retrieve_retry_interval */ - if (canLaunch && TimestampDifferenceExceeds(last_start_time, now, wal_retrieve_retry_interval)) { /* Use temporary context for the database list and worker info. 
*/ subctx = AllocSetContextCreate(TopMemoryContext, "Logical Replication Launcher sublist", ALLOCSET_DEFAULT_SIZES); @@ -671,14 +699,12 @@ void ApplyLauncherMain() } /* - * Try to launch the subscription worker one by one, to avoid launch an abnormal - * subscription again and again and normal subscription become starve. + * Try to launch the subscription worker one by one, we will wait at the end of + * logicalrep_worker_launch, to make sure in the next loop, the previous worker + * has started or aborted definitely. */ - if (pendingSubList != NIL) { - nextLaunchSub = nextLaunchSub % list_length(pendingSubList); - Subscription *readyToLaunchSub = (Subscription*)list_nth(pendingSubList, nextLaunchSub); - nextLaunchSub++; - + foreach(lc, pendingSubList) { + Subscription *readyToLaunchSub = (Subscription*)lfirst(lc); logicalrep_worker_launch(readyToLaunchSub->dbid, readyToLaunchSub->oid, readyToLaunchSub->name, readyToLaunchSub->owner); last_start_time = now; diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 58e5847c2..d53a8e406 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -3205,7 +3205,7 @@ typedef struct knl_t_lsc_context { typedef struct knl_t_apply_launcher_context { /* Flags set by signal handlers */ volatile sig_atomic_t got_SIGHUP; - volatile sig_atomic_t got_SIGUSR2; + volatile sig_atomic_t newWorkerRequest; volatile sig_atomic_t got_SIGTERM; bool onCommitLauncherWakeup; ApplyLauncherShmStruct *applyLauncherShm;