解决按需回放下,hashmap内存不足导致回放卡住的问题

This commit is contained in:
chendong76
2024-06-13 16:11:33 +08:00
committed by yaoxin
parent b5acfd6a00
commit 6bcb754afd
7 changed files with 158 additions and 71 deletions

View File

@ -125,10 +125,10 @@ bool SSRecoveryApplyDelay()
return true;
}
OnDemandNotifyHashMapPruneIfNeed();
while (g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag || SS_ONDEMAND_RECOVERY_PAUSE) {
/* might change the trigger file's location */
RedoInterruptCallBack();
pg_usleep(REFORM_WAIT_TIME);
}

View File

@ -444,7 +444,7 @@ void HandleStartupInterruptsForExtremeRto()
static void SetOndemandXLogParseFlagValue(uint32 maxParseBufNum)
{
g_ondemandXLogParseMemFullValue = maxParseBufNum * ONDEMAND_FORCE_PRUNE_RATIO;
g_ondemandXLogParseMemApproachFullVaule = maxParseBufNum * ONDEMAND_DISTRIBUTE_RATIO;
g_ondemandXLogParseMemCancelPauseVaule = maxParseBufNum * ONDEMAND_DISTRIBUTE_CANCEL_RATIO;
g_ondemandRealtimeBuildQueueFullValue = REALTIME_BUILD_RECORD_QUEUE_SIZE * ONDEMAND_FORCE_PRUNE_RATIO;
}

View File

