fix cluster core
This commit is contained in:
@ -3813,8 +3813,14 @@ static void writetup_cluster(Tuplesortstate* state, int tapenum, SortTuple* stup
|
||||
static void readtup_cluster(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int tuplen)
|
||||
{
|
||||
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int) - sizeof(TransactionId) * 2;
|
||||
HeapTuple tuple = (HeapTuple) readtup_alloc(state, t_len + HEAPTUPLESIZE);
|
||||
tuple->tupTableType = HEAP_TUPLE;
|
||||
HeapTuple tuple;
|
||||
if (state->tupDesc->td_tam_ops == TableAmHeap) {
|
||||
tuple = (HeapTuple) readtup_alloc(state, t_len + HEAPTUPLESIZE);
|
||||
tuple->tupTableType = HEAP_TUPLE;
|
||||
} else {
|
||||
tuple = (HeapTuple) readtup_alloc(state, t_len + UHeapTupleDataSize);
|
||||
tuple->tupTableType = UHEAP_TUPLE;
|
||||
}
|
||||
|
||||
/* Reconstruct the HeapTupleData header */
|
||||
tuple->t_data = (HeapTupleHeader)((char*)tuple + HEAPTUPLESIZE);
|
||||
|
||||
@ -774,7 +774,7 @@ static void AssignTransactionId(TransactionState s)
|
||||
log_unknown_top = true;
|
||||
|
||||
/* allocate undo zone before generate a new xid. */
|
||||
if (!isSubXact && IsUnderPostmaster && !ENABLE_DSS) {
|
||||
if (g_instance.attr.attr_storage.enable_ustore && !isSubXact && IsUnderPostmaster && !ENABLE_DSS) {
|
||||
undo::AllocateUndoZone();
|
||||
pg_memory_barrier();
|
||||
}
|
||||
|
||||
@ -70,7 +70,7 @@ typedef struct registered_buffer {
|
||||
SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
|
||||
|
||||
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogFPWInfo fpw_info, XLogRecPtr *fpw_lsn,
|
||||
int bucket_id = -1, bool istoast = false);
|
||||
int bucket_id = -1, bool istoast = false, TransactionId xid = InvalidTransactionId);
|
||||
static void XLogResetLogicalPage(void);
|
||||
|
||||
/*
|
||||
@ -504,7 +504,7 @@ void XlogInsertSleep(void)
|
||||
* though not on the data they reference. This is OK since the XLogRecData
|
||||
* structs are always just temporaries in the calling code.
|
||||
*/
|
||||
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, int bucket_id, bool istoast)
|
||||
XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, int bucket_id, bool istoast, TransactionId xid)
|
||||
{
|
||||
XLogRecPtr EndPos;
|
||||
bool isSwitchoverBarrier = ((rmid == RM_BARRIER_ID) && (info == XLOG_BARRIER_SWITCHOVER));
|
||||
@ -550,7 +550,7 @@ XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, int bucket_id, bool istoast)
|
||||
*/
|
||||
GetFullPageWriteInfo(&fpw_info);
|
||||
|
||||
rdt = XLogRecordAssemble(rmid, info, fpw_info, &fpw_lsn, bucket_id, istoast);
|
||||
rdt = XLogRecordAssemble(rmid, info, fpw_info, &fpw_lsn, bucket_id, istoast, xid);
|
||||
|
||||
EndPos = XLogInsertRecord(rdt, fpw_lsn);
|
||||
} while (XLByteEQ(EndPos, InvalidXLogRecPtr));
|
||||
@ -685,7 +685,7 @@ static bool XLogNeedVMPhysicalLocation(RmgrId rmi, uint8 info, int blockId)
|
||||
*
|
||||
*/
|
||||
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogFPWInfo fpw_info, XLogRecPtr *fpw_lsn,
|
||||
int bucket_id, bool istoast)
|
||||
int bucket_id, bool istoast, TransactionId xid)
|
||||
{
|
||||
XLogRecData *rdt = NULL;
|
||||
uint32 total_len = 0;
|
||||
@ -1022,9 +1022,13 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogFPWInfo fpw_
|
||||
* once we know where in the WAL the record will be inserted. The CRC does
|
||||
* not include the record header yet.
|
||||
*/
|
||||
bool isUHeap = (rmid >= RM_UHEAP_ID) && (rmid <= RM_UHEAPUNDO_ID);
|
||||
if ((rmid >= RM_UHEAP_ID) && (rmid <= RM_UNDOACTION_ID)) {
|
||||
xid = TransactionIdIsValid(GetTopTransactionIdIfAny()) ? GetTopTransactionIdIfAny() : xid;
|
||||
} else {
|
||||
xid = GetCurrentTransactionIdIfAny();
|
||||
}
|
||||
|
||||
rechdr->xl_xid = (isUHeap) ? GetTopTransactionIdIfAny() : GetCurrentTransactionIdIfAny();
|
||||
rechdr->xl_xid = xid;
|
||||
rechdr->xl_tot_len = total_len;
|
||||
rechdr->xl_info = info;
|
||||
rechdr->xl_rmid = rmid;
|
||||
|
||||
@ -1930,7 +1930,6 @@ static void RedoUndoMultiInsertBlock(XLogBlockHead *blockhead, XLogBlockUndoPars
|
||||
* We can skip inserting undo records if the tuples are to be marked as
|
||||
* frozen.
|
||||
*/
|
||||
elog(LOG, "Undo record prepared: %d for Block Number: %d", nranges, blockdatarec->multiInsertUndoParse.blkno);
|
||||
if (!skipUndo && !skipInsert) {
|
||||
for (int i = 0; i < nranges; i++) {
|
||||
MemoryContext old_cxt = MemoryContextSwitchTo((*urecvec)[i]->mem_context());
|
||||
|
||||
@ -1464,7 +1464,6 @@ static UndoRecPtr PrepareAndInsertUndoRecordForMultiInsertRedo(XLogReaderState *
|
||||
* We can skip inserting undo records if the tuples are to be marked as
|
||||
* frozen.
|
||||
*/
|
||||
elog(LOG, "Undo record prepared: %d for Block Number: %d", nranges, blkno);
|
||||
if (!skipUndo && !skipInsert) {
|
||||
for (int i = 0; i < nranges; i++) {
|
||||
MemoryContext old_cxt = MemoryContextSwitchTo((*urecvec)[i]->mem_context());
|
||||
|
||||
@ -733,7 +733,7 @@ void InsertPreparedUndo(_in_ URecVector *urecvec, _in_ XLogRecPtr lsn)
|
||||
diffpage = true;
|
||||
}
|
||||
if (!t_thrd.xlog_cxt.InRecovery || PageGetLSN(page) < lsn) {
|
||||
if (startingByte != phdr->pd_upper) {
|
||||
if (startingByte > phdr->pd_upper) {
|
||||
ereport(LOG, (errmsg("undo record discontinuous,zid %u, buffer %d, startingByte %u, "
|
||||
"page start %u, page end %u, alreadyWritten %d, lastPageWritten %d, diffpage %s, urp %lu, "
|
||||
"newpage %s.", phdr->pd_prune_xid, buffer, startingByte, phdr->pd_lower, phdr->pd_upper,
|
||||
|
||||
@ -709,7 +709,7 @@ void UpdateRollbackFinish(UndoSlotPtr slotPtr)
|
||||
xlrec.slotPtr = slotPtr;
|
||||
XLogBeginInsert();
|
||||
XLogRegisterData((char *)&xlrec, sizeof(xlrec));
|
||||
lsn = XLogInsert(RM_UNDOACTION_ID, XLOG_ROLLBACK_FINISH);
|
||||
lsn = XLogInsert(RM_UNDOACTION_ID, XLOG_ROLLBACK_FINISH, InvalidBktId, false, slot->XactId());
|
||||
PageSetLSN(page, lsn);
|
||||
}
|
||||
MarkBufferDirty(buf.Buf());
|
||||
|
||||
@ -56,7 +56,7 @@ typedef enum XLOG_FPI_FOR_HINT_TYPE {
|
||||
|
||||
/* prototypes for public functions in xloginsert.c: */
|
||||
extern void XLogBeginInsert(void);
|
||||
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, int bucket_id = InvalidBktId, bool istoast = false);
|
||||
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, int bucket_id = InvalidBktId, bool istoast = false, TransactionId xid = 0);
|
||||
extern void XLogEnsureRecordSpace(int nbuffers, int ndatas);
|
||||
extern void XLogRegisterData(char* data, int len);
|
||||
extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags, TdeInfo* tdeinfo = NULL);
|
||||
|
||||
Reference in New Issue
Block a user