add include-originid option for logical decode
This commit is contained in:
@ -104,6 +104,7 @@ static void pg_decode_startup(LogicalDecodingContext* ctx, OutputPluginOptions*
|
||||
data->skip_empty_xacts = false;
|
||||
data->only_local = true;
|
||||
data->tableWhiteList = NIL;
|
||||
data->include_originid = false;
|
||||
|
||||
/* read default option from GUC */
|
||||
DecodeOptionsDefault *defaultOption = LogicalDecodeGetOptionsDefault();
|
||||
@ -150,6 +151,8 @@ static void pg_output_begin(LogicalDecodingContext* ctx, PluginTestDecodingData*
|
||||
appendStringInfo(ctx->out, "BEGIN %lu", txn->xid);
|
||||
else
|
||||
appendStringInfoString(ctx->out, "BEGIN");
|
||||
if (data->include_originid)
|
||||
appendStringInfo(ctx->out, " ORIGIN_ID %d", txn->origin_id);
|
||||
OutputPluginWrite(ctx, last_write);
|
||||
}
|
||||
|
||||
|
||||
@ -398,6 +398,12 @@ static void BeginToText(const char* stream, uint32 *curPos, PQExpBuffer res)
|
||||
appendBinaryPQExpBuffer(res, &stream[*curPos], timeLen);
|
||||
*curPos += timeLen;
|
||||
}
|
||||
if (stream[*curPos] == 'O') {
|
||||
*curPos += 1;
|
||||
uint32 originid = ntohl(*(uint32 *)(&stream[*curPos]));
|
||||
*curPos += sizeof(uint32);
|
||||
appendPQExpBuffer(res, " origin_id: %d", originid);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@ -1544,6 +1544,8 @@ void ParseDecodingOptionPlugin(ListCell* option, PluginTestDecodingData* data, O
|
||||
SetConfigOption("logical_sender_timeout", strVal(elem->arg), PGC_USERSET, PGC_S_OVERRIDE);
|
||||
} else if (strncmp(elem->defname, "max-reorderbuffer-in-memory", sizeof("max-reorderbuffer-in-memory")) == 0) {
|
||||
CheckIntOption(elem, &data->max_reorderbuffer_in_memory, 0, 0, maxReorderBuffer);
|
||||
} else if (strncmp(elem->defname, "include-originid", sizeof("include-originid")) == 0) {
|
||||
CheckBooleanOption(elem, &data->include_originid, true);
|
||||
} else if (strncmp(elem->defname, "parallel-decode-num", sizeof("parallel-decode-num")) != 0) {
|
||||
ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"),
|
||||
|
||||
@ -324,6 +324,7 @@ void ParseCommitXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf,
|
||||
change->endLsn = buf->endptr;
|
||||
change->nsubxacts = nsubxacts;
|
||||
change->commitTime = commit_time;
|
||||
change->origin_id = XLogRecGetOrigin(buf->record);
|
||||
|
||||
if (nsubxacts > 0) {
|
||||
MemoryContext oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.pdecode_cxt[slotId].parallelDecodeCtx);
|
||||
|
||||
@ -841,6 +841,7 @@ static logicalLog* ParallelDecodeCommitOrAbort(ParallelReorderBufferChange* chan
|
||||
logChange->endLsn = change->endLsn;
|
||||
logChange->nsubxacts = change->nsubxacts;
|
||||
logChange->commitTime = change->commitTime;
|
||||
logChange->origin_id = change->origin_id;
|
||||
int slotId = worker->slotId;
|
||||
Size subXidSize = sizeof(TransactionId) * change->nsubxacts;
|
||||
if (subXidSize > 0 && change->subXids != NULL) {
|
||||
|
||||
@ -943,6 +943,8 @@ static void ParseDecodingOption(ParallelDecodeOption *data, ListCell *option)
|
||||
}
|
||||
} else if (strncmp(elem->defname, "sender-timeout", sizeof("sender-timeout")) == 0 && elem->arg != NULL) {
|
||||
SetConfigOption("logical_sender_timeout", strVal(elem->arg), PGC_USERSET, PGC_S_OVERRIDE);
|
||||
} else if (strncmp(elem->defname, "include-originid", sizeof("include-originid")) == 0) {
|
||||
CheckBooleanOption(elem, &data->include_originid, false);
|
||||
} else if (strncmp(elem->defname, "parallel-decode-num", sizeof("parallel-decode-num")) != 0) {
|
||||
ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"),
|
||||
@ -972,6 +974,7 @@ static void initParallelDecodeOption(ParallelDecodeOption *pOptions, int paralle
|
||||
pOptions->sending_batch = 0;
|
||||
pOptions->decode_change = parallel_decode_change_to_bin;
|
||||
pOptions->parallel_queue_size = DEFAULT_PARALLEL_QUEUE_SIZE;
|
||||
pOptions->include_originid = false;
|
||||
|
||||
/* GUC */
|
||||
DecodeOptionsDefault *defaultOption = LogicalDecodeGetOptionsDefault();
|
||||
|
||||
@ -1496,8 +1496,8 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco
|
||||
{
|
||||
if (pdata->pOptions.decode_style == 'b') {
|
||||
int curPos = out->len;
|
||||
const uint32 beginBaseLen = 25; /* this length does not include the seperator 'P' */
|
||||
pq_sendint32(out, beginBaseLen);
|
||||
uint32 beginLen = 25; /* this length does not include the seperator 'P' */
|
||||
pq_sendint32(out, beginLen);
|
||||
pq_sendint64(out, txn->first_lsn);
|
||||
appendStringInfoChar(out, 'B');
|
||||
pq_sendint64(out, change->csn);
|
||||
@ -1507,10 +1507,17 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco
|
||||
const char *timeStamp = timestamptz_to_str(txn->commit_time);
|
||||
pq_sendint32(out, (uint32)(strlen(timeStamp)));
|
||||
appendStringInfoString(out, timeStamp);
|
||||
uint32 beginLen = htonl(beginBaseLen + 1 + sizeof(uint32) + strlen(timeStamp));
|
||||
errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32));
|
||||
securec_check(rc, "", "");
|
||||
beginLen += 1 + sizeof(uint32) + strlen(timeStamp);
|
||||
}
|
||||
if (pdata->pOptions.include_originid) {
|
||||
appendStringInfoChar(out, 'O');
|
||||
pq_sendint32(out, txn->origin_id);
|
||||
beginLen += 1 + sizeof(uint32);
|
||||
}
|
||||
|
||||
beginLen = htonl(beginLen);
|
||||
errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32));
|
||||
securec_check(rc, "", "");
|
||||
} else {
|
||||
int curPos = out->len;
|
||||
uint32 beginLen = 0;
|
||||
@ -1525,6 +1532,9 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco
|
||||
const char *timeStamp = timestamptz_to_str(txn->commit_time);
|
||||
appendStringInfo(out, " commit_time: %s", timeStamp);
|
||||
}
|
||||
if (pdata->pOptions.include_originid) {
|
||||
appendStringInfo(out, " origin_id: %d", txn->origin_id);
|
||||
}
|
||||
if (batchSending) {
|
||||
beginLen = htonl((uint32)(out->len - curPos) - (uint32)sizeof(uint32));
|
||||
errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32));
|
||||
|
||||
@ -4822,6 +4822,7 @@ static void LogicalLogHandleAbortOrCommit(ParallelReorderBuffer *prb, logicalLog
|
||||
txn->final_lsn = logChange->finalLsn;
|
||||
txn->nsubtxns = logChange->nsubxacts;
|
||||
txn->commit_time = logChange->commitTime;
|
||||
txn->origin_id = logChange->origin_id;
|
||||
|
||||
bool isForget = false;
|
||||
if (XLByteLT(txn->final_lsn, g_Logicaldispatcher[slotId].startpoint) ||
|
||||
|
||||
@ -134,6 +134,7 @@ typedef struct {
|
||||
ParallelDecodeChangeCB decode_change;
|
||||
List *tableWhiteList;
|
||||
int parallel_queue_size;
|
||||
bool include_originid;
|
||||
} ParallelDecodeOption;
|
||||
|
||||
typedef struct {
|
||||
@ -151,6 +152,7 @@ typedef struct {
|
||||
int max_txn_in_memory;
|
||||
int max_reorderbuffer_in_memory;
|
||||
List *tableWhiteList;
|
||||
bool include_originid;
|
||||
} PluginTestDecodingData;
|
||||
|
||||
typedef struct ParallelLogicalDecodingContext {
|
||||
|
||||
@ -53,6 +53,7 @@ typedef struct logicalLog {
|
||||
int nsubxacts;
|
||||
TransactionId *subXids;
|
||||
TimestampTz commitTime;
|
||||
RepOriginId origin_id;
|
||||
HTAB* toast_hash;
|
||||
ParallelReorderBufferTXN *txn;
|
||||
logicalLog *freeNext;
|
||||
@ -148,6 +149,7 @@ typedef struct ParallelReorderBufferChange {
|
||||
TransactionId * subXids;
|
||||
dlist_node node;
|
||||
TimestampTz commitTime;
|
||||
RepOriginId origin_id;
|
||||
} ParallelReorderBufferChange;
|
||||
|
||||
typedef struct ParallelReorderBufferTXN {
|
||||
|
||||
Reference in New Issue
Block a user