!4910 双集群极致rto优化

Merge pull request !4910 from zhengxue/zx_master_new_log_last
This commit is contained in:
opengauss_bot
2024-03-01 11:51:08 +00:00
committed by Gitee
4 changed files with 92 additions and 73 deletions

View File

@ -393,18 +393,6 @@ void HandlePageRedoInterruptsImpl(uint64 clearRedoFdCountInc = 1)
proc_exit(1);
}
static uint64 clearRedoFdCount = 0;
const uint64 clearRedoFdCountMask = 0x7FFFFFF;
clearRedoFdCount += clearRedoFdCountInc;
if (clearRedoFdCount > clearRedoFdCountMask && GetSMgrRelationHash() != NULL &&
(g_redoWorker->role == REDO_PAGE_WORKER || g_redoWorker->role == REDO_PAGE_MNG)) {
clearRedoFdCount = 0;
long hash_num = hash_get_num_entries(GetSMgrRelationHash());
if (hash_num >= MAX_CLEAR_SMGR_NUM) {
smgrcloseall();
}
}
}
void HandlePageRedoInterrupts()
@ -972,15 +960,21 @@ void PageManagerProcSegFullSyncState(XLogRecParseState *parseState)
void PageManagerProcSegPipeLineSyncState(XLogRecParseState *parseState)
{
if (!SS_DISASTER_STANDBY_CLUSTER) {
WaitCurrentPipeLineRedoWorkersQueueEmpty();
if (SS_DISASTER_STANDBY_CLUSTER) {
PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId];
const uint32 WorkerNumPerMng = myRedoLine->redoThdNum;
uint32 work_id = WorkerNumPerMng - 1;
parseState->nextrecord = NULL;
AddPageRedoItem(myRedoLine->redoThd[work_id], parseState);
} else {
WaitCurrentPipeLineRedoWorkersQueueEmpty();
MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx);
RedoPageManagerDdlAction(parseState);
(void)MemoryContextSwitchTo(oldCtx);
XLogBlockParseStateRelease(parseState);
}
MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx);
RedoPageManagerDdlAction(parseState);
(void)MemoryContextSwitchTo(oldCtx);
XLogBlockParseStateRelease(parseState);
}
static void WaitNextBarrier(XLogRecParseState *parseState)
@ -1633,6 +1627,9 @@ void RedoPageWorkerMain()
redoblockstate->blockparse.blockhead.end_ptr);
CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_6]);
break;
case BLOCK_DATA_SEG_EXTEND:
ProcSegPageCommonRedo(redoblockstate);
break;
default:
break;
}
@ -2141,7 +2138,6 @@ void XLogReadPageWorkerMain()
g_redoWorker->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
g_redoWorker->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
PushToWorkerLsn(send_lsn_forwarder_for_check_to_hot_standby(g_redoWorker->lastReplayedEndRecPtr));
if (FORCE_FINISH_ENABLED) {
CheckAndDoForceFinish(xlogreader);
}

View File

