From 0f41674e9dfbbd7e731e9321932ca981b025b8ac Mon Sep 17 00:00:00 2001 From: TotaJ Date: Mon, 20 Dec 2021 10:06:57 +0800 Subject: [PATCH] Tuple lock fit logical deocde --- .../storage/replication/logical/decode.cpp | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index ce9ed0789..095e654fd 100644 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -873,15 +873,21 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Size datalen_old; Size tuplelen_old; char *data_old = NULL; + Size heapUpdateSize; + if ((XLogRecGetInfo(r) & XLOG_TUPLE_LOCK_UPGRADE_FLAG) == 0) { + heapUpdateSize = SizeOfOldHeapUpdate; + } else { + heapUpdateSize = SizeOfHeapUpdate; + } /* adapt 64 xid, if this tuple is the first tuple of a new page */ bool is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0; /* caution, remaining data in record is not aligned */ - data_old = (char *)xlrec + SizeOfHeapUpdate; + data_old = (char *)xlrec + heapUpdateSize; if (is_init) { - datalen_old = XLogRecGetDataLen(r) - SizeOfHeapUpdate - sizeof(TransactionId); + datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(TransactionId); } else { - datalen_old = XLogRecGetDataLen(r) - SizeOfHeapUpdate; + datalen_old = XLogRecGetDataLen(r) - heapUpdateSize; } tuplelen_old = datalen_old - SizeOfHeapHeader; if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) { @@ -1054,8 +1060,14 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferChange *change = NULL; RelFileNode target_node; int rc = 0; - xlrec = (xl_heap_delete *)GetXlrec(r); + Size heapDeleteSize; + if ((XLogRecGetInfo(r) & XLOG_TUPLE_LOCK_UPGRADE_FLAG) == 0) { + heapDeleteSize = SizeOfOldHeapDelete; + } else { + heapDeleteSize = SizeOfHeapDelete; + } + xlrec = (xl_heap_delete *)GetXlrec(r); /* only interested in our database */ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); if (target_node.dbNode != ctx->slot->data.database) @@ -1064,7 +1076,7 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; - Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete; + Size datalen = XLogRecGetDataLen(r) - heapDeleteSize; if (datalen == 0 && !AllocSizeIsValid(datalen)) { ereport(WARNING, (errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", datalen))); return; @@ -1077,10 +1089,10 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* old primary key stored */ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { - Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); + Assert(XLogRecGetDataLen(r) > (heapDeleteSize + SizeOfHeapHeader)); change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder, datalen); - DecodeXLogTuple((char *)xlrec + SizeOfHeapDelete, datalen, change->data.tp.oldtuple); + DecodeXLogTuple((char *)xlrec + heapDeleteSize, datalen, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true;