Add publication/subscription fastcheck
@@ -11,6 +11,7 @@ set(CMAKE_MODULE_PATH
add_subdirectory(isolation)
if ("${ENABLE_MULTIPLE_NODES}" STREQUAL "OFF")
    add_subdirectory(regress)
    add_subdirectory(subscription)
endif ()

if ("${ENABLE_UT}" STREQUAL "ON" AND "${ENABLE_MULTIPLE_NODES}" STREQUAL "OFF")
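The subscription tests are wired into the build only for single-node builds (ENABLE_MULTIPLE_NODES=OFF). A minimal configure sketch, with all other required cmake options omitted and the source path illustrative:

    cmake /path/to/openGauss-server -DENABLE_MULTIPLE_NODES=OFF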
src/test/subscription/CMakeLists.txt (new file, 1 line)
@@ -0,0 +1 @@
add_custom_target(check COMMAND sh ${CMAKE_CURRENT_SOURCE_DIR}/run_check.sh ${CMAKE_CURRENT_SOURCE_DIR} \$\(p\))
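The escaped \$\(p\) survives into the generated makefile as $(p), so the base port is read from the make variable p at check time. A usage sketch from inside a CMake build tree (build path and port illustrative):

    cd <build_dir>/src/test/subscription
    make check p=25800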
src/test/subscription/GNUmakefile (new file, 36 lines)
@@ -0,0 +1,36 @@
#-------------------------------------------------------------------------
#
# GNUmakefile--
# Makefile for src/test/subscription (the publication/subscription tests)
#
# Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
# Portions Copyright (c) 1994, Regents of the University of California
#
# src/test/subscription/GNUmakefile
#
#-------------------------------------------------------------------------

subdir = src/test/subscription
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global

# where to find psql for testing an existing installation
PSQLDIR = $(bindir)
p=25800
##
## Run tests
##

REGRESS_OPTS = --dlpath=. $(EXTRA_REGRESS_OPTS)
REG_CONF = --regconf=regress.conf

check: all
	sh $(CURDIR)/run_check.sh $(CURDIR) $p

##
## Clean up
##

# things created by various check targets
clean distclean maintainer-clean:
	rm -rf $(pg_regress_clean_files)
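The same run_check.sh entry point is reachable from the source tree through gmake; p defaults to 25800 above and can be overridden on the command line. A sketch, assuming GAUSSHOME points at an installed server:

    gmake -C src/test/subscription check
    gmake -C src/test/subscription check p=26000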
src/test/subscription/Makefile (new file, 12 lines)
@@ -0,0 +1,12 @@
# The openGauss make files exploit features of GNU make that other makes
# do not have. Because it is a common mistake for users to try to build
# openGauss with a different make, we have this make file that does nothing
# but tell the user to use GNU make.

# If the user were using GNU make now, this file would not get used because
# GNU make uses a make file named "GNUmakefile" in preference to "Makefile"
# if it exists. openGauss is shipped with a "GNUmakefile".

all check:
	@echo "You must use GNU make to use openGauss. It may be installed"
	@echo "on your system with the name 'gmake'."
src/test/subscription/env_utils.sh (new file, 86 lines)
@@ -0,0 +1,86 @@
#!/bin/sh
# some environment vars

export g_base_port=$2
export prefix=${GAUSSHOME}
export install_path="$prefix"
export GAUSSHOME="$prefix"
export LD_LIBRARY_PATH=$prefix/lib:$prefix/lib/libobs:$LD_LIBRARY_PATH
export PATH="$prefix/bin":$PATH
export g_local_ip="127.0.0.1"

db=postgres
scripts_dir="$1"
username=`whoami`
passwd="Gauss@123"

export g_data_path="$scripts_dir/tmp_check"
export g_username="$username"
export pub_node1_port=`expr $g_base_port`
export pub_node2_port=`expr $g_base_port \+ 3`
export pub_node3_port=`expr $g_base_port \+ 6`
export sub_node1_port=`expr $g_base_port \+ 9`
export sub_node2_port=`expr $g_base_port \+ 12`
export sub_node3_port=`expr $g_base_port \+ 15`

failed_keyword="testcase_failed"
gsctl_wait_time=3600
data_dir=$g_data_path

function exec_sql(){
    result=$(gsql -d $1 -p $2 -Atq -c "$3" |sed 'N;s/\n/ /g;b')
    if [ "$result" != "" ]; then
        echo $result
    fi
}
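exec_sql runs a statement through gsql with -Atq and flattens multi-line output into a single space-separated line, which is what the string comparisons in the testcases rely on. A usage sketch (database name and port illustrative):

    cnt=$(exec_sql postgres 25800 "SELECT count(*) FROM pg_class")
    echo "catalog relations: $cnt"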

function wait_for_subscription_sync(){
    max_attempts=20
    attempt=0
    query="SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
    while (($attempt < $max_attempts))
    do
        if [ "$(exec_sql $1 $2 "$query")" = "t" ]; then
            echo "initial data for subscription has been already synchronized"
            break
        fi
        sleep 1
        attempt=`expr $attempt \+ 1`
    done

    if [ $attempt -eq $max_attempts ]; then
        echo "$failed_keyword, timed out waiting for subscriber to synchronize data."
        exit 1
    fi
}

function wait_for_catchup(){
    target_lsn=$(exec_sql $1 $2 "SELECT pg_current_xlog_location()")
    max_attempts=20
    attempt=0
    query="SELECT '$target_lsn' <= confirmed_flush FROM pg_replication_slots WHERE slot_name = '$3'";
    while (($attempt < $max_attempts))
    do
        if [ "$(exec_sql $1 $2 "$query")" = "t" ]; then
            echo "subscriber has been caught up"
            break
        fi
        sleep 1
        attempt=`expr $attempt \+ 1`
    done

    if [ $attempt -eq $max_attempts ]; then
        echo "$failed_keyword, timed out waiting for catchup."
        exit 1
    fi
}
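Both wait helpers poll once per second for at most 20 attempts and print $failed_keyword on timeout. Typical usage, as in the testcases below:

    wait_for_subscription_sync $case_db $sub_node1_port
    wait_for_catchup $case_db $pub_node1_port "tap_sub"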

function switchover_to_primary() {
    gs_ctl switchover -w -t $gsctl_wait_time -D $data_dir/$1
    if [ $? -eq 0 ]; then
        echo "switchover to primary success!"
    else
        echo "$failed_keyword, switchover to primary failed!"
        exit 1
    fi
}
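For reference, the port layout that follows from the exports above and from pubsub.py, assuming the default base port 25800 (each node additionally takes port+1 as its service port and port+2 as its heartbeat port):

    # pub_datanode1 25800   pub_datanode2 25803   pub_datanode3 25806
    # sub_datanode1 25809   sub_datanode2 25812   sub_datanode3 25815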
src/test/subscription/pubsub.py (new file, 227 lines)
@@ -0,0 +1,227 @@
#!/usr/bin/python

import getopt, sys, os
import shutil
import time
import string

g_data_path = os.environ.get("g_data_path")
install_path = os.environ.get("install_path")
g_local_ip = os.environ.get("g_local_ip")
g_username = os.environ.get("g_username")
g_passwd = "Gauss@123"
pub_node1_port = int(os.environ.get("pub_node1_port"));
pub_node2_port = int(os.environ.get("pub_node2_port"));
pub_node3_port = int(os.environ.get("pub_node3_port"));
sub_node1_port = int(os.environ.get("sub_node1_port"));
sub_node2_port = int(os.environ.get("sub_node2_port"));
sub_node3_port = int(os.environ.get("sub_node3_port"));


class Pterodb():
    def __init__(self, data_node_num, port_arr, data_dir, dname_prefix):
        self.data_node_num = data_node_num
        self.data_dir = data_dir
        self.dname_prefix = dname_prefix

        self.ha_port_arr = port_arr
        self.service_port_arr = [port_arr[i] + 1 for i in range(data_node_num)]
        self.heartbeat_port_arr = [port_arr[i] + 2 for i in range(data_node_num)]

    def init_env(self):
        if(os.path.exists(self.data_dir) == False):
            os.mkdir(self.data_dir)

        for i in range(1, self.data_node_num + 1):
            datanode_cmd_init = install_path + "/bin/gs_initdb -D " + self.data_dir + "/" + self.dname_prefix + str(i) + " --nodename=" + self.dname_prefix + str(i) + " -w " + g_passwd
            print datanode_cmd_init
            os.system(datanode_cmd_init)

            conf_file = self.data_dir + "/" + self.dname_prefix + str(i) + "/postgresql.conf"
            self.__modify_conf_port(conf_file,self.ha_port_arr[i-1])
            self.__turn_on_pg_log(conf_file)
            self.__modify_conf_standby(conf_file,i)
            self.__modify_conf_application_name(conf_file, "dn" + str(i))
            self.__modify_remote_read_mode(conf_file)

            hba_file = self.data_dir + "/" + self.dname_prefix + str(i) + "/pg_hba.conf"
            self.__config_replication_hba(hba_file)

    def __modify_conf_standby(self, conf_file, n):
        j = 1

        file_handler = open(conf_file,"a")
        string = "listen_addresses = '*'"+ "\n"
        file_handler.write(string)

        for i in range(1, self.data_node_num + 1):
            if(i != n):
                #repl
                string = "replconninfo%d = 'localhost=%s localport=%d localheartbeatport=%d localservice=%d remotehost=%s remoteport=%d remoteheartbeatport=%d remoteservice=%d'\n" % \
                    (j, g_local_ip, self.ha_port_arr[n-1], self.heartbeat_port_arr[n-1], self.service_port_arr[n-1], g_local_ip, self.ha_port_arr[i-1], self.heartbeat_port_arr[i-1], self.service_port_arr[i-1])
                print string
                file_handler.write(string)
                j = j + 1

        file_handler.close()

    def __modify_conf_application_name(self, conf_file, name):
        file_handler = open(conf_file,"a")
        string = "application_name = '" + name + "'" + "\n"
        file_handler.write(string)
        file_handler.close()

    def __modify_remote_read_mode(self, conf_file):
        file_handler = open(conf_file,"a")
        string = "remote_read_mode = 'off'" + "\n"
        file_handler.write(string)
        file_handler.close()

    def __modify_conf_port(self, conf_file, port):
        file_handler = open(conf_file,"a")

        string = "port = " + str(port) + "\n"
        file_handler.write(string)

        file_handler.close()

    def __turn_on_pg_log(self, conf_file):
        file_handler = open(conf_file,"a")
        pglog_conf = "logging_collector = on \n"
        pglog_conf = pglog_conf + "log_directory = 'pg_log' \n"
        pglog_conf = pglog_conf + "log_line_prefix = '%m %c %d %p %a %x %e ' \n"
        pglog_conf = pglog_conf + "client_min_messages = ERROR \n"
        pglog_conf = pglog_conf + "enable_data_replicate = off \n"
        pglog_conf = pglog_conf + "replication_type = 1 \n"
        pglog_conf = pglog_conf + "wal_level = logical \n"
        pglog_conf = pglog_conf + "max_wal_senders = 8 \n"
        pglog_conf = pglog_conf + "enable_slot_log = on \n"
        file_handler.write(pglog_conf)
        file_handler.close()
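wal_level = logical, max_wal_senders and enable_slot_log are the settings that make logical publication/subscription possible on these nodes. A quick sanity-check sketch against a running node (port illustrative):

    gsql -d postgres -p 25800 -At -c "show wal_level"    # expected: logical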

    def __config_replication_hba(self, hba_file):
        file_handler = open(hba_file,"a")
        replication_line = "host replication " + str(g_username) + " " + str(g_local_ip) + "/32 trust"
        file_handler.write(replication_line)
        file_handler.close()

    def __create_default_db(self):
        # connect to primary DN to create db
        cmd = install_path + "/bin/gsql -p " + str(self.ha_port_arr[0]) + " postgres -c 'create database test'"
        os.system(cmd)

    def __rm_pid_file(self):
        cmd = "rm -rf "
        # dn
        for i in range(1,self.data_node_num+1):
            rm_cmd = cmd + self.data_dir + "/" + self.dname_prefix + str(i) + "/postmaster.pid"
            print rm_cmd
            os.system(rm_cmd)


    def __start_server(self):
        # clean env
        self.__rm_pid_file()

        # start primary
        datanode_cmd = install_path + "/bin/gs_ctl " + "start -M primary" + " -D " + self.data_dir + "/" + self.dname_prefix + str(1) + " > " + self.data_dir + "/" + self.dname_prefix + str(1) + "/logdn" + str(1) + ".log 2>&1 &"
        print datanode_cmd
        os.system(datanode_cmd)

        time.sleep(5)

        # start and build standby data nodes 2..data_node_num
        for i in range(2,self.data_node_num+1):
            datanode_cmd = install_path + "/bin/gs_ctl" + " start -M standby "+ " -D " + self.data_dir + "/" + self.dname_prefix + str(i) + " > " + self.data_dir + "/" + self.dname_prefix + str(i) + "/logdn" + str(i) + ".log 2>&1 &"
            print datanode_cmd
            os.system(datanode_cmd)
            time.sleep(5)

            datanode_cmd = install_path + "/bin/gs_ctl" + " build "+ "-D " + self.data_dir + "/" + self.dname_prefix + str(i) + " -Z single_node " + " > " + self.data_dir + "/" + self.dname_prefix + str(i) + "/logdn" + str(i) + ".log 2>&1 &"
            print datanode_cmd
            os.system(datanode_cmd)
            time.sleep(5)

        time.sleep(5)

    def __stop_server(self):
        for i in range(1,self.data_node_num+1):
            datanode_cmd = install_path + "/bin/gs_ctl stop -D " + self.data_dir + "/" + self.dname_prefix + str(i) + " -Z single_node"
            print datanode_cmd
            os.system(datanode_cmd)

    def run(self, run_type):
        if(run_type == 0):
            self.init_env()
            #print "init_env ok"
            self.__start_server()
            #print "start_server ok"
            self.__create_default_db()
            #print "create_default_db ok"
            print "start ok"
        elif(run_type == 1):
            self.__start_server()
            print "start ok"
        elif(run_type == 2):
            self.__stop_server()
            print "stop ok"

def usage():
    print "------------------------------------------------------"
    print "python pubsub.py\n"
    print " -d datanode_num, set and start up dn"
    print " -s means start"
    print " -o means stop"
    print " -D data directory"
    print "------------------------------------------------------"

def main():
    try:
        opts, args = getopt.getopt(sys.argv[1:], "hD:d:sov", ["help", "data_dir="])
    except getopt.GetoptError, err:
        # print help information and exit:
        print str(err) # will print something like "option -a not recognized"
        # usage()
        sys.exit(2)

    datanode_num = 0
    data_dir = g_data_path
    run_type = 0

    for o, a in opts:
        if o == "-v":
            verbose = True
        elif o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-D", "--data_dir"):
            data_dir = a
        elif o in ("-d", "--datanode"):
            datanode_num = int(a)
        elif o in ("-s", "--start"):
            run_type = 1
        elif o in ("-o", "--stop"):
            run_type = 2
        else:
            assert False, "unhandled option"

    if((datanode_num == 0) and run_type == 0):
        usage()
        sys.exit()

    create_key_cipher_cmd = install_path + "/bin/gs_guc generate -S " + g_passwd + " -D " + install_path + "/bin -o subscription"
    print create_key_cipher_cmd
    os.system(create_key_cipher_cmd)

    pub_port_arr = [pub_node1_port, pub_node2_port, pub_node3_port];
    ptdb = Pterodb(datanode_num, pub_port_arr, data_dir, "pub_datanode");
    ptdb.run(run_type)

    sub_port_arr = [sub_node1_port, sub_node2_port, sub_node3_port];
    ptdb = Pterodb(datanode_num, sub_port_arr, data_dir, "sub_datanode");
    ptdb.run(run_type)



if __name__ == "__main__":
    main()
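run_check.sh normally drives this script; a manual sketch, assuming GAUSSHOME is set and the environment from env_utils.sh has been sourced from the repo root:

    source src/test/subscription/env_utils.sh src/test/subscription 25800
    python2 src/test/subscription/pubsub.py -d 3      # init and start 3 publisher + 3 subscriber nodes
    python2 src/test/subscription/pubsub.py -o -d 3   # stop them again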
src/test/subscription/run_check.sh (new file, 52 lines)
@@ -0,0 +1,52 @@
#!/bin/sh

count=0
source $1/env_utils.sh $1 $2

# clean temporary files generated after last check
echo "removing $g_data_path"
if [ -d $g_data_path ]; then
    rm -rf $g_data_path
fi

echo "removing $1/results"
if [ -d "$1/results" ]; then
    rm -rf $1/results
fi

mkdir $1/results

total_starttime=`date +"%Y-%m-%d %H:%M:%S"`
total_startvalue=`date -d "$total_starttime" +%s`

# init and start the database
printf "init and start the database\n"
node_num=3
python2 $scripts_dir/pubsub.py -d $node_num > $1/results/deploy_cluster.log 2>&1

printf "%-50s%-10s%-10s\n" "testcase" "result" "time(s)"
for line in `cat $1/schedule | grep -v ^#`
do
    printf "%-50s" $line
    starttime=`date +"%Y-%m-%d %H:%M:%S"`
    sh $1/testcase/$line.sh $1 $2 > $1/results/$line.log 2>&1
    count=`expr $count + 1`
    endtime=`date +"%Y-%m-%d %H:%M:%S"`
    starttime1=`date -d "$starttime" +%s`
    endtime1=`date -d "$endtime" +%s`
    interval=`expr $endtime1 - $starttime1`
    if [ $( grep "$failed_keyword" $1/results/$line.log | wc -l ) -eq 0 ]; then
        printf "%-10s%-10s\n" ".... ok" $interval
    else
        printf "%-10s%-10s\n" ".... FAILED" $interval
        exit 1
    fi
done

printf "stop the database\n"
python2 $scripts_dir/pubsub.py -o -d $node_num > $1/results/stop_cluster.log 2>&1

total_endtime=`date +"%Y-%m-%d %H:%M:%S"`
total_endvalue=`date -d "$total_endtime" +%s`
printf "all %d tests passed.\n" $count
printf "total time: %ss\n" $(($total_endvalue - $total_startvalue))
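This is the entry point that both the GNUmakefile and the CMake check target call. A direct invocation sketch (absolute script directory and base port are illustrative):

    sh src/test/subscription/run_check.sh $(pwd)/src/test/subscription 25800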
src/test/subscription/schedule (new file, 2 lines)
@@ -0,0 +1,2 @@
rep_changes
pub_switchover
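Each schedule entry names a script under testcase/. A sketch for wiring in an additional case with a hypothetical name my_case:

    # create testcase/my_case.sh that sources env_utils.sh and echoes $failed_keyword on failure
    echo "my_case" >> src/test/subscription/schedule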
src/test/subscription/testcase/pub_switchover.sh (new file, 74 lines)
@@ -0,0 +1,74 @@
#!/bin/sh

source $1/env_utils.sh $1 $2

case_db="sw_db"

function test_1() {
    echo "create database and tables."
    exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
    exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
    # Create some preexisting content on publisher
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep AS SELECT generate_series(1,10) AS a"

    # Setup structure on subscriber
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int)"

    echo "create publication and subscription."
    # Setup logical replication
    publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
    exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub for all tables"
    exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"

    # Wait for initial table sync to finish
    wait_for_subscription_sync $case_db $sub_node1_port

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "10" ]; then
        echo "check initial data of table is replicated success"
    else
        echo "$failed_keyword when check initial data of table is replicated"
        exit 1

    fi

    # test incremental synchronization
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep VALUES (11)"

    wait_for_catchup $case_db $pub_node1_port "tap_sub"

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "11" ]; then
        echo "check incremental data of table is replicated success"
    else
        echo "$failed_keyword when check incremental data of table is replicated"
        exit 1
    fi

    echo "switchover pub_node2 to primary"
    switchover_to_primary "pub_datanode2"

    exec_sql $case_db $pub_node2_port "INSERT INTO tab_rep VALUES (12)"

    wait_for_catchup $case_db $pub_node2_port "tap_sub"

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "12" ]; then
        echo "check incremental data of table is replicated after switchover success"
    else
        echo "$failed_keyword when check incremental data of table is replicated after switchover"
        exit 1
    fi
}

function tear_down(){
    switchover_to_primary "pub_datanode1"

    exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub"
    exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub"

    exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
    exec_sql $db $pub_node1_port "DROP DATABASE $case_db"

    echo "tear down"
}

test_1
tear_down
src/test/subscription/testcase/rep_changes.sh (new file, 228 lines)
@@ -0,0 +1,228 @@
#!/bin/sh

source $1/env_utils.sh $1 $2

case_db="rep_db"

function test_1() {
    echo "create database and tables."
    exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
    exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
    # Create some preexisting content on publisher
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full2 (x text)"
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_full2 VALUES ('a'), ('b'), ('b')"
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)"
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)"
    exec_sql $case_db $pub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)"
    exec_sql $case_db $pub_node1_port "ALTER TABLE tab_include REPLICA IDENTITY FULL"
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_full_pk (a int primary key, b text)"
    exec_sql $case_db $pub_node1_port "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL"
    # Leave this table with REPLICA IDENTITY NOTHING, allowing only INSERT changes.
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_nothing (a int)"
    exec_sql $case_db $pub_node1_port "ALTER TABLE tab_nothing REPLICA IDENTITY NOTHING"
    # Replicate the changes without replica identity index
    exec_sql $case_db $pub_node1_port "CREATE TABLE tab_no_replidentity_index(c1 int)"
    exec_sql $case_db $pub_node1_port "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)"

    # Setup structure on subscriber
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_notrep (a int)"
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_ins (a int)"
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full (a int)"
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full2 (x text)"
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key)"
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_full_pk (a int primary key, b text)"
    exec_sql $case_db $sub_node1_port "ALTER TABLE tab_full_pk REPLICA IDENTITY FULL"
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_nothing (a int)"
    # replication of the table with included index
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_include (a int, b text) WITH (storage_type = ustore)"
    exec_sql $case_db $sub_node1_port "CREATE INDEX covering ON tab_include USING ubtree (a) INCLUDE(b)"
    exec_sql $case_db $sub_node1_port "ALTER TABLE tab_include REPLICA IDENTITY FULL"
    # replication of the table without replica identity index
    exec_sql $case_db $sub_node1_port "CREATE TABLE tab_no_replidentity_index(c1 int)"
    exec_sql $case_db $sub_node1_port "CREATE INDEX idx_no_replidentity_index ON tab_no_replidentity_index(c1)"

    # Setup logical replication
    echo "create publication and subscription."
    publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
    exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub"
    exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_ins_only WITH (publish = insert)"
    exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub ADD TABLE tab_rep, tab_full, tab_full2, tab_include, tab_nothing, tab_full_pk, tab_no_replidentity_index"
    exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"
    exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"

    # Wait for initial table sync to finish
    wait_for_subscription_sync $case_db $sub_node1_port

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_notrep")" = "0" ]; then
        echo "check non-replicated table is empty on subscriber success"
    else
        echo "$failed_keyword when check non-replicated table is empty on subscriber"
        exit 1
    fi

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_ins")" = "1002" ]; then
        echo "check initial data was copied to subscriber success"
    else
        echo "$failed_keyword when check initial data was copied to subscriber"
        exit 1
    fi

    exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins SELECT generate_series(1,50)"
    exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a > 20"
    exec_sql $case_db $pub_node1_port "UPDATE tab_ins SET a = -a"
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,50)"
    exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep WHERE a > 20"
    exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = -a"
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_full_pk VALUES (1, 'foo'), (2, 'baz')"
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_nothing VALUES (generate_series(1,20))"
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_include SELECT generate_series(1,50)"
    exec_sql $case_db $pub_node1_port "DELETE FROM tab_include WHERE a > 20"
    exec_sql $case_db $pub_node1_port "UPDATE tab_include SET a = -a"
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_no_replidentity_index VALUES(1)"

    wait_for_catchup $case_db $pub_node1_port "tap_sub"

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then
        echo "check replicated inserts on subscriber success"
    else
        echo "$failed_keyword when check replicated inserts on subscriber"
        exit 1
    fi

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_rep")" = "20|-20|-1" ]; then
        echo "check replicated changes on subscriber success"
    else
        echo "$failed_keyword when check replicated changes on subscriber"
        exit 1
    fi

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_nothing")" = "20" ]; then
        echo "check replicated changes with REPLICA IDENTITY NOTHING success"
    else
        echo "$failed_keyword when check replicated changes with REPLICA IDENTITY NOTHING"
        exit 1
    fi

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_include")" = "20|-20|-1" ]; then
        echo "check replicated changes with primary key index with included columns success"
    else
        echo "$failed_keyword when check replicated changes with primary key index with included columns"
        exit 1
    fi

    if [ "$(exec_sql $case_db $sub_node1_port "SELECT c1 FROM tab_no_replidentity_index")" = "1" ]; then
        echo "check value replicated to subscriber without replica identity index success"
    else
        echo "$failed_keyword when check value replicated to subscriber without replica identity index"
        exit 1
    fi
    # insert some duplicate rows
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_full SELECT generate_series(1,10)"

    # Test behaviour of ALTER PUBLICATION ... DROP TABLE
    #
    # When a publisher drops a table from publication, it should also stop sending
    # its changes to subscribers. We check on the subscriber whether it receives
    # the row that is inserted to the table on the publisher after it is dropped
    # from the publication.
    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then
        echo "check rows on subscriber before table drop from publication success"
    else
        echo "$failed_keyword when check rows on subscriber before table drop from publication"
        exit 1
    fi

    # Drop the table from publication
    exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only DROP TABLE tab_ins"
    # Insert a row in publisher, but publisher will not send this row to subscriber
    exec_sql $case_db $pub_node1_port "INSERT INTO tab_ins VALUES(8888)"

    wait_for_catchup $case_db $pub_node1_port "tap_sub"

    # Subscriber will not receive the inserted row, after table is dropped from
    # publication, so row count should remain the same.
    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*), min(a), max(a) FROM tab_ins")" = "1052|1|1002" ]; then
        echo "check rows on subscriber after table drop from publication success"
    else
        echo "$failed_keyword when check rows on subscriber after table drop from publication"
        exit 1
    fi

    # Delete the inserted row in publisher
    exec_sql $case_db $pub_node1_port "DELETE FROM tab_ins WHERE a = 8888"
    # Add the table to publication again
    exec_sql $case_db $pub_node1_port "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_ins"
    # Refresh publication after table is added to publication
    exec_sql $case_db $sub_node1_port "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"

    # Test replication with multiple publications for a subscription such that the
    # operations are performed on the table from the first publication in the list.

    # Create tables on publisher
    exec_sql $case_db $pub_node1_port "CREATE TABLE temp1 (a int)"
    exec_sql $case_db $pub_node1_port "CREATE TABLE temp2 (a int)"

    # Create tables on subscriber
    exec_sql $case_db $sub_node1_port "CREATE TABLE temp1 (a int)"
    exec_sql $case_db $sub_node1_port "CREATE TABLE temp2 (a int)"

    # Setup logical replication that will only be used for this test
    exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_temp1 FOR TABLE temp1 WITH (publish = insert)"
    exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub_temp2 FOR TABLE temp2"

    exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub_temp1 CONNECTION '$publisher_connstr' PUBLICATION tap_pub_temp1, tap_pub_temp2"

    # Wait for initial table sync to finish
    wait_for_subscription_sync $case_db $sub_node1_port

    # Subscriber table will have no rows initially
    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM temp1")" = "0" ]; then
        echo "check initial rows on subscriber with multiple publications success"
    else
        echo "$failed_keyword when check initial rows on subscriber with multiple publications"
        exit 1
    fi

    # Insert a row into the table that's part of first publication in subscriber
    # list of publications.
    exec_sql $case_db $pub_node1_port "INSERT INTO temp1 VALUES (1)"

    wait_for_catchup $case_db $pub_node1_port "tap_sub_temp1"

    # Subscriber should receive the inserted row
    if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM temp1")" = "1" ]; then
        echo "check rows on subscriber with multiple publications success"
    else
        echo "$failed_keyword when check rows on subscriber with multiple publications"
        exit 1
    fi

    # Drop subscription as we don't need it anymore
    exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub_temp1"

    # Drop publications as we don't need them anymore
    exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub_temp1"
    exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub_temp2"

    # Clean up the tables on both publisher and subscriber as we don't need them
    exec_sql $case_db $pub_node1_port "DROP TABLE temp1"
    exec_sql $case_db $pub_node1_port "DROP TABLE temp2"
    exec_sql $case_db $sub_node1_port "DROP TABLE temp1"
    exec_sql $case_db $sub_node1_port "DROP TABLE temp2"
}

function tear_down() {
    exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub"
    exec_sql $case_db $pub_node1_port "DROP PUBLICATION tap_pub, tap_pub_ins_only"

    exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
    exec_sql $db $pub_node1_port "DROP DATABASE $case_db"

    echo "tear down"
}

test_1
tear_down