diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 70a539f25..30d497a05 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -5314,9 +5314,15 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in latestValidRecord = t_thrd.xlog_cxt.ReadRecPtr; latestRecordCrc = record->xl_crc; latestRecordLen = record->xl_tot_len; + if (SS_REPLICATION_DORADO_CLUSTER) { + t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0; + } /* Great, got a record */ return record; } else { + if (SS_REPLICATION_DORADO_CLUSTER) { + t_thrd.xlog_cxt.ssXlogReadFailedTimes++; + } /* No valid record available from this source */ if (streamFailCount < XLOG_STREAM_READREC_MAXTRY) { streamFailCount++; @@ -8875,6 +8881,7 @@ void StartupXLOG(void) t_thrd.xlog_cxt.startup_processing = true; t_thrd.xlog_cxt.RedoDone = false; t_thrd.xlog_cxt.currentRetryTimes = 0; + t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0; t_thrd.xlog_cxt.forceFinishHappened = false; g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = 0; g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = 0; @@ -19370,7 +19377,6 @@ static int SSDoradoReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePt XLogRecPtr RecPtr = targetPagePtr; uint32 targetPageOff; bool processtrxn = false; - bool fetching_ckpt = readprivate->fetching_ckpt; bool randAccess = readprivate->randAccess; XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl; XLogSegNo replayedSegNo; @@ -19484,12 +19490,6 @@ retry: pg_memory_barrier(); if (SS_REPLICATION_MAIN_STANBY_NODE && WalRcvIsDone() && CheckForFailoverTrigger()) { - t_thrd.xlog_cxt.receivedUpto = GetWalRcvWriteRecPtr(NULL); - if (XLByteLT(RecPtr, t_thrd.xlog_cxt.receivedUpto)) { - /* wait xlog redo done */ - continue; - } - ProcTxnWorkLoad(true); ereport(LOG, (errmsg("RecPtr(%X/%X), receivedUpto(%X/%X).", (uint32)(RecPtr >> 32), (uint32)RecPtr, (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), @@ -19524,124 +19524,97 @@ retry: close(t_thrd.xlog_cxt.readFile); t_thrd.xlog_cxt.readFile = -1; } + t_thrd.xlog_cxt.failedSources = 0; /* Reset curFileTLI if random fetch. */ if (randAccess) { t_thrd.xlog_cxt.curFileTLI = 0; } - + if (SS_REPLICATION_MAIN_STANBY_NODE && CheckForFailoverTrigger()) { + goto triggered; + } /* * Try to restore the file from archive, or read an * existing file from pg_xlog. */ sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; - if (XLByteLE(t_thrd.xlog_cxt.receivedUpto, expectedRecPtr)) { - t_thrd.xlog_cxt.failedSources = 0; - /* - * Before we sleep, re-scan for possible new timelines - * if we were requested to recover to the latest - * timeline. - */ - if (t_thrd.xlog_cxt.recoveryTargetIsLatest) { - if (rescanLatestTimeLine()) { - continue; - } + if (t_thrd.startup_cxt.shutdown_requested) { + ereport(LOG, (errmsg("startup shutdown"))); + proc_exit(0); + } + + /* + * Before we sleep, re-scan for possible new timelines + * if we were requested to recover to the latest + * timeline. + */ + if (t_thrd.xlog_cxt.recoveryTargetIsLatest) { + if (rescanLatestTimeLine()) { + continue; } + } - if (!xlogctl->IsRecoveryDone) { - g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = GetCurrentTimestamp(); - g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = t_thrd.xlog_cxt.ReadRecPtr; - ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), - errmsg("XLogPageRead IsRecoveryDone is set true, " - "ReadRecPtr:%X/%X, EndRecPtr:%X/%X, " - "receivedUpto:%X/%X, CtlInfo_insertHead:%X/%X.", - (uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32), - (uint32)(t_thrd.xlog_cxt.ReadRecPtr), - (uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32), - (uint32)(t_thrd.xlog_cxt.EndRecPtr), - (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), - (uint32)(t_thrd.xlog_cxt.receivedUpto), - (uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead >> 32), - (uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead)))); - parallel_recovery::redo_dump_all_stats(); + if (!xlogctl->IsRecoveryDone) { + g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = GetCurrentTimestamp(); + g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = t_thrd.xlog_cxt.ReadRecPtr; + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("XLogPageRead IsRecoveryDone is set true, " + "ReadRecPtr:%X/%X, EndRecPtr:%X/%X, " + "receivedUpto:%X/%X, CtlInfo_insertHead:%X/%X.", + (uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.ReadRecPtr), + (uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32), + (uint32)(t_thrd.xlog_cxt.EndRecPtr), + (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), + (uint32)(t_thrd.xlog_cxt.receivedUpto), + (uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead >> 32), + (uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead)))); + parallel_recovery::redo_dump_all_stats(); + } + + /* + * signal postmaster to update local redo end + * point to gaussdb state file. + */ + ProcTxnWorkLoad(true); + if (!xlogctl->IsRecoveryDone) { + SendPostmasterSignal(PMSIGNAL_LOCAL_RECOVERY_DONE); + if (SS_PERFORMING_SWITCHOVER) { + g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_PROMOTED; } + } - /* - * signal postmaster to update local redo end - * point to gaussdb state file. - */ - ProcTxnWorkLoad(true); - if (!xlogctl->IsRecoveryDone) { - SendPostmasterSignal(PMSIGNAL_LOCAL_RECOVERY_DONE); - if (SS_PERFORMING_SWITCHOVER) { - g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_PROMOTED; - } - } - - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->IsRecoveryDone = true; - SpinLockRelease(&xlogctl->info_lck); - /* - * If primary_conninfo is set, launch walreceiver to - * try to stream the missing WAL, before retrying to - * restore from archive/pg_xlog. - * - * If fetching_ckpt is TRUE, RecPtr points to the - * initial checkpoint location. In that case, we use - * RedoStartLSN as the streaming start position - * instead of RecPtr, so that when we later jump - * backwards to start redo at RedoStartLSN, we will - * have the logs streamed already. - */ - load_server_mode(); - - if (SS_REPLICATION_MAIN_STANBY_NODE && CheckForFailoverTrigger()) { - goto triggered; - } - ProcTxnWorkLoad(false); + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->IsRecoveryDone = true; + SpinLockRelease(&xlogctl->info_lck); + /* + * If primary_conninfo is set, launch walreceiver to + * try to stream the missing WAL, before retrying to + * restore from archive/pg_xlog. + * + * If fetching_ckpt is TRUE, RecPtr points to the + * initial checkpoint location. In that case, we use + * RedoStartLSN as the streaming start position + * instead of RecPtr, so that when we later jump + * backwards to start redo at RedoStartLSN, we will + * have the logs streamed already. + */ + load_server_mode(); + ProcTxnWorkLoad(false); + if (!SS_IN_REFORM) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; rename_recovery_conf_for_roach(); ereport(LOG, (errmsg("request xlog stream from dorado copy at %X/%X.", - fetching_ckpt ? (uint32)(t_thrd.xlog_cxt.RedoStartLSN >> 32) - : (uint32)(targetRecPtr >> 32), - fetching_ckpt ? (uint32)t_thrd.xlog_cxt.RedoStartLSN - : (uint32)targetRecPtr))); + (uint32)(targetRecPtr >> 32), + (uint32)targetRecPtr))); ShutdownWalRcv(); SpinLockAcquire(&walrcv->mutex); walrcv->receivedUpto = 0; SpinLockRelease(&walrcv->mutex); - RequestXLogStreaming(fetching_ckpt ? &t_thrd.xlog_cxt.RedoStartLSN : &targetRecPtr, 0, - REPCONNTARGET_SHARED_STORAGE, 0); - continue; - } - - if (CheckForFailoverTrigger()) { - XLogRecPtr receivedUpto = GetWalRcvWriteRecPtr(NULL); - XLogRecPtr EndRecPtrTemp = t_thrd.xlog_cxt.EndRecPtr; - XLByteAdvance(EndRecPtrTemp, SizeOfXLogRecord); - if (XLByteLT(EndRecPtrTemp, receivedUpto) && !FORCE_FINISH_ENABLED && - t_thrd.xlog_cxt.currentRetryTimes++ < g_retryTimes) { - ereport(WARNING, (errmsg("there are some received xlog have not been redo " - "the tail of last redo lsn:%X/%X, received lsn:%X/%X, retry %d times", - (uint32)(EndRecPtrTemp >> 32), (uint32)EndRecPtrTemp, - (uint32)(receivedUpto >> 32), (uint32)receivedUpto, - t_thrd.xlog_cxt.currentRetryTimes))); - return -1; - } - ereport(LOG, - (errmsg("read record failed when promoting, current lsn (%X/%X), received lsn(%X/%X)," - "sources[%u], failedSources[%u], readSource[%u], readFile[%d], readId[%u]," - "readSeg[%u], readOff[%u], readLen[%u]", - (uint32)(RecPtr >> 32), (uint32)RecPtr, - (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32), - (uint32)t_thrd.xlog_cxt.receivedUpto, sources, t_thrd.xlog_cxt.failedSources, - t_thrd.xlog_cxt.readSource, t_thrd.xlog_cxt.readFile, - (uint32)(t_thrd.xlog_cxt.readSegNo >> 32), (uint32)t_thrd.xlog_cxt.readSegNo, - t_thrd.xlog_cxt.readOff, t_thrd.xlog_cxt.readLen))); - goto triggered; + RequestXLogStreaming(&targetRecPtr, 0, REPCONNTARGET_SHARED_STORAGE, 0); } break; } @@ -19664,10 +19637,11 @@ retry: } } +needread: if (t_thrd.xlog_cxt.readFile < 0) { t_thrd.xlog_cxt.readFile = SSXLogFileOpenAnyTLI(t_thrd.xlog_cxt.readSegNo, emode, sources, xlog_path); if (t_thrd.xlog_cxt.readFile < 0) { - ereport(LOG, (errmsg("%s read failed, change xlog file.", xlog_path))); + ereport(LOG, (errmsg("%s open failed, change xlog file.", xlog_path))); goto next_record_is_invalid; } } @@ -19696,7 +19670,11 @@ next_record_is_invalid: t_thrd.xlog_cxt.readSource = 0; return -1; + triggered: + if (t_thrd.xlog_cxt.ssXlogReadFailedTimes < XLOG_STREAM_READREC_MAXTRY) { + goto needread; + } if (t_thrd.xlog_cxt.readFile >= 0) { close(t_thrd.xlog_cxt.readFile); } @@ -19704,7 +19682,7 @@ triggered: t_thrd.xlog_cxt.readLen = 0; t_thrd.xlog_cxt.readSource = 0; t_thrd.xlog_cxt.recoveryTriggered = true; - + return -1; } diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index bba36f6dd..6b1c7b11f 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -781,6 +781,7 @@ typedef struct knl_t_xlog_context { /* for switchover failed when load xlog record invalid retry count */ int currentRetryTimes; RedoTimeCost timeCost[TIME_COST_NUM]; + int ssXlogReadFailedTimes; } knl_t_xlog_context; typedef struct knl_t_dfs_context {