From be58b1a98f8422dbc1d1fe7a3e238dede81328ba Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Thu, 1 Dec 2022 19:08:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8F=91=E5=B8=83=E8=AE=A2?= =?UTF-8?q?=E9=98=85fastcheck?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/test/CMakeLists.txt | 1 + src/test/subscription/CMakeLists.txt | 1 + src/test/subscription/GNUmakefile | 36 +++ src/test/subscription/Makefile | 12 + src/test/subscription/env_utils.sh | 86 +++++++ src/test/subscription/pubsub.py | 227 +++++++++++++++++ src/test/subscription/run_check.sh | 52 ++++ src/test/subscription/schedule | 2 + .../subscription/testcase/pub_switchover.sh | 74 ++++++ src/test/subscription/testcase/rep_changes.sh | 228 ++++++++++++++++++ 10 files changed, 719 insertions(+) create mode 100644 src/test/subscription/CMakeLists.txt create mode 100644 src/test/subscription/GNUmakefile create mode 100644 src/test/subscription/Makefile create mode 100644 src/test/subscription/env_utils.sh create mode 100644 src/test/subscription/pubsub.py create mode 100644 src/test/subscription/run_check.sh create mode 100644 src/test/subscription/schedule create mode 100644 src/test/subscription/testcase/pub_switchover.sh create mode 100644 src/test/subscription/testcase/rep_changes.sh diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 1ff925c7c..ac0bca1b2 100755 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -11,6 +11,7 @@ set(CMAKE_MODULE_PATH add_subdirectory(isolation) if ("${ENABLE_MULTIPLE_NODES}" STREQUAL "OFF") add_subdirectory(regress) + add_subdirectory(subscription) endif () if ("${ENABLE_UT}" STREQUAL "ON" AND "${ENABLE_MULTIPLE_NODES}" STREQUAL "OFF") diff --git a/src/test/subscription/CMakeLists.txt b/src/test/subscription/CMakeLists.txt new file mode 100644 index 000000000..6a76ae1d0 --- /dev/null +++ b/src/test/subscription/CMakeLists.txt @@ -0,0 +1 @@ +add_custom_target(check COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/run_check.sh ${CMAKE_CURRENT_SOURCE_DIR} \$\(p\)) \ No newline at end of file diff --git a/src/test/subscription/GNUmakefile b/src/test/subscription/GNUmakefile new file mode 100644 index 000000000..4defbdf70 --- /dev/null +++ b/src/test/subscription/GNUmakefile @@ -0,0 +1,36 @@ +#------------------------------------------------------------------------- +# +# GNUmakefile-- +# Makefile for src/test/subscription (the publication/subscription tests) +# +# Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group +# Portions Copyright (c) 1994, Regents of the University of California +# +# src/test/subscription/GNUmakefile +# +#------------------------------------------------------------------------- + +subdir = src/test/subscription +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global + +# where to find psql for testing an existing installation +PSQLDIR = $(bindir) +p=25800 +## +## Run tests +## + +REGRESS_OPTS = --dlpath=. $(EXTRA_REGRESS_OPTS) +REG_CONF = --regconf=regress.conf + +check: all + sh $(CURDIR)/run_check.sh $(CURDIR) $p + +## +## Clean up +## + +# things created by various check targets +clean distclean maintainer-clean: + rm -rf $(pg_regress_clean_files) diff --git a/src/test/subscription/Makefile b/src/test/subscription/Makefile new file mode 100644 index 000000000..43b370123 --- /dev/null +++ b/src/test/subscription/Makefile @@ -0,0 +1,12 @@ +# The openGauss make files exploit features of GNU make that other makes +# do not have. Because it is a common mistake for users to try to build +# openGauss with a different make, we have this make file that does nothing +# but tell the user to use GNU make. + +# If the user were using GNU make now, this file would not get used because +# GNU make uses a make file named "GNUmakefile" in preference to "Makefile" +# if it exists. openGauss is shipped with a "GNUmakefile". + +all check: + @echo "You must use GNU make to use openGauss. It may be installed" + @echo "on your system with the name 'gmake'." diff --git a/src/test/subscription/env_utils.sh b/src/test/subscription/env_utils.sh new file mode 100644 index 000000000..2e8bcb91a --- /dev/null +++ b/src/test/subscription/env_utils.sh @@ -0,0 +1,86 @@ +#!/bin/sh +#some enviroment vars + +export g_base_port=$2 +export prefix=${GAUSSHOME} +export install_path="$prefix" +export GAUSSHOME="$prefix" +export LD_LIBRARY_PATH=$prefix/lib:$prefix/lib/libobs:$LD_LIBRARY_PATH +export PATH="$prefix/bin":$PATH +export g_local_ip="127.0.0.1" + +db=postgres +scripts_dir="$1" +username=`whoami` +passwd="Gauss@123" + +export g_data_path="$scripts_dir/tmp_check" +export g_username="$username" +export pub_node1_port=`expr $g_base_port` +export pub_node2_port=`expr $g_base_port \+ 3` +export pub_node3_port=`expr $g_base_port \+ 6` +export sub_node1_port=`expr $g_base_port \+ 9` +export sub_node2_port=`expr $g_base_port \+ 12` +export sub_node3_port=`expr $g_base_port \+ 15` + +failed_keyword="testcase_failed" +gsctl_wait_time=3600 +data_dir=$g_data_path + +function exec_sql(){ + result=$(gsql -d $1 -p $2 -Atq -c "$3" |sed 'N;s/\n/ /g;b') + if [ "$result" != "" ]; then + echo $result + fi +} + +function wait_for_subscription_sync(){ + max_attempts=20 + attempt=0 + query="SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + while (($attempt < $max_attempts)) + do + if [ "$(exec_sql $1 $2 "$query")" = "t" ]; then + echo "initial data for subscription has been already synchronized" + break + fi + sleep 1 + attempt=`expr $attempt \+ 1` + done + + if [ $attempt -eq $max_attempts ]; then + echo "$failed_keyword, timed out waiting for subscriber to synchronize data." + exit 1 + fi +} + +function wait_for_catchup(){ + target_lsn=$(exec_sql $1 $2 "SELECT pg_current_xlog_location()") + max_attempts=20 + attempt=0 + query="SELECT '$target_lsn' <= confirmed_flush FROM pg_replication_slots WHERE slot_name = '$3'"; + while (($attempt < $max_attempts)) + do + if [ "$(exec_sql $1 $2 "$query")" = "t" ]; then + echo "subscriber has been caught up" + break + fi + sleep 1 + attempt=`expr $attempt \+ 1` + done + + if [ $attempt -eq $max_attempts ]; then + echo "$failed_keyword, timed out waiting for catchup." + exit 1 + fi +} + +function switchover_to_primary() { + gs_ctl switchover -w -t $gsctl_wait_time -D $data_dir/$1 + if [ $? -eq 0 ]; then + echo "switchover to primary success!" + else + echo "$failed_keyword, switchover to pirmary fail!" + exit 1 + fi +} diff --git a/src/test/subscription/pubsub.py b/src/test/subscription/pubsub.py new file mode 100644 index 000000000..d59f1b267 --- /dev/null +++ b/src/test/subscription/pubsub.py @@ -0,0 +1,227 @@ +#!/usr/bin/python + +import getopt, sys, os +import shutil +import time +import string + +g_data_path = os.environ.get("g_data_path") +install_path = os.environ.get("install_path") +g_local_ip = os.environ.get("g_local_ip") +g_username = os.environ.get("g_username") +g_passwd = "Gauss@123" +pub_node1_port = int(os.environ.get("pub_node1_port")); +pub_node2_port = int(os.environ.get("pub_node2_port")); +pub_node3_port = int(os.environ.get("pub_node3_port")); +sub_node1_port = int(os.environ.get("sub_node1_port")); +sub_node2_port = int(os.environ.get("sub_node2_port")); +sub_node3_port = int(os.environ.get("sub_node3_port")); + + +class Pterodb(): + def __init__(self, data_node_num, port_arr, data_dir, dname_prefix): + self.data_node_num = data_node_num + self.data_dir = data_dir + self.dname_prefix = dname_prefix + + self.ha_port_arr = port_arr + self.service_port_arr = [port_arr[i] + 1 for i in range(data_node_num)] + self.heartbeat_port_arr = [port_arr[i] + 2 for i in range(data_node_num)] + + def init_env(self): + if(os.path.exists(self.data_dir) == False): + os.mkdir(self.data_dir) + + for i in range(1, self.data_node_num + 1): + datanode_cmd_init = install_path + "/bin/gs_initdb -D " + self.data_dir + "/" + self.dname_prefix + str(i) + " --nodename=" + self.dname_prefix + str(i) + " -w " + g_passwd + print datanode_cmd_init + os.system(datanode_cmd_init) + + conf_file = self.data_dir + "/" + self.dname_prefix + str(i) + "/postgresql.conf" + self.__modify_conf_port(conf_file,self.ha_port_arr[i-1]) + self.__turn_on_pg_log(conf_file) + self.__modify_conf_standby(conf_file,i) + self.__modify_conf_application_name(conf_file, "dn" + str(i)) + self.__modify_remote_read_mode(conf_file) + + hba_file = self.data_dir + "/" + self.dname_prefix + str(i) + "/pg_hba.conf" + self.__config_replication_hba(hba_file) + + def __modify_conf_standby(self, conf_file, n): + j = 1 + + file_handler = open(conf_file,"a") + string = "listen_addresses = '*'"+ "\n" + file_handler.write(string) + + for i in range(1, self.data_node_num + 1): + if(i != n): + #repl + string = "replconninfo%d = 'localhost=%s localport=%d localheartbeatport=%d localservice=%d remotehost=%s remoteport=%d remoteheartbeatport=%d remoteservice=%d'\n" % \ + (j, g_local_ip, self.ha_port_arr[n-1], self.heartbeat_port_arr[n-1], self.service_port_arr[n-1], g_local_ip, self.ha_port_arr[i-1], self.heartbeat_port_arr[i-1], self.service_port_arr[i-1]) + print string + file_handler.write(string) + j = j + 1 + + file_handler.close() + + def __modify_conf_application_name(self, conf_file, name): + file_handler = open(conf_file,"a") + string = "application_name = '" + name + "'" + "\n" + file_handler.write(string) + file_handler.close() + + def __modify_remote_read_mode(self, conf_file): + file_handler = open(conf_file,"a") + string = "remote_read_mode = 'off'" + "\n" + file_handler.write(string) + file_handler.close() + + def __modify_conf_port(self, conf_file, port): + file_handler = open(conf_file,"a") + + string = "port = " + str(port) + "\n" + file_handler.write(string) + + file_handler.close() + + def __turn_on_pg_log(self, conf_file): + file_handler = open(conf_file,"a") + pglog_conf = "logging_collector = on \n" + pglog_conf = pglog_conf + "log_directory = 'pg_log' \n" + pglog_conf = pglog_conf + "log_line_prefix = '%m %c %d %p %a %x %e ' \n" + pglog_conf = pglog_conf + "client_min_messages = ERROR \n" + pglog_conf = pglog_conf + "enable_data_replicate = off \n" + pglog_conf = pglog_conf + "replication_type = 1 \n" + pglog_conf = pglog_conf + "wal_level = logical \n" + pglog_conf = pglog_conf + "max_wal_senders = 8 \n" + pglog_conf = pglog_conf + "enable_slot_log = on \n" + file_handler.write(pglog_conf) + file_handler.close() + + def __config_replication_hba(self, hba_file): + file_handler = open(hba_file,"a") + replication_line = "host replication " + str(g_username) + " " + str(g_local_ip) + "/32 trust" + file_handler.write(replication_line) + file_handler.close() + + def __create_default_db(self): + # connect to primary DN to create db + cmd = install_path + "/bin/gsql -p " + str(self.ha_port_arr[0]) + " postgres -c 'create database test'" + os.system(cmd) + + def __rm_pid_file(self): + cmd = "rm -rf " + # dn + for i in range(1,self.data_node_num+1): + rm_cmd = cmd + self.data_dir + "/" + self.dname_prefix + str(i) + "/postmaster.pid" + print rm_cmd + os.system(rm_cmd) + + + def __start_server(self): + #clean evn + self.__rm_pid_file() + + #start primary + datanode_cmd = install_path + "/bin/gs_ctl " + "start -M primary" + " -D " + self.data_dir + "/" + self.dname_prefix + str(1) + " > " + self.data_dir + "/" + self.dname_prefix + str(1) + "/logdn" + str(1) + ".log 2>&1 &" + print datanode_cmd + os.system(datanode_cmd) + + time.sleep(5) + + #start data_node_standby1,2,3...7 + for i in range(2,self.data_node_num+1): + datanode_cmd = install_path + "/bin/gs_ctl" + " start -M standby "+ " -D " + self.data_dir + "/" + self.dname_prefix + str(i) + " > " + self.data_dir + "/" + self.dname_prefix + str(i) + "/logdn" + str(i) + ".log 2>&1 &" + print datanode_cmd + os.system(datanode_cmd) + time.sleep(5) + + datanode_cmd = install_path + "/bin/gs_ctl" + " build "+ "-D " + self.data_dir + "/" + self.dname_prefix + str(i) + " -Z single_node " + " > " + self.data_dir + "/" + self.dname_prefix + str(i) + "/logdn" + str(i) + ".log 2>&1 &" + print datanode_cmd + os.system(datanode_cmd) + time.sleep(5) + + time.sleep(5) + + def __stop_server(self): + for i in range(1,self.data_node_num+1): + datanode_cmd = install_path + "/bin/gs_ctl stop -D " + self.data_dir + "/" + self.dname_prefix + str(i) + " -Z single_node" + print datanode_cmd + os.system(datanode_cmd) + + def run(self, run_type): + if(run_type == 0): + self.init_env() + #print "init_env ok" + self.__start_server() + #print "start_server ok" + self.__create_default_db() + #print "create_default_db ok" + print "start ok" + elif(run_type == 1): + self.__start_server() + print "start ok" + elif(run_type == 2): + self.__stop_server() + print "stop ok" + +def usage(): + print "------------------------------------------------------" + print "python pubsub.py\n" + print " -d datanode_num, set and start up dn" + print " -s means start" + print " -o means stop" + print " -D data directory" + print "------------------------------------------------------" + +def main(): + try: + opts, args = getopt.getopt(sys.argv[1:], "hD:d:sov", ["help", "data_dir="]) + except getopt.GetoptError, err: + # print help information and exit: + print str(err) # will print something like "option -a not recognized" + # usage() + sys.exit(2) + + datanode_num = 0 + data_dir = g_data_path + run_type = 0 + + for o, a in opts: + if o == "-v": + verbose = True + elif o in ("-h", "--help"): + usage() + sys.exit() + elif o in ("-D", "data_dir"): + data_dir = a + elif o in ("-d", "--datanode"): + datanode_num = int(a) + elif o in ("-s", "--start"): + run_type = 1 + elif o in ("-o", "--stop"): + run_type = 2 + else: + assert False, "unhandled option" + + if((datanode_num == 0) and run_type == 0): + usage() + sys.exit() + + create_key_cipher_cmd = install_path + "/bin/gs_guc generate -S " + g_passwd + " -D " + install_path + "/bin -o subscription" + print create_key_cipher_cmd + os.system(create_key_cipher_cmd) + + pub_port_arr = [pub_node1_port, pub_node2_port, pub_node3_port]; + ptdb = Pterodb(datanode_num, pub_port_arr, data_dir, "pub_datanode"); + ptdb.run(run_type) + + sub_port_arr = [sub_node1_port, sub_node2_port, sub_node3_port]; + ptdb = Pterodb(datanode_num, sub_port_arr, data_dir, "sub_datanode"); + ptdb.run(run_type) + + + +if __name__ == "__main__": + main() diff --git a/src/test/subscription/run_check.sh b/src/test/subscription/run_check.sh new file mode 100644 index 000000000..d6d7f1d7c --- /dev/null +++ b/src/test/subscription/run_check.sh @@ -0,0 +1,52 @@ +#!/bin/sh + +count=0 +source $1/env_utils.sh $1 $2 + +#clean temporary files generated after last check +echo "removing $g_data_path" +if [ -d $g_data_path ]; then + rm -rf $g_data_path +fi + +echo "removing $1/results" +if [ -d "$1/results" ]; then + rm -rf $1/results +fi + +mkdir $1/results + +total_starttime=`date +"%Y-%m-%d %H:%M:%S"` +total_startvalue=`date -d "$total_starttime" +%s` + +#init and start the database +printf "init and start the database\n" +node_num=3 +python2 $scripts_dir/pubsub.py -d $node_num > $1/results/deploy_cluster.log 2>&1 + +printf "%-50s%-10s%-10s\n" "testcase" "result" "time(s)" +for line in `cat $1/schedule | grep -v ^#` +do + printf "%-50s" $line + starttime=`date +"%Y-%m-%d %H:%M:%S"` + sh $1/testcase/$line.sh $1 $2 > $1/results/$line.log 2>&1 + count=`expr $count + 1` + endtime=`date +"%Y-%m-%d %H:%M:%S"` + starttime1=`date -d "$starttime" +%s` + endtime1=`date -d "$endtime" +%s` + interval=`expr $endtime1 - $starttime1` + if [ $( grep "$failed_keyword" $1/results/$line.log | wc -l ) -eq 0 ]; then + printf "%-10s%-10s\n" ".... ok" $interval + else + printf "%-10s%-10s\n" ".... FAILED" $interval + exit 1 + fi +done + +printf "stop the database\n" +python2 $scripts_dir/pubsub.py -o -d $node_num > $1/results/stop_cluster.log 2>&1 + +total_endtime=`date +"%Y-%m-%d %H:%M:%S"` +total_endvalue=`date -d "$total_endtime" +%s` +printf "all %d tests passed.\n" $count +printf "total time: %ss\n" $(($total_endvalue - $total_startvalue)) \ No newline at end of file diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule new file mode 100644 index 000000000..887d694ef --- /dev/null +++ b/src/test/subscription/schedule @@ -0,0 +1,2 @@ +rep_changes +pub_switchover \ No newline at end of file diff --git a/src/test/subscription/testcase/pub_switchover.sh b/src/test/subscription/testcase/pub_switchover.sh new file mode 100644 index 000000000..ee6845671 --- /dev/null +++ b/src/test/subscription/testcase/pub_switchover.sh @@ -0,0 +1,74 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="sw_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep AS SELECT generate_series(1,10) AS a" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int)" + + echo "create publication and subscription." + # Setup logical replication + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub for all tables" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "10" ]; then + echo "check initial data of table is replicated success" + else + echo "$failed_keyword when check initial data of table is replicated" + exit 1 + + fi + + # test incremental synchronous + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (11)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "11" ]; then + echo "check incremental data of table is replicated success" + else + echo "$failed_keyword when check incremental data of table is replicated" + exit 1 + fi + + echo "switchover pub_node2 to primary" + switchover_to_primary "pub_datanode2" + + exec_sql $case_db $pub_node2_port "INSERT INTO tab_rep VALUES (12)" + + wait_for_catchup $case_db $pub_node2_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "12" ]; then + echo "check incremental data of table is replicated after switchover success" + else + echo "$failed_keyword when check incremental data of table is replicated after switchover" + exit 1 + fi +} + +function tear_down(){ + switchover_to_primary "pub_datanode1" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file diff --git a/src/test/subscription/testcase/rep_changes.sh b/src/test/subscription/testcase/rep_changes.sh new file mode 100644 index 000000000..cb2d5f22b --- /dev/null +++ b/src/test/subscription/testcase/rep_changes.sh @@ -0,0 +1,228 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="rep_db" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full2 (x text)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)" + exec_sql $case_db $pub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)" + exec_sql $case_db $pub_node1_port "ALTER TABLE tab_include REPLICA IDENTITY FULL" + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full_pk (a int primary key, b text)" + exec_sql $case_db $pub_node1_port "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL" + # Let this table with REPLICA IDENTITY NOTHING, allowing only INSERT changes. + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_nothing (a int)" + exec_sql $case_db $pub_node1_port "ALTER TABLE tab_nothing REPLICA IDENTITY NOTHING" + # Replicate the changes without replica identity index + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_no_replidentity_index(c1 int)" + exec_sql $case_db $pub_node1_port "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_notrep (a int)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_ins (a int)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full (a int)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full2 (x text)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key)" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full_pk (a int primary key, b text)" + exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL" + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_nothing (a int)" + # replication of the table with included index + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)" + exec_sql $case_db $sub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)" + exec_sql $case_db $sub_node1_port "ALTER TABLE tab_include REPLICA IDENTITY FULL" + # replication of the table without replica identity index + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_no_replidentity_index(c1 int)" + exec_sql $case_db $sub_node1_port "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)" + exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index" + exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_notrep")" = "0" ]; then + echo "check non-replicated table is empty on subscriber success" + else + echo "$failed_keyword when check non-replicated table is empty on subscriber" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_ins")" = "1002" ]; then + echo "check initial data was copied to subscriber success" + else + echo "$failed_keyword when check initial data was copied to subscriber" + exit 1 + fi + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1,50)" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 20" + exec_sql $case_db $pub_node1_port "UPDATE tab_ins SET a = -a" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,50)" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep WHERE a > 20" + exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = -a" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_nothing VALUES (generate_series(1,20))" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_include SELECT generate_series(1,50)" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_include WHERE a > 20" + exec_sql $case_db $pub_node1_port "UPDATE tab_include SET a = -a" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_no_replidentity_index VALUES(1)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then + echo "check replicated inserts on subscriber success" + else + echo "$failed_keyword when check replicated inserts on subscriber" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_rep")" = "20|-20|-1" ]; then + echo "check replicated changes on subscriber success" + else + echo "$failed_keyword when check replicated changes on subscriber" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_nothing")" = "20" ]; then + echo "check replicated changes with REPLICA IDENTITY NOTHING success" + else + echo "$failed_keyword when check replicated changes with REPLICA IDENTITY NOTHING" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_include")" = "20|-20|-1" ]; then + echo "check replicated changes with primary key index with included columns success" + else + echo "$failed_keyword when check replicated changes with primary key index with included columns" + exit 1 + fi + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT c1 FROM tab_no_replidentity_index")" = "1" ]; then + echo "check value replicated to subscriber without replica identity index success" + else + echo "$failed_keyword when check value replicated to subscriber without replica identity index" + exit 1 + fi + # insert some duplicate rows + exec_sql $case_db $pub_node1_port "INSERT INTO tab_full SELECT generate_series(1,10)" + + # Test behaviour of ALTER PUBLICATION ... DROP TABLE + # + # When a publisher drops a table from publication, it should also stop sending + # its changes to subscribers. We look at the subscriber whether it receives + # the row that is inserted to the table on the publisher after it is dropped + # from the publication. + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then + echo "check rows on subscriber before table drop from publication success" + else + echo "$failed_keyword when check rows on subscriber before table drop from publication" + exit 1 + fi + + # Drop the table from publication + exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only DROP TABLE tab_ins" + # Insert a row in publisher, but publisher will not send this row to subscriber + exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins VALUES(8888)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + # Subscriber will not receive the inserted row, after table is dropped from + # publication, so row count should remain the same. + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then + echo "check rows on subscriber after table drop from publication success" + else + echo "$failed_keyword when check rows on subscriber after table drop from publication" + exit 1 + fi + + # Delete the inserted row in publisher + exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a = 8888" + # Add the table to publication again + exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins" + # Refresh publication after table is added to publication + exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION" + + # Test replication with multiple publications for a subscription such that the + # operations are performed on the table from the first publication in the list. + + # Create tables on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE temp1 (a int)" + exec_sql $case_db $pub_node1_port "CREATE TABLE temp2 (a int)" + + # Create tables on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE temp1 (a int)" + exec_sql $case_db $sub_node1_port "CREATE TABLE temp2 (a int)" + + # Setup logical replication that will only be used for this test + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_temp1 FOR TABLE temp1 WITH (publish = insert)" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_temp2 FOR TABLE temp2" + + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + # Subscriber table will have no rows initially + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM temp1")" = "0" ]; then + echo "check initial rows on subscriber with multiple publications success" + else + echo "$failed_keyword when check initial rows on subscriber with multiple publications" + exit 1 + fi + + # Insert a row into the table that's part of first publication in subscriber + # list of publications. + exec_sql $case_db $pub_node1_port "INSERT INTO temp1 VALUES (1)" + + wait_for_catchup $case_db $pub_node1_port "tap_sub_temp1" + + # Subscriber should receive the inserted row + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM temp1")" = "1" ]; then + echo "check rows on subscriber with multiple publications success" + else + echo "$failed_keyword when check rows on subscriber with multiple publications" + exit 1 + fi + + # Drop subscription as we don't need it anymore + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub_temp1" + + # Drop publications as we don't need them anymore + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub_temp1" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub_temp2" + + # Clean up the tables on both publisher and subscriber as we don't need them + exec_sql $case_db $pub_node1_port "DROP TABLE temp1" + exec_sql $case_db $pub_node1_port "DROP TABLE temp2" + exec_sql $case_db $sub_node1_port "DROP TABLE temp1" + exec_sql $case_db $sub_node1_port "DROP TABLE temp2" +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub, tap_pub_ins_only" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file