mppdb_decoding逻辑复制槽解码添加truncate table事务信息
This commit is contained in:
@ -63,7 +63,8 @@ static void pg_output_begin(
|
||||
static void pg_decode_commit_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn, XLogRecPtr commit_lsn);
|
||||
static void pg_decode_abort_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn);
|
||||
static void pg_decode_prepare_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn);
|
||||
|
||||
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
int nrelations, Relation relations[], ReorderBufferChange *change);
|
||||
static void pg_decode_change(
|
||||
LogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation rel, ReorderBufferChange* change);
|
||||
static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id);
|
||||
@ -86,6 +87,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks* cb)
|
||||
cb->startup_cb = pg_decode_startup;
|
||||
cb->begin_cb = pg_decode_begin_txn;
|
||||
cb->change_cb = pg_decode_change;
|
||||
cb->truncate_cb = pg_decode_truncate;
|
||||
cb->commit_cb = pg_decode_commit_txn;
|
||||
cb->abort_cb = pg_decode_abort_txn;
|
||||
cb->prepare_cb = pg_decode_prepare_txn;
|
||||
@ -312,6 +314,54 @@ static void TupleToJsoninfo(Relation relation, cJSON* cols_name, cJSON* cols_typ
|
||||
}
|
||||
}
|
||||
|
||||
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||
int nrelations, Relation relations[], ReorderBufferChange *change)
|
||||
{
|
||||
PluginTestDecodingData *data;
|
||||
MemoryContext old;
|
||||
int i;
|
||||
|
||||
data = (PluginTestDecodingData*)ctx->output_plugin_private;
|
||||
|
||||
/* output BEGIN if we haven't yet */
|
||||
if (data->skip_empty_xacts && !data->xact_wrote_changes) {
|
||||
pg_output_begin(ctx, data, txn, false);
|
||||
}
|
||||
data->xact_wrote_changes = true;
|
||||
|
||||
/* Avoid leaking memory by using and resetting our own context */
|
||||
old = MemoryContextSwitchTo(data->context);
|
||||
|
||||
OutputPluginPrepareWrite(ctx, true);
|
||||
|
||||
appendStringInfoString(ctx->out, "table ");
|
||||
|
||||
for (i = 0; i < nrelations; i++) {
|
||||
if (i > 0)
|
||||
appendStringInfoString(ctx->out, ", ");
|
||||
|
||||
appendStringInfoString(ctx->out,
|
||||
quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
|
||||
NameStr(relations[i]->rd_rel->relname)));
|
||||
}
|
||||
|
||||
appendStringInfoString(ctx->out, ": TRUNCATE:");
|
||||
|
||||
if (change->data.truncate.restart_seqs
|
||||
|| change->data.truncate.cascade) {
|
||||
if (change->data.truncate.restart_seqs)
|
||||
appendStringInfo(ctx->out, " restart_seqs");
|
||||
if (change->data.truncate.cascade)
|
||||
appendStringInfo(ctx->out, " cascade");
|
||||
} else
|
||||
appendStringInfoString(ctx->out, " (no-flags)");
|
||||
|
||||
MemoryContextSwitchTo(old);
|
||||
MemoryContextReset(data->context);
|
||||
|
||||
OutputPluginWrite(ctx, true);
|
||||
}
|
||||
|
||||
/*
|
||||
* callback for individual changed tuples
|
||||
*/
|
||||
|
||||
@ -361,3 +361,32 @@ drop table tr_sub;
|
||||
drop table table_without_key;
|
||||
drop table bmsql_order_line;
|
||||
drop sequence toasttable_rand_seq;
|
||||
CREATE publication pub1 FOR ALL TABLES with (ddl = 'all');
|
||||
select 'init' from pg_create_logical_replication_slot('slot1', 'mppdb_decoding');
|
||||
?column?
|
||||
----------
|
||||
init
|
||||
(1 row)
|
||||
|
||||
create table test_truncate(id int);
|
||||
truncate table test_truncate;
|
||||
SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||
--?.*
|
||||
--?.*
|
||||
BEGIN
|
||||
--?.*
|
||||
--?.*
|
||||
--?.*
|
||||
BEGIN
|
||||
table public.test_truncate: TRUNCATE: (no-flags)
|
||||
--?.*
|
||||
(6 rows)
|
||||
|
||||
SELECT pg_drop_replication_slot('slot1');
|
||||
pg_drop_replication_slot
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
drop publication pub1;
|
||||
--?.*
|
||||
|
||||
@ -219,3 +219,12 @@ drop table tr_sub;
|
||||
drop table table_without_key;
|
||||
drop table bmsql_order_line;
|
||||
drop sequence toasttable_rand_seq;
|
||||
|
||||
CREATE publication pub1 FOR ALL TABLES with (ddl = 'all');
|
||||
select 'init' from pg_create_logical_replication_slot('slot1', 'mppdb_decoding');
|
||||
create table test_truncate(id int);
|
||||
truncate table test_truncate;
|
||||
SELECT data FROM pg_logical_slot_get_changes('slot1', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
|
||||
SELECT pg_drop_replication_slot('slot1');
|
||||
drop publication pub1;
|
||||
drop table test_truncate;
|
||||
Reference in New Issue
Block a user