fix rollback remnants

This commit is contained in:
yaoxin
2024-06-11 21:25:11 +08:00
parent 7a9a104e7c
commit ecfc504975
10 changed files with 145 additions and 40 deletions

View File

@ -7109,6 +7109,23 @@ bool RecheckIndexTuple(const IndexScanDesc scan, TupleTableSlot *slot)
bool result = IndexTupleSize(itup) == IndexTupleSize(trueItup) &&
memcmp(itup, trueItup, IndexTupleSize(itup)) == 0;
if (so->indexInfo->ii_Predicate != NIL) {
List* predicate = NIL;
predicate = so->indexInfo->ii_PredicateState;
if (predicate == NIL) {
if (so->fakeEstate->es_is_flt_frame) {
predicate = (List*)ExecPrepareQualByFlatten(so->indexInfo->ii_Predicate, so->fakeEstate);
} else {
predicate = (List*)ExecPrepareExpr((Expr*)so->indexInfo->ii_Predicate, so->fakeEstate);
}
so->indexInfo->ii_PredicateState = predicate;
}
if (!ExecQual(predicate, econtext, false)) {
result = false;
}
}
/* be tidy */
MemoryContextSwitchTo(oldContext);
ResetExprContext(econtext); /* reset per tuple context */

View File

@ -8326,7 +8326,7 @@ void TryExecuteUndoActions(TransactionState s, UndoPersistence pLevel)
{
UndoSlotPtr slotPtr = t_thrd.undo_cxt.slotPtr[pLevel];
ExecuteUndoActions(slot->XactId(), s->latest_urp[pLevel], s->first_urp[pLevel],
slotPtr, !IsSubTransaction());
slotPtr, !IsSubTransaction(), pLevel);
}
PG_CATCH();
{
@ -8374,16 +8374,32 @@ void TryExecuteUndoActions(TransactionState s, UndoPersistence pLevel)
void ApplyUndoActions()
{
TransactionState s = CurrentTransactionState;
bool needRollback = false;
undo::TransactionSlot *slot = NULL;
TransactionId topXid = GetTopTransactionIdIfAny();
if (!s->perform_undo) {
for (int i = 0; i < UNDO_PERSISTENCE_LEVELS; i++) {
if (s->latest_urp[i] || (!IsSubTransaction() && t_thrd.undo_cxt.slotPtr[i] != INVALID_UNDO_SLOT_PTR)) {
slot = static_cast<undo::TransactionSlot *>(t_thrd.undo_cxt.slots[i]);
Assert(slot != NULL && topXid == slot->XactId());
if (slot != NULL && topXid == slot->XactId()) {
needRollback = true;
break;
}
}
}
if (!needRollback) {
return;
}
if (t_thrd.xact_cxt.executeSubxactUndo) {
ereport(WARNING, (errmodule(MOD_USTORE),
errmsg("Failed to execute undo for subxact, rollback of the entire transaction will be done by "
errmsg("[Rollback skip] xid(%ld), curxid(%lu), start(%lu), end(%lu). "
"Failed to execute undo for subxact, rollback of the entire transaction will be done by "
"asynchronous rollback or page-level rollback. Remark info, firstUrp(%lu,%lu,%lu), "
"lastestUrp(%lu,%lu,%lu), lastestXactUrp(%lu,%lu,%lu).",
slot->XactId(), GetTopTransactionIdIfAny(), slot->StartUndoPtr(), slot->EndUndoPtr(),
s->first_urp[0], s->first_urp[1], s->first_urp[UNDO_PERSISTENCE_LEVELS - 1],
s->latest_urp[0], s->latest_urp[1], s->latest_urp[UNDO_PERSISTENCE_LEVELS - 1],
s->latest_urp_xact[0], s->latest_urp_xact[1], s->latest_urp_xact[UNDO_PERSISTENCE_LEVELS - 1])));
@ -8426,10 +8442,29 @@ void ApplyUndoActions()
s->state = TRANS_UNDO;
for (int i = 0; i < UNDO_PERSISTENCE_LEVELS; i++) {
slot = static_cast<undo::TransactionSlot *>(t_thrd.undo_cxt.slots[i]);
if (slot == NULL) {
continue;
}
if (s->latest_urp[i]) {
WaitState oldStatus = pgstat_report_waitstatus(STATE_WAIT_TRANSACTION_ROLLBACK);
TryExecuteUndoActions(s, (UndoPersistence)i);
pgstat_report_waitstatus(oldStatus);
} else if (!IsSubTransaction() && t_thrd.undo_cxt.slotPtr[i] != INVALID_UNDO_SLOT_PTR) {
Assert(slot != NULL && topXid == slot->XactId());
if (slot != NULL && topXid == slot->XactId()) {
ereport(WARNING, (errmodule(MOD_USTORE),
errmsg("[Rollback skip] xid(%ld), curxid(%lu), start(%lu), end(%lu). "
"There is no undo record in the top-level transaction. "
"FirstUrp(%lu,%lu,%lu), lastestUrp(%lu,%lu,%lu), lastestXactUrp(%lu,%lu,%lu).",
slot->XactId(), GetTopTransactionIdIfAny(), slot->StartUndoPtr(), slot->EndUndoPtr(),
s->first_urp[0], s->first_urp[1], s->first_urp[UNDO_PERSISTENCE_LEVELS - 1],
s->latest_urp[0], s->latest_urp[1], s->latest_urp[UNDO_PERSISTENCE_LEVELS - 1],
s->latest_urp_xact[0], s->latest_urp_xact[1], s->latest_urp_xact[UNDO_PERSISTENCE_LEVELS - 1])));
UndoRecPtr prev = undo::GetPrevUrp(slot->EndUndoPtr());
(void)VerifyAndDoUndoActions(slot->XactId(), prev, slot->StartUndoPtr(), true, true);
undo::UpdateRollbackFinish(t_thrd.undo_cxt.slotPtr[i]);
}
}
}
@ -8472,6 +8507,17 @@ extern void ResetUndoActionsInfo(void)
CurrentTransactionState->latest_urp[i] = INVALID_UNDO_REC_PTR;
CurrentTransactionState->latest_urp_xact[i] = INVALID_UNDO_REC_PTR;
}
if (IsSubTransaction()) {
return;
}
t_thrd.undo_cxt.transUndoSize = 0;
for (int i = 0; i < UNDO_PERSISTENCE_LEVELS; i++) {
t_thrd.undo_cxt.prevXid[i] = InvalidTransactionId;
t_thrd.undo_cxt.slots[i] = NULL;
t_thrd.undo_cxt.slotPtr[i] = INVALID_UNDO_REC_PTR;
}
}
/*

View File

@ -1348,5 +1348,13 @@ bool IndexPagePrepareForXid(Relation rel, Page page, TransactionId xid, bool nee
int64 newBase = oldestXmin - FirstNormalTransactionId; /* reserve 3 more xid for Invalid/Bootstrap/Frozen */
int64 delta = newBase - curBase;
IndexPageShiftBase(rel, page, delta, needWal, buf);
if (xid < opaque->xid_base + FirstNormalTransactionId || xid > opaque->xid_base + MaxShortTransactionId) {
ereport(ERROR, (errcode(ERRCODE_CANNOT_MODIFY_XIDBASE),
errmsg("Page fix failed. \"%s\", now xid is %lu,"
"page xid base is %lu, oldestXmin is %lu.",
RelationGetRelationName(rel), xid, opaque->xid_base, oldestXmin)));
}
return hasPruned;
}

View File

@ -44,26 +44,14 @@ static int GetUndoApplySize()
return (int)applySize;
}
/*
* execute_undo_actions - Execute the undo actions
*
* xid - Transaction id that is getting rolled back.
* fromUrecptr - undo record pointer from where to start applying undo action.
* toUrecptr - undo record pointer upto which point apply undo action.
* isTopTxn - true if rollback is for top transaction.
*/
void ExecuteUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPtr toUrecptr,
UndoSlotPtr slotPtr, bool isTopTxn)
bool VerifyAndDoUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPtr toUrecptr,
bool isTopTxn, bool isVerify)
{
Assert(toUrecptr != INVALID_UNDO_REC_PTR && fromUrecptr != INVALID_UNDO_REC_PTR);
Assert(slotPtr != INVALID_UNDO_REC_PTR);
Assert(fullXid != InvalidTransactionId);
if (isVerify && u_sess->attr.attr_storage.ustore_verify_level < USTORE_VERIFY_FAST) {
return true;
}
int undoApplySize = GetUndoApplySize();
UndoRecPtr urecPtr = fromUrecptr;
VerifyMemoryContext();
UndoRecord *urec = New(CurrentMemoryContext)UndoRecord();
urec->SetUrp(toUrecptr);
urec->SetMemoryContext(CurrentMemoryContext);
if (isTopTxn) {
/*
@ -78,18 +66,23 @@ void ExecuteUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPt
* worker processed it and backend tries to process it some later
* point.
*/
VerifyMemoryContext();
UndoRecord *urec = New(CurrentMemoryContext) UndoRecord();
urec->SetMemoryContext(CurrentMemoryContext);
urec->SetUrp(toUrecptr);
UndoTraversalState rc = FetchUndoRecord(urec, NULL, InvalidBlockNumber, InvalidOffsetNumber,
InvalidTransactionId, false, NULL);
DELETE_EX(urec);
/* already processed. */
if (rc != UNDO_TRAVERSAL_COMPLETE) {
DELETE_EX(urec);
return;
ereport(ERROR, (errmodule(MOD_USTORE),
errmsg("[Rollbakc Skip]: xid(%lu), toUrecptr(%lu), fromUrecptr(%lu), rc(%d)",
fullXid, toUrecptr, fromUrecptr, rc)));
return false;
}
}
DELETE_EX(urec);
/*
/*
* Fetch the multiple undo records which can fit into uur_segment; sort
* them in order of reloid and block number then apply them together
* page-wise. Repeat this until we get invalid undo record pointer.
@ -112,16 +105,23 @@ void ExecuteUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPt
* for this transaction, otherwise we need to fetch the next batch of
* the undo records.
*/
if (!IS_VALID_UNDO_REC_PTR(urecPtr))
if (!IS_VALID_UNDO_REC_PTR(urecPtr)){
break;
}
/*
* Fetch multiple undo record in bulk.
* This will return a URecVector which contains an array of UndoRecords.
*/
URecVector *urecvec = FetchUndoRecordRange(&urecPtr, toUrecptr, undoApplySize, false);
if (urecvec->Size() == 0)
if (urecvec->Size() == 0){
ereport(ERROR, (errmodule(MOD_USTORE),
errmsg("[Rollbakc Skip]: xid(%lu), toUrecptr(%lu), fromUrecptr(%lu)",
fullXid, toUrecptr, fromUrecptr)));
break;
}
if (isTopTxn && !IS_VALID_UNDO_REC_PTR(urecPtr)) {
containsFullChain = true;
@ -136,7 +136,6 @@ void ExecuteUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPt
int i = 0;
for (i = 0; i < urecvec->Size(); i++) {
UndoRecord *uur = (*urecvec)[i];
if (currRelfilenode != InvalidOid && (currTablespace != uur->Tablespace() ||
currRelfilenode != uur->Relfilenode() || currBlkno != uur->Blkno())) {
preRetCode = RmgrTable[RM_UHEAP_ID].rm_undo(urecvec, startIndex, i - 1, fullXid,
@ -157,8 +156,35 @@ void ExecuteUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPt
DELETE_EX(urecvec);
} while (true);
return true;
}
/*
* execute_undo_actions - Execute the undo actions
*
* xid - Transaction id that is getting rolled back.
* fromUrecptr - undo record pointer from where to start applying undo action.
* toUrecptr - undo record pointer upto which point apply undo action.
* isTopTxn - true if rollback is for top transaction.
*/
void ExecuteUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPtr toUrecptr,
UndoSlotPtr slotPtr, bool isTopTxn, UndoPersistence plevel)
{
Assert(toUrecptr != INVALID_UNDO_REC_PTR && fromUrecptr != INVALID_UNDO_REC_PTR);
Assert(slotPtr != INVALID_UNDO_REC_PTR);
Assert(fullXid != InvalidTransactionId);
if (!VerifyAndDoUndoActions(fullXid, fromUrecptr, toUrecptr, isTopTxn, false)) {
return;
}
if (isTopTxn) {
if (plevel != UNDO_PERSISTENT_BUTT) {
undo::TransactionSlot *slot = static_cast<undo::TransactionSlot *>(t_thrd.undo_cxt.slots[plevel]);
UndoRecPtr prev = undo::GetPrevUrp(slot->EndUndoPtr());
if (prev != fromUrecptr || slot->StartUndoPtr() != toUrecptr) {
(void)VerifyAndDoUndoActions(slot->XactId(), prev, slot->StartUndoPtr(), isTopTxn, true);
}
}
undo::UpdateRollbackFinish(slotPtr);
}
}

View File

@ -46,6 +46,7 @@
#include "gssignal/gs_signal.h"
#include "access/ustore/knl_undoworker.h"
#include "access/ustore/knl_undorequest.h"
#include "access/gtm.h"
#define InvalidPid ((ThreadId)(-1))
@ -196,6 +197,16 @@ void UndoWorkerShmemInit(void)
}
}
void UndoLuncherQuitAndClean(int code, Datum arg)
{
#ifdef ENABLE_MULTIPLE_NODES
CloseGTM();
#endif
ereport(LOG, (errmsg("undo launcher shutting down")));
t_thrd.undolauncher_cxt.UndoWorkerShmem->undo_launcher_pid = 0;
DisownLatch(&t_thrd.undolauncher_cxt.UndoWorkerShmem->latch);
}
NON_EXEC_STATIC void UndoLauncherMain()
{
sigjmp_buf localSigjmpBuf;
@ -256,6 +267,8 @@ NON_EXEC_STATIC void UndoLauncherMain()
t_thrd.proc_cxt.PostInit->SetDatabaseAndUser(NULL, InvalidOid, NULL);
t_thrd.proc_cxt.PostInit->InitUndoLauncher();
on_proc_exit(UndoLuncherQuitAndClean, 0);
SetProcessingMode(NormalProcessing);
/* Unblock signals (they were blocked when the postmaster forked us) */
@ -316,8 +329,5 @@ NON_EXEC_STATIC void UndoLauncherMain()
}
shutdown:
ereport(LOG, (errmsg("undo launcher shutting down")));
t_thrd.undolauncher_cxt.UndoWorkerShmem->undo_launcher_pid = 0;
DisownLatch(&t_thrd.undolauncher_cxt.UndoWorkerShmem->latch);
proc_exit(0);
}

