From 5927d3111a7b7226b5f3f892918efc5501a585e4 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Tue, 27 Dec 2022 11:38:34 +0800 Subject: [PATCH] fix table sync with different column order && add testcase for subscription --- .../storage/replication/logical/tablesync.cpp | 15 +- src/test/subscription/env_utils.sh | 38 +- src/test/subscription/schedule | 4 +- src/test/subscription/testcase/constraints.sh | 137 +++++ src/test/subscription/testcase/rep_changes.sh | 285 ++++++++- src/test/subscription/testcase/types.sh | 552 ++++++++++++++++++ 6 files changed, 1000 insertions(+), 31 deletions(-) create mode 100644 src/test/subscription/testcase/constraints.sh create mode 100644 src/test/subscription/testcase/types.sh diff --git a/src/gausskernel/storage/replication/logical/tablesync.cpp b/src/gausskernel/storage/replication/logical/tablesync.cpp index 4f8726343..d74a64521 100644 --- a/src/gausskernel/storage/replication/logical/tablesync.cpp +++ b/src/gausskernel/storage/replication/logical/tablesync.cpp @@ -547,21 +547,10 @@ void process_syncing_tables(XLogRecPtr current_lsn) static List *make_copy_attnamelist(LogicalRepRelMapEntry *rel) { List *attnamelist = NIL; - TupleDesc desc = RelationGetDescr(rel->localrel); int i; - for (i = 0; i < desc->natts; i++) { - int remoteattnum = rel->attrmap[i]; - - /* Skip dropped attributes. */ - if (desc->attrs[i]->attisdropped) - continue; - - /* Skip attributes that are missing on remote side. */ - if (remoteattnum < 0) - continue; - - attnamelist = lappend(attnamelist, makeString(rel->remoterel.attnames[remoteattnum])); + for (i = 0; i < rel->remoterel.natts; i++) { + attnamelist = lappend(attnamelist, makeString(rel->remoterel.attnames[i])); } return attnamelist; diff --git a/src/test/subscription/env_utils.sh b/src/test/subscription/env_utils.sh index 2e8bcb91a..b29bc8479 100644 --- a/src/test/subscription/env_utils.sh +++ b/src/test/subscription/env_utils.sh @@ -28,9 +28,9 @@ gsctl_wait_time=3600 data_dir=$g_data_path function exec_sql(){ - result=$(gsql -d $1 -p $2 -Atq -c "$3" |sed 'N;s/\n/ /g;b') + result=$(gsql -d $1 -p $2 -Atq -c "$3") if [ "$result" != "" ]; then - echo $result + echo "$result" fi } @@ -84,3 +84,37 @@ function switchover_to_primary() { exit 1 fi } + +function get_log_file(){ + logfile=$(ls -rtl $data_dir/$1/pg_log/ | tail -n 1 | awk '{print $9}') + echo "$data_dir/$1/pg_log/$logfile" +} + +function restart_guc(){ + gs_guc set -D $data_dir/$1 -c "$2" + gs_ctl restart -D $data_dir/$1 + if [ $? -eq 0 ]; then + echo "restart $2 success!" + else + echo "$failed_keyword, restart $2 fail!" + exit 1 + fi +} + +function poll_query_until(){ + max_attempts=20 + attempt=0 + while (($attempt < $max_attempts)) + do + if [ "$(exec_sql $1 $2 "$3")" = "$4" ]; then + break + fi + sleep 1 + attempt=`expr $attempt \+ 1` + done + + if [ $attempt -eq $max_attempts ]; then + echo "$failed_keyword, $5" + exit 1 + fi +} diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule index 887d694ef..23ac89c74 100644 --- a/src/test/subscription/schedule +++ b/src/test/subscription/schedule @@ -1,2 +1,4 @@ rep_changes -pub_switchover \ No newline at end of file +pub_switchover +types +constraints \ No newline at end of file diff --git a/src/test/subscription/testcase/constraints.sh b/src/test/subscription/testcase/constraints.sh new file mode 100644 index 000000000..477451bc7 --- /dev/null +++ b/src/test/subscription/testcase/constraints.sh @@ -0,0 +1,137 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="constraints" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + # Setup structure on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_fk (bid int PRIMARY KEY);" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));" + + # Setup structure on subscriber; column order intentionally different + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_fk (bid int PRIMARY KEY);" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES;" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk (bid) VALUES (1);" + # "junk" value is meant to be large enough to force out-of-line storage + exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk;")" = "1|1|1" ]; then + echo "check replicated tab_fk inserts on subscriber success" + else + echo "$failed_keyword when check replicated tab_fk inserts on subscriber change" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "1|1|1" ]; then + echo "check replicated tab_fk_ref inserts on subscriber success" + else + echo "$failed_keyword when check replicated tab_fk_ref inserts on subscriber" + exit 1 + fi + + # Drop the fk on publisher + exec_sql $case_db $pub_node1_port "DROP TABLE tab_fk CASCADE;" + + # Insert data + exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + # FK is not enforced on subscriber + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "2|1|2" ]; then + echo "check FK ignored on subscriber success" + else + echo "$failed_keyword when check FK ignored on subscriber" + exit 1 + fi + + # Add replica trigger + exec_sql $case_db $sub_node1_port "CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$ + BEGIN + IF (TG_OP = 'INSERT') THEN + IF (NEW.id < 10) THEN + RETURN NEW; + ELSE + RETURN NULL; + END IF; + ELSIF (TG_OP = 'UPDATE') THEN + RETURN NULL; + ELSE + RAISE WARNING 'Unknown action'; + RETURN NULL; + END IF; + END; + \$\$ LANGUAGE plpgsql; + CREATE TRIGGER filter_basic_dml_trg + BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref + FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn(); + ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;" + + # Insert data + exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + # The trigger should cause the insert to be skipped on subscriber + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "2|1|2" ]; then + echo "check replica insert trigger applied on subscriber success" + else + echo "$failed_keyword when check replica insert trigger applied on subscriber" + exit 1 + fi + + # Update data + exec_sql $case_db $pub_node1_port "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + # The trigger should cause the update to be skipped on subscriber + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "2|1|2" ]; then + echo "check replica update column trigger applied on subscriber success" + else + echo "$failed_keyword when check replica update column trigger applied on subscriber" + exit 1 + fi + + # Update on a column not specified in the trigger, but it will trigger + # anyway because logical replication ships all columns in an update. + exec_sql $case_db $pub_node1_port "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(id), max(id) FROM tab_fk_ref;")" = "2|1|2" ]; then + echo "check column trigger applied even on update for other column success" + else + echo "$failed_keyword when check column trigger applied even on update for other column" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/rep_changes.sh b/src/test/subscription/testcase/rep_changes.sh index cb2d5f22b..6b2ec0da0 100644 --- a/src/test/subscription/testcase/rep_changes.sh +++ b/src/test/subscription/testcase/rep_changes.sh @@ -15,6 +15,8 @@ function test_1() { exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full2 (x text)" exec_sql $case_db $pub_node1_port "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')" exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_mixed (a int primary key, b text, c numeric)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)" exec_sql $case_db $pub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)" exec_sql $case_db $pub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)" exec_sql $case_db $pub_node1_port "ALTER TABLE tab_include REPLICA IDENTITY FULL" @@ -36,6 +38,10 @@ function test_1() { exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full_pk (a int primary key, b text)" exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL" exec_sql $case_db $sub_node1_port "CREATE TABLE tab_nothing (a int)" + + # different column count and order than on publisher + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)" + # replication of the table with included index exec_sql $case_db $sub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)" exec_sql $case_db $sub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)" @@ -49,7 +55,7 @@ function test_1() { publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub" exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)" - exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index" + exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index" exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins" exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" @@ -70,18 +76,19 @@ function test_1() { exit 1 fi - exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1,50)" - exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 20" - exec_sql $case_db $pub_node1_port "UPDATE tab_ins SET a = -a" - exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,50)" - exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep WHERE a > 20" - exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = -a" - exec_sql $case_db $pub_node1_port "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')" - exec_sql $case_db $pub_node1_port "INSERT INTO tab_nothing VALUES (generate_series(1,20))" - exec_sql $case_db $pub_node1_port "INSERT INTO tab_include SELECT generate_series(1,50)" - exec_sql $case_db $pub_node1_port "DELETE FROM tab_include WHERE a > 20" - exec_sql $case_db $pub_node1_port "UPDATE tab_include SET a = -a" - exec_sql $case_db $pub_node1_port "INSERT INTO tab_no_replidentity_index VALUES(1)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1,50)" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 20" + exec_sql $case_db $pub_node1_port "UPDATE tab_ins SET a = -a" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,50)" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep WHERE a > 20" + exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = -a" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_nothing VALUES (generate_series(1,20))" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_include SELECT generate_series(1,50)" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_include WHERE a > 20" + exec_sql $case_db $pub_node1_port "UPDATE tab_include SET a = -a" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_no_replidentity_index VALUES(1)" wait_for_catchup $case_db $pub_node1_port "tap_sub" @@ -99,6 +106,14 @@ function test_1() { exit 1 fi + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed")" = "local|1.1|foo|1 +local|2.2|bar|2" ]; then + echo "check replicated changes with different column order success" + else + echo "$failed_keyword when check replicated changes with different column order" + exit 1 + fi + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_nothing")" = "20" ]; then echo "check replicated changes with REPLICA IDENTITY NOTHING success" else @@ -212,11 +227,251 @@ function test_1() { exec_sql $case_db $pub_node1_port "DROP TABLE temp2" exec_sql $case_db $sub_node1_port "DROP TABLE temp1" exec_sql $case_db $sub_node1_port "DROP TABLE temp2" + + # add REPLICA IDENTITY FULL so we can update + exec_sql $case_db $pub_node1_port "ALTER TABLE tab_full REPLICA IDENTITY FULL" + exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full REPLICA IDENTITY FULL" + exec_sql $case_db $pub_node1_port "ALTER TABLE tab_full2 REPLICA IDENTITY FULL" + exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full2 REPLICA IDENTITY FULL" + exec_sql $case_db $pub_node1_port "ALTER TABLE tab_ins REPLICA IDENTITY FULL" + exec_sql $case_db $sub_node1_port "ALTER TABLE tab_ins REPLICA IDENTITY FULL" + # tab_mixed can use DEFAULT, since it has a primary key + + # and do the updates + exec_sql $case_db $pub_node1_port "UPDATE tab_full SET a = a * a" + exec_sql $case_db $pub_node1_port "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'" + exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET b = 'baz' WHERE a = 1" + exec_sql $case_db $pub_node1_port "UPDATE tab_full_pk SET b = 'bar' WHERE a = 1" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_full")" = "20|1|100" ]; then + echo "check update works with REPLICA IDENTITY FULL and duplicate tuples success" + else + echo "$failed_keyword when check update works with REPLICA IDENTITY FULL and duplicate tuples" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT x FROM tab_full2 ORDER BY 1")" = "a +bb +bb" ]; then + echo "check update works with REPLICA IDENTITY FULL and text datums success" + else + echo "$failed_keyword when check update works with REPLICA IDENTITY FULL and text datums" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed ORDER BY a")" = "local|1.1|baz|1 +local|2.2|bar|2" ]; then + echo "check update works with different column order and subscriber local values success" + else + echo "$failed_keyword when check update works with different column order and subscriber local values" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_full_pk ORDER BY a")" = "1|bar +2|baz" ]; then + echo "check update works with REPLICA IDENTITY FULL and a primary key success" + else + echo "$failed_keyword when check update works with REPLICA IDENTITY FULL and a primary key" + exit 1 + fi + + # Check that subscriber handles cases where update/delete target tuple + # is missing. + exec_sql $case_db $sub_node1_port "DELETE FROM tab_full_pk" + + # Note that the current location of the log file is not grabbed immediately + # after reloading the configuration, but after sending one SQL command to + # the node so that we are sure that the reloading has taken effect. + logfile=$(get_log_file "sub_datanode1") + + location=$(awk 'END{print NR}' $logfile) + + exec_sql $case_db $pub_node1_port "UPDATE tab_full_pk SET b = 'quux' WHERE a = 1" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + content=$(tail -n +$location $logfile) + if [[ "$content" =~ "logical replication did not find row for update in replication target relation \"tab_full_pk\"" ]]; then + echo "check print updated row is missing success" + else + echo "$failed_keyword when check print updated row is missing" + exit 1 + fi + + # check behavior with toasted values + exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a")" = "1|3|1.1|local +2|500000|2.2|local" ]; then + echo "check update transmits large column value success" + else + echo "$failed_keyword when check update transmits large column value" + exit 1 + fi + + exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET c = 3.3 WHERE a = 2" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a")" = "1|3|1.1|local +2|500000|3.3|local" ]; then + echo "check update with non-transmitted large column value success" + else + echo "$failed_keyword when check update with non-transmitted large column value" + exit 1 + fi + + # check behavior with dropped columns + + # this update should get transmitted before the column goes away + exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET b = 'bar', c = 2.2 WHERE a = 2" + exec_sql $case_db $pub_node1_port "ALTER TABLE tab_mixed DROP COLUMN b" + exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET c = 11.11 WHERE a = 1" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed ORDER BY a")" = "local|11.11|baz|1 +local|2.2|bar|2" ]; then + echo "check update works with dropped publisher column success" + else + echo "$failed_keyword when check update works with dropped publisher column" + exit 1 + fi + + exec_sql $case_db $sub_node1_port "ALTER TABLE tab_mixed DROP COLUMN d" + exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET c = 22.22 WHERE a = 2" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed ORDER BY a")" = "11.11|baz|1 +22.22|bar|2" ]; then + echo "check update works with dropped subscriber column success" + else + echo "$failed_keyword when check update works with dropped subscriber column" + exit 1 + fi + + # check that change of connection string and/or publication list causes + # restart of subscription workers. We check the state along with + # application_name to ensure that the walsender is (re)started. + # + # Not all of these are registered as tests as we need to poll for a change + # but the test suite will fail nonetheless when something goes wrong. + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only" + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1001,1100)" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1152|1|1100" ]; then + echo "check replicated inserts after subscription publication change success" + else + echo "$failed_keyword when check replicated inserts after subscription publication change" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_rep")" = "20|-20|-1" ]; then + echo "check changes skipped after subscription publication change success" + else + echo "$failed_keyword when check changes skipped after subscription publication change" + exit 1 + fi + + # check alter publication (relcache invalidation etc) + exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')" + exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 0" + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = false)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_full VALUES(0)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_notrep VALUES (11)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_notrep")" = "0" ]; then + echo "check non-replicated table is empty on subscriber success" + else + echo "$failed_keyword when check non-replicated table is empty on subscriber" + exit 1 + fi + + # note that data are different on provider and subscriber + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then + echo "check replicated deletes after alter publication success" + else + echo "$failed_keyword when check replicated deletes after alter publication" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_full")" = "21|0|100" ]; then + echo "check replicated insert after alter publication success" + else + echo "$failed_keyword when check replicated insert after alter publication" + exit 1 + fi + + # check restart on rename + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed" + + poll_query_until $case_db $sub_node1_port "SELECT subname FROM pg_stat_subscription" "tap_sub_renamed" "Timed out while waiting for apply to restart after renaming SUBSCRIPTION" + + # check all the cleanup + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub_renamed" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_subscription")" = "0" ]; then + echo "check subscription was dropped on subscriber success" + else + echo "$failed_keyword when check subscription was dropped on subscriber" + exit 1 + fi + + if [ "$(exec_sql $case_db $pub_node1_port "SELECT count(*) FROM pg_replication_slots")" = "2" ]; then + echo "check replication slot was dropped on publisher success" + else + echo "$failed_keyword when check replication slot was dropped on publisher" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_subscription_rel")" = "0" ]; then + echo "check subscription relation status was dropped on subscriber success" + else + echo "$failed_keyword when check subscription relation status was dropped on subscriber" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_replication_origin")" = "0" ]; then + echo "check replication origin was dropped on subscriber success" + else + echo "$failed_keyword when check replication origin was dropped on subscriber" + exit 1 + fi + + # CREATE PUBLICATION while wal_level=hot_standby should succeed, with a WARNING + restart_guc "pub_datanode1" "wal_level = hot_standby" + logfile=$(get_log_file "pub_datanode1") + + location=$(awk 'END{print NR}' $logfile) + + exec_sql $case_db $pub_node1_port "BEGIN;CREATE TABLE skip_wal(a int);CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal;ROLLBACK;" + + content=$(tail -n +$location $logfile) + if [[ "$content" =~ "wal_level is insufficient to publish logical changes" ]]; then + echo "check print wal_level warning success" + else + echo "$failed_keyword when check print wal_level warning" + exit 1 + fi + + restart_guc "pub_datanode1" "wal_level = logical" } function tear_down() { - exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" - exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub, tap_pub_ins_only" + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub, tap_pub_ins_only" exec_sql $db $sub_node1_port "DROP DATABASE $case_db" exec_sql $db $pub_node1_port "DROP DATABASE $case_db" diff --git a/src/test/subscription/testcase/types.sh b/src/test/subscription/testcase/types.sh new file mode 100644 index 000000000..e74ad2be1 --- /dev/null +++ b/src/test/subscription/testcase/types.sh @@ -0,0 +1,552 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="types" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + + # Setup structure on both nodes + ddl="CREATE TABLE public.tst_one_array ( + a INTEGER PRIMARY KEY, + b INTEGER[] + ); + CREATE TABLE public.tst_arrays ( + a INTEGER[] PRIMARY KEY, + b TEXT[], + c FLOAT[], + d INTERVAL[] + ); + + CREATE TYPE public.tst_enum_t AS ENUM ('a', 'b', 'c', 'd', 'e'); + CREATE TABLE public.tst_one_enum ( + a INTEGER PRIMARY KEY, + b public.tst_enum_t + ); + CREATE TABLE public.tst_enums ( + a public.tst_enum_t PRIMARY KEY, + b public.tst_enum_t[] + ); + + CREATE TYPE public.tst_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER); + CREATE TYPE public.tst_comp_enum_t AS (a FLOAT, b public.tst_enum_t, c INTEGER); + CREATE TYPE public.tst_comp_enum_array_t AS (a FLOAT, b public.tst_enum_t[], c INTEGER); + CREATE TABLE public.tst_one_comp ( + a INTEGER PRIMARY KEY, + b public.tst_comp_basic_t + ); + CREATE TABLE public.tst_comps ( + a public.tst_comp_basic_t PRIMARY KEY, + b public.tst_comp_basic_t[] + ); + CREATE TABLE public.tst_comp_enum ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_t + ); + CREATE TABLE public.tst_comp_enum_array ( + a public.tst_comp_enum_t PRIMARY KEY, + b public.tst_comp_enum_t[] + ); + CREATE TABLE public.tst_comp_one_enum_array ( + a INTEGER PRIMARY KEY, + b public.tst_comp_enum_array_t + ); + CREATE TABLE public.tst_comp_enum_what ( + a public.tst_comp_enum_array_t PRIMARY KEY, + b public.tst_comp_enum_array_t[] + ); + + CREATE TYPE public.tst_comp_mix_t AS ( + a public.tst_comp_basic_t, + b public.tst_comp_basic_t[], + c public.tst_enum_t, + d public.tst_enum_t[] + ); + CREATE TABLE public.tst_comp_mix_array ( + a public.tst_comp_mix_t PRIMARY KEY, + b public.tst_comp_mix_t[] + ); + CREATE TABLE public.tst_range ( + a INTEGER PRIMARY KEY, + b int4range + ); + CREATE TABLE public.tst_range_array ( + a INTEGER PRIMARY KEY, + b TSTZRANGE, + c int8range[] + ); + CREATE TABLE public.tst_hstore ( + a INTEGER PRIMARY KEY, + b hstore + ); + + SET check_function_bodies=off; + CREATE FUNCTION public.monot_incr(int) RETURNS bool LANGUAGE sql + AS ' select \$1 > max(a) from public.tst_dom_constr; '; + CREATE DOMAIN monot_int AS int CHECK (monot_incr(VALUE)); + CREATE TABLE public.tst_dom_constr (a monot_int);" + + exec_sql $case_db $pub_node1_port "$ddl" + exec_sql $case_db $sub_node1_port "$ddl" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" + + # Wait for initial sync to finish as well + wait_for_subscription_sync $case_db $sub_node1_port + + # Insert initial test data + exec_sql $case_db $pub_node1_port "-- test_tbl_one_array_col + INSERT INTO tst_one_array (a, b) VALUES + (1, '{1, 2, 3}'), + (2, '{2, 3, 1}'), + (3, '{3, 2, 1}'), + (4, '{4, 3, 2}'), + (5, '{5, NULL, 3}'); + + -- test_tbl_arrays + INSERT INTO tst_arrays (a, b, c, d) VALUES + ('{1, 2, 3}', '{\"a\", \"b\", \"c\"}', '{1.1, 2.2, 3.3}', '{\"1 day\", \"2 days\", \"3 days\"}'), + ('{2, 3, 1}', '{\"b\", \"c\", \"a\"}', '{2.2, 3.3, 1.1}', '{\"2 minutes\", \"3 minutes\", \"1 minute\"}'), + ('{3, 1, 2}', '{\"c\", \"a\", \"b\"}', '{3.3, 1.1, 2.2}', '{\"3 years\", \"1 year\", \"2 years\"}'), + ('{4, 1, 2}', '{\"d\", \"a\", \"b\"}', '{4.4, 1.1, 2.2}', '{\"4 years\", \"1 year\", \"2 years\"}'), + ('{5, NULL, NULL}', '{\"e\", NULL, \"b\"}', '{5.5, 1.1, NULL}', '{\"5 years\", NULL, NULL}'); + + -- test_tbl_single_enum + INSERT INTO tst_one_enum (a, b) VALUES + (1, 'a'), + (2, 'b'), + (3, 'c'), + (4, 'd'), + (5, NULL); + + -- test_tbl_enums + INSERT INTO tst_enums (a, b) VALUES + ('a', '{b, c}'), + ('b', '{c, a}'), + ('c', '{b, a}'), + ('d', '{c, b}'), + ('e', '{d, NULL}'); + + -- test_tbl_single_composites + INSERT INTO tst_one_comp (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, NULL, 5)); + + -- test_tbl_composites + INSERT INTO tst_comps (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]), + (ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]), + (ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]); + + -- test_tbl_composite_with_enums + INSERT INTO tst_comp_enum (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)), + (3, ROW(3.0, 'c', 3)), + (4, ROW(4.0, 'd', 4)), + (5, ROW(NULL, 'e', NULL)); + + -- test_tbl_composite_with_enums_array + INSERT INTO tst_comp_enum_array (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]), + (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]), + (ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]), + (ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]); + + -- test_tbl_composite_with_single_enums_array_in_composite + INSERT INTO tst_comp_one_enum_array (a, b) VALUES + (1, ROW(1.0, '{a, b, c}', 1)), + (2, ROW(2.0, '{a, b, c}', 2)), + (3, ROW(3.0, '{a, b, c}', 3)), + (4, ROW(4.0, '{c, b, d}', 4)), + (5, ROW(5.0, '{NULL, e, NULL}', 5)); + + -- test_tbl_composite_with_enums_array_in_composite + INSERT INTO tst_comp_enum_what (a, b) VALUES + (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]), + (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]), + (ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]), + (ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]), + (ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]); + + -- test_tbl_mixed_composites + INSERT INTO tst_comp_mix_array (a, b) VALUES + (ROW( + ROW(1,'a',1), + ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t], + 'a', + '{a,b,NULL,c}'), + ARRAY[ + ROW( + ROW(1,'a',1), + ARRAY[ + ROW(1,'a',1)::tst_comp_basic_t, + ROW(2,'b',2)::tst_comp_basic_t, + NULL + ], + 'a', + '{a,b,c}' + )::tst_comp_mix_t + ] + ); + + -- test_tbl_range + INSERT INTO tst_range (a, b) VALUES + (1, '[1, 10]'), + (2, '[2, 20]'), + (3, '[3, 30]'), + (4, '[4, 40]'), + (5, '[5, 50]'); + + -- test_tbl_range_array + INSERT INTO tst_range_array (a, b, c) VALUES + (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{\"[1,2]\", \"[10,20]\"}'), + (2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{\"[2,3]\", \"[20,30]\"}'), + (3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{\"[3,4]\"}'), + (4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{\"[4,5]\", NULL, \"[40,50]\"}'), + (5, NULL, NULL); + + -- tst_hstore + INSERT INTO tst_hstore (a, b) VALUES + (1, '\"a\"=>\"1\"'), + (2, '\"zzz\"=>\"foo\"'), + (3, '\"123\"=>\"321\"'), + (4, '\"yellow horse\"=>\"moaned\"'); + + -- tst_dom_constr + INSERT INTO tst_dom_constr VALUES (10);" + + wait_for_catchup $case_db $pub_node1_port "tap_sub_slot" + + # Check the data on subscriber + expected="1|{1,2,3} +2|{2,3,1} +3|{3,2,1} +4|{4,3,2} +5|{5,NULL,3} +{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{\"1 day\",\"2 days\",\"3 days\"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{\"3 years\",\"1 year\",\"2 years\"} +{4,1,2}|{d,a,b}|{4.4,1.1,2.2}|{\"4 years\",\"1 year\",\"2 years\"} +{5,NULL,NULL}|{e,NULL,b}|{5.5,1.1,NULL}|{\"5 years\",NULL,NULL} +1|a +2|b +3|c +4|d +5| +a|{b,c} +b|{c,a} +c|{b,a} +d|{c,b} +e|{d,NULL} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,,5) +(1,a,1)|{\"(1,a,1)\"} +(2,b,2)|{\"(2,b,2)\"} +(3,c,3)|{\"(3,c,3)\"} +(4,d,4)|{\"(4,d,3)\"} +(5,e,)|{NULL,\"(5,,5)\"} +1|(1,a,1) +2|(2,b,2) +3|(3,c,3) +4|(4,d,4) +5|(,e,) +(1,a,1)|{\"(1,a,1)\"} +(2,b,2)|{\"(2,b,2)\"} +(3,c,3)|{\"(3,c,3)\"} +(4,d,3)|{\"(3,d,3)\"} +(5,e,3)|{\"(3,e,3)\",NULL} +1|(1,\"{a,b,c}\",1) +2|(2,\"{a,b,c}\",2) +3|(3,\"{a,b,c}\",3) +4|(4,\"{c,b,d}\",4) +5|(5,\"{NULL,e,NULL}\",5) +(1,\"{a,b,c}\",1)|{\"(1,\\\""{a,b,c}\\\"",1)\"} +(2,\"{b,c,a}\",2)|{\"(2,\\\""{b,c,a}\\\"",1)\"} +(3,\"{c,a,b}\",1)|{\"(3,\\\""{c,a,b}\\\"",1)\"} +(4,\"{c,b,d}\",4)|{\"(4,\\\""{c,b,d}\\\"",4)\"} +(5,\"{c,NULL,b}\",)|{\"(5,\\\""{c,e,b}\\\"",1)\"} +(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\"}\",a,\"{a,b,NULL,c}\")|{\"(\\\"(1,a,1)\\\",\\\"{\\\"\\\"(1,a,1)\\\"\\\",\\\"\\\"(2,b,2)\\\"\\\",NULL}\\\",a,\\\"{a,b,c}\\\")\"} +1|[1,11) +2|[2,21) +3|[3,31) +4|[4,41) +5|[5,51) +1|[\"2014-08-04 00:00:00+02\",infinity)|{\"[1,3)\",\"[10,21)\"} +2|[\"2014-08-02 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[2,4)\",\"[20,31)\"} +3|[\"2014-08-01 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[3,5)\"} +4|[\"2014-07-31 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[4,6)\",NULL,\"[40,51)\"} +5|| +1|\"a\"=>\"1\" +2|\"zzz\"=>\"foo\" +3|\"123\"=>\"321\" +4|\"yellow horse\"=>\"moaned\"" + + if [ "$(exec_sql $case_db $sub_node1_port "SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a;")" = "$expected" ]; then + echo "check replicated inserts on subscriber success" + else + echo "$failed_keyword when check replicated inserts on subscriber" + exit 1 + fi + + # Run batch of updates + exec_sql $case_db $pub_node1_port "UPDATE tst_one_array SET b = '{4, 5, 6}' WHERE a = 1; + UPDATE tst_one_array SET b = '{4, 5, 6, 1}' WHERE a > 3; + UPDATE tst_arrays SET b = '{\"1a\", \"2b\", \"3c\"}', c = '{1.0, 2.0, 3.0}', d = '{\"1 day 1 second\", \"2 days 2 seconds\", \"3 days 3 second\"}' WHERE a = '{1, 2, 3}'; + UPDATE tst_arrays SET b = '{\"c\", \"d\", \"e\"}', c = '{3.0, 4.0, 5.0}', d = '{\"3 day 1 second\", \"4 days 2 seconds\", \"5 days 3 second\"}' WHERE a[1] > 3; + UPDATE tst_one_enum SET b = 'c' WHERE a = 1; + UPDATE tst_one_enum SET b = NULL WHERE a > 3; + UPDATE tst_enums SET b = '{e, NULL}' WHERE a = 'a'; + UPDATE tst_enums SET b = '{e, d}' WHERE a > 'c'; + UPDATE tst_one_comp SET b = ROW(1.0, 'A', 1) WHERE a = 1; + UPDATE tst_one_comp SET b = ROW(NULL, 'x', -1) WHERE a > 3; + UPDATE tst_comps SET b = ARRAY[ROW(9, 'x', -1)::tst_comp_basic_t] WHERE (a).a = 1.0; + UPDATE tst_comps SET b = ARRAY[NULL, ROW(9, 'x', NULL)::tst_comp_basic_t] WHERE (a).a > 3.9; + UPDATE tst_comp_enum SET b = ROW(1.0, NULL, NULL) WHERE a = 1; + UPDATE tst_comp_enum SET b = ROW(4.0, 'd', 44) WHERE a > 3; + UPDATE tst_comp_enum_array SET b = ARRAY[NULL, ROW(3, 'd', 3)::tst_comp_enum_t] WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + UPDATE tst_comp_enum_array SET b = ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t, ROW(2, 'b', 2)::tst_comp_enum_t] WHERE (a).a > 3; + UPDATE tst_comp_one_enum_array SET b = ROW(1.0, '{a, e, c}', NULL) WHERE a = 1; + UPDATE tst_comp_one_enum_array SET b = ROW(4.0, '{c, b, d}', 4) WHERE a > 3; + UPDATE tst_comp_enum_what SET b = ARRAY[NULL, ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t, ROW(NULL, '{a, e, c}', 2)::tst_comp_enum_array_t] WHERE (a).a = 1; + UPDATE tst_comp_enum_what SET b = ARRAY[ROW(5, '{a, b, c}', 5)::tst_comp_enum_array_t] WHERE (a).a > 3; + UPDATE tst_comp_mix_array SET b[2] = NULL WHERE ((a).a).a = 1; + UPDATE tst_range SET b = '[100, 1000]' WHERE a = 1; + UPDATE tst_range SET b = '(1, 90)' WHERE a > 3; + UPDATE tst_range_array SET c = '{\"[100, 1000]\"}' WHERE a = 1; + UPDATE tst_range_array SET b = tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), c = '{NULL, \"[11,9999999]\"}' WHERE a > 3; + UPDATE tst_hstore SET b = '\"updated\"=>\"value\"' WHERE a < 3; + UPDATE tst_hstore SET b = '\"also\"=>\"updated\"' WHERE a = 3;" + + wait_for_catchup $case_db $pub_node1_port "tap_sub_slot" + + # Check the data on subscriber + expected="1|{4,5,6} +2|{2,3,1} +3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{1,2,3}|{1a,2b,3c}|{1,2,3}|{\"1 day 00:00:01\",\"2 days 00:00:02\",\"3 days 00:00:03\"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{\"3 years\",\"1 year\",\"2 years\"} +{4,1,2}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"} +1|c +2|b +3|c +4| +5| +a|{e,NULL} +b|{c,a} +c|{b,a} +d|{e,d} +e|{e,d} +1|(1,A,1) +2|(2,b,2) +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(1,a,1)|{\"(9,x,-1)\"} +(2,b,2)|{\"(2,b,2)\"} +(3,c,3)|{\"(3,c,3)\"} +(4,d,4)|{NULL,\"(9,x,)\"} +(5,e,)|{NULL,\"(9,x,)\"} +1|(1,,) +2|(2,b,2) +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(1,a,1)|{NULL,\"(3,d,3)\"} +(2,b,2)|{\"(2,b,2)\"} +(3,c,3)|{\"(3,c,3)\"} +(4,d,3)|{\"(1,a,1)\",\"(2,b,2)\"} +(5,e,3)|{\"(1,a,1)\",\"(2,b,2)\"} +1|(1,\"{a,e,c}\",) +2|(2,\"{a,b,c}\",2) +3|(3,\"{a,b,c}\",3) +4|(4,\"{c,b,d}\",4) +5|(4,\"{c,b,d}\",4) +(1,\"{a,b,c}\",1)|{NULL,\"(1,\\\""{a,b,c}\\\"",1)\",\"(,\\\""{a,e,c}\\\"",2)\"} +(2,\"{b,c,a}\",2)|{\"(2,\\\""{b,c,a}\\\"",1)\"} +(3,\"{c,a,b}\",1)|{\"(3,\\\""{c,a,b}\\\"",1)\"} +(4,\"{c,b,d}\",4)|{\"(5,\\\""{a,b,c}\\\"",5)\"} +(5,\"{c,NULL,b}\",)|{\"(5,\\\""{a,b,c}\\\"",5)\"} +(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\"}\",a,\"{a,b,NULL,c}\")|{\"(\\\"(1,a,1)\\\",\\\"{\\\"\\\"(1,a,1)\\\"\\\",\\\"\\\"(2,b,2)\\\"\\\",NULL}\\\",a,\\\"{a,b,c}\\\")\",NULL} +1|[100,1001) +2|[2,21) +3|[3,31) +4|[2,90) +5|[2,90) +1|[\"2014-08-04 00:00:00+02\",infinity)|{\"[100,1001)\"} +2|[\"2014-08-02 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[2,4)\",\"[20,31)\"} +3|[\"2014-08-01 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[3,5)\"} +4|[\"2014-08-04 00:00:00+02\",infinity)|{NULL,\"[11,10000000)\"} +5|[\"2014-08-04 00:00:00+02\",infinity)|{NULL,\"[11,10000000)\"} +1|\"updated\"=>\"value\" +2|\"updated\"=>\"value\" +3|\"also\"=>\"updated\" +4|\"yellow horse\"=>\"moaned\"" + + if [ "$(exec_sql $case_db $sub_node1_port "SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a;")" = "$expected" ]; then + echo "check replicated updates on subscriber success" + else + echo "$failed_keyword when check replicated updates on subscriber" + exit 1 + fi + + # Run batch of deletes + exec_sql $case_db $pub_node1_port "DELETE FROM tst_one_array WHERE a = 1; + DELETE FROM tst_one_array WHERE b = '{2, 3, 1}'; + DELETE FROM tst_arrays WHERE a = '{1, 2, 3}'; + DELETE FROM tst_arrays WHERE a[1] = 2; + DELETE FROM tst_one_enum WHERE a = 1; + DELETE FROM tst_one_enum WHERE b = 'b'; + DELETE FROM tst_enums WHERE a = 'a'; + DELETE FROM tst_enums WHERE b[1] = 'b'; + DELETE FROM tst_one_comp WHERE a = 1; + DELETE FROM tst_one_comp WHERE (b).a = 2.0; + DELETE FROM tst_comps WHERE (a).b = 'a'; + DELETE FROM tst_comps WHERE ROW(3, 'c', 3)::tst_comp_basic_t = ANY(b); + DELETE FROM tst_comp_enum WHERE a = 1; + DELETE FROM tst_comp_enum WHERE (b).a = 2.0; + DELETE FROM tst_comp_enum_array WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t; + DELETE FROM tst_comp_enum_array WHERE ROW(3, 'c', 3)::tst_comp_enum_t = ANY(b); + DELETE FROM tst_comp_one_enum_array WHERE a = 1; + DELETE FROM tst_comp_one_enum_array WHERE 'a' = ANY((b).b); + DELETE FROM tst_comp_enum_what WHERE (a).a = 1; + DELETE FROM tst_comp_enum_what WHERE (b[1]).b = '{c, a, b}'; + DELETE FROM tst_comp_mix_array WHERE ((a).a).a = 1; + DELETE FROM tst_range WHERE a = 1; + DELETE FROM tst_range WHERE '[10,20]' && b; + DELETE FROM tst_range_array WHERE a = 1; + DELETE FROM tst_range_array WHERE tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 05 00:00:00 2014 CEST'::timestamptz) && b; + DELETE FROM tst_hstore WHERE a = 1;" + + wait_for_catchup $case_db $pub_node1_port "tap_sub_slot" + + # Check the data on subscriber + expected="3|{3,2,1} +4|{4,5,6,1} +5|{4,5,6,1} +{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{\"3 years\",\"1 year\",\"2 years\"} +{4,1,2}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"} +{5,NULL,NULL}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"} +3|c +4| +5| +b|{c,a} +d|{e,d} +e|{e,d} +3|(3,c,3) +4|(,x,-1) +5|(,x,-1) +(2,b,2)|{\"(2,b,2)\"} +(4,d,4)|{NULL,\"(9,x,)\"} +(5,e,)|{NULL,\"(9,x,)\"} +3|(3,c,3) +4|(4,d,44) +5|(4,d,44) +(2,b,2)|{\"(2,b,2)\"} +(4,d,3)|{\"(1,a,1)\",\"(2,b,2)\"} +(5,e,3)|{\"(1,a,1)\",\"(2,b,2)\"} +4|(4,\"{c,b,d}\",4) +5|(4,\"{c,b,d}\",4) +(2,\"{b,c,a}\",2)|{\"(2,\\\""{b,c,a}\\\"",1)\"} +(4,\"{c,b,d}\",4)|{\"(5,\\\""{a,b,c}\\\"",5)\"} +(5,\"{c,NULL,b}\",)|{\"(5,\\\""{a,b,c}\\\"",5)\"} +2|[\"2014-08-02 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[2,4)\",\"[20,31)\"} +3|[\"2014-08-01 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[3,5)\"} +2|\"updated\"=>\"value\" +3|\"also\"=>\"updated\" +4|\"yellow horse\"=>\"moaned\"" + + if [ "$(exec_sql $case_db $sub_node1_port "SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a;")" = "$expected" ]; then + echo "check replicated deletes on subscriber success" + else + echo "$failed_keyword when check replicated deletes on subscriber" + exit 1 + fi + + # Test a domain with a constraint backed by a SQL-language function, + # which needs an active snapshot in order to operate. + exec_sql $case_db $pub_node1_port "INSERT INTO tst_dom_constr VALUES (11)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub_slot" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT sum(a) FROM tst_dom_constr")" = "21" ]; then + echo "check sql-function constraint on domain success" + else + echo "$failed_keyword when check sql-function constraint on domain" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down