fix table sync with different column order && add testcase for subscription

This commit is contained in:
chenxiaobin19
2022-12-27 11:38:34 +08:00
parent be58b1a98f
commit 5927d3111a
6 changed files with 1000 additions and 31 deletions

View File

@ -547,21 +547,10 @@ void process_syncing_tables(XLogRecPtr current_lsn)
static List *make_copy_attnamelist(LogicalRepRelMapEntry *rel) static List *make_copy_attnamelist(LogicalRepRelMapEntry *rel)
{ {
List *attnamelist = NIL; List *attnamelist = NIL;
TupleDesc desc = RelationGetDescr(rel->localrel);
int i; int i;
for (i = 0; i < desc->natts; i++) { for (i = 0; i < rel->remoterel.natts; i++) {
int remoteattnum = rel->attrmap[i]; attnamelist = lappend(attnamelist, makeString(rel->remoterel.attnames[i]));
/* Skip dropped attributes. */
if (desc->attrs[i]->attisdropped)
continue;
/* Skip attributes that are missing on remote side. */
if (remoteattnum < 0)
continue;
attnamelist = lappend(attnamelist, makeString(rel->remoterel.attnames[remoteattnum]));
} }
return attnamelist; return attnamelist;

View File

@ -28,9 +28,9 @@ gsctl_wait_time=3600
data_dir=$g_data_path data_dir=$g_data_path
function exec_sql(){ function exec_sql(){
result=$(gsql -d $1 -p $2 -Atq -c "$3" |sed 'N;s/\n/ /g;b') result=$(gsql -d $1 -p $2 -Atq -c "$3")
if [ "$result" != "" ]; then if [ "$result" != "" ]; then
echo $result echo "$result"
fi fi
} }
@ -84,3 +84,37 @@ function switchover_to_primary() {
exit 1 exit 1
fi fi
} }
function get_log_file(){
logfile=$(ls -rtl $data_dir/$1/pg_log/ | tail -n 1 | awk '{print $9}')
echo "$data_dir/$1/pg_log/$logfile"
}
function restart_guc(){
gs_guc set -D $data_dir/$1 -c "$2"
gs_ctl restart -D $data_dir/$1
if [ $? -eq 0 ]; then
echo "restart $2 success!"
else
echo "$failed_keyword, restart $2 fail!"
exit 1
fi
}
function poll_query_until(){
max_attempts=20
attempt=0
while (($attempt < $max_attempts))
do
if [ "$(exec_sql $1 $2 "$3")" = "$4" ]; then
break
fi
sleep 1
attempt=`expr $attempt \+ 1`
done
if [ $attempt -eq $max_attempts ]; then
echo "$failed_keyword, $5"
exit 1
fi
}

View File

@ -1,2 +1,4 @@
rep_changes rep_changes
pub_switchover pub_switchover
types
constraints

View File

@ -0,0 +1,137 @@
#!/bin/sh
source $1/env_utils.sh $1 $2
case_db="constraints"
function test_1() {
echo "create database and tables."
exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
# Setup structure on publisher
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_fk (bid int PRIMARY KEY);"
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, junk text, bid int REFERENCES tab_fk (bid));"
# Setup structure on subscriber; column order intentionally different
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_fk (bid int PRIMARY KEY);"
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_fk_ref (id int PRIMARY KEY, bid int REFERENCES tab_fk (bid), junk text);"
# Setup logical replication
echo "create publication and subscription."
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES;"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (copy_data = false)"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk (bid) VALUES (1);"
# "junk" value is meant to be large enough to force out-of-line storage
exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk_ref (id, bid, junk) VALUES (1, 1, repeat(pi()::text,20000));"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk;")" = "1|1|1" ]; then
echo "check replicated tab_fk inserts on subscriber success"
else
echo "$failed_keyword when check replicated tab_fk inserts on subscriber change"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "1|1|1" ]; then
echo "check replicated tab_fk_ref inserts on subscriber success"
else
echo "$failed_keyword when check replicated tab_fk_ref inserts on subscriber"
exit 1
fi
# Drop the fk on publisher
exec_sql $case_db $pub_node1_port "DROP TABLE tab_fk CASCADE;"
# Insert data
exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk_ref (id, bid) VALUES (2, 2);"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
# FK is not enforced on subscriber
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "2|1|2" ]; then
echo "check FK ignored on subscriber success"
else
echo "$failed_keyword when check FK ignored on subscriber"
exit 1
fi
# Add replica trigger
exec_sql $case_db $sub_node1_port "CREATE FUNCTION filter_basic_dml_fn() RETURNS TRIGGER AS \$\$
BEGIN
IF (TG_OP = 'INSERT') THEN
IF (NEW.id < 10) THEN
RETURN NEW;
ELSE
RETURN NULL;
END IF;
ELSIF (TG_OP = 'UPDATE') THEN
RETURN NULL;
ELSE
RAISE WARNING 'Unknown action';
RETURN NULL;
END IF;
END;
\$\$ LANGUAGE plpgsql;
CREATE TRIGGER filter_basic_dml_trg
BEFORE INSERT OR UPDATE OF bid ON tab_fk_ref
FOR EACH ROW EXECUTE PROCEDURE filter_basic_dml_fn();
ALTER TABLE tab_fk_ref ENABLE REPLICA TRIGGER filter_basic_dml_trg;"
# Insert data
exec_sql $case_db $pub_node1_port "INSERT INTO tab_fk_ref (id, bid) VALUES (10, 10);"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
# The trigger should cause the insert to be skipped on subscriber
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "2|1|2" ]; then
echo "check replica insert trigger applied on subscriber success"
else
echo "$failed_keyword when check replica insert trigger applied on subscriber"
exit 1
fi
# Update data
exec_sql $case_db $pub_node1_port "UPDATE tab_fk_ref SET bid = 2 WHERE bid = 1;"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
# The trigger should cause the update to be skipped on subscriber
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(bid), max(bid) FROM tab_fk_ref;")" = "2|1|2" ]; then
echo "check replica update column trigger applied on subscriber success"
else
echo "$failed_keyword when check replica update column trigger applied on subscriber"
exit 1
fi
# Update on a column not specified in the trigger, but it will trigger
# anyway because logical replication ships all columns in an update.
exec_sql $case_db $pub_node1_port "UPDATE tab_fk_ref SET id = 6 WHERE id = 1;"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(id), max(id) FROM tab_fk_ref;")" = "2|1|2" ]; then
echo "check column trigger applied even on update for other column success"
else
echo "$failed_keyword when check column trigger applied even on update for other column"
exit 1
fi
}
function tear_down() {
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub"
exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
echo "tear down"
}
test_1
tear_down

