diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index a8572c79c..f88d6f97f 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -2172,7 +2172,7 @@ int CBOndemandRedoPageForStandby(void *block_key, int32 *redo_status) } if (SS_IN_REFORM) { - ereport(WARNING, (errmodule(MOD_DMS), + ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[SS][On-demand][%u/%u/%u/%d %d-%u] Reform happend when primary redo page for standby," "return ONDEMAND_REDO_FAIL.", tag->rnode.spcNode, tag->rnode.dbNode, diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp index c73a4c206..e4b16d88f 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp @@ -2612,10 +2612,16 @@ void StartupSendFowarder(RedoItem *item) AddPageRedoItem(g_dispatcher->auxiliaryLine.ctrlThd, item); } -void StartupSendMarkToBatchRedo(RedoItem *item) +void StartupSendHashmapPruneMarkToBatchRedo() { for (uint32 i = 0; i < g_dispatcher->pageLineNum; ++i) { - AddPageRedoItem(g_dispatcher->pageLines[i].batchThd, item); + if (SPSCBlockingQueueIsFull(g_dispatcher->pageLines[i].batchThd->queue)) { + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("[On-demand]StartupSendHashmapPruneMarkToBatchRedo, " + "pageline %d is full, don't send mark.", i))); + continue; + } + AddPageRedoItem(g_dispatcher->pageLines[i].batchThd, &ondemand_extreme_rto::g_hashmapPruneMark); } } diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp index e71ca3095..cce0e24a4 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp @@ -729,7 +729,7 @@ XLogRecPtr GetRedoLocInCheckpointRecord(XLogReaderState *record) void OnDemandNotifyHashMapPruneIfNeed() { if (SS_ONDEMAND_RECOVERY_HASHMAP_FULL) { - ondemand_extreme_rto::StartupSendMarkToBatchRedo(&ondemand_extreme_rto::g_hashmapPruneMark); + ondemand_extreme_rto::StartupSendHashmapPruneMarkToBatchRedo(); } } diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/spsc_blocking_queue.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/spsc_blocking_queue.cpp index 8b9a340af..d78c75a11 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/spsc_blocking_queue.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/spsc_blocking_queue.cpp @@ -229,6 +229,13 @@ bool SPSCBlockingQueueIsEmpty(SPSCBlockingQueue *queue) return (COUNT(head, tail, queue->mask) == 0); } +bool SPSCBlockingQueueIsFull(SPSCBlockingQueue *queue) +{ + uint32 head = pg_atomic_read_u32(&queue->writeHead); + uint32 tail = pg_atomic_read_u32(&queue->readTail); + return (SPACE(head, tail, queue->mask) == 0); +} + void *SPSCBlockingQueueTop(SPSCBlockingQueue *queue) { uint32 head; diff --git a/src/include/access/ondemand_extreme_rto/dispatcher.h b/src/include/access/ondemand_extreme_rto/dispatcher.h index 938802ffd..c2e69d89b 100644 --- a/src/include/access/ondemand_extreme_rto/dispatcher.h +++ b/src/include/access/ondemand_extreme_rto/dispatcher.h @@ -267,7 +267,7 @@ List *CheckImcompleteAction(List *imcompleteActionList); void SetPageWorkStateByThreadId(uint32 threadState); void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr); void StartupSendFowarder(RedoItem *item); -void StartupSendMarkToBatchRedo(RedoItem *item); +void StartupSendHashmapPruneMarkToBatchRedo(); XLogRecPtr GetSafeMinCheckPoint(); RedoWaitInfo redo_get_io_event(int32 event_id); void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen); diff --git a/src/include/access/ondemand_extreme_rto/spsc_blocking_queue.h b/src/include/access/ondemand_extreme_rto/spsc_blocking_queue.h index e0bb26872..f5efa9d6f 100644 --- a/src/include/access/ondemand_extreme_rto/spsc_blocking_queue.h +++ b/src/include/access/ondemand_extreme_rto/spsc_blocking_queue.h @@ -52,6 +52,7 @@ void SPSCBlockingQueueDestroy(SPSCBlockingQueue *queue); bool SPSCBlockingQueuePut(SPSCBlockingQueue *queue, void *element); void *SPSCBlockingQueueTake(SPSCBlockingQueue *queue); bool SPSCBlockingQueueIsEmpty(SPSCBlockingQueue *queue); +bool SPSCBlockingQueueIsFull(SPSCBlockingQueue *queue); void *SPSCBlockingQueueTop(SPSCBlockingQueue *queue); void SPSCBlockingQueuePop(SPSCBlockingQueue *queue); void DumpQueue(const SPSCBlockingQueue *queue);