diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index 87edfdeab..abfd769ed 100644 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -1750,7 +1750,11 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.snapshotcsn = curCSN; change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx, XLogRecGetXid(r), buf->origptr, change); + if (xlrec->flags & XLH_DELETE_IS_SUPER) { + ReorderBufferRemoveChangeForUpsert(ctx, XLogRecGetXid(r), buf->origptr); + } else { + ReorderBufferQueueChange(ctx, XLogRecGetXid(r), buf->origptr, change); + } } /* diff --git a/src/gausskernel/storage/replication/logical/reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/reorderbuffer.cpp index 326a505c8..123bb3b35 100644 --- a/src/gausskernel/storage/replication/logical/reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/reorderbuffer.cpp @@ -645,6 +645,38 @@ void ReorderBufferQueueChange(LogicalDecodingContext *ctx, TransactionId xid, XL ReorderBufferCheckSerializeTXN(ctx, txn); } +/* + * Special handling specifically for XLH_DELETE_IS_SUPER + * + * XLH_DELETE_IS_SUPER records are generated during the execution of an UPSERT statement. + * Specifically, after performing an INSERT, a conflict is detected with a concurrently + * committed transaction, leading to a DELETE and a UPDATE. + * + * Such a UPSERT with INSERT, DELETE and UPDATE cannot be logical decoded normally, because + * it would be unexecutable while INSERT might cause a key-conflict error. So when we meet + * such a delete record, we remove the last INSERT change and don't add DELETE change. + */ +void ReorderBufferRemoveChangeForUpsert(LogicalDecodingContext *ctx, TransactionId xid, XLogRecPtr lsn) +{ + ReorderBufferTXN *txn = NULL; + + txn = ReorderBufferTXNByXid(ctx->reorder, xid, true, NULL, lsn, true); + + /* there must be at least one entry */ + Assert(txn->nentries > 0); + + dlist_node *node = dlist_tail_node(&txn->changes); + ReorderBufferChange* entry = dlist_container(ReorderBufferChange, node, node); + + /* last entry must be INSERT */ + Assert(entry->action == REORDER_BUFFER_CHANGE_INSERT); + + dlist_delete(node); + txn->nentries--; + txn->nentries_mem--; + pfree(entry); +} + /* * A transactional DDL message is queued to be processed upon commit. */ diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index fa802d617..8ebbbeef1 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -505,6 +505,7 @@ void ReorderBufferReturnChange(ReorderBuffer*, ReorderBufferChange*); ReorderBufferUTupleBuf *ReorderBufferGetUTupleBuf(ReorderBuffer*, Size tuple_len); void ReorderBufferQueueChange(LogicalDecodingContext*, TransactionId, XLogRecPtr lsn, ReorderBufferChange*); +void ReorderBufferRemoveChangeForUpsert(LogicalDecodingContext *ctx, TransactionId xid, XLogRecPtr lsn); void ReorderBufferCommit(ReorderBuffer*, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, RepOriginId origin_id, XLogRecPtr origin_lsn, CommitSeqNo csn, TimestampTz commit_time); void ReorderBufferAssignChild(ReorderBuffer*, TransactionId, TransactionId, XLogRecPtr commit_lsn);