@ -542,7 +542,7 @@ triggered:
}
int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr,
TimeLineID *readTLI)
TimeLineID *readTLI, char* xlogPath)
{
int readLen = -1;
pg_atomic_write_u64(&g_dispatcher->rtoXlogBufState.targetRecPtr, targetRecPtr);
@ -556,43 +556,8 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
} else {
if (ENABLE_DMS && ENABLE_DSS) {
if (SS_DORADO_CLUSTER) {
for (int i = 0; i < DMS_MAX_INSTANCE; i++) {
if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') {
break;
}
char *curPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i];
readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr,
xlogreader->readBuf, readTLI, curPath);
/* read success, exchange index */
if (readLen > 0) {
/* exchange index if xlog_list[0] is not current xlog path */
if (i != 0) {
char exPath[MAXPGPATH];
errorno = snprintf_s(exPath, MAXPGPATH, MAXPGPATH - 1, curPath);
securec_check_ss(errorno, "", "");
errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, MAXPGPATH - 1,
g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]);
securec_check_ss(errorno, "", "");
errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0], MAXPGPATH, MAXPGPATH - 1, exPath);
securec_check_ss(errorno, "", "");
}
break;
}
if (t_thrd.xlog_cxt.readFile >= 0) {
close(t_thrd.xlog_cxt.readFile);
t_thrd.xlog_cxt.readFile = -1;
}
/* If record which is read from file is NULL, when preReadStartPtr is not set InvalidXlogPreReadStartPtr
* then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from
* preReadBuf and record still is bad. Therefore, preReadStartPtr need to set InvalidXlogPreReadStartPtr
* so that record is read from next file on disk instead of preReadBuf.
*/
xlogreader->preReadStartPtr = InvalidXlogPreReadStartPtr;
}
readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr,
xlogreader->readBuf, readTLI, xlogPath);
} else {
readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr,
xlogreader->readBuf, readTLI, NULL);
@ -601,6 +566,11 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI);
}
}
/* current path haven't xlog file for this xlog */
if (SS_DORADO_CLUSTER && readLen < 0) {
return -1;
}
if (readLen > 0 || t_thrd.xlog_cxt.recoveryTriggered || !t_thrd.xlog_cxt.StandbyMode || DoEarlyExit()) {
return readLen;
@ -613,7 +583,7 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
return readLen;
}
int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, char* xlogPath)
{
int readLen;
uint32 targetPageOff;
@ -635,7 +605,7 @@ int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int req
* so that we can validate it.
*/
readLen = ParallelXLogPageRead(state, pageptr, Max(reqLen, (int)SizeOfXLogShortPHD), state->currRecPtr,
&state->readPageTLI);
&state->readPageTLI, xlogPath);
if (readLen < 0) {
goto err;
}
@ -653,7 +623,8 @@ int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int req
/* still not enough */
if (readLen < (int)XLogPageHeaderSize(hdr)) {
readLen = ParallelXLogPageRead(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, &state->readPageTLI);
readLen = ParallelXLogPageRead(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr,
&state->readPageTLI, xlogPath);
if (readLen < 0) {
goto err;
}
@ -678,7 +649,7 @@ err:
return -1;
}
XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg, char* xlogPath)
{
XLogRecord *record = NULL;
XLogRecPtr targetPagePtr;
@ -737,7 +708,8 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char *
* enough byte to cover the whole record header, or at least the part of
* it that fits on the same page.
*/
readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
xlogPath);
if (readOff < 0) {
report_invalid_record(state, "read xlog page failed at %X/%X", (uint32)(RecPtr >> 32), (uint32)RecPtr);
goto err;
@ -821,7 +793,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char *
uint32 gotlen;
errno_t errorno = EOK;
readOff = ParallelReadPageInternal(state, targetPagePtr, XLOG_BLCKSZ);
readOff = ParallelReadPageInternal(state, targetPagePtr, XLOG_BLCKSZ, xlogPath);
if (readOff < 0) {
goto err;
}
@ -838,7 +810,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char *
/* Wait for the next page to become available */
readOff = ParallelReadPageInternal(state, targetPagePtr,
Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ));
Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ), xlogPath);
if (readOff < 0)
goto err;
@ -865,7 +837,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char *
/* Append the continuation from this page to the buffer */
pageHeaderSize = XLogPageHeaderSize(pageHeader);
if (readOff < (int)pageHeaderSize)
readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize);
readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize, xlogPath);
Assert((int)pageHeaderSize <= readOff);
@ -875,7 +847,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char *
len = pageHeader->xlp_rem_len;
if (readOff < (int)(pageHeaderSize + len))
readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize + len);
readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize + len, xlogPath);
errorno = memcpy_s(buffer, total_len - gotlen, (char *)contdata, len);
securec_check_c(errorno, "", "");
@ -903,7 +875,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char *
XLByteAdvance(state->EndRecPtr, (pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len)));
} else {
/* Wait for the record data to become available */
readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + total_len, XLOG_BLCKSZ));
readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + total_len, XLOG_BLCKSZ), xlogPath);
if (readOff < 0) {
goto err;
}
@ -945,6 +917,52 @@ err:
return NULL;
}
/*
* in ss dorado double cluster, we need read xlogpath ergodic,
* we will read xlog in path where last read success
*/
XLogRecord *SSExtremeXLogReadRecordErgodic(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
{
XLogRecord *record = NULL;
errno_t errorno = 0;
for (int i = 0; i < DMS_MAX_INSTANCE; i++) {
if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') {
break;
}
char *curPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i];
record = ParallelReadRecord(state, InvalidXLogRecPtr, errormsg, curPath);
if (record != NULL) {
/* read success, exchange index */
if (i != 0) {
/* read success, exchange index */
char exPath[MAXPGPATH];
errorno = snprintf_s(exPath, MAXPGPATH, MAXPGPATH - 1, curPath);
securec_check_ss(errorno, "", "");
errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, MAXPGPATH - 1,
g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]);
securec_check_ss(errorno, "", "");
errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0], MAXPGPATH, MAXPGPATH - 1, exPath);
securec_check_ss(errorno, "", "");
}
break;
} else {
if (t_thrd.xlog_cxt.readFile >= 0) {
close(t_thrd.xlog_cxt.readFile);
t_thrd.xlog_cxt.readFile = -1;
}
/* If record which is read from file is NULL, when preReadStartPtr is not set InvalidXlogPreReadStartPtr
* then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from
* preReadBuf and record still is bad. Therefore, preReadStartPtr need to set InvalidXlogPreReadStartPtr
* so that record is read from next file on disk instead of preReadBuf.
*/
state->preReadStartPtr = InvalidXlogPreReadStartPtr;
}
}
return record;
}
XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader)
{
XLogRecord *record = NULL;
@ -953,8 +971,13 @@ XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader)
t_thrd.xlog_cxt.failedSources = 0;
for (;;) {
char *errormsg = NULL;
if (SS_DORADO_CLUSTER) {
record = SSExtremeXLogReadRecordErgodic(xlogreader, InvalidXLogRecPtr, &errormsg);
} else {
record = ParallelReadRecord(xlogreader, InvalidXLogRecPtr, &errormsg, NULL);
}
record = ParallelReadRecord(xlogreader, InvalidXLogRecPtr, &errormsg);
t_thrd.xlog_cxt.ReadRecPtr = xlogreader->ReadRecPtr;
t_thrd.xlog_cxt.EndRecPtr = xlogreader->EndRecPtr;
g_instance.comm_cxt.predo_cxt.redoPf.read_ptr = t_thrd.xlog_cxt.ReadRecPtr;

View File

@ -5513,9 +5513,8 @@ XLogRecord *SSXLogReadRecordErgodic(XLogReaderState *state, XLogRecPtr RecPtr,
close(t_thrd.xlog_cxt.readFile);
t_thrd.xlog_cxt.readFile = -1;
}
/* If record which is read from file is NULL, when preReadStartPtr is not set InvalidXlogPreReadStartPtr
* then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from
* then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from
* preReadBuf and record still is bad. Therefore, preReadStartPtr need to set InvalidXlogPreReadStartPtr
* so that record is read from next file on disk instead of preReadBuf.
*/

View File

@ -1534,7 +1534,8 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab
/* when ss dorado replication enabled, "+data/pg_replication/" also need to copy when backup */
int pathNameLen = strlen("+data/pg_xlog");
if (strcmp(pathbuf, "./pg_xlog") == 0 || strncmp(pathbuf, "+data/pg_xlog", pathNameLen) == 0 || strcmp(pathbuf, "+data/pg_replication") == 0) {
if (strcmp(pathbuf, "./pg_xlog") == 0 || strncmp(pathbuf, "+data/pg_xlog", pathNameLen) == 0 ||
strcmp(pathbuf, "+data/pg_replication") == 0 || strcmp(pathbuf, "+data/pg_tblspc") == 0) {
if (!sizeonly) {
/* If pg_xlog is a symlink, write it as a directory anyway */
#ifndef WIN32