修复基础数据复制的复制槽残留问题

This commit is contained in:
chenxiaobin19
2023-09-02 11:14:41 +08:00
parent 0fa6e3d817
commit 7095bb417b

View File

@ -1173,13 +1173,13 @@ void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* the apply and tablesync workers and they can't restart because of * the apply and tablesync workers and they can't restart because of
* exclusive lock on the subscription. * exclusive lock on the subscription.
*/ */
rstates = GetSubscriptionRelations(subid, true); rstates = GetSubscriptionRelations(subid, false);
foreach (lc, rstates) { foreach (lc, rstates) {
SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc); SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
Oid relid = rstate->relid; Oid relid = rstate->relid;
/* Only cleanup resources of tablesync workers */ /* Only cleanup resources of tablesync workers */
if (!OidIsValid(relid)) if (!OidIsValid(relid) || rstate->state == SUBREL_STATE_READY)
continue; continue;
/* /*
@ -1236,8 +1236,14 @@ void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
/* /*
* Drop the tablesync slots associated with removed tables. * Drop the tablesync slots associated with removed tables.
* *
* For SYNCDONE/READY states, the tablesync slot is known to have * For SYNCDONE state, the slot may remain. In the normal
* already been dropped by the tablesync worker. * process(process_syncing_tables_for_sync), the slot will be
* deleted after the SYNCDONE setting, so if the DropSubscription
* is executed after the SYNCDONE and before the slot is deleted,
* DropSubscription needs to clean up the remaining slot.
*
* For READY state, the slot may also remain after system crashed
* and reovers.
* *
* For other states, there is no certainty, maybe the slot does * For other states, there is no certainty, maybe the slot does
* not exist yet. Also, if we fail after removing some of the * not exist yet. Also, if we fail after removing some of the
@ -1245,12 +1251,10 @@ void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
* slots and fail. For these reasons, we allow missing_ok = true * slots and fail. For these reasons, we allow missing_ok = true
* for the drop. * for the drop.
*/ */
if (rstate->state != SUBREL_STATE_SYNCDONE) { char syncslotname[NAMEDATALEN] = {0};
char syncslotname[NAMEDATALEN] = {0};
ReplicationSlotNameForTablesync(subid, relid, syncslotname, sizeof(syncslotname)); ReplicationSlotNameForTablesync(subid, relid, syncslotname, sizeof(syncslotname));
ReplicationSlotDropAtPubNode(syncslotname, true); ReplicationSlotDropAtPubNode(syncslotname, true);
}
} }
list_free(rstates); list_free(rstates);
@ -1304,7 +1308,7 @@ void ReplicationSlotDropAtPubNode(char *slotname, bool missing_ok)
walrcv_clear_result(res); walrcv_clear_result(res);
FreeStringInfo(&cmd); FreeStringInfo(&cmd);
ereport(WARNING, (errmsg("replication slot \"%s\" does not exist on publisher", slotname))); ereport(DEBUG2, (errmsg("replication slot \"%s\" does not exist on publisher", slotname)));
return; return;
} }
} }