diff --git a/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp index 83bead554..5efe9b6d1 100755 --- a/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp @@ -393,18 +393,6 @@ void HandlePageRedoInterruptsImpl(uint64 clearRedoFdCountInc = 1) proc_exit(1); } - - static uint64 clearRedoFdCount = 0; - const uint64 clearRedoFdCountMask = 0x7FFFFFF; - clearRedoFdCount += clearRedoFdCountInc; - if (clearRedoFdCount > clearRedoFdCountMask && GetSMgrRelationHash() != NULL && - (g_redoWorker->role == REDO_PAGE_WORKER || g_redoWorker->role == REDO_PAGE_MNG)) { - clearRedoFdCount = 0; - long hash_num = hash_get_num_entries(GetSMgrRelationHash()); - if (hash_num >= MAX_CLEAR_SMGR_NUM) { - smgrcloseall(); - } - } } void HandlePageRedoInterrupts() @@ -972,15 +960,21 @@ void PageManagerProcSegFullSyncState(XLogRecParseState *parseState) void PageManagerProcSegPipeLineSyncState(XLogRecParseState *parseState) { - if (!SS_DISASTER_STANDBY_CLUSTER) { - WaitCurrentPipeLineRedoWorkersQueueEmpty(); + if (SS_DISASTER_STANDBY_CLUSTER) { + PageRedoPipeline *myRedoLine = &g_dispatcher->pageLines[g_redoWorker->slotId]; + const uint32 WorkerNumPerMng = myRedoLine->redoThdNum; + uint32 work_id = WorkerNumPerMng - 1; + parseState->nextrecord = NULL; + AddPageRedoItem(myRedoLine->redoThd[work_id], parseState); + } else { + WaitCurrentPipeLineRedoWorkersQueueEmpty(); + MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx); + + RedoPageManagerDdlAction(parseState); + + (void)MemoryContextSwitchTo(oldCtx); + XLogBlockParseStateRelease(parseState); } - MemoryContext oldCtx = MemoryContextSwitchTo(g_redoWorker->oldCtx); - - RedoPageManagerDdlAction(parseState); - - (void)MemoryContextSwitchTo(oldCtx); - XLogBlockParseStateRelease(parseState); } static void WaitNextBarrier(XLogRecParseState *parseState) @@ -1633,6 +1627,9 @@ void RedoPageWorkerMain() redoblockstate->blockparse.blockhead.end_ptr); CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_6]); break; + case BLOCK_DATA_SEG_EXTEND: + ProcSegPageCommonRedo(redoblockstate); + break; default: break; } @@ -2141,7 +2138,6 @@ void XLogReadPageWorkerMain() g_redoWorker->lastReplayedReadRecPtr = xlogreader->ReadRecPtr; g_redoWorker->lastReplayedEndRecPtr = xlogreader->EndRecPtr; PushToWorkerLsn(send_lsn_forwarder_for_check_to_hot_standby(g_redoWorker->lastReplayedEndRecPtr)); - if (FORCE_FINISH_ENABLED) { CheckAndDoForceFinish(xlogreader); } diff --git a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp index b9e093c06..c0b5f5cd8 100644 --- a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp @@ -542,7 +542,7 @@ triggered: } int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, - TimeLineID *readTLI) + TimeLineID *readTLI, char* xlogPath) { int readLen = -1; pg_atomic_write_u64(&g_dispatcher->rtoXlogBufState.targetRecPtr, targetRecPtr); @@ -556,43 +556,8 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, } else { if (ENABLE_DMS && ENABLE_DSS) { if (SS_DORADO_CLUSTER) { - for (int i = 0; i < DMS_MAX_INSTANCE; i++) { - if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') { - break; - } - - char *curPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i]; - readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, - xlogreader->readBuf, readTLI, curPath); - - /* read success, exchange index */ - if (readLen > 0) { - /* exchange index if xlog_list[0] is not current xlog path */ - if (i != 0) { - char exPath[MAXPGPATH]; - errorno = snprintf_s(exPath, MAXPGPATH, MAXPGPATH - 1, curPath); - securec_check_ss(errorno, "", ""); - errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, MAXPGPATH - 1, - g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]); - securec_check_ss(errorno, "", ""); - errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0], MAXPGPATH, MAXPGPATH - 1, exPath); - securec_check_ss(errorno, "", ""); - } - break; - } - - if (t_thrd.xlog_cxt.readFile >= 0) { - close(t_thrd.xlog_cxt.readFile); - t_thrd.xlog_cxt.readFile = -1; - } - - /* If record which is read from file is NULL, when preReadStartPtr is not set InvalidXlogPreReadStartPtr - * then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from - * preReadBuf and record still is bad. Therefore, preReadStartPtr need to set InvalidXlogPreReadStartPtr - * so that record is read from next file on disk instead of preReadBuf. - */ - xlogreader->preReadStartPtr = InvalidXlogPreReadStartPtr; - } + readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, + xlogreader->readBuf, readTLI, xlogPath); } else { readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, xlogreader->readBuf, readTLI, NULL); @@ -601,6 +566,11 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI); } } + + /* current path haven't xlog file for this xlog */ + if (SS_DORADO_CLUSTER && readLen < 0) { + return -1; + } if (readLen > 0 || t_thrd.xlog_cxt.recoveryTriggered || !t_thrd.xlog_cxt.StandbyMode || DoEarlyExit()) { return readLen; @@ -613,7 +583,7 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, return readLen; } -int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, char* xlogPath) { int readLen; uint32 targetPageOff; @@ -635,7 +605,7 @@ int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int req * so that we can validate it. */ readLen = ParallelXLogPageRead(state, pageptr, Max(reqLen, (int)SizeOfXLogShortPHD), state->currRecPtr, - &state->readPageTLI); + &state->readPageTLI, xlogPath); if (readLen < 0) { goto err; } @@ -653,7 +623,8 @@ int ParallelReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int req /* still not enough */ if (readLen < (int)XLogPageHeaderSize(hdr)) { - readLen = ParallelXLogPageRead(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, &state->readPageTLI); + readLen = ParallelXLogPageRead(state, pageptr, XLogPageHeaderSize(hdr), state->currRecPtr, + &state->readPageTLI, xlogPath); if (readLen < 0) { goto err; } @@ -678,7 +649,7 @@ err: return -1; } -XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) +XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg, char* xlogPath) { XLogRecord *record = NULL; XLogRecPtr targetPagePtr; @@ -737,7 +708,8 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * * enough byte to cover the whole record header, or at least the part of * it that fits on the same page. */ - readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); + readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + xlogPath); if (readOff < 0) { report_invalid_record(state, "read xlog page failed at %X/%X", (uint32)(RecPtr >> 32), (uint32)RecPtr); goto err; @@ -821,7 +793,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * uint32 gotlen; errno_t errorno = EOK; - readOff = ParallelReadPageInternal(state, targetPagePtr, XLOG_BLCKSZ); + readOff = ParallelReadPageInternal(state, targetPagePtr, XLOG_BLCKSZ, xlogPath); if (readOff < 0) { goto err; } @@ -838,7 +810,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * /* Wait for the next page to become available */ readOff = ParallelReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ)); + Min(total_len - gotlen + SizeOfXLogShortPHD, XLOG_BLCKSZ), xlogPath); if (readOff < 0) goto err; @@ -865,7 +837,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * /* Append the continuation from this page to the buffer */ pageHeaderSize = XLogPageHeaderSize(pageHeader); if (readOff < (int)pageHeaderSize) - readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize); + readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize, xlogPath); Assert((int)pageHeaderSize <= readOff); @@ -875,7 +847,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * len = pageHeader->xlp_rem_len; if (readOff < (int)(pageHeaderSize + len)) - readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize + len); + readOff = ParallelReadPageInternal(state, targetPagePtr, pageHeaderSize + len, xlogPath); errorno = memcpy_s(buffer, total_len - gotlen, (char *)contdata, len); securec_check_c(errorno, "", ""); @@ -903,7 +875,7 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * XLByteAdvance(state->EndRecPtr, (pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len))); } else { /* Wait for the record data to become available */ - readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + total_len, XLOG_BLCKSZ)); + readOff = ParallelReadPageInternal(state, targetPagePtr, Min(targetRecOff + total_len, XLOG_BLCKSZ), xlogPath); if (readOff < 0) { goto err; } @@ -945,6 +917,52 @@ err: return NULL; } +/* +* in ss dorado double cluster, we need read xlogpath ergodic, +* we will read xlog in path where last read success +*/ +XLogRecord *SSExtremeXLogReadRecordErgodic(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) +{ + XLogRecord *record = NULL; + errno_t errorno = 0; + + for (int i = 0; i < DMS_MAX_INSTANCE; i++) { + if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') { + break; + } + char *curPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i]; + record = ParallelReadRecord(state, InvalidXLogRecPtr, errormsg, curPath); + if (record != NULL) { + /* read success, exchange index */ + if (i != 0) { + /* read success, exchange index */ + char exPath[MAXPGPATH]; + errorno = snprintf_s(exPath, MAXPGPATH, MAXPGPATH - 1, curPath); + securec_check_ss(errorno, "", ""); + errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, MAXPGPATH - 1, + g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]); + securec_check_ss(errorno, "", ""); + errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0], MAXPGPATH, MAXPGPATH - 1, exPath); + securec_check_ss(errorno, "", ""); + } + break; + } else { + if (t_thrd.xlog_cxt.readFile >= 0) { + close(t_thrd.xlog_cxt.readFile); + t_thrd.xlog_cxt.readFile = -1; + } + + /* If record which is read from file is NULL, when preReadStartPtr is not set InvalidXlogPreReadStartPtr + * then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from + * preReadBuf and record still is bad. Therefore, preReadStartPtr need to set InvalidXlogPreReadStartPtr + * so that record is read from next file on disk instead of preReadBuf. + */ + state->preReadStartPtr = InvalidXlogPreReadStartPtr; + } + } + return record; +} + XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader) { XLogRecord *record = NULL; @@ -953,8 +971,13 @@ XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader) t_thrd.xlog_cxt.failedSources = 0; for (;;) { char *errormsg = NULL; + + if (SS_DORADO_CLUSTER) { + record = SSExtremeXLogReadRecordErgodic(xlogreader, InvalidXLogRecPtr, &errormsg); + } else { + record = ParallelReadRecord(xlogreader, InvalidXLogRecPtr, &errormsg, NULL); + } - record = ParallelReadRecord(xlogreader, InvalidXLogRecPtr, &errormsg); t_thrd.xlog_cxt.ReadRecPtr = xlogreader->ReadRecPtr; t_thrd.xlog_cxt.EndRecPtr = xlogreader->EndRecPtr; g_instance.comm_cxt.predo_cxt.redoPf.read_ptr = t_thrd.xlog_cxt.ReadRecPtr; diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 8bcd577de..8685660e5 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -5513,9 +5513,8 @@ XLogRecord *SSXLogReadRecordErgodic(XLogReaderState *state, XLogRecPtr RecPtr, close(t_thrd.xlog_cxt.readFile); t_thrd.xlog_cxt.readFile = -1; } - /* If record which is read from file is NULL, when preReadStartPtr is not set InvalidXlogPreReadStartPtr - * then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from + * then exhchanging file, due to preread 64M now RecPtr < preReadStartPtr, so record still is got from * preReadBuf and record still is bad. Therefore, preReadStartPtr need to set InvalidXlogPreReadStartPtr * so that record is read from next file on disk instead of preReadBuf. */ diff --git a/src/gausskernel/storage/replication/basebackup.cpp b/src/gausskernel/storage/replication/basebackup.cpp index ea2e3b8ac..5c1d105ff 100755 --- a/src/gausskernel/storage/replication/basebackup.cpp +++ b/src/gausskernel/storage/replication/basebackup.cpp @@ -1534,7 +1534,8 @@ static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tab /* when ss dorado replication enabled, "+data/pg_replication/" also need to copy when backup */ int pathNameLen = strlen("+data/pg_xlog"); - if (strcmp(pathbuf, "./pg_xlog") == 0 || strncmp(pathbuf, "+data/pg_xlog", pathNameLen) == 0 || strcmp(pathbuf, "+data/pg_replication") == 0) { + if (strcmp(pathbuf, "./pg_xlog") == 0 || strncmp(pathbuf, "+data/pg_xlog", pathNameLen) == 0 || + strcmp(pathbuf, "+data/pg_replication") == 0 || strcmp(pathbuf, "+data/pg_tblspc") == 0) { if (!sizeonly) { /* If pg_xlog is a symlink, write it as a directory anyway */ #ifndef WIN32