From 5cc8be474994b1c96cbcf85a8b1d17885d5be2db Mon Sep 17 00:00:00 2001 From: z00520711 Date: Mon, 1 Aug 2022 10:49:09 +0800 Subject: [PATCH] parallel decoding fix walsender lsn bug Signed-off-by: unknown <1836611252@qq.com> --- .../storage/replication/logical/logical.cpp | 2 +- .../logical/parallel_reorderbuffer.cpp | 45 +++++++++++++------ .../storage/replication/walsender.cpp | 8 ++-- src/include/replication/logical.h | 4 ++ 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/gausskernel/storage/replication/logical/logical.cpp b/src/gausskernel/storage/replication/logical/logical.cpp index bbd1ed957..fb4fd02af 100644 --- a/src/gausskernel/storage/replication/logical/logical.cpp +++ b/src/gausskernel/storage/replication/logical/logical.cpp @@ -279,7 +279,7 @@ static ParallelLogicalDecodingContext *ParallelStartupDecodingContext(List *outp ctx->output_plugin_options = output_plugin_options; ctx->fast_forward = fast_forward; - + ctx->writeLocationBuffer = makeStringInfo(); (void)MemoryContextSwitchTo(old_context); return ctx; diff --git a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp index 9e82a7d1a..556ae47c5 100644 --- a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp @@ -1297,26 +1297,37 @@ static void ParallelCheckPrepare(StringInfo out, logicalLog *change, ParallelDec /* * Parallel decoding check whether this batch should be sent. */ -static void ParallelCheckBatch(StringInfo out, ParallelDecodingData *pdata, int slotId, +static void ParallelCheckBatch(ParallelLogicalDecodingContext *ctx, ParallelDecodingData *pdata, int slotId, MemoryContext *oldCtxPtr, bool emptyXact) { if (emptyXact) { MemoryContextSwitchTo(*oldCtxPtr); return; } - if (out->len > g_Logicaldispatcher[slotId].pOptions.sending_batch * g_batch_unit_length) { + if (ctx->out->len > g_Logicaldispatcher[slotId].pOptions.sending_batch * g_batch_unit_length) { if (pdata->pOptions.decode_style == 'b') { - appendStringInfoChar(out, 'F'); // the finishing char + appendStringInfoChar(ctx->out, 'F'); // the finishing char } else if (g_Logicaldispatcher[slotId].pOptions.sending_batch > 0) { - pq_sendint32(out, 0); + pq_sendint32(ctx->out, 0); } MemoryContextSwitchTo(*oldCtxPtr); - WalSndWriteDataHelper(out, 0, 0, false); + resetStringInfo(ctx->writeLocationBuffer); + if (XLogRecPtrIsValid(ctx->write_location)) { + pq_sendint64(ctx->writeLocationBuffer, ctx->write_location); + /* ctx->out->data[0] is msgtype 'w', skip it */ + errno_t rc = memcpy_s(&(ctx->out->data[1]), sizeof(uint64), ctx->writeLocationBuffer->data, sizeof(uint64)); + securec_check(rc, "\0", "\0"); + rc = memcpy_s(&(ctx->out->data[1 + sizeof(uint64)]), sizeof(uint64), ctx->writeLocationBuffer->data, + sizeof(uint64)); + securec_check(rc, "\0", "\0"); + } + + WalSndWriteDataHelper(ctx->out, 0, 0, false); g_Logicaldispatcher[slotId].decodeTime = GetCurrentTimestamp(); g_Logicaldispatcher[slotId].remainPatch = false; } else { if (pdata->pOptions.decode_style == 'b') { - appendStringInfoChar(out, 'P'); + appendStringInfoChar(ctx->out, 'P'); } g_Logicaldispatcher[slotId].remainPatch = true; MemoryContextSwitchTo(*oldCtxPtr); @@ -1326,11 +1337,11 @@ static void ParallelCheckBatch(StringInfo out, ParallelDecodingData *pdata, int /* * Parallel decoding deal with batch sending. */ -static inline void ParallelHandleBatch(StringInfo out, logicalLog *change, ParallelDecodingData *pdata, int slotId, - MemoryContext *oldCtxPtr) +static inline void ParallelHandleBatch(ParallelLogicalDecodingContext *ctx, logicalLog *change, + ParallelDecodingData *pdata, int slotId, MemoryContext *oldCtxPtr) { - ParallelCheckBatch(out, pdata, slotId, oldCtxPtr, false); - ParallelCheckPrepare(out, change, pdata, slotId, oldCtxPtr); + ParallelCheckBatch(ctx, pdata, slotId, oldCtxPtr, false); + ParallelCheckPrepare(ctx->out, change, pdata, slotId, oldCtxPtr); } /* @@ -1453,7 +1464,7 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change, if (!pdata->pOptions.skip_empty_xacts) { ParallelOutputBegin(ctx->out, change, pdata, txn, g_Logicaldispatcher[slotId].pOptions.sending_batch > 0); - ParallelHandleBatch(ctx->out, change, pdata, slotId, &oldCtx); + ParallelHandleBatch(ctx, change, pdata, slotId, &oldCtx); } ParallelReorderBufferIterTXNState *volatile iterstate = NULL; @@ -1464,20 +1475,26 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change, stopDecode = true; } if (!stopDecode && logChange->out != NULL && logChange->out->len != 0) { + if (XLByteLT(ctx->write_location, logChange->lsn)) { + ctx->write_location = logChange->lsn; + } if (pdata->pOptions.skip_empty_xacts && !pdata->pOptions.xact_wrote_changes) { ParallelOutputBegin(ctx->out, change, pdata, txn, g_Logicaldispatcher[slotId].pOptions.sending_batch > 0); - ParallelHandleBatch(ctx->out, change, pdata, slotId, &oldCtx); + ParallelHandleBatch(ctx, change, pdata, slotId, &oldCtx); pdata->pOptions.xact_wrote_changes = true; } appendBinaryStringInfo(ctx->out, logChange->out->data, logChange->out->len); - ParallelHandleBatch(ctx->out, change, pdata, slotId, &oldCtx); + ParallelHandleBatch(ctx, change, pdata, slotId, &oldCtx); } dlist_delete(&logChange->node); FreeLogicalLog(logChange, slotId); } + if (XLByteLT(ctx->write_location, change->lsn)) { + ctx->write_location = change->lsn; + } if (!pdata->pOptions.skip_empty_xacts || pdata->pOptions.xact_wrote_changes) { ParallelOutputCommit(ctx->out, change, pdata, txn, g_Logicaldispatcher[slotId].pOptions.sending_batch > 0); @@ -1485,7 +1502,7 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change, ParallelReorderBufferCleanupTXN(rb, txn); ParallelReorderBufferIterTXNFinish(iterstate, slotId); - ParallelCheckBatch(ctx->out, pdata, slotId, &oldCtx, + ParallelCheckBatch(ctx, pdata, slotId, &oldCtx, (pdata->pOptions.skip_empty_xacts && !pdata->pOptions.xact_wrote_changes)); MemoryContextReset(pdata->context); } diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index f2caff6f4..b33322f55 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -1779,9 +1779,6 @@ void WalSndWriteDataHelper(StringInfo out, XLogRecPtr lsn, TransactionId xid, bo { errno_t rc; - /* output previously gathered data in a CopyData packet */ - pq_putmessage_noblock('d', out->data, out->len); - /* * Fill the send timestamp last, so that it is taken as late as * possible. This is somewhat ugly, but the protocol's set as it's already @@ -1793,6 +1790,11 @@ void WalSndWriteDataHelper(StringInfo out, XLogRecPtr lsn, TransactionId xid, bo t_thrd.walsender_cxt.tmpbuf->data, sizeof(int64)); securec_check(rc, "\0", "\0"); + /* output previously gathered data in a CopyData packet */ + pq_putmessage_noblock('d', out->data, out->len); + + CHECK_FOR_INTERRUPTS(); + /* fast path */ /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 0c59e31ff..6e2978f20 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -196,6 +196,10 @@ typedef struct ParallelLogicalDecodingContext { bool random_mode; bool isParallel; + /* + * Buffer for updating write_location + */ + StringInfo writeLocationBuffer; } ParallelLogicalDecodingContext; typedef struct ParallelDecodeWorker {