Enhance apply launcher when there are some abnormal subscription.

This commit is contained in:
TotaJ
2021-12-23 20:39:14 +08:00
parent 27bdb0d62b
commit f2af76de00

View File

@ -483,6 +483,7 @@ void ApplyLauncherMain()
sigjmp_buf localSigjmpBuf;
TimestampTz last_start_time = 0;
char username[NAMEDATALEN];
int nextLaunchSub = 0;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
@ -650,6 +651,7 @@ void ApplyLauncherMain()
sublist = get_subscription_list();
/* Start the missing workers for enabled subscriptions. */
List *pendingSubList = NIL;
foreach (lc, sublist) {
Subscription *sub = (Subscription *)lfirst(lc);
LogicalRepWorker *w;
@ -662,15 +664,27 @@ void ApplyLauncherMain()
w = logicalrep_worker_find(sub->oid);
LWLockRelease(LogicalRepWorkerLock);
/* Add to pending list if the subscription has no work attached */
if (w == NULL) {
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner);
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
/* Limit to one worker per mainloop cycle. */
break;
pendingSubList = lappend(pendingSubList, sub);
}
}
/*
* Try to launch the subscription worker one by one, to avoid launch an abnormal
* subscription again and again and normal subscription become starve.
*/
if (pendingSubList != NIL) {
nextLaunchSub = nextLaunchSub % list_length(pendingSubList);
Subscription *readyToLaunchSub = (Subscription*)list_nth(pendingSubList, nextLaunchSub);
nextLaunchSub++;
logicalrep_worker_launch(readyToLaunchSub->dbid, readyToLaunchSub->oid,
readyToLaunchSub->name, readyToLaunchSub->owner);
last_start_time = now;
wait_time = wal_retrieve_retry_interval;
}
/* Switch back to original memory context. */
MemoryContextSwitchTo(oldctx);
/* Clean the temporary memory. */