From b2b4ee913c961506be3a8dc0ebfc45be966a2752 Mon Sep 17 00:00:00 2001 From: l30039603 Date: Mon, 6 Feb 2023 15:17:53 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=85=E6=B5=8B=E8=AF=95=E7=94=A8?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/replication/logical/tablesync.cpp | 1 + src/test/subscription/schedule | 7 +- src/test/subscription/testcase/binary.sh | 128 +++++++++++++ src/test/subscription/testcase/diff_schema.sh | 111 ++++++++++++ src/test/subscription/testcase/generated.sh | 69 +++++++ src/test/subscription/testcase/rewrite.sh | 68 +++++++ src/test/subscription/testcase/sync.sh | 169 ++++++++++++++++++ 7 files changed, 552 insertions(+), 1 deletion(-) create mode 100644 src/test/subscription/testcase/binary.sh create mode 100644 src/test/subscription/testcase/diff_schema.sh create mode 100644 src/test/subscription/testcase/generated.sh create mode 100644 src/test/subscription/testcase/rewrite.sh create mode 100644 src/test/subscription/testcase/sync.sh diff --git a/src/gausskernel/storage/replication/logical/tablesync.cpp b/src/gausskernel/storage/replication/logical/tablesync.cpp index d74a64521..2e457e679 100644 --- a/src/gausskernel/storage/replication/logical/tablesync.cpp +++ b/src/gausskernel/storage/replication/logical/tablesync.cpp @@ -696,6 +696,7 @@ static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRela " ON (i.indexrelid = pg_get_replica_identity_index(%u))" " WHERE a.attnum > 0::pg_catalog.int2" " AND NOT a.attisdropped" + " AND NOT EXISTS (SELECT * FROM pg_attrdef b WHERE b.adrelid = a.attrelid AND b.adnum = a.attnum)" " AND a.attrelid = %u" " ORDER BY a.attnum", lrel->remoteid, lrel->remoteid); diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule index 23ac89c74..123fe13d0 100644 --- a/src/test/subscription/schedule +++ b/src/test/subscription/schedule @@ -1,4 +1,9 @@ rep_changes pub_switchover types -constraints \ No newline at end of file +constraints +binary +diff_schema +generated +rewrite +sync diff --git a/src/test/subscription/testcase/binary.sh b/src/test/subscription/testcase/binary.sh new file mode 100644 index 000000000..7835c32ca --- /dev/null +++ b/src/test/subscription/testcase/binary.sh @@ -0,0 +1,128 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="binary_db" + +function test_1() { + # Create and initialize node + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + # Create tables on both sides of the replication + ddl="CREATE TABLE public.test_numerical ( + a INTEGER PRIMARY KEY, + b NUMERIC, + c FLOAT, + d BIGINT + ); + CREATE TABLE public.test_arrays ( + a INTEGER[] PRIMARY KEY, + b NUMERIC[], + c TEXT[] + )" + + exec_sql $case_db $pub_node1_port "$ddl" + exec_sql $case_db $sub_node1_port "$ddl" + + # Configure logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tpub FOR ALL TABLES" + + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstr' PUBLICATION tpub" + + # Ensure nodes are in sync with each other + wait_for_subscription_sync $case_db $sub_node1_port + + # Insert some content and make sure it's replicated across + exec_sql $case_db $pub_node1_port "INSERT INTO public.test_arrays (a, b, c) VALUES + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), + ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}')" + exec_sql $case_db $pub_node1_port "INSERT INTO public.test_numerical (a, b, c, d) VALUES + (1, 1.2, 1.3, 10), + (2, 2.2, 2.3, 20), + (3, 3.2, 3.3, 30)" + + wait_for_catchup $case_db $pub_node1_port "tsub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b, c, d FROM test_numerical ORDER BY a")" = "1|1.2|1.3|10 +2|2.2|2.3|20 +3|3.2|3.3|30" ]; then + echo "check replicated data on subscriber success" + else + echo "$failed_keyword when check replicated data on subscriber" + exit 1 + fi + + # Test updates as well + exec_sql $case_db $pub_node1_port "UPDATE public.test_arrays SET b[1] = 42, c = NULL" + exec_sql $case_db $pub_node1_port "UPDATE public.test_numerical SET b = 42, c = NULL" + + wait_for_catchup $case_db $pub_node1_port "tsub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b, c FROM test_arrays ORDER BY a")" = "{1,2,3}|{42,1.2,1.3}| +{3,1,2}|{42,1.1,1.2}|" ]; then + echo "check updated replicated data on subscriber success" + else + echo "$failed_keyword when check updated replicated data on subscriber" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b, c, d FROM test_numerical ORDER BY a")" = "1|42||10 +2|42||20 +3|42||30" ]; then + echo "check updated replicated data on subscriber success" + else + echo "$failed_keyword when check updated replicated data on subscriber" + exit 1 + fi + + # Test to reset back to text formatting, and then to binary again + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tsub SET (binary = false)" + + exec_sql $case_db $pub_node1_port "INSERT INTO public.test_numerical (a, b, c, d) VALUES(4, 4.2, 4.3, 40)" + + wait_for_catchup $case_db $pub_node1_port "tsub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b, c, d FROM test_numerical ORDER BY a")" = "1|42||10 +2|42||20 +3|42||30 +4|4.2|4.3|40" ]; then + echo "check replicated data on subscriber success" + else + echo "$failed_keyword when check replicated data on subscriber" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tsub SET (binary = true)" + + exec_sql $case_db $pub_node1_port "INSERT INTO public.test_arrays (a, b, c) VALUES + ('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}')" + + wait_for_catchup $case_db $pub_node1_port "tsub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b, c FROM test_arrays ORDER BY a")" = "{1,2,3}|{42,1.2,1.3}| +{2,3,1}|{1.2,1.3,1.1}|{two,three,one} +{3,1,2}|{42,1.1,1.2}|" ]; then + echo "check replicated data on subscriber success" + else + echo "$failed_keyword when check replicated data on subscriber" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tsub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tpub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/diff_schema.sh b/src/test/subscription/testcase/diff_schema.sh new file mode 100644 index 000000000..4835350b0 --- /dev/null +++ b/src/test/subscription/testcase/diff_schema.sh @@ -0,0 +1,111 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="rep_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE test_tab (a int primary key, b varchar)" + exec_sql $case_db $pub_node1_port "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), count(c), count(d = 999) FROM test_tab")" = "2|2|2" ]; then + echo "check initial data was copied to subscriber success" + else + echo "$failed_keyword when check initial data was copied to subscriber" + exit 1 + fi + + # Update the rows on the publisher and check the additional columns on + # subscriber didn't change + exec_sql $case_db $pub_node1_port "UPDATE test_tab SET b = md5(b)" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), count(c), count(d = 999) FROM test_tab")" = "2|2|2" ]; then + echo "check extra columns contain local defaults after copy success" + else + echo "$failed_keyword when check extra columns contain local defaults after copy" + exit 1 + fi + + # Change the local values of the extra columns on the subscriber, + # update publisher, and check that subscriber retains the expected + # values + exec_sql $case_db $sub_node1_port "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" + exec_sql $case_db $pub_node1_port "UPDATE test_tab SET b = md5(a::text)" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab")" = "2|2|2" ]; then + echo "check extra columns contain locally changed data success" + else + echo "$failed_keyword when check extra columns contain locally changed data" + exit 1 + fi + + # Another insert + exec_sql $case_db $pub_node1_port "INSERT INTO test_tab VALUES (3, 'baz')" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), count(c), count(d = 999) FROM test_tab")" = "3|3|3" ]; then + echo "check extra columns contain local defaults after apply success" + else + echo "$failed_keyword when check extra columns contain local defaults after apply" + exit 1 + fi + + # Check a bug about adding a replica identity column on the subscriber + # that was not yet mapped to a column on the publisher. This would + # result in errors on the subscriber and replication thus not + # progressing. + exec_sql $case_db $pub_node1_port "CREATE TABLE test_tab2 (a int)" + exec_sql $case_db $sub_node1_port "CREATE TABLE test_tab2 (a int)" + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION" + + wait_for_subscription_sync $case_db $sub_node1_port + + # Add replica identity column. (The serial is not necessary, but it's + # a convenient way to get a default on the new column so that rows + # from the publisher that don't have the column yet can be inserted.) + # it's not supported to alter table add serial column, use default instead, + # can only insert 1 column + exec_sql $case_db $sub_node1_port "ALTER TABLE test_tab2 ADD COLUMN b int DEFAULT 1 PRIMARY KEY" + exec_sql $case_db $pub_node1_port "INSERT INTO test_tab2 VALUES (1)" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM test_tab2")" = "1|1|1" ]; then + echo "check replicated inserts on subscriber success" + else + echo "$failed_keyword when check replicated inserts on subscriber" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/generated.sh b/src/test/subscription/testcase/generated.sh new file mode 100644 index 000000000..5c7de1f73 --- /dev/null +++ b/src/test/subscription/testcase/generated.sh @@ -0,0 +1,69 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="generated_db" + +function test_1() { + # setup + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + exec_sql $case_db $pub_node1_port "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 2) STORED)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab1 (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a * 22) STORED)" + + # data for initial sync + exec_sql $case_db $pub_node1_port "INSERT INTO tab1 (a) VALUES (1), (2), (3)" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION pub1 FOR ALL TABLES" + + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1" + + # Wait for initial sync of all subscriptions + echo "$(exec_sql $case_db $sub_node1_port "SELECT * FROM pg_replication_slots")" + + wait_for_subscription_sync $case_db $sub_node1_port + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b FROM tab1")" = "1|22 +2|44 +3|66" ]; then + echo "check generated columns initial sync success" + else + echo "$failed_keyword when generated columns initial sync" + exit 1 + fi + + # data to replicate + exec_sql $case_db $pub_node1_port "INSERT INTO tab1 VALUES (4), (5)" + exec_sql $case_db $pub_node1_port "UPDATE tab1 SET a = 6 WHERE a = 5" + + wait_for_catchup $case_db $pub_node1_port "sub1" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b FROM tab1")" = "1|22 +2|44 +3|66 +4|88 +6|132" ]; then + echo "check generated columns replicated success" + else + echo "$failed_keyword when generated columns replicated" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS sub1" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS pub1" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/rewrite.sh b/src/test/subscription/testcase/rewrite.sh new file mode 100644 index 000000000..7aaed1493 --- /dev/null +++ b/src/test/subscription/testcase/rewrite.sh @@ -0,0 +1,68 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="rewrite_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + exec_sql $case_db $pub_node1_port "CREATE TABLE test1 (a int, b text)" + exec_sql $case_db $sub_node1_port "CREATE TABLE test1 (a int, b text)" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION mypub FOR ALL TABLES" + + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub" + + # Wait for initial sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $pub_node1_port "INSERT INTO test1 (a, b) VALUES (1, 'one'), (2, 'two')" + + wait_for_catchup $case_db $pub_node1_port "mysub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b FROM test1")" = "1|one +2|two" ]; then + echo "check initial data replicated to subscriber success" + else + echo "$failed_keyword when initial data replicated to subscriber" + exit 1 + fi + + # DDL that causes a heap rewrite + exec_sql $case_db $pub_node1_port "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0" + exec_sql $case_db $sub_node1_port "ALTER TABLE test1 ADD c int NOT NULL DEFAULT 0" + + wait_for_catchup $case_db $pub_node1_port "mysub" + + exec_sql $case_db $pub_node1_port "INSERT INTO test1 (a, b, c) VALUES (3, 'three', 33)" + + wait_for_catchup $case_db $pub_node1_port "mysub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, b, c FROM test1")" = "1|one|0 +2|two|0 +3|three|33" ]; then + echo "check data replicated to subscriber success" + else + echo "$failed_keyword when data replicated to subscriber" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS mysub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS mypub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/sync.sh b/src/test/subscription/testcase/sync.sh new file mode 100644 index 000000000..7c5942930 --- /dev/null +++ b/src/test/subscription/testcase/sync.sh @@ -0,0 +1,169 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="sync_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,10)" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "10" ]; then + echo "check initial data synced for first sub success" + else + echo "$failed_keyword when check initial data synced for first sub" + exit 1 + fi + + # drop subscription so that there is unreplicated data + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(11,20)" + + # recreate the subscription, it will try to do initial copy + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # but it will be stuck on data copy as it will fail on constraint + poll_query_until $case_db $sub_node1_port "SELECT srsubstate FROM pg_subscription_rel" "d" "Timed out while waiting for subscriber to start sync" + + # remove the conflicting data + exec_sql $case_db $sub_node1_port "DELETE FROM tab_rep" + + # wait for sync to finish this time + wait_for_subscription_sync $case_db $sub_node1_port + + # check that all data is synced + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "20" ]; then + echo "check initial data synced for second sub success" + else + echo "$failed_keyword when initial data synced for second sub" + exit 1 + fi + + # now check another subscription for the same node pair + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" + + # wait for it to start + poll_query_until $case_db $sub_node1_port "SELECT count(*) FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL AND pid IS NOT NULL" "1" "Timed out while waiting for subscriber to start" + + # and drop both subscriptions + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub2" + + # check subscriptions are removed + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_subscription")" = "0" ]; then + echo "check second and third sub are dropped success" + else + echo "$failed_keyword when second and third sub are dropped" + exit 1 + fi + + # remove the conflicting data + exec_sql $case_db $sub_node1_port "DELETE FROM tab_rep" + + # recreate the subscription again + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # and wait for data sync to finish again + wait_for_subscription_sync $case_db $sub_node1_port + + # check that all data is synced + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "20" ]; then + echo "check initial data synced for fourth sub success" + else + echo "$failed_keyword when initial data synced for fourth sub" + exit 1 + fi + + # add new table on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep_next (a int)" + + # setup structure with existing data on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep_next")" = "0" ]; then + echo "check no data for table added after subscription initialized success" + else + echo "$failed_keyword when no data for table added after subscription initialized" + exit 1 + fi + + # ask for data sync + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION" + + # wait for sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep_next")" = "10" ]; then + echo "check data for table added after subscription initialized are now synced success" + else + echo "$failed_keyword when data for table added after subscription initialized are now synced" + exit 1 + fi + + # Add some data + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep_next SELECT generate_series(1,10)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep_next")" = "20" ]; then + echo "check data for table added after subscription initialized are now synced success" + else + echo "$failed_keyword when changes for table added after subscription initialized replicated" + exit 1 + fi + + # clean up + exec_sql $case_db $pub_node1_port "DROP TABLE tab_rep_next" + exec_sql $case_db $sub_node1_port "DROP TABLE tab_rep_next" + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" + + # Table tap_rep already has the same records on both publisher and subscriber + # at this time. Recreate the subscription which will do the initial copy of + # the table again and fails due to unique constraint violation. + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + poll_query_until $case_db $sub_node1_port "SELECT srsubstate FROM pg_subscription_rel" "d" "Timed out while waiting for subscriber to start sync" + + # DROP SUBSCRIPTION must clean up slots on the publisher side when the + # subscriber is stuck on data copy for constraint violation. + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" + + if [ "$(exec_sql $case_db $pub_node1_port "SELECT count(*) FROM pg_replication_slots")" = "2" ]; then + echo "check DROP SUBSCRIPTION during error can clean up the slots on the publisher success" + else + echo "$failed_keyword when DROP SUBSCRIPTION during error can clean up the slots on the publisher" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub2" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file