diff --git a/contrib/mppdb_decoding/mppdb_decoding.cpp b/contrib/mppdb_decoding/mppdb_decoding.cpp index 4953f24e5..60f22c740 100644 --- a/contrib/mppdb_decoding/mppdb_decoding.cpp +++ b/contrib/mppdb_decoding/mppdb_decoding.cpp @@ -63,7 +63,8 @@ static void pg_output_begin( static void pg_decode_commit_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn, XLogRecPtr commit_lsn); static void pg_decode_abort_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn); static void pg_decode_prepare_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn); - +static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change); static void pg_decode_change( LogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation rel, ReorderBufferChange* change); static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id); @@ -86,6 +87,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks* cb) cb->startup_cb = pg_decode_startup; cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; + cb->truncate_cb = pg_decode_truncate; cb->commit_cb = pg_decode_commit_txn; cb->abort_cb = pg_decode_abort_txn; cb->prepare_cb = pg_decode_prepare_txn; @@ -312,6 +314,54 @@ static void TupleToJsoninfo(Relation relation, cJSON* cols_name, cJSON* cols_typ } } +static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + PluginTestDecodingData *data; + MemoryContext old; + int i; + + data = (PluginTestDecodingData*)ctx->output_plugin_private; + + /* output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) { + pg_output_begin(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfoString(ctx->out, "table "); + + for (i = 0; i < nrelations; i++) { + if (i > 0) + appendStringInfoString(ctx->out, ", "); + + appendStringInfoString(ctx->out, + quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace), + NameStr(relations[i]->rd_rel->relname))); + } + + appendStringInfoString(ctx->out, ": TRUNCATE:"); + + if (change->data.truncate.restart_seqs + || change->data.truncate.cascade) { + if (change->data.truncate.restart_seqs) + appendStringInfo(ctx->out, " restart_seqs"); + if (change->data.truncate.cascade) + appendStringInfo(ctx->out, " cascade"); + } else + appendStringInfoString(ctx->out, " (no-flags)"); + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + OutputPluginWrite(ctx, true); +} + /* * callback for individual changed tuples */ diff --git a/src/test/regress/output/ddl.source b/src/test/regress/output/ddl.source index 54758d6bb..e362dc502 100644 --- a/src/test/regress/output/ddl.source +++ b/src/test/regress/output/ddl.source @@ -361,3 +361,32 @@ drop table tr_sub; drop table table_without_key; drop table bmsql_order_line; drop sequence toasttable_rand_seq; +CREATE publication pub1 FOR ALL TABLES with (ddl = 'all'); +select 'init' from pg_create_logical_replication_slot('slot1', 'mppdb_decoding'); + ?column? +---------- + init +(1 row) + +create table test_truncate(id int); +truncate table test_truncate; +SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +--?.* +--?.* + BEGIN +--?.* +--?.* +--?.* + BEGIN + table public.test_truncate: TRUNCATE: (no-flags) +--?.* +(6 rows) + +SELECT pg_drop_replication_slot('slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +drop publication pub1; +--?.* diff --git a/src/test/regress/sql/ddl.sql b/src/test/regress/sql/ddl.sql index ea24afa42..bc18b27c6 100644 --- a/src/test/regress/sql/ddl.sql +++ b/src/test/regress/sql/ddl.sql @@ -219,3 +219,12 @@ drop table tr_sub; drop table table_without_key; drop table bmsql_order_line; drop sequence toasttable_rand_seq; + +CREATE publication pub1 FOR ALL TABLES with (ddl = 'all'); +select 'init' from pg_create_logical_replication_slot('slot1', 'mppdb_decoding'); +create table test_truncate(id int); +truncate table test_truncate; +SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT pg_drop_replication_slot('slot1'); +drop publication pub1; +drop table test_truncate; \ No newline at end of file