From b8cf7459a60f5547c83b8cbfb07c8b6c83ee6343 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Mon, 13 Mar 2023 21:40:11 +0800 Subject: [PATCH] add include-originid option for logical decode --- contrib/mppdb_decoding/mppdb_decoding.cpp | 3 +++ src/bin/pg_basebackup/pg_recvlogical.cpp | 6 ++++++ .../storage/replication/logical/logical.cpp | 2 ++ .../replication/logical/logical_parse.cpp | 1 + .../replication/logical/parallel_decode.cpp | 1 + .../logical/parallel_decode_worker.cpp | 3 +++ .../logical/parallel_reorderbuffer.cpp | 20 ++++++++++++++----- .../storage/replication/walsender.cpp | 1 + src/include/replication/logical.h | 2 ++ .../replication/parallel_reorderbuffer.h | 2 ++ 10 files changed, 36 insertions(+), 5 deletions(-) diff --git a/contrib/mppdb_decoding/mppdb_decoding.cpp b/contrib/mppdb_decoding/mppdb_decoding.cpp index b5c05f2cb..621a8cfc5 100644 --- a/contrib/mppdb_decoding/mppdb_decoding.cpp +++ b/contrib/mppdb_decoding/mppdb_decoding.cpp @@ -104,6 +104,7 @@ static void pg_decode_startup(LogicalDecodingContext* ctx, OutputPluginOptions* data->skip_empty_xacts = false; data->only_local = true; data->tableWhiteList = NIL; + data->include_originid = false; /* read default option from GUC */ DecodeOptionsDefault *defaultOption = LogicalDecodeGetOptionsDefault(); @@ -150,6 +151,8 @@ static void pg_output_begin(LogicalDecodingContext* ctx, PluginTestDecodingData* appendStringInfo(ctx->out, "BEGIN %lu", txn->xid); else appendStringInfoString(ctx->out, "BEGIN"); + if (data->include_originid) + appendStringInfo(ctx->out, " ORIGIN_ID %d", txn->origin_id); OutputPluginWrite(ctx, last_write); } diff --git a/src/bin/pg_basebackup/pg_recvlogical.cpp b/src/bin/pg_basebackup/pg_recvlogical.cpp index cf415e7b7..a8269e2aa 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.cpp +++ b/src/bin/pg_basebackup/pg_recvlogical.cpp @@ -398,6 +398,12 @@ static void BeginToText(const char* stream, uint32 *curPos, PQExpBuffer res) appendBinaryPQExpBuffer(res, &stream[*curPos], timeLen); *curPos += timeLen; } + if (stream[*curPos] == 'O') { + *curPos += 1; + uint32 originid = ntohl(*(uint32 *)(&stream[*curPos])); + *curPos += sizeof(uint32); + appendPQExpBuffer(res, " origin_id: %d", originid); + } } /* diff --git a/src/gausskernel/storage/replication/logical/logical.cpp b/src/gausskernel/storage/replication/logical/logical.cpp index 45febd236..fe159643a 100644 --- a/src/gausskernel/storage/replication/logical/logical.cpp +++ b/src/gausskernel/storage/replication/logical/logical.cpp @@ -1544,6 +1544,8 @@ void ParseDecodingOptionPlugin(ListCell* option, PluginTestDecodingData* data, O SetConfigOption("logical_sender_timeout", strVal(elem->arg), PGC_USERSET, PGC_S_OVERRIDE); } else if (strncmp(elem->defname, "max-reorderbuffer-in-memory", sizeof("max-reorderbuffer-in-memory")) == 0) { CheckIntOption(elem, &data->max_reorderbuffer_in_memory, 0, 0, maxReorderBuffer); + } else if (strncmp(elem->defname, "include-originid", sizeof("include-originid")) == 0) { + CheckBooleanOption(elem, &data->include_originid, true); } else if (strncmp(elem->defname, "parallel-decode-num", sizeof("parallel-decode-num")) != 0) { ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"), diff --git a/src/gausskernel/storage/replication/logical/logical_parse.cpp b/src/gausskernel/storage/replication/logical/logical_parse.cpp index bf2bb41b1..2c297c1c2 100644 --- a/src/gausskernel/storage/replication/logical/logical_parse.cpp +++ b/src/gausskernel/storage/replication/logical/logical_parse.cpp @@ -324,6 +324,7 @@ void ParseCommitXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, change->endLsn = buf->endptr; change->nsubxacts = nsubxacts; change->commitTime = commit_time; + change->origin_id = XLogRecGetOrigin(buf->record); if (nsubxacts > 0) { MemoryContext oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.pdecode_cxt[slotId].parallelDecodeCtx); diff --git a/src/gausskernel/storage/replication/logical/parallel_decode.cpp b/src/gausskernel/storage/replication/logical/parallel_decode.cpp index 3634780a0..519f2a321 100644 --- a/src/gausskernel/storage/replication/logical/parallel_decode.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_decode.cpp @@ -841,6 +841,7 @@ static logicalLog* ParallelDecodeCommitOrAbort(ParallelReorderBufferChange* chan logChange->endLsn = change->endLsn; logChange->nsubxacts = change->nsubxacts; logChange->commitTime = change->commitTime; + logChange->origin_id = change->origin_id; int slotId = worker->slotId; Size subXidSize = sizeof(TransactionId) * change->nsubxacts; if (subXidSize > 0 && change->subXids != NULL) { diff --git a/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp b/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp index 4555b07c3..4a1f9b02f 100644 --- a/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_decode_worker.cpp @@ -943,6 +943,8 @@ static void ParseDecodingOption(ParallelDecodeOption *data, ListCell *option) } } else if (strncmp(elem->defname, "sender-timeout", sizeof("sender-timeout")) == 0 && elem->arg != NULL) { SetConfigOption("logical_sender_timeout", strVal(elem->arg), PGC_USERSET, PGC_S_OVERRIDE); + } else if (strncmp(elem->defname, "include-originid", sizeof("include-originid")) == 0) { + CheckBooleanOption(elem, &data->include_originid, false); } else if (strncmp(elem->defname, "parallel-decode-num", sizeof("parallel-decode-num")) != 0) { ereport(ERROR, (errmodule(MOD_LOGICAL_DECODE), errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"), @@ -972,6 +974,7 @@ static void initParallelDecodeOption(ParallelDecodeOption *pOptions, int paralle pOptions->sending_batch = 0; pOptions->decode_change = parallel_decode_change_to_bin; pOptions->parallel_queue_size = DEFAULT_PARALLEL_QUEUE_SIZE; + pOptions->include_originid = false; /* GUC */ DecodeOptionsDefault *defaultOption = LogicalDecodeGetOptionsDefault(); diff --git a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp index c8a1cc6ac..18785e93f 100644 --- a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp @@ -1496,8 +1496,8 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco { if (pdata->pOptions.decode_style == 'b') { int curPos = out->len; - const uint32 beginBaseLen = 25; /* this length does not include the seperator 'P' */ - pq_sendint32(out, beginBaseLen); + uint32 beginLen = 25; /* this length does not include the seperator 'P' */ + pq_sendint32(out, beginLen); pq_sendint64(out, txn->first_lsn); appendStringInfoChar(out, 'B'); pq_sendint64(out, change->csn); @@ -1507,10 +1507,17 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco const char *timeStamp = timestamptz_to_str(txn->commit_time); pq_sendint32(out, (uint32)(strlen(timeStamp))); appendStringInfoString(out, timeStamp); - uint32 beginLen = htonl(beginBaseLen + 1 + sizeof(uint32) + strlen(timeStamp)); - errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32)); - securec_check(rc, "", ""); + beginLen += 1 + sizeof(uint32) + strlen(timeStamp); } + if (pdata->pOptions.include_originid) { + appendStringInfoChar(out, 'O'); + pq_sendint32(out, txn->origin_id); + beginLen += 1 + sizeof(uint32); + } + + beginLen = htonl(beginLen); + errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32)); + securec_check(rc, "", ""); } else { int curPos = out->len; uint32 beginLen = 0; @@ -1525,6 +1532,9 @@ static void ParallelOutputBegin(StringInfo out, logicalLog *change, ParallelDeco const char *timeStamp = timestamptz_to_str(txn->commit_time); appendStringInfo(out, " commit_time: %s", timeStamp); } + if (pdata->pOptions.include_originid) { + appendStringInfo(out, " origin_id: %d", txn->origin_id); + } if (batchSending) { beginLen = htonl((uint32)(out->len - curPos) - (uint32)sizeof(uint32)); errno_t rc = memcpy_s(out->data + curPos, sizeof(uint32), &beginLen, sizeof(uint32)); diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 6af9c6e9d..ce9fb5104 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -4822,6 +4822,7 @@ static void LogicalLogHandleAbortOrCommit(ParallelReorderBuffer *prb, logicalLog txn->final_lsn = logChange->finalLsn; txn->nsubtxns = logChange->nsubxacts; txn->commit_time = logChange->commitTime; + txn->origin_id = logChange->origin_id; bool isForget = false; if (XLByteLT(txn->final_lsn, g_Logicaldispatcher[slotId].startpoint) || diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index a3a51880a..75d491a4d 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -134,6 +134,7 @@ typedef struct { ParallelDecodeChangeCB decode_change; List *tableWhiteList; int parallel_queue_size; + bool include_originid; } ParallelDecodeOption; typedef struct { @@ -151,6 +152,7 @@ typedef struct { int max_txn_in_memory; int max_reorderbuffer_in_memory; List *tableWhiteList; + bool include_originid; } PluginTestDecodingData; typedef struct ParallelLogicalDecodingContext { diff --git a/src/include/replication/parallel_reorderbuffer.h b/src/include/replication/parallel_reorderbuffer.h index 3d8f1dc7a..07a4cc1d3 100644 --- a/src/include/replication/parallel_reorderbuffer.h +++ b/src/include/replication/parallel_reorderbuffer.h @@ -53,6 +53,7 @@ typedef struct logicalLog { int nsubxacts; TransactionId *subXids; TimestampTz commitTime; + RepOriginId origin_id; HTAB* toast_hash; ParallelReorderBufferTXN *txn; logicalLog *freeNext; @@ -148,6 +149,7 @@ typedef struct ParallelReorderBufferChange { TransactionId * subXids; dlist_node node; TimestampTz commitTime; + RepOriginId origin_id; } ParallelReorderBufferChange; typedef struct ParallelReorderBufferTXN {