@ -116,7 +116,7 @@ RedoItem g_forceDistributeMark;
RedoItem g_hashmapPruneMark;
uint32 g_ondemandXLogParseMemFullValue = 0;
uint32 g_ondemandXLogParseMemApproachFullVaule = 0;
uint32 g_ondemandXLogParseMemCancelPauseVaule = 0;
uint32 g_ondemandRealtimeBuildQueueFullValue = 0;
static const int PAGE_REDO_WORKER_ARG = 3;
@ -148,6 +148,11 @@ static void PageManagerPruneIfRealtimeBuildFailover();
static void RealtimeBuildReleaseRecoveryLatch(int code, Datum arg);
static void OnDemandPageManagerRedoSegParseState(XLogRecParseState *preState);
static void SegWorkerRedoAllSegBlockRecord();
static void OndemandPauseRedoAndRequestPrimaryDoCkpt(OndemandCheckPauseCB activatePauseFunc,
OndemandCheckPauseCB inactivatePauseFunc, OndemandProcPauseStatusCB refreshPauseStatusFunc,
OndemandProcPauseStatusCB logPauseStatusFunc, ondemand_recovery_pause_status_t pauseState,
bool onlyInRealtimeBuild);
static void BatchRedoProcIfXLogParseMemFull();
RefOperate recordRefOperate = {
AddRefRecord,
@ -174,7 +179,7 @@ static inline bool OndemandXLogParseMemFull()
static inline bool OndemandXLogParseMemApproachFull()
{
return (pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum) > g_ondemandXLogParseMemApproachFullVaule);
return (pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum) > g_ondemandXLogParseMemCancelPauseVaule);
}
static inline bool OndemandTrxnQueueFullInRealtimeBuild()
@ -611,16 +616,28 @@ void BatchRedoSendMarkToPageRedoManager(RedoItem *sendMark)
AddPageRedoItem(myRedoLine->managerThd, sendMark);
}
/*
* If hashmap is full, we send prune/distribute signal to pageRedoMng and release hashmap.
* We need call this func in these scenario:
* 1. batchRedo's queue is empty, so we send release signal from startup to all batchRedo pipelines
* 2. batchRedo's queue is not empty, so we call this func when we parse xlog record
*/
static void BatchRedoProcIfXLogParseMemFull()
{
while (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) {
if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) {
if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
BatchRedoSendMarkToPageRedoManager(&g_hashmapPruneMark);
} else {
BatchRedoSendMarkToPageRedoManager(&g_forceDistributeMark);
}
RedoInterruptCallBack();
pg_usleep(100000); // 100 ms
// wait until hashmap have enough free block-records or current pipeline do not use any block-records
do {
if (pg_atomic_read_u32(&g_redoWorker->parseManager.memctl.usedblknum) == 0) {
break;
}
RedoInterruptCallBack();
pg_usleep(100000L); // 100 ms
} while (SS_ONDEMAND_RECOVERY_HASHMAP_FULL);
}
}
@ -638,6 +655,8 @@ bool BatchRedoDistributeItems(void **eleArry, uint32 eleNum)
smgrcloseall();
} else if (eleArry[i] == (void *)&g_cleanInvalidPageMark) {
forget_range_invalid_pages((void *)eleArry[i]);
} else if (eleArry[i] == (void *)&g_hashmapPruneMark) {
BatchRedoProcIfXLogParseMemFull();
} else {
BatchRedoProcIfXLogParseMemFull();
GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]);
@ -728,7 +747,7 @@ void RedoPageManagerDistributeToAllOneBlock(XLogRecParseState *ddlParseState)
}
}
void ReleaseRecParseState(PageRedoPipeline *myRedoLine, HTAB *redoItemHash, RedoItemHashEntry *redoItemEntry, uint32 workId)
void ReleaseRecParseState(HTAB *redoItemHash, RedoItemHashEntry *redoItemEntry)
{
XLogRecParseState *cur_state = redoItemEntry->head;
XLogRecParseState *releaseHeadState = redoItemEntry->head;
@ -809,7 +828,9 @@ void RedoPageManagerDistributeToRedoThd(PageRedoPipeline *myRedoLine,
static void WaitSegRedoWorkersQueueEmpty()
{
while (!SPSCBlockingQueueIsEmpty(g_dispatcher->auxiliaryLine.segRedoThd->queue)) {
while (!SPSCBlockingQueueIsEmpty(g_dispatcher->auxiliaryLine.segRedoThd->queue) ||
!SPSCBlockingQueueIsEmpty(g_dispatcher->segQueue)) {
pg_usleep(100000L); /* 100 ms */
RedoInterruptCallBack();
}
}
@ -827,7 +848,7 @@ void RedoPageManagerDistributeBlockRecord(XLogRecParseState *parsestate)
while ((redoItemEntry = (RedoItemHashEntry *)hash_seq_search(&status)) != NULL) {
uint32 workId = GetWorkerId(&redoItemEntry->redoItemTag, WorkerNumPerMng);
ReleaseRecParseState(myRedoLine, curMap, redoItemEntry, workId);
ReleaseRecParseState(curMap, redoItemEntry);
RedoPageManagerDistributeToRedoThd(myRedoLine, curMap, redoItemEntry, workId);
}
@ -848,7 +869,7 @@ void WaitCurrentPipeLineRedoWorkersQueueEmpty()
}
}
static void ReleaseReplayedInParse(PageRedoPipeline* myRedoLine, uint32 workerNum)
static void ReleaseReplayedInParse()
{
HASH_SEQ_STATUS status;
RedoItemHashEntry *redoItemEntry = NULL;
@ -856,10 +877,7 @@ static void ReleaseReplayedInParse(PageRedoPipeline* myRedoLine, uint32 workerNu
hash_seq_init(&status, curMap);
while ((redoItemEntry = (RedoItemHashEntry *)hash_seq_search(&status)) != NULL) {
if (g_redoWorker->slotId == GetSlotId(redoItemEntry->redoItemTag.rNode, 0, 0, GetBatchCount())) {
uint32 workId = GetWorkerId(&redoItemEntry->redoItemTag, workerNum);
ReleaseRecParseState(myRedoLine, curMap, redoItemEntry, workId);
}
ReleaseRecParseState(curMap, redoItemEntry);
}
}
@ -871,11 +889,19 @@ static void WaitAndTryReleaseWorkerReplayedRec(PageRedoPipeline *myRedoLine, uin
for (uint32 i = 0; i < workerNum; i++) {
if (!RedoWorkerIsIdle(myRedoLine->redoThd[i])) {
queueIsEmpty = false;
ReleaseReplayedInParse(myRedoLine, workerNum);
pg_usleep(50000L);
pg_usleep(50000L); /* 50 ms */
break;
}
}
ReleaseReplayedInParse();
}
}
static void ReleaseRecParseStateUntilRedoNotPause()
{
while (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) {
ReleaseReplayedInParse();
pg_usleep(50000L); /* 50 ms */
}
}
@ -900,8 +926,7 @@ void PageManagerDispatchEndMarkAndWait()
WaitPageRedoWorkerReachLastMark(g_dispatcher->auxiliaryLine.segRedoThd);
}
WaitPageRedoWorkerReachLastMark(myRedoLine->htabThd);
ReleaseReplayedInParse(myRedoLine, WorkerNumPerMng);
ReleaseReplayedInParse();
}
void RedoPageManagerDdlAction(XLogRecParseState *parsestate)
@ -1354,7 +1379,7 @@ static void OnDemandPageManagerProcSegParseState(XLogRecParseState *preState, XL
static bool WaitPrimaryDoCheckpointAndAllPRTrackEmpty(XLogRecParseState *preState, HTAB *redoItemHash)
{
if (SS_ONDEMAND_REALTIME_BUILD_DISABLED) {
if (!SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
return false;
}
@ -1591,12 +1616,8 @@ bool PageManagerRedoDistributeItems(void **eleArry, uint32 eleNum)
Assert(!SS_ONDEMAND_REALTIME_BUILD_NORMAL);
// double check
if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) {
ereport(WARNING, (errcode(ERRCODE_LOG),
errmsg("[On-demand] Parse buffer num approach critical value, distribute block record by force,"
" slotid %d, usedblknum %d, totalblknum %d", g_redoWorker->slotId,
pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum),
g_dispatcher->parseManager.memctl.totalblknum)));
RedoPageManagerDistributeBlockRecord(NULL);
ReleaseRecParseStateUntilRedoNotPause();
}
continue;
}
@ -2531,6 +2552,13 @@ void StartupSendFowarder(RedoItem *item)
AddPageRedoItem(g_dispatcher->auxiliaryLine.ctrlThd, item);
}
void StartupSendMarkToBatchRedo(RedoItem *item)
{
for (uint32 i = 0; i < g_dispatcher->pageLineNum; ++i) {
AddPageRedoItem(g_dispatcher->pageLines[i].batchThd, item);
}
}
void SendLsnFowarder()
{
// update and read in the same thread, so no need atomic operation
@ -3401,12 +3429,12 @@ void OndemandCtrlWorkerMain()
OndemandUpdateXLogParseMemUsedBlkNum();
CountAndGetRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_1], g_redoWorker->timeCostList[TIME_COST_STEP_2]);
OndemandRequestPrimaryDoCkptIfNeed();
OndemandProcPauseStatus();
CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_2]);
RedoInterruptCallBack();
ADD_ABNORMAL_POSITION(13);
pg_usleep(500000L); /* 500 ms */
pg_usleep(50000L); /* 50 ms */
} while (true);
SPSCBlockingQueuePop(g_redoWorker->queue);
@ -3988,48 +4016,92 @@ static XLogRecPtr RequestPrimaryCkptAndUpdateCkptRedoPtr()
}
static void OndemandPauseRedoAndRequestPrimaryDoCkpt(OndemandCheckPauseCB activatePauseFunc,
OndemandCheckPauseCB inactivatePauseFunc, OndemandRefreshPauseStatusCB refreshPauseStatusFunc,
ondemand_recovery_pause_status_t pauseState)
OndemandCheckPauseCB continuePauseFunc, OndemandProcPauseStatusCB refreshPauseStatusFunc,
OndemandProcPauseStatusCB logPauseStatusFunc, ondemand_recovery_pause_status_t pauseState,
bool onlyInRealtimeBuild)
{
while (activatePauseFunc() && SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
if (onlyInRealtimeBuild && !SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
return;
}
if (activatePauseFunc()) {
int sleepTime = 0;
int level = SS_ONDEMAND_REALTIME_BUILD_NORMAL ? LOG : WARNING;
g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status = pauseState;
(void)RequestPrimaryCkptAndUpdateCkptRedoPtr();
ereport(level, (errcode(ERRCODE_LOG),
errmsg("[On-demand] ondemand recovery meet pause status, type %d", pauseState)));
do {
// other redo workers will proc pause state directly if primary node crash
if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
(void)RequestPrimaryCkptAndUpdateCkptRedoPtr();
}
if ((inactivatePauseFunc != NULL) && !inactivatePauseFunc()) {
break;
}
if (refreshPauseStatusFunc != NULL) {
refreshPauseStatusFunc();
}
if (refreshPauseStatusFunc != NULL) {
refreshPauseStatusFunc();
}
RedoInterruptCallBack();
pg_usleep(100000L); /* 100 ms */
if (sleepTime++ % ONDEMAND_LOG_PAUSE_STATUS_TIME == 0) {
logPauseStatusFunc();
sleepTime = 1;
}
RedoInterruptCallBack();
pg_usleep(100000L); /* 100 ms */
} while (continuePauseFunc());
}
g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status = NOT_PAUSE;
}
void OndemandRequestPrimaryDoCkptIfNeed()
static void OndemandLogHashMapUsedStatus()
{
if (!SS_ONDEMAND_REALTIME_BUILD_NORMAL) {
return;
}
ereport(LOG, (errcode(ERRCODE_LOG),
errmsg("[On-demand] hashmap usedblknum %lu, totalblknum %lu, pause value %lu, continue value %lu",
pg_atomic_read_u32(&g_dispatcher->parseManager.memctl.usedblknum),
g_dispatcher->parseManager.memctl.totalblknum, g_ondemandXLogParseMemFullValue,
g_ondemandXLogParseMemCancelPauseVaule)));
}
static void OndemandLogTrxnQueueUsedStatus()
{
ereport(LOG, (errcode(ERRCODE_LOG),
errmsg("[On-demand] trxn queue usedblknum %lu, totalblknum %lu, pause value %lu",
SPSCGetQueueCount(g_dispatcher->trxnQueue), REALTIME_BUILD_RECORD_QUEUE_SIZE,
g_ondemandRealtimeBuildQueueFullValue)));
}
static void OndemandLogSegQueueUsedStatus()
{
ereport(LOG, (errcode(ERRCODE_LOG),
errmsg("[On-demand] seg queue usedblknum %lu, totalblknum %lu, pause value %lu",
SPSCGetQueueCount(g_dispatcher->segQueue), REALTIME_BUILD_RECORD_QUEUE_SIZE,
g_ondemandRealtimeBuildQueueFullValue)));
}
static void OndemandLogSyncRecordStatus()
{
XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&g_dispatcher->ckptRedoPtr);
XLogRecPtr syncRecordPtr = pg_atomic_read_u64(&g_dispatcher->syncRecordPtr);
ereport(LOG, (errcode(ERRCODE_LOG),
errmsg("[On-demand] primary checkpoint redo lsn %X/%X, sync record lsn %X/%X",
(uint32)(ckptRedoPtr >> 32), (uint32)ckptRedoPtr, (uint32)(syncRecordPtr >> 32), (uint32)syncRecordPtr)));
}
void OndemandProcPauseStatus()
{
/* check whether parse mem is not enough */
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandXLogParseMemApproachFull, &OndemandXLogParseMemFull,
&OndemandUpdateXLogParseMemUsedBlkNum, PAUSE_FOR_PRUNE_HASHMAP);
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandXLogParseMemFull, &OndemandXLogParseMemApproachFull,
&OndemandUpdateXLogParseMemUsedBlkNum, &OndemandLogHashMapUsedStatus, PAUSE_FOR_PRUNE_HASHMAP, false);
/* check whether trxn record queue is full */
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandTrxnQueueFullInRealtimeBuild, NULL, NULL,
PAUSE_FOR_PRUNE_TRXN_QUEUE);
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandTrxnQueueFullInRealtimeBuild,
&OndemandTrxnQueueFullInRealtimeBuild, NULL, &OndemandLogTrxnQueueUsedStatus, PAUSE_FOR_PRUNE_TRXN_QUEUE, true);
/* check whether seg record queue is full */
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandSegQueueFullInRealtimeBuild, NULL, NULL,
PAUSE_FOR_PRUNE_SEG_QUEUE);
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandSegQueueFullInRealtimeBuild,
&OndemandTrxnQueueFullInRealtimeBuild, NULL, &OndemandLogSegQueueUsedStatus, PAUSE_FOR_PRUNE_SEG_QUEUE, true);
/* check whether redo workers need handle sync record */
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandNeedHandleSyncRecord, NULL, NULL,
PAUSE_FOR_SYNC_REDO);
OndemandPauseRedoAndRequestPrimaryDoCkpt(&OndemandNeedHandleSyncRecord, &OndemandNeedHandleSyncRecord, NULL,
&OndemandLogSyncRecordStatus, PAUSE_FOR_SYNC_REDO, true);
}
bool SSXLogParseRecordNeedReplayInOndemandRealtimeBuild(XLogRecParseState *redoblockstate)