View File

@ -15,6 +15,8 @@ function test_1() {
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full2 (x text)" exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full2 (x text)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')" exec_sql $case_db $pub_node1_port "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)" exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)"
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_mixed (a int primary key, b text, c numeric)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_mixed (a, b, c) VALUES (1, 'foo', 1.1)"
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)" exec_sql $case_db $pub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)"
exec_sql $case_db $pub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)" exec_sql $case_db $pub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)"
exec_sql $case_db $pub_node1_port "ALTER TABLE tab_include REPLICA IDENTITY FULL" exec_sql $case_db $pub_node1_port "ALTER TABLE tab_include REPLICA IDENTITY FULL"
@ -36,6 +38,10 @@ function test_1() {
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full_pk (a int primary key, b text)" exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full_pk (a int primary key, b text)"
exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL" exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL"
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_nothing (a int)" exec_sql $case_db $sub_node1_port "CREATE TABLE tab_nothing (a int)"
# different column count and order than on publisher
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_mixed (d text default 'local', c numeric, b text, a int primary key)"
# replication of the table with included index # replication of the table with included index
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)" exec_sql $case_db $sub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)"
exec_sql $case_db $sub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)" exec_sql $case_db $sub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)"
@ -49,7 +55,7 @@ function test_1() {
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub" exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)" exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)"
exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index" exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_mixed, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index"
exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins" exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
@ -70,18 +76,19 @@ function test_1() {
exit 1 exit 1
fi fi
exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1,50)" exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1,50)"
exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 20" exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 20"
exec_sql $case_db $pub_node1_port "UPDATE tab_ins SET a = -a" exec_sql $case_db $pub_node1_port "UPDATE tab_ins SET a = -a"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,50)" exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,50)"
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep WHERE a > 20" exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep WHERE a > 20"
exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = -a" exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = -a"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')" exec_sql $case_db $pub_node1_port "INSERT INTO tab_mixed VALUES (2, 'bar', 2.2)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_nothing VALUES (generate_series(1,20))" exec_sql $case_db $pub_node1_port "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_include SELECT generate_series(1,50)" exec_sql $case_db $pub_node1_port "INSERT INTO tab_nothing VALUES (generate_series(1,20))"
exec_sql $case_db $pub_node1_port "DELETE FROM tab_include WHERE a > 20" exec_sql $case_db $pub_node1_port "INSERT INTO tab_include SELECT generate_series(1,50)"
exec_sql $case_db $pub_node1_port "UPDATE tab_include SET a = -a" exec_sql $case_db $pub_node1_port "DELETE FROM tab_include WHERE a > 20"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_no_replidentity_index VALUES(1)" exec_sql $case_db $pub_node1_port "UPDATE tab_include SET a = -a"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_no_replidentity_index VALUES(1)"
wait_for_catchup $case_db $pub_node1_port "tap_sub" wait_for_catchup $case_db $pub_node1_port "tap_sub"
@ -99,6 +106,14 @@ function test_1() {
exit 1 exit 1
fi fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed")" = "local|1.1|foo|1
local|2.2|bar|2" ]; then
echo "check replicated changes with different column order success"
else
echo "$failed_keyword when check replicated changes with different column order"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_nothing")" = "20" ]; then if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_nothing")" = "20" ]; then
echo "check replicated changes with REPLICA IDENTITY NOTHING success" echo "check replicated changes with REPLICA IDENTITY NOTHING success"
else else
@ -212,11 +227,251 @@ function test_1() {
exec_sql $case_db $pub_node1_port "DROP TABLE temp2" exec_sql $case_db $pub_node1_port "DROP TABLE temp2"
exec_sql $case_db $sub_node1_port "DROP TABLE temp1" exec_sql $case_db $sub_node1_port "DROP TABLE temp1"
exec_sql $case_db $sub_node1_port "DROP TABLE temp2" exec_sql $case_db $sub_node1_port "DROP TABLE temp2"
# add REPLICA IDENTITY FULL so we can update
exec_sql $case_db $pub_node1_port "ALTER TABLE tab_full REPLICA IDENTITY FULL"
exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full REPLICA IDENTITY FULL"
exec_sql $case_db $pub_node1_port "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"
exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full2 REPLICA IDENTITY FULL"
exec_sql $case_db $pub_node1_port "ALTER TABLE tab_ins REPLICA IDENTITY FULL"
exec_sql $case_db $sub_node1_port "ALTER TABLE tab_ins REPLICA IDENTITY FULL"
# tab_mixed can use DEFAULT, since it has a primary key
# and do the updates
exec_sql $case_db $pub_node1_port "UPDATE tab_full SET a = a * a"
exec_sql $case_db $pub_node1_port "UPDATE tab_full2 SET x = 'bb' WHERE x = 'b'"
exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET b = 'baz' WHERE a = 1"
exec_sql $case_db $pub_node1_port "UPDATE tab_full_pk SET b = 'bar' WHERE a = 1"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_full")" = "20|1|100" ]; then
echo "check update works with REPLICA IDENTITY FULL and duplicate tuples success"
else
echo "$failed_keyword when check update works with REPLICA IDENTITY FULL and duplicate tuples"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT x FROM tab_full2 ORDER BY 1")" = "a
bb
bb" ]; then
echo "check update works with REPLICA IDENTITY FULL and text datums success"
else
echo "$failed_keyword when check update works with REPLICA IDENTITY FULL and text datums"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed ORDER BY a")" = "local|1.1|baz|1
local|2.2|bar|2" ]; then
echo "check update works with different column order and subscriber local values success"
else
echo "$failed_keyword when check update works with different column order and subscriber local values"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_full_pk ORDER BY a")" = "1|bar
2|baz" ]; then
echo "check update works with REPLICA IDENTITY FULL and a primary key success"
else
echo "$failed_keyword when check update works with REPLICA IDENTITY FULL and a primary key"
exit 1
fi
# Check that subscriber handles cases where update/delete target tuple
# is missing.
exec_sql $case_db $sub_node1_port "DELETE FROM tab_full_pk"
# Note that the current location of the log file is not grabbed immediately
# after reloading the configuration, but after sending one SQL command to
# the node so that we are sure that the reloading has taken effect.
logfile=$(get_log_file "sub_datanode1")
location=$(awk 'END{print NR}' $logfile)
exec_sql $case_db $pub_node1_port "UPDATE tab_full_pk SET b = 'quux' WHERE a = 1"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
content=$(tail -n +$location $logfile)
if [[ "$content" =~ "logical replication did not find row for update in replication target relation \"tab_full_pk\"" ]]; then
echo "check print updated row is missing success"
else
echo "$failed_keyword when check print updated row is missing"
exit 1
fi
# check behavior with toasted values
exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET b = repeat('xyzzy', 100000) WHERE a = 2"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a")" = "1|3|1.1|local
2|500000|2.2|local" ]; then
echo "check update transmits large column value success"
else
echo "$failed_keyword when check update transmits large column value"
exit 1
fi
exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET c = 3.3 WHERE a = 2"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT a, length(b), c, d FROM tab_mixed ORDER BY a")" = "1|3|1.1|local
2|500000|3.3|local" ]; then
echo "check update with non-transmitted large column value success"
else
echo "$failed_keyword when check update with non-transmitted large column value"
exit 1
fi
# check behavior with dropped columns
# this update should get transmitted before the column goes away
exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET b = 'bar', c = 2.2 WHERE a = 2"
exec_sql $case_db $pub_node1_port "ALTER TABLE tab_mixed DROP COLUMN b"
exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET c = 11.11 WHERE a = 1"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed ORDER BY a")" = "local|11.11|baz|1
local|2.2|bar|2" ]; then
echo "check update works with dropped publisher column success"
else
echo "$failed_keyword when check update works with dropped publisher column"
exit 1
fi
exec_sql $case_db $sub_node1_port "ALTER TABLE tab_mixed DROP COLUMN d"
exec_sql $case_db $pub_node1_port "UPDATE tab_mixed SET c = 22.22 WHERE a = 2"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM tab_mixed ORDER BY a")" = "11.11|baz|1
22.22|bar|2" ]; then
echo "check update works with dropped subscriber column success"
else
echo "$failed_keyword when check update works with dropped subscriber column"
exit 1
fi
# check that change of connection string and/or publication list causes
# restart of subscription workers. We check the state along with
# application_name to ensure that the walsender is (re)started.
#
# Not all of these are registered as tests as we need to poll for a change
# but the test suite will fail nonetheless when something goes wrong.
exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1001,1100)"
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1152|1|1100" ]; then
echo "check replicated inserts after subscription publication change success"
else
echo "$failed_keyword when check replicated inserts after subscription publication change"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_rep")" = "20|-20|-1" ]; then
echo "check changes skipped after subscription publication change success"
else
echo "$failed_keyword when check changes skipped after subscription publication change"
exit 1
fi
# check alter publication (relcache invalidation etc)
exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only SET (publish = 'insert, delete')"
exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full"
exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 0"
exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = false)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_full VALUES(0)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_notrep VALUES (11)"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_notrep")" = "0" ]; then
echo "check non-replicated table is empty on subscriber success"
else
echo "$failed_keyword when check non-replicated table is empty on subscriber"
exit 1
fi
# note that data are different on provider and subscriber
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then
echo "check replicated deletes after alter publication success"
else
echo "$failed_keyword when check replicated deletes after alter publication"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_full")" = "21|0|100" ]; then
echo "check replicated insert after alter publication success"
else
echo "$failed_keyword when check replicated insert after alter publication"
exit 1
fi
# check restart on rename
exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub RENAME TO tap_sub_renamed"
poll_query_until $case_db $sub_node1_port "SELECT subname FROM pg_stat_subscription" "tap_sub_renamed" "Timed out while waiting for apply to restart after renaming SUBSCRIPTION"
# check all the cleanup
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub_renamed"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_subscription")" = "0" ]; then
echo "check subscription was dropped on subscriber success"
else
echo "$failed_keyword when check subscription was dropped on subscriber"
exit 1
fi
if [ "$(exec_sql $case_db $pub_node1_port "SELECT count(*) FROM pg_replication_slots")" = "2" ]; then
echo "check replication slot was dropped on publisher success"
else
echo "$failed_keyword when check replication slot was dropped on publisher"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_subscription_rel")" = "0" ]; then
echo "check subscription relation status was dropped on subscriber success"
else
echo "$failed_keyword when check subscription relation status was dropped on subscriber"
exit 1
fi
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_replication_origin")" = "0" ]; then
echo "check replication origin was dropped on subscriber success"
else
echo "$failed_keyword when check replication origin was dropped on subscriber"
exit 1
fi
# CREATE PUBLICATION while wal_level=hot_standby should succeed, with a WARNING
restart_guc "pub_datanode1" "wal_level = hot_standby"
logfile=$(get_log_file "pub_datanode1")
location=$(awk 'END{print NR}' $logfile)
exec_sql $case_db $pub_node1_port "BEGIN;CREATE TABLE skip_wal(a int);CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal;ROLLBACK;"
content=$(tail -n +$location $logfile)
if [[ "$content" =~ "wal_level is insufficient to publish logical changes" ]]; then
echo "check print wal_level warning success"
else
echo "$failed_keyword when check print wal_level warning"
exit 1
fi
restart_guc "pub_datanode1" "wal_level = logical"
} }
function tear_down() { function tear_down() {
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub, tap_pub_ins_only" exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub, tap_pub_ins_only"
exec_sql $db $sub_node1_port "DROP DATABASE $case_db" exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
exec_sql $db $pub_node1_port "DROP DATABASE $case_db" exec_sql $db $pub_node1_port "DROP DATABASE $case_db"

View File

@ -0,0 +1,552 @@
#!/bin/sh
source $1/env_utils.sh $1 $2
case_db="types"
function test_1() {
echo "create database and tables."
exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
# Setup structure on both nodes
ddl="CREATE TABLE public.tst_one_array (
a INTEGER PRIMARY KEY,
b INTEGER[]
);
CREATE TABLE public.tst_arrays (
a INTEGER[] PRIMARY KEY,
b TEXT[],
c FLOAT[],
d INTERVAL[]
);
CREATE TYPE public.tst_enum_t AS ENUM ('a', 'b', 'c', 'd', 'e');
CREATE TABLE public.tst_one_enum (
a INTEGER PRIMARY KEY,
b public.tst_enum_t
);
CREATE TABLE public.tst_enums (
a public.tst_enum_t PRIMARY KEY,
b public.tst_enum_t[]
);
CREATE TYPE public.tst_comp_basic_t AS (a FLOAT, b TEXT, c INTEGER);
CREATE TYPE public.tst_comp_enum_t AS (a FLOAT, b public.tst_enum_t, c INTEGER);
CREATE TYPE public.tst_comp_enum_array_t AS (a FLOAT, b public.tst_enum_t[], c INTEGER);
CREATE TABLE public.tst_one_comp (
a INTEGER PRIMARY KEY,
b public.tst_comp_basic_t
);
CREATE TABLE public.tst_comps (
a public.tst_comp_basic_t PRIMARY KEY,
b public.tst_comp_basic_t[]
);
CREATE TABLE public.tst_comp_enum (
a INTEGER PRIMARY KEY,
b public.tst_comp_enum_t
);
CREATE TABLE public.tst_comp_enum_array (
a public.tst_comp_enum_t PRIMARY KEY,
b public.tst_comp_enum_t[]
);
CREATE TABLE public.tst_comp_one_enum_array (
a INTEGER PRIMARY KEY,
b public.tst_comp_enum_array_t
);
CREATE TABLE public.tst_comp_enum_what (
a public.tst_comp_enum_array_t PRIMARY KEY,
b public.tst_comp_enum_array_t[]
);
CREATE TYPE public.tst_comp_mix_t AS (
a public.tst_comp_basic_t,
b public.tst_comp_basic_t[],
c public.tst_enum_t,
d public.tst_enum_t[]
);
CREATE TABLE public.tst_comp_mix_array (
a public.tst_comp_mix_t PRIMARY KEY,
b public.tst_comp_mix_t[]
);
CREATE TABLE public.tst_range (
a INTEGER PRIMARY KEY,
b int4range
);
CREATE TABLE public.tst_range_array (
a INTEGER PRIMARY KEY,
b TSTZRANGE,
c int8range[]
);
CREATE TABLE public.tst_hstore (
a INTEGER PRIMARY KEY,
b hstore
);
SET check_function_bodies=off;
CREATE FUNCTION public.monot_incr(int) RETURNS bool LANGUAGE sql
AS ' select \$1 > max(a) from public.tst_dom_constr; ';
CREATE DOMAIN monot_int AS int CHECK (monot_incr(VALUE));
CREATE TABLE public.tst_dom_constr (a monot_int);"
exec_sql $case_db $pub_node1_port "$ddl"
exec_sql $case_db $sub_node1_port "$ddl"
echo "create publication and subscription."
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)"
# Wait for initial sync to finish as well
wait_for_subscription_sync $case_db $sub_node1_port
# Insert initial test data
exec_sql $case_db $pub_node1_port "-- test_tbl_one_array_col
INSERT INTO tst_one_array (a, b) VALUES
(1, '{1, 2, 3}'),
(2, '{2, 3, 1}'),
(3, '{3, 2, 1}'),
(4, '{4, 3, 2}'),
(5, '{5, NULL, 3}');
-- test_tbl_arrays
INSERT INTO tst_arrays (a, b, c, d) VALUES
('{1, 2, 3}', '{\"a\", \"b\", \"c\"}', '{1.1, 2.2, 3.3}', '{\"1 day\", \"2 days\", \"3 days\"}'),
('{2, 3, 1}', '{\"b\", \"c\", \"a\"}', '{2.2, 3.3, 1.1}', '{\"2 minutes\", \"3 minutes\", \"1 minute\"}'),
('{3, 1, 2}', '{\"c\", \"a\", \"b\"}', '{3.3, 1.1, 2.2}', '{\"3 years\", \"1 year\", \"2 years\"}'),
('{4, 1, 2}', '{\"d\", \"a\", \"b\"}', '{4.4, 1.1, 2.2}', '{\"4 years\", \"1 year\", \"2 years\"}'),
('{5, NULL, NULL}', '{\"e\", NULL, \"b\"}', '{5.5, 1.1, NULL}', '{\"5 years\", NULL, NULL}');
-- test_tbl_single_enum
INSERT INTO tst_one_enum (a, b) VALUES
(1, 'a'),
(2, 'b'),
(3, 'c'),
(4, 'd'),
(5, NULL);
-- test_tbl_enums
INSERT INTO tst_enums (a, b) VALUES
('a', '{b, c}'),
('b', '{c, a}'),
('c', '{b, a}'),
('d', '{c, b}'),
('e', '{d, NULL}');
-- test_tbl_single_composites
INSERT INTO tst_one_comp (a, b) VALUES
(1, ROW(1.0, 'a', 1)),
(2, ROW(2.0, 'b', 2)),
(3, ROW(3.0, 'c', 3)),
(4, ROW(4.0, 'd', 4)),
(5, ROW(NULL, NULL, 5));
-- test_tbl_composites
INSERT INTO tst_comps (a, b) VALUES
(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]),
(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]),
(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]),
(ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]),
(ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]);
-- test_tbl_composite_with_enums
INSERT INTO tst_comp_enum (a, b) VALUES
(1, ROW(1.0, 'a', 1)),
(2, ROW(2.0, 'b', 2)),
(3, ROW(3.0, 'c', 3)),
(4, ROW(4.0, 'd', 4)),
(5, ROW(NULL, 'e', NULL));
-- test_tbl_composite_with_enums_array
INSERT INTO tst_comp_enum_array (a, b) VALUES
(ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]),
(ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]),
(ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]),
(ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]),
(ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]);
-- test_tbl_composite_with_single_enums_array_in_composite
INSERT INTO tst_comp_one_enum_array (a, b) VALUES
(1, ROW(1.0, '{a, b, c}', 1)),
(2, ROW(2.0, '{a, b, c}', 2)),
(3, ROW(3.0, '{a, b, c}', 3)),
(4, ROW(4.0, '{c, b, d}', 4)),
(5, ROW(5.0, '{NULL, e, NULL}', 5));
-- test_tbl_composite_with_enums_array_in_composite
INSERT INTO tst_comp_enum_what (a, b) VALUES
(ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]),
(ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]),
(ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]),
(ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]),
(ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]);
-- test_tbl_mixed_composites
INSERT INTO tst_comp_mix_array (a, b) VALUES
(ROW(
ROW(1,'a',1),
ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t],
'a',
'{a,b,NULL,c}'),
ARRAY[
ROW(
ROW(1,'a',1),
ARRAY[
ROW(1,'a',1)::tst_comp_basic_t,
ROW(2,'b',2)::tst_comp_basic_t,
NULL
],
'a',
'{a,b,c}'
)::tst_comp_mix_t
]
);
-- test_tbl_range
INSERT INTO tst_range (a, b) VALUES
(1, '[1, 10]'),
(2, '[2, 20]'),
(3, '[3, 30]'),
(4, '[4, 40]'),
(5, '[5, 50]');
-- test_tbl_range_array
INSERT INTO tst_range_array (a, b, c) VALUES
(1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{\"[1,2]\", \"[10,20]\"}'),
(2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{\"[2,3]\", \"[20,30]\"}'),
(3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{\"[3,4]\"}'),
(4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{\"[4,5]\", NULL, \"[40,50]\"}'),
(5, NULL, NULL);
-- tst_hstore
INSERT INTO tst_hstore (a, b) VALUES
(1, '\"a\"=>\"1\"'),
(2, '\"zzz\"=>\"foo\"'),
(3, '\"123\"=>\"321\"'),
(4, '\"yellow horse\"=>\"moaned\"');
-- tst_dom_constr
INSERT INTO tst_dom_constr VALUES (10);"
wait_for_catchup $case_db $pub_node1_port "tap_sub_slot"
# Check the data on subscriber
expected="1|{1,2,3}
2|{2,3,1}
3|{3,2,1}
4|{4,3,2}
5|{5,NULL,3}
{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{\"1 day\",\"2 days\",\"3 days\"}
{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{\"3 years\",\"1 year\",\"2 years\"}
{4,1,2}|{d,a,b}|{4.4,1.1,2.2}|{\"4 years\",\"1 year\",\"2 years\"}
{5,NULL,NULL}|{e,NULL,b}|{5.5,1.1,NULL}|{\"5 years\",NULL,NULL}
1|a
2|b
3|c
4|d
5|
a|{b,c}
b|{c,a}
c|{b,a}
d|{c,b}
e|{d,NULL}
1|(1,a,1)
2|(2,b,2)
3|(3,c,3)
4|(4,d,4)
5|(,,5)
(1,a,1)|{\"(1,a,1)\"}
(2,b,2)|{\"(2,b,2)\"}
(3,c,3)|{\"(3,c,3)\"}
(4,d,4)|{\"(4,d,3)\"}
(5,e,)|{NULL,\"(5,,5)\"}
1|(1,a,1)
2|(2,b,2)
3|(3,c,3)
4|(4,d,4)
5|(,e,)
(1,a,1)|{\"(1,a,1)\"}
(2,b,2)|{\"(2,b,2)\"}
(3,c,3)|{\"(3,c,3)\"}
(4,d,3)|{\"(3,d,3)\"}
(5,e,3)|{\"(3,e,3)\",NULL}
1|(1,\"{a,b,c}\",1)
2|(2,\"{a,b,c}\",2)
3|(3,\"{a,b,c}\",3)
4|(4,\"{c,b,d}\",4)
5|(5,\"{NULL,e,NULL}\",5)
(1,\"{a,b,c}\",1)|{\"(1,\\\""{a,b,c}\\\"",1)\"}
(2,\"{b,c,a}\",2)|{\"(2,\\\""{b,c,a}\\\"",1)\"}
(3,\"{c,a,b}\",1)|{\"(3,\\\""{c,a,b}\\\"",1)\"}
(4,\"{c,b,d}\",4)|{\"(4,\\\""{c,b,d}\\\"",4)\"}
(5,\"{c,NULL,b}\",)|{\"(5,\\\""{c,e,b}\\\"",1)\"}
(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\"}\",a,\"{a,b,NULL,c}\")|{\"(\\\"(1,a,1)\\\",\\\"{\\\"\\\"(1,a,1)\\\"\\\",\\\"\\\"(2,b,2)\\\"\\\",NULL}\\\",a,\\\"{a,b,c}\\\")\"}
1|[1,11)
2|[2,21)
3|[3,31)
4|[4,41)
5|[5,51)
1|[\"2014-08-04 00:00:00+02\",infinity)|{\"[1,3)\",\"[10,21)\"}
2|[\"2014-08-02 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[2,4)\",\"[20,31)\"}
3|[\"2014-08-01 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[3,5)\"}
4|[\"2014-07-31 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[4,6)\",NULL,\"[40,51)\"}
5||
1|\"a\"=>\"1\"
2|\"zzz\"=>\"foo\"
3|\"123\"=>\"321\"
4|\"yellow horse\"=>\"moaned\""
if [ "$(exec_sql $case_db $sub_node1_port "SET timezone = '+2';
SELECT a, b FROM tst_one_array ORDER BY a;
SELECT a, b, c, d FROM tst_arrays ORDER BY a;
SELECT a, b FROM tst_one_enum ORDER BY a;
SELECT a, b FROM tst_enums ORDER BY a;
SELECT a, b FROM tst_one_comp ORDER BY a;
SELECT a, b FROM tst_comps ORDER BY a;
SELECT a, b FROM tst_comp_enum ORDER BY a;
SELECT a, b FROM tst_comp_enum_array ORDER BY a;
SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
SELECT a, b FROM tst_comp_enum_what ORDER BY a;
SELECT a, b FROM tst_comp_mix_array ORDER BY a;
SELECT a, b FROM tst_range ORDER BY a;
SELECT a, b, c FROM tst_range_array ORDER BY a;
SELECT a, b FROM tst_hstore ORDER BY a;")" = "$expected" ]; then
echo "check replicated inserts on subscriber success"
else
echo "$failed_keyword when check replicated inserts on subscriber"
exit 1
fi
# Run batch of updates
exec_sql $case_db $pub_node1_port "UPDATE tst_one_array SET b = '{4, 5, 6}' WHERE a = 1;
UPDATE tst_one_array SET b = '{4, 5, 6, 1}' WHERE a > 3;
UPDATE tst_arrays SET b = '{\"1a\", \"2b\", \"3c\"}', c = '{1.0, 2.0, 3.0}', d = '{\"1 day 1 second\", \"2 days 2 seconds\", \"3 days 3 second\"}' WHERE a = '{1, 2, 3}';
UPDATE tst_arrays SET b = '{\"c\", \"d\", \"e\"}', c = '{3.0, 4.0, 5.0}', d = '{\"3 day 1 second\", \"4 days 2 seconds\", \"5 days 3 second\"}' WHERE a[1] > 3;
UPDATE tst_one_enum SET b = 'c' WHERE a = 1;
UPDATE tst_one_enum SET b = NULL WHERE a > 3;
UPDATE tst_enums SET b = '{e, NULL}' WHERE a = 'a';
UPDATE tst_enums SET b = '{e, d}' WHERE a > 'c';
UPDATE tst_one_comp SET b = ROW(1.0, 'A', 1) WHERE a = 1;
UPDATE tst_one_comp SET b = ROW(NULL, 'x', -1) WHERE a > 3;
UPDATE tst_comps SET b = ARRAY[ROW(9, 'x', -1)::tst_comp_basic_t] WHERE (a).a = 1.0;
UPDATE tst_comps SET b = ARRAY[NULL, ROW(9, 'x', NULL)::tst_comp_basic_t] WHERE (a).a > 3.9;
UPDATE tst_comp_enum SET b = ROW(1.0, NULL, NULL) WHERE a = 1;
UPDATE tst_comp_enum SET b = ROW(4.0, 'd', 44) WHERE a > 3;
UPDATE tst_comp_enum_array SET b = ARRAY[NULL, ROW(3, 'd', 3)::tst_comp_enum_t] WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t;
UPDATE tst_comp_enum_array SET b = ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t, ROW(2, 'b', 2)::tst_comp_enum_t] WHERE (a).a > 3;
UPDATE tst_comp_one_enum_array SET b = ROW(1.0, '{a, e, c}', NULL) WHERE a = 1;
UPDATE tst_comp_one_enum_array SET b = ROW(4.0, '{c, b, d}', 4) WHERE a > 3;
UPDATE tst_comp_enum_what SET b = ARRAY[NULL, ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t, ROW(NULL, '{a, e, c}', 2)::tst_comp_enum_array_t] WHERE (a).a = 1;
UPDATE tst_comp_enum_what SET b = ARRAY[ROW(5, '{a, b, c}', 5)::tst_comp_enum_array_t] WHERE (a).a > 3;
UPDATE tst_comp_mix_array SET b[2] = NULL WHERE ((a).a).a = 1;
UPDATE tst_range SET b = '[100, 1000]' WHERE a = 1;
UPDATE tst_range SET b = '(1, 90)' WHERE a > 3;
UPDATE tst_range_array SET c = '{\"[100, 1000]\"}' WHERE a = 1;
UPDATE tst_range_array SET b = tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), c = '{NULL, \"[11,9999999]\"}' WHERE a > 3;
UPDATE tst_hstore SET b = '\"updated\"=>\"value\"' WHERE a < 3;
UPDATE tst_hstore SET b = '\"also\"=>\"updated\"' WHERE a = 3;"
wait_for_catchup $case_db $pub_node1_port "tap_sub_slot"
# Check the data on subscriber
expected="1|{4,5,6}
2|{2,3,1}
3|{3,2,1}
4|{4,5,6,1}
5|{4,5,6,1}
{1,2,3}|{1a,2b,3c}|{1,2,3}|{\"1 day 00:00:01\",\"2 days 00:00:02\",\"3 days 00:00:03\"}
{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00}
{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{\"3 years\",\"1 year\",\"2 years\"}
{4,1,2}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"}
{5,NULL,NULL}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"}
1|c
2|b
3|c
4|
5|
a|{e,NULL}
b|{c,a}
c|{b,a}
d|{e,d}
e|{e,d}
1|(1,A,1)
2|(2,b,2)
3|(3,c,3)
4|(,x,-1)
5|(,x,-1)
(1,a,1)|{\"(9,x,-1)\"}
(2,b,2)|{\"(2,b,2)\"}
(3,c,3)|{\"(3,c,3)\"}
(4,d,4)|{NULL,\"(9,x,)\"}
(5,e,)|{NULL,\"(9,x,)\"}
1|(1,,)
2|(2,b,2)
3|(3,c,3)
4|(4,d,44)
5|(4,d,44)
(1,a,1)|{NULL,\"(3,d,3)\"}
(2,b,2)|{\"(2,b,2)\"}
(3,c,3)|{\"(3,c,3)\"}
(4,d,3)|{\"(1,a,1)\",\"(2,b,2)\"}
(5,e,3)|{\"(1,a,1)\",\"(2,b,2)\"}
1|(1,\"{a,e,c}\",)
2|(2,\"{a,b,c}\",2)
3|(3,\"{a,b,c}\",3)
4|(4,\"{c,b,d}\",4)
5|(4,\"{c,b,d}\",4)
(1,\"{a,b,c}\",1)|{NULL,\"(1,\\\""{a,b,c}\\\"",1)\",\"(,\\\""{a,e,c}\\\"",2)\"}
(2,\"{b,c,a}\",2)|{\"(2,\\\""{b,c,a}\\\"",1)\"}
(3,\"{c,a,b}\",1)|{\"(3,\\\""{c,a,b}\\\"",1)\"}
(4,\"{c,b,d}\",4)|{\"(5,\\\""{a,b,c}\\\"",5)\"}
(5,\"{c,NULL,b}\",)|{\"(5,\\\""{a,b,c}\\\"",5)\"}
(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\"}\",a,\"{a,b,NULL,c}\")|{\"(\\\"(1,a,1)\\\",\\\"{\\\"\\\"(1,a,1)\\\"\\\",\\\"\\\"(2,b,2)\\\"\\\",NULL}\\\",a,\\\"{a,b,c}\\\")\",NULL}
1|[100,1001)
2|[2,21)
3|[3,31)
4|[2,90)
5|[2,90)
1|[\"2014-08-04 00:00:00+02\",infinity)|{\"[100,1001)\"}
2|[\"2014-08-02 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[2,4)\",\"[20,31)\"}
3|[\"2014-08-01 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[3,5)\"}
4|[\"2014-08-04 00:00:00+02\",infinity)|{NULL,\"[11,10000000)\"}
5|[\"2014-08-04 00:00:00+02\",infinity)|{NULL,\"[11,10000000)\"}
1|\"updated\"=>\"value\"
2|\"updated\"=>\"value\"
3|\"also\"=>\"updated\"
4|\"yellow horse\"=>\"moaned\""
if [ "$(exec_sql $case_db $sub_node1_port "SET timezone = '+2';
SELECT a, b FROM tst_one_array ORDER BY a;
SELECT a, b, c, d FROM tst_arrays ORDER BY a;
SELECT a, b FROM tst_one_enum ORDER BY a;
SELECT a, b FROM tst_enums ORDER BY a;
SELECT a, b FROM tst_one_comp ORDER BY a;
SELECT a, b FROM tst_comps ORDER BY a;
SELECT a, b FROM tst_comp_enum ORDER BY a;
SELECT a, b FROM tst_comp_enum_array ORDER BY a;
SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
SELECT a, b FROM tst_comp_enum_what ORDER BY a;
SELECT a, b FROM tst_comp_mix_array ORDER BY a;
SELECT a, b FROM tst_range ORDER BY a;
SELECT a, b, c FROM tst_range_array ORDER BY a;
SELECT a, b FROM tst_hstore ORDER BY a;")" = "$expected" ]; then
echo "check replicated updates on subscriber success"
else
echo "$failed_keyword when check replicated updates on subscriber"
exit 1
fi
# Run batch of deletes
exec_sql $case_db $pub_node1_port "DELETE FROM tst_one_array WHERE a = 1;
DELETE FROM tst_one_array WHERE b = '{2, 3, 1}';
DELETE FROM tst_arrays WHERE a = '{1, 2, 3}';
DELETE FROM tst_arrays WHERE a[1] = 2;
DELETE FROM tst_one_enum WHERE a = 1;
DELETE FROM tst_one_enum WHERE b = 'b';
DELETE FROM tst_enums WHERE a = 'a';
DELETE FROM tst_enums WHERE b[1] = 'b';
DELETE FROM tst_one_comp WHERE a = 1;
DELETE FROM tst_one_comp WHERE (b).a = 2.0;
DELETE FROM tst_comps WHERE (a).b = 'a';
DELETE FROM tst_comps WHERE ROW(3, 'c', 3)::tst_comp_basic_t = ANY(b);
DELETE FROM tst_comp_enum WHERE a = 1;
DELETE FROM tst_comp_enum WHERE (b).a = 2.0;
DELETE FROM tst_comp_enum_array WHERE a = ROW(1.0, 'a', 1)::tst_comp_enum_t;
DELETE FROM tst_comp_enum_array WHERE ROW(3, 'c', 3)::tst_comp_enum_t = ANY(b);
DELETE FROM tst_comp_one_enum_array WHERE a = 1;
DELETE FROM tst_comp_one_enum_array WHERE 'a' = ANY((b).b);
DELETE FROM tst_comp_enum_what WHERE (a).a = 1;
DELETE FROM tst_comp_enum_what WHERE (b[1]).b = '{c, a, b}';
DELETE FROM tst_comp_mix_array WHERE ((a).a).a = 1;
DELETE FROM tst_range WHERE a = 1;
DELETE FROM tst_range WHERE '[10,20]' && b;
DELETE FROM tst_range_array WHERE a = 1;
DELETE FROM tst_range_array WHERE tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 05 00:00:00 2014 CEST'::timestamptz) && b;
DELETE FROM tst_hstore WHERE a = 1;"
wait_for_catchup $case_db $pub_node1_port "tap_sub_slot"
# Check the data on subscriber
expected="3|{3,2,1}
4|{4,5,6,1}
5|{4,5,6,1}
{3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{\"3 years\",\"1 year\",\"2 years\"}
{4,1,2}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"}
{5,NULL,NULL}|{c,d,e}|{3,4,5}|{\"3 days 00:00:01\",\"4 days 00:00:02\",\"5 days 00:00:03\"}
3|c
4|
5|
b|{c,a}
d|{e,d}
e|{e,d}
3|(3,c,3)
4|(,x,-1)
5|(,x,-1)
(2,b,2)|{\"(2,b,2)\"}
(4,d,4)|{NULL,\"(9,x,)\"}
(5,e,)|{NULL,\"(9,x,)\"}
3|(3,c,3)
4|(4,d,44)
5|(4,d,44)
(2,b,2)|{\"(2,b,2)\"}
(4,d,3)|{\"(1,a,1)\",\"(2,b,2)\"}
(5,e,3)|{\"(1,a,1)\",\"(2,b,2)\"}
4|(4,\"{c,b,d}\",4)
5|(4,\"{c,b,d}\",4)
(2,\"{b,c,a}\",2)|{\"(2,\\\""{b,c,a}\\\"",1)\"}
(4,\"{c,b,d}\",4)|{\"(5,\\\""{a,b,c}\\\"",5)\"}
(5,\"{c,NULL,b}\",)|{\"(5,\\\""{a,b,c}\\\"",5)\"}
2|[\"2014-08-02 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[2,4)\",\"[20,31)\"}
3|[\"2014-08-01 00:00:00+02\",\"2014-08-04 00:00:00+02\")|{\"[3,5)\"}
2|\"updated\"=>\"value\"
3|\"also\"=>\"updated\"
4|\"yellow horse\"=>\"moaned\""
if [ "$(exec_sql $case_db $sub_node1_port "SET timezone = '+2';
SELECT a, b FROM tst_one_array ORDER BY a;
SELECT a, b, c, d FROM tst_arrays ORDER BY a;
SELECT a, b FROM tst_one_enum ORDER BY a;
SELECT a, b FROM tst_enums ORDER BY a;
SELECT a, b FROM tst_one_comp ORDER BY a;
SELECT a, b FROM tst_comps ORDER BY a;
SELECT a, b FROM tst_comp_enum ORDER BY a;
SELECT a, b FROM tst_comp_enum_array ORDER BY a;
SELECT a, b FROM tst_comp_one_enum_array ORDER BY a;
SELECT a, b FROM tst_comp_enum_what ORDER BY a;
SELECT a, b FROM tst_comp_mix_array ORDER BY a;
SELECT a, b FROM tst_range ORDER BY a;
SELECT a, b, c FROM tst_range_array ORDER BY a;
SELECT a, b FROM tst_hstore ORDER BY a;")" = "$expected" ]; then
echo "check replicated deletes on subscriber success"
else
echo "$failed_keyword when check replicated deletes on subscriber"
exit 1
fi
# Test a domain with a constraint backed by a SQL-language function,
# which needs an active snapshot in order to operate.
exec_sql $case_db $pub_node1_port "INSERT INTO tst_dom_constr VALUES (11)"
wait_for_catchup $case_db $pub_node1_port "tap_sub_slot"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT sum(a) FROM tst_dom_constr")" = "21" ]; then
echo "check sql-function constraint on domain success"
else
echo "$failed_keyword when check sql-function constraint on domain"
exit 1
fi
}
function tear_down() {
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub"
exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
echo "tear down"
}
test_1
tear_down