From 3ee2c7f19d84cf590cb831d285121856de67f9ba Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Wed, 7 Feb 2024 16:39:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8F=91=E5=B8=83=E8=AE=A2?= =?UTF-8?q?=E9=98=85=E5=9C=BA=E6=99=AF=E4=B8=8B=E8=AE=A2=E9=98=85=E7=AB=AF?= =?UTF-8?q?=E7=9A=84=E7=89=A9=E5=8C=96=E8=A7=86=E5=9B=BE=E5=88=B7=E6=96=B0?= =?UTF-8?q?=E4=B8=8D=E5=87=BA=E6=96=B0=E6=95=B0=E6=8D=AE=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../runtime/executor/execReplication.cpp | 30 ++++++++ src/test/subscription/testcase/bugs.sh | 70 +++++++++++++++++++ src/test/subscription/testcase/matviews.sh | 16 +++++ 3 files changed, 116 insertions(+) diff --git a/src/gausskernel/runtime/executor/execReplication.cpp b/src/gausskernel/runtime/executor/execReplication.cpp index f07a5d8a9..b6c15fd85 100644 --- a/src/gausskernel/runtime/executor/execReplication.cpp +++ b/src/gausskernel/runtime/executor/execReplication.cpp @@ -21,6 +21,7 @@ #include "access/xact.h" #include "commands/trigger.h" #include "commands/cluster.h" +#include "commands/matview.h" #include "catalog/pg_partition_fn.h" #include "catalog/pg_publication.h" #include "executor/executor.h" @@ -645,6 +646,16 @@ void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot, FakeRelation ExecARInsertTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, (HeapTuple)tuple, recheckIndexes); list_free_ext(recheckIndexes); + + /* try to insert tuple into mlog-table. */ + if (targetRel != NULL && targetRel->rd_mlogoid != InvalidOid) { + /* judge whether need to insert into mlog-table */ + if (targetRel->rd_tam_ops == TableAmUstore) { + tuple = (Tuple)UHeapToHeap(targetRel->rd_att, (UHeapTuple)tuple); + } + insert_into_mlog_table(targetRel, targetRel->rd_mlogoid, (HeapTuple)tuple, + &(((HeapTuple)tuple)->t_self), GetCurrentTransactionId(), 'I'); + } } /* @@ -769,6 +780,19 @@ void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot searchSlotTid, (HeapTuple)tuple, NULL, recheckIndexes); list_free(recheckIndexes); + + /* update tuple from mlog of matview(delete + insert). */ + if (rel != NULL && rel->rd_mlogoid != InvalidOid) { + /* judge whether need to insert into mlog-table */ + /* 1. delete one tuple. */ + insert_into_mlog_table(rel, rel->rd_mlogoid, NULL, searchSlotTid, tmfd.xmin, 'D'); + /* 2. insert new tuple */ + if (rel->rd_tam_ops == TableAmUstore) { + tuple = (Tuple)UHeapToHeap(rel->rd_att, (UHeapTuple)tuple); + } + insert_into_mlog_table(rel, rel->rd_mlogoid, (HeapTuple)tuple, &(((HeapTuple)tuple)->t_self), + GetCurrentTransactionId(), 'I'); + } } /* @@ -822,6 +846,12 @@ void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot /* AFTER ROW DELETE Triggers */ ExecARDeleteTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, NULL, tid); + + /* delete tuple from mlog of matview */ + if (rel != NULL && rel->rd_mlogoid != InvalidOid) { + /* judge whether need to insert into mlog-table */ + insert_into_mlog_table(rel, rel->rd_mlogoid, NULL, tid, tmfd.xmin, 'D'); + } } /* diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh index 5eaa09491..b7b27a588 100644 --- a/src/test/subscription/testcase/bugs.sh +++ b/src/test/subscription/testcase/bugs.sh @@ -133,6 +133,76 @@ create table t_pubsub_0349( fi exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE t_pubsub_0349" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE t_pubsub_0349" + + # BUG4: fix pg_replication_origin_status remain + exec_sql $case_db $pub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)" + exec_sql $case_db $sub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub;" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_replication_origin_status")" = "0" ]; then + echo "check if pg_replication_origin_status is empty success" + else + echo "$failed_keyword when check if pg_replication_origin_status is empty" + exit 1 + fi + + # BUG5: fix IUD not record mlog + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE tab_rep" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE tab_rep" + + exec_sql $case_db $pub_node1_port "create table tab_rep (a int primary key, b int);" + exec_sql $case_db $sub_node1_port "create table tab_rep (a int primary key, b int);" + + exec_sql $case_db $sub_node1_port "create INCREMENTAL MATERIALIZED VIEW test_mv1 as select * from tab_rep;" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $pub_node1_port "insert into tab_rep values (1, 1);" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;" + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM test_mv1")" = "1|1" ]; then + echo "check if insert into incremental mview success" + else + echo "$failed_keyword when check if insert into incremental mview" + exit 1 + fi + + exec_sql $case_db $pub_node1_port "update tab_rep set b = 2 where a = 1;" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;" + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM test_mv1")" = "1|2" ]; then + echo "check if update into incremental mview success" + else + echo "$failed_keyword when check if update incremental mview" + exit 1 + fi + + exec_sql $case_db $pub_node1_port "delete from tab_rep where a = 1;" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;" + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM test_mv1")" = "0" ]; then + echo "check if delete incremental mview success" + else + echo "$failed_keyword when check if delete incremental mview" + exit 1 + fi } function tear_down() { diff --git a/src/test/subscription/testcase/matviews.sh b/src/test/subscription/testcase/matviews.sh index c04e57758..9381ca7ee 100644 --- a/src/test/subscription/testcase/matviews.sh +++ b/src/test/subscription/testcase/matviews.sh @@ -35,6 +35,22 @@ function test_1() { # not replicated, so this does not hang. echo "materialized view data not replicated"; + + # create a MV on the subscriber + exec_sql $case_db $sub_node1_port "CREATE MATERIALIZED VIEW testmv1 AS SELECT * FROM test1;" + exec_sql $case_db $pub_node1_port "INSERT INTO test1 (a, b) VALUES (3, 'three')" + wait_for_catchup $case_db $pub_node1_port "mysub" + + exec_sql $case_db $sub_node1_port "REFRESH MATERIALIZED VIEW testmv1" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM testmv1")" = "1|one +2|two +3|three" ]; then + echo "check if refresh materialized view success" + else + echo "$failed_keyword when check if refresh materialized view" + exit 1 + fi } function tear_down() {