修复undo校验panic等问题

This commit is contained in:
徐达标
2024-06-29 08:42:53 +00:00
parent c248f6838e
commit 6b241ba9ce
15 changed files with 108 additions and 74 deletions

View File

@ -810,7 +810,7 @@ static void PutTranslotInfoToTuple(int zoneId, uint32 offset, TransactionSlot *s
rc = snprintf_s(textBuffer, sizeof(textBuffer), sizeof(textBuffer) - 1, UNDO_REC_PTR_FORMAT, offset);
securec_check_ss(rc, "\0", "\0");
values[ARR_4] = CStringGetTextDatum(textBuffer);
if (TransactionIdDidCommit((uint64)slot->XactId())) {
if (UHeapTransactionIdDidCommit((uint64)slot->XactId())) {
values[ARR_5] = COMMITED_STATUS;
} else if (TransactionIdIsInProgress((uint64)slot->XactId())) {
values[ARR_5] = INPROCESS_STATUS;
@ -978,7 +978,7 @@ static void ReadTranslotFromMemory(int startIdx, int endIdx,
UNDO_PTR_GET_OFFSET(slotPtr));
securec_check_ss(rc, "\0", "\0");
values[ARR_4] = CStringGetTextDatum(textBuffer);
if (TransactionIdDidCommit((uint64)slot->XactId())) {
if (UHeapTransactionIdDidCommit((uint64)slot->XactId())) {
values[ARR_5] = COMMITED_STATUS;
} else if (TransactionIdIsInProgress((uint64)slot->XactId())) {
values[ARR_5] = INPROCESS_STATUS;
@ -1222,7 +1222,7 @@ static uint64 UndoSpaceSize(UndoSpaceType type)
} else {
usp = ((UndoZone *)g_instance.undo_cxt.uZones[idx])->GetSlotSpace();
}
used += (uint64)usp->Used();
used += (uint64)usp->Used(idx);
}
return used;
}
@ -1271,7 +1271,7 @@ static void ReadUndoSpaceFromShared(int id, TupleDesc *tupleDesc, Tuplestorestat
rc = snprintf_s(textBuffer, sizeof(textBuffer), sizeof(textBuffer) - 1, UNDO_REC_PTR_FORMAT, used);
securec_check_ss(rc, "\0", "\0");
values[ARR_4] = CStringGetTextDatum(textBuffer);
rc = snprintf_s(textBuffer, sizeof(textBuffer), sizeof(textBuffer) - 1, UNDO_REC_PTR_FORMAT, usp->Used());
rc = snprintf_s(textBuffer, sizeof(textBuffer), sizeof(textBuffer) - 1, UNDO_REC_PTR_FORMAT, usp->Used(idx));
securec_check_ss(rc, "\0", "\0");
values[ARR_5] = CStringGetTextDatum(textBuffer);
rc = snprintf_s(textBuffer, sizeof(textBuffer), sizeof(textBuffer) - 1, UNDO_REC_PTR_FORMAT, usp->LSN());

View File

@ -305,22 +305,8 @@ loop:
bool UHeapXidVisibleInSnapshot(TransactionId xid, Snapshot snapshot,
TransactionIdStatus *hintstatus, Buffer buffer, bool *sync)
{
if (!GTM_LITE_MODE || snapshot->gtm_snapshot_type == GTM_SNAPSHOT_TYPE_LOCAL) {
/*
* Make a quick range check to eliminate most XIDs without looking at the
* CSN log.
*/
if (TransactionIdPrecedes(xid, snapshot->xmin)) {
return true;
}
/*
* Any xid >= xmax is in-progress (or aborted, but we don't distinguish
* that here.
*/
if (GTM_MODE && TransactionIdFollowsOrEquals(xid, snapshot->xmax)) {
return false;
}
if (TransactionIdOlderThanAllUndo(xid)) {
return CommittedXidVisibleInSnapshot(xid, snapshot, buffer);
}
return XidVisibleInSnapshot(xid, snapshot, hintstatus, buffer, sync);

View File

@ -636,8 +636,10 @@ restart:
/* by here, buf is write locked */
Buffer nextBuf = ReadRecycleQueueBuffer(rel, header->nextBlkno);
LockBuffer(nextBuf, BT_WRITE);
/* change endpoint, and insert xlog */
RecycleQueueChangeEndpoint(rel, buf, nextBuf, true);
if ((header->flags & URQ_HEAD_PAGE) != 0) {
/* change endpoint, and insert xlog */
RecycleQueueChangeEndpoint(rel, buf, nextBuf, true);
}
/* release current page, and return the next page */
UnlockReleaseBuffer(buf);
buf = nextBuf;

View File

@ -2003,7 +2003,7 @@ TM_Result UHeapDelete(Relation relation, ItemPointer tid, CommandId cid, Snapsho
{
UHeapTupleData utuple;
Buffer buffer;
UndoRecPtr prevUrecptr;
UndoRecPtr prevUrecptr = INVALID_UNDO_REC_PTR;
int transSlotId;
bool lockReacquired;
TransactionId fxid = GetTopTransactionId();
@ -2418,7 +2418,7 @@ TM_Result UHeapUpdate(Relation relation, Relation parentRelation, ItemPointer ot
UndoRecPtr urecptr;
UndoRecPtr newUrecptr;
UndoRecPtr prevUrecptr = INVALID_UNDO_REC_PTR;
UndoRecPtr newPrevUrecptr;
UndoRecPtr newPrevUrecptr = INVALID_UNDO_REC_PTR;
Page page;
BlockNumber block;
ItemPointerData ctid;
@ -2776,8 +2776,11 @@ check_tup_satisfies_update:
useInplaceUpdate = false;
useLinkUpdate = false;
} else if (!useInplaceUpdate) {
useInplaceUpdate = UHeapPagePruneOpt(relation, buffer, oldOffnum,
newtupsize - oldtupsize);
bool pruned = UHeapPagePruneOpt(relation, buffer, oldOffnum, newtupsize - oldtupsize);
lp = UPageGetRowPtr(page, oldOffnum);
if (pruned && (RowPtrGetOffset(lp) + newtupsize <= BLCKSZ)) {
useInplaceUpdate = true;
}
/* The page might have been modified, so refresh disk_tuple */
oldtup.disk_tuple = (UHeapDiskTuple)UPageGetRowData(page, lp);
}
@ -3894,6 +3897,10 @@ int UHeapPageReserveTransactionSlot(Relation relation, Buffer buf, TransactionId
* Unable to find an unused TD slot or reuse one.
* Try to extend the ITL array now.
*/
if (urecPtr != NULL) {
urecPtr = INVALID_UNDO_REC_PTR;
}
nExtended = UPageExtendTDSlots(relation, buf);
if (nExtended > 0) {
/*
@ -5638,6 +5645,10 @@ void UHeapAbortSpeculative(Relation relation, UHeapTuple utuple)
START_CRIT_SECTION();
/* Apply undo action for an INSERT */
if (urec->Blkno() != blkno) {
ereport(PANIC, (errmodule(MOD_USTORE), errcode(ERRCODE_DATA_CORRUPTED),
errmsg("Blkno %u of undorecord is different from buffer %u.", urec->Blkno(), blkno)));
}
ExecuteUndoForInsert(relation, buffer, urec->Offset(), urec->Xid());
int nline = UHeapPageGetMaxOffsetNumber(page);

View File

@ -405,7 +405,10 @@ int UHeapUndoActions(URecVector *urecvec, int startIdx, int endIdx, TransactionI
for (int i = startIdx; i <= endIdx; i++) {
UndoRecord *undorecord = (*urecvec)[i];
uint8 undotype = undorecord->Utype();
if (undorecord->Blkno() != blkno) {
ereport(PANIC, (errmodule(MOD_USTORE), errcode(ERRCODE_DATA_CORRUPTED),
errmsg("Blkno %u of undorecord is different from buffer %u.", undorecord->Blkno(), blkno)));
}
/*
* If the current UndoRecPtr on the slot is less than the
* UndoRecPtr of the current undorecord, then it means this undorecord

View File

@ -777,7 +777,7 @@ static bool VerifyUTuple(Relation rel, Page page, BlockNumber blkno, OffsetNumbe
}
if (isInvalidSlot) {
if (!TransactionIdDidCommit(tupXid)) {
if (!UHeapTransactionIdDidCommit(tupXid)) {
ereport(logLevel, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("tup xid not commit, tupxid = %lu.", tupXid)));
}

View File

@ -1368,7 +1368,7 @@ bool UHeapIndexBuildNextPage(UHeapScanDesc scan)
int nAborted = 0;
for (int slotNo = 0; slotNo < numSlots; slotNo++) {
TransactionId xid = tdSlots[slotNo].xactid;
if (!TransactionIdIsValid(xid) || TransactionIdIsCurrentTransactionId(xid) || TransactionIdDidCommit(xid)) {
if (!TransactionIdIsValid(xid) || TransactionIdIsCurrentTransactionId(xid) || UHeapTransactionIdDidCommit(xid)) {
continue; /* xid visible in SnapshotNow */
}

View File

@ -2167,10 +2167,9 @@ static bool GetTupleFromUndo(UndoRecPtr urecAdd, UHeapTuple currentTuple, UHeapT
*/
while (1) {
TransactionId lastXid = InvalidTransactionId;
UndoRecPtr urp = INVALID_UNDO_REC_PTR;
state = GetTupleFromUndoRecord(urecAdd, prevUndoXid, buffer, offnum, &hdr, visibleTuple,
&freeTuple, &uinfo, ctid, &lastXid, NULL);
int zoneId = (int)UNDO_PTR_GET_ZONE_ID(urp);
int zoneId = (int)UNDO_PTR_GET_ZONE_ID(urecAdd);
undo::UndoZone *uzone = undo::UndoZoneGroup::GetUndoZone(zoneId, false);
if (state == UNDO_TRAVERSAL_ABORT) {
ereport(ERROR, (errmodule(MOD_UNDO), errmsg(
@ -2189,7 +2188,7 @@ static bool GetTupleFromUndo(UndoRecPtr urecAdd, UHeapTuple currentTuple, UHeapT
GetTopTransactionIdIfAny(), PtrGetVal(*visibleTuple, table_oid), blkno, offnum, lastXid,
pg_atomic_read_u64(&g_instance.undo_cxt.globalRecycleXid),
pg_atomic_read_u64(&g_instance.undo_cxt.globalFrozenXid),
urp, zoneId, PtrGetVal(uzone, GetInsertURecPtr()), PtrGetVal(uzone, GetForceDiscardURecPtr()),
urecAdd, zoneId, PtrGetVal(uzone, GetInsertURecPtr()), PtrGetVal(uzone, GetForceDiscardURecPtr()),
PtrGetVal(uzone, GetDiscardURecPtr()), PtrGetVal(uzone, GetRecycleXid()),
PtrGetVal(snapshot, satisfies), PtrGetVal(snapshot, xmin))));
} else if (state == UNDO_TRAVERSAL_END || state == UNDO_TRAVERSAL_ENDCHAIN) {
@ -2204,19 +2203,19 @@ static bool GetTupleFromUndo(UndoRecPtr urecAdd, UHeapTuple currentTuple, UHeapT
ereport(ERROR, (errmodule(MOD_USTORE), errmsg(
"snapshot too old! "
"Reason: Need fetch undo record. "
"LogInfo: urp %lu, undo state %d, tuple flag %u, tupTd %d, tupXid %lu. "
"LogInfo: undo state %d, tuple flag %u, tupTd %d, tupXid %lu. "
"Td: tdxid %lu, tdid %d, undoptr %lu. "
"TransInfo: xid %lu, oid %u, tid(%u, %u), "
"globalRecycleXid %lu, globalFrozenXid %lu. "
"ZoneInfo: urp: %lu, zid %d, insertURecPtr %lu, forceDiscardURecPtr %lu, "
"discardURecPtr %lu, recycleXid %lu. "
"Snapshot: type %d, xmin %lu.",
urecAdd, state, hdr.flag, tupTdid, tupXid,
state, hdr.flag, tupTdid, tupXid,
uinfo.xid, uinfo.td_slot, uinfo.urec_add,
GetTopTransactionIdIfAny(), tableOid, blkno, offnum,
pg_atomic_read_u64(&g_instance.undo_cxt.globalRecycleXid),
pg_atomic_read_u64(&g_instance.undo_cxt.globalFrozenXid),
urp, zoneId, PtrGetVal(uzone, GetInsertURecPtr()), PtrGetVal(uzone, GetForceDiscardURecPtr()),
urecAdd, zoneId, PtrGetVal(uzone, GetInsertURecPtr()), PtrGetVal(uzone, GetForceDiscardURecPtr()),
PtrGetVal(uzone, GetDiscardURecPtr()), PtrGetVal(uzone, GetRecycleXid()),
snapshot->satisfies, snapshot->xmin)));
} else if (state != UNDO_TRAVERSAL_COMPLETE || uinfo.td_slot == UHEAPTUP_SLOT_FROZEN ||

View File

@ -139,7 +139,7 @@ void PrepareUndoMeta(XlogUndoMeta *meta, UndoPersistence upersistence, UndoRecPt
}
uzone->AdvanceInsertURecPtr(UNDO_PTR_GET_OFFSET(lastRecord), lastRecordSize);
if (uzone->GetForceDiscardURecPtr() > uzone->GetInsertURecPtr()) {
ereport(PANIC, (errmodule(MOD_UNDO), errmsg(UNDOFORMAT("zone %d forceDiscardURecPtr %lu > insertURecPtr %lu."),
ereport(WARNING, (errmodule(MOD_UNDO), errmsg(UNDOFORMAT("zone %d forceDiscardURecPtr %lu > insertURecPtr %lu."),
uzone->GetZoneId(), uzone->GetForceDiscardURecPtr(), uzone->GetInsertURecPtr())));
}
uzone->GetSlotBuffer().Lock();

View File

@ -298,7 +298,6 @@ bool RecycleUndoSpace(UndoZone *zone, TransactionId recycleXmin, TransactionId f
if (zone->GetPersitentLevel() == UNDO_PERMANENT) {
needWal = true;
}
Assert(recycle <= allocate);
while (recycle < allocate) {
UndoSlotBuffer& slotBuf = g_slotBufferCache->FetchTransactionBuffer(recycle);
UndoRecPtr startUndoPtr = INVALID_UNDO_REC_PTR;
@ -407,7 +406,6 @@ bool RecycleUndoSpace(UndoZone *zone, TransactionId recycleXmin, TransactionId f
int startZid = UNDO_PTR_GET_ZONE_ID(startUndoPtr);
int endZid = UNDO_PTR_GET_ZONE_ID(oldestEndUndoPtr);
if (unlikely(startZid != endZid)) {
Assert(UNDO_PTR_GET_OFFSET(endUndoPtr) == UNDO_LOG_MAX_SIZE);
oldestEndUndoPtr = MAKE_UNDO_PTR(startZid, UNDO_LOG_MAX_SIZE);
}
zone->SetDiscardURecPtr(oldestEndUndoPtr);
@ -418,7 +416,6 @@ bool RecycleUndoSpace(UndoZone *zone, TransactionId recycleXmin, TransactionId f
*oldestRecycleXid = recycleXid;
zone->SetForceDiscardURecPtr(endUndoPtr);
zone->SetRecycleTSlotPtr(recycle);
Assert(zone->GetForceDiscardURecPtr() <= zone->GetInsertURecPtr());
result = true;
XLogRecPtr lsn = InvalidXLogRecPtr;

View File

@ -39,13 +39,14 @@ static uint32 USEG_BLOCKS(uint32 dbId)
return UNDO_META_SEG_SIZE;
}
/*
 * Return the number of blocks currently used by this undo space, i.e. the
 * distance between head_ and tail_ expressed in BLCKSZ units.
 *
 * zoneId is used only to identify the zone in the diagnostic message.
 * tail_ < head_ indicates corrupted space meta; per this commit's intent
 * the function reports a WARNING and treats the space as empty (returns 0)
 * instead of PANICking, so verification paths can continue.
 */
uint32 UndoSpace::Used(int zoneId)
{
    WHITEBOX_TEST_STUB(UNDO_USED_FAILED, WhiteboxDefaultErrorEmit);
    if (tail_ < head_) {
        ereport(WARNING, (errmodule(MOD_UNDO),
            errmsg(UNDOFORMAT("zoneId %d space tail %lu < head %lu."), zoneId, tail_, head_)));
        return 0;
    }
    return (uint32)((tail_ - head_) / BLCKSZ);
}
@ -75,7 +76,6 @@ void UndoSpace::ExtendUndoLog(int zid, UndoLogOffset offset, uint32 dbId)
{
RelFileNode rnode;
UndoLogOffset tail = tail_;
Assert(tail < offset && head_ <= tail_);
BlockNumber blockno;
UNDO_PTR_ASSIGN_REL_FILE_NODE(rnode, MAKE_UNDO_PTR(zid, offset), dbId);
SMgrRelation reln = smgropen(rnode, InvalidBackendId);
@ -124,7 +124,6 @@ void UndoSpace::UnlinkUndoLog(int zid, UndoLogOffset offset, uint32 dbId)
old_head = head_;
SetHead(offset);
}
Assert(head < offset && head_ <= tail_);
UNDO_PTR_ASSIGN_REL_FILE_NODE(rnode, MAKE_UNDO_PTR(zid, offset), dbId);
SMgrRelation reln = smgropen(rnode, InvalidBackendId);
uint64 segSize = USEG_SIZE(dbId);
@ -140,11 +139,13 @@ void UndoSpace::UnlinkUndoLog(int zid, UndoLogOffset offset, uint32 dbId)
"unlink undo log, zid=%d, dbid=%u, new_head=%lu, segId:%lu."),
zid, dbId, offset, head/segSize)));
if (g_instance.undo_cxt.undoTotalSize < segBlocks) {
ereport(PANIC, (errmodule(MOD_UNDO), errmsg(UNDOFORMAT(
ereport(WARNING, (errmodule(MOD_UNDO), errmsg(UNDOFORMAT(
"unlink undo log, total blocks=%u < segment size."),
g_instance.undo_cxt.undoTotalSize)));
pg_atomic_write_u32(&g_instance.undo_cxt.undoTotalSize, 0);
} else {
pg_atomic_fetch_sub_u32(&g_instance.undo_cxt.undoTotalSize, segBlocks);
}
pg_atomic_fetch_sub_u32(&g_instance.undo_cxt.undoTotalSize, segBlocks);
head += segSize;
}
smgrclose(reln);
@ -453,7 +454,7 @@ void UndoSpace::RecoveryUndoSpace(int fd, UndoSpaceType type)
usp->CreateNonExistsUndoFile(zoneId, UNDO_SLOT_DB_OID);
segSize = USEG_SIZE(UNDO_DB_OID);
}
pg_atomic_fetch_add_u32(&g_instance.undo_cxt.undoTotalSize, usp->Used());
pg_atomic_fetch_add_u32(&g_instance.undo_cxt.undoTotalSize, usp->Used(zoneId));
uint64 transUndoThresholdSize = UNDO_SPACE_THRESHOLD_PER_TRANS * BLCKSZ;
const uint64 MAX_OFFSET = (UNDO_LOG_MAX_SIZE - transUndoThresholdSize) - segSize;
if (usp->Tail() < usp->Head() || usp->Tail() > MAX_OFFSET) {

View File

@ -328,10 +328,10 @@ UndoSlotPtr GetNextSlotPtr(UndoSlotPtr slotPtr)
BlockNumber block = (BlockNumber)(slotOffset / BLCKSZ);
UndoSlotOffset blkOffset = slotOffset % BLCKSZ;
UndoSlotOffset offset = blkOffset + MAXALIGN(sizeof(undo::TransactionSlot));
if (BLCKSZ - offset < MAXALIGN(sizeof(undo::TransactionSlot))) {
offset = (block + 1) * BLCKSZ + UNDO_LOG_BLOCK_HEADER_SIZE;
if (BLCKSZ < MAXALIGN(sizeof(undo::TransactionSlot)) + offset) {
offset = (UndoSlotOffset)(block + 1) * BLCKSZ + UNDO_LOG_BLOCK_HEADER_SIZE;
} else {
offset += block * BLCKSZ;
offset += (UndoSlotOffset)block * BLCKSZ;
}
Assert (offset <= UNDO_LOG_MAX_SIZE);
return MAKE_UNDO_PTR(UNDO_PTR_GET_ZONE_ID(slotPtr), offset);

View File

@ -232,7 +232,6 @@ TransactionSlot *UndoZone::AllocTransactionSlot(UndoSlotPtr slotPtr, Transaction
slot->Init(xid, dbid);
Assert(slot->DbId() == u_sess->proc_cxt.MyDatabaseId && TransactionIdIsValid(slot->XactId()));
allocateTSlotPtr_ = UNDO_PTR_GET_OFFSET(undo::GetNextSlotPtr(slotPtr));
Assert(allocateTSlotPtr_ >= recycleTSlotPtr_);
return slot;
}
@ -370,7 +369,6 @@ void UndoZone::ReleaseSlotSpace(UndoRecPtr startSlotPtr, UndoRecPtr endSlotPtr,
slotSpace_.LockSpace();
UndoRecPtr prevHead = MAKE_UNDO_PTR(zid_, head);
slotSpace_.UnlinkUndoLog(zid_, endSegno * UNDO_META_SEGMENT_SIZE, UNDO_SLOT_DB_OID);
Assert(slotSpace_.Head() <= allocateTSlotPtr_);
if (pLevel_ == UNDO_PERMANENT && !(t_thrd.undorecycler_cxt.is_recovery_in_progress)) {
START_CRIT_SECTION();
slotSpace_.MarkDirty();
@ -411,10 +409,10 @@ void UndoZone::PrepareSwitch(void)
{
WHITEBOX_TEST_STUB(UNDO_PREPARE_SWITCH_FAILED, WhiteboxDefaultErrorEmit);
if (undoSpace_.Tail() != UNDO_LOG_MAX_SIZE) {
ereport(PANIC, (errmsg(UNDOFORMAT(
"Undo space switch fail, expect tail(%lu), real tail(%lu)."),
UNDO_LOG_MAX_SIZE, undoSpace_.Tail())));
if (undoSpace_.Tail() > UNDO_LOG_MAX_SIZE - UNDO_LOG_SEGMENT_SIZE) {
ereport(WARNING, (errmsg(UNDOFORMAT(
"Undo space switch fail, zoneid %d, expect tail(%lu), real tail(%lu), real head(%lu)"),
zid_, UNDO_LOG_MAX_SIZE - UNDO_LOG_SEGMENT_SIZE, undoSpace_.Tail(), undoSpace_.Head())));
}
if (pLevel_ == UNDO_PERMANENT) {
LockUndoZone();
@ -800,6 +798,51 @@ static int ReleaseUndoZoneId(int zid, UndoPersistence upersistence)
return tempZid;
}
/*
 * Scan the zone group of the given persistence level from the highest zone id
 * downwards and pick a completely unused zone: not attached, empty undo space,
 * zeroed LSN, and all meta pointers still at their pristine initial values.
 *
 * upersistence - persistence level whose zone range is searched.
 * retZid       - out: id of the selected zone.
 * oldZid       - zone being switched away from; must never be selected.
 *
 * Returns the selected zone; raises ERROR if the persistence level is out of
 * range or no clean zone is available. On success the chosen zone is removed
 * from the free-zone bitmap (caller holds UndoZoneLock).
 */
static UndoZone *getUnusedZone(UndoPersistence upersistence, int *retZid, int oldZid)
{
    int zid = -1;
    UndoZone *newUzone = NULL;
    if (upersistence >= UNDO_PERSISTENT_BUTT || upersistence < UNDO_PERMANENT) {
        ereport(ERROR, (errmsg("getUnusedZone upersistence out of range [%d]",
            upersistence)));
    }
    int basecount = (int)upersistence * PERSIST_ZONE_COUNT;
    for (int i = PERSIST_ZONE_COUNT - 1; i >= 0; i--) {
        zid = i + basecount;
        newUzone = UndoZoneGroup::GetUndoZone(zid, true);
        if (newUzone == NULL) {
            /* Log the id of the zone that failed BEFORE invalidating zid,
             * otherwise the message would always report zone -1. */
            ereport(WARNING, (errmsg(UNDOFORMAT("can not palloc undo zone memory for zone %d"), zid)));
            zid = -1;
            continue;
        }
        /* Reject any zone that is attached or shows any sign of prior use. */
        if (newUzone->Attached() || newUzone->GetPersitentLevel() != upersistence ||
            newUzone->GetUndoSpace()->Tail() != 0 || newUzone->GetLSN() != 0 ||
            newUzone->GetFrozenSlotPtr() != INVALID_UNDO_SLOT_PTR ||
            newUzone->GetRecycleXid() != InvalidTransactionId ||
            newUzone->GetFrozenXid() != InvalidTransactionId ||
            newUzone->GetAttachPid() != 0 ||
            UNDO_PTR_GET_OFFSET(newUzone->GetInsertURecPtr()) != UNDO_LOG_BLOCK_HEADER_SIZE ||
            UNDO_PTR_GET_OFFSET(newUzone->GetDiscardURecPtr()) != UNDO_LOG_BLOCK_HEADER_SIZE ||
            UNDO_PTR_GET_OFFSET(newUzone->GetForceDiscardURecPtr()) != UNDO_LOG_BLOCK_HEADER_SIZE ||
            UNDO_PTR_GET_OFFSET(newUzone->GetAllocateTSlotPtr()) != UNDO_LOG_BLOCK_HEADER_SIZE ||
            UNDO_PTR_GET_OFFSET(newUzone->GetRecycleTSlotPtr()) != UNDO_LOG_BLOCK_HEADER_SIZE) {
            zid = -1;
            continue;
        }
        if (zid == oldZid) {
            /* Never hand back the zone we are switching away from. Invalidate
             * zid so that, if this is the last candidate, the loop exits with
             * an invalid id instead of silently returning oldZid's zone. */
            zid = -1;
            continue;
        }
        break;
    }
    if (!IS_VALID_ZONE_ID(zid)) {
        ereport(ERROR, (errmsg("SwitchZone: zone id is invalid, there're too many working threads.")));
    }
    *retZid = zid;
    g_instance.undo_cxt.uZoneBitmap[upersistence] =
        bms_del_member(g_instance.undo_cxt.uZoneBitmap[upersistence], (zid - basecount));
    return newUzone;
}
void UndoZoneGroup::ReleaseZone(int zid, UndoPersistence upersistence)
{
Assert(IS_VALID_ZONE_ID(zid));
@ -842,18 +885,7 @@ UndoZone *UndoZoneGroup::SwitchZone(int zid, UndoPersistence upersistence)
int retZid = -1;
uzone->PrepareSwitch();
LWLockAcquire(UndoZoneLock, LW_EXCLUSIVE);
retZid = AllocateUndoZoneId(upersistence);
if (!IS_VALID_ZONE_ID(retZid)) {
ereport(ERROR, (errmsg("SwitchZone: zone id is invalid, there're too many working threads.")));
}
UndoZone *newUzone = UndoZoneGroup::GetUndoZone(retZid, true);
if (newUzone == NULL || newUzone->GetUndoSpace()->Tail() != 0) {
ereport(PANIC,
(errmsg(UNDOFORMAT("can not palloc undo zone memory, tail = %lu."),
newUzone->GetUndoSpace()->Tail())));
}
UndoZone *newUzone = getUnusedZone(upersistence, &retZid, zid);
WHITEBOX_TEST_STUB(UNDO_SWITCH_ZONE_FAILED, WhiteboxDefaultErrorEmit);
newUzone->Attach();
LWLockRelease(UndoZoneLock);

View File

@ -63,7 +63,7 @@ public:
{
return lsn_;
}
uint32 Used(void);
uint32 Used(int zoneId);
/* Setter, used for redo. */
inline void SetHead(UndoRecPtr head)

View File

@ -293,17 +293,20 @@ public:
/*
 * Size of this zone's live undo record span, in blocks.
 * insertURecPtr_ < forceDiscardURecPtr_ indicates inconsistent zone meta;
 * per this commit's intent the condition is reported as a WARNING (with the
 * zone id for diagnosis) and the size is treated as zero instead of PANICking.
 */
inline uint64 UndoSize(void)
{
    if (insertURecPtr_ < forceDiscardURecPtr_) {
        ereport(WARNING, (errmodule(MOD_UNDO),
            errmsg(UNDOFORMAT("zoneid %d, insertURecPtr_ %lu < forceDiscardURecPtr_ %lu."),
                zid_, insertURecPtr_, forceDiscardURecPtr_)));
        return 0;
    }
    return ((insertURecPtr_ - forceDiscardURecPtr_) / BLCKSZ);
}
/*
 * Size of this zone's live transaction slot span, in blocks.
 * allocateTSlotPtr_ < recycleTSlotPtr_ indicates inconsistent zone meta;
 * per this commit's intent the condition is reported as a WARNING (with the
 * zone id for diagnosis) and the size is treated as zero instead of PANICking.
 */
inline uint64 SlotSize(void)
{
    if (allocateTSlotPtr_ < recycleTSlotPtr_) {
        ereport(WARNING, (errmodule(MOD_UNDO),
            errmsg(UNDOFORMAT("zoneid %d, allocateTSlotPtr_ %lu < recycleTSlotPtr_ %lu."),
                zid_, allocateTSlotPtr_, recycleTSlotPtr_)));
        return 0;
    }
    return ((allocateTSlotPtr_ - recycleTSlotPtr_) / BLCKSZ);
}