!3593 修复发布订阅在IU操作没有newtuple时的core问题
Merge pull request !3593 from pengjiong/fix_date
This commit is contained in:
@ -47,7 +47,7 @@ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId orig
|
||||
static List *LoadPublications(List *pubnames);
|
||||
static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue);
|
||||
static bool ReplconninfoChanged();
|
||||
static void GetConninfo(StringInfoData* standbysInfo);
|
||||
static bool GetConninfo(StringInfoData* standbysInfo);
|
||||
|
||||
/* Entry in the map used to remember which relation schemas we sent. */
|
||||
typedef struct RelationSyncEntry {
|
||||
@ -233,7 +233,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *t
|
||||
OutputPluginWrite(ctx, true);
|
||||
|
||||
/*
|
||||
* Send the newest connecttion information to the subscriber,
|
||||
* Send the newest connection information to the subscriber,
|
||||
* when the connection information about the standby changes.
|
||||
*/
|
||||
if ((t_thrd.publication_cxt.updateConninfoNeeded && ReplconninfoChanged()) ||
|
||||
@ -241,10 +241,12 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *t
|
||||
StringInfoData standbysInfo;
|
||||
initStringInfo(&standbysInfo);
|
||||
|
||||
GetConninfo(&standbysInfo);
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_conninfo(ctx->out, standbysInfo.data);
|
||||
OutputPluginWrite(ctx, true);
|
||||
/* If there is no standby, don't need to send connection info to subscriber */
|
||||
if (GetConninfo(&standbysInfo)) {
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_conninfo(ctx->out, standbysInfo.data);
|
||||
OutputPluginWrite(ctx, true);
|
||||
}
|
||||
|
||||
FreeStringInfo(&standbysInfo);
|
||||
t_thrd.publication_cxt.firstTimeSendConninfo = false;
|
||||
@ -342,29 +344,40 @@ static void pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
/* Send the data */
|
||||
switch (change->action) {
|
||||
case REORDER_BUFFER_CHANGE_INSERT:
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
if (change->data.tp.newtuple != NULL) {
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_insert(ctx->out, relation, &change->data.tp.newtuple->tuple, data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UINSERT:
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_insert(ctx->out, relation, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
if (change->data.utp.newtuple != NULL) {
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_insert(ctx->out, relation,
|
||||
(HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
}
|
||||
break;
|
||||
case REORDER_BUFFER_CHANGE_UPDATE: {
|
||||
HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
|
||||
if (change->data.tp.newtuple != NULL) {
|
||||
HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL;
|
||||
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, &change->data.tp.newtuple->tuple, data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_UUPDATE: {
|
||||
HeapTuple oldtuple = change->data.utp.oldtuple ? ((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL;
|
||||
if (change->data.utp.newtuple != NULL) {
|
||||
HeapTuple oldtuple = change->data.utp.oldtuple ?
|
||||
((HeapTuple)(&change->data.utp.oldtuple->tuple)) : NULL;
|
||||
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple), data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
logicalrep_write_update(ctx->out, relation, oldtuple, (HeapTuple)(&change->data.utp.newtuple->tuple),
|
||||
data->binary);
|
||||
OutputPluginWrite(ctx, true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case REORDER_BUFFER_CHANGE_DELETE:
|
||||
@ -649,7 +662,8 @@ static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashval
|
||||
}
|
||||
}
|
||||
|
||||
static void GetConninfo(StringInfoData* standbysInfo)
|
||||
/* Get all primary and standby connection info. If there is no standby, return false, otherwise return true. */
|
||||
static bool GetConninfo(StringInfoData* standbysInfo)
|
||||
{
|
||||
bool primaryJoined = false;
|
||||
StringInfoData hosts;
|
||||
@ -677,6 +691,9 @@ static void GetConninfo(StringInfoData* standbysInfo)
|
||||
}
|
||||
}
|
||||
appendStringInfo(standbysInfo, "host=%s port=%s", hosts.data, ports.data);
|
||||
FreeStringInfo(&hosts);
|
||||
FreeStringInfo(&ports);
|
||||
return primaryJoined;
|
||||
}
|
||||
|
||||
static inline bool ReplconninfoChanged()
|
||||
|
||||
@ -9,4 +9,5 @@ rewrite
|
||||
sync
|
||||
encoding
|
||||
ddl
|
||||
matviews
|
||||
matviews
|
||||
change_wal_level
|
||||
85
src/test/subscription/testcase/change_wal_level.sh
Executable file
85
src/test/subscription/testcase/change_wal_level.sh
Executable file
@ -0,0 +1,85 @@
|
||||
#!/bin/sh
|
||||
|
||||
source $1/env_utils.sh $1 $2
|
||||
|
||||
case_db="changel_wal_level"
|
||||
|
||||
function test_1() {
|
||||
echo "create database and tables."
|
||||
exec_sql $db $pub_node1_port "CREATE DATABASE $case_db"
|
||||
exec_sql $db $sub_node1_port "CREATE DATABASE $case_db"
|
||||
# Create some preexisting content on publisher
|
||||
exec_sql $case_db $pub_node1_port "CREATE TABLE tab_rep (a int primary key)"
|
||||
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep SELECT generate_series(1,10)"
|
||||
|
||||
# Setup structure on subscriber
|
||||
exec_sql $case_db $sub_node1_port "CREATE TABLE tab_rep (a int primary key)"
|
||||
|
||||
# Setup logical replication
|
||||
echo "create publication and subscription."
|
||||
publisher_connstr="port=$pub_node1_port host=$g_local_ip dbname=$case_db user=$username password=$passwd"
|
||||
exec_sql $case_db $pub_node1_port "CREATE PUBLICATION tap_pub FOR ALL TABLES"
|
||||
exec_sql $case_db $sub_node1_port "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
|
||||
|
||||
# Wait for initial table sync to finish
|
||||
wait_for_subscription_sync $case_db $sub_node1_port
|
||||
|
||||
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep"
|
||||
wait_for_catchup $case_db $pub_node1_port "tap_sub"
|
||||
|
||||
# change wal_level to non-logical
|
||||
restart_guc "pub_datanode1" "wal_level = hot_standby"
|
||||
|
||||
# Do IUD operatition, this operation will not record logical xlog, cause wal_level is not logical
|
||||
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(1)"
|
||||
exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = 2 where a = 1"
|
||||
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep"
|
||||
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(2)"
|
||||
|
||||
exec_sql $case_db $pub_node1_port "select pg_sleep(5)"
|
||||
|
||||
# change wal_level to logical
|
||||
restart_guc "pub_datanode1" "wal_level = logical"
|
||||
wait_for_catchup $case_db $pub_node1_port "tap_sub"
|
||||
|
||||
# no data in subscription side
|
||||
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "0" ]; then
|
||||
echo "check data synced for first sub success"
|
||||
else
|
||||
echo "$failed_keyword when check data synced for first sub"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# one row in publication side
|
||||
if [ "$(exec_sql $case_db $pub_node1_port "SELECT count(*) FROM tab_rep")" = "1" ]; then
|
||||
echo "check data in pub success"
|
||||
else
|
||||
echo "$failed_keyword when check data in pub"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(10)"
|
||||
exec_sql $case_db $pub_node1_port "INSERT INTO tab_rep values(11)"
|
||||
exec_sql $case_db $pub_node1_port "UPDATE tab_rep SET a = 20 where a = 10"
|
||||
exec_sql $case_db $pub_node1_port "DELETE FROM tab_rep where a = 20"
|
||||
|
||||
if [ "$(exec_sql $case_db $sub_node1_port "SELECT count(*) FROM tab_rep")" = "1" ]; then
|
||||
echo "check data synced for first sub success"
|
||||
else
|
||||
echo "$failed_keyword when check data synced for first sub"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
||||
function tear_down() {
|
||||
exec_sql $case_db $sub_node1_port "DROP SUBSCRIPTION IF EXISTS tap_sub"
|
||||
exec_sql $case_db $pub_node1_port "DROP PUBLICATION IF EXISTS tap_pub"
|
||||
|
||||
exec_sql $db $sub_node1_port "DROP DATABASE $case_db"
|
||||
exec_sql $db $pub_node1_port "DROP DATABASE $case_db"
|
||||
|
||||
echo "tear down"
|
||||
}
|
||||
|
||||
test_1
|
||||
tear_down
|
||||
Reference in New Issue
Block a user