解决按需回放实时构建下,段页式日志回放顺序可能不一致问题

This commit is contained in:
chendong76
2024-05-15 15:21:02 +08:00
committed by yaoxin
parent e6af95bd73
commit 3a1bd80a5b
4 changed files with 45 additions and 50 deletions

View File

@ -144,7 +144,7 @@ void PRTrackRemoveEntry(HTAB *hashMap, RedoItemHashEntry *entry)
DoRecordCheck(prev, InvalidXLogRecPtr, false);
}
ereport(DEBUG1, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)",
ereport(LOG, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)",
(uint32)(prev->blockparse.blockhead.end_ptr >> 32),
(uint32)(prev->blockparse.blockhead.end_ptr), prev->blockparse.blockhead.spcNode,
prev->blockparse.blockhead.dbNode, prev->blockparse.blockhead.relNode,

View File

@ -278,7 +278,7 @@ void PRTrackRemoveEntry(HTAB *hashMap, RedoItemHashEntry *entry)
DoRecordCheck(prev, InvalidXLogRecPtr, false);
}
ereport(LOG, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)",
ereport(DEBUG1, (errmsg("PRTrackRemoveEntry:record(%X/%X) relation %u/%u/%u forknum %u blocknum %u dropped(%p)",
(uint32)(prev->blockparse.blockhead.end_ptr >> 32),
(uint32)(prev->blockparse.blockhead.end_ptr), prev->blockparse.blockhead.spcNode,
prev->blockparse.blockhead.dbNode, prev->blockparse.blockhead.relNode,

View File

@ -147,6 +147,7 @@ static bool tryLockHashMap(LWLock *lock, HTAB *hashMap, RedoItemTag redoItemTag,
static void PageManagerPruneIfRealtimeBuildFailover();
static void RealtimeBuildReleaseRecoveryLatch(int code, Datum arg);
static void OnDemandPageManagerRedoSegParseState(XLogRecParseState *preState);
static void SegWorkerRedoAllSegBlockRecord();
RefOperate recordRefOperate = {
AddRefRecord,
@ -927,7 +928,7 @@ void RedoPageManagerDdlAction(XLogRecParseState *parsestate)
case BLOCK_DATA_SEG_FULL_SYNC_TYPE:
case BLOCK_DATA_SEG_EXTEND:
{
/* PARSE_TYPE_SEG will handle by seg worker, function SSProcSegPageCommonRedo */
/* PARSE_TYPE_SEG will handle by seg worker, function SSProcSegParseState */
Assert(GetCurrentXLogRecParseType(parsestate) == PARSE_TYPE_DDL);
}
ProcSegPageCommonRedo(parsestate);
@ -1249,49 +1250,20 @@ static void SSReleaseRefRecordWithoutReplay(XLogRecParseState *redoblockstate)
redoblockstate->refrecord = NULL;
}
static void SSProcSegPageCommonRedo(XLogRecParseState *parseState)
{
uint8 info = XLogBlockHeadGetInfo(&parseState->blockparse.blockhead) & ~XLR_INFO_MASK;
switch (info) {
// has child list
case XLOG_SEG_ATOMIC_OPERATION:
case XLOG_SEG_SEGMENT_EXTEND:
case XLOG_SEG_INIT_MAPPAGE:
case XLOG_SEG_INIT_INVRSPTR_PAGE:
case XLOG_SEG_ADD_NEW_GROUP:
{
XLogRecParseState *child =
(XLogRecParseState *)parseState->blockparse.extra_rec.blocksegfullsyncrec.childState;
AddSegHashMap(child);
break;
}
case XLOG_SEG_CREATE_EXTENT_GROUP:
case XLOG_SEG_SPACE_SHRINK:
case XLOG_SEG_NEW_PAGE:
case XLOG_SEG_SPACE_DROP:
Assert(!SS_ONDEMAND_REALTIME_BUILD_NORMAL);
ProcSegPageCommonRedo(parseState);
break;
default:
ereport(PANIC, (errmsg("SSProcSegPageCommonRedo: unknown op code %u", info)));
break;
}
}
void OnDemandSegWorkerProcSegFullSyncState(XLogRecParseState *parsestate)
void SSProcSegFullSyncState(XLogRecParseState *parsestate)
{
MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx);
SSProcSegPageCommonRedo(parsestate);
ProcSegPageCommonRedo(parsestate);
(void)MemoryContextSwitchTo(oldCtx);
parsestate->nextrecord = NULL;
XLogBlockParseStateRelease(parsestate);
}
void OnDemandSegWorkerProcSegPipeLineSyncState(XLogRecParseState *parseState)
void SSProcSegPipeLineSyncState(XLogRecParseState *parseState)
{
MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx);
SSProcSegPageCommonRedo(parseState);
ProcSegPageCommonRedo(parseState);
(void)MemoryContextSwitchTo(oldCtx);
XLogBlockParseStateRelease(parseState);
@ -1312,14 +1284,36 @@ static void WaitNextBarrier(XLogRecParseState *parseState)
}
}
static void OnDemandSegWorkerRedoSegParseState(XLogRecParseState *preState)
static void SegWorkerProcSegRecordQueuePrune()
{
// do not distribute g_hashmapPruneMark, because hashMapMng already prune MAX for seg record
XLogRecPtr ckptPtr = pg_atomic_read_u64(&g_dispatcher->ckptRedoPtr);
XLogRecPtr prunePtr = InvalidXLogRecPtr;
do {
prunePtr = pg_atomic_read_u64(&g_dispatcher->pageLines[SEG_PROC_PIPELINE_SLOT].htabThd->nextPrunePtr);
RedoInterruptCallBack();
pg_usleep(100000L); /* 100 ms */
} while (XLByteLT(prunePtr, ckptPtr));
g_redoWorker->nextPrunePtr = ckptPtr;
}
static void SegWorkerRedoIfRealtimeBuildFailover()
{
if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && g_redoWorker->inRealtimeBuild) {
SegWorkerProcSegRecordQueuePrune();
SegWorkerRedoAllSegBlockRecord();
g_redoWorker->inRealtimeBuild = false;
}
}
static void SSProcSegParseState(XLogRecParseState *preState)
{
switch (preState->blockparse.blockhead.block_valid) {
case BLOCK_DATA_SEG_EXTEND:
OnDemandSegWorkerProcSegPipeLineSyncState(preState);
SSProcSegPipeLineSyncState(preState);
break;
case BLOCK_DATA_SEG_FULL_SYNC_TYPE:
OnDemandSegWorkerProcSegFullSyncState(preState);
SSProcSegFullSyncState(preState);
break;
case BLOCK_DATA_SEG_FILE_EXTEND_TYPE:
default:
@ -1330,6 +1324,16 @@ static void OnDemandSegWorkerRedoSegParseState(XLogRecParseState *preState)
}
}
static void OnDemandSegWorkerRedoSegParseState(XLogRecParseState *preState)
{
SegWorkerRedoIfRealtimeBuildFailover();
if (g_redoWorker->inRealtimeBuild) {
AddSegHashMap(preState);
} else {
SSProcSegParseState(preState);
}
}
static void OnDemandDispatchSegParseStateToSegWorker(XLogRecParseState *preState)
{
Assert(g_redoWorker->slotId == SEG_PROC_PIPELINE_SLOT);
@ -1979,7 +1983,7 @@ static void SegWorkerRedoAllSegBlockRecord()
{
while (!SPSCBlockingQueueIsEmpty(g_dispatcher->segQueue)) {
XLogRecParseState *segRecord = (XLogRecParseState *)SPSCBlockingQueueTop(g_dispatcher->segQueue);
SegPageRedoChildState(segRecord);
SSProcSegParseState(segRecord);
SPSCBlockingQueuePop(g_dispatcher->segQueue);
}
}
@ -3329,15 +3333,7 @@ void HashMapManagerMain()
if (XLByteLT(ckptRedoPtr, segRecord->blockparse.blockhead.end_ptr)) {
break;
}
#ifdef USE_ASSERT_CHECKING
XLogRecParseState *procState = segRecord;
while (procState != NULL) {
XLogRecParseState *redoblockstate = procState;
procState = (XLogRecParseState *)procState->nextrecord;
DoRecordCheck(redoblockstate, InvalidXLogRecPtr, false);
}
#endif
XLogBlockParseStateRelease(segRecord);
ReleaseBlockParseStateIfNotReplay(segRecord);
SPSCBlockingQueuePop(g_dispatcher->segQueue);
}

View File

@ -1168,7 +1168,6 @@ XLogRecParseState* xlog_redo_parse_to_block(XLogReaderState* record, uint32* blo
XLogRecParseState* smgr_redo_parse_to_block(XLogReaderState* record, uint32* blocknum);
XLogRecParseState* segpage_redo_parse_to_block(XLogReaderState* record, uint32* blocknum);
void ProcSegPageCommonRedo(XLogRecParseState *parseState);
void SegPageRedoChildState(XLogRecParseState *childStateList);
void ProcSegPageJustFreeChildState(XLogRecParseState *parseState);
XLogRecParseState* XactXlogClogParseToBlock(XLogReaderState* record, XLogRecParseState* recordstatehead,
uint32* blocknum, TransactionId xid, int nsubxids, TransactionId* subxids, CLogXidStatus status);