Fix pgoutput core when there is no new data in logical xlog.

This commit is contained in:
totaj
2023-06-19 15:46:09 +08:00
parent f48d7b96a5
commit c05a52c0ec
3 changed files with 125 additions and 22 deletions

View File

@ -47,7 +47,7 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId orig
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue);
static bool ReplconninfoChanged();
static void GetConninfo(StringInfoData* standbysInfo);
static bool GetConninfo(StringInfoData* standbysInfo);
/* Entry in the map used to remember which relation schemas we sent. */
typedef struct RelationSyncEntry {
@ -233,7 +233,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *t
OutputPluginWrite(ctx, true);
/*
* Send the newest connecttion information to the subscriber,
* Send the newest connection information to the subscriber,
* when the connection information about the standby changes.
*/
if ((t_thrd.publication_cxt.updateConninfoNeeded && ReplconninfoChanged()) ||
@ -241,10 +241,12 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *t
StringInfoData standbysInfo;
initStringInfo(&standbysInfo);
GetConninfo(&standbysInfo);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_conninfo(ctx->out, standbysInfo.data);
OutputPluginWrite(ctx, true);
/* If there is no standby, don't need to send connection info to subscriber */
if (GetConninfo(&standbysInfo)) {
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_conninfo(ctx->out, standbysInfo.data);
OutputPluginWrite(ctx, true);
}
FreeStringInfo(&standbysInfo);
t_thrd.publication_cxt.firstTimeSendConninfo = false;
@ -342,29 +344,40 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Send the data */
switch (change->action) {
case REORDER_BUFFER_CHANGE_INSERT:
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary);
OutputPluginWrite(ctx, true);
if (change->data.tp.newtuple != NULL) {
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary);
OutputPluginWrite(ctx, true);
}
break;
case REORDER_BUFFER_CHANGE_UINSERT:
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
OutputPluginWrite(ctx, true);
if (change->data.utp.newtuple != NULL) {
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation,
(HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
OutputPluginWrite(ctx, true);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE: {
HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
if (change->data.tp.newtuple != NULL) {
HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary);
OutputPluginWrite(ctx, true);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary);
OutputPluginWrite(ctx, true);
}
break;
}
case REORDER_BUFFER_CHANGE_UUPDATE: {
HeapTuple oldtuple = change->data.utp.oldtuple ? ((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL;
if (change->data.utp.newtuple != NULL) {
HeapTuple oldtuple = change->data.utp.oldtuple ?
((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL;
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
OutputPluginWrite(ctx, true);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple),
data->binary);
OutputPluginWrite(ctx, true);
}
break;
}
case REORDER_BUFFER_CHANGE_DELETE:
@ -649,7 +662,8 @@ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashval
}
}
static void GetConninfo(StringInfoData* standbysInfo)
/* Get all primary and standby connection info. If there is no standby, return false, otherwise return true. */
static bool GetConninfo(StringInfoData* standbysInfo)
{
bool primaryJoined = false;
StringInfoData hosts;
@ -677,6 +691,9 @@ static void GetConninfo(StringInfoData* standbysInfo)
}
}
appendStringInfo(standbysInfo, "host=%s port=%s", hosts.data, ports.data);
FreeStringInfo(&hosts);
FreeStringInfo(&ports);
return primaryJoined;
}
static inline bool ReplconninfoChanged()

View File

@ -9,4 +9,5 @@ rewrite
sync
encoding
ddl
matviews
matviews
change_wal_level

View File

@ -0,0 +1,85 @@
#!/bin/sh
source $1/env_utils.sh $1 $2
case_db="changel_wal_level"
function test_1() {
echo "create database and tables."
exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
# Create some preexisting content on publisher
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,10)"
# Setup structure on subscriber
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key)"
# Setup logical replication
echo "create publication and subscription."
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
# Wait for initial table sync to finish
wait_for_subscription_sync $case_db $sub_node1_port
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
# change wal_level to non-logical
restart_guc "pub_datanode1" "wal_level = hot_standby"
# Do IUD operatition, this operation will not record logical xlog, cause wal_level is not logical
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(1)"
exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = 2 where a = 1"
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(2)"
exec_sql $case_db $pub_node1_port "select pg_sleep(5)"
# change wal_level to logical
restart_guc "pub_datanode1" "wal_level = logical"
wait_for_catchup $case_db $pub_node1_port "tap_sub"
# no data in subscription side
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "0" ]; then
echo "check data synced for first sub success"
else
echo "$failed_keyword when check data synced for first sub"
exit 1
fi
# one row in publication side
if [ "$(exec_sql $case_db $pub_node1_port "SELECT count(*) FROM tab_rep")" = "1" ]; then
echo "check data in pub success"
else
echo "$failed_keyword when check data in pub"
exit 1
fi
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(10)"
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(11)"
exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = 20 where a = 10"
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep where a = 20"
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "1" ]; then
echo "check data synced for first sub success"
else
echo "$failed_keyword when check data synced for first sub"
exit 1
fi
}
function tear_down() {
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub"
exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
echo "tear down"
}
test_1
tear_down