DDL的逻辑复制适配mppdb_decoding

This commit is contained in:
liuzheng
2024-04-16 17:46:34 +08:00
committed by yaoxin
parent c5e8eff907
commit 75873b55dd

View File

@ -37,6 +37,7 @@
#include "replication/output_plugin.h"
#include "replication/logical.h"
#include "tcop/ddldeparse.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@ -66,6 +67,11 @@ static void pg_decode_prepare_txn(LogicalDecodingContext* ctx, ReorderBufferTXN*
static void pg_decode_change(
LogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation rel, ReorderBufferChange* change);
static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id);
static void pg_decode_ddl(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
const char *prefix, Oid relid,
DeparsedCommandType cmdtype,
Size sz, const char *message);
void _PG_init(void)
{
@ -85,6 +91,7 @@ void _PG_output_plugin_init(OutputPluginCallbacks* cb)
cb->prepare_cb = pg_decode_prepare_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->ddl_cb = pg_decode_ddl;
}
/* initialize this plugin */
@ -424,3 +431,61 @@ static void pg_decode_change(
MemoryContextReset(data->context);
OutputPluginWrite(ctx, true);
}
static char *mppdb_deparse_command_type(DeparsedCommandType cmdtype)
{
switch (cmdtype) {
case DCT_SimpleCmd:
return "Simple";
case DCT_TableDropStart:
return "Drop table";
case DCT_TableDropEnd:
return "Drop Table End";
default:
Assert(false);
}
return NULL;
}
static void pg_decode_ddl(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
const char *prefix, Oid relid,
DeparsedCommandType cmdtype,
Size sz, const char *message)
{
PluginTestDecodingData *data = NULL;
MemoryContext old;
data = (PluginTestDecodingData *)ctx->output_plugin_private;
/* Output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes) {
pg_output_begin(ctx, data, txn, false);
}
data->xact_wrote_changes = true;
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "prefix: %s, lsn: %llu, relid %u, cmdtype: %s, size: %lu, content: %s",
prefix,
message_lsn,
relid,
mppdb_deparse_command_type(cmdtype),
sz,
message);
if (cmdtype != DCT_TableDropStart) {
char *tmp = pstrdup(message);
char *owner = NULL;
char *decodestring = deparse_ddl_json_to_string(tmp, &owner);
appendStringInfo(ctx->out, "\ndecode to: %s, [owner %s]", decodestring, owner ? owner : "none");
pfree(tmp);
}
MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
OutputPluginWrite(ctx, true);
}