From de6c90d964b0aa62c76af8f1a249e4eac14b9c9e Mon Sep 17 00:00:00 2001 From: zhangao_za <18829237393@163.com> Date: Fri, 17 Nov 2023 10:28:23 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B5=84=E6=BA=90=E6=B1=A0=E5=8C=96=E5=8F=8C?= =?UTF-8?q?=E9=9B=86=E7=BE=A4=E6=94=AF=E6=8C=81=E6=9E=81=E8=87=B4rto?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ddes/adapter/ss_reform_common.cpp | 15 ++++++---- .../access/transam/extreme_rto/xlog_read.cpp | 29 +++++++++++++++++-- .../storage/access/transam/xlog.cpp | 29 +++++++++++-------- src/include/ddes/dms/ss_dms_recovery.h | 2 +- src/include/ddes/dms/ss_reform_common.h | 2 +- 5 files changed, 56 insertions(+), 21 deletions(-) diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index 63c1abb47..be8f12368 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -220,13 +220,15 @@ void SSGetRecoveryXlogPath() securec_check_ss(rc, "", ""); } -void SSDoradoGetInstidList() +void SSDoradoGetXlogPathList() { + errno_t rc = EOK; for (int i = 0; i < DMS_MAX_INSTANCE; i++) { - g_instance.dms_cxt.SSRecoveryInfo.instid_list[i] = -1; + rc = memset_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, '\0', MAXPGPATH); + securec_check_c(rc, "\0", "\0"); } struct dirent *entry; - errno_t rc = EOK; + DIR* dssdir = opendir(g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name); if (dssdir == NULL) { ereport(PANIC, (errcode_for_file_access(), errmsg("Error opening dssdir %s", @@ -240,7 +242,9 @@ void SSDoradoGetInstidList() if (strlen(entry->d_name) > len) { rc = memmove_s(entry->d_name, MAX_PATH, entry->d_name + len, strlen(entry->d_name) - len + 1); securec_check_c(rc, "\0", "\0"); - g_instance.dms_cxt.SSRecoveryInfo.instid_list[index++] = atoi(entry->d_name); + rc = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[index++], MAXPGPATH, MAXPGPATH - 1, + "%s/%s%d", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_xlog", atoi(entry->d_name)); + securec_check_ss(rc, "", ""); } } else { continue; @@ -439,5 +443,6 @@ void SSDoradoRefreshMode(ClusterRunMode doradoMode) g_instance.dms_cxt.SSReformerControl.clusterRunMode = doradoMode; SSUpdateReformerCtrl(); LWLockRelease(ControlFileLock); - ereport(LOG, (errmsg("zatest: SSDoradoRefreshMode change control file cluster run mode to: %d", g_instance.dms_cxt.SSReformerControl.clusterRunMode))); + ereport(LOG, (errmsg("SSDoradoRefreshMode change control file cluster run mode to: %d", + g_instance.dms_cxt.SSReformerControl.clusterRunMode))); } diff --git a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp index e6e4b4da9..103310251 100644 --- a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp @@ -29,6 +29,7 @@ #include "replication/walreceiver.h" #include "replication/dcf_replication.h" #include "replication/shared_storage_walreceiver.h" +#include "replication/ss_cluster_replication.h" #include "storage/ipc.h" namespace extreme_rto { @@ -553,8 +554,26 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, readLen = ParallelXLogReadWorkBufRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI); } else { if (ENABLE_DMS && ENABLE_DSS) { - readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, - xlogreader->readBuf, readTLI, NULL); + if (SS_REPLICATION_DORADO_CLUSTER) { + for (int i = 0; i < DMS_MAX_INSTANCE; i++) { + if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') { + break; + } + readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, + xlogreader->readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i]); + + if(readLen > 0) { + break; + } + if(t_thrd.xlog_cxt.readFile >= 0) { + close(t_thrd.xlog_cxt.readFile); + t_thrd.xlog_cxt.readFile = -1; + } + } + } else { + readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, + xlogreader->readBuf, readTLI, NULL); + } } else { readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI); } @@ -959,9 +978,15 @@ XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader) latestRecordCrc = record->xl_crc; latestRecordLen = record->xl_tot_len; ADD_ABNORMAL_POSITION(9); + if (SS_REPLICATION_DORADO_CLUSTER) { + t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0; + } /* Great, got a record */ return record; } else { + if (SS_REPLICATION_DORADO_CLUSTER) { + t_thrd.xlog_cxt.ssXlogReadFailedTimes++; + } /* No valid record available from this source */ t_thrd.xlog_cxt.failedSources |= t_thrd.xlog_cxt.readSource; diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index fba7a041b..ccba5e5bf 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -5219,23 +5219,29 @@ static void CleanupBackupHistory(void) * we will read xlog in path where last read success */ XLogRecord *SSXLogReadRecordErgodic(XLogReaderState *state, XLogRecPtr RecPtr, - char **errormsg, char* dssdata, int* idList) { - char xlogPath[MAXPGPATH]; + char **errormsg) { XLogRecord *record = NULL; errno_t errorno = 0; for (int i = 0; i < DMS_MAX_INSTANCE; i++) { - if (idList[i] == -1) { + if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') { break; } - errorno = snprintf_s(xlogPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s%d", dssdata, "pg_xlog", idList[i]); - securec_check_ss(errorno, "", ""); - record = XLogReadRecord(state, RecPtr, errormsg, true, xlogPath); + char *curPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i]; + record = XLogReadRecord(state, RecPtr, errormsg, true, curPath); if (record != NULL) { /* read success, exchange index */ - int buf = idList[i]; - idList[i] = idList[0]; - idList[0] = buf; + if (i != 0) { + /* read success, exchange index */ + char exPath[MAXPGPATH]; + errorno = snprintf_s(exPath, MAXPGPATH, MAXPGPATH - 1, curPath); + securec_check_ss(errorno, "", ""); + errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, MAXPGPATH - 1, + g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]); + securec_check_ss(errorno, "", ""); + errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0], MAXPGPATH, MAXPGPATH - 1, exPath); + securec_check_ss(errorno, "", ""); + } break; } else { if (t_thrd.xlog_cxt.readFile >= 0) { @@ -5277,8 +5283,7 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in for (;;) { char *errormsg = NULL; if (SS_REPLICATION_DORADO_CLUSTER) { - record = SSXLogReadRecordErgodic(xlogreader, RecPtr, &errormsg, - g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, g_instance.dms_cxt.SSRecoveryInfo.instid_list); + record = SSXLogReadRecordErgodic(xlogreader, RecPtr, &errormsg); } else { record = XLogReadRecord(xlogreader, RecPtr, &errormsg); } @@ -9218,7 +9223,7 @@ void StartupXLOG(void) if (ENABLE_DMS && ENABLE_DSS) { SSGetRecoveryXlogPath(); if (SS_REPLICATION_DORADO_CLUSTER) { - SSDoradoGetInstidList(); + SSDoradoGetXlogPathList(); } xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); close_readFile_if_open(); diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index 980c5d941..901566f7c 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -89,7 +89,7 @@ typedef struct ss_recovery_info { char recovery_xlog_dir[MAXPGPATH]; int recovery_inst_id; volatile SSGlobalClusterState cluster_ondemand_status; - int instid_list[DMS_MAX_INSTANCE]; + char xlog_list[DMS_MAX_INSTANCE][MAXPGPATH];; LWLock* update_seg_lock; bool new_primary_reset_walbuf_flag; bool ready_to_startup; // when DB start (except failover), the flag will set true diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index 9e1386943..559a33fa1 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -43,7 +43,7 @@ int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XL int readLen); XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize); void SSGetRecoveryXlogPath(); -void SSDoradoGetInstidList(); +void SSDoradoGetXlogPathList(); void SSUpdateReformerCtrl(); void SSReadControlFile(int id, bool updateDmsCtx = false); void SSClearSegCache();