create slot after synchronizing the table list

This commit is contained in:
chenxiaobin19
2022-06-21 21:29:14 +08:00
parent 00b088690d
commit d1344cdda4

View File

@ -327,9 +327,6 @@ static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List
options.slotname = slotname;
PG_TRY();
{
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL);
ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname)));
/*
* Set sync state based on if we were asked to do data copy or
* not.
@ -343,10 +340,17 @@ static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List
tables = fetch_table_list(publications);
foreach (lc, tables) {
RangeVar *rv = (RangeVar *)lfirst(lc);
Oid relid = RangeVarGetRelid(rv, AccessShareLock, true);
Oid relid = RangeVarGetRelid(rv, AccessShareLock, false);
AddSubscriptionRelState(subid, relid, table_state);
}
/*
* Create slot after synchronizing the table list to avoid
* leaving an unused slot on the publisher.
*/
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL);
ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname)));
}
PG_CATCH();
{
@ -731,17 +735,17 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
HeapTuple tup;
Oid subid;
bool enabled_given = false;
bool enabled;
bool binary_given;
bool binary;
char *synchronous_commit;
char *conninfo;
char *slot_name;
bool slotname_given;
bool copy_data;
bool copy_data_given;
List *publications;
Subscription *sub;
bool enabled = false;
bool binary_given = false;
bool binary = false;
char *synchronous_commit = NULL;
char *conninfo = NULL;
char *slot_name = NULL;
bool slotname_given = false;
bool copy_data = false;
bool copy_data_given = false;
List *publications = NIL;
Subscription *sub = NULL;
int rc;
bool checkConn = false;
bool validateSlot = false;
@ -773,10 +777,13 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
encryptConninfo = sub->conninfo;
/* Parse options. */
parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given,
&slot_name, &synchronous_commit, &binary_given, &binary, &copy_data_given, &copy_data);
if (!stmt->refresh) {
parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given,
&slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL);
} else {
parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
&copy_data_given, &copy_data);
if (stmt->refresh) {
PreventTransactionChain(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
}