diff --git a/src/gausskernel/storage/access/transam/xloginsert.cpp b/src/gausskernel/storage/access/transam/xloginsert.cpp index df1f55f9d..022e9555a 100755 --- a/src/gausskernel/storage/access/transam/xloginsert.cpp +++ b/src/gausskernel/storage/access/transam/xloginsert.cpp @@ -957,8 +957,7 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogFPWInfo fpw_ *(scratch++) = XLR_BLOCK_ID_ORIGIN; remained_size--; if (istoast && t_thrd.proc->workingVersionNum >= PARALLEL_DECODE_VERSION_NUM && XLogLogicalInfoActive()) { - const int toastFlag = 1 << 8; - m_session_id |= toastFlag; + m_session_id = (int)((uint32)(m_session_id) | TOAST_FLAG); } XLOG_ASSEMBLE_ONE_ITEM(scratch, sizeof(m_session_id), &m_session_id, remained_size); } diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index 189e8fe39..9bca465a5 100644 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -890,7 +890,6 @@ static void AreaDecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } - static void DecodeUheapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { uint8 info = XLogRecGetInfo(buf->record) & XLOG_UHEAP_OPMASK; @@ -974,12 +973,9 @@ static void AreaDecodeUheapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf } } - - bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { - const int toastMask = (1 << 8) - 1; - origin_id &= toastMask; + origin_id = (int)((uint32)(origin_id) & TOAST_MASK); if (ctx->callbacks.filter_by_origin_cb == NULL) return false; @@ -1043,8 +1039,6 @@ static void AreaDecodingChange(ReorderBufferChange *change, LogicalDecodingConte RelationClose(relation); } - - /* * Consolidated commit record handling between the different form of commit * records. diff --git a/src/gausskernel/storage/replication/logical/logical_parse.cpp b/src/gausskernel/storage/replication/logical/logical_parse.cpp index 502ed5217..778e37ee7 100644 --- a/src/gausskernel/storage/replication/logical/logical_parse.cpp +++ b/src/gausskernel/storage/replication/logical/logical_parse.cpp @@ -68,7 +68,6 @@ void ParseHeap3Op(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa void ParseUheapOp(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, ParallelDecodeReaderWorker *worker); extern Pointer GetXlrec(XLogReaderState *record); -static const int toastMask = 1 << 8; /* * After initially parsing the log, put tuple change into the queue * and wait for the decoder thread to parse it. @@ -84,7 +83,7 @@ static const int toastMask = 1 << 8; static bool ParallelFilterByOrigin(ParallelLogicalDecodingContext *ctx, RepOriginId origin_id) { ParallelDecodingData* data = (ParallelDecodingData *)ctx->output_plugin_private; - origin_id &= toastMask - 1; + origin_id = (int)((uint32)(origin_id) & TOAST_MASK); if (data->pOptions.only_local && origin_id != InvalidRepOriginId) { return true; } @@ -554,10 +553,10 @@ void ParseInsertXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, } /* - * Reuse the origin field in the log. If bit 8 is non-zero, + * Reuse the origin field in the log. If the highest bit is non-zero, * the current table is considered to be toast table. */ - bool istoast = (((XLogRecGetOrigin(r)) & toastMask) != 0); + bool istoast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0); if (istoast) { ereport(DEBUG2, (errmsg("ParallelDecodeInsert %d", istoast))); } @@ -632,7 +631,7 @@ void ParseUInsert(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa change->data.tp.snapshotcsn = curCSN; change->data.tp.clear_toast_afterwards = true; - bool isToast = (((XLogRecGetOrigin(r)) & toastMask) != 0); + bool isToast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0); SplicingToastTuple(change, ctx, &needDecode, isToast, worker, false); if (needDecode) { PutChangeQueue(slotId, change); @@ -703,10 +702,10 @@ void ParseUpdateXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, return; /* - * Reuse the origin field in the log. If bit 8 is non-zero, + * Reuse the origin field in the log. If the highest bit is non-zero, * the current table is considered to be toast table. */ - bool istoast = (((XLogRecGetOrigin(r)) & toastMask) != 0); + bool istoast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0); change = ParallelReorderBufferGetChange(ctx->reorder, slotId); change->action = PARALLEL_REORDER_BUFFER_CHANGE_UPDATE; @@ -769,7 +768,7 @@ void ParseUUpdate(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa return; } - bool isToast = (((XLogRecGetOrigin(r)) & toastMask) != 0); + bool isToast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0); ParallelReorderBufferChange *change = ParallelReorderBufferGetChange(ctx->reorder, slotId); change->action = PARALLEL_REORDER_BUFFER_CHANGE_UUPDATE; change->lsn = ctx->reader->ReadRecPtr; @@ -819,7 +818,7 @@ void ParseDeleteXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, int rc = 0; Size datalen = 0; bool needDecode = false; - bool istoast = (((XLogRecGetOrigin(r)) & toastMask) != 0); + bool istoast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0); if (istoast) { ereport(DEBUG2, (errmsg("ParallelDecodeDelete %d", istoast))); } @@ -880,7 +879,7 @@ void ParseUDelete(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa int slotId = worker->slotId; RelFileNode targetNode = {0, 0, 0, 0}; bool needDecode = false; - bool isToast = (((XLogRecGetOrigin(r)) & toastMask) != 0); + bool isToast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0); xlrec = (XlUHeapDelete *)UGetXlrec(r); CommitSeqNo curCSN = *(CommitSeqNo *)((char *)xlrec + SizeOfUHeapDelete); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5b0d34757..a7448d56d 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -45,6 +45,9 @@ /* Maximum number of max replication slots */ #define MAX_REPLICATION_SLOT_NUM 100 +/* FLag and mask for TOAST in parallel decoding */ +#define TOAST_FLAG ((uint32)1 << 31) +#define TOAST_MASK (((uint32)1 << 31) - 1) typedef void (*LogicalOutputPluginWriterWrite)( struct LogicalDecodingContext* lr, XLogRecPtr Ptr, TransactionId xid, bool last_write);