merge fixUpsert600 into 6.0.0

【回合6.0.0】修复upsert在冲突场景下逻辑解码的问题

Created-by: chenxiaobin19
Commit-by: chenxiaobin19
Merged-by: opengauss_bot
Description: 【标题】(请简要描述下实现的内容)

【实现内容】:
见:https://gitcode.com/opengauss/openGauss-server/pull/7788
【根因分析】:

【实现方案】:

【关联需求或issue】:
https://gitcode.com/opengauss/openGauss-server/issues/7065
【开发自验报告】:
1. 请附上自验结果(内容或者截图)
2. 是否可以添加fastcheck测试用例,如是,请补充fastcheck用例
3. 是否涉及资料修改,如是,在docs仓库补充资料
4. 是否考虑升级场景(系统表修改、日志持久化以及修改执行态数据格式)
5. 是否考虑在线扩容等扩展场景
6. 是否考虑异常场景/并发场景/前向兼容/性能场景
7. 是否对其他模块产生影响

【其他说明】:

See merge request: opengauss/openGauss-server!7793
This commit is contained in:
opengauss_bot
2025-05-29 19:59:52 +08:00
3 changed files with 38 additions and 1 deletions

View File

@ -1750,7 +1750,11 @@ static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->data.tp.snapshotcsn = curCSN;
change->data.tp.clear_toast_afterwards = true;
ReorderBufferQueueChange(ctx, XLogRecGetXid(r), buf->origptr, change);
if (xlrec->flags & XLH_DELETE_IS_SUPER) {
ReorderBufferRemoveChangeForUpsert(ctx, XLogRecGetXid(r), buf->origptr);
} else {
ReorderBufferQueueChange(ctx, XLogRecGetXid(r), buf->origptr, change);
}
}
/*

View File

@ -645,6 +645,38 @@ void ReorderBufferQueueChange(LogicalDecodingContext *ctx, TransactionId xid, XL
ReorderBufferCheckSerializeTXN(ctx, txn);
}
/*
* Special handling specifically for XLH_DELETE_IS_SUPER
*
* XLH_DELETE_IS_SUPER records are generated during the execution of an UPSERT statement.
* Specifically, after performing an INSERT, a conflict is detected with a concurrently
* committed transaction, leading to a DELETE and a UPDATE.
*
* Such a UPSERT with INSERT, DELETE and UPDATE cannot be logical decoded normally, because
* it would be unexecutable while INSERT might cause a key-conflict error. So when we meet
* such a delete record, we remove the last INSERT change and don't add DELETE change.
*/
void ReorderBufferRemoveChangeForUpsert(LogicalDecodingContext *ctx, TransactionId xid, XLogRecPtr lsn)
{
ReorderBufferTXN *txn = NULL;
txn = ReorderBufferTXNByXid(ctx->reorder, xid, true, NULL, lsn, true);
/* there must be at least one entry */
Assert(txn->nentries > 0);
dlist_node *node = dlist_tail_node(&txn->changes);
ReorderBufferChange* entry = dlist_container(ReorderBufferChange, node, node);
/* last entry must be INSERT */
Assert(entry->action == REORDER_BUFFER_CHANGE_INSERT);
dlist_delete(node);
txn->nentries--;
txn->nentries_mem--;
pfree(entry);
}
/*
* A transactional DDL message is queued to be processed upon commit.
*/

View File

@ -505,6 +505,7 @@ void ReorderBufferReturnChange(ReorderBuffer*, ReorderBufferChange*);
ReorderBufferUTupleBuf *ReorderBufferGetUTupleBuf(ReorderBuffer*, Size tuple_len);
void ReorderBufferQueueChange(LogicalDecodingContext*, TransactionId, XLogRecPtr lsn, ReorderBufferChange*);
void ReorderBufferRemoveChangeForUpsert(LogicalDecodingContext *ctx, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferCommit(ReorderBuffer*, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
RepOriginId origin_id, XLogRecPtr origin_lsn, CommitSeqNo csn, TimestampTz commit_time);
void ReorderBufferAssignChild(ReorderBuffer*, TransactionId, TransactionId, XLogRecPtr commit_lsn);