diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index fc0304576..9df968ea3 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -415,8 +415,8 @@ static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr); static XLogRecPtr XLogInsertRecordSingle(XLogRecData *rdata, XLogRecPtr fpw_lsn); -static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen, - XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path); +static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path); void ArchiveXlogForForceFinishRedo(XLogReaderState *xlogreader, TermFileData *term_file); TermFileData GetTermFileDataAndClear(void); XLogRecPtr mpfl_read_max_flush_lsn(); @@ -19024,8 +19024,8 @@ static void SSOndemandXlogCopy(XLogSegNo copySegNo, uint32 startOffset, char *co t_thrd.ondemand_xlog_copy_cxt.openLogOff += copyBytes; } -static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen, - XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path) +static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, + XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path) { /* Load reader private data */ XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data; @@ -19037,6 +19037,7 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int bool randAccess = IsExtremeRedo() ? false : readprivate->randAccess; XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl; XLogSegNo replayedSegNo; + uint32 actualBytes; #ifdef USE_ASSERT_CHECKING XLogSegNo targetSegNo; @@ -19071,11 +19072,11 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int } XLByteToSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo); - XLByteAdvance(RecPtr, expectReadLen); + XLByteAdvance(RecPtr, reqLen); retry: /* See if we need to retrieve more data */ - if (t_thrd.xlog_cxt.readFile < 0 || !DORADO_STANDBY_CLUSTER || + if (t_thrd.xlog_cxt.readFile < 0 || (t_thrd.xlog_cxt.readSource == XLOG_FROM_STREAM && XLByteLT(t_thrd.xlog_cxt.receivedUpto, RecPtr))) { if (t_thrd.xlog_cxt.StandbyMode && t_thrd.xlog_cxt.startup_processing && DORADO_STANDBY_CLUSTER) { /* @@ -19347,8 +19348,7 @@ retry: } /* Don't try to read from a source that just failed */ sources &= ~t_thrd.xlog_cxt.failedSources; - t_thrd.xlog_cxt.readFile = SSXLogFileReadAnyTLI(t_thrd.xlog_cxt.readSegNo, - emode, sources, xlog_path); + t_thrd.xlog_cxt.readFile = XLogFileReadAnyTLI(t_thrd.xlog_cxt.readSegNo, DEBUG2, sources); if (t_thrd.xlog_cxt.readFile >= 0) { break; } @@ -19412,33 +19412,65 @@ retry: /* Read the requested page */ t_thrd.xlog_cxt.readOff = targetPageOff; - int actualBytes; - if (xlogreader->preReadBuf != NULL) { - actualBytes = SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, buf, XLOG_BLCKSZ); + if (DORADO_STANDBY_CLUSTER_MAINSTANDBY_NODE) { +try_again: + if (lseek(t_thrd.xlog_cxt.readFile, (off_t)t_thrd.xlog_cxt.readOff, SEEK_SET) < 0) { + ereport(emode_for_corrupt_record(emode, RecPtr), + (errcode_for_file_access(), + errmsg("could not seek in log file %s to offset %u: %m", + XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), + t_thrd.xlog_cxt.readOff))); + if (errno == EINTR) { + errno = 0; + pg_usleep(1000); + goto try_again; + } + goto next_record_is_invalid; + } + pgstat_report_waitevent(WAIT_EVENT_WAL_READ); + actualBytes = read(t_thrd.xlog_cxt.readFile, readBuf, XLOG_BLCKSZ); + pgstat_report_waitevent(WAIT_EVENT_END); + if (actualBytes != XLOG_BLCKSZ) { + ereport(emode_for_corrupt_record(emode, RecPtr), + (errcode_for_file_access(), + errmsg("could not read from log file %s to offset %u: %m", + XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), + t_thrd.xlog_cxt.readOff))); + if (errno == EINTR) { + errno = 0; + pg_usleep(1000); + goto try_again; + } + goto next_record_is_invalid; + } } else { - actualBytes = (int)pread(t_thrd.xlog_cxt.readFile, buf, XLOG_BLCKSZ, t_thrd.xlog_cxt.readOff); - } + if (xlogreader->preReadBuf != NULL) { + actualBytes = (uint32)SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, readBuf, XLOG_BLCKSZ); + } else { + actualBytes = (uint32)pread(t_thrd.xlog_cxt.readFile, readBuf, XLOG_BLCKSZ, t_thrd.xlog_cxt.readOff); + } - if (actualBytes != XLOG_BLCKSZ) { - ereport(LOG, (errcode_for_file_access(), errmsg("read xlog(start:%X/%X, pos:%u len:%d) failed : %m", - static_cast(targetPagePtr >> BIT_NUM_INT32), - static_cast(targetPagePtr), targetPageOff, - expectReadLen))); - ereport(emode_for_corrupt_record(emode, RecPtr), - (errcode_for_file_access(), - errmsg("could not read from log file %s to offset %u: %m", - XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), - t_thrd.xlog_cxt.readOff))); - goto next_record_is_invalid; + if (actualBytes != XLOG_BLCKSZ) { + ereport(LOG, (errcode_for_file_access(), errmsg("read xlog(start:%X/%X, pos:%u len:%d) failed : %m", + static_cast(targetPagePtr >> BIT_NUM_INT32), + static_cast(targetPagePtr), targetPageOff, + reqLen))); + ereport(emode_for_corrupt_record(emode, RecPtr), + (errcode_for_file_access(), + errmsg("could not read from log file %s to offset %u: %m", + XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo), + t_thrd.xlog_cxt.readOff))); + goto next_record_is_invalid; + } } Assert(targetSegNo == t_thrd.xlog_cxt.readSegNo); Assert(targetPageOff == t_thrd.xlog_cxt.readOff); - Assert((uint32)expectReadLen <= t_thrd.xlog_cxt.readLen); + Assert((uint32)reqLen <= t_thrd.xlog_cxt.readLen); *readTLI = t_thrd.xlog_cxt.curFileTLI; - return (int)t_thrd.xlog_cxt.readLen; + return t_thrd.xlog_cxt.readLen; next_record_is_invalid: t_thrd.xlog_cxt.failedSources |= t_thrd.xlog_cxt.readSource;