parallel decoding fix walsender lsn bug

Signed-off-by: unknown <1836611252@qq.com>
This commit is contained in:
z00520711
2022-08-01 10:49:09 +08:00
committed by unknown
parent e94d589c4f
commit 5cc8be4749
4 changed files with 41 additions and 18 deletions

View File

@ -279,7 +279,7 @@ static ParallelLogicalDecodingContext *ParallelStartupDecodingContext(List *outp
ctx->output_plugin_options = output_plugin_options;
ctx->fast_forward = fast_forward;
ctx->writeLocationBuffer = makeStringInfo();
(void)MemoryContextSwitchTo(old_context);
return ctx;

View File

@ -1297,26 +1297,37 @@ static void ParallelCheckPrepare(StringInfo out, logicalLog *change, ParallelDec
/*
* Parallel decoding check whether this batch should be sent.
*/
static void ParallelCheckBatch(StringInfo out, ParallelDecodingData *pdata, int slotId,
static void ParallelCheckBatch(ParallelLogicalDecodingContext *ctx, ParallelDecodingData *pdata, int slotId,
MemoryContext *oldCtxPtr, bool emptyXact)
{
if (emptyXact) {
MemoryContextSwitchTo(*oldCtxPtr);
return;
}
if (out->len > g_Logicaldispatcher[slotId].pOptions.sending_batch * g_batch_unit_length) {
if (ctx->out->len > g_Logicaldispatcher[slotId].pOptions.sending_batch * g_batch_unit_length) {
if (pdata->pOptions.decode_style == 'b') {
appendStringInfoChar(out, 'F'); // the finishing char
appendStringInfoChar(ctx->out, 'F'); // the finishing char
} else if (g_Logicaldispatcher[slotId].pOptions.sending_batch > 0) {
pq_sendint32(out, 0);
pq_sendint32(ctx->out, 0);
}
MemoryContextSwitchTo(*oldCtxPtr);
WalSndWriteDataHelper(out, 0, 0, false);
resetStringInfo(ctx->writeLocationBuffer);
if (XLogRecPtrIsValid(ctx->write_location)) {
pq_sendint64(ctx->writeLocationBuffer, ctx->write_location);
/* ctx->out->data[0] is msgtype 'w', skip it */
errno_t rc = memcpy_s(&(ctx->out->data[1]), sizeof(uint64), ctx->writeLocationBuffer->data, sizeof(uint64));
securec_check(rc, "\0", "\0");
rc = memcpy_s(&(ctx->out->data[1 + sizeof(uint64)]), sizeof(uint64), ctx->writeLocationBuffer->data,
sizeof(uint64));
securec_check(rc, "\0", "\0");
}
WalSndWriteDataHelper(ctx->out, 0, 0, false);
g_Logicaldispatcher[slotId].decodeTime = GetCurrentTimestamp();
g_Logicaldispatcher[slotId].remainPatch = false;
} else {
if (pdata->pOptions.decode_style == 'b') {
appendStringInfoChar(out, 'P');
appendStringInfoChar(ctx->out, 'P');
}
g_Logicaldispatcher[slotId].remainPatch = true;
MemoryContextSwitchTo(*oldCtxPtr);
@ -1326,11 +1337,11 @@ static void ParallelCheckBatch(StringInfo out, ParallelDecodingData *pdata, int
/*
* Parallel decoding deal with batch sending.
*/
static inline void ParallelHandleBatch(StringInfo out, logicalLog *change, ParallelDecodingData *pdata, int slotId,
MemoryContext *oldCtxPtr)
static inline void ParallelHandleBatch(ParallelLogicalDecodingContext *ctx, logicalLog *change,
ParallelDecodingData *pdata, int slotId, MemoryContext *oldCtxPtr)
{
ParallelCheckBatch(out, pdata, slotId, oldCtxPtr, false);
ParallelCheckPrepare(out, change, pdata, slotId, oldCtxPtr);
ParallelCheckBatch(ctx, pdata, slotId, oldCtxPtr, false);
ParallelCheckPrepare(ctx->out, change, pdata, slotId, oldCtxPtr);
}
/*
@ -1453,7 +1464,7 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change,
if (!pdata->pOptions.skip_empty_xacts) {
ParallelOutputBegin(ctx->out, change, pdata, txn, g_Logicaldispatcher[slotId].pOptions.sending_batch > 0);
ParallelHandleBatch(ctx->out, change, pdata, slotId, &oldCtx);
ParallelHandleBatch(ctx, change, pdata, slotId, &oldCtx);
}
ParallelReorderBufferIterTXNState *volatile iterstate = NULL;
@ -1464,20 +1475,26 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change,
stopDecode = true;
}
if (!stopDecode && logChange->out != NULL && logChange->out->len != 0) {
if (XLByteLT(ctx->write_location, logChange->lsn)) {
ctx->write_location = logChange->lsn;
}
if (pdata->pOptions.skip_empty_xacts && !pdata->pOptions.xact_wrote_changes) {
ParallelOutputBegin(ctx->out, change, pdata, txn,
g_Logicaldispatcher[slotId].pOptions.sending_batch > 0);
ParallelHandleBatch(ctx->out, change, pdata, slotId, &oldCtx);
ParallelHandleBatch(ctx, change, pdata, slotId, &oldCtx);
pdata->pOptions.xact_wrote_changes = true;
}
appendBinaryStringInfo(ctx->out, logChange->out->data, logChange->out->len);
ParallelHandleBatch(ctx->out, change, pdata, slotId, &oldCtx);
ParallelHandleBatch(ctx, change, pdata, slotId, &oldCtx);
}
dlist_delete(&logChange->node);
FreeLogicalLog(logChange, slotId);
}
if (XLByteLT(ctx->write_location, change->lsn)) {
ctx->write_location = change->lsn;
}
if (!pdata->pOptions.skip_empty_xacts || pdata->pOptions.xact_wrote_changes) {
ParallelOutputCommit(ctx->out, change, pdata, txn,
g_Logicaldispatcher[slotId].pOptions.sending_batch > 0);
@ -1485,7 +1502,7 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change,
ParallelReorderBufferCleanupTXN(rb, txn);
ParallelReorderBufferIterTXNFinish(iterstate, slotId);
ParallelCheckBatch(ctx->out, pdata, slotId, &oldCtx,
ParallelCheckBatch(ctx, pdata, slotId, &oldCtx,
(pdata->pOptions.skip_empty_xacts && !pdata->pOptions.xact_wrote_changes));
MemoryContextReset(pdata->context);
}

View File

@ -1779,9 +1779,6 @@ void WalSndWriteDataHelper(StringInfo out, XLogRecPtr lsn, TransactionId xid, bo
{
errno_t rc;
/* output previously gathered data in a CopyData packet */
pq_putmessage_noblock('d', out->data, out->len);
/*
* Fill the send timestamp last, so that it is taken as late as
* possible. This is somewhat ugly, but the protocol's set as it's already
@ -1793,6 +1790,11 @@ void WalSndWriteDataHelper(StringInfo out, XLogRecPtr lsn, TransactionId xid, bo
t_thrd.walsender_cxt.tmpbuf->data, sizeof(int64));
securec_check(rc, "\0", "\0");
/* output previously gathered data in a CopyData packet */
pq_putmessage_noblock('d', out->data, out->len);
CHECK_FOR_INTERRUPTS();
/* fast path */
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)

View File

@ -196,6 +196,10 @@ typedef struct ParallelLogicalDecodingContext {
bool random_mode;
bool isParallel;
/*
* Buffer for updating write_location
*/
StringInfo writeLocationBuffer;
} ParallelLogicalDecodingContext;
typedef struct ParallelDecodeWorker {