diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index 67490ec94..d66531bfe 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -327,9 +327,6 @@ static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List options.slotname = slotname; PG_TRY(); { - (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL); - ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname))); - /* * Set sync state based on if we were asked to do data copy or * not. @@ -343,10 +340,17 @@ static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List tables = fetch_table_list(publications); foreach (lc, tables) { RangeVar *rv = (RangeVar *)lfirst(lc); - Oid relid = RangeVarGetRelid(rv, AccessShareLock, true); + Oid relid = RangeVarGetRelid(rv, AccessShareLock, false); AddSubscriptionRelState(subid, relid, table_state); } + + /* + * Create slot after synchronizing the table list to avoid + * leaving an unused slot on the publisher. + */ + (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL); + ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname))); } PG_CATCH(); { @@ -731,17 +735,17 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) HeapTuple tup; Oid subid; bool enabled_given = false; - bool enabled; - bool binary_given; - bool binary; - char *synchronous_commit; - char *conninfo; - char *slot_name; - bool slotname_given; - bool copy_data; - bool copy_data_given; - List *publications; - Subscription *sub; + bool enabled = false; + bool binary_given = false; + bool binary = false; + char *synchronous_commit = NULL; + char *conninfo = NULL; + char *slot_name = NULL; + bool slotname_given = false; + bool copy_data = false; + bool copy_data_given = false; + List *publications = NIL; + Subscription *sub = NULL; int rc; bool checkConn = false; bool validateSlot = false; @@ -773,10 +777,13 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) encryptConninfo = sub->conninfo; /* Parse options. */ - parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given, - &slot_name, &synchronous_commit, &binary_given, &binary, ©_data_given, ©_data); + if (!stmt->refresh) { + parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given, + &slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL); + } else { + parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, + ©_data_given, ©_data); - if (stmt->refresh) { PreventTransactionChain(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); }