!6483 添加选项:发布订阅主备切换时是否同步连接信息

Merge pull request !6483 from zhubin79/sub-syncconn
This commit is contained in:
opengauss_bot
2024-09-27 09:56:24 +00:00
committed by Gitee
7 changed files with 108 additions and 12 deletions

View File

@ -119,6 +119,13 @@ Subscription *GetSubscription(Oid subid, bool missing_ok)
sub->skiplsn = TextDatumGetLsn(datum);
}
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subsyncconninfo, &isnull);
if (unlikely(isnull)) {
sub->subsyncconninfo = true;
} else {
sub->subsyncconninfo = DatumGetBool(datum);
}
ReleaseSysCache(tup);
return sub;

View File

@ -64,6 +64,7 @@
#define SUBOPT_CONNECT 0x00000080
#define SUBOPT_SKIPLSN 0x00000100
#define SUBOPT_MATCHDDLOWNER 0x00000200
#define SUBOPT_SYNC_CONNINFO 0x00000400
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@ -84,6 +85,7 @@ typedef struct SubOpts
bool copy_data;
bool connect;
bool matchddlowner;
bool syncconninfo;
XLogRecPtr skiplsn;
} SubOpts;
@ -125,6 +127,9 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte
if (IsSet(supported_opts, SUBOPT_MATCHDDLOWNER)) {
opts->matchddlowner = true;
}
if (IsSet(supported_opts, SUBOPT_SYNC_CONNINFO)) {
opts->syncconninfo = true;
}
/* Parse options */
foreach (lc, stmt_options) {
@ -235,6 +240,15 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte
opts->specified_opts |= SUBOPT_MATCHDDLOWNER;
opts->matchddlowner = defGetBoolean(defel);
} else if (IsSet(supported_opts, SUBOPT_SYNC_CONNINFO) && strcmp(defel->defname, "syncconninfo") == 0) {
if (IsSet(opts->specified_opts, SUBOPT_SYNC_CONNINFO)) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
}
opts->specified_opts |= SUBOPT_SYNC_CONNINFO;
opts->syncconninfo = defGetBoolean(defel);
} else {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname)));
@ -524,7 +538,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
* Connection and publication should not be specified here.
*/
supported_opts = (SUBOPT_ENABLED | SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_COPY_DATA | SUBOPT_CONNECT | SUBOPT_MATCHDDLOWNER);
SUBOPT_COPY_DATA | SUBOPT_CONNECT | SUBOPT_MATCHDDLOWNER | SUBOPT_SYNC_CONNINFO);
parse_subscription_options(stmt->options, supported_opts, &opts);
/*
@ -570,6 +584,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner);
values[Anum_pg_subscription_subsyncconninfo - 1] = BoolGetDatum(opts.syncconninfo);
/* encrypt conninfo */
char *encryptConninfo = EncryptOrDecryptConninfo(stmt->conninfo, 'E');
@ -893,7 +908,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
/* Parse options. */
if (!stmt->refresh) {
supported_opts = (SUBOPT_CONNINFO | SUBOPT_PUBLICATION | SUBOPT_ENABLED | SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN | SUBOPT_MATCHDDLOWNER);
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN | SUBOPT_MATCHDDLOWNER |
SUBOPT_SYNC_CONNINFO);
parse_subscription_options(stmt->options, supported_opts, &opts);
} else {
supported_opts = SUBOPT_COPY_DATA;
@ -986,6 +1002,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner);
replaces[Anum_pg_subscription_submatchddlowner - 1] = true;
}
if (IsSet(opts.specified_opts, SUBOPT_SYNC_CONNINFO)) {
values[Anum_pg_subscription_subsyncconninfo - 1] = BoolGetDatum(opts.syncconninfo);
replaces[Anum_pg_subscription_subsyncconninfo - 1] = true;
}
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces);

View File

@ -2387,6 +2387,13 @@ static void UpdateConninfo(char* standbysInfo)
}
subid = HeapTupleGetOid(tup);
if (!sub->subsyncconninfo) {
AbortOutOfAnyTransaction();
ereport(LOG, (errmsg("Prevent subscription \"%s\" update connection info, as the configuration setting "
"\"syncconninfo = false\" is applied.", t_thrd.applyworker_cxt.mySubscription->name)));
return;
}
/* Form a new tuple. */
int rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls));
securec_check(rc, "", "");

View File

