From d079103fcc18898130ee92ab19a79ea453667f68 Mon Sep 17 00:00:00 2001 From: wofanzheng <2399541971@qq.com> Date: Mon, 22 Jul 2024 14:18:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=80=BB=E8=BE=91=E5=A4=8D?= =?UTF-8?q?=E5=88=B6=E5=9C=BA=E6=99=AF=E4=B8=8B=E6=9E=81=E8=87=B4RTO?= =?UTF-8?q?=E5=9B=9E=E6=94=BE=E8=BF=87=E7=A8=8B=E4=B8=AD=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=E6=B2=A1=E6=9C=89=E5=AF=B9=E9=BD=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/access/redo/redo_xlogutils.cpp | 27 +++++++++--- .../storage/access/rmgrdesc/uheapdesc.cpp | 16 ++++---- .../storage/access/transam/cbmparsexlog.cpp | 16 ++++---- .../access/ustore/knl_uextremeredo.cpp | 41 ++++++++++++------- .../storage/access/ustore/knl_uredo.cpp | 32 +++++++-------- .../storage/replication/logical/decode.cpp | 40 +++++++++--------- src/include/access/ustore/knl_uextremeredo.h | 2 +- src/include/access/ustore/knl_uredo.h | 1 + src/include/access/xlogproc.h | 1 + src/include/access/xlogreader.h | 1 + 10 files changed, 104 insertions(+), 73 deletions(-) diff --git a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp index 249b47a81..04d758fa0 100644 --- a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp +++ b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp @@ -432,6 +432,7 @@ void XLogRecSetBlockDataState(XLogReaderState *record, uint32 blockid, XLogRecPa XLogRecSetBlockDataStateContent(record, blockid, blockdatarec); recordblockstate->blockparse.blockhead.is_conflict_type = is_conflict_type; + recordblockstate->blockparse.blockhead.hasCSN = XLogRecHasCSN(record); } void XLogRecSetAuxiBlkNumState(XLogBlockDataParse *blockdatarec, BlockNumber auxilaryblkn1, BlockNumber auxilaryblkn2) @@ -505,6 +506,16 @@ void GetXlUndoHeaderExtraData(char **currLogPtr, XlUndoHeaderExtra *xlundohdrext } else { xlundohdrextra->partitionOid = 0; } + if ((flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) { + *currLogPtr += sizeof(TransactionId); + xlundohdrextra->size += sizeof(TransactionId); + } + if ((flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0) { + uint32 toastLen = *(uint32 *)(*currLogPtr); + *currLogPtr += sizeof(uint32) + toastLen; + xlundohdrextra->size += sizeof(uint32) + toastLen; + } + } /* Set uheap undo insert block state for xlog record */ @@ -512,7 +523,8 @@ RelFileNode XLogRecSetUHeapUndoInsertBlockState(XLogReaderState *record, XlUHeapInsert *xlrec, insertUndoParse *parseBlock, DecodedBkpBlock *decodebkp) { RelFileNode rnode; - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert); + bool hasCSN = XLogRecHasCSN(record); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN)); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); GetXlUndoHeaderExtraData(&currLogPtr, &parseBlock->xlundohdrextra, xlundohdr->flag); @@ -539,7 +551,8 @@ RelFileNode XLogRecSetUHeapUndoDeleteBlockState(XLogReaderState *record, RelFileNode rnode; Size recordlen = XLogRecGetDataLen(record); deleteUndoParse *parseBlock = &blockundo->deleteUndoParse; - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete); + bool hasCSN = XLogRecHasCSN(record); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN)); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); GetXlUndoHeaderExtraData(&currLogPtr, &parseBlock->xlundohdrextra, xlundohdr->flag); @@ -550,7 +563,7 @@ RelFileNode XLogRecSetUHeapUndoDeleteBlockState(XLogReaderState *record, blockundo->maindata = (char *)currLogPtr; blockundo->recordlen = recordlen - SizeOfUHeapDelete - SizeOfXLUndoHeader - parseBlock->xlundohdrextra.size - - undoMetaSize - SizeOfUHeapHeader; + undoMetaSize - SizeOfUHeapHeader - SizeOfXLOGCSN(hasCSN); parseBlock->recxid = XLogRecGetXid(record); parseBlock->offnum = xlrec->offnum; @@ -574,7 +587,8 @@ RelFileNode XLogRecSetUHeapUndoUpdateBlockState(XLogReaderState *record, RelFileNode rnode; Size recordlen = XLogRecGetDataLen(record); updateUndoParse *parseBlock = &blockundo->updateUndoParse; - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate); + bool hasCSN = XLogRecHasCSN(record); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN)); XlUndoHeader *xlnewundohdr; char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); @@ -606,7 +620,7 @@ RelFileNode XLogRecSetUHeapUndoUpdateBlockState(XLogReaderState *record, blockundo->recordlen = recordlen - SizeOfUHeapUpdate - SizeOfXLUndoHeader - parseBlock->xlundohdrextra.size - SizeOfXLUndoHeader - parseBlock->xlnewundohdrextra.size - - undoMetaSize - initPageXtraInfo - SizeOfUHeapHeader; + undoMetaSize - initPageXtraInfo - SizeOfUHeapHeader - SizeOfXLOGCSN(hasCSN); } else { int *undoXorDeltaSizePtr = (int *)currLogPtr; parseBlock->undoXorDeltaSize = *undoXorDeltaSizePtr; @@ -651,6 +665,9 @@ RelFileNode XLogRecSetUHeapUndoMultiInsertBlockState(XLogReaderState *record, currLogPtr += sizeof(TransactionId) + sizeof(uint16); } + bool hasCSN = XLogRecHasCSN(record); + currLogPtr = currLogPtr + SizeOfXLOGCSN(hasCSN); + blockundo->maindata = (char *)currLogPtr; xlrec = (XlUHeapMultiInsert *)((char *)currLogPtr); diff --git a/src/gausskernel/storage/access/rmgrdesc/uheapdesc.cpp b/src/gausskernel/storage/access/rmgrdesc/uheapdesc.cpp index 3bc857feb..b9ef2781e 100644 --- a/src/gausskernel/storage/access/rmgrdesc/uheapdesc.cpp +++ b/src/gausskernel/storage/access/rmgrdesc/uheapdesc.cpp @@ -115,7 +115,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record) Size blkDataLen = 0; XlUHeapInsert *xlrec = (XlUHeapInsert *)rec; XlUHeapHeader *uheapHeader = (XlUHeapHeader *) XLogRecGetBlockData(record, 0, &blkDataLen); - hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + hasCSN = XLogRecHasCSN(record); bool isInit = (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) != 0; if (isInit) { appendStringInfo(buf, "XLOG_UHEAP_INSERT insert(init): "); @@ -128,7 +128,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "TupInfo: "); appendStringInfo(buf, "tupoffset %u, flag %u. ", (uint16)xlrec->offnum, (uint8)xlrec->flags); XlUndoHeader *xlundohdr = - (XlUndoHeader *)((char *)rec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0)); + (XlUndoHeader *)((char *)rec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN)); currLogPtr = GetUndoHeader(xlundohdr, &partitionOid, &blkprev, &prevUrp, &subXid, &toastLen); appendStringInfo(buf, "UndoInfo: "); appendStringInfo(buf, @@ -173,9 +173,9 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record) curxlogptr += sizeof(uint16); } - hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + hasCSN = XLogRecHasCSN(record); XlUHeapMultiInsert *xlrec = - (XlUHeapMultiInsert *)((char *)curxlogptr + (hasCSN ? sizeof(CommitSeqNo) : 0)); + (XlUHeapMultiInsert *)((char *)curxlogptr + SizeOfXLOGCSN(hasCSN)); curxlogptr = (char *)xlrec + SizeOfUHeapMultiInsert; int nranges = *(int *)curxlogptr; @@ -206,14 +206,14 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record) } case XLOG_UHEAP_DELETE: { XlUHeapDelete *xlrec = (XlUHeapDelete *)rec; - hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + hasCSN = XLogRecHasCSN(record); appendStringInfo(buf, "XLOG_UHEAP_DELETE: "); appendStringInfo(buf, "TupInfo: "); appendStringInfo(buf, "oldxid %lu, tupoffset %u, td_id %u, flag %u. ", xlrec->oldxid, (uint16)xlrec->offnum, (uint8)xlrec->td_id, (uint8)xlrec->flag); XlUndoHeader *xlundohdr = - (XlUndoHeader *)((char *)rec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0)); + (XlUndoHeader *)((char *)rec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN)); currLogPtr = GetUndoHeader(xlundohdr, &partitionOid, &blkprev, &prevUrp, &subXid, &toastLen); appendStringInfo(buf, "UndoInfo: "); @@ -235,7 +235,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record) errno_t rc; char *recdata = XLogRecGetBlockData(record, 0, &datalen); char *recdataEnd = recdata + datalen; - hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + hasCSN = XLogRecHasCSN(record); XlUndoHeader *xlundohdr = NULL; XlUHeapUpdate *xlrec = (XlUHeapUpdate *)rec; @@ -245,7 +245,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record) "oldxid %lu, old tupoffset %u, new tupoffset %u, old_tuple_td_id %u, old_tuple_flag %u. ", xlrec->oldxid, (uint16)xlrec->old_offnum, (uint16)xlrec->new_offnum, (uint8)xlrec->old_tuple_td_id, (uint16)xlrec->old_tuple_flag); - xlundohdr = (XlUndoHeader *)((char *)rec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0)); + xlundohdr = (XlUndoHeader *)((char *)rec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN)); currLogPtr = GetUndoHeader(xlundohdr, &partitionOid, &blkprev, &prevUrp, &subXid, &toastLen); appendStringInfo(buf, "UndoInfo(oldpage): "); appendStringInfo(buf, diff --git a/src/gausskernel/storage/access/transam/cbmparsexlog.cpp b/src/gausskernel/storage/access/transam/cbmparsexlog.cpp index 3c5739dab..c5407e1f9 100644 --- a/src/gausskernel/storage/access/transam/cbmparsexlog.cpp +++ b/src/gausskernel/storage/access/transam/cbmparsexlog.cpp @@ -1458,7 +1458,7 @@ static void skipUndoRecBody(char **currLogPtr, XlUndoHeader *xlundohdr) if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0) { uint32 toastLen = *(uint32 *)(*currLogPtr); - *currLogPtr += sizeof(toastLen) + toastLen; + *currLogPtr += sizeof(uint32) + toastLen; } } @@ -1468,9 +1468,9 @@ static void TrackUheapInsert(XLogReaderState *record) BlockNumber undoStartBlk; UndoSlotPtr slotPtr; RelFileNode rnode; - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record); - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN)); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); skipUndoRecBody(&currLogPtr, xlundohdr); @@ -1500,9 +1500,9 @@ static void TrackUheapDelete(XLogReaderState *record) BlockNumber undoStartBlk; UndoSlotPtr slotPtr; RelFileNode rnode; - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record); - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN)); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); skipUndoRecBody(&currLogPtr, xlundohdr); @@ -1533,9 +1533,9 @@ static void TrackUheapUpdate(XLogReaderState *record) BlockNumber undoStartBlk; UndoSlotPtr slotPtr; RelFileNode rnode; - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record); - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN)); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); skipUndoRecBody(&currLogPtr, xlundohdr); @@ -1582,7 +1582,7 @@ static void TrackUheapMultiInsert(XLogReaderState *record) int nranges; UndoRecPtr *urpvec = NULL; bool isinit = (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) != 0; - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUndoHeader *xlundohdr = (XlUndoHeader *)XLogRecGetData(record); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); diff --git a/src/gausskernel/storage/access/ustore/knl_uextremeredo.cpp b/src/gausskernel/storage/access/ustore/knl_uextremeredo.cpp index 86b2b7747..0ce64ea4c 100644 --- a/src/gausskernel/storage/access/ustore/knl_uextremeredo.cpp +++ b/src/gausskernel/storage/access/ustore/knl_uextremeredo.cpp @@ -499,11 +499,17 @@ static char *ReachXlUndoHeaderEnd(XlUndoHeader *xlundohdr) if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) { currLogPtr += sizeof(Oid); } + if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) { + currLogPtr += sizeof(TransactionId); + } + if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST)) { + currLogPtr += sizeof(uint32) + *(uint32 *)((char *)currLogPtr); + } return currLogPtr; } void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool isinit, bool istoast, void *blkdata, - Size datalen, TransactionId recxid, Size *freespace) + Size datalen, TransactionId recxid, Size *freespace, bool isHasCSN) { char *data = (char *)blkdata; Page page = buffer->pageinfo.page; @@ -520,7 +526,7 @@ void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool UHeapDiskTuple utup = GetUHeapDiskTupleFromRedoData(data, &newlen, tbuf); undo::XlogUndoMeta undometa; - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(isHasCSN)); char *currLogPtr = ReachXlUndoHeaderEnd(xlundohdr); XlogUndoMeta *xlundometa = (XlogUndoMeta *)((char *)currLogPtr); UndoRecPtr urecptr = xlundohdr->urecptr; @@ -562,7 +568,7 @@ void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool PageSetLSN(page, buffer->lsn); } -void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size recordlen, TransactionId recxid) +void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size recordlen, TransactionId recxid, bool isHasCSN) { Page page = buffer->pageinfo.page; TupleBuffer tbuf; @@ -576,7 +582,7 @@ void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size UHeapTupleData utup; undo::XlogUndoMeta undometa; RowPtr *rp; - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(isHasCSN)); char *currLogPtr = ReachXlUndoHeaderEnd(xlundohdr); Size shiftSize = currLogPtr - (char *)xlrec; XlogUndoMeta *xlundometa = (XlogUndoMeta *)((char *)currLogPtr); @@ -629,13 +635,13 @@ void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size void UHeapXlogUpdateOperatorOldpage(UpdateRedoBuffers* buffers, void *recorddata, bool inplaceUpdate, bool blockInplaceUpdate, UHeapTupleData *oldtup, bool sameBlock, - BlockNumber blk, TransactionId recordxid) + BlockNumber blk, TransactionId recordxid, bool isHasCSN) { XLogRecPtr lsn = buffers->oldbuffer.lsn; Page oldpage = buffers->oldbuffer.pageinfo.page; Pointer recData = (Pointer)recorddata; XlUHeapUpdate *xlrec = (XlUHeapUpdate *)recData; - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(isHasCSN)); UndoRecPtr urecptr = xlundohdr->urecptr; Buffer oldbuf = buffers->oldbuffer.buf; RowPtr *rp = NULL; @@ -679,7 +685,7 @@ void UHeapXlogUpdateOperatorOldpage(UpdateRedoBuffers* buffers, void *recorddata Size UHeapXlogUpdateOperatorNewpage(UpdateRedoBuffers* buffers, void *recorddata, bool inplaceUpdate, bool blockInplaceUpdate, void *blkdata, UHeapTupleData *oldtup, Size recordlen, Size data_len, bool isinit, bool istoast, bool sameBlock, - TransactionId recordxid, UpdateRedoAffixLens *affixLens) + TransactionId recordxid, UpdateRedoAffixLens *affixLens, bool isHasCSN) { XLogRecPtr lsn = buffers->newbuffer.lsn; TupleBuffer tbuf; @@ -691,7 +697,7 @@ Size UHeapXlogUpdateOperatorNewpage(UpdateRedoBuffers* buffers, void *recorddata Pointer recData = (Pointer)recorddata; XlUHeapUpdate *xlrec = (XlUHeapUpdate *)recData; - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(isHasCSN)); char *curxlogptr = ReachXlUndoHeaderEnd(xlundohdr); UndoRecPtr urecptr = xlundohdr->urecptr; errno_t rc = EOK; @@ -1024,7 +1030,7 @@ static UHeapDiskTuple GetUHeapDiskTupleFromMultiInsertRedoData(char **data, int } void UHeapXlogMultiInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool isinit, bool istoast, - void *blkdata, Size datalen, TransactionId recxid, Size *freespace) + void *blkdata, Size datalen, TransactionId recxid, Size *freespace, bool isHasCSN) { char *data = (char *)blkdata; Page page = buffer->pageinfo.page; @@ -1068,6 +1074,7 @@ void UHeapXlogMultiInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, uheappage->pd_multi_base = 0; } + curxlogptr = curxlogptr + SizeOfXLOGCSN(isHasCSN); xlrec = (XlUHeapMultiInsert *)((char *)curxlogptr); curxlogptr = (char *)xlrec + SizeOfUHeapMultiInsert; UndoRecPtr *urpvec = NULL; @@ -1309,6 +1316,7 @@ static void UHeapXlogInsertBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b { bool isinit = (XLogBlockHeadGetInfo(blockhead) & XLOG_UHEAP_INIT_PAGE) != 0; bool istoast = (XLogBlockHeadGetInfo(blockhead) & XLOG_UHEAP_INIT_TOAST_PAGE) != 0; + bool hasCSN = blockhead->hasCSN; TransactionId recordxid = XLogBlockHeadGetXid(blockhead); XLogBlockDataParse *datadecode = blockdatarec; XLogRedoAction action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo); @@ -1320,7 +1328,7 @@ static void UHeapXlogInsertBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b blkdata = XLogBlockDataGetBlockData(datadecode, &blkdatalen); Assert(blkdata != NULL); UHeapXlogInsertOperatorPage(bufferinfo, maindata, isinit, istoast, (void *)blkdata, blkdatalen, recordxid, - NULL); + NULL, hasCSN); MakeRedoBufferDirty(bufferinfo); } } @@ -1330,12 +1338,13 @@ static void UHeapXlogDeleteBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b TransactionId recordxid = XLogBlockHeadGetXid(blockhead); XLogBlockDataParse *datadecode = blockdatarec; XLogRedoAction action; + bool hasCSN = blockhead->hasCSN; action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo); if (action == BLK_NEEDS_REDO) { char *maindata = XLogBlockDataGetMainData(datadecode, NULL); Size recordlen = datadecode->main_data_len; - UHeapXlogDeleteOperatorPage(bufferinfo, (void *)maindata, recordlen, recordxid); + UHeapXlogDeleteOperatorPage(bufferinfo, (void *)maindata, recordlen, recordxid, hasCSN); MakeRedoBufferDirty(bufferinfo); } } @@ -1356,6 +1365,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b UpdateRedoAffixLens affixLens = {0, 0}; UHeapTupleData oldtup; Size freespace = 0; + bool hasCSN = blockhead->hasCSN; action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo); if (action == BLK_NEEDS_REDO) { if (XLogBlockDataGetBlockId(datadecode) == UHEAP_UPDATE_NEW_BLOCK_NUM) { @@ -1370,7 +1380,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b buffers.oldbuffer.lsn = buffers.newbuffer.lsn; UHeapXlogUpdateOperatorOldpage(&buffers, (void *)maindata, inplaceUpdate, - blockInplaceUpdate, &oldtup, sameBlock, oldblk, recordxid); + blockInplaceUpdate, &oldtup, sameBlock, oldblk, recordxid, hasCSN); } blkdata = XLogBlockDataGetBlockData(datadecode, &blkdatalen); @@ -1379,7 +1389,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b Size dataLen = datadecode->blockdata.data_len; freespace = UHeapXlogUpdateOperatorNewpage(&buffers, (void *)maindata, inplaceUpdate, blockInplaceUpdate, (void *)blkdata, &oldtup, recordlen, dataLen, isinit, - istoast, sameBlock, recordxid, &affixLens); + istoast, sameBlock, recordxid, &affixLens, hasCSN); /* may should free space */ if (!inplaceUpdate && freespace < BLCKSZ / FREESPACE_FRACTION) { RelFileNode rnode; @@ -1400,7 +1410,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b } UHeapXlogUpdateOperatorOldpage(&buffers, (void *)maindata, inplaceUpdate, - blockInplaceUpdate, &oldtup, sameBlock, newblk, recordxid); + blockInplaceUpdate, &oldtup, sameBlock, newblk, recordxid, hasCSN); } MakeRedoBufferDirty(bufferinfo); @@ -1415,6 +1425,7 @@ static void UHeapXlogMultiInsertBlock(XLogBlockHead *blockhead, XLogBlockDataPar TransactionId recordxid = XLogBlockHeadGetXid(blockhead); XLogBlockDataParse *datadecode = blockdatarec; XLogRedoAction action; + bool hasCSN = blockhead->hasCSN; action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo); if (action == BLK_NEEDS_REDO) { @@ -1425,7 +1436,7 @@ static void UHeapXlogMultiInsertBlock(XLogBlockHead *blockhead, XLogBlockDataPar blkdata = XLogBlockDataGetBlockData(datadecode, &blkdatalen); Assert(blkdata != NULL); UHeapXlogMultiInsertOperatorPage(bufferinfo, maindata, isinit, istoast, (void *)blkdata, blkdatalen, recordxid, - NULL); + NULL, hasCSN); MakeRedoBufferDirty(bufferinfo); } } diff --git a/src/gausskernel/storage/access/ustore/knl_uredo.cpp b/src/gausskernel/storage/access/ustore/knl_uredo.cpp index 114ef6c53..4b0e10134 100644 --- a/src/gausskernel/storage/access/ustore/knl_uredo.cpp +++ b/src/gausskernel/storage/access/ustore/knl_uredo.cpp @@ -102,9 +102,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForInsertRedo(XLogReaderState *recor UndoRecPtr invalidUrp = INVALID_UNDO_REC_PTR; Oid invalidPartitionOid = 0; - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record); - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN)); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) { blkprev = (UndoRecPtr *) ((char *)currLogPtr); @@ -184,10 +184,10 @@ static XLogRedoAction GetInsertRedoAction(XLogReaderState *record, RedoBufferInf XLogRedoAction action; if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) { - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record); XlUndoHeader *xlundohdr = - (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0)); + (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN)); TransactionId *xidBase = (TransactionId *)((char *)xlundohdr + SizeOfXLUndoHeader + skipSize); uint16 *tdCount = (uint16 *)((char *)xidBase + sizeof(TransactionId)); @@ -309,9 +309,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForDeleteRedo(XLogReaderState *recor bool defaultHasSubXact = false; uint32 readSize = 0; - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record); - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN)); char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader); if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) { hasSubXact = (bool *) ((char *)currLogPtr); @@ -363,9 +363,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForDeleteRedo(XLogReaderState *recor * explicitly stored tuple. */ Size datalen = recordlen - SizeOfXLUndoHeader - SizeOfUHeapDelete - undoMetaSize - SizeOfUHeapHeader - - readSize - (hasCSN ? sizeof(CommitSeqNo) : 0); + readSize - SizeOfXLOGCSN(hasCSN); char *data = (char *)xlrec + SizeOfUHeapDelete + SizeOfXLUndoHeader + undoMetaSize + readSize + - (hasCSN ? sizeof(CommitSeqNo) : 0); + SizeOfXLOGCSN(hasCSN); utup->disk_tuple = GetUHeapDiskTupleFromRedoData(data, &datalen, tbuf); utup->disk_tuple_size = datalen; @@ -828,9 +828,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForUpdateRedo(XLogReaderState *recor bool inplaceUpdate = true; char *xlogXorDelta = NULL; - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; + bool hasCSN = XLogRecHasCSN(record); XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record); - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN)); UndoRecPtr urecptr = xlundohdr->urecptr; char *curxlogptr = ((char *)xlundohdr) + SizeOfXLUndoHeader; if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) { @@ -925,7 +925,7 @@ static UndoRecPtr PrepareAndInsertUndoRecordForUpdateRedo(XLogReaderState *recor char *data = (char *)curxlogptr; Size datalen = recordlen - SizeOfUHeapHeader - SizeOfXLUndoHeader - SizeOfUHeapUpdate - - undoMetaSize - SizeOfXLUndoHeader - initPageXtraInfo - readSize - (hasCSN ? sizeof(CommitSeqNo) : 0); + undoMetaSize - SizeOfXLUndoHeader - initPageXtraInfo - readSize - SizeOfXLOGCSN(hasCSN); oldtup->disk_tuple = GetUHeapDiskTupleFromRedoData(data, &datalen, tbuf); oldtup->disk_tuple_size = datalen; @@ -1434,8 +1434,8 @@ static UndoRecPtr PrepareAndInsertUndoRecordForMultiInsertRedo(XLogReaderState * curxlogptr += sizeof(uint16); } - bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; - curxlogptr = curxlogptr + (hasCSN ? sizeof(CommitSeqNo) : 0); + bool hasCSN = XLogRecHasCSN(record); + curxlogptr = curxlogptr + SizeOfXLOGCSN(hasCSN); (*xlrec) = (XlUHeapMultiInsert *)curxlogptr; curxlogptr = (char *)*xlrec + SizeOfUHeapMultiInsert; @@ -2132,7 +2132,7 @@ static TransactionId UHeapXlogGetCurrentXidInsert(XLogReaderState *record, bool { XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record); XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + - (hasCSN ? sizeof(CommitSeqNo) : 0)); + SizeOfXLOGCSN(hasCSN)); bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0); if (!hasCurrentXid) { @@ -2158,7 +2158,7 @@ static TransactionId UHeapXlogGetCurrentXidDelete(XLogReaderState *record, bool { XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record); XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + - (hasCSN ? sizeof(CommitSeqNo) : 0)); + SizeOfXLOGCSN(hasCSN)); bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0); if (!hasCurrentXid) { return XLogRecGetXid(record); @@ -2186,7 +2186,7 @@ static TransactionId UHeapXlogGetCurrentXidUpdate(XLogReaderState *record, bool { XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record); XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + - (hasCSN ? sizeof(CommitSeqNo) : 0)); + SizeOfXLOGCSN(hasCSN)); bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0); if (!hasCurrentXid) { diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index fe0a197e2..d6ebef543 100644 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -1368,7 +1368,7 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* adapt 64 xid, if this tuple is the first tuple of a new page */ bool is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0; /* caution, remaining data in record is not aligned */ - char *data_old = (char *)xlrec + heapUpdateSize + (hasCSN ? sizeof(CommitSeqNo) : 0); + char *data_old = (char *)xlrec + heapUpdateSize + SizeOfXLOGCSN(hasCSN); if (is_init) { datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(TransactionId); } else { @@ -1432,7 +1432,7 @@ static void AreaDecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* adapt 64 xid, if this tuple is the first tuple of a new page */ bool is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0; /* caution, remaining data in record is not aligned */ - char *data_old = (char *)xlrec + heapUpdateSize + (hasCSN ? sizeof(CommitSeqNo) : 0); + char *data_old = (char *)xlrec + heapUpdateSize + SizeOfXLOGCSN(hasCSN); if (is_init) { datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(TransactionId); } else { @@ -1566,8 +1566,8 @@ static void DecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalenNew = 0; char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew); - Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - (hasCSN ? sizeof(CommitSeqNo) : 0); - char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0); + Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - SizeOfXLOGCSN(hasCSN); + char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN); uint32 toastLen = 0; bool hasToast = false; char *toastPtr = UpdateOldTupleCalc(isInplaceUpdate, r, &dataOld, &tuplelenOld, &toastLen); @@ -1634,8 +1634,8 @@ static void AreaDecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf curCSN = *(CommitSeqNo *)((char *)xlrec + SizeOfUHeapUpdate); } bool isInplaceUpdate = (xlrec->flags & XLZ_NON_INPLACE_UPDATE) == 0; - Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - (hasCSN ? sizeof(CommitSeqNo) : 0); - char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0); + Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - SizeOfXLOGCSN(hasCSN); + char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN); uint32 toastLen = 0; char *toastPtr = UpdateOldTupleCalc(isInplaceUpdate, r, &dataOld, &tuplelenOld, &toastLen); bool hasToast = false; @@ -1737,8 +1737,8 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Assert(XLogRecGetDataLen(r) > (heapDeleteSize + SizeOfHeapHeader)); change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder, datalen); - DecodeXLogTuple((char *)xlrec + heapDeleteSize + (hasCSN ? sizeof(CommitSeqNo) : 0), - datalen - (hasCSN ? sizeof(CommitSeqNo) : 0), change->data.tp.oldtuple, true); + DecodeXLogTuple((char *)xlrec + heapDeleteSize + SizeOfXLOGCSN(hasCSN), + datalen - SizeOfXLOGCSN(hasCSN), change->data.tp.oldtuple, true); } change->data.tp.snapshotcsn = curCSN; change->data.tp.clear_toast_afterwards = true; @@ -1785,8 +1785,8 @@ static void AreaDecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Assert(XLogRecGetDataLen(r) > (heapDeleteSize + SizeOfHeapHeader)); change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder, datalen); - DecodeXLogTuple((char *)xlrec + heapDeleteSize + (hasCSN ? sizeof(CommitSeqNo) : 0), - datalen - (hasCSN ? sizeof(CommitSeqNo) : 0), change->data.tp.oldtuple, true); + DecodeXLogTuple((char *)xlrec + heapDeleteSize + SizeOfXLOGCSN(hasCSN), + datalen - SizeOfXLOGCSN(hasCSN), change->data.tp.oldtuple, true); } change->data.tp.snapshotcsn = curCSN; change->data.tp.clear_toast_afterwards = true; @@ -1819,9 +1819,9 @@ static void DecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { return; } - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN)); bool hasToast = (xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0; - Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - (hasCSN ? sizeof(CommitSeqNo) : 0); + Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - SizeOfXLOGCSN(hasCSN); Size addLen = 0; uint32 toastLen = 0; UpdateUndoBody(&addLen, (char *)xlundohdr + SizeOfXLUndoHeader, xlundohdr->flag, &toastLen); @@ -1829,13 +1829,13 @@ static void DecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (toastLen > 0) { toastData = (char *)palloc0(toastLen); errno_t rc = memcpy_s(toastData, toastLen, - (char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen, + (char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen, toastLen); securec_check(rc, "\0", "\0"); } addLen += toastLen; - Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + + Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen); addLen += metaLen; if (toastLen == 0 && (datalen == 0 || !AllocSizeIsValid(datalen))) { @@ -1850,7 +1850,7 @@ static void DecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) securec_check(rc, "\0", "\0"); char *dataold = - (char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen; + (char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen; if (!hasToast) { change->data.utp.oldtuple = ReorderBufferGetUTupleBuf(ctx->reorder, datalen -addLen); DecodeXLogTuple(dataold, datalen - addLen, (ReorderBufferTupleBuf *)change->data.utp.oldtuple, false); @@ -1884,9 +1884,9 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) { return; } - XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0)); + XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN)); bool hasToast = (xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0; - Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - (hasCSN ? sizeof(CommitSeqNo) : 0); + Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - SizeOfXLOGCSN(hasCSN); Size addLen = 0; uint32 toastLen = 0; UpdateUndoBody(&addLen, (char *)xlundohdr + SizeOfXLUndoHeader, xlundohdr->flag, &toastLen); @@ -1894,13 +1894,13 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf if (toastLen > 0) { toastData = (char *)palloc0(toastLen); errno_t rc = memcpy_s(toastData, toastLen, - (char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen, + (char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen, toastLen); securec_check(rc, "\0", "\0"); } addLen += toastLen; - Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + + Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen); addLen += metaLen; @@ -1911,7 +1911,7 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf securec_check(rc, "\0", "\0"); char *dataold = - (char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen; + (char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen; change->data.utp.oldtuple = ReorderBufferGetUTupleBuf(ctx->reorder, datalen); if (!hasToast) { diff --git a/src/include/access/ustore/knl_uextremeredo.h b/src/include/access/ustore/knl_uextremeredo.h index 1d42eaf3f..af754caa0 100644 --- a/src/include/access/ustore/knl_uextremeredo.h +++ b/src/include/access/ustore/knl_uextremeredo.h @@ -44,7 +44,7 @@ void UHeapRedoDataBlock(XLogBlockHead *blockhead, XLogBlockDataParse *blockdatar void UHeap2RedoDataBlock(XLogBlockHead *blockhead, XLogBlockDataParse *blockdatarec, RedoBufferInfo *bufferinfo); void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool isinit, bool istoast, void *blkdata, - Size datalen, TransactionId recxid, Size *freespace); + Size datalen, TransactionId recxid, Size *freespace, bool isHasCSN); void RedoUHeapUndoBlock(XLogBlockHead *blockhead, XLogBlockUndoParse *blockdatarec, RedoBufferInfo *bufferinfo); void RedoUndoBlock(XLogBlockHead *blockhead, XLogBlockUndoParse *blockdatarec, RedoBufferInfo *bufferinfo); diff --git a/src/include/access/ustore/knl_uredo.h b/src/include/access/ustore/knl_uredo.h index ccfaff70a..36af9a63d 100644 --- a/src/include/access/ustore/knl_uredo.h +++ b/src/include/access/ustore/knl_uredo.h @@ -89,6 +89,7 @@ typedef struct XlUndoHeader { } XlUndoHeader; #define SizeOfXLUndoHeader (offsetof(XlUndoHeader, flag) + sizeof(uint8)) // 13 Bytes +#define SizeOfXLOGCSN(flag) (flag ? sizeof(CommitSeqNo) : 0) struct XlUndoHeaderExtra { UndoRecPtr blkprev; diff --git a/src/include/access/xlogproc.h b/src/include/access/xlogproc.h index 02656a94c..285376d9a 100755 --- a/src/include/access/xlogproc.h +++ b/src/include/access/xlogproc.h @@ -511,6 +511,7 @@ typedef struct { uint2 opt; bool is_conflict_type; /* whether wal log type is conflict with standby read if redo */ XLogPhyBlock pblk; + bool hasCSN; } XLogBlockHead; #define XLogBlockHeadEncodeSize (sizeof(XLogBlockHead)) diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 3fe5e5dbc..dc15269ac 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -85,6 +85,7 @@ extern bool DecodeXLogRecord(XLogReaderState* state, XLogRecord* record, char** #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) #define XLogRecHasBlockRef(decoder, block_id) ((decoder)->blocks[block_id].in_use) #define XLogRecHasBlockImage(decoder, block_id) ((decoder)->blocks[block_id].has_image) +#define XLogRecHasCSN(decoder) ((decoder)->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN; extern void RestoreBlockImage(const char* bkp_image, uint16 hole_offset, uint16 hole_length, char* page); extern char* XLogRecGetBlockData(XLogReaderState* record, uint8 block_id, Size* len);