diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp index 51c0e6954..d7eb38b47 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp @@ -1210,8 +1210,6 @@ void PageManagerProcLsnForwarder(RedoItem *lsnForwarder) PageManagerAddRedoItemToSegWorkers(lsnForwarder); PageManagerAddRedoItemToHashMapManager(lsnForwarder); - PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId]; - PageManagerPruneIfRealtimeBuildFailover(); /* wait hashmapmng prune and segworker distribute segrecord to hashmap */ uint32 refCount; @@ -3415,6 +3413,7 @@ void HashMapManagerMain() SPSCBlockingQueuePop(g_dispatcher->segQueue); } + PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId]; /** * step2: prune idle hashmap * @@ -3425,7 +3424,7 @@ void HashMapManagerMain() // the head of redoItem hashmap linked list ondemand_htab_ctrl_t *nextHtabCtrl = g_instance.comm_cxt.predo_cxt.redoItemHashCtrl[g_redoWorker->slotId]; // the tail of redoItem hashmap linked list - ondemand_htab_ctrl_t *targetHtabCtrl = g_dispatcher->pageLines[g_redoWorker->slotId].managerThd->redoItemHashCtrl; + ondemand_htab_ctrl_t *targetHtabCtrl = myRedoLine->managerThd->redoItemHashCtrl; // the processing redoItem hashmap ondemand_htab_ctrl_t *procHtabCtrl = nextHtabCtrl; while (nextHtabCtrl != targetHtabCtrl) { @@ -3452,6 +3451,9 @@ void HashMapManagerMain() } CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]); + // step4: release for global xlog memory + OndemandGlobalXLogMemReleaseIfNeed(&myRedoLine->batchThd->parseManager.memctl); + RedoInterruptCallBack(); ADD_ABNORMAL_POSITION(12); pg_usleep(500000L); /* 500 ms */ diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp index 9bfc8e045..e71ca3095 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp @@ -121,44 +121,75 @@ static RedoMemSlot *OndemandGlobalXLogMemAlloc() return NULL; } -static void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl) +static RedoMemSlot *GetTailSlot(RedoMemManager *memctl, Buffer headSlotBuffer) +{ + RedoMemSlot *tailSlot = &memctl->memslot[headSlotBuffer - 1]; + while (tailSlot->freeNext != InvalidBuffer) { + tailSlot = &memctl->memslot[tailSlot->freeNext - 1]; + } + return tailSlot; +} + +static void InsertBatchXLogMemToSlot(RedoMemManager *memctl, Buffer *targetSlot, Buffer headSlotBuffer) +{ + RedoMemSlot *tailSlot = GetTailSlot(memctl, headSlotBuffer); + Buffer oldFirst = AtomicReadBuffer(targetSlot); + pg_memory_barrier(); + do { + AtomicWriteBuffer(&tailSlot->freeNext, oldFirst); + } while (!AtomicCompareExchangeBuffer(targetSlot, &oldFirst, headSlotBuffer)); +} + +// used in hashmap manager, global firstreleaseslot should not be InvalidBuffer in healthy condition +void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl) { RedoMemManager *glbmemctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl; if (AtomicReadBuffer(&glbmemctl->firstreleaseslot) == InvalidBuffer) { - Buffer firstreleaseslot = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer); - Buffer invalidbuffer = InvalidBuffer; - if (!AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &invalidbuffer, firstreleaseslot)) { - AtomicWriteBuffer(&memctl->firstreleaseslot, firstreleaseslot); + Buffer firstReleaseSlotBuffer = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer); + if (firstReleaseSlotBuffer == InvalidBuffer) { + // set pipeline firstfreeslot to pipeline firstreleaseslot, for next loop to global firstreleaseslot + Buffer firstFreeSlotBuffer = AtomicExchangeBuffer(&memctl->firstfreeslot, InvalidBuffer); + if (firstFreeSlotBuffer != InvalidBuffer) { + InsertBatchXLogMemToSlot(memctl, &memctl->firstreleaseslot, firstFreeSlotBuffer); + } + } else { + // set pipeline firstreleaseslot for global firstreleaseslot + Buffer invalidBuffer = InvalidBuffer; + if (!AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &invalidBuffer, firstReleaseSlotBuffer)) { + // exchange failed, give back + InsertBatchXLogMemToSlot(memctl, &memctl->firstreleaseslot, firstReleaseSlotBuffer); + } } } } RedoMemSlot *OndemandXLogMemAlloc(RedoMemManager *memctl) { - RedoMemSlot *nextfreeslot = NULL; + RedoMemSlot *nextFreeSlot = NULL; do { - if (memctl->firstfreeslot == InvalidBuffer) { - memctl->firstfreeslot = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer); + if (AtomicReadBuffer(&memctl->firstfreeslot) == InvalidBuffer) { + AtomicWriteBuffer(&memctl->firstfreeslot, AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer)); pg_read_barrier(); } - if (memctl->firstfreeslot != InvalidBuffer) { - nextfreeslot = &(memctl->memslot[memctl->firstfreeslot - 1]); - memctl->firstfreeslot = nextfreeslot->freeNext; - nextfreeslot->freeNext = InvalidBuffer; + Buffer firstFreeSlotBuffer = AtomicExchangeBuffer(&memctl->firstfreeslot, InvalidBuffer); + if (firstFreeSlotBuffer != InvalidBuffer) { + nextFreeSlot = &(memctl->memslot[firstFreeSlotBuffer - 1]); + AtomicWriteBuffer(&memctl->firstfreeslot, nextFreeSlot->freeNext); + nextFreeSlot->freeNext = InvalidBuffer; } - if (nextfreeslot == NULL) { - nextfreeslot = OndemandGlobalXLogMemAlloc(); + if (nextFreeSlot == NULL) { + nextFreeSlot = OndemandGlobalXLogMemAlloc(); } if (memctl->doInterrupt != NULL) { memctl->doInterrupt(); } - } while (nextfreeslot == NULL); + } while (nextFreeSlot == NULL); pg_atomic_fetch_add_u32(&memctl->usedblknum, 1); - return nextfreeslot; + return nextFreeSlot; } void OndemandXLogMemRelease(RedoMemManager *memctl, Buffer bufferid) @@ -181,8 +212,6 @@ void OndemandXLogMemRelease(RedoMemManager *memctl, Buffer bufferid) AtomicWriteBuffer(&bufferslot->freeNext, oldFirst); } while (!AtomicCompareExchangeBuffer(&releasememctl->firstreleaseslot, &oldFirst, bufferid)); pg_atomic_fetch_sub_u32(&memctl->usedblknum, 1); - - OndemandGlobalXLogMemReleaseIfNeed(memctl); } @@ -261,6 +290,7 @@ XLogRecParseState *OndemandXLogParseBufferAllocList(RedoParseManager *parsemanag void OndemandXLogParseBufferRelease(XLogRecParseState *recordstate) { if (recordstate->distributeStatus == XLOG_SKIP_DISTRIBUTE) { + Assert(!SS_ONDEMAND_REALTIME_BUILD_NORMAL); // alloc in pageRedoWorker or backends pfree(recordstate); return; @@ -335,7 +365,8 @@ long RedoRelationForOndemandExtremeRTO(Relation relation) { * @param dbId the dbNode of database * @return the redoEntry num of target database */ -long RedoDatabaseForOndemandExtremeRTO(Oid dbNode) { +long RedoDatabaseForOndemandExtremeRTO(Oid dbNode) +{ long entryNum = 0; Assert(OidIsValid(dbNode)); diff --git a/src/include/access/ondemand_extreme_rto/redo_utils.h b/src/include/access/ondemand_extreme_rto/redo_utils.h index 3dcbc83ee..07d72b51c 100644 --- a/src/include/access/ondemand_extreme_rto/redo_utils.h +++ b/src/include/access/ondemand_extreme_rto/redo_utils.h @@ -54,5 +54,6 @@ void OnDemandUpdateRealtimeBuildPrunePtr(); void OnDemandNotifyHashMapPruneIfNeed(); XLogRecParseType GetCurrentXLogRecParseType(XLogRecParseState *preState); bool IsRecParseStateHaveChildState(XLogRecParseState *checkState); +void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl); #endif /* ONDEMAND_EXTREME_RTO_REDO_UTILS_H */ \ No newline at end of file