From 75873b55dddc9b27ab5888e3004a2250dd9ac7bd Mon Sep 17 00:00:00 2001 From: liuzheng Date: Tue, 16 Apr 2024 17:46:34 +0800 Subject: [PATCH] =?UTF-8?q?DDL=E7=9A=84=E9=80=BB=E8=BE=91=E5=A4=8D?= =?UTF-8?q?=E5=88=B6=E9=80=82=E9=85=8Dmppdb=5Fdecoding?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- contrib/mppdb_decoding/mppdb_decoding.cpp | 65 +++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/contrib/mppdb_decoding/mppdb_decoding.cpp b/contrib/mppdb_decoding/mppdb_decoding.cpp index 621a8cfc5..bb685c909 100644 --- a/contrib/mppdb_decoding/mppdb_decoding.cpp +++ b/contrib/mppdb_decoding/mppdb_decoding.cpp @@ -37,6 +37,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "tcop/ddldeparse.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -66,6 +67,11 @@ static void pg_decode_prepare_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* static void pg_decode_change( LogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation rel, ReorderBufferChange* change); static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id); +static void pg_decode_ddl(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + const char *prefix, Oid relid, + DeparsedCommandType cmdtype, + Size sz, const char *message); void _PG_init(void) { @@ -85,6 +91,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks* cb) cb->prepare_cb = pg_decode_prepare_txn; cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; + cb->ddl_cb = pg_decode_ddl; } /* initialize this plugin */ @@ -424,3 +431,61 @@ static void pg_decode_change( MemoryContextReset(data->context); OutputPluginWrite(ctx, true); } + +static char *mppdb_deparse_command_type(DeparsedCommandType cmdtype) +{ + switch (cmdtype) { + case DCT_SimpleCmd: + return "Simple"; + case DCT_TableDropStart: + return "Drop table"; + case DCT_TableDropEnd: + return "Drop Table End"; + default: + Assert(false); + } + return NULL; +} + +static void pg_decode_ddl(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr message_lsn, + const char *prefix, Oid relid, + DeparsedCommandType cmdtype, + Size sz, const char *message) +{ + PluginTestDecodingData *data = NULL; + MemoryContext old; + + data = (PluginTestDecodingData *)ctx->output_plugin_private; + + /* Output BEGIN if we haven't yet */ + if (data->skip_empty_xacts && !data->xact_wrote_changes) { + pg_output_begin(ctx, data, txn, false); + } + data->xact_wrote_changes = true; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "prefix: %s, lsn: %llu, relid %u, cmdtype: %s, size: %lu, content: %s", + prefix, + message_lsn, + relid, + mppdb_deparse_command_type(cmdtype), + sz, + message); + + if (cmdtype != DCT_TableDropStart) { + char *tmp = pstrdup(message); + char *owner = NULL; + char *decodestring = deparse_ddl_json_to_string(tmp, &owner); + appendStringInfo(ctx->out, "\ndecode to: %s, [owner %s]", decodestring, owner ? owner : "none"); + pfree(tmp); + } + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + OutputPluginWrite(ctx, true); +} \ No newline at end of file