!6486 【回合6.0.0】添加选项:发布订阅主备切换时是否同步连接信息
Merge pull request !6486 from zhubin79/cherry-pick-1727429322
This commit is contained in:
@ -119,6 +119,13 @@ Subscription *GetSubscription(Oid subid, bool missing_ok)
|
||||
sub->skiplsn = TextDatumGetLsn(datum);
|
||||
}
|
||||
|
||||
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subsyncconninfo, &isnull);
|
||||
if (unlikely(isnull)) {
|
||||
sub->subsyncconninfo = true;
|
||||
} else {
|
||||
sub->subsyncconninfo = DatumGetBool(datum);
|
||||
}
|
||||
|
||||
ReleaseSysCache(tup);
|
||||
|
||||
return sub;
|
||||
|
||||
@ -64,6 +64,7 @@
|
||||
#define SUBOPT_CONNECT 0x00000080
|
||||
#define SUBOPT_SKIPLSN 0x00000100
|
||||
#define SUBOPT_MATCHDDLOWNER 0x00000200
|
||||
#define SUBOPT_SYNC_CONNINFO 0x00000400
|
||||
|
||||
/* check if the 'val' has 'bits' set */
|
||||
#define IsSet(val, bits) (((val) & (bits)) == (bits))
|
||||
@ -84,6 +85,7 @@ typedef struct SubOpts
|
||||
bool copy_data;
|
||||
bool connect;
|
||||
bool matchddlowner;
|
||||
bool syncconninfo;
|
||||
XLogRecPtr skiplsn;
|
||||
} SubOpts;
|
||||
|
||||
@ -125,6 +127,9 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte
|
||||
if (IsSet(supported_opts, SUBOPT_MATCHDDLOWNER)) {
|
||||
opts->matchddlowner = true;
|
||||
}
|
||||
if (IsSet(supported_opts, SUBOPT_SYNC_CONNINFO)) {
|
||||
opts->syncconninfo = true;
|
||||
}
|
||||
|
||||
/* Parse options */
|
||||
foreach (lc, stmt_options) {
|
||||
@ -235,6 +240,15 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte
|
||||
opts->specified_opts |= SUBOPT_MATCHDDLOWNER;
|
||||
opts->matchddlowner = defGetBoolean(defel);
|
||||
|
||||
} else if (IsSet(supported_opts, SUBOPT_SYNC_CONNINFO) && strcmp(defel->defname, "syncconninfo") == 0) {
|
||||
if (IsSet(opts->specified_opts, SUBOPT_SYNC_CONNINFO)) {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR),
|
||||
errmsg("conflicting or redundant options")));
|
||||
}
|
||||
|
||||
opts->specified_opts |= SUBOPT_SYNC_CONNINFO;
|
||||
opts->syncconninfo = defGetBoolean(defel);
|
||||
} else {
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname)));
|
||||
@ -524,7 +538,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
|
||||
* Connection and publication should not be specified here.
|
||||
*/
|
||||
supported_opts = (SUBOPT_ENABLED | SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
|
||||
SUBOPT_COPY_DATA | SUBOPT_CONNECT | SUBOPT_MATCHDDLOWNER);
|
||||
SUBOPT_COPY_DATA | SUBOPT_CONNECT | SUBOPT_MATCHDDLOWNER | SUBOPT_SYNC_CONNINFO);
|
||||
parse_subscription_options(stmt->options, supported_opts, &opts);
|
||||
|
||||
/*
|
||||
@ -570,6 +584,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
|
||||
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
|
||||
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
|
||||
values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner);
|
||||
values[Anum_pg_subscription_subsyncconninfo - 1] = BoolGetDatum(opts.syncconninfo);
|
||||
|
||||
/* encrypt conninfo */
|
||||
char *encryptConninfo = EncryptOrDecryptConninfo(stmt->conninfo, 'E');
|
||||
@ -893,7 +908,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
|
||||
/* Parse options. */
|
||||
if (!stmt->refresh) {
|
||||
supported_opts = (SUBOPT_CONNINFO | SUBOPT_PUBLICATION | SUBOPT_ENABLED | SUBOPT_SLOT_NAME |
|
||||
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN | SUBOPT_MATCHDDLOWNER);
|
||||
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN | SUBOPT_MATCHDDLOWNER |
|
||||
SUBOPT_SYNC_CONNINFO);
|
||||
parse_subscription_options(stmt->options, supported_opts, &opts);
|
||||
} else {
|
||||
supported_opts = SUBOPT_COPY_DATA;
|
||||
@ -986,6 +1002,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
|
||||
values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner);
|
||||
replaces[Anum_pg_subscription_submatchddlowner - 1] = true;
|
||||
}
|
||||
if (IsSet(opts.specified_opts, SUBOPT_SYNC_CONNINFO)) {
|
||||
values[Anum_pg_subscription_subsyncconninfo - 1] = BoolGetDatum(opts.syncconninfo);
|
||||
replaces[Anum_pg_subscription_subsyncconninfo - 1] = true;
|
||||
}
|
||||
|
||||
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);
|
||||
|
||||
|
||||
@ -2387,6 +2387,13 @@ static void UpdateConninfo(char* standbysInfo)
|
||||
}
|
||||
subid = HeapTupleGetOid(tup);
|
||||
|
||||
if (!sub->subsyncconninfo) {
|
||||
AbortOutOfAnyTransaction();
|
||||
ereport(LOG, (errmsg("Prevent subscription \"%s\" update connection info, as the configuration setting "
|
||||
"\"syncconninfo = false\" is applied.", t_thrd.applyworker_cxt.mySubscription->name)));
|
||||
return;
|
||||
}
|
||||
|
||||
/* Form a new tuple. */
|
||||
int rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
|
||||
securec_check(rc, "", "");
|
||||
|
||||
@ -56,13 +56,14 @@ CATALOG(pg_subscription,6126) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6128) BKI_SCHE
|
||||
* skipped */
|
||||
bool submatchddlowner; /* True if replicated objects by DDL replication
|
||||
* should match the original owner on the publisher */
|
||||
bool subsyncconninfo; /* True if allow synchronization of connection info */
|
||||
#endif
|
||||
}
|
||||
FormData_pg_subscription;
|
||||
|
||||
typedef FormData_pg_subscription *Form_pg_subscription;
|
||||
|
||||
#define Natts_pg_subscription 11
|
||||
#define Natts_pg_subscription 12
|
||||
#define Anum_pg_subscription_subdbid 1
|
||||
#define Anum_pg_subscription_subname 2
|
||||
#define Anum_pg_subscription_subowner 3
|
||||
@ -74,6 +75,7 @@ typedef FormData_pg_subscription *Form_pg_subscription;
|
||||
#define Anum_pg_subscription_subbinary 9
|
||||
#define Anum_pg_subscription_subskiplsn 10
|
||||
#define Anum_pg_subscription_submatchddlowner 11
|
||||
#define Anum_pg_subscription_subsyncconninfo 12
|
||||
|
||||
typedef struct Subscription {
|
||||
Oid oid; /* Oid of the subscription */
|
||||
@ -90,6 +92,7 @@ typedef struct Subscription {
|
||||
* skipped */
|
||||
bool matchddlowner; /* Indicated if replicated objects by DDL repllication
|
||||
* shold match the original owner on th publisher */
|
||||
bool subsyncconninfo;
|
||||
} Subscription;
|
||||
|
||||
|
||||
|
||||
@ -76,6 +76,11 @@ select subname, subskiplsn from pg_subscription where subname='testsub';
|
||||
ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEFGH');
|
||||
ALTER SUBSCRIPTION testsub SET (skiplsn = 'none');
|
||||
select subname, subskiplsn from pg_subscription where subname='testsub';
|
||||
-- set syncconninfo
|
||||
ALTER SUBSCRIPTION testsub SET (syncconninfo = true);
|
||||
select subname, subsyncconninfo from pg_subscription where subname='testsub';
|
||||
ALTER SUBSCRIPTION testsub SET (syncconninfo = false);
|
||||
select subname, subsyncconninfo from pg_subscription where subname='testsub';
|
||||
-- disable test
|
||||
ALTER SUBSCRIPTION testsub DISABLE;
|
||||
--rename
|
||||
|
||||
@ -171,6 +171,21 @@ select subname, subskiplsn from pg_subscription where subname='testsub';
|
||||
testsub | 0/0
|
||||
(1 row)
|
||||
|
||||
-- set syncconninfo
|
||||
ALTER SUBSCRIPTION testsub SET (syncconninfo = true);
|
||||
select subname, subsyncconninfo from pg_subscription where subname='testsub';
|
||||
subname | subsyncconninfo
|
||||
---------+-----------------
|
||||
testsub | t
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION testsub SET (syncconninfo = false);
|
||||
select subname, subsyncconninfo from pg_subscription where subname='testsub';
|
||||
subname | subsyncconninfo
|
||||
---------+-----------------
|
||||
testsub | f
|
||||
(1 row)
|
||||
|
||||
-- disable test
|
||||
ALTER SUBSCRIPTION testsub DISABLE;
|
||||
--rename
|
||||
@ -359,13 +374,15 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1
|
||||
testsub | ALTER SUBSCRIPTION testsub SET (binary=true);
|
||||
testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF');
|
||||
testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = 'none');
|
||||
testsub | ALTER SUBSCRIPTION testsub SET (syncconninfo = true);
|
||||
testsub | ALTER SUBSCRIPTION testsub SET (syncconninfo = false);
|
||||
testsub | ALTER SUBSCRIPTION testsub DISABLE;
|
||||
testsub | ALTER SUBSCRIPTION testsub rename to testsub_rename;
|
||||
sub_len_999 | CREATE SUBSCRIPTION sub_len_999 CONNECTION *********************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************PUBLICATION insert_only WITH (connect = false);
|
||||
testsub_rename | DROP SUBSCRIPTION IF EXISTS testsub_rename;
|
||||
testsub_maskconninfo | DROP SUBSCRIPTION IF EXISTS testsub_maskconninfo;
|
||||
sub_len_999 | DROP SUBSCRIPTION IF EXISTS sub_len_999;
|
||||
(20 rows)
|
||||
(22 rows)
|
||||
|
||||
--clear audit log
|
||||
SELECT pg_delete_audit('1012-11-10', '3012-11-11');
|
||||
|
||||
@ -5,6 +5,9 @@ source $1/env_utils.sh $1 $2
|
||||
case_db="sw_db"
|
||||
|
||||
function test_1() {
|
||||
|
||||
local conninfo = ""
|
||||
|
||||
echo "create database and tables."
|
||||
exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
|
||||
exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
|
||||
@ -17,8 +20,10 @@ function test_1() {
|
||||
echo "create publication and subscription."
|
||||
# Setup logical replication
|
||||
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
|
||||
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub for all tables"
|
||||
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION sw_tap_pub for all tables"
|
||||
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION sw_tap_sub CONNECTION '$publisher_connstr' PUBLICATION sw_tap_pub"
|
||||
|
||||
conninfo = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription WHERE subname = 'sw_tap_sub'")"
|
||||
|
||||
# Wait for initial table sync to finish
|
||||
wait_for_subscription_sync $case_db $sub_node1_port
|
||||
@ -34,7 +39,7 @@ function test_1() {
|
||||
# test incremental synchronous
|
||||
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (11)"
|
||||
|
||||
wait_for_catchup $case_db $pub_node1_port "tap_sub"
|
||||
wait_for_catchup $case_db $pub_node1_port "sw_tap_sub"
|
||||
|
||||
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "11" ]; then
|
||||
echo "check incremental data of table is replicated success"
|
||||
@ -48,7 +53,7 @@ function test_1() {
|
||||
|
||||
exec_sql $case_db $pub_node2_port "INSERT INTO tab_rep VALUES (12)"
|
||||
|
||||
wait_for_catchup $case_db $pub_node2_port "tap_sub"
|
||||
wait_for_catchup $case_db $pub_node2_port "sw_tap_sub"
|
||||
|
||||
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "12" ]; then
|
||||
echo "check incremental data of table is replicated after switchover success"
|
||||
@ -56,13 +61,45 @@ function test_1() {
|
||||
echo "$failed_keyword when check incremental data of table is replicated after switchover"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# default sync conntion info
|
||||
if [ "$conninfo" = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription where subname = 'sw_tap_sub'")" ]; then
|
||||
echo "publication node switchover, subsciption node do sync connection info failed"
|
||||
exit 1
|
||||
else
|
||||
echo "publication node do switchover, subsciption node do sync connection info success"
|
||||
fi
|
||||
|
||||
# alter syncconninfo to false, disable sync conntion info after publication switchover
|
||||
exec_sql $case_db $sub_node1_port "alter subscription sw_tap_sub set (syncconninfo = false)"
|
||||
conninfo = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription WHERE subname = 'sw_tap_sub'")"
|
||||
|
||||
echo "switchover pub_node1 to primary"
|
||||
switchover_to_primary "pub_datanode1"
|
||||
|
||||
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (13)"
|
||||
|
||||
wait_for_catchup $case_db $pub_node1_port "sw_tap_sub"
|
||||
|
||||
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "13" ]; then
|
||||
echo "check incremental data of table is replicated after switchover success"
|
||||
else
|
||||
echo "$failed_keyword when check incremental data of table is replicated after switchover"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# syncconninfo = false
|
||||
if [ "$conninfo" = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription where subname = 'sw_tap_sub'")" ]; then
|
||||
echo "publication node switchover, syncconninfo is false, subsciption node don't sync connection info , success."
|
||||
else
|
||||
echo "publication node do switchover, syncconninfo is false, subsciption node still do sync connection info, false."
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
function tear_down(){
|
||||
switchover_to_primary "pub_datanode1"
|
||||
|
||||
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub"
|
||||
exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub"
|
||||
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION sw_tap_sub"
|
||||
exec_sql $case_db $pub_node1_port "DROP PUBLICATION sw_tap_pub"
|
||||
|
||||
exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
|
||||
exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
|
||||
|
||||
Reference in New Issue
Block a user