diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index 6278101a9..9c2f23ac1 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -50,8 +50,8 @@ #include "replication/slot.h" static bool ConnectPublisher(char* conninfo, char* slotname); -static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications = NULL, - bool copy_data = false); +static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications, + bool *copy_data, bool create_slot); static void ValidateReplicationSlot(char *slotname, List *publications); static List *fetch_table_list(List *publications); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname); @@ -66,7 +66,7 @@ static bool CheckPublicationsExistOnPublisher(List *publications); */ static void parse_subscription_options(const List *options, char **conninfo, List **publications, bool *enabled_given, bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit, bool *binary_given, bool *binary, - bool *copy_data_given, bool *copy_data) + bool *copy_data_given, bool *copy_data, bool *connect_given, bool *connect) { ListCell *lc; @@ -96,6 +96,11 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis *copy_data = true; } + if (connect) { + *connect_given = false; + *connect = true; + } + /* Parse options */ foreach (lc, options) { DefElem *defel = (DefElem *)lfirst(lc); @@ -161,12 +166,40 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis *copy_data_given = true; *copy_data = defGetBoolean(defel); + } else if (strcmp(defel->defname, "connect") == 0 && connect) { + if (*connect_given) { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + } + + *connect_given = true; + *connect = defGetBoolean(defel); } else { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname))); } } + /* + * We've been explicitly asked to not connect, that requires some + * additional processing. + */ + if (connect && !*connect) { + /* Check for incompatible options from the user. */ + if (*enabled_given && *enabled) + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", + "connect = false", "enabled = true"))); + + if (*copy_data_given && *copy_data) + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options", + "connect = false", "copy_data = true"))); + + /* Change the defaults of other options. */ + *enabled = false; + *copy_data = false; + } + /* * Do additional checking for disallowed combination when * slot_name = NONE was used. @@ -321,7 +354,8 @@ static bool ConnectPublisher(char* conninfo, char* slotname) * Create replication slot in publisher side and Insert tables into pg_subscription_rel. * Please make sure you have already connect to publisher before calling this func. */ -static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications, bool copy_data) +static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications, bool *copy_data, + bool create_slot) { LibpqrcvConnectParam options; char table_state; @@ -333,30 +367,34 @@ static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List options.slotname = slotname; PG_TRY(); { - /* - * Set sync state based on if we were asked to do data copy or - * not. - */ - table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + if (copy_data) { + /* + * Set sync state based on if we were asked to do data copy or + * not. + */ + table_state = *copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; - /* - * Get the table list from publisher and build local table status - * info. - */ - tables = fetch_table_list(publications); - foreach (lc, tables) { - RangeVar *rv = (RangeVar *)lfirst(lc); - Oid relid = RangeVarGetRelid(rv, AccessShareLock, false); + /* + * Get the table list from publisher and build local table status + * info. + */ + tables = fetch_table_list(publications); + foreach (lc, tables) { + RangeVar *rv = (RangeVar *)lfirst(lc); + Oid relid = RangeVarGetRelid(rv, AccessShareLock, false); - AddSubscriptionRelState(subid, relid, table_state); + AddSubscriptionRelState(subid, relid, table_state); + } } /* * Create slot after synchronizing the table list to avoid * leaving an unused slot on the publisher. */ - (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL); - ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname))); + if (create_slot) { + (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL); + ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname))); + } } PG_CATCH(); { @@ -422,6 +460,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool binary_given; bool copy_data; bool copy_data_given; + bool connect; + bool connect_given; char originname[NAMEDATALEN]; List *publications; int rc; @@ -431,7 +471,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ parse_subscription_options(stmt->options, NULL, NULL, &enabled_given, &enabled, &slotname_given, &slotname, - &synchronous_commit, &binary_given, &binary, ©_data_given, ©_data); + &synchronous_commit, &binary_given, &binary, ©_data_given, ©_data, &connect_given, &connect); /* * Since creating a replication slot is not transactional, rolling back @@ -510,12 +550,10 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) replorigin_create(originname); /* - * If requested, create the replication slot on remote side for our - * newly created subscription. + * Connect to remote side to execute requested commands and fetch table + * info. */ - if (enabled) { - Assert(slotname); - + if (connect) { if (!AttemptConnectPublisher(encryptConninfo, slotname, true)) { ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("Failed to connect to publisher."))); } @@ -524,9 +562,19 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) ereport(ERROR, (errmsg("There are some publications not exist on the publisher."))); } - CreateSlotInPublisherAndInsertSubRel(slotname, subid, publications, copy_data); + /* + * If requested, create the replication slot on remote side for our + * newly created subscription. + */ + Assert(!enabled || slotname); + CreateSlotInPublisherAndInsertSubRel(slotname, subid, publications, ©_data, enabled); + (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); + } else { + ereport(WARNING, (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables", + "ALTER SUBSCRIPTION ... REFRESH PUBLICATION"))); } + pfree_ext(encryptConninfo); heap_close(rel, RowExclusiveLock); rc = memset_s(stmt->conninfo, strlen(stmt->conninfo), 0, strlen(stmt->conninfo)); @@ -789,10 +837,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Parse options. */ if (!stmt->refresh) { parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given, - &slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL); + &slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL, NULL, NULL); } else { parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, - ©_data_given, ©_data); + ©_data_given, ©_data, NULL, NULL); PreventTransactionChain(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); } @@ -923,7 +971,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) } if (createSlot) { - CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, publications); + CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, publications, NULL, true); } /* no need to validate replication slot if the slot is created just by ourself */ @@ -936,6 +984,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) } if (stmt->refresh) { + if (!sub->enabled) { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); + } AlterSubscription_refresh(sub, copy_data); } diff --git a/src/test/regress/input/subscription.source b/src/test/regress/input/subscription.source index e98eb74b8..a4be16dee 100644 --- a/src/test/regress/input/subscription.source +++ b/src/test/regress/input/subscription.source @@ -32,8 +32,10 @@ CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; -- fail - unrecognized subscription parameter: create_slot CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (create_slot=false); CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); +CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off); -- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); +CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off); -- fail - The number of host and port are inconsistent create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1; -- fail - a maximum of 9 servers are supported diff --git a/src/test/regress/output/subscription.source b/src/test/regress/output/subscription.source index 27ce4f94b..52a5e49c3 100644 --- a/src/test/regress/output/subscription.source +++ b/src/test/regress/output/subscription.source @@ -79,8 +79,12 @@ ERROR: invalid connection string syntax CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (create_slot=false); ERROR: unrecognized subscription parameter: create_slot CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); +ERROR: invalid connection string syntax, missing host and port +CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off); -- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); +ERROR: Failed to connect to publisher. +CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off); -- fail - The number of host and port are inconsistent create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1; ERROR: The number of host and port are inconsistent. @@ -297,8 +301,8 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1 object_name | detail_info ----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- testsub | DROP SUBSCRIPTION IF EXISTS testsub; - testsub | CREATE SUBSCRIPTION testsub CONNECTION **********************PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); - testsub_maskconninfo | CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION ***************************************************************************************************PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); + testsub | CREATE SUBSCRIPTION testsub CONNECTION **********************PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off); + testsub_maskconninfo | CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION ***************************************************************************************************PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off); testsub | ALTER SUBSCRIPTION testsub CONNECTION '*************************************************************************************************; testsub | ALTER SUBSCRIPTION testsub CONNECTION '**********************; testsub | ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3;