From 6bcb754afd5cb532a39ed573397b853bbccd257d Mon Sep 17 00:00:00 2001 From: chendong76 <1209756284@qq.com> Date: Thu, 13 Jun 2024 16:11:33 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=8C=89=E9=9C=80=E5=9B=9E?= =?UTF-8?q?=E6=94=BE=E4=B8=8B=EF=BC=8Chashmap=E5=86=85=E5=AD=98=E4=B8=8D?= =?UTF-8?q?=E8=B6=B3=E5=AF=BC=E8=87=B4=E5=9B=9E=E6=94=BE=E5=8D=A1=E4=BD=8F?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix on-demand replay getting stuck when the parse hashmap runs out of memory: startup sends g_hashmapPruneMark to every batch-redo pipeline when the hashmap is full, batch redo stops waiting once its own pipeline holds no block-records, slots released while the hashmap is full go straight to the global release list, and the ctrl worker checks the pause status every 50 ms and logs it periodically. --- .../ddes/adapter/ss_dms_recovery.cpp | 2 +- .../ondemand_extreme_rto/dispatcher.cpp | 2 +- .../ondemand_extreme_rto/page_redo.cpp | 173 +++++++++++++----- .../ondemand_extreme_rto/redo_utils.cpp | 42 +++-- .../access/ondemand_extreme_rto/dispatcher.h | 1 + .../access/ondemand_extreme_rto/page_redo.h | 9 +- .../access/ondemand_extreme_rto/redo_utils.h | 1 + 7 files changed, 159 insertions(+), 71 deletions(-) diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index 0d26545c2..a17d27c04 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -125,10 +125,10 @@ bool SSRecoveryApplyDelay() return true; } + OnDemandNotifyHashMapPruneIfNeed(); while (g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag || SS_ONDEMAND_RECOVERY_PAUSE) { /* might change the trigger file's location */ RedoInterruptCallBack(); - pg_usleep(REFORM_WAIT_TIME); } diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp index a11708718..95e3a4271 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp @@ -444,7 +444,7 @@ void HandleStartupInterruptsForExtremeRto() static void SetOndemandXLogParseFlagValue(uint32 maxParseBufNum) { g_ondemandXLogParseMemFullValue 
= maxParseBufNum * ONDEMAND_FORCE_PRUNE_RATIO; - g_ondemandXLogParseMemApproachFullVaule = maxParseBufNum * ONDEMAND_DISTRIBUTE_RATIO; + g_ondemandXLogParseMemCancelPauseVaule = maxParseBufNum * ONDEMAND_DISTRIBUTE_CANCEL_RATIO; g_ondemandRealtimeBuildQueueFullValue = REALTIME_BUILD_RECORD_QUEUE_SIZE * ONDEMAND_FORCE_PRUNE_RATIO; } diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp index b1d44337e..1dd7cea00 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp @@ -116,7 +116,7 @@ RedoItem g_forceDistributeMark; RedoItem g_hashmapPruneMark; uint32 g_ondemandXLogParseMemFullValue = 0; -uint32 g_ondemandXLogParseMemApproachFullVaule = 0; +uint32 g_ondemandXLogParseMemCancelPauseVaule = 0; uint32 g_ondemandRealtimeBuildQueueFullValue = 0; static const int PAGE_REDO_WORKER_ARG = 3; @@ -148,6 +148,11 @@ static void PageManagerPruneIfRealtimeBuildFailover(); static void RealtimeBuildReleaseRecoveryLatch(int code, Datum arg); static void OnDemandPageManagerRedoSegParseState(XLogRecParseState *preState); static void SegWorkerRedoAllSegBlockRecord(); +static void OndemandPauseRedoAndRequestPrimaryDoCkpt(OndemandCheckPauseCB activatePauseFunc, + OndemandCheckPauseCB continuePauseFunc, OndemandProcPauseStatusCB refreshPauseStatusFunc, + OndemandProcPauseStatusCB logPauseStatusFunc, ondemand_recovery_pause_status_t pauseState, + bool onlyInRealtimeBuild); +static void BatchRedoProcIfXLogParseMemFull(); RefOperate recordRefOperate = { AddRefRecord, @@ -174,7 +179,7 @@ static inline bool OndemandXLogParseMemFull() static inline bool OndemandXLogParseMemApproachFull() { - return (pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum) > g_ondemandXLogParseMemApproachFullVaule); + return 
(pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum) > g_ondemandXLogParseMemCancelPauseVaule); } static inline bool OndemandTrxnQueueFullInRealtimeBuild() @@ -611,16 +616,28 @@ void BatchRedoSendMarkToPageRedoManager(RedoItem *sendMark) AddPageRedoItem(myRedoLine->managerThd, sendMark); } +/* + * If the hashmap is full, send a prune/distribute mark to the page redo manager so it releases hashmap entries. + * This function needs to be called in two scenarios: + * 1. batchRedo's queue is empty: startup sends the release mark to all batchRedo pipelines + * 2. batchRedo's queue is not empty: this function is called when an xlog record is parsed + */ static void BatchRedoProcIfXLogParseMemFull() { - while (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) { + if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) { if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) { BatchRedoSendMarkToPageRedoManager(&g_hashmapPruneMark); } else { BatchRedoSendMarkToPageRedoManager(&g_forceDistributeMark); } - RedoInterruptCallBack(); - pg_usleep(100000); // 100 ms + // wait until the hashmap has enough free block-records or the current pipeline no longer uses any block-records + do { + if (pg_atomic_read_u32(&g_redoWorker->parseManager.memctl.usedblknum) == 0) { + break; + } + RedoInterruptCallBack(); + pg_usleep(100000L); // 100 ms + } while (SS_ONDEMAND_RECOVERY_HASHMAP_FULL); } } @@ -638,6 +655,8 @@ bool BatchRedoDistributeItems(void **eleArry, uint32 eleNum) smgrcloseall(); } else if (eleArry[i] == (void *)&g_cleanInvalidPageMark) { forget_range_invalid_pages((void *)eleArry[i]); + } else if (eleArry[i] == (void *)&g_hashmapPruneMark) { + BatchRedoProcIfXLogParseMemFull(); } else { BatchRedoProcIfXLogParseMemFull(); GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]); @@ -728,7 +747,7 @@ void RedoPageManagerDistributeToAllOneBlock(XLogRecParseState *ddlParseState) } } -void ReleaseRecParseState(PageRedoPipeline *myRedoLine, HTAB *redoItemHash, RedoItemHashEntry *redoItemEntry, uint32 workId) +void ReleaseRecParseState(HTAB *redoItemHash, 
RedoItemHashEntry *redoItemEntry) { XLogRecParseState *cur_state = redoItemEntry->head; XLogRecParseState *releaseHeadState = redoItemEntry->head; @@ -809,7 +828,9 @@ void RedoPageManagerDistributeToRedoThd(PageRedoPipeline *myRedoLine, static void WaitSegRedoWorkersQueueEmpty() { - while (!SPSCBlockingQueueIsEmpty(g_dispatcher->auxiliaryLine.segRedoThd->queue)) { + while (!SPSCBlockingQueueIsEmpty(g_dispatcher->auxiliaryLine.segRedoThd->queue) || + !SPSCBlockingQueueIsEmpty(g_dispatcher->segQueue)) { + pg_usleep(100000L); /* 100 ms */ RedoInterruptCallBack(); } } @@ -827,7 +848,7 @@ void RedoPageManagerDistributeBlockRecord(XLogRecParseState *parsestate) while ((redoItemEntry = (RedoItemHashEntry *)hash_seq_search(&status)) != NULL) { uint32 workId = GetWorkerId(&redoItemEntry->redoItemTag, WorkerNumPerMng); - ReleaseRecParseState(myRedoLine, curMap, redoItemEntry, workId); + ReleaseRecParseState(curMap, redoItemEntry); RedoPageManagerDistributeToRedoThd(myRedoLine, curMap, redoItemEntry, workId); } @@ -848,7 +869,7 @@ void WaitCurrentPipeLineRedoWorkersQueueEmpty() } } -static void ReleaseReplayedInParse(PageRedoPipeline* myRedoLine, uint32 workerNum) +static void ReleaseReplayedInParse() { HASH_SEQ_STATUS status; RedoItemHashEntry *redoItemEntry = NULL; @@ -856,10 +877,7 @@ static void ReleaseReplayedInParse(PageRedoPipeline* myRedoLine, uint32 workerNu hash_seq_init(&status, curMap); while ((redoItemEntry = (RedoItemHashEntry *)hash_seq_search(&status)) != NULL) { - if (g_redoWorker->slotId == GetSlotId(redoItemEntry->redoItemTag.rNode, 0, 0, GetBatchCount())) { - uint32 workId = GetWorkerId(&redoItemEntry->redoItemTag, workerNum); - ReleaseRecParseState(myRedoLine, curMap, redoItemEntry, workId); - } + ReleaseRecParseState(curMap, redoItemEntry); } } @@ -871,11 +889,20 @@ static void WaitAndTryReleaseWorkerReplayedRec(PageRedoPipeline *myRedoLine, uin for (uint32 i = 0; i < workerNum; i++) { if (!RedoWorkerIsIdle(myRedoLine->redoThd[i])) { queueIsEmpty = 
false; - ReleaseReplayedInParse(myRedoLine, workerNum); - pg_usleep(50000L); + pg_usleep(50000L); /* 50 ms */ break; } } + ReleaseReplayedInParse(); + } +} + +static void ReleaseRecParseStateUntilRedoNotPause() +{ + while (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) { + ReleaseReplayedInParse(); + RedoInterruptCallBack(); + pg_usleep(50000L); /* 50 ms */ } } @@ -900,8 +926,7 @@ void PageManagerDispatchEndMarkAndWait() WaitPageRedoWorkerReachLastMark(g_dispatcher->auxiliaryLine.segRedoThd); } WaitPageRedoWorkerReachLastMark(myRedoLine->htabThd); - - ReleaseReplayedInParse(myRedoLine, WorkerNumPerMng); + ReleaseReplayedInParse(); } void RedoPageManagerDdlAction(XLogRecParseState *parsestate) @@ -1354,7 +1379,7 @@ static void OnDemandPageManagerProcSegParseState(XLogRecParseState *preState, XL static bool WaitPrimaryDoCheckpointAndAllPRTrackEmpty(XLogRecParseState *preState, HTAB *redoItemHash) { - if (SS_ONDEMAND_REALTIME_BUILD_DISABLED) { + if (!SS_ONDEMAND_REALTIME_BUILD_NORMAL) { return false; } @@ -1591,12 +1616,8 @@ bool PageManagerRedoDistributeItems(void **eleArry, uint32 eleNum) Assert(!SS_ONDEMAND_REALTIME_BUILD_NORMAL); // double check if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) { - ereport(WARNING, (errcode(ERRCODE_LOG), - errmsg("[On-demand] Parse buffer num approach critical value, distribute block record by force," - " slotid %d, usedblknum %d, totalblknum %d", g_redoWorker->slotId, - pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum), - g_dispatcher->parseManager.memctl.totalblknum))); RedoPageManagerDistributeBlockRecord(NULL); + ReleaseRecParseStateUntilRedoNotPause(); } continue; } @@ -2531,6 +2552,13 @@ void StartupSendFowarder(RedoItem *item) AddPageRedoItem(g_dispatcher->auxiliaryLine.ctrlThd, item); } +void StartupSendMarkToBatchRedo(RedoItem *item) +{ + for (uint32 i = 0; i < g_dispatcher->pageLineNum; ++i) { + AddPageRedoItem(g_dispatcher->pageLines[i].batchThd, item); + } +} + void SendLsnFowarder() { // update and read in the same thread, so no need atomic operation 
@@ -3401,12 +3429,12 @@ void OndemandCtrlWorkerMain() OndemandUpdateXLogParseMemUsedBlkNum(); CountAndGetRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_1], g_redoWorker->timeCostList[TIME_COST_STEP_2]); - OndemandRequestPrimaryDoCkptIfNeed(); + OndemandProcPauseStatus(); CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_2]); RedoInterruptCallBack(); ADD_ABNORMAL_POSITION(13); - pg_usleep(500000L); /* 500 ms */ + pg_usleep(50000L); /* 50 ms */ } while (true); SPSCBlockingQueuePop(g_redoWorker->queue); @@ -3988,48 +4016,92 @@ static XLogRecPtr RequestPrimaryCkptAndUpdateCkptRedoPtr() } static void OndemandPauseRedoAndRequestPrimaryDoCkpt(OndemandCheckPauseCB activatePauseFunc, - OndemandCheckPauseCB inactivatePauseFunc, OndemandRefreshPauseStatusCB refreshPauseStatusFunc, - ondemand_recovery_pause_status_t pauseState) + OndemandCheckPauseCB continuePauseFunc, OndemandProcPauseStatusCB refreshPauseStatusFunc, + OndemandProcPauseStatusCB logPauseStatusFunc, ondemand_recovery_pause_status_t pauseState, + bool onlyInRealtimeBuild) { - while (activatePauseFunc() && SS_ONDEMAND_REALTIME_BUILD_NORMAL) { + if (onlyInRealtimeBuild && !SS_ONDEMAND_REALTIME_BUILD_NORMAL) { + return; + } + + if (activatePauseFunc()) { + int sleepTime = 0; + int level = SS_ONDEMAND_REALTIME_BUILD_NORMAL ? 
LOG : WARNING; g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status = pauseState; - (void)RequestPrimaryCkptAndUpdateCkptRedoPtr(); + ereport(level, (errcode(ERRCODE_LOG), + errmsg("[On-demand] ondemand recovery meet pause status, type %d", pauseState))); + do { + // if the primary node has crashed, other redo workers process the pause state directly + if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) { + (void)RequestPrimaryCkptAndUpdateCkptRedoPtr(); + } - if ((inactivatePauseFunc != NULL) && !inactivatePauseFunc()) { - break; - } + if (refreshPauseStatusFunc != NULL) { + refreshPauseStatusFunc(); + } - if (refreshPauseStatusFunc != NULL) { - refreshPauseStatusFunc(); - } - - RedoInterruptCallBack(); - pg_usleep(100000L); /* 100 ms */ + if (sleepTime++ % ONDEMAND_LOG_PAUSE_STATUS_TIME == 0) { + logPauseStatusFunc(); + sleepTime = 1; + } + RedoInterruptCallBack(); + pg_usleep(100000L); /* 100 ms */ + } while (continuePauseFunc()); } g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status = NOT_PAUSE; } -void OndemandRequestPrimaryDoCkptIfNeed() +static void OndemandLogHashMapUsedStatus() { - if (!SS_ONDEMAND_REALTIME_BUILD_NORMAL) { - return; - } + ereport(LOG, (errcode(ERRCODE_LOG), + errmsg("[On-demand] hashmap usedblknum %u, totalblknum %u, pause value %u, continue value %u", + pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum), + g_dispatcher->parseManager.memctl.totalblknum, g_ondemandXLogParseMemFullValue, + g_ondemandXLogParseMemCancelPauseVaule))); +} +static void OndemandLogTrxnQueueUsedStatus() +{ + ereport(LOG, (errcode(ERRCODE_LOG), + errmsg("[On-demand] trxn queue usedblknum %u, totalblknum %u, pause value %u", + SPSCGetQueueCount(g_dispatcher->trxnQueue), (uint32)REALTIME_BUILD_RECORD_QUEUE_SIZE, + g_ondemandRealtimeBuildQueueFullValue))); +} + +static void OndemandLogSegQueueUsedStatus() +{ + ereport(LOG, (errcode(ERRCODE_LOG), + errmsg("[On-demand] seg queue usedblknum %u, totalblknum %u, pause value %u", + 
SPSCGetQueueCount(g_dispatcher->segQueue), (uint32)REALTIME_BUILD_RECORD_QUEUE_SIZE, + g_ondemandRealtimeBuildQueueFullValue))); +} + +static void OndemandLogSyncRecordStatus() +{ + XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&g_dispatcher->ckptRedoPtr); + XLogRecPtr syncRecordPtr = pg_atomic_read_u64(&g_dispatcher->syncRecordPtr); + ereport(LOG, (errcode(ERRCODE_LOG), + errmsg("[On-demand] primary checkpoint redo lsn %X/%X, sync record lsn %X/%X", + (uint32)(ckptRedoPtr >> 32), (uint32)ckptRedoPtr, (uint32)(syncRecordPtr >> 32), (uint32)syncRecordPtr))); +} + +void OndemandProcPauseStatus() +{ /* check whether parse mem is not enough */ - OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandXLogParseMemApproachFull, &OndemandXLogParseMemFull, - &OndemandUpdateXLogParseMemUsedBlkNum, PAUSE_FOR_PRUNE_HASHMAP); + OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandXLogParseMemFull, &OndemandXLogParseMemApproachFull, + &OndemandUpdateXLogParseMemUsedBlkNum, &OndemandLogHashMapUsedStatus, PAUSE_FOR_PRUNE_HASHMAP, false); /* check whether trxn record queue is full */ - OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandTrxnQueueFullInRealtimeBuild, NULL, NULL, - PAUSE_FOR_PRUNE_TRXN_QUEUE); + OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandTrxnQueueFullInRealtimeBuild, + &OndemandTrxnQueueFullInRealtimeBuild, NULL, &OndemandLogTrxnQueueUsedStatus, PAUSE_FOR_PRUNE_TRXN_QUEUE, true); /* check whether seg record queue is full */ - OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandSegQueueFullInRealtimeBuild, NULL, NULL, - PAUSE_FOR_PRUNE_SEG_QUEUE); + OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandSegQueueFullInRealtimeBuild, + &OndemandSegQueueFullInRealtimeBuild, NULL, &OndemandLogSegQueueUsedStatus, PAUSE_FOR_PRUNE_SEG_QUEUE, true); /* check whether redo workers need handle sync record */ - OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandNeedHandleSyncRecord, NULL, NULL, - PAUSE_FOR_SYNC_REDO); + OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandNeedHandleSyncRecord, 
&OndemandNeedHandleSyncRecord, NULL, + &OndemandLogSyncRecordStatus, PAUSE_FOR_SYNC_REDO, true); } bool SSXLogParseRecordNeedReplayInOndemandRealtimeBuild(XLogRecParseState *redoblockstate) diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp index a9964ecf0..9e2110114 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp @@ -92,23 +92,24 @@ void *OndemandXLogMemCtlInit(RedoMemManager *memctl, Size itemsize, int itemnum) memctl->memslot[i - 1].buf_id = i; /* start from 1 , 0 is invalidbuffer */ memctl->memslot[i - 1].freeNext = i - 1; } - memctl->firstfreeslot = memctl->totalblknum; - memctl->firstreleaseslot = InvalidBuffer; + // the global memctl only uses firstreleaseslot + memctl->firstfreeslot = InvalidBuffer; + memctl->firstreleaseslot = memctl->totalblknum; return (void *)t_thrd.storage_cxt.ondemandXLogMem; } static RedoMemSlot *OndemandGlobalXLogMemAlloc() { RedoMemManager *glbmemctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl; - Buffer firstfreebuffer = AtomicReadBuffer(&glbmemctl->firstfreeslot); - while (firstfreebuffer != InvalidBuffer) { - RedoMemSlot *firstfreeslot = &glbmemctl->memslot[firstfreebuffer - 1]; - Buffer nextfreebuffer = firstfreeslot->freeNext; - if (AtomicCompareExchangeBuffer(&glbmemctl->firstfreeslot, &firstfreebuffer, nextfreebuffer)) { - firstfreeslot->freeNext = InvalidBuffer; - return firstfreeslot; + Buffer firstreleasebuffer = AtomicReadBuffer(&glbmemctl->firstreleaseslot); + while (firstreleasebuffer != InvalidBuffer) { + RedoMemSlot *firstreleaseslot = &glbmemctl->memslot[firstreleasebuffer - 1]; + Buffer nextreleasebuffer = firstreleaseslot->freeNext; + if (AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &firstreleasebuffer, nextreleasebuffer)) { + firstreleaseslot->freeNext = 
InvalidBuffer; + return firstreleaseslot; } - firstfreebuffer = AtomicReadBuffer(&glbmemctl->firstfreeslot); + firstreleasebuffer = AtomicReadBuffer(&glbmemctl->firstreleaseslot); } return NULL; } @@ -116,10 +117,10 @@ static RedoMemSlot *OndemandGlobalXLogMemAlloc() static void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl) { RedoMemManager *glbmemctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl; - if (AtomicReadBuffer(&glbmemctl->firstfreeslot) == InvalidBuffer) { + if (AtomicReadBuffer(&glbmemctl->firstreleaseslot) == InvalidBuffer) { Buffer firstreleaseslot = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer); Buffer invalidbuffer = InvalidBuffer; - if (!AtomicCompareExchangeBuffer(&glbmemctl->firstfreeslot, &invalidbuffer, firstreleaseslot)) { + if (!AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &invalidbuffer, firstreleaseslot)) { AtomicWriteBuffer(&memctl->firstreleaseslot, firstreleaseslot); } } @@ -156,18 +157,22 @@ RedoMemSlot *OndemandXLogMemAlloc(RedoMemManager *memctl) void OndemandXLogMemRelease(RedoMemManager *memctl, Buffer bufferid) { RedoMemSlot *bufferslot; + RedoMemManager *releasememctl = memctl; if (!RedoMemIsValid(memctl, bufferid)) { ereport(PANIC, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), errmsg("XLogMemRelease failed!, totalblknum:%u, buf_id:%u", memctl->totalblknum, bufferid))); - /* panic */ } bufferslot = &(memctl->memslot[bufferid - 1]); Assert(bufferslot->freeNext == InvalidBuffer); - Buffer oldFirst = AtomicReadBuffer(&memctl->firstreleaseslot); + // release to the global firstreleaseslot directly if the hashmap is full + if (unlikely(SS_ONDEMAND_RECOVERY_HASHMAP_FULL)) { + releasememctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl; + } + Buffer oldFirst = AtomicReadBuffer(&releasememctl->firstreleaseslot); pg_memory_barrier(); do { AtomicWriteBuffer(&bufferslot->freeNext, oldFirst); - } while (!AtomicCompareExchangeBuffer(&memctl->firstreleaseslot, &oldFirst, bufferid)); + } 
while (!AtomicCompareExchangeBuffer(&releasememctl->firstreleaseslot, &oldFirst, bufferid)); pg_atomic_fetch_sub_u32(&memctl->usedblknum, 1); OndemandGlobalXLogMemReleaseIfNeed(memctl); @@ -577,4 +582,11 @@ XLogRecPtr GetRedoLocInCheckpointRecord(XLogReaderState *record) checkPoint = checkPointUndo.ori_checkpoint; } return checkPoint.redo; +} + +void OnDemandNotifyHashMapPruneIfNeed() +{ + if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) { + ondemand_extreme_rto::StartupSendMarkToBatchRedo(&ondemand_extreme_rto::g_hashmapPruneMark); + } } \ No newline at end of file diff --git a/src/include/access/ondemand_extreme_rto/dispatcher.h b/src/include/access/ondemand_extreme_rto/dispatcher.h index dae92770f..1ed16edee 100644 --- a/src/include/access/ondemand_extreme_rto/dispatcher.h +++ b/src/include/access/ondemand_extreme_rto/dispatcher.h @@ -265,6 +265,7 @@ List *CheckImcompleteAction(List *imcompleteActionList); void SetPageWorkStateByThreadId(uint32 threadState); void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr); void StartupSendFowarder(RedoItem *item); +void StartupSendMarkToBatchRedo(RedoItem *item); XLogRecPtr GetSafeMinCheckPoint(); RedoWaitInfo redo_get_io_event(int32 event_id); void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen); diff --git a/src/include/access/ondemand_extreme_rto/page_redo.h b/src/include/access/ondemand_extreme_rto/page_redo.h index 84b76850f..2046e280a 100644 --- a/src/include/access/ondemand_extreme_rto/page_redo.h +++ b/src/include/access/ondemand_extreme_rto/page_redo.h @@ -41,10 +41,11 @@ namespace ondemand_extreme_rto { -#define ONDEMAND_DISTRIBUTE_RATIO 0.95 +#define ONDEMAND_DISTRIBUTE_CANCEL_RATIO 0.5 #define ONDEMAND_FORCE_PRUNE_RATIO 0.99 #define ONDEMAND_HASHTAB_SWITCH_LIMIT 100000 #define SEG_PROC_PIPELINE_SLOT 0 +#define ONDEMAND_LOG_PAUSE_STATUS_TIME 30 #define ONDEMAND_HASHMAP_ENTRY_REDO_DONE 0 #define ONDEMAND_HASHMAP_ENTRY_REDOING 1 @@ -58,11 +59,11 @@ static const uint32 
MAX_REMOTE_READ_INFO_NUM = 100; static const uint32 ADVANCE_GLOBALLSN_INTERVAL = 1; /* unit second */ extern uint32 g_ondemandXLogParseMemFullValue; -extern uint32 g_ondemandXLogParseMemApproachFullVaule; +extern uint32 g_ondemandXLogParseMemCancelPauseVaule; extern uint32 g_ondemandRealtimeBuildQueueFullValue; typedef bool (*OndemandCheckPauseCB)(void); -typedef void (*OndemandRefreshPauseStatusCB)(void); +typedef void (*OndemandProcPauseStatusCB)(void); typedef enum { REDO_BATCH, @@ -276,7 +277,7 @@ int checkBlockRedoStateAndTryHashMapLock(BufferDesc* bufHdr, ForkNumber forkNum, bool checkBlockRedoDoneFromHashMapAndLock(LWLock **lock, RedoItemTag redoItemTag, RedoItemHashEntry **redoItemEntry, bool holdLock); void RedoWorkerQueueCallBack(); -void OndemandRequestPrimaryDoCkptIfNeed(); +void OndemandProcPauseStatus(); void GetOndemandRecoveryStatus(ondemand_recovery_stat *stat); void ReleaseBlockParseStateIfNotReplay(XLogRecParseState *preState, bool isChildState = false); bool SSXLogParseRecordNeedReplayInOndemandRealtimeBuild(XLogRecParseState *redoblockstate); diff --git a/src/include/access/ondemand_extreme_rto/redo_utils.h b/src/include/access/ondemand_extreme_rto/redo_utils.h index 182b69471..3dcbc83ee 100644 --- a/src/include/access/ondemand_extreme_rto/redo_utils.h +++ b/src/include/access/ondemand_extreme_rto/redo_utils.h @@ -51,6 +51,7 @@ void OnDemandWaitRealtimeBuildShutDown(); void OnDemandBackupControlFile(ControlFileData* controlFile); XLogRecPtr GetRedoLocInCheckpointRecord(XLogReaderState *record); void OnDemandUpdateRealtimeBuildPrunePtr(); +void OnDemandNotifyHashMapPruneIfNeed(); XLogRecParseType GetCurrentXLogRecParseType(XLogRecParseState *preState); bool IsRecParseStateHaveChildState(XLogRecParseState *checkState);