sync code

This commit is contained in:
LiHeng
2022-03-04 23:22:16 +08:00
parent d26ec83e7b
commit de223dd152
2618 changed files with 382415 additions and 163216 deletions

View File

@ -36,21 +36,15 @@ PG_MODULE_MAGIC;
extern "C" void _PG_init(void);
extern "C" void _PG_output_plugin_init(OutputPluginCallbacks* cb);
typedef struct {
MemoryContext context;
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext* ctx, OutputPluginOptions* opt, bool is_init);
static void pg_decode_shutdown(LogicalDecodingContext* ctx);
static void pg_decode_begin_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn);
static void pg_output_begin(
LogicalDecodingContext* ctx, TestDecodingData* data, ReorderBufferTXN* txn, bool last_write);
LogicalDecodingContext* ctx, PluginTestDecodingData* data, ReorderBufferTXN* txn, bool last_write);
static void pg_decode_commit_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn, XLogRecPtr commit_lsn);
static void pg_decode_abort_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn);
static void pg_decode_prepare_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn);
static void pg_decode_change(
LogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation rel, ReorderBufferChange* change);
static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id);
@ -69,6 +63,8 @@ void _PG_output_plugin_init(OutputPluginCallbacks* cb)
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
cb->commit_cb = pg_decode_commit_txn;
cb->abort_cb = pg_decode_abort_txn;
cb->prepare_cb = pg_decode_prepare_txn;
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
}
@ -77,84 +73,33 @@ void _PG_output_plugin_init(OutputPluginCallbacks* cb)
static void pg_decode_startup(LogicalDecodingContext* ctx, OutputPluginOptions* opt, bool is_init)
{
ListCell* option = NULL;
TestDecodingData* data = NULL;
PluginTestDecodingData* data = NULL;
data = (TestDecodingData*)palloc0(sizeof(TestDecodingData));
data = (PluginTestDecodingData*)palloc0(sizeof(PluginTestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
"text conversion context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
data->include_xids = true;
data->include_timestamp = false;
data->include_timestamp = true;
data->skip_empty_xacts = false;
data->only_local = true;
data->tableWhiteList = NIL;
ctx->output_plugin_private = data;
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
foreach (option, ctx->output_plugin_options) {
DefElem* elem = (DefElem*)lfirst(option);
Assert(elem->arg == NULL || IsA(elem->arg, String));
if (strcmp(elem->defname, "include-xids") == 0) {
/* if option does not provide a value, it means its value is true */
if (elem->arg == NULL)
data->include_xids = true;
else if (!parse_bool(strVal(elem->arg), &data->include_xids))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname)));
} else if (strcmp(elem->defname, "include-timestamp") == 0) {
if (elem->arg == NULL)
data->include_timestamp = true;
else if (!parse_bool(strVal(elem->arg), &data->include_timestamp))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname)));
} else if (strcmp(elem->defname, "force-binary") == 0) {
bool force_binary = false;
if (elem->arg == NULL)
continue;
else if (!parse_bool(strVal(elem->arg), &force_binary))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname)));
if (force_binary)
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
} else if (strcmp(elem->defname, "skip-empty-xacts") == 0) {
if (elem->arg == NULL)
data->skip_empty_xacts = true;
else if (!parse_bool(strVal(elem->arg), &data->skip_empty_xacts))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname)));
} else if (strcmp(elem->defname, "only-local") == 0) {
if (elem->arg == NULL)
data->only_local = true;
else if (!parse_bool(strVal(elem->arg), &data->only_local))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname)));
} else {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg(
"option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)")));
}
ParseDecodingOptionPlugin(option, data, opt);
}
}
/* cleanup this plugin's resources */
static void pg_decode_shutdown(LogicalDecodingContext* ctx)
{
TestDecodingData* data = (TestDecodingData*)ctx->output_plugin_private;
PluginTestDecodingData* data = (PluginTestDecodingData*)ctx->output_plugin_private;
/* cleanup our own resources via memory context reset */
MemoryContextDelete(data->context);
@ -163,16 +108,18 @@ static void pg_decode_shutdown(LogicalDecodingContext* ctx)
/* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn)
{
TestDecodingData* data = (TestDecodingData*)ctx->output_plugin_private;
PluginTestDecodingData* data = (PluginTestDecodingData*)ctx->output_plugin_private;
data->xact_wrote_changes = false;
if (data->skip_empty_xacts)
if (data->skip_empty_xacts) {
return;
}
pg_output_begin(ctx, data, txn, true);
}
static void pg_output_begin(LogicalDecodingContext* ctx, TestDecodingData* data, ReorderBufferTXN* txn, bool last_write)
static void pg_output_begin(LogicalDecodingContext* ctx, PluginTestDecodingData* data, ReorderBufferTXN* txn,
bool last_write)
{
OutputPluginPrepareWrite(ctx, last_write);
if (data->include_xids)
@ -185,7 +132,7 @@ static void pg_output_begin(LogicalDecodingContext* ctx, TestDecodingData* data,
/* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn, XLogRecPtr commit_lsn)
{
TestDecodingData* data = (TestDecodingData*)ctx->output_plugin_private;
PluginTestDecodingData* data = (PluginTestDecodingData*)ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
@ -203,65 +150,55 @@ static void pg_decode_commit_txn(LogicalDecodingContext* ctx, ReorderBufferTXN*
OutputPluginWrite(ctx, true);
}
/* ABORT callback */
static void pg_decode_abort_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn)
{
PluginTestDecodingData* data = (PluginTestDecodingData*)ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "ABORT %lu", txn->xid);
else
appendStringInfoString(ctx->out, "ABORT");
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)", timestamptz_to_str(txn->commit_time));
OutputPluginWrite(ctx, true);
}
/* PREPARE callback */
static void pg_decode_prepare_txn(LogicalDecodingContext* ctx, ReorderBufferTXN* txn)
{
PluginTestDecodingData* data = (PluginTestDecodingData*)ctx->output_plugin_private;
if (data->skip_empty_xacts && !data->xact_wrote_changes)
return;
OutputPluginPrepareWrite(ctx, true);
if (data->include_xids)
appendStringInfo(ctx->out, "PREPARE %lu", txn->xid);
else
appendStringInfoString(ctx->out, "PREPARE");
if (data->include_timestamp)
appendStringInfo(ctx->out, " (at %s)", timestamptz_to_str(txn->commit_time));
OutputPluginWrite(ctx, true);
}
static bool pg_decode_filter(LogicalDecodingContext* ctx, RepOriginId origin_id)
{
TestDecodingData* data = (TestDecodingData*)ctx->output_plugin_private;
PluginTestDecodingData* data = (PluginTestDecodingData*)ctx->output_plugin_private;
if (data->only_local && origin_id != InvalidRepOriginId)
return true;
return false;
}
/*
* Print literal `outputstr' already represented as string of type `typid'
* into stringbuf `s'.
*
* Some builtin types aren't quoted, the rest is quoted. Escaping is done as
* if u_sess->parser_cxt.standard_conforming_strings were enabled.
*/
static void print_literal(StringInfo s, Oid typid, char* outputstr)
{
const char* valptr = NULL;
switch (typid) {
case INT1OID:
case INT2OID:
case INT4OID:
case INT8OID:
case OIDOID:
case FLOAT4OID:
case FLOAT8OID:
case NUMERICOID:
/* NB: We don't care about Inf, NaN et al. */
appendStringInfoString(s, outputstr);
break;
case BITOID:
case VARBITOID:
appendStringInfo(s, "B'%s'", outputstr);
break;
case BOOLOID:
if (strcmp(outputstr, "t") == 0)
appendStringInfoString(s, "true");
else
appendStringInfoString(s, "false");
break;
default:
appendStringInfoChar(s, '\'');
for (valptr = outputstr; *valptr; valptr++) {
char ch = *valptr;
if (SQL_STR_DOUBLE(ch, false))
appendStringInfoChar(s, ch);
appendStringInfoChar(s, ch);
}
appendStringInfoChar(s, '\'');
break;
}
}
static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool skip_nulls)
{
if (HEAP_TUPLE_IS_COMPRESSED(tuple->t_data))
@ -337,11 +274,11 @@ static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple
else if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK_B(origval))
appendStringInfoString(s, "unchanged-toast-datum");
else if (!typisvarlena)
print_literal(s, typid, OidOutputFunctionCall(typoutput, origval));
PrintLiteral(s, typid, OidOutputFunctionCall(typoutput, origval));
else {
Datum val; /* definitely detoasted Datum */
val = PointerGetDatum(PG_DETOAST_DATUM(origval));
print_literal(s, typid, OidOutputFunctionCall(typoutput, val));
PrintLiteral(s, typid, OidOutputFunctionCall(typoutput, val));
}
}
}
@ -351,13 +288,12 @@ static void tuple_to_stringinfo(StringInfo s, TupleDesc tupdesc, HeapTuple tuple
static void pg_decode_change(
LogicalDecodingContext* ctx, ReorderBufferTXN* txn, Relation relation, ReorderBufferChange* change)
{
TestDecodingData* data = NULL;
PluginTestDecodingData* data = NULL;
Form_pg_class class_form;
TupleDesc tupdesc;
MemoryContext old;
data = (TestDecodingData*)ctx->output_plugin_private;
u_sess->attr.attr_common.extra_float_digits = 0;
data = (PluginTestDecodingData*)ctx->output_plugin_private;
/* output BEGIN if we haven't yet */
if (data->skip_empty_xacts && !data->xact_wrote_changes) {
@ -371,12 +307,18 @@ static void pg_decode_change(
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
char *schema = get_namespace_name(class_form->relnamespace);
char *table = NameStr(class_form->relname);
if (data->tableWhiteList != NIL && !CheckWhiteList(data->tableWhiteList, schema, table)) {
(void)MemoryContextSwitchTo(old);
MemoryContextReset(data->context);
return;
}
OutputPluginPrepareWrite(ctx, true);
appendStringInfoString(ctx->out, "table ");
appendStringInfoString(ctx->out,
quote_qualified_identifier(
get_namespace_name(get_rel_namespace(RelationGetRelid(relation))), NameStr(class_form->relname)));
appendStringInfoString(ctx->out, quote_qualified_identifier(schema, table));
appendStringInfoString(ctx->out, ":");
switch (change->action) {