parallel decoding change toastMask to the highest bit

Offering: openGaussDev

More detail: parallel decoding fix initialization bug

Match-id-8e0cddece001059dedf185e00506ed2f72f370e8
This commit is contained in:
openGaussDev
2022-03-09 12:15:07 +08:00
committed by yanghao
parent 6e1770b403
commit 1f0543f212
4 changed files with 14 additions and 19 deletions

View File

@ -957,8 +957,7 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogFPWInfo fpw_
*(scratch++) = XLR_BLOCK_ID_ORIGIN;
remained_size--;
if (istoast && t_thrd.proc->workingVersionNum >= PARALLEL_DECODE_VERSION_NUM && XLogLogicalInfoActive()) {
const int toastFlag = 1 << 8;
m_session_id |= toastFlag;
m_session_id = (int)((uint32)(m_session_id) | TOAST_FLAG);
}
XLOG_ASSEMBLE_ONE_ITEM(scratch, sizeof(m_session_id), &m_session_id, remained_size);
}

View File

@ -890,7 +890,6 @@ static void AreaDecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
static void DecodeUheapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
uint8 info = XLogRecGetInfo(buf->record) & XLOG_UHEAP_OPMASK;
@ -974,12 +973,9 @@ static void AreaDecodeUheapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
}
}
bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
const int toastMask = (1 << 8) - 1;
origin_id &= toastMask;
origin_id = (int)((uint32)(origin_id) & TOAST_MASK);
if (ctx->callbacks.filter_by_origin_cb == NULL)
return false;
@ -1043,8 +1039,6 @@ static void AreaDecodingChange(ReorderBufferChange *change, LogicalDecodingConte
RelationClose(relation);
}
/*
* Consolidated commit record handling between the different form of commit
* records.

View File

@ -68,7 +68,6 @@ void ParseHeap3Op(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa
void ParseUheapOp(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, ParallelDecodeReaderWorker *worker);
extern Pointer GetXlrec(XLogReaderState *record);
static const int toastMask = 1 << 8;
/*
* After initially parsing the log, put tuple change into the queue
* and wait for the decoder thread to parse it.
@ -84,7 +83,7 @@ static const int toastMask = 1 << 8;
static bool ParallelFilterByOrigin(ParallelLogicalDecodingContext *ctx, RepOriginId origin_id)
{
ParallelDecodingData* data = (ParallelDecodingData *)ctx->output_plugin_private;
origin_id &= toastMask - 1;
origin_id = (int)((uint32)(origin_id) & TOAST_MASK);
if (data->pOptions.only_local && origin_id != InvalidRepOriginId) {
return true;
}
@ -554,10 +553,10 @@ void ParseInsertXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
/*
* Reuse the origin field in the log. If bit 8 is non-zero,
* Reuse the origin field in the log. If the highest bit is non-zero,
* the current table is considered to be toast table.
*/
bool istoast = (((XLogRecGetOrigin(r)) & toastMask) != 0);
bool istoast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0);
if (istoast) {
ereport(DEBUG2, (errmsg("ParallelDecodeInsert %d", istoast)));
}
@ -632,7 +631,7 @@ void ParseUInsert(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa
change->data.tp.snapshotcsn = curCSN;
change->data.tp.clear_toast_afterwards = true;
bool isToast = (((XLogRecGetOrigin(r)) & toastMask) != 0);
bool isToast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0);
SplicingToastTuple(change, ctx, &needDecode, isToast, worker, false);
if (needDecode) {
PutChangeQueue(slotId, change);
@ -703,10 +702,10 @@ void ParseUpdateXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf,
return;
/*
* Reuse the origin field in the log. If bit 8 is non-zero,
* Reuse the origin field in the log. If the highest bit is non-zero,
* the current table is considered to be toast table.
*/
bool istoast = (((XLogRecGetOrigin(r)) & toastMask) != 0);
bool istoast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0);
change = ParallelReorderBufferGetChange(ctx->reorder, slotId);
change->action = PARALLEL_REORDER_BUFFER_CHANGE_UPDATE;
@ -769,7 +768,7 @@ void ParseUUpdate(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa
return;
}
bool isToast = (((XLogRecGetOrigin(r)) & toastMask) != 0);
bool isToast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0);
ParallelReorderBufferChange *change = ParallelReorderBufferGetChange(ctx->reorder, slotId);
change->action = PARALLEL_REORDER_BUFFER_CHANGE_UUPDATE;
change->lsn = ctx->reader->ReadRecPtr;
@ -819,7 +818,7 @@ void ParseDeleteXlog(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf,
int rc = 0;
Size datalen = 0;
bool needDecode = false;
bool istoast = (((XLogRecGetOrigin(r)) & toastMask) != 0);
bool istoast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0);
if (istoast) {
ereport(DEBUG2, (errmsg("ParallelDecodeDelete %d", istoast)));
}
@ -880,7 +879,7 @@ void ParseUDelete(ParallelLogicalDecodingContext *ctx, XLogRecordBuffer *buf, Pa
int slotId = worker->slotId;
RelFileNode targetNode = {0, 0, 0, 0};
bool needDecode = false;
bool isToast = (((XLogRecGetOrigin(r)) & toastMask) != 0);
bool isToast = (((uint32)(XLogRecGetOrigin(r)) & TOAST_FLAG) != 0);
xlrec = (XlUHeapDelete *)UGetXlrec(r);
CommitSeqNo curCSN = *(CommitSeqNo *)((char *)xlrec + SizeOfUHeapDelete);

View File

@ -45,6 +45,9 @@
/* Maximum number of max replication slots */
#define MAX_REPLICATION_SLOT_NUM 100
/* FLag and mask for TOAST in parallel decoding */
#define TOAST_FLAG ((uint32)1 << 31)
#define TOAST_MASK (((uint32)1 << 31) - 1)
typedef void (*LogicalOutputPluginWriterWrite)(
struct LogicalDecodingContext* lr, XLogRecPtr Ptr, TransactionId xid, bool last_write);