diff --git a/src/gausskernel/runtime/executor/execReplication.cpp b/src/gausskernel/runtime/executor/execReplication.cpp index f07a5d8a9..b6c15fd85 100644 --- a/src/gausskernel/runtime/executor/execReplication.cpp +++ b/src/gausskernel/runtime/executor/execReplication.cpp @@ -21,6 +21,7 @@ #include "access/xact.h" #include "commands/trigger.h" #include "commands/cluster.h" +#include "commands/matview.h" #include "catalog/pg_partition_fn.h" #include "catalog/pg_publication.h" #include "executor/executor.h" @@ -645,6 +646,16 @@ void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot, FakeRelation ExecARInsertTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, (HeapTuple)tuple, recheckIndexes); list_free_ext(recheckIndexes); + + /* try to insert tuple into mlog-table. */ + if (targetRel != NULL && targetRel->rd_mlogoid != InvalidOid) { + /* judge whether need to insert into mlog-table */ + if (targetRel->rd_tam_ops == TableAmUstore) { + tuple = (Tuple)UHeapToHeap(targetRel->rd_att, (UHeapTuple)tuple); + } + insert_into_mlog_table(targetRel, targetRel->rd_mlogoid, (HeapTuple)tuple, + &(((HeapTuple)tuple)->t_self), GetCurrentTransactionId(), 'I'); + } } /* @@ -769,6 +780,19 @@ void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot searchSlotTid, (HeapTuple)tuple, NULL, recheckIndexes); list_free(recheckIndexes); + + /* update tuple from mlog of matview(delete + insert). */ + if (rel != NULL && rel->rd_mlogoid != InvalidOid) { + /* judge whether need to insert into mlog-table */ + /* 1. delete one tuple. */ + insert_into_mlog_table(rel, rel->rd_mlogoid, NULL, searchSlotTid, tmfd.xmin, 'D'); + /* 2. insert new tuple */ + if (rel->rd_tam_ops == TableAmUstore) { + tuple = (Tuple)UHeapToHeap(rel->rd_att, (UHeapTuple)tuple); + } + insert_into_mlog_table(rel, rel->rd_mlogoid, (HeapTuple)tuple, &(((HeapTuple)tuple)->t_self), + GetCurrentTransactionId(), 'I'); + } } /* @@ -822,6 +846,12 @@ void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot /* AFTER ROW DELETE Triggers */ ExecARDeleteTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, NULL, tid); + + /* delete tuple from mlog of matview */ + if (rel != NULL && rel->rd_mlogoid != InvalidOid) { + /* judge whether need to insert into mlog-table */ + insert_into_mlog_table(rel, rel->rd_mlogoid, NULL, tid, tmfd.xmin, 'D'); + } } /* diff --git a/src/test/subscription/testcase/bugs.sh b/src/test/subscription/testcase/bugs.sh index 5eaa09491..b7b27a588 100644 --- a/src/test/subscription/testcase/bugs.sh +++ b/src/test/subscription/testcase/bugs.sh @@ -133,6 +133,76 @@ create table t_pubsub_0349( fi exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE t_pubsub_0349" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE t_pubsub_0349" + + # BUG4: fix pg_replication_origin_status remain + exec_sql $case_db $pub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)" + exec_sql $case_db $sub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub;" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_replication_origin_status")" = "0" ]; then + echo "check if pg_replication_origin_status is empty success" + else + echo "$failed_keyword when check if pg_replication_origin_status is empty" + exit 1 + fi + + # BUG5: fix IUD not record mlog + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE tab_rep" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE tab_rep" + + exec_sql $case_db $pub_node1_port "create table tab_rep (a int primary key, b int);" + exec_sql $case_db $sub_node1_port "create table tab_rep (a int primary key, b int);" + + exec_sql $case_db $sub_node1_port "create INCREMENTAL MATERIALIZED VIEW test_mv1 as select * from tab_rep;" + + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $pub_node1_port "insert into tab_rep values (1, 1);" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;" + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM test_mv1")" = "1|1" ]; then + echo "check if insert into incremental mview success" + else + echo "$failed_keyword when check if insert into incremental mview" + exit 1 + fi + + exec_sql $case_db $pub_node1_port "update tab_rep set b = 2 where a = 1;" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;" + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM test_mv1")" = "1|2" ]; then + echo "check if update into incremental mview success" + else + echo "$failed_keyword when check if update incremental mview" + exit 1 + fi + + exec_sql $case_db $pub_node1_port "delete from tab_rep where a = 1;" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;" + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM test_mv1")" = "0" ]; then + echo "check if delete incremental mview success" + else + echo "$failed_keyword when check if delete incremental mview" + exit 1 + fi } function tear_down() { diff --git a/src/test/subscription/testcase/matviews.sh b/src/test/subscription/testcase/matviews.sh index c04e57758..9381ca7ee 100644 --- a/src/test/subscription/testcase/matviews.sh +++ b/src/test/subscription/testcase/matviews.sh @@ -35,6 +35,22 @@ function test_1() { # not replicated, so this does not hang. echo "materialized view data not replicated"; + + # create a MV on the subscriber + exec_sql $case_db $sub_node1_port "CREATE MATERIALIZED VIEW testmv1 AS SELECT * FROM test1;" + exec_sql $case_db $pub_node1_port "INSERT INTO test1 (a, b) VALUES (3, 'three')" + wait_for_catchup $case_db $pub_node1_port "mysub" + + exec_sql $case_db $sub_node1_port "REFRESH MATERIALIZED VIEW testmv1" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM testmv1")" = "1|one +2|two +3|three" ]; then + echo "check if refresh materialized view success" + else + echo "$failed_keyword when check if refresh materialized view" + exit 1 + fi } function tear_down() {