insert pg_subscription_rel even if create subscription with false enabled.

This commit is contained in:
chenxiaobin19
2022-09-13 19:52:03 +08:00
parent 0128e74abb
commit a42ff5c78b
3 changed files with 90 additions and 32 deletions

View File

@ -50,8 +50,8 @@
#include "replication/slot.h" #include "replication/slot.h"
static bool ConnectPublisher(char* conninfo, char* slotname); static bool ConnectPublisher(char* conninfo, char* slotname);
static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications = NULL, static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications,
bool copy_data = false); bool *copy_data, bool create_slot);
static void ValidateReplicationSlot(char *slotname, List *publications); static void ValidateReplicationSlot(char *slotname, List *publications);
static List *fetch_table_list(List *publications); static List *fetch_table_list(List *publications);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname);
@ -66,7 +66,7 @@ static bool CheckPublicationsExistOnPublisher(List *publications);
*/ */
static void parse_subscription_options(const List *options, char **conninfo, List **publications, bool *enabled_given, static void parse_subscription_options(const List *options, char **conninfo, List **publications, bool *enabled_given,
bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit, bool *binary_given, bool *binary, bool *enabled, bool *slot_name_given, char **slot_name, char **synchronous_commit, bool *binary_given, bool *binary,
bool *copy_data_given, bool *copy_data) bool *copy_data_given, bool *copy_data, bool *connect_given, bool *connect)
{ {
ListCell *lc; ListCell *lc;
@ -96,6 +96,11 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis
*copy_data = true; *copy_data = true;
} }
if (connect) {
*connect_given = false;
*connect = true;
}
/* Parse options */ /* Parse options */
foreach (lc, options) { foreach (lc, options) {
DefElem *defel = (DefElem *)lfirst(lc); DefElem *defel = (DefElem *)lfirst(lc);
@ -161,12 +166,40 @@ static void parse_subscription_options(const List *options, char **conninfo, Lis
*copy_data_given = true; *copy_data_given = true;
*copy_data = defGetBoolean(defel); *copy_data = defGetBoolean(defel);
} else if (strcmp(defel->defname, "connect") == 0 && connect) {
if (*connect_given) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
}
*connect_given = true;
*connect = defGetBoolean(defel);
} else { } else {
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname))); (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname)));
} }
} }
/*
* We've been explicitly asked to not connect, that requires some
* additional processing.
*/
if (connect && !*connect) {
/* Check for incompatible options from the user. */
if (*enabled_given && *enabled)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options",
"connect = false", "enabled = true")));
if (*copy_data_given && *copy_data)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("%s and %s are mutually exclusive options",
"connect = false", "copy_data = true")));
/* Change the defaults of other options. */
*enabled = false;
*copy_data = false;
}
/* /*
* Do additional checking for disallowed combination when * Do additional checking for disallowed combination when
* slot_name = NONE was used. * slot_name = NONE was used.
@ -321,7 +354,8 @@ static bool ConnectPublisher(char* conninfo, char* slotname)
* Create replication slot in publisher side and Insert tables into pg_subscription_rel. * Create replication slot in publisher side and Insert tables into pg_subscription_rel.
* Please make sure you have already connect to publisher before calling this func. * Please make sure you have already connect to publisher before calling this func.
*/ */
static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications, bool copy_data) static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List *publications, bool *copy_data,
bool create_slot)
{ {
LibpqrcvConnectParam options; LibpqrcvConnectParam options;
char table_state; char table_state;
@ -333,30 +367,34 @@ static void CreateSlotInPublisherAndInsertSubRel(char *slotname, Oid subid, List
options.slotname = slotname; options.slotname = slotname;
PG_TRY(); PG_TRY();
{ {
/* if (copy_data) {
* Set sync state based on if we were asked to do data copy or /*
* not. * Set sync state based on if we were asked to do data copy or
*/ * not.
table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; */
table_state = *copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
/* /*
* Get the table list from publisher and build local table status * Get the table list from publisher and build local table status
* info. * info.
*/ */
tables = fetch_table_list(publications); tables = fetch_table_list(publications);
foreach (lc, tables) { foreach (lc, tables) {
RangeVar *rv = (RangeVar *)lfirst(lc); RangeVar *rv = (RangeVar *)lfirst(lc);
Oid relid = RangeVarGetRelid(rv, AccessShareLock, false); Oid relid = RangeVarGetRelid(rv, AccessShareLock, false);
AddSubscriptionRelState(subid, relid, table_state); AddSubscriptionRelState(subid, relid, table_state);
}
} }
/* /*
* Create slot after synchronizing the table list to avoid * Create slot after synchronizing the table list to avoid
* leaving an unused slot on the publisher. * leaving an unused slot on the publisher.
*/ */
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL); if (create_slot) {
ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname))); (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_create_slot(&options, NULL, NULL);
ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", slotname)));
}
} }
PG_CATCH(); PG_CATCH();
{ {
@ -422,6 +460,8 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
bool binary_given; bool binary_given;
bool copy_data; bool copy_data;
bool copy_data_given; bool copy_data_given;
bool connect;
bool connect_given;
char originname[NAMEDATALEN]; char originname[NAMEDATALEN];
List *publications; List *publications;
int rc; int rc;
@ -431,7 +471,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connection and publication should not be specified here. * Connection and publication should not be specified here.
*/ */
parse_subscription_options(stmt->options, NULL, NULL, &enabled_given, &enabled, &slotname_given, &slotname, parse_subscription_options(stmt->options, NULL, NULL, &enabled_given, &enabled, &slotname_given, &slotname,
&synchronous_commit, &binary_given, &binary, &copy_data_given, &copy_data); &synchronous_commit, &binary_given, &binary, &copy_data_given, &copy_data, &connect_given, &connect);
/* /*
* Since creating a replication slot is not transactional, rolling back * Since creating a replication slot is not transactional, rolling back
@ -510,12 +550,10 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
replorigin_create(originname); replorigin_create(originname);
/* /*
* If requested, create the replication slot on remote side for our * Connect to remote side to execute requested commands and fetch table
* newly created subscription. * info.
*/ */
if (enabled) { if (connect) {
Assert(slotname);
if (!AttemptConnectPublisher(encryptConninfo, slotname, true)) { if (!AttemptConnectPublisher(encryptConninfo, slotname, true)) {
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("Failed to connect to publisher."))); ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("Failed to connect to publisher.")));
} }
@ -524,9 +562,19 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
ereport(ERROR, (errmsg("There are some publications not exist on the publisher."))); ereport(ERROR, (errmsg("There are some publications not exist on the publisher.")));
} }
CreateSlotInPublisherAndInsertSubRel(slotname, subid, publications, copy_data); /*
* If requested, create the replication slot on remote side for our
* newly created subscription.
*/
Assert(!enabled || slotname);
CreateSlotInPublisherAndInsertSubRel(slotname, subid, publications, &copy_data, enabled);
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect();
} else {
ereport(WARNING, (errmsg("tables were not subscribed, you will have to run %s to subscribe the tables",
"ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
} }
pfree_ext(encryptConninfo); pfree_ext(encryptConninfo);
heap_close(rel, RowExclusiveLock); heap_close(rel, RowExclusiveLock);
rc = memset_s(stmt->conninfo, strlen(stmt->conninfo), 0, strlen(stmt->conninfo)); rc = memset_s(stmt->conninfo, strlen(stmt->conninfo), 0, strlen(stmt->conninfo));
@ -789,10 +837,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
/* Parse options. */ /* Parse options. */
if (!stmt->refresh) { if (!stmt->refresh) {
parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given, parse_subscription_options(stmt->options, &conninfo, &publications, &enabled_given, &enabled, &slotname_given,
&slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL); &slot_name, &synchronous_commit, &binary_given, &binary, NULL, NULL, NULL, NULL);
} else { } else {
parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, parse_subscription_options(stmt->options, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
&copy_data_given, &copy_data); &copy_data_given, &copy_data, NULL, NULL);
PreventTransactionChain(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); PreventTransactionChain(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH");
} }
@ -923,7 +971,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
} }
if (createSlot) { if (createSlot) {
CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, publications); CreateSlotInPublisherAndInsertSubRel(finalSlotName, subid, publications, NULL, true);
} }
/* no need to validate replication slot if the slot is created just by ourself */ /* no need to validate replication slot if the slot is created just by ourself */
@ -936,6 +984,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
} }
if (stmt->refresh) { if (stmt->refresh) {
if (!sub->enabled) {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
}
AlterSubscription_refresh(sub, copy_data); AlterSubscription_refresh(sub, copy_data);
} }