View File

@ -92,23 +92,24 @@ void *OndemandXLogMemCtlInit(RedoMemManager *memctl, Size itemsize, int itemnum)
memctl->memslot[i - 1].buf_id = i; /* start from 1 , 0 is invalidbuffer */
memctl->memslot[i - 1].freeNext = i - 1;
}
memctl->firstfreeslot = memctl->totalblknum;
memctl->firstreleaseslot = InvalidBuffer;
// only used firstreleaseslot of globalmemctl
memctl->firstfreeslot = InvalidBuffer;
memctl->firstreleaseslot = memctl->totalblknum;
return (void *)t_thrd.storage_cxt.ondemandXLogMem;
}
static RedoMemSlot *OndemandGlobalXLogMemAlloc()
{
RedoMemManager *glbmemctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl;
Buffer firstfreebuffer = AtomicReadBuffer(&glbmemctl->firstfreeslot);
while (firstfreebuffer != InvalidBuffer) {
RedoMemSlot *firstfreeslot = &glbmemctl->memslot[firstfreebuffer - 1];
Buffer nextfreebuffer = firstfreeslot->freeNext;
if (AtomicCompareExchangeBuffer(&glbmemctl->firstfreeslot, &firstfreebuffer, nextfreebuffer)) {
firstfreeslot->freeNext = InvalidBuffer;
return firstfreeslot;
Buffer firstreleasebuffer = AtomicReadBuffer(&glbmemctl->firstreleaseslot);
while (firstreleasebuffer != InvalidBuffer) {
RedoMemSlot *firstreleaseslot = &glbmemctl->memslot[firstreleasebuffer - 1];
Buffer nextreleasebuffer = firstreleaseslot->freeNext;
if (AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &firstreleasebuffer, nextreleasebuffer)) {
firstreleaseslot->freeNext = InvalidBuffer;
return firstreleaseslot;
}
firstfreebuffer = AtomicReadBuffer(&glbmemctl->firstfreeslot);
firstreleasebuffer = AtomicReadBuffer(&glbmemctl->firstreleaseslot);
}
return NULL;
}
@ -116,10 +117,10 @@ static RedoMemSlot *OndemandGlobalXLogMemAlloc()
static void OndemandGlobalXLogMemReleaseIfNeed(RedoMemManager *memctl)
{
RedoMemManager *glbmemctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl;
if (AtomicReadBuffer(&glbmemctl->firstfreeslot) == InvalidBuffer) {
if (AtomicReadBuffer(&glbmemctl->firstreleaseslot) == InvalidBuffer) {
Buffer firstreleaseslot = AtomicExchangeBuffer(&memctl->firstreleaseslot, InvalidBuffer);
Buffer invalidbuffer = InvalidBuffer;
if (!AtomicCompareExchangeBuffer(&glbmemctl->firstfreeslot, &invalidbuffer, firstreleaseslot)) {
if (!AtomicCompareExchangeBuffer(&glbmemctl->firstreleaseslot, &invalidbuffer, firstreleaseslot)) {
AtomicWriteBuffer(&memctl->firstreleaseslot, firstreleaseslot);
}
}
@ -156,18 +157,22 @@ RedoMemSlot *OndemandXLogMemAlloc(RedoMemManager *memctl)
void OndemandXLogMemRelease(RedoMemManager *memctl, Buffer bufferid)
{
RedoMemSlot *bufferslot;
RedoMemManager *releasememctl = memctl;
if (!RedoMemIsValid(memctl, bufferid)) {
ereport(PANIC, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
errmsg("XLogMemRelease failed!, totalblknum:%u, buf_id:%u", memctl->totalblknum, bufferid)));
/* panic */
}
bufferslot = &(memctl->memslot[bufferid - 1]);
Assert(bufferslot->freeNext == InvalidBuffer);
Buffer oldFirst = AtomicReadBuffer(&memctl->firstreleaseslot);
// release to global firstreleaseslot directly if hashmap full
if (unlikely(SS_ONDEMAND_RECOVERY_HASHMAP_FULL)) {
releasememctl = &ondemand_extreme_rto::g_dispatcher->parseManager.memctl;
}
Buffer oldFirst = AtomicReadBuffer(&releasememctl->firstreleaseslot);
pg_memory_barrier();
do {
AtomicWriteBuffer(&bufferslot->freeNext, oldFirst);
} while (!AtomicCompareExchangeBuffer(&memctl->firstreleaseslot, &oldFirst, bufferid));
} while (!AtomicCompareExchangeBuffer(&releasememctl->firstreleaseslot, &oldFirst, bufferid));
pg_atomic_fetch_sub_u32(&memctl->usedblknum, 1);
OndemandGlobalXLogMemReleaseIfNeed(memctl);
@ -577,4 +582,11 @@ XLogRecPtr GetRedoLocInCheckpointRecord(XLogReaderState *record)
checkPoint = checkPointUndo.ori_checkpoint;
}
return checkPoint.redo;
}
void OnDemandNotifyHashMapPruneIfNeed()
{
if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) {
ondemand_extreme_rto::StartupSendMarkToBatchRedo(&ondemand_extreme_rto::g_hashmapPruneMark);
}
}

