!3643 dorado双集群日志回放问题

Merge pull request !3643 from Carl/master
This commit is contained in:
opengauss_bot
2023-06-29 06:28:20 +00:00
committed by Gitee

View File

@ -415,8 +415,8 @@ static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
static XLogRecPtr XLogInsertRecordSingle(XLogRecData *rdata, XLogRecPtr fpw_lsn);
static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen,
XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path);
static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path);
void ArchiveXlogForForceFinishRedo(XLogReaderState *xlogreader, TermFileData *term_file);
TermFileData GetTermFileDataAndClear(void);
XLogRecPtr mpfl_read_max_flush_lsn();
@ -19024,8 +19024,8 @@ static void SSOndemandXlogCopy(XLogSegNo copySegNo, uint32 startOffset, char *co
t_thrd.ondemand_xlog_copy_cxt.openLogOff += copyBytes;
}
static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int expectReadLen,
XLogRecPtr targetRecPtr, char *buf, TimeLineID *readTLI, char* xlog_path)
static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path)
{
/* Load reader private data */
XLogPageReadPrivate *readprivate = (XLogPageReadPrivate *)xlogreader->private_data;
@ -19037,6 +19037,7 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int
bool randAccess = IsExtremeRedo() ? false : readprivate->randAccess;
XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
XLogSegNo replayedSegNo;
uint32 actualBytes;
#ifdef USE_ASSERT_CHECKING
XLogSegNo targetSegNo;
@ -19071,11 +19072,11 @@ static int SSReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int
}
XLByteToSeg(targetPagePtr, t_thrd.xlog_cxt.readSegNo);
XLByteAdvance(RecPtr, expectReadLen);
XLByteAdvance(RecPtr, reqLen);
retry:
/* See if we need to retrieve more data */
if (t_thrd.xlog_cxt.readFile < 0 || !DORADO_STANDBY_CLUSTER ||
if (t_thrd.xlog_cxt.readFile < 0 ||
(t_thrd.xlog_cxt.readSource == XLOG_FROM_STREAM && XLByteLT(t_thrd.xlog_cxt.receivedUpto, RecPtr))) {
if (t_thrd.xlog_cxt.StandbyMode && t_thrd.xlog_cxt.startup_processing && DORADO_STANDBY_CLUSTER) {
/*
@ -19347,8 +19348,7 @@ retry:
}
/* Don't try to read from a source that just failed */
sources &= ~t_thrd.xlog_cxt.failedSources;
t_thrd.xlog_cxt.readFile = SSXLogFileReadAnyTLI(t_thrd.xlog_cxt.readSegNo,
emode, sources, xlog_path);
t_thrd.xlog_cxt.readFile = XLogFileReadAnyTLI(t_thrd.xlog_cxt.readSegNo, DEBUG2, sources);
if (t_thrd.xlog_cxt.readFile >= 0) {
break;
}
@ -19412,33 +19412,65 @@ retry:
/* Read the requested page */
t_thrd.xlog_cxt.readOff = targetPageOff;
int actualBytes;
if (xlogreader->preReadBuf != NULL) {
actualBytes = SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, buf, XLOG_BLCKSZ);
if (DORADO_STANDBY_CLUSTER_MAINSTANDBY_NODE) {
try_again:
if (lseek(t_thrd.xlog_cxt.readFile, (off_t)t_thrd.xlog_cxt.readOff, SEEK_SET) < 0) {
ereport(emode_for_corrupt_record(emode, RecPtr),
(errcode_for_file_access(),
errmsg("could not seek in log file %s to offset %u: %m",
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo),
t_thrd.xlog_cxt.readOff)));
if (errno == EINTR) {
errno = 0;
pg_usleep(1000);
goto try_again;
}
goto next_record_is_invalid;
}
pgstat_report_waitevent(WAIT_EVENT_WAL_READ);
actualBytes = read(t_thrd.xlog_cxt.readFile, readBuf, XLOG_BLCKSZ);
pgstat_report_waitevent(WAIT_EVENT_END);
if (actualBytes != XLOG_BLCKSZ) {
ereport(emode_for_corrupt_record(emode, RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log file %s to offset %u: %m",
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo),
t_thrd.xlog_cxt.readOff)));
if (errno == EINTR) {
errno = 0;
pg_usleep(1000);
goto try_again;
}
goto next_record_is_invalid;
}
} else {
actualBytes = (int)pread(t_thrd.xlog_cxt.readFile, buf, XLOG_BLCKSZ, t_thrd.xlog_cxt.readOff);
}
if (xlogreader->preReadBuf != NULL) {
actualBytes = (uint32)SSReadXlogInternal(xlogreader, targetPagePtr, targetRecPtr, readBuf, XLOG_BLCKSZ);
} else {
actualBytes = (uint32)pread(t_thrd.xlog_cxt.readFile, readBuf, XLOG_BLCKSZ, t_thrd.xlog_cxt.readOff);
}
if (actualBytes != XLOG_BLCKSZ) {
ereport(LOG, (errcode_for_file_access(), errmsg("read xlog(start:%X/%X, pos:%u len:%d) failed : %m",
static_cast<uint32>(targetPagePtr >> BIT_NUM_INT32),
static_cast<uint32>(targetPagePtr), targetPageOff,
expectReadLen)));
ereport(emode_for_corrupt_record(emode, RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log file %s to offset %u: %m",
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo),
t_thrd.xlog_cxt.readOff)));
goto next_record_is_invalid;
if (actualBytes != XLOG_BLCKSZ) {
ereport(LOG, (errcode_for_file_access(), errmsg("read xlog(start:%X/%X, pos:%u len:%d) failed : %m",
static_cast<uint32>(targetPagePtr >> BIT_NUM_INT32),
static_cast<uint32>(targetPagePtr), targetPageOff,
reqLen)));
ereport(emode_for_corrupt_record(emode, RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log file %s to offset %u: %m",
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, t_thrd.xlog_cxt.readSegNo),
t_thrd.xlog_cxt.readOff)));
goto next_record_is_invalid;
}
}
Assert(targetSegNo == t_thrd.xlog_cxt.readSegNo);
Assert(targetPageOff == t_thrd.xlog_cxt.readOff);
Assert((uint32)expectReadLen <= t_thrd.xlog_cxt.readLen);
Assert((uint32)reqLen <= t_thrd.xlog_cxt.readLen);
*readTLI = t_thrd.xlog_cxt.curFileTLI;
return (int)t_thrd.xlog_cxt.readLen;
return t_thrd.xlog_cxt.readLen;
next_record_is_invalid:
t_thrd.xlog_cxt.failedSources |= t_thrd.xlog_cxt.readSource;