@ -5314,9 +5314,15 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in
|
||||
latestValidRecord = t_thrd.xlog_cxt.ReadRecPtr;
|
||||
latestRecordCrc = record->xl_crc;
|
||||
latestRecordLen = record->xl_tot_len;
|
||||
if (SS_REPLICATION_DORADO_CLUSTER) {
|
||||
t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0;
|
||||
}
|
||||
/* Great, got a record */
|
||||
return record;
|
||||
} else {
|
||||
if (SS_REPLICATION_DORADO_CLUSTER) {
|
||||
t_thrd.xlog_cxt.ssXlogReadFailedTimes++;
|
||||
}
|
||||
/* No valid record available from this source */
|
||||
if (streamFailCount < XLOG_STREAM_READREC_MAXTRY) {
|
||||
streamFailCount++;
|
||||
@ -8875,6 +8881,7 @@ void StartupXLOG(void)
|
||||
t_thrd.xlog_cxt.startup_processing = true;
|
||||
t_thrd.xlog_cxt.RedoDone = false;
|
||||
t_thrd.xlog_cxt.currentRetryTimes = 0;
|
||||
t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0;
|
||||
t_thrd.xlog_cxt.forceFinishHappened = false;
|
||||
g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = 0;
|
||||
g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = 0;
|
||||
@ -19370,7 +19377,6 @@ static int SSDoradoReadXLog(XLogReaderState *xlogreader, XLogRecPtr targetPagePt
|
||||
XLogRecPtr RecPtr = targetPagePtr;
|
||||
uint32 targetPageOff;
|
||||
bool processtrxn = false;
|
||||
bool fetching_ckpt = readprivate->fetching_ckpt;
|
||||
bool randAccess = readprivate->randAccess;
|
||||
XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl;
|
||||
XLogSegNo replayedSegNo;
|
||||
@ -19484,12 +19490,6 @@ retry:
|
||||
pg_memory_barrier();
|
||||
|
||||
if (SS_REPLICATION_MAIN_STANBY_NODE && WalRcvIsDone() && CheckForFailoverTrigger()) {
|
||||
t_thrd.xlog_cxt.receivedUpto = GetWalRcvWriteRecPtr(NULL);
|
||||
if (XLByteLT(RecPtr, t_thrd.xlog_cxt.receivedUpto)) {
|
||||
/* wait for xlog redo to finish */
|
||||
continue;
|
||||
}
|
||||
|
||||
ProcTxnWorkLoad(true);
|
||||
ereport(LOG, (errmsg("RecPtr(%X/%X), receivedUpto(%X/%X).", (uint32)(RecPtr >> 32),
|
||||
(uint32)RecPtr, (uint32)(t_thrd.xlog_cxt.receivedUpto >> 32),
|
||||
@ -19524,124 +19524,97 @@ retry:
|
||||
close(t_thrd.xlog_cxt.readFile);
|
||||
t_thrd.xlog_cxt.readFile = -1;
|
||||
}
|
||||
t_thrd.xlog_cxt.failedSources = 0;
|
||||
/* Reset curFileTLI if random fetch. */
|
||||
if (randAccess) {
|
||||
t_thrd.xlog_cxt.curFileTLI = 0;
|
||||
}
|
||||
|
||||
if (SS_REPLICATION_MAIN_STANBY_NODE && CheckForFailoverTrigger()) {
|
||||
goto triggered;
|
||||
}
|
||||
/*
|
||||
* Try to restore the file from archive, or read an
|
||||
* existing file from pg_xlog.
|
||||
*/
|
||||
sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
|
||||
if (XLByteLE(t_thrd.xlog_cxt.receivedUpto, expectedRecPtr)) {
|
||||
t_thrd.xlog_cxt.failedSources = 0;
|
||||
|
||||
/*
|
||||
* Before we sleep, re-scan for possible new timelines
|
||||
* if we were requested to recover to the latest
|
||||
* timeline.
|
||||
*/
|
||||
if (t_thrd.xlog_cxt.recoveryTargetIsLatest) {
|
||||
if (rescanLatestTimeLine()) {
|
||||
continue;
|
||||
}
|
||||
if (t_thrd.startup_cxt.shutdown_requested) {
|
||||
ereport(LOG, (errmsg("startup shutdown")));
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Before we sleep, re-scan for possible new timelines
|
||||
* if we were requested to recover to the latest
|
||||
* timeline.
|
||||
*/
|
||||
if (t_thrd.xlog_cxt.recoveryTargetIsLatest) {
|
||||
if (rescanLatestTimeLine()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (!xlogctl->IsRecoveryDone) {
|
||||
g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = GetCurrentTimestamp();
|
||||
g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = t_thrd.xlog_cxt.ReadRecPtr;
|
||||
ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
|
||||
errmsg("XLogPageRead IsRecoveryDone is set true, "
|
||||
"ReadRecPtr:%X/%X, EndRecPtr:%X/%X, "
|
||||
"receivedUpto:%X/%X, CtlInfo_insertHead:%X/%X.",
|
||||
(uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32),
|
||||
(uint32)(t_thrd.xlog_cxt.ReadRecPtr),
|
||||
(uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32),
|
||||
(uint32)(t_thrd.xlog_cxt.EndRecPtr),
|
||||
(uint32)(t_thrd.xlog_cxt.receivedUpto >> 32),
|
||||
(uint32)(t_thrd.xlog_cxt.receivedUpto),
|
||||
(uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead >> 32),
|
||||
(uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead))));
|
||||
parallel_recovery::redo_dump_all_stats();
|
||||
if (!xlogctl->IsRecoveryDone) {
|
||||
g_instance.comm_cxt.predo_cxt.redoPf.redo_done_time = GetCurrentTimestamp();
|
||||
g_instance.comm_cxt.predo_cxt.redoPf.recovery_done_ptr = t_thrd.xlog_cxt.ReadRecPtr;
|
||||
ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
|
||||
errmsg("XLogPageRead IsRecoveryDone is set true, "
|
||||
"ReadRecPtr:%X/%X, EndRecPtr:%X/%X, "
|
||||
"receivedUpto:%X/%X, CtlInfo_insertHead:%X/%X.",
|
||||
(uint32)(t_thrd.xlog_cxt.ReadRecPtr >> 32),
|
||||
(uint32)(t_thrd.xlog_cxt.ReadRecPtr),
|
||||
(uint32)(t_thrd.xlog_cxt.EndRecPtr >> 32),
|
||||
(uint32)(t_thrd.xlog_cxt.EndRecPtr),
|
||||
(uint32)(t_thrd.xlog_cxt.receivedUpto >> 32),
|
||||
(uint32)(t_thrd.xlog_cxt.receivedUpto),
|
||||
(uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead >> 32),
|
||||
(uint32)(g_instance.xlog_cxt.ssReplicationXLogCtl->insertHead))));
|
||||
parallel_recovery::redo_dump_all_stats();
|
||||
}
|
||||
|
||||
/*
|
||||
* signal postmaster to update local redo end
|
||||
* point to gaussdb state file.
|
||||
*/
|
||||
ProcTxnWorkLoad(true);
|
||||
if (!xlogctl->IsRecoveryDone) {
|
||||
SendPostmasterSignal(PMSIGNAL_LOCAL_RECOVERY_DONE);
|
||||
if (SS_PERFORMING_SWITCHOVER) {
|
||||
g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_PROMOTED;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* signal postmaster to update local redo end
|
||||
* point to gaussdb state file.
|
||||
*/
|
||||
ProcTxnWorkLoad(true);
|
||||
if (!xlogctl->IsRecoveryDone) {
|
||||
SendPostmasterSignal(PMSIGNAL_LOCAL_RECOVERY_DONE);
|
||||
if (SS_PERFORMING_SWITCHOVER) {
|
||||
g_instance.dms_cxt.SSClusterState = NODESTATE_STANDBY_PROMOTED;
|
||||
}
|
||||
}
|
||||
|
||||
SpinLockAcquire(&xlogctl->info_lck);
|
||||
xlogctl->IsRecoveryDone = true;
|
||||
SpinLockRelease(&xlogctl->info_lck);
|
||||
/*
|
||||
* If primary_conninfo is set, launch walreceiver to
|
||||
* try to stream the missing WAL, before retrying to
|
||||
* restore from archive/pg_xlog.
|
||||
*
|
||||
* If fetching_ckpt is TRUE, RecPtr points to the
|
||||
* initial checkpoint location. In that case, we use
|
||||
* RedoStartLSN as the streaming start position
|
||||
* instead of RecPtr, so that when we later jump
|
||||
* backwards to start redo at RedoStartLSN, we will
|
||||
* have the logs streamed already.
|
||||
*/
|
||||
load_server_mode();
|
||||
|
||||
if (SS_REPLICATION_MAIN_STANBY_NODE && CheckForFailoverTrigger()) {
|
||||
goto triggered;
|
||||
}
|
||||
ProcTxnWorkLoad(false);
|
||||
SpinLockAcquire(&xlogctl->info_lck);
|
||||
xlogctl->IsRecoveryDone = true;
|
||||
SpinLockRelease(&xlogctl->info_lck);
|
||||
/*
|
||||
* If primary_conninfo is set, launch walreceiver to
|
||||
* try to stream the missing WAL, before retrying to
|
||||
* restore from archive/pg_xlog.
|
||||
*
|
||||
* If fetching_ckpt is TRUE, RecPtr points to the
|
||||
* initial checkpoint location. In that case, we use
|
||||
* RedoStartLSN as the streaming start position
|
||||
* instead of RecPtr, so that when we later jump
|
||||
* backwards to start redo at RedoStartLSN, we will
|
||||
* have the logs streamed already.
|
||||
*/
|
||||
load_server_mode();
|
||||
ProcTxnWorkLoad(false);
|
||||
if (!SS_IN_REFORM) {
|
||||
/* use volatile pointer to prevent code rearrangement */
|
||||
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
|
||||
rename_recovery_conf_for_roach();
|
||||
|
||||
ereport(LOG, (errmsg("request xlog stream from dorado copy at %X/%X.",
|
||||
fetching_ckpt ? (uint32)(t_thrd.xlog_cxt.RedoStartLSN >> 32)
|
||||
: (uint32)(targetRecPtr >> 32),
|
||||
fetching_ckpt ? (uint32)t_thrd.xlog_cxt.RedoStartLSN
|
||||
: (uint32)targetRecPtr)));
|
||||
(uint32)(targetRecPtr >> 32),
|
||||
(uint32)targetRecPtr)));
|
||||
ShutdownWalRcv();
|
||||
SpinLockAcquire(&walrcv->mutex);
|
||||
walrcv->receivedUpto = 0;
|
||||
SpinLockRelease(&walrcv->mutex);
|
||||
|
||||
RequestXLogStreaming(fetching_ckpt ? &t_thrd.xlog_cxt.RedoStartLSN : &targetRecPtr, 0,
|
||||
REPCONNTARGET_SHARED_STORAGE, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (CheckForFailoverTrigger()) {
|
||||
XLogRecPtr receivedUpto = GetWalRcvWriteRecPtr(NULL);
|
||||
XLogRecPtr EndRecPtrTemp = t_thrd.xlog_cxt.EndRecPtr;
|
||||
XLByteAdvance(EndRecPtrTemp, SizeOfXLogRecord);
|
||||
if (XLByteLT(EndRecPtrTemp, receivedUpto) && !FORCE_FINISH_ENABLED &&
|
||||
t_thrd.xlog_cxt.currentRetryTimes++ < g_retryTimes) {
|
||||
ereport(WARNING, (errmsg("there are some received xlog have not been redo "
|
||||
"the tail of last redo lsn:%X/%X, received lsn:%X/%X, retry %d times",
|
||||
(uint32)(EndRecPtrTemp >> 32), (uint32)EndRecPtrTemp,
|
||||
(uint32)(receivedUpto >> 32), (uint32)receivedUpto,
|
||||
t_thrd.xlog_cxt.currentRetryTimes)));
|
||||
return -1;
|
||||
}
|
||||
ereport(LOG,
|
||||
(errmsg("read record failed when promoting, current lsn (%X/%X), received lsn(%X/%X),"
|
||||
"sources[%u], failedSources[%u], readSource[%u], readFile[%d], readId[%u],"
|
||||
"readSeg[%u], readOff[%u], readLen[%u]",
|
||||
(uint32)(RecPtr >> 32), (uint32)RecPtr,
|
||||
(uint32)(t_thrd.xlog_cxt.receivedUpto >> 32),
|
||||
(uint32)t_thrd.xlog_cxt.receivedUpto, sources, t_thrd.xlog_cxt.failedSources,
|
||||
t_thrd.xlog_cxt.readSource, t_thrd.xlog_cxt.readFile,
|
||||
(uint32)(t_thrd.xlog_cxt.readSegNo >> 32), (uint32)t_thrd.xlog_cxt.readSegNo,
|
||||
t_thrd.xlog_cxt.readOff, t_thrd.xlog_cxt.readLen)));
|
||||
goto triggered;
|
||||
RequestXLogStreaming(&targetRecPtr, 0, REPCONNTARGET_SHARED_STORAGE, 0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -19664,10 +19637,11 @@ retry:
|
||||
}
|
||||
}
|
||||
|
||||
needread:
|
||||
if (t_thrd.xlog_cxt.readFile < 0) {
|
||||
t_thrd.xlog_cxt.readFile = SSXLogFileOpenAnyTLI(t_thrd.xlog_cxt.readSegNo, emode, sources, xlog_path);
|
||||
if (t_thrd.xlog_cxt.readFile < 0) {
|
||||
ereport(LOG, (errmsg("%s read failed, change xlog file.", xlog_path)));
|
||||
ereport(LOG, (errmsg("%s open failed, change xlog file.", xlog_path)));
|
||||
goto next_record_is_invalid;
|
||||
}
|
||||
}
|
||||
@ -19696,7 +19670,11 @@ next_record_is_invalid:
|
||||
t_thrd.xlog_cxt.readSource = 0;
|
||||
|
||||
return -1;
|
||||
|
||||
triggered:
|
||||
if (t_thrd.xlog_cxt.ssXlogReadFailedTimes < XLOG_STREAM_READREC_MAXTRY) {
|
||||
goto needread;
|
||||
}
|
||||
if (t_thrd.xlog_cxt.readFile >= 0) {
|
||||
close(t_thrd.xlog_cxt.readFile);
|
||||
}
|
||||
@ -19704,7 +19682,7 @@ triggered:
|
||||
t_thrd.xlog_cxt.readLen = 0;
|
||||
t_thrd.xlog_cxt.readSource = 0;
|
||||
t_thrd.xlog_cxt.recoveryTriggered = true;
|
||||
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
@ -781,6 +781,7 @@ typedef struct knl_t_xlog_context {
|
||||
/* retry count used when switchover fails because a loaded xlog record is invalid */
|
||||
int currentRetryTimes;
|
||||
RedoTimeCost timeCost[TIME_COST_NUM];
|
||||
int ssXlogReadFailedTimes;
|
||||
} knl_t_xlog_context;
|
||||
|
||||
typedef struct knl_t_dfs_context {
|
||||
|
||||
Reference in New Issue
Block a user