View File

@ -265,6 +265,7 @@ List *CheckImcompleteAction(List *imcompleteActionList);
void SetPageWorkStateByThreadId(uint32 threadState);
void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr);
void StartupSendFowarder(RedoItem *item);
void StartupSendMarkToBatchRedo(RedoItem *item);
XLogRecPtr GetSafeMinCheckPoint();
RedoWaitInfo redo_get_io_event(int32 event_id);
void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);

View File

@ -41,10 +41,11 @@
namespace ondemand_extreme_rto {
#define ONDEMAND_DISTRIBUTE_RATIO 0.95
#define ONDEMAND_DISTRIBUTE_CANCEL_RATIO 0.5
#define ONDEMAND_FORCE_PRUNE_RATIO 0.99
#define ONDEMAND_HASHTAB_SWITCH_LIMIT 100000
#define SEG_PROC_PIPELINE_SLOT 0
#define ONDEMAND_LOG_PAUSE_STATUS_TIME 30
#define ONDEMAND_HASHMAP_ENTRY_REDO_DONE 0
#define ONDEMAND_HASHMAP_ENTRY_REDOING 1
@ -58,11 +59,11 @@ static const uint32 MAX_REMOTE_READ_INFO_NUM = 100;
static const uint32 ADVANCE_GLOBALLSN_INTERVAL = 1; /* unit second */
extern uint32 g_ondemandXLogParseMemFullValue;
extern uint32 g_ondemandXLogParseMemApproachFullVaule;
extern uint32 g_ondemandXLogParseMemCancelPauseVaule;
extern uint32 g_ondemandRealtimeBuildQueueFullValue;
typedef bool (*OndemandCheckPauseCB)(void);
typedef void (*OndemandRefreshPauseStatusCB)(void);
typedef void (*OndemandProcPauseStatusCB)(void);
typedef enum {
REDO_BATCH,
@ -276,7 +277,7 @@ int checkBlockRedoStateAndTryHashMapLock(BufferDesc* bufHdr, ForkNumber forkNum,
bool checkBlockRedoDoneFromHashMapAndLock(LWLock **lock, RedoItemTag redoItemTag, RedoItemHashEntry **redoItemEntry,
bool holdLock);
void RedoWorkerQueueCallBack();
void OndemandRequestPrimaryDoCkptIfNeed();
void OndemandProcPauseStatus();
void GetOndemandRecoveryStatus(ondemand_recovery_stat *stat);
void ReleaseBlockParseStateIfNotReplay(XLogRecParseState *preState, bool isChildState = false);
bool SSXLogParseRecordNeedReplayInOndemandRealtimeBuild(XLogRecParseState *redoblockstate);

View File

@ -51,6 +51,7 @@ void OnDemandWaitRealtimeBuildShutDown();
void OnDemandBackupControlFile(ControlFileData* controlFile);
XLogRecPtr GetRedoLocInCheckpointRecord(XLogReaderState *record);
void OnDemandUpdateRealtimeBuildPrunePtr();
void OnDemandNotifyHashMapPruneIfNeed();
XLogRecParseType GetCurrentXLogRecParseType(XLogRecParseState *preState);
bool IsRecParseStateHaveChildState(XLogRecParseState *checkState);