修复发布订阅场景下订阅端的物化视图刷新不出新数据的问题

This commit is contained in:
chenxiaobin19
2024-02-07 16:39:29 +08:00
committed by zhang_xubo
parent 2eac9ed302
commit 3ee2c7f19d
3 changed files with 116 additions and 0 deletions

View File

@ -21,6 +21,7 @@
#include "access/xact.h"
#include "commands/trigger.h"
#include "commands/cluster.h"
#include "commands/matview.h"
#include "catalog/pg_partition_fn.h"
#include "catalog/pg_publication.h"
#include "executor/executor.h"
@ -645,6 +646,16 @@ void ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot, FakeRelation
ExecARInsertTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, (HeapTuple)tuple, recheckIndexes);
list_free_ext(recheckIndexes);
/* try to insert tuple into mlog-table. */
if (targetRel != NULL && targetRel->rd_mlogoid != InvalidOid) {
/* judge whether need to insert into mlog-table */
if (targetRel->rd_tam_ops == TableAmUstore) {
tuple = (Tuple)UHeapToHeap(targetRel->rd_att, (UHeapTuple)tuple);
}
insert_into_mlog_table(targetRel, targetRel->rd_mlogoid, (HeapTuple)tuple,
&(((HeapTuple)tuple)->t_self), GetCurrentTransactionId(), 'I');
}
}
/*
@ -769,6 +780,19 @@ void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot
searchSlotTid, (HeapTuple)tuple, NULL, recheckIndexes);
list_free(recheckIndexes);
/* update tuple from mlog of matview(delete + insert). */
if (rel != NULL && rel->rd_mlogoid != InvalidOid) {
/* judge whether need to insert into mlog-table */
/* 1. delete one tuple. */
insert_into_mlog_table(rel, rel->rd_mlogoid, NULL, searchSlotTid, tmfd.xmin, 'D');
/* 2. insert new tuple */
if (rel->rd_tam_ops == TableAmUstore) {
tuple = (Tuple)UHeapToHeap(rel->rd_att, (UHeapTuple)tuple);
}
insert_into_mlog_table(rel, rel->rd_mlogoid, (HeapTuple)tuple, &(((HeapTuple)tuple)->t_self),
GetCurrentTransactionId(), 'I');
}
}
/*
@ -822,6 +846,12 @@ void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot
/* AFTER ROW DELETE Triggers */
ExecARDeleteTriggers(estate, resultRelInfo, relAndPart->partOid, InvalidBktId, NULL, tid);
/* delete tuple from mlog of matview */
if (rel != NULL && rel->rd_mlogoid != InvalidOid) {
/* judge whether need to insert into mlog-table */
insert_into_mlog_table(rel, rel->rd_mlogoid, NULL, tid, tmfd.xmin, 'D');
}
}
/*

View File

@ -133,6 +133,76 @@ create table t_pubsub_0349(
fi
exec_sql $case_db $sub_node1_port "ALTER SYSTEM SET subscription_conflict_resolution = error"
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE t_pubsub_0349"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE t_pubsub_0349"
# BUG4: fix pg_replication_origin_status remain
exec_sql $case_db $pub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)"
exec_sql $case_db $sub_node1_port "create table tab_rep (a int primary key, b int); insert into tab_rep values (1,1)"
echo "create publication and subscription."
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION tap_sub;"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM pg_replication_origin_status")" = "0" ]; then
echo "check if pg_replication_origin_status is empty success"
else
echo "$failed_keyword when check if pg_replication_origin_status is empty"
exit 1
fi
# BUG5: fix IUD not record mlog
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub;DROP TABLE tab_rep"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub;DROP TABLE tab_rep"
exec_sql $case_db $pub_node1_port "create table tab_rep (a int primary key, b int);"
exec_sql $case_db $sub_node1_port "create table tab_rep (a int primary key, b int);"
exec_sql $case_db $sub_node1_port "create INCREMENTAL MATERIALIZED VIEW test_mv1 as select * from tab_rep;"
echo "create publication and subscription."
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
wait_for_subscription_sync $case_db $sub_node1_port
exec_sql $case_db $pub_node1_port "insert into tab_rep values (1, 1);"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM test_mv1")" = "1|1" ]; then
echo "check if insert into incremental mview success"
else
echo "$failed_keyword when check if insert into incremental mview"
exit 1
fi
exec_sql $case_db $pub_node1_port "update tab_rep set b = 2 where a = 1;"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM test_mv1")" = "1|2" ]; then
echo "check if update into incremental mview success"
else
echo "$failed_keyword when check if update incremental mview"
exit 1
fi
exec_sql $case_db $pub_node1_port "delete from tab_rep where a = 1;"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
exec_sql $case_db $sub_node1_port "REFRESH INCREMENTAL MATERIALIZED VIEW test_mv1;"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM test_mv1")" = "0" ]; then
echo "check if delete incremental mview success"
else
echo "$failed_keyword when check if delete incremental mview"
exit 1
fi
}
function tear_down() {

View File

@ -35,6 +35,22 @@ function test_1() {
# not replicated, so this does not hang.
echo "materialized view data not replicated";
# create a MV on the subscriber
exec_sql $case_db $sub_node1_port "CREATE MATERIALIZED VIEW testmv1 AS SELECT * FROM test1;"
exec_sql $case_db $pub_node1_port "INSERT INTO test1 (a, b) VALUES (3, 'three')"
wait_for_catchup $case_db $pub_node1_port "mysub"
exec_sql $case_db $sub_node1_port "REFRESH MATERIALIZED VIEW testmv1"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT * FROM testmv1")" = "1|one
2|two
3|three" ]; then
echo "check if refresh materialized view success"
else
echo "$failed_keyword when check if refresh materialized view"
exit 1
fi
}
function tear_down() {