!5847 修复逻辑复制场景下极致RTO回放过程中字段没有对齐的问题

Merge pull request !5847 from wofanzheng/master
This commit is contained in:
opengauss_bot
2024-07-24 08:47:53 +00:00
committed by Gitee
10 changed files with 104 additions and 73 deletions

View File

@ -432,6 +432,7 @@ void XLogRecSetBlockDataState(XLogReaderState *record, uint32 blockid, XLogRecPa
XLogRecSetBlockDataStateContent(record, blockid, blockdatarec);
recordblockstate->blockparse.blockhead.is_conflict_type = is_conflict_type;
recordblockstate->blockparse.blockhead.hasCSN = XLogRecHasCSN(record);
}
void XLogRecSetAuxiBlkNumState(XLogBlockDataParse *blockdatarec, BlockNumber auxilaryblkn1, BlockNumber auxilaryblkn2)
@ -505,6 +506,16 @@ void GetXlUndoHeaderExtraData(char **currLogPtr, XlUndoHeaderExtra *xlundohdrext
} else {
xlundohdrextra->partitionOid = 0;
}
if ((flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) {
*currLogPtr += sizeof(TransactionId);
xlundohdrextra->size += sizeof(TransactionId);
}
if ((flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0) {
uint32 toastLen = *(uint32 *)(*currLogPtr);
*currLogPtr += sizeof(uint32) + toastLen;
xlundohdrextra->size += sizeof(uint32) + toastLen;
}
}
/* Set uheap undo insert block state for xlog record */
@ -512,7 +523,8 @@ RelFileNode XLogRecSetUHeapUndoInsertBlockState(XLogReaderState *record,
XlUHeapInsert *xlrec, insertUndoParse *parseBlock, DecodedBkpBlock *decodebkp)
{
RelFileNode rnode;
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert);
bool hasCSN = XLogRecHasCSN(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
GetXlUndoHeaderExtraData(&currLogPtr, &parseBlock->xlundohdrextra, xlundohdr->flag);
@ -539,7 +551,8 @@ RelFileNode XLogRecSetUHeapUndoDeleteBlockState(XLogReaderState *record,
RelFileNode rnode;
Size recordlen = XLogRecGetDataLen(record);
deleteUndoParse *parseBlock = &blockundo->deleteUndoParse;
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete);
bool hasCSN = XLogRecHasCSN(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
GetXlUndoHeaderExtraData(&currLogPtr, &parseBlock->xlundohdrextra, xlundohdr->flag);
@ -550,7 +563,7 @@ RelFileNode XLogRecSetUHeapUndoDeleteBlockState(XLogReaderState *record,
blockundo->maindata = (char *)currLogPtr;
blockundo->recordlen = recordlen - SizeOfUHeapDelete -
SizeOfXLUndoHeader - parseBlock->xlundohdrextra.size -
undoMetaSize - SizeOfUHeapHeader;
undoMetaSize - SizeOfUHeapHeader - SizeOfXLOGCSN(hasCSN);
parseBlock->recxid = XLogRecGetXid(record);
parseBlock->offnum = xlrec->offnum;
@ -574,7 +587,8 @@ RelFileNode XLogRecSetUHeapUndoUpdateBlockState(XLogReaderState *record,
RelFileNode rnode;
Size recordlen = XLogRecGetDataLen(record);
updateUndoParse *parseBlock = &blockundo->updateUndoParse;
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate);
bool hasCSN = XLogRecHasCSN(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN));
XlUndoHeader *xlnewundohdr;
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
@ -606,7 +620,7 @@ RelFileNode XLogRecSetUHeapUndoUpdateBlockState(XLogReaderState *record,
blockundo->recordlen = recordlen - SizeOfUHeapUpdate -
SizeOfXLUndoHeader - parseBlock->xlundohdrextra.size -
SizeOfXLUndoHeader - parseBlock->xlnewundohdrextra.size -
undoMetaSize - initPageXtraInfo - SizeOfUHeapHeader;
undoMetaSize - initPageXtraInfo - SizeOfUHeapHeader - SizeOfXLOGCSN(hasCSN);
} else {
int *undoXorDeltaSizePtr = (int *)currLogPtr;
parseBlock->undoXorDeltaSize = *undoXorDeltaSizePtr;
@ -651,6 +665,9 @@ RelFileNode XLogRecSetUHeapUndoMultiInsertBlockState(XLogReaderState *record,
currLogPtr += sizeof(TransactionId) + sizeof(uint16);
}
bool hasCSN = XLogRecHasCSN(record);
currLogPtr = currLogPtr + SizeOfXLOGCSN(hasCSN);
blockundo->maindata = (char *)currLogPtr;
xlrec = (XlUHeapMultiInsert *)((char *)currLogPtr);

View File

@ -115,7 +115,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record)
Size blkDataLen = 0;
XlUHeapInsert *xlrec = (XlUHeapInsert *)rec;
XlUHeapHeader *uheapHeader = (XlUHeapHeader *) XLogRecGetBlockData(record, 0, &blkDataLen);
hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
hasCSN = XLogRecHasCSN(record);
bool isInit = (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) != 0;
if (isInit) {
appendStringInfo(buf, "XLOG_UHEAP_INSERT insert(init): ");
@ -128,7 +128,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record)
appendStringInfo(buf, "TupInfo: ");
appendStringInfo(buf, "tupoffset %u, flag %u. ", (uint16)xlrec->offnum, (uint8)xlrec->flags);
XlUndoHeader *xlundohdr =
(XlUndoHeader *)((char *)rec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0));
(XlUndoHeader *)((char *)rec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN));
currLogPtr = GetUndoHeader(xlundohdr, &partitionOid, &blkprev, &prevUrp, &subXid, &toastLen);
appendStringInfo(buf, "UndoInfo: ");
appendStringInfo(buf,
@ -173,9 +173,9 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record)
curxlogptr += sizeof(uint16);
}
hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
hasCSN = XLogRecHasCSN(record);
XlUHeapMultiInsert *xlrec =
(XlUHeapMultiInsert *)((char *)curxlogptr + (hasCSN ? sizeof(CommitSeqNo) : 0));
(XlUHeapMultiInsert *)((char *)curxlogptr + SizeOfXLOGCSN(hasCSN));
curxlogptr = (char *)xlrec + SizeOfUHeapMultiInsert;
int nranges = *(int *)curxlogptr;
@ -206,14 +206,14 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record)
}
case XLOG_UHEAP_DELETE: {
XlUHeapDelete *xlrec = (XlUHeapDelete *)rec;
hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
hasCSN = XLogRecHasCSN(record);
appendStringInfo(buf, "XLOG_UHEAP_DELETE: ");
appendStringInfo(buf, "TupInfo: ");
appendStringInfo(buf, "oldxid %lu, tupoffset %u, td_id %u, flag %u. ", xlrec->oldxid,
(uint16)xlrec->offnum, (uint8)xlrec->td_id, (uint8)xlrec->flag);
XlUndoHeader *xlundohdr =
(XlUndoHeader *)((char *)rec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0));
(XlUndoHeader *)((char *)rec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN));
currLogPtr = GetUndoHeader(xlundohdr, &partitionOid, &blkprev, &prevUrp, &subXid,
&toastLen);
appendStringInfo(buf, "UndoInfo: ");
@ -235,7 +235,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record)
errno_t rc;
char *recdata = XLogRecGetBlockData(record, 0, &datalen);
char *recdataEnd = recdata + datalen;
hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
hasCSN = XLogRecHasCSN(record);
XlUndoHeader *xlundohdr = NULL;
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)rec;
@ -245,7 +245,7 @@ void UHeapDesc(StringInfo buf, XLogReaderState *record)
"oldxid %lu, old tupoffset %u, new tupoffset %u, old_tuple_td_id %u, old_tuple_flag %u. ",
xlrec->oldxid, (uint16)xlrec->old_offnum, (uint16)xlrec->new_offnum,
(uint8)xlrec->old_tuple_td_id, (uint16)xlrec->old_tuple_flag);
xlundohdr = (XlUndoHeader *)((char *)rec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0));
xlundohdr = (XlUndoHeader *)((char *)rec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN));
currLogPtr = GetUndoHeader(xlundohdr, &partitionOid, &blkprev, &prevUrp, &subXid, &toastLen);
appendStringInfo(buf, "UndoInfo(oldpage): ");
appendStringInfo(buf,

View File

@ -1458,7 +1458,7 @@ static void skipUndoRecBody(char **currLogPtr, XlUndoHeader *xlundohdr)
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0) {
uint32 toastLen = *(uint32 *)(*currLogPtr);
*currLogPtr += sizeof(toastLen) + toastLen;
*currLogPtr += sizeof(uint32) + toastLen;
}
}
@ -1468,9 +1468,9 @@ static void TrackUheapInsert(XLogReaderState *record)
BlockNumber undoStartBlk;
UndoSlotPtr slotPtr;
RelFileNode rnode;
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
skipUndoRecBody(&currLogPtr, xlundohdr);
@ -1500,9 +1500,9 @@ static void TrackUheapDelete(XLogReaderState *record)
BlockNumber undoStartBlk;
UndoSlotPtr slotPtr;
RelFileNode rnode;
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
skipUndoRecBody(&currLogPtr, xlundohdr);
@ -1533,9 +1533,9 @@ static void TrackUheapUpdate(XLogReaderState *record)
BlockNumber undoStartBlk;
UndoSlotPtr slotPtr;
RelFileNode rnode;
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
skipUndoRecBody(&currLogPtr, xlundohdr);
@ -1582,7 +1582,7 @@ static void TrackUheapMultiInsert(XLogReaderState *record)
int nranges;
UndoRecPtr *urpvec = NULL;
bool isinit = (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) != 0;
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)XLogRecGetData(record);
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);

