Tuple lock fit logical deocde

This commit is contained in:
TotaJ
2021-12-20 10:06:57 +08:00
parent 974d7e8726
commit 3dfe9dee3f
3 changed files with 592 additions and 580 deletions

View File

@ -873,15 +873,21 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
Size datalen_old;
Size tuplelen_old;
char *data_old = NULL;
Size heapUpdateSize;
if ((XLogRecGetInfo(r) & XLOG_TUPLE_LOCK_UPGRADE_FLAG) == 0) {
heapUpdateSize = SizeOfOldHeapUpdate;
} else {
heapUpdateSize = SizeOfHeapUpdate;
}
/* adapt 64 xid, if this tuple is the first tuple of a new page */
bool is_init = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0;
/* caution, remaining data in record is not aligned */
data_old = (char *)xlrec + SizeOfHeapUpdate;
data_old = (char *)xlrec + heapUpdateSize;
if (is_init) {
datalen_old = XLogRecGetDataLen(r) - SizeOfHeapUpdate - sizeof(TransactionId);
datalen_old = XLogRecGetDataLen(r) - heapUpdateSize - sizeof(TransactionId);
} else {
datalen_old = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
datalen_old = XLogRecGetDataLen(r) - heapUpdateSize;
}
tuplelen_old = datalen_old - SizeOfHeapHeader;
if (tuplelen_old == 0 && !AllocSizeIsValid(tuplelen_old)) {
@ -1054,8 +1060,14 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferChange *change = NULL;
RelFileNode target_node;
int rc = 0;
xlrec = (xl_heap_delete *)GetXlrec(r);
Size heapDeleteSize;
if ((XLogRecGetInfo(r) & XLOG_TUPLE_LOCK_UPGRADE_FLAG) == 0) {
heapDeleteSize = SizeOfOldHeapDelete;
} else {
heapDeleteSize = SizeOfHeapDelete;
}
xlrec = (xl_heap_delete *)GetXlrec(r);
/* only interested in our database */
XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
if (target_node.dbNode != ctx->slot->data.database)
@ -1064,7 +1076,7 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
Size datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
Size datalen = XLogRecGetDataLen(r) - heapDeleteSize;
if (datalen == 0 && !AllocSizeIsValid(datalen)) {
ereport(WARNING, (errmsg("tuplelen is invalid(%lu), tuplelen, don't decode it", datalen)));
return;
@ -1077,10 +1089,10 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/* old primary key stored */
if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) {
Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
Assert(XLogRecGetDataLen(r) > (heapDeleteSize + SizeOfHeapHeader));
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder, datalen);
DecodeXLogTuple((char *)xlrec + SizeOfHeapDelete, datalen, change->data.tp.oldtuple);
DecodeXLogTuple((char *)xlrec + heapDeleteSize, datalen, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;