add include-originid option for logical decode
This commit is contained in:
chenxiaobin19
2023-03-13 21:40:11 +08:00
committed by chenxiaobin
parent ee6e76c0cd
commit 7ec615deb2
10 changed files with 36 additions and 5 deletions

View File

@ -104,6 +104,7 @@ static void pg_decode_startup(LogicalDecodingContext* ctx, OutputPluginOptions*
data->skip_empty_xacts = false;
data->only_local = true;
data->tableWhiteList = NIL;
data->include_originid = false;
/* read default option from GUC */
DecodeOptionsDefault *defaultOption = LogicalDecodeGetOptionsDefault();
@ -150,6 +151,8 @@ static void pg_output_begin(LogicalDecodingContext* ctx, PluginTestDecodingData*
appendStringInfo(ctx->out, "BEGIN %lu", txn->xid);
else
appendStringInfoString(ctx->out, "BEGIN");
if (data->include_originid)
appendStringInfo(ctx->out, " ORIGIN_ID %d", txn->origin_id);
OutputPluginWrite(ctx, last_write);
}

View File

@ -398,6 +398,12 @@ static void BeginToText(const char* stream, uint32 *curPos, PQExpBuffer res)
appendBinaryPQExpBuffer(res, &stream[*curPos], timeLen);
*curPos += timeLen;
}
if (stream[*curPos] == 'O') {
*curPos += 1;
uint32 originid = ntohl(*(uint32 *)(&stream[*curPos]));
*curPos += sizeof(uint32);
appendPQExpBuffer(res, " origin_id: %d", originid);
}
}
/*

View File

@ -1544,6 +1544,8 @@ void ParseDecodingOptionPlugin(ListCell* option, PluginTestDecodingData* data, O
SetConfigOption("logical_sender_timeout", strVal(elem->arg), PGC_USERSET, PGC_S_OVERRIDE);
} else if (strncmp(elem->defname, "max-reorderbuffer-in-memory", sizeof("max-reorderbuffer-in-memory")) == 0) {
CheckIntOption(elem, &data->max_reorderbuffer_in_memory, 0, 0, maxReorderBuffer);
} else if (strncmp(elem->defname, "include-originid", sizeof("include-originid")) == 0) {
CheckBooleanOption(elem, &data->include_originid, true);
} else if (strncmp(elem->defname, "parallel-decode-num", sizeof("parallel-decode-num")) != 0) {
ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"),

View File

@ -324,6 +324,7 @@ void ParseCommitXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf,
change->endLsn = buf->endptr;
change->nsubxacts = nsubxacts;
change->commitTime = commit_time;
change->origin_id = XLogRecGetOrigin(buf->record);
if (nsubxacts > 0) {
MemoryContext oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.pdecode_cxt[slotId].parallelDecodeCtx);

View File

@ -841,6 +841,7 @@ static logicalLog* ParallelDecodeCommitOrAbort(ParallelReorderBufferChange* chan
logChange->endLsn = change->endLsn;
logChange->nsubxacts = change->nsubxacts;
logChange->commitTime = change->commitTime;
logChange->origin_id = change->origin_id;
int slotId = worker->slotId;
Size subXidSize = sizeof(TransactionId) * change->nsubxacts;
if (subXidSize > 0 && change->subXids != NULL) {

View File

@ -943,6 +943,8 @@ static void ParseDecodingOption(ParallelDecodeOption *data, ListCell *option)
}
} else if (strncmp(elem->defname, "sender-timeout", sizeof("sender-timeout")) == 0 && elem->arg != NULL) {
SetConfigOption("logical_sender_timeout", strVal(elem->arg), PGC_USERSET, PGC_S_OVERRIDE);
} else if (strncmp(elem->defname, "include-originid", sizeof("include-originid")) == 0) {
CheckBooleanOption(elem, &data->include_originid, false);
} else if (strncmp(elem->defname, "parallel-decode-num", sizeof("parallel-decode-num")) != 0) {
ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"),
@ -972,6 +974,7 @@ static void initParallelDecodeOption(ParallelDecodeOption *pOptions, int paralle
pOptions->sending_batch = 0;
pOptions->decode_change = parallel_decode_change_to_bin;
pOptions->parallel_queue_size = DEFAULT_PARALLEL_QUEUE_SIZE;
pOptions->include_originid = false;
/* GUC */
DecodeOptionsDefault *defaultOption = LogicalDecodeGetOptionsDefault();