View File

@ -32,8 +32,10 @@ CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub;
-- fail - unrecognized subscription parameter: create_slot -- fail - unrecognized subscription parameter: create_slot
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (create_slot=false); CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (create_slot=false);
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off);
-- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug -- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug
CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off);
-- fail - The number of host and port are inconsistent -- fail - The number of host and port are inconsistent
create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1; create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1;
-- fail - a maximum of 9 servers are supported -- fail - a maximum of 9 servers are supported

View File

@ -79,8 +79,12 @@ ERROR: invalid connection string syntax
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (create_slot=false); CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (create_slot=false);
ERROR: unrecognized subscription parameter: create_slot ERROR: unrecognized subscription parameter: create_slot
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
ERROR: invalid connection string syntax, missing host and port
CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off);
-- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug -- create SUBSCRIPTION with conninfo in two single quote, used to check mask string bug
CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off);
ERROR: Failed to connect to publisher.
CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION 'host=''1.2.3.4'' port=''12345'' user=''username'' dbname=''postgres'' password=''password_1234''' PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off);
-- fail - The number of host and port are inconsistent -- fail - The number of host and port are inconsistent
create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1; create subscription sub1 connection 'dbname=postgres user=pubusr password=Huawei@123 host=192.168.0.38,192.168.0.38,192.168.0.38 port=14001,14501' publication pub1;
ERROR: The number of host and port are inconsistent. ERROR: The number of host and port are inconsistent.
@ -297,8 +301,8 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1
object_name | detail_info object_name | detail_info
----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
testsub | DROP SUBSCRIPTION IF EXISTS testsub; testsub | DROP SUBSCRIPTION IF EXISTS testsub;
testsub | CREATE SUBSCRIPTION testsub CONNECTION **********************PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); testsub | CREATE SUBSCRIPTION testsub CONNECTION **********************PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off);
testsub_maskconninfo | CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION ***************************************************************************************************PUBLICATION testpub WITH (ENABLED=false, slot_name='testsub', synchronous_commit=off); testsub_maskconninfo | CREATE SUBSCRIPTION testsub_maskconninfo CONNECTION ***************************************************************************************************PUBLICATION testpub WITH (CONNECT=false, slot_name='testsub', synchronous_commit=off);
testsub | ALTER SUBSCRIPTION testsub CONNECTION '*************************************************************************************************; testsub | ALTER SUBSCRIPTION testsub CONNECTION '*************************************************************************************************;
testsub | ALTER SUBSCRIPTION testsub CONNECTION '**********************; testsub | ALTER SUBSCRIPTION testsub CONNECTION '**********************;
testsub | ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3; testsub | ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3;