diff --git a/src/gausskernel/storage/access/transam/extreme_rto/batch_redo.cpp b/src/gausskernel/storage/access/transam/extreme_rto/batch_redo.cpp index 1f41cdc6b..c81eb9f5e 100644 --- a/src/gausskernel/storage/access/transam/extreme_rto/batch_redo.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/batch_redo.cpp @@ -144,7 +144,7 @@ void PRTrackRemoveEntry(HTAB *hashMap, RedoItemHashEntry *entry) DoRecordCheck(prev, InvalidXLogRecPtr, false); } - ereport(DEBUG1, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)", + ereport(LOG, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)", (uint32)(prev->blockparse.blockhead.end_ptr >> 32), (uint32)(prev->blockparse.blockhead.end_ptr), prev->blockparse.blockhead.spcNode, prev->blockparse.blockhead.dbNode, prev->blockparse.blockhead.relNode, diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/batch_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/batch_redo.cpp index 99e02ae6b..6bc394c8e 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/batch_redo.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/batch_redo.cpp @@ -278,7 +278,7 @@ void PRTrackRemoveEntry(HTAB *hashMap, RedoItemHashEntry *entry) DoRecordCheck(prev, InvalidXLogRecPtr, false); } - ereport(LOG, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)", + ereport(DEBUG1, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)", (uint32)(prev->blockparse.blockhead.end_ptr >> 32), (uint32)(prev->blockparse.blockhead.end_ptr), prev->blockparse.blockhead.spcNode, prev->blockparse.blockhead.dbNode, prev->blockparse.blockhead.relNode, diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp index c67001534..88c296a61 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp @@ -147,6 +147,7 @@ static bool tryLockHashMap(LWLock *lock, HTAB *hashMap, RedoItemTag redoItemTag, static void PageManagerPruneIfRealtimeBuildFailover(); static void RealtimeBuildReleaseRecoveryLatch(int code, Datum arg); static void OnDemandPageManagerRedoSegParseState(XLogRecParseState *preState); +static void SegWorkerRedoAllSegBlockRecord(); RefOperate recordRefOperate = { AddRefRecord, @@ -927,7 +928,7 @@ void RedoPageManagerDdlAction(XLogRecParseState *parsestate) case BLOCK_DATA_SEG_FULL_SYNC_TYPE: case BLOCK_DATA_SEG_EXTEND: { - /* PARSE_TYPE_SEG will handle by seg worker, function SSProcSegPageCommonRedo */ + /* PARSE_TYPE_SEG will handle by seg worker, function SSProcSegParseState */ Assert(GetCurrentXLogRecParseType(parsestate) == PARSE_TYPE_DDL); } ProcSegPageCommonRedo(parsestate); @@ -1249,49 +1250,20 @@ static void SSReleaseRefRecordWithoutReplay(XLogRecParseState *redoblockstate) redoblockstate->refrecord = NULL; } -static void SSProcSegPageCommonRedo(XLogRecParseState *parseState) -{ - uint8 info = XLogBlockHeadGetInfo(&parseState->blockparse.blockhead) & ~XLR_INFO_MASK; - switch (info) { - // has child list - case XLOG_SEG_ATOMIC_OPERATION: - case XLOG_SEG_SEGMENT_EXTEND: - case XLOG_SEG_INIT_MAPPAGE: - case XLOG_SEG_INIT_INVRSPTR_PAGE: - case XLOG_SEG_ADD_NEW_GROUP: - { - XLogRecParseState *child = - (XLogRecParseState *)parseState->blockparse.extra_rec.blocksegfullsyncrec.childState; - AddSegHashMap(child); - break; - } - case XLOG_SEG_CREATE_EXTENT_GROUP: - case XLOG_SEG_SPACE_SHRINK: - case XLOG_SEG_NEW_PAGE: - case XLOG_SEG_SPACE_DROP: - Assert(!SS_ONDEMAND_REALTIME_BUILD_NORMAL); - ProcSegPageCommonRedo(parseState); - break; - default: - ereport(PANIC, (errmsg("SSProcSegPageCommonRedo: unknown op code %u", info))); - break; - } -} - -void OnDemandSegWorkerProcSegFullSyncState(XLogRecParseState *parsestate) +void SSProcSegFullSyncState(XLogRecParseState *parsestate) { MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx); - SSProcSegPageCommonRedo(parsestate); + ProcSegPageCommonRedo(parsestate); (void)MemoryContextSwitchTo(oldCtx); parsestate->nextrecord = NULL; XLogBlockParseStateRelease(parsestate); } -void OnDemandSegWorkerProcSegPipeLineSyncState(XLogRecParseState *parseState) +void SSProcSegPipeLineSyncState(XLogRecParseState *parseState) { MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx); - SSProcSegPageCommonRedo(parseState); + ProcSegPageCommonRedo(parseState); (void)MemoryContextSwitchTo(oldCtx); XLogBlockParseStateRelease(parseState); @@ -1312,14 +1284,36 @@ static void WaitNextBarrier(XLogRecParseState *parseState) } } -static void OnDemandSegWorkerRedoSegParseState(XLogRecParseState *preState) +static void SegWorkerProcSegRecordQueuePrune() +{ + // do not distribute g_hashmapPruneMark, because hashMapMng already prune MAX for seg record + XLogRecPtr ckptPtr = pg_atomic_read_u64(&g_dispatcher->ckptRedoPtr); + XLogRecPtr prunePtr = InvalidXLogRecPtr; + do { + prunePtr = pg_atomic_read_u64(&g_dispatcher->pageLines[SEG_PROC_PIPELINE_SLOT].htabThd->nextPrunePtr); + RedoInterruptCallBack(); + pg_usleep(100000L); /* 100 ms */ + } while (XLByteLT(prunePtr, ckptPtr)); + g_redoWorker->nextPrunePtr = ckptPtr; +} + +static void SegWorkerRedoIfRealtimeBuildFailover() +{ + if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && g_redoWorker->inRealtimeBuild) { + SegWorkerProcSegRecordQueuePrune(); + SegWorkerRedoAllSegBlockRecord(); + g_redoWorker->inRealtimeBuild = false; + } +} + +static void SSProcSegParseState(XLogRecParseState *preState) { switch (preState->blockparse.blockhead.block_valid) { case BLOCK_DATA_SEG_EXTEND: - OnDemandSegWorkerProcSegPipeLineSyncState(preState); + SSProcSegPipeLineSyncState(preState); break; case BLOCK_DATA_SEG_FULL_SYNC_TYPE: - OnDemandSegWorkerProcSegFullSyncState(preState); + SSProcSegFullSyncState(preState); break; case BLOCK_DATA_SEG_FILE_EXTEND_TYPE: default: @@ -1330,6 +1324,16 @@ static void OnDemandSegWorkerRedoSegParseState(XLogRecParseState *preState) } } +static void OnDemandSegWorkerRedoSegParseState(XLogRecParseState *preState) +{ + SegWorkerRedoIfRealtimeBuildFailover(); + if (g_redoWorker->inRealtimeBuild) { + AddSegHashMap(preState); + } else { + SSProcSegParseState(preState); + } +} + static void OnDemandDispatchSegParseStateToSegWorker(XLogRecParseState *preState) { Assert(g_redoWorker->slotId == SEG_PROC_PIPELINE_SLOT); @@ -1979,7 +1983,7 @@ static void SegWorkerRedoAllSegBlockRecord() { while (!SPSCBlockingQueueIsEmpty(g_dispatcher->segQueue)) { XLogRecParseState *segRecord = (XLogRecParseState *)SPSCBlockingQueueTop(g_dispatcher->segQueue); - SegPageRedoChildState(segRecord); + SSProcSegParseState(segRecord); SPSCBlockingQueuePop(g_dispatcher->segQueue); } } @@ -3329,15 +3333,7 @@ void HashMapManagerMain() if (XLByteLT(ckptRedoPtr, segRecord->blockparse.blockhead.end_ptr)) { break; } -#ifdef USE_ASSERT_CHECKING - XLogRecParseState *procState = segRecord; - while (procState != NULL) { - XLogRecParseState *redoblockstate = procState; - procState = (XLogRecParseState *)procState->nextrecord; - DoRecordCheck(redoblockstate, InvalidXLogRecPtr, false); - } -#endif - XLogBlockParseStateRelease(segRecord); + ReleaseBlockParseStateIfNotReplay(segRecord); SPSCBlockingQueuePop(g_dispatcher->segQueue); } diff --git a/src/include/access/xlogproc.h b/src/include/access/xlogproc.h index 88ca8e945..0e3766bfd 100755 --- a/src/include/access/xlogproc.h +++ b/src/include/access/xlogproc.h @@ -1168,7 +1168,6 @@ XLogRecParseState* xlog_redo_parse_to_block(XLogReaderState* record, uint32* blo XLogRecParseState* smgr_redo_parse_to_block(XLogReaderState* record, uint32* blocknum); XLogRecParseState* segpage_redo_parse_to_block(XLogReaderState* record, uint32* blocknum); void ProcSegPageCommonRedo(XLogRecParseState *parseState); -void SegPageRedoChildState(XLogRecParseState *childStateList); void ProcSegPageJustFreeChildState(XLogRecParseState *parseState); XLogRecParseState* XactXlogClogParseToBlock(XLogReaderState* record, XLogRecParseState* recordstatehead, uint32* blocknum, TransactionId xid, int nsubxids, TransactionId* subxids, CLogXidStatus status);