View File

@ -1496,8 +1496,8 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco
{
if (pdata->pOptions.decode_style == 'b') {
int curPos = out->len;
const uint32 beginBaseLen = 25; /* this length does not include the seperator 'P' */
pq_sendint32(out, beginBaseLen);
uint32 beginLen = 25; /* this length does not include the seperator 'P' */
pq_sendint32(out, beginLen);
pq_sendint64(out, txn->first_lsn);
appendStringInfoChar(out, 'B');
pq_sendint64(out, change->csn);
@ -1507,10 +1507,17 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco
const char *timeStamp = timestamptz_to_str(txn->commit_time);
pq_sendint32(out, (uint32)(strlen(timeStamp)));
appendStringInfoString(out, timeStamp);
uint32 beginLen = htonl(beginBaseLen + 1 + sizeof(uint32) + strlen(timeStamp));
errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32));
securec_check(rc, "", "");
beginLen += 1 + sizeof(uint32) + strlen(timeStamp);
}
if (pdata->pOptions.include_originid) {
appendStringInfoChar(out, 'O');
pq_sendint32(out, txn->origin_id);
beginLen += 1 + sizeof(uint32);
}
beginLen = htonl(beginLen);
errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32));
securec_check(rc, "", "");
} else {
int curPos = out->len;
uint32 beginLen = 0;
@ -1525,6 +1532,9 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco
const char *timeStamp = timestamptz_to_str(txn->commit_time);
appendStringInfo(out, " commit_time: %s", timeStamp);
}
if (pdata->pOptions.include_originid) {
appendStringInfo(out, " origin_id: %d", txn->origin_id);
}
if (batchSending) {
beginLen = htonl((uint32)(out->len - curPos) - (uint32)sizeof(uint32));
errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32));

View File

@ -4822,6 +4822,7 @@ static void LogicalLogHandleAbortOrCommit(ParallelReorderBuffer *prb, logicalLog
txn->final_lsn = logChange->finalLsn;
txn->nsubtxns = logChange->nsubxacts;
txn->commit_time = logChange->commitTime;
txn->origin_id = logChange->origin_id;
bool isForget = false;
if (XLByteLT(txn->final_lsn, g_Logicaldispatcher[slotId].startpoint) ||

View File

@ -134,6 +134,7 @@ typedef struct {
ParallelDecodeChangeCB decode_change;
List *tableWhiteList;
int parallel_queue_size;
bool include_originid;
} ParallelDecodeOption;
typedef struct {
@ -151,6 +152,7 @@ typedef struct {
int max_txn_in_memory;
int max_reorderbuffer_in_memory;
List *tableWhiteList;
bool include_originid;
} PluginTestDecodingData;
typedef struct ParallelLogicalDecodingContext {

View File

@ -53,6 +53,7 @@ typedef struct logicalLog {
int nsubxacts;
TransactionId *subXids;
TimestampTz commitTime;
RepOriginId origin_id;
HTAB* toast_hash;
ParallelReorderBufferTXN *txn;
logicalLog *freeNext;
@ -148,6 +149,7 @@ typedef struct ParallelReorderBufferChange {
TransactionId * subXids;
dlist_node node;
TimestampTz commitTime;
RepOriginId origin_id;
} ParallelReorderBufferChange;
typedef struct ParallelReorderBufferTXN {