【bugfix】修复按需回放build阶段有概率卡在StartupSendMarkToBatchRedo的问题
This commit is contained in:
@ -2172,7 +2172,7 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status)
|
||||
}
|
||||
|
||||
if (SS_IN_REFORM) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS),
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS),
|
||||
errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Reform happend when primary redo page for standby,"
|
||||
"return ONDEMAND_REDO_FAIL.",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode,
|
||||
|
||||
@ -2612,10 +2612,16 @@ void StartupSendFowarder(RedoItem *item)
|
||||
AddPageRedoItem(g_dispatcher->auxiliaryLine.ctrlThd, item);
|
||||
}
|
||||
|
||||
void StartupSendMarkToBatchRedo(RedoItem *item)
|
||||
void StartupSendHashmapPruneMarkToBatchRedo()
|
||||
{
|
||||
for (uint32 i = 0; i < g_dispatcher->pageLineNum; ++i) {
|
||||
AddPageRedoItem(g_dispatcher->pageLines[i].batchThd, item);
|
||||
if (SPSCBlockingQueueIsFull(g_dispatcher->pageLines[i].batchThd->queue)) {
|
||||
ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
|
||||
errmsg("[On-demand]StartupSendHashmapPruneMarkToBatchRedo, "
|
||||
"pageline %d is full, don't send mark.", i)));
|
||||
continue;
|
||||
}
|
||||
AddPageRedoItem(g_dispatcher->pageLines[i].batchThd, &ondemand_extreme_rto::g_hashmapPruneMark);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -729,7 +729,7 @@ XLogRecPtr GetRedoLocInCheckpointRecord(XLogReaderState *record)
|
||||
void OnDemandNotifyHashMapPruneIfNeed()
|
||||
{
|
||||
if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) {
|
||||
ondemand_extreme_rto::StartupSendMarkToBatchRedo(&ondemand_extreme_rto::g_hashmapPruneMark);
|
||||
ondemand_extreme_rto::StartupSendHashmapPruneMarkToBatchRedo();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -229,6 +229,13 @@ bool SPSCBlockingQueueIsEmpty(SPSCBlockingQueue *queue)
|
||||
return (COUNT(head, tail, queue->mask) == 0);
|
||||
}
|
||||
|
||||
bool SPSCBlockingQueueIsFull(SPSCBlockingQueue *queue)
|
||||
{
|
||||
uint32 head = pg_atomic_read_u32(&queue->writeHead);
|
||||
uint32 tail = pg_atomic_read_u32(&queue->readTail);
|
||||
return (SPACE(head, tail, queue->mask) == 0);
|
||||
}
|
||||
|
||||
void *SPSCBlockingQueueTop(SPSCBlockingQueue *queue)
|
||||
{
|
||||
uint32 head;
|
||||
|
||||
@ -267,7 +267,7 @@ List *CheckImcompleteAction(List *imcompleteActionList);
|
||||
void SetPageWorkStateByThreadId(uint32 threadState);
|
||||
void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr);
|
||||
void StartupSendFowarder(RedoItem *item);
|
||||
void StartupSendMarkToBatchRedo(RedoItem *item);
|
||||
void StartupSendHashmapPruneMarkToBatchRedo();
|
||||
XLogRecPtr GetSafeMinCheckPoint();
|
||||
RedoWaitInfo redo_get_io_event(int32 event_id);
|
||||
void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
|
||||
|
||||
@ -52,6 +52,7 @@ void SPSCBlockingQueueDestroy(SPSCBlockingQueue *queue);
|
||||
bool SPSCBlockingQueuePut(SPSCBlockingQueue *queue, void *element);
|
||||
void *SPSCBlockingQueueTake(SPSCBlockingQueue *queue);
|
||||
bool SPSCBlockingQueueIsEmpty(SPSCBlockingQueue *queue);
|
||||
bool SPSCBlockingQueueIsFull(SPSCBlockingQueue *queue);
|
||||
void *SPSCBlockingQueueTop(SPSCBlockingQueue *queue);
|
||||
void SPSCBlockingQueuePop(SPSCBlockingQueue *queue);
|
||||
void DumpQueue(const SPSCBlockingQueue *queue);
|
||||
|
||||
Reference in New Issue
Block a user