From c05a52c0ecd95cce1154d259b69ebd6a59952271 Mon Sep 17 00:00:00 2001 From: totaj Date: Mon, 19 Jun 2023 15:46:09 +0800 Subject: [PATCH] Fix pgoutput core when there is no new data in logical xlog. --- .../storage/replication/pgoutput/pgoutput.cpp | 59 ++++++++----- src/test/subscription/schedule | 3 +- .../subscription/testcase/change_wal_level.sh | 85 +++++++++++++++++++ 3 files changed, 125 insertions(+), 22 deletions(-) create mode 100755 src/test/subscription/testcase/change_wal_level.sh diff --git a/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp b/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp index 8cb9a6a06..4478a8a0a 100644 --- a/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp +++ b/src/gausskernel/storage/replication/pgoutput/pgoutput.cpp @@ -47,7 +47,7 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId orig static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static bool ReplconninfoChanged(); -static void GetConninfo(StringInfoData* standbysInfo); +static bool GetConninfo(StringInfoData* standbysInfo); /* Entry in the map used to remember which relation schemas we sent. */ typedef struct RelationSyncEntry { @@ -233,7 +233,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *t OutputPluginWrite(ctx, true); /* - * Send the newest connecttion information to the subscriber, + * Send the newest connection information to the subscriber, * when the connection information about the standby changes. */ if ((t_thrd.publication_cxt.updateConninfoNeeded && ReplconninfoChanged()) || @@ -241,10 +241,12 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *t StringInfoData standbysInfo; initStringInfo(&standbysInfo); - GetConninfo(&standbysInfo); - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_conninfo(ctx->out, standbysInfo.data); - OutputPluginWrite(ctx, true); + /* If there is no standby, don't need to send connection info to subscriber */ + if (GetConninfo(&standbysInfo)) { + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_conninfo(ctx->out, standbysInfo.data); + OutputPluginWrite(ctx, true); + } FreeStringInfo(&standbysInfo); t_thrd.publication_cxt.firstTimeSendConninfo = false; @@ -342,29 +344,40 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Send the data */ switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary); - OutputPluginWrite(ctx, true); + if (change->data.tp.newtuple != NULL) { + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary); + OutputPluginWrite(ctx, true); + } break; case REORDER_BUFFER_CHANGE_UINSERT: - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary); - OutputPluginWrite(ctx, true); + if (change->data.utp.newtuple != NULL) { + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, relation, + (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary); + OutputPluginWrite(ctx, true); + } break; case REORDER_BUFFER_CHANGE_UPDATE: { - HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + if (change->data.tp.newtuple != NULL) { + HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary); - OutputPluginWrite(ctx, true); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary); + OutputPluginWrite(ctx, true); + } break; } case REORDER_BUFFER_CHANGE_UUPDATE: { - HeapTuple oldtuple = change->data.utp.oldtuple ? ((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL; + if (change->data.utp.newtuple != NULL) { + HeapTuple oldtuple = change->data.utp.oldtuple ? + ((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL; - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary); - OutputPluginWrite(ctx, true); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple), + data->binary); + OutputPluginWrite(ctx, true); + } break; } case REORDER_BUFFER_CHANGE_DELETE: @@ -649,7 +662,8 @@ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashval } } -static void GetConninfo(StringInfoData* standbysInfo) +/* Get all primary and standby connection info. If there is no standby, return false, otherwise return true. */ +static bool GetConninfo(StringInfoData* standbysInfo) { bool primaryJoined = false; StringInfoData hosts; @@ -677,6 +691,9 @@ static void GetConninfo(StringInfoData* standbysInfo) } } appendStringInfo(standbysInfo, "host=%s port=%s", hosts.data, ports.data); + FreeStringInfo(&hosts); + FreeStringInfo(&ports); + return primaryJoined; } static inline bool ReplconninfoChanged() diff --git a/src/test/subscription/schedule b/src/test/subscription/schedule index c66d5af9b..6913c6c8d 100644 --- a/src/test/subscription/schedule +++ b/src/test/subscription/schedule @@ -9,4 +9,5 @@ rewrite sync encoding ddl -matviews \ No newline at end of file +matviews +change_wal_level \ No newline at end of file diff --git a/src/test/subscription/testcase/change_wal_level.sh b/src/test/subscription/testcase/change_wal_level.sh new file mode 100755 index 000000000..5ee70c255 --- /dev/null +++ b/src/test/subscription/testcase/change_wal_level.sh @@ -0,0 +1,85 @@ +#!/bin/sh + +source $1/env_utils.sh $1 $2 + +case_db="changel_wal_level" + +function test_1() { + echo "create database and tables." + exec_sql $db $pub_node1_port "CREATE DATABASE $case_db" + exec_sql $db $sub_node1_port "CREATE DATABASE $case_db" + # Create some preexisting content on publisher + exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,10)" + + # Setup structure on subscriber + exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key)" + + # Setup logical replication + echo "create publication and subscription." + publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd" + exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES" + exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" + + # Wait for initial table sync to finish + wait_for_subscription_sync $case_db $sub_node1_port + + exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + # change wal_level to non-logical + restart_guc "pub_datanode1" "wal_level = hot_standby" + + # Do IUD operatition, this operation will not record logical xlog, cause wal_level is not logical + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(1)" + exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = 2 where a = 1" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(2)" + + exec_sql $case_db $pub_node1_port "select pg_sleep(5)" + + # change wal_level to logical + restart_guc "pub_datanode1" "wal_level = logical" + wait_for_catchup $case_db $pub_node1_port "tap_sub" + + # no data in subscription side + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "0" ]; then + echo "check data synced for first sub success" + else + echo "$failed_keyword when check data synced for first sub" + exit 1 + fi + + # one row in publication side + if [ "$(exec_sql $case_db $pub_node1_port "SELECT count(*) FROM tab_rep")" = "1" ]; then + echo "check data in pub success" + else + echo "$failed_keyword when check data in pub" + exit 1 + fi + + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(10)" + exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(11)" + exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = 20 where a = 10" + exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep where a = 20" + + if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "1" ]; then + echo "check data synced for first sub success" + else + echo "$failed_keyword when check data synced for first sub" + exit 1 + fi +} + +function tear_down() { + exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub" + exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub" + + exec_sql $db $sub_node1_port "DROP DATABASE $case_db" + exec_sql $db $pub_node1_port "DROP DATABASE $case_db" + + echo "tear down" +} + +test_1 +tear_down \ No newline at end of file