View File

@ -153,7 +153,7 @@ static void UndoPerformWork(UndoWorkInfo undowork)
undowork->startUndoPtr, undowork->endUndoPtr);
ExecuteUndoActions(undowork->xid, undowork->startUndoPtr, /* last undorecord created in the txn */
undowork->endUndoPtr, /* first undorecord created in the txn */
undowork->slotPtr, true);
undowork->slotPtr, true, UNDO_PERSISTENT_BUTT);
}
PG_CATCH();
{

View File

@ -128,12 +128,6 @@ void UndoRecord::Reset(UndoRecPtr urp)
if (BufferIsValid(buff_)) {
if (!IS_VALID_UNDO_REC_PTR(urp) || (UNDO_PTR_GET_ZONE_ID(urp) != UNDO_PTR_GET_ZONE_ID(urp_)) ||
(UNDO_PTR_GET_BLOCK_NUM(urp) != BufferGetBlockNumber(buff_))) {
BufferDesc *buf_desc = GetBufferDescriptor(buff_ - 1);
if (LWLockHeldByMe(buf_desc->content_lock)) {
ereport(LOG, (errmodule(MOD_UNDO),
errmsg("Release Buffer %d when Reset UndoRecord from %lu to %lu.", buff_, urp_, urp)));
LockBuffer(buff_, BUFFER_LOCK_UNLOCK);
}
ReleaseBuffer(buff_);
buff_ = InvalidBuffer;
}

View File

@ -87,7 +87,7 @@ void UndoShutdownHandler(SIGNAL_ARGS)
bool AsyncRollback(UndoZone *zone, UndoSlotPtr recycle, TransactionSlot *slot)
{
if (slot->NeedRollback()) {
if (zone->GetPersitentLevel() == UNDO_TEMP || zone->GetPersitentLevel() == UNDO_UNLOGGED) {
if (zone->GetPersitentLevel() == UNDO_TEMP) {
return true;
}
if (!u_sess->attr.attr_storage.enable_ustore_async_rollback) {

View File

@ -72,8 +72,10 @@ bool IsRollbackRequestHashFull();
bool AddRollbackRequest(TransactionId xid, UndoRecPtr fromAddr, UndoRecPtr toAddr, Oid dbid, UndoSlotPtr slotPtr);
bool RemoveRollbackRequest(TransactionId xid, UndoRecPtr startAddr, ThreadId pid);
void ReportFailedRollbackRequest(TransactionId xid, UndoRecPtr fromAddr, UndoRecPtr toAddr, Oid dbid);
bool VerifyAndDoUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPtr toUrecptr,
bool isTopTxn, bool isVerify);
void ExecuteUndoActions(TransactionId fullXid, UndoRecPtr fromUrecptr, UndoRecPtr toUrecptr, UndoSlotPtr slotPtr,
bool nopartial);
bool nopartial, UndoPersistence plevel);
void ExecuteUndoActionsPage(UndoRecPtr urp, Relation relation, Buffer buf, TransactionId xid);
int UHeapUndoActions(URecVector *urecvector, int startIdx, int endIdx, TransactionId xid, Oid reloid, Oid partitionoid,
BlockNumber blkno, bool isFullChain, int preRetCode, Oid *preReloid, Oid *prePartitionoid);

View File

@ -1513,6 +1513,8 @@ exception when others then
end;
/
call p1_test_ustore();
--?ARNING: [Rollback skip] xid(.*), curxid(.*), start(.*), end(.*). There is no undo record in the top-level transaction. FirstUrp(.*), lastestUrp(.*), lastestXactUrp(.*).
CONTEXT: PL/pgSQL function p1_test_ustore() line 6 at ROLLBACK
p1_test_ustore
----------------