逻辑复制支持Alter table的DDL语句

This commit is contained in:
Hemny
2024-08-05 15:49:59 +08:00
parent 1367a8985d
commit 53b349d849
120 changed files with 25361 additions and 648 deletions

View File

@ -49,6 +49,10 @@ static void pg_decode_prepare_txn(LogicalDecodingContext* ctx, ReorderBufferTXN*
static void pg_decode_change(
LogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation rel, ReorderBufferChange* change);
static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id);
static void pg_decode_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
ReorderBufferChange *change);
static void pg_decode_ddl(LogicalDecodingContext* ctx, ReorderBufferTXN* txn, XLogRecPtr message_lsn,
const char *prefix, Oid relid, DeparsedCommandType cmdtype, Size sz, const char *message);
@ -65,6 +69,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks* cb)
cb->startup_cb = pg_decode_startup;
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
cb->truncate_cb = pg_decode_truncate;
cb->commit_cb = pg_decode_commit_txn;
cb->abort_cb = pg_decode_abort_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@ -312,15 +317,73 @@ static void pg_decode_change(
OutputPluginWrite(ctx, true);
}
static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
PluginTestDecodingData *data;
MemoryContext old;
int i;
data = (PluginTestDecodingData*)ctx->output_plugin_private;
/* output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes) {
pg_output_begin(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
OutputPluginPrepareWrite(ctx, true);
appendStringInfoString(ctx->out, "table ");
for (i = 0; i < nrelations; i++) {
if (i > 0)
appendStringInfoString(ctx->out, ", ");
appendStringInfoString(ctx->out,
quote_qualified_identifier(get_namespace_name(relations[i]->rd_rel->relnamespace),
NameStr(relations[i]->rd_rel->relname)));
}
appendStringInfoString(ctx->out, ": TRUNCATE:");
if (change->data.truncate.restart_seqs
|| change->data.truncate.cascade) {
if (change->data.truncate.restart_seqs)
appendStringInfo(ctx->out, " restart_seqs");
if (change->data.truncate.cascade)
appendStringInfo(ctx->out, " cascade");
} else
appendStringInfoString(ctx->out, " (no-flags)");
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
OutputPluginWrite(ctx, true);
}
static char *deparse_command_type(DeparsedCommandType cmdtype)
{
switch (cmdtype) {
case DCT_SimpleCmd:
return "Simple";
case DCT_TableDropStart:
return "Drop table";
return "Drop Table";
case DCT_TableDropEnd:
return "Drop Table End";
case DCT_TableAlter:
return "Alter Table";
case DCT_ObjectCreate:
return "Create Object";
case DCT_ObjectDrop:
return "Drop Object";
case DCT_TypeDropStart:
return "Drop Type";
case DCT_TypeDropEnd:
return "Drop Type End";
default:
Assert(false);
}