View File

@ -499,11 +499,17 @@ static char *ReachXlUndoHeaderEnd(XlUndoHeader *xlundohdr)
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
currLogPtr += sizeof(Oid);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) {
currLogPtr += sizeof(TransactionId);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST)) {
currLogPtr += sizeof(uint32) + *(uint32 *)((char *)currLogPtr);
}
return currLogPtr;
}
void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool isinit, bool istoast, void *blkdata,
Size datalen, TransactionId recxid, Size *freespace)
Size datalen, TransactionId recxid, Size *freespace, bool isHasCSN)
{
char *data = (char *)blkdata;
Page page = buffer->pageinfo.page;
@ -520,7 +526,7 @@ void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool
UHeapDiskTuple utup = GetUHeapDiskTupleFromRedoData(data, &newlen, tbuf);
undo::XlogUndoMeta undometa;
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(isHasCSN));
char *currLogPtr = ReachXlUndoHeaderEnd(xlundohdr);
XlogUndoMeta *xlundometa = (XlogUndoMeta *)((char *)currLogPtr);
UndoRecPtr urecptr = xlundohdr->urecptr;
@ -562,7 +568,7 @@ void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool
PageSetLSN(page, buffer->lsn);
}
void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size recordlen, TransactionId recxid)
void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size recordlen, TransactionId recxid, bool isHasCSN)
{
Page page = buffer->pageinfo.page;
TupleBuffer tbuf;
@ -576,7 +582,7 @@ void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size
UHeapTupleData utup;
undo::XlogUndoMeta undometa;
RowPtr *rp;
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(isHasCSN));
char *currLogPtr = ReachXlUndoHeaderEnd(xlundohdr);
Size shiftSize = currLogPtr - (char *)xlrec;
XlogUndoMeta *xlundometa = (XlogUndoMeta *)((char *)currLogPtr);
@ -629,13 +635,13 @@ void UHeapXlogDeleteOperatorPage(RedoBufferInfo *buffer, void *recorddata, Size
void UHeapXlogUpdateOperatorOldpage(UpdateRedoBuffers* buffers, void *recorddata,
bool inplaceUpdate, bool blockInplaceUpdate, UHeapTupleData *oldtup, bool sameBlock,
BlockNumber blk, TransactionId recordxid)
BlockNumber blk, TransactionId recordxid, bool isHasCSN)
{
XLogRecPtr lsn = buffers->oldbuffer.lsn;
Page oldpage = buffers->oldbuffer.pageinfo.page;
Pointer recData = (Pointer)recorddata;
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)recData;
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(isHasCSN));
UndoRecPtr urecptr = xlundohdr->urecptr;
Buffer oldbuf = buffers->oldbuffer.buf;
RowPtr *rp = NULL;
@ -679,7 +685,7 @@ void UHeapXlogUpdateOperatorOldpage(UpdateRedoBuffers* buffers, void *recorddata
Size UHeapXlogUpdateOperatorNewpage(UpdateRedoBuffers* buffers, void *recorddata,
bool inplaceUpdate, bool blockInplaceUpdate, void *blkdata, UHeapTupleData *oldtup,
Size recordlen, Size data_len, bool isinit, bool istoast, bool sameBlock,
TransactionId recordxid, UpdateRedoAffixLens *affixLens)
TransactionId recordxid, UpdateRedoAffixLens *affixLens, bool isHasCSN)
{
XLogRecPtr lsn = buffers->newbuffer.lsn;
TupleBuffer tbuf;
@ -691,7 +697,7 @@ Size UHeapXlogUpdateOperatorNewpage(UpdateRedoBuffers* buffers, void *recorddata
Pointer recData = (Pointer)recorddata;
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)recData;
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(isHasCSN));
char *curxlogptr = ReachXlUndoHeaderEnd(xlundohdr);
UndoRecPtr urecptr = xlundohdr->urecptr;
errno_t rc = EOK;
@ -1024,7 +1030,7 @@ static UHeapDiskTuple GetUHeapDiskTupleFromMultiInsertRedoData(char **data, int
}
void UHeapXlogMultiInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool isinit, bool istoast,
void *blkdata, Size datalen, TransactionId recxid, Size *freespace)
void *blkdata, Size datalen, TransactionId recxid, Size *freespace, bool isHasCSN)
{
char *data = (char *)blkdata;
Page page = buffer->pageinfo.page;
@ -1068,6 +1074,7 @@ void UHeapXlogMultiInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata,
uheappage->pd_multi_base = 0;
}
curxlogptr = curxlogptr + SizeOfXLOGCSN(isHasCSN);
xlrec = (XlUHeapMultiInsert *)((char *)curxlogptr);
curxlogptr = (char *)xlrec + SizeOfUHeapMultiInsert;
UndoRecPtr *urpvec = NULL;
@ -1309,6 +1316,7 @@ static void UHeapXlogInsertBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b
{
bool isinit = (XLogBlockHeadGetInfo(blockhead) & XLOG_UHEAP_INIT_PAGE) != 0;
bool istoast = (XLogBlockHeadGetInfo(blockhead) & XLOG_UHEAP_INIT_TOAST_PAGE) != 0;
bool hasCSN = blockhead->hasCSN;
TransactionId recordxid = XLogBlockHeadGetXid(blockhead);
XLogBlockDataParse *datadecode = blockdatarec;
XLogRedoAction action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo);
@ -1320,7 +1328,7 @@ static void UHeapXlogInsertBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b
blkdata = XLogBlockDataGetBlockData(datadecode, &blkdatalen);
Assert(blkdata != NULL);
UHeapXlogInsertOperatorPage(bufferinfo, maindata, isinit, istoast, (void *)blkdata, blkdatalen, recordxid,
NULL);
NULL, hasCSN);
MakeRedoBufferDirty(bufferinfo);
}
}
@ -1330,12 +1338,13 @@ static void UHeapXlogDeleteBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b
TransactionId recordxid = XLogBlockHeadGetXid(blockhead);
XLogBlockDataParse *datadecode = blockdatarec;
XLogRedoAction action;
bool hasCSN = blockhead->hasCSN;
action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo);
if (action == BLK_NEEDS_REDO) {
char *maindata = XLogBlockDataGetMainData(datadecode, NULL);
Size recordlen = datadecode->main_data_len;
UHeapXlogDeleteOperatorPage(bufferinfo, (void *)maindata, recordlen, recordxid);
UHeapXlogDeleteOperatorPage(bufferinfo, (void *)maindata, recordlen, recordxid, hasCSN);
MakeRedoBufferDirty(bufferinfo);
}
}
@ -1356,6 +1365,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b
UpdateRedoAffixLens affixLens = {0, 0};
UHeapTupleData oldtup;
Size freespace = 0;
bool hasCSN = blockhead->hasCSN;
action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo);
if (action == BLK_NEEDS_REDO) {
if (XLogBlockDataGetBlockId(datadecode) == UHEAP_UPDATE_NEW_BLOCK_NUM) {
@ -1370,7 +1380,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b
buffers.oldbuffer.lsn = buffers.newbuffer.lsn;
UHeapXlogUpdateOperatorOldpage(&buffers, (void *)maindata, inplaceUpdate,
blockInplaceUpdate, &oldtup, sameBlock, oldblk, recordxid);
blockInplaceUpdate, &oldtup, sameBlock, oldblk, recordxid, hasCSN);
}
blkdata = XLogBlockDataGetBlockData(datadecode, &blkdatalen);
@ -1379,7 +1389,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b
Size dataLen = datadecode->blockdata.data_len;
freespace = UHeapXlogUpdateOperatorNewpage(&buffers, (void *)maindata, inplaceUpdate,
blockInplaceUpdate, (void *)blkdata, &oldtup, recordlen, dataLen, isinit,
istoast, sameBlock, recordxid, &affixLens);
istoast, sameBlock, recordxid, &affixLens, hasCSN);
/* may should free space */
if (!inplaceUpdate && freespace < BLCKSZ / FREESPACE_FRACTION) {
RelFileNode rnode;
@ -1400,7 +1410,7 @@ static void UHeapXlogUpdateBlock(XLogBlockHead *blockhead, XLogBlockDataParse *b
}
UHeapXlogUpdateOperatorOldpage(&buffers, (void *)maindata, inplaceUpdate,
blockInplaceUpdate, &oldtup, sameBlock, newblk, recordxid);
blockInplaceUpdate, &oldtup, sameBlock, newblk, recordxid, hasCSN);
}
MakeRedoBufferDirty(bufferinfo);
@ -1415,6 +1425,7 @@ static void UHeapXlogMultiInsertBlock(XLogBlockHead *blockhead, XLogBlockDataPar
TransactionId recordxid = XLogBlockHeadGetXid(blockhead);
XLogBlockDataParse *datadecode = blockdatarec;
XLogRedoAction action;
bool hasCSN = blockhead->hasCSN;
action = XLogCheckBlockDataRedoAction(datadecode, bufferinfo);
if (action == BLK_NEEDS_REDO) {
@ -1425,7 +1436,7 @@ static void UHeapXlogMultiInsertBlock(XLogBlockHead *blockhead, XLogBlockDataPar
blkdata = XLogBlockDataGetBlockData(datadecode, &blkdatalen);
Assert(blkdata != NULL);
UHeapXlogMultiInsertOperatorPage(bufferinfo, maindata, isinit, istoast, (void *)blkdata, blkdatalen, recordxid,
NULL);
NULL, hasCSN);
MakeRedoBufferDirty(bufferinfo);
}
}

