diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index 9c2f23ac1..10448434f 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -559,6 +559,7 @@ ObjectAddress CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) } if (!CheckPublicationsExistOnPublisher(publications)) { + (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); ereport(ERROR, (errmsg("There are some publications not exist on the publisher."))); } @@ -947,6 +948,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg( "Failed to connect to publisher."))); } if (!CheckPublicationsExistOnPublisher(publications)) { + (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); ereport(ERROR, (errmsg("There are some publications not exist on the publisher."))); } (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); @@ -967,6 +969,7 @@ ObjectAddress AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) } if (!CheckPublicationsExistOnPublisher(publications)) { + (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_disconnect(); ereport(ERROR, (errmsg("There are some publications not exist on the publisher."))); } diff --git a/src/gausskernel/storage/replication/logical/tablesync.cpp b/src/gausskernel/storage/replication/logical/tablesync.cpp index 2e457e679..11484ae5d 100644 --- a/src/gausskernel/storage/replication/logical/tablesync.cpp +++ b/src/gausskernel/storage/replication/logical/tablesync.cpp @@ -696,7 +696,8 @@ static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRela " ON (i.indexrelid = pg_get_replica_identity_index(%u))" " WHERE a.attnum > 0::pg_catalog.int2" " AND NOT a.attisdropped" - " AND NOT EXISTS (SELECT * FROM pg_attrdef b WHERE b.adrelid = a.attrelid AND b.adnum = a.attnum)" + " AND NOT EXISTS (SELECT * FROM pg_attrdef b WHERE b.adrelid = a.attrelid AND b.adnum = a.attnum" + " AND b.adgencol = 's')" " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, lrel->remoteid); diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule index 123fe13d0..c66d5af9b 100644 --- a/src/test/subscription/schedule +++ b/src/test/subscription/schedule @@ -7,3 +7,6 @@ diff_schema generated rewrite sync +encoding +ddl +matviews \ No newline at end of file diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh new file mode 100644 index 000000000..e69de29bb diff --git a/src/test/subscription/testcase/ddl.sh b/src/test/subscription/testcase/ddl.sh new file mode 100644 index 000000000..944391687 --- /dev/null +++ b/src/test/subscription/testcase/ddl.sh @@ -0,0 +1,57 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="ddl_db" + +# Test some logical replication DDL behavior +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + exec_sql $case_db $pub_node1_port "CREATE TABLE test1 (a int, b text);" + exec_sql $case_db $sub_node1_port "CREATE TABLE test1 (a int, b text);" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION mypub FOR ALL TABLES;" + + # One of the specified publications exists. + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub, non_existent_pub" 2> exec.out + if [ "$(cat exec.out)" = "ERROR: There are some publications not exist on the publisher." ]; then + echo "check create subscription throws warning for non-existent publication success" + else + echo "$failed_keyword when check create subscription throws warning for non-existent publication" + exit 1 + fi + rm exec.out + + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub" + + wait_for_subscription_sync $case_db $sub_node1_port "mysub" + + # Specifying non-existent publication along with set publication. + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION mysub SET PUBLICATION non_existent_pub" 2> exec.out + if [ "$(cat exec.out)" = "ERROR: There are some publications not exist on the publisher." ]; then + echo "check alter subscription set publication throws warning for non-existent publications success" + else + echo "$failed_keyword when check alter subscription set publication throws warning for non-existent publications" + exit 1 + fi + rm exec.out +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS mysub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS mypub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/encoding.sh b/src/test/subscription/testcase/encoding.sh new file mode 100644 index 000000000..290ba6a8a --- /dev/null +++ b/src/test/subscription/testcase/encoding.sh @@ -0,0 +1,48 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="encoding_db" + +# Test replication between databases with different encodings +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db WITH encoding 'UTF8'" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db WITH LC_CTYPE 'en_US' encoding 'LATIN1' lc_collate 'en_US'" + + exec_sql $case_db $pub_node1_port "CREATE TABLE test1 (a int, b text);" + exec_sql $case_db $sub_node1_port "CREATE TABLE test1 (a int, b text);" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION mypub FOR ALL TABLES;" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub" + + # Wait for initial sync to finish + wait_for_subscription_sync $case_db $sub_node1_port "mysub" + + exec_sql $case_db $pub_node1_port "INSERT INTO test1 VALUES (1, E'Mot\xc3\xb6rhead')" + + wait_for_catchup $case_db $pub_node1_port "mysub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a FROM test1 WHERE b = E'Mot\xf6rhead'")" = "1" ]; then + echo "check data replicated to subscriber success" + else + echo "$failed_keyword when check data replicated to subscriber" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS mysub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS mypub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/generated.sh b/src/test/subscription/testcase/generated.sh index 5c7de1f73..b369d4be2 100644 --- a/src/test/subscription/testcase/generated.sh +++ b/src/test/subscription/testcase/generated.sh @@ -11,10 +11,13 @@ function test_1() { exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" exec_sql $case_db $pub_node1_port "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab2 (a int PRIMARY KEY, b int default 5)" exec_sql $case_db $sub_node1_port "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab2 (a int PRIMARY KEY, b int default 10)" # data for initial sync exec_sql $case_db $pub_node1_port "INSERT INTO tab1 (a) VALUES (1), (2), (3)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab2 (a) VALUES (1), (2), (3)" echo "create publication and subscription." publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" @@ -37,9 +40,19 @@ function test_1() { exit 1 fi + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b FROM tab2")" = "1|5 +2|5 +3|5" ]; then + echo "check default columns initial sync success" + else + echo "$failed_keyword when default columns initial sync" + exit 1 + fi + # data to replicate exec_sql $case_db $pub_node1_port "INSERT INTO tab1 VALUES (4), (5)" exec_sql $case_db $pub_node1_port "UPDATE tab1 SET a = 6 WHERE a = 5" + exec_sql $case_db $pub_node1_port "INSERT INTO tab2 VALUES (4), (5)" wait_for_catchup $case_db $pub_node1_port "sub1" @@ -53,6 +66,17 @@ function test_1() { echo "$failed_keyword when generated columns replicated" exit 1 fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b FROM tab2")" = "1|5 +2|5 +3|5 +4|5 +5|5" ]; then + echo "check default columns replicated success" + else + echo "$failed_keyword when default columns replicated" + exit 1 + fi } function tear_down() { diff --git a/src/test/subscription/testcase/matviews.sh b/src/test/subscription/testcase/matviews.sh new file mode 100644 index 000000000..c04e57758 --- /dev/null +++ b/src/test/subscription/testcase/matviews.sh @@ -0,0 +1,51 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="matviews_db" + +# Test materialized views behavior +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + exec_sql $case_db $pub_node1_port "CREATE TABLE test1 (a int PRIMARY KEY, b text)" + exec_sql $case_db $pub_node1_port "INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two')" + exec_sql $case_db $sub_node1_port "CREATE TABLE test1 (a int PRIMARY KEY, b text)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION mypub FOR ALL TABLES;" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub" + + # Wait for initial sync to finish + wait_for_subscription_sync $case_db $sub_node1_port "mysub" + + # Materialized views are not supported by logical replication, but + # logical decoding does produce change information for them, so we + # need to make sure they are properly ignored. + + # create a MV with some data + exec_sql $case_db $pub_node1_port "CREATE MATERIALIZED VIEW testmv1 AS SELECT * FROM test1;" + wait_for_catchup $case_db $pub_node1_port "mysub" + + # There is no equivalent relation on the subscriber, but MV data is + # not replicated, so this does not hang. + + echo "materialized view data not replicated"; +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS mysub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS mypub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file