diff --git a/src/common/backend/catalog/pg_subscription.cpp b/src/common/backend/catalog/pg_subscription.cpp index 529d02f83..543535fa7 100644 --- a/src/common/backend/catalog/pg_subscription.cpp +++ b/src/common/backend/catalog/pg_subscription.cpp @@ -119,6 +119,13 @@ Subscription *GetSubscription(Oid subid, bool missing_ok) sub->skiplsn = TextDatumGetLsn(datum); } + datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, Anum_pg_subscription_subsyncconninfo, &isnull); + if (unlikely(isnull)) { + sub->subsyncconninfo = true; + } else { + sub->subsyncconninfo = DatumGetBool(datum); + } + ReleaseSysCache(tup); return sub; diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index b0c32e6c5..d8bf3d8c6 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -64,6 +64,7 @@ #define SUBOPT_CONNECT 0x00000080 #define SUBOPT_SKIPLSN 0x00000100 #define SUBOPT_MATCHDDLOWNER 0x00000200 +#define SUBOPT_SYNC_CONNINFO 0x00000400 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -84,6 +85,7 @@ typedef struct SubOpts bool copy_data; bool connect; bool matchddlowner; + bool syncconninfo; XLogRecPtr skiplsn; } SubOpts; @@ -125,6 +127,9 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte if (IsSet(supported_opts, SUBOPT_MATCHDDLOWNER)) { opts->matchddlowner = true; } + if (IsSet(supported_opts, SUBOPT_SYNC_CONNINFO)) { + opts->syncconninfo = true; + } /* Parse options */ foreach (lc, stmt_options) { @@ -235,6 +240,15 @@ static void parse_subscription_options(const List *stmt_options, bits32 supporte opts->specified_opts |= SUBOPT_MATCHDDLOWNER; opts->matchddlowner = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_SYNC_CONNINFO) && strcmp(defel->defname, "syncconninfo") == 0) { + if (IsSet(opts->specified_opts, SUBOPT_SYNC_CONNINFO)) { + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + } + + opts->specified_opts |= SUBOPT_SYNC_CONNINFO; + opts->syncconninfo = defGetBoolean(defel); } else { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized subscription parameter: %s", defel->defname))); @@ -524,7 +538,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Connection and publication should not be specified here. */ supported_opts = (SUBOPT_ENABLED | SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_COPY_DATA | SUBOPT_CONNECT | SUBOPT_MATCHDDLOWNER); + SUBOPT_COPY_DATA | SUBOPT_CONNECT | SUBOPT_MATCHDDLOWNER | SUBOPT_SYNC_CONNINFO); parse_subscription_options(stmt->options, supported_opts, &opts); /* @@ -570,6 +584,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner); + values[Anum_pg_subscription_subsyncconninfo - 1] = BoolGetDatum(opts.syncconninfo); /* encrypt conninfo */ char *encryptConninfo = EncryptOrDecryptConninfo(stmt->conninfo, 'E'); @@ -893,7 +908,8 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) /* Parse options. */ if (!stmt->refresh) { supported_opts = (SUBOPT_CONNINFO | SUBOPT_PUBLICATION | SUBOPT_ENABLED | SUBOPT_SLOT_NAME | - SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN | SUBOPT_MATCHDDLOWNER); + SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_SKIPLSN | SUBOPT_MATCHDDLOWNER | + SUBOPT_SYNC_CONNINFO); parse_subscription_options(stmt->options, supported_opts, &opts); } else { supported_opts = SUBOPT_COPY_DATA; @@ -986,6 +1002,10 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_submatchddlowner - 1] = BoolGetDatum(opts.matchddlowner); replaces[Anum_pg_subscription_submatchddlowner - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_SYNC_CONNINFO)) { + values[Anum_pg_subscription_subsyncconninfo - 1] = BoolGetDatum(opts.syncconninfo); + replaces[Anum_pg_subscription_subsyncconninfo - 1] = true; + } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/gausskernel/storage/replication/logical/worker.cpp b/src/gausskernel/storage/replication/logical/worker.cpp index 4c416cc9b..d6ecd5de1 100644 --- a/src/gausskernel/storage/replication/logical/worker.cpp +++ b/src/gausskernel/storage/replication/logical/worker.cpp @@ -2387,6 +2387,13 @@ static void UpdateConninfo(char* standbysInfo) } subid = HeapTupleGetOid(tup); + if (!sub->subsyncconninfo) { + AbortOutOfAnyTransaction(); + ereport(LOG, (errmsg("Prevent subscription \"%s\" update connection info, as the configuration setting " + "\"syncconninfo = false\" is applied.", t_thrd.applyworker_cxt.mySubscription->name))); + return; + } + /* Form a new tuple. */ int rc = memset_s(nulls, sizeof(nulls), false, sizeof(nulls)); securec_check(rc, "", ""); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index a493c0472..37a505790 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -56,13 +56,14 @@ CATALOG(pg_subscription,6126) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6128) BKI_SCHE * skipped */ bool submatchddlowner; /* True if replicated objects by DDL replication * should match the original owner on the publisher */ + bool subsyncconninfo; /* True if allow synchronization of connection info */ #endif } FormData_pg_subscription; typedef FormData_pg_subscription *Form_pg_subscription; -#define Natts_pg_subscription 11 +#define Natts_pg_subscription 12 #define Anum_pg_subscription_subdbid 1 #define Anum_pg_subscription_subname 2 #define Anum_pg_subscription_subowner 3 @@ -74,6 +75,7 @@ typedef FormData_pg_subscription *Form_pg_subscription; #define Anum_pg_subscription_subbinary 9 #define Anum_pg_subscription_subskiplsn 10 #define Anum_pg_subscription_submatchddlowner 11 +#define Anum_pg_subscription_subsyncconninfo 12 typedef struct Subscription { Oid oid; /* Oid of the subscription */ @@ -90,6 +92,7 @@ typedef struct Subscription { * skipped */ bool matchddlowner; /* Indicated if replicated objects by DDL repllication * shold match the original owner on th publisher */ + bool subsyncconninfo; } Subscription; diff --git a/src/test/regress/input/subscription.source b/src/test/regress/input/subscription.source index 3083dc5b6..6167e7658 100644 --- a/src/test/regress/input/subscription.source +++ b/src/test/regress/input/subscription.source @@ -76,6 +76,11 @@ select subname, subskiplsn from pg_subscription where subname='testsub'; ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEFGH'); ALTER SUBSCRIPTION testsub SET (skiplsn = 'none'); select subname, subskiplsn from pg_subscription where subname='testsub'; +-- set syncconninfo +ALTER SUBSCRIPTION testsub SET (syncconninfo = true); +select subname, subsyncconninfo from pg_subscription where subname='testsub'; +ALTER SUBSCRIPTION testsub SET (syncconninfo = false); +select subname, subsyncconninfo from pg_subscription where subname='testsub'; -- disable test ALTER SUBSCRIPTION testsub DISABLE; --rename diff --git a/src/test/regress/output/subscription.source b/src/test/regress/output/subscription.source index 24fe8ecaa..1965db431 100644 --- a/src/test/regress/output/subscription.source +++ b/src/test/regress/output/subscription.source @@ -171,6 +171,21 @@ select subname, subskiplsn from pg_subscription where subname='testsub'; testsub | 0/0 (1 row) +-- set syncconninfo +ALTER SUBSCRIPTION testsub SET (syncconninfo = true); +select subname, subsyncconninfo from pg_subscription where subname='testsub'; + subname | subsyncconninfo +---------+----------------- + testsub | t +(1 row) + +ALTER SUBSCRIPTION testsub SET (syncconninfo = false); +select subname, subsyncconninfo from pg_subscription where subname='testsub'; + subname | subsyncconninfo +---------+----------------- + testsub | f +(1 row) + -- disable test ALTER SUBSCRIPTION testsub DISABLE; --rename @@ -359,13 +374,15 @@ SELECT object_name,detail_info FROM pg_query_audit('2022-01-13 9:30:00', '2031-1 testsub | ALTER SUBSCRIPTION testsub SET (binary=true); testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = '0/ABCDEF'); testsub | ALTER SUBSCRIPTION testsub SET (skiplsn = 'none'); + testsub | ALTER SUBSCRIPTION testsub SET (syncconninfo = true); + testsub | ALTER SUBSCRIPTION testsub SET (syncconninfo = false); testsub | ALTER SUBSCRIPTION testsub DISABLE; testsub | ALTER SUBSCRIPTION testsub rename to testsub_rename; sub_len_999 | CREATE SUBSCRIPTION sub_len_999 CONNECTION *********************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************************PUBLICATION insert_only WITH (connect = false); testsub_rename | DROP SUBSCRIPTION IF EXISTS testsub_rename; testsub_maskconninfo | DROP SUBSCRIPTION IF EXISTS testsub_maskconninfo; sub_len_999 | DROP SUBSCRIPTION IF EXISTS sub_len_999; -(20 rows) +(22 rows) --clear audit log SELECT pg_delete_audit('1012-11-10', '3012-11-11'); diff --git a/src/test/subscription/testcase/pub_switchover.sh b/src/test/subscription/testcase/pub_switchover.sh index ee6845671..19b98bd24 100644 --- a/src/test/subscription/testcase/pub_switchover.sh +++ b/src/test/subscription/testcase/pub_switchover.sh @@ -5,6 +5,9 @@ source $1/env_utils.sh $1 $2 case_db="sw_db" function test_1() { + + local conninfo = "" + echo "create database and tables." exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" @@ -17,8 +20,10 @@ function test_1() { echo "create publication and subscription." # Setup logical replication publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" - exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub for all tables" - exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION sw_tap_pub for all tables" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION sw_tap_sub CONNECTION '$publisher_connstr' PUBLICATION sw_tap_pub" + + conninfo = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription WHERE subname = 'sw_tap_sub'")" # Wait for initial table sync to finish wait_for_subscription_sync $case_db $sub_node1_port @@ -34,7 +39,7 @@ function test_1() { # test incremental synchronous exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (11)" - wait_for_catchup $case_db $pub_node1_port "tap_sub" + wait_for_catchup $case_db $pub_node1_port "sw_tap_sub" if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "11" ]; then echo "check incremental data of table is replicated success" @@ -48,7 +53,7 @@ function test_1() { exec_sql $case_db $pub_node2_port "INSERT INTO tab_rep VALUES (12)" - wait_for_catchup $case_db $pub_node2_port "tap_sub" + wait_for_catchup $case_db $pub_node2_port "sw_tap_sub" if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "12" ]; then echo "check incremental data of table is replicated after switchover success" @@ -56,13 +61,45 @@ function test_1() { echo "$failed_keyword when check incremental data of table is replicated after switchover" exit 1 fi + + # default sync conntion info + if [ "$conninfo" = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription where subname = 'sw_tap_sub'")" ]; then + echo "publication node switchover, subsciption node do sync connection info failed" + exit 1 + else + echo "publication node do switchover, subsciption node do sync connection info success" + fi + + # alter syncconninfo to false, disable sync conntion info after publication switchover + exec_sql $case_db $sub_node1_port "alter subscription sw_tap_sub set (syncconninfo = false)" + conninfo = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription WHERE subname = 'sw_tap_sub'")" + + echo "switchover pub_node1 to primary" + switchover_to_primary "pub_datanode1" + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (13)" + + wait_for_catchup $case_db $pub_node1_port "sw_tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "13" ]; then + echo "check incremental data of table is replicated after switchover success" + else + echo "$failed_keyword when check incremental data of table is replicated after switchover" + exit 1 + fi + + # syncconninfo = false + if [ "$conninfo" = "$(exec_sql $case_db $sub_node1_port "SELECT subconninfo FROM pg_subscription where subname = 'sw_tap_sub'")" ]; then + echo "publication node switchover, syncconninfo is false, subsciption node don't sync connection info , success." + else + echo "publication node do switchover, syncconninfo is false, subsciption node still do sync connection info, false." + exit 1 + fi } function tear_down(){ - switchover_to_primary "pub_datanode1" - - exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" - exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub" + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION sw_tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION sw_tap_pub" exec_sql $db $sub_node1_port "DROP DATABASE $case_db" exec_sql $db $pub_node1_port "DROP DATABASE $case_db"