View File

@ -102,9 +102,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForInsertRedo(XLogReaderState *recor
UndoRecPtr invalidUrp = INVALID_UNDO_REC_PTR;
Oid invalidPartitionOid = 0;
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
blkprev = (UndoRecPtr *) ((char *)currLogPtr);
@ -184,10 +184,10 @@ static XLogRedoAction GetInsertRedoAction(XLogReaderState *record, RedoBufferInf
XLogRedoAction action;
if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) {
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
XlUndoHeader *xlundohdr =
(XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + (hasCSN ? sizeof(CommitSeqNo) : 0));
(XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN));
TransactionId *xidBase = (TransactionId *)((char *)xlundohdr + SizeOfXLUndoHeader + skipSize);
uint16 *tdCount = (uint16 *)((char *)xidBase + sizeof(TransactionId));
@ -307,9 +307,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForDeleteRedo(XLogReaderState *recor
bool defaultHasSubXact = false;
uint32 readSize = 0;
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) {
hasSubXact = (bool *) ((char *)currLogPtr);
@ -361,9 +361,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForDeleteRedo(XLogReaderState *recor
* explicitly stored tuple.
*/
Size datalen = recordlen - SizeOfXLUndoHeader - SizeOfUHeapDelete - undoMetaSize - SizeOfUHeapHeader -
readSize - (hasCSN ? sizeof(CommitSeqNo) : 0);
readSize - SizeOfXLOGCSN(hasCSN);
char *data = (char *)xlrec + SizeOfUHeapDelete + SizeOfXLUndoHeader + undoMetaSize + readSize +
(hasCSN ? sizeof(CommitSeqNo) : 0);
SizeOfXLOGCSN(hasCSN);
utup->disk_tuple = GetUHeapDiskTupleFromRedoData(data, &datalen, tbuf);
utup->disk_tuple_size = datalen;
@ -817,9 +817,9 @@ static UndoRecPtr PrepareAndInsertUndoRecordForUpdateRedo(XLogReaderState *recor
bool inplaceUpdate = true;
char *xlogXorDelta = NULL;
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN));
UndoRecPtr urecptr = xlundohdr->urecptr;
char *curxlogptr = ((char *)xlundohdr) + SizeOfXLUndoHeader;
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) {
@ -914,7 +914,7 @@ static UndoRecPtr PrepareAndInsertUndoRecordForUpdateRedo(XLogReaderState *recor
char *data = (char *)curxlogptr;
Size datalen = recordlen - SizeOfUHeapHeader - SizeOfXLUndoHeader - SizeOfUHeapUpdate -
undoMetaSize - SizeOfXLUndoHeader - initPageXtraInfo - readSize - (hasCSN ? sizeof(CommitSeqNo) : 0);
undoMetaSize - SizeOfXLUndoHeader - initPageXtraInfo - readSize - SizeOfXLOGCSN(hasCSN);
oldtup->disk_tuple = GetUHeapDiskTupleFromRedoData(data, &datalen, tbuf);
oldtup->disk_tuple_size = datalen;
@ -1421,8 +1421,8 @@ static UndoRecPtr PrepareAndInsertUndoRecordForMultiInsertRedo(XLogReaderState *
curxlogptr += sizeof(uint16);
}
bool hasCSN = (record->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
curxlogptr = curxlogptr + (hasCSN ? sizeof(CommitSeqNo) : 0);
bool hasCSN = XLogRecHasCSN(record);
curxlogptr = curxlogptr + SizeOfXLOGCSN(hasCSN);
(*xlrec) = (XlUHeapMultiInsert *)curxlogptr;
curxlogptr = (char *)*xlrec + SizeOfUHeapMultiInsert;
@ -2104,7 +2104,7 @@ static TransactionId UHeapXlogGetCurrentXidInsert(XLogReaderState *record, bool
{
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert +
(hasCSN ? sizeof(CommitSeqNo) : 0));
SizeOfXLOGCSN(hasCSN));
bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0);
if (!hasCurrentXid) {
@ -2130,7 +2130,7 @@ static TransactionId UHeapXlogGetCurrentXidDelete(XLogReaderState *record, bool
{
XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete +
(hasCSN ? sizeof(CommitSeqNo) : 0));
SizeOfXLOGCSN(hasCSN));
bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0);
if (!hasCurrentXid) {
return XLogRecGetXid(record);
@ -2158,7 +2158,7 @@ static TransactionId UHeapXlogGetCurrentXidUpdate(XLogReaderState *record, bool
{
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate +
(hasCSN ? sizeof(CommitSeqNo) : 0));
SizeOfXLOGCSN(hasCSN));
bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0);
if (!hasCurrentXid) {

View File

@ -1368,7 +1368,7 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* adapt 64 xid, if this tuple is the first tuple of a new page */
bool is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0;
/* caution, remaining data in record is not aligned */
char *data_old = (char *)xlrec + heapUpdateSize + (hasCSN ? sizeof(CommitSeqNo) : 0);
char *data_old = (char *)xlrec + heapUpdateSize + SizeOfXLOGCSN(hasCSN);
if (is_init) {
datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(TransactionId);
} else {
@ -1432,7 +1432,7 @@ static void AreaDecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* adapt 64 xid, if this tuple is the first tuple of a new page */
bool is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0;
/* caution, remaining data in record is not aligned */
char *data_old = (char *)xlrec + heapUpdateSize + (hasCSN ? sizeof(CommitSeqNo) : 0);
char *data_old = (char *)xlrec + heapUpdateSize + SizeOfXLOGCSN(hasCSN);
if (is_init) {
datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(TransactionId);
} else {
@ -1566,8 +1566,8 @@ static void DecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Size datalenNew = 0;
char *dataNew = XLogRecGetBlockData(r, 0, &datalenNew);
Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - (hasCSN ? sizeof(CommitSeqNo) : 0);
char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0);
Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - SizeOfXLOGCSN(hasCSN);
char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN);
uint32 toastLen = 0;
bool hasToast = false;
char *toastPtr = UpdateOldTupleCalc(isInplaceUpdate, r, &dataOld, &tuplelenOld, &toastLen);
@ -1634,8 +1634,8 @@ static void AreaDecodeUUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
curCSN = *(CommitSeqNo *)((char *)xlrec + SizeOfUHeapUpdate);
}
bool isInplaceUpdate = (xlrec->flags & XLZ_NON_INPLACE_UPDATE) == 0;
Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - (hasCSN ? sizeof(CommitSeqNo) : 0);
char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + (hasCSN ? sizeof(CommitSeqNo) : 0);
Size tuplelenOld = XLogRecGetDataLen(r) - SizeOfUHeapUpdate - SizeOfXLOGCSN(hasCSN);
char *dataOld = (char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN);
uint32 toastLen = 0;
char *toastPtr = UpdateOldTupleCalc(isInplaceUpdate, r, &dataOld, &tuplelenOld, &toastLen);
bool hasToast = false;
@ -1737,8 +1737,8 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Assert(XLogRecGetDataLen(r) > (heapDeleteSize + SizeOfHeapHeader));
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple((char *)xlrec + heapDeleteSize + (hasCSN ? sizeof(CommitSeqNo) : 0),
datalen - (hasCSN ? sizeof(CommitSeqNo) : 0), change->data.tp.oldtuple, true);
DecodeXLogTuple((char *)xlrec + heapDeleteSize + SizeOfXLOGCSN(hasCSN),
datalen - SizeOfXLOGCSN(hasCSN), change->data.tp.oldtuple, true);
}
change->data.tp.snapshotcsn = curCSN;
change->data.tp.clear_toast_afterwards = true;
@ -1785,8 +1785,8 @@ static void AreaDecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Assert(XLogRecGetDataLen(r) > (heapDeleteSize + SizeOfHeapHeader));
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple((char *)xlrec + heapDeleteSize + (hasCSN ? sizeof(CommitSeqNo) : 0),
datalen - (hasCSN ? sizeof(CommitSeqNo) : 0), change->data.tp.oldtuple, true);
DecodeXLogTuple((char *)xlrec + heapDeleteSize + SizeOfXLOGCSN(hasCSN),
datalen - SizeOfXLOGCSN(hasCSN), change->data.tp.oldtuple, true);
}
change->data.tp.snapshotcsn = curCSN;
change->data.tp.clear_toast_afterwards = true;
@ -1819,9 +1819,9 @@ static void DecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) {
return;
}
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN));
bool hasToast = (xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0;
Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - (hasCSN ? sizeof(CommitSeqNo) : 0);
Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - SizeOfXLOGCSN(hasCSN);
Size addLen = 0;
uint32 toastLen = 0;
UpdateUndoBody(&addLen, (char *)xlundohdr + SizeOfXLUndoHeader, xlundohdr->flag, &toastLen);
@ -1829,13 +1829,13 @@ static void DecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (toastLen > 0) {
toastData = (char *)palloc0(toastLen);
errno_t rc = memcpy_s(toastData, toastLen,
(char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen,
(char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen,
toastLen);
securec_check(rc, "\0", "\0");
}
addLen += toastLen;
Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) +
Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) +
SizeOfXLUndoHeader + addLen);
addLen += metaLen;
if (toastLen == 0 && (datalen == 0 || !AllocSizeIsValid(datalen))) {
@ -1850,7 +1850,7 @@ static void DecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
securec_check(rc, "\0", "\0");
char *dataold =
(char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen;
(char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen;
if (!hasToast) {
change->data.utp.oldtuple = ReorderBufferGetUTupleBuf(ctx->reorder, datalen -addLen);
DecodeXLogTuple(dataold, datalen - addLen, (ReorderBufferTupleBuf *)change->data.utp.oldtuple, false);
@ -1884,9 +1884,9 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) {
return;
}
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0));
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN));
bool hasToast = (xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0;
Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - (hasCSN ? sizeof(CommitSeqNo) : 0);
Size datalen = XLogRecGetDataLen(r) - SizeOfUHeapDelete - SizeOfXLUndoHeader - SizeOfXLOGCSN(hasCSN);
Size addLen = 0;
uint32 toastLen = 0;
UpdateUndoBody(&addLen, (char *)xlundohdr + SizeOfXLUndoHeader, xlundohdr->flag, &toastLen);
@ -1894,13 +1894,13 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
if (toastLen > 0) {
toastData = (char *)palloc0(toastLen);
errno_t rc = memcpy_s(toastData, toastLen,
(char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen,
(char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen,
toastLen);
securec_check(rc, "\0", "\0");
}
addLen += toastLen;
Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) +
Size metaLen = DecodeUndoMeta((char*)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) +
SizeOfXLUndoHeader + addLen);
addLen += metaLen;
@ -1911,7 +1911,7 @@ static void AreaDecodeUDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
securec_check(rc, "\0", "\0");
char *dataold =
(char *)xlrec + SizeOfUHeapDelete + (hasCSN ? sizeof(CommitSeqNo) : 0) + SizeOfXLUndoHeader + addLen;
(char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN) + SizeOfXLUndoHeader + addLen;
change->data.utp.oldtuple = ReorderBufferGetUTupleBuf(ctx->reorder, datalen);
if (!hasToast) {

View File

@ -44,7 +44,7 @@ void UHeapRedoDataBlock(XLogBlockHead *blockhead, XLogBlockDataParse *blockdatar
void UHeap2RedoDataBlock(XLogBlockHead *blockhead, XLogBlockDataParse *blockdatarec, RedoBufferInfo *bufferinfo);
void UHeapXlogInsertOperatorPage(RedoBufferInfo *buffer, void *recorddata, bool isinit, bool istoast, void *blkdata,
Size datalen, TransactionId recxid, Size *freespace);
Size datalen, TransactionId recxid, Size *freespace, bool isHasCSN);
void RedoUHeapUndoBlock(XLogBlockHead *blockhead, XLogBlockUndoParse *blockdatarec, RedoBufferInfo *bufferinfo);
void RedoUndoBlock(XLogBlockHead *blockhead, XLogBlockUndoParse *blockdatarec, RedoBufferInfo *bufferinfo);

View File

@ -89,6 +89,7 @@ typedef struct XlUndoHeader {
} XlUndoHeader;
#define SizeOfXLUndoHeader (offsetof(XlUndoHeader, flag) + sizeof(uint8)) // 13 Bytes
#define SizeOfXLOGCSN(flag) (flag ? sizeof(CommitSeqNo) : 0)
struct XlUndoHeaderExtra {
UndoRecPtr blkprev;

View File

@ -511,6 +511,7 @@ typedef struct {
uint2 opt;
bool is_conflict_type; /* whether wal log type is conflict with standby read if redo */
XLogPhyBlock pblk;
bool hasCSN;
} XLogBlockHead;
#define XLogBlockHeadEncodeSize (sizeof(XLogBlockHead))

View File

@ -85,6 +85,7 @@ extern bool DecodeXLogRecord(XLogReaderState* state, XLogRecord* record, char**
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
#define XLogRecHasBlockRef(decoder, block_id) ((decoder)->blocks[block_id].in_use)
#define XLogRecHasBlockImage(decoder, block_id) ((decoder)->blocks[block_id].has_image)
#define XLogRecHasCSN(decoder) ((decoder)->decoded_record->xl_term & XLOG_CONTAIN_CSN) == XLOG_CONTAIN_CSN;
extern void RestoreBlockImage(const char* bkp_image, uint16 hole_offset, uint16 hole_length, char* page);
extern char* XLogRecGetBlockData(XLogReaderState* record, uint8 block_id, Size* len);