@ -56,13 +56,14 @@ CATALOG(pg_subscription,6126) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6128) BKI_SCHE
* skipped */
bool submatchddlowner; /* True if replicated objects by DDL replication
* should match the original owner on the publisher */
bool subsyncconninfo; /* True if allow synchronization of connection info */
#endif
}
FormData_pg_subscription;
typedef FormData_pg_subscription *Form_pg_subscription;
#define Natts_pg_subscription 11
#define Natts_pg_subscription 12
#define Anum_pg_subscription_subdbid 1
#define Anum_pg_subscription_subname 2
#define Anum_pg_subscription_subowner 3
@ -74,6 +75,7 @@ typedef FormData_pg_subscription *Form_pg_subscription;
#define Anum_pg_subscription_subbinary 9
#define Anum_pg_subscription_subskiplsn 10
#define Anum_pg_subscription_submatchddlowner 11
#define Anum_pg_subscription_subsyncconninfo 12
typedef struct Subscription {
Oid oid; /* Oid of the subscription */
@ -90,6 +92,7 @@ typedef struct Subscription {
* skipped */
bool matchddlowner; /* Indicated if replicated objects by DDL repllication
* shold match the original owner on th publisher */
bool subsyncconninfo;
} Subscription;

View File

@ -76,6 +76,11 @@ select subname, subskiplsn from pg_subscription where subname='testsub';
ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEFGH');
ALTER SUBSCRIPTION testsub SET (skiplsn = 'none');
select subname, subskiplsn from pg_subscription where subname='testsub';
-- set syncconninfo
ALTER SUBSCRIPTION testsub SET (syncconninfo = true);
select subname, subsyncconninfo from pg_subscription where subname='testsub';
ALTER SUBSCRIPTION testsub SET (syncconninfo = false);
select subname, subsyncconninfo from pg_subscription where subname='testsub';
-- disable test
ALTER SUBSCRIPTION testsub DISABLE;
--rename

View File

@ -171,6 +171,21 @@ select subname, subskiplsn from pg_subscription where subname='testsub';
testsub | 0/0
(1 row)
-- set syncconninfo
ALTER SUBSCRIPTION testsub SET (syncconninfo = true);
select subname, subsyncconninfo from pg_subscription where subname='testsub';
subname | subsyncconninfo
---------+-----------------
testsub | t
(1 row)
ALTER SUBSCRIPTION testsub SET (syncconninfo = false);
select subname, subsyncconninfo from pg_subscription where subname='testsub';
subname | subsyncconninfo
---------+-----------------
testsub | f
(1 row)
-- disable test
ALTER SUBSCRIPTION testsub DISABLE;
--rename
@ -359,13 +374,15 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1
testsub | ALTER SUBSCRIPTION testsub SET (binary=true);
testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF');
testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = 'none');
testsub | ALTER SUBSCRIPTION testsub SET (syncconninfo = true);
testsub | ALTER SUBSCRIPTION testsub SET (syncconninfo = false);
testsub | ALTER SUBSCRIPTION testsub DISABLE;
testsub | ALTER SUBSCRIPTION testsub rename to testsub_rename;
sub_len_999 | CREATE SUBSCRIPTION sub_len_999 CONNECTION *********************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************PUBLICATION insert_only WITH (connect = false);
testsub_rename | DROP SUBSCRIPTION IF EXISTS testsub_rename;
testsub_maskconninfo | DROP SUBSCRIPTION IF EXISTS testsub_maskconninfo;
sub_len_999 | DROP SUBSCRIPTION IF EXISTS sub_len_999;
(20 rows)
(22 rows)
--clear audit log
SELECT pg_delete_audit('1012-11-10', '3012-11-11');

View File

@ -5,6 +5,9 @@ source $1/env_utils.sh $1 $2
case_db="sw_db"
function test_1() {
local conninfo = ""
echo "create database and tables."
exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
@ -17,8 +20,10 @@ function test_1() {
echo "create publication and subscription."
# Setup logical replication
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub for all tables"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION sw_tap_pub for all tables"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION sw_tap_sub CONNECTION '$publisher_connstr' PUBLICATION sw_tap_pub"
conninfo = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription WHERE subname = 'sw_tap_sub'")"
# Wait for initial table sync to finish
wait_for_subscription_sync $case_db $sub_node1_port
@ -34,7 +39,7 @@ function test_1() {
# test incremental synchronous
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (11)"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
wait_for_catchup $case_db $pub_node1_port "sw_tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "11" ]; then
echo "check incremental data of table is replicated success"
@ -48,7 +53,7 @@ function test_1() {
exec_sql $case_db $pub_node2_port "INSERT INTO tab_rep VALUES (12)"
wait_for_catchup $case_db $pub_node2_port "tap_sub"
wait_for_catchup $case_db $pub_node2_port "sw_tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "12" ]; then
echo "check incremental data of table is replicated after switchover success"
@ -56,13 +61,45 @@ function test_1() {
echo "$failed_keyword when check incremental data of table is replicated after switchover"
exit 1
fi
# default sync conntion info
if [ "$conninfo" = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription where subname = 'sw_tap_sub'")" ]; then
echo "publication node switchover, subsciption node do sync connection info failed"
exit 1
else
echo "publication node do switchover, subsciption node do sync connection info success"
fi
# alter syncconninfo to false, disable sync conntion info after publication switchover
exec_sql $case_db $sub_node1_port "alter subscription sw_tap_sub set (syncconninfo = false)"
conninfo = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription WHERE subname = 'sw_tap_sub'")"
echo "switchover pub_node1 to primary"
switchover_to_primary "pub_datanode1"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (13)"
wait_for_catchup $case_db $pub_node1_port "sw_tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "13" ]; then
echo "check incremental data of table is replicated after switchover success"
else
echo "$failed_keyword when check incremental data of table is replicated after switchover"
exit 1
fi
# syncconninfo = false
if [ "$conninfo" = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription where subname = 'sw_tap_sub'")" ]; then
echo "publication node switchover, syncconninfo is false, subsciption node don't sync connection info , success."
else
echo "publication node do switchover, syncconninfo is false, subsciption node still do sync connection info, false."
exit 1
fi
}
function tear_down(){
switchover_to_primary "pub_datanode1"
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub"
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION sw_tap_sub"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION sw_tap_pub"
exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
exec_sql $db $pub_node1_port "DROP DATABASE $case_db"