@ -220,13 +220,15 @@ void SSGetRecoveryXlogPath()
|
||||
securec_check_ss(rc, "", "");
|
||||
}
|
||||
|
||||
void SSDoradoGetInstidList()
|
||||
void SSDoradoGetXlogPathList()
|
||||
{
|
||||
errno_t rc = EOK;
|
||||
for (int i = 0; i < DMS_MAX_INSTANCE; i++) {
|
||||
g_instance.dms_cxt.SSRecoveryInfo.instid_list[i] = -1;
|
||||
rc = memset_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, '\0', MAXPGPATH);
|
||||
securec_check_c(rc, "\0", "\0");
|
||||
}
|
||||
struct dirent *entry;
|
||||
errno_t rc = EOK;
|
||||
|
||||
DIR* dssdir = opendir(g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name);
|
||||
if (dssdir == NULL) {
|
||||
ereport(PANIC, (errcode_for_file_access(), errmsg("Error opening dssdir %s",
|
||||
@ -240,7 +242,9 @@ void SSDoradoGetInstidList()
|
||||
if (strlen(entry->d_name) > len) {
|
||||
rc = memmove_s(entry->d_name, MAX_PATH, entry->d_name + len, strlen(entry->d_name) - len + 1);
|
||||
securec_check_c(rc, "\0", "\0");
|
||||
g_instance.dms_cxt.SSRecoveryInfo.instid_list[index++] = atoi(entry->d_name);
|
||||
rc = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[index++], MAXPGPATH, MAXPGPATH - 1,
|
||||
"%s/%s%d", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_xlog", atoi(entry->d_name));
|
||||
securec_check_ss(rc, "", "");
|
||||
}
|
||||
} else {
|
||||
continue;
|
||||
@ -439,5 +443,6 @@ void SSDoradoRefreshMode(ClusterRunMode doradoMode)
|
||||
g_instance.dms_cxt.SSReformerControl.clusterRunMode = doradoMode;
|
||||
SSUpdateReformerCtrl();
|
||||
LWLockRelease(ControlFileLock);
|
||||
ereport(LOG, (errmsg("zatest: SSDoradoRefreshMode change control file cluster run mode to: %d", g_instance.dms_cxt.SSReformerControl.clusterRunMode)));
|
||||
ereport(LOG, (errmsg("SSDoradoRefreshMode change control file cluster run mode to: %d",
|
||||
g_instance.dms_cxt.SSReformerControl.clusterRunMode)));
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@
|
||||
#include "replication/walreceiver.h"
|
||||
#include "replication/dcf_replication.h"
|
||||
#include "replication/shared_storage_walreceiver.h"
|
||||
#include "replication/ss_cluster_replication.h"
|
||||
#include "storage/ipc.h"
|
||||
|
||||
namespace extreme_rto {
|
||||
@ -553,8 +554,26 @@ int ParallelXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
|
||||
readLen = ParallelXLogReadWorkBufRead(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI);
|
||||
} else {
|
||||
if (ENABLE_DMS && ENABLE_DSS) {
|
||||
readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr,
|
||||
xlogreader->readBuf, readTLI, NULL);
|
||||
if (SS_REPLICATION_DORADO_CLUSTER) {
|
||||
for (int i = 0; i < DMS_MAX_INSTANCE; i++) {
|
||||
if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') {
|
||||
break;
|
||||
}
|
||||
readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr,
|
||||
xlogreader->readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i]);
|
||||
|
||||
if(readLen > 0) {
|
||||
break;
|
||||
}
|
||||
if(t_thrd.xlog_cxt.readFile >= 0) {
|
||||
close(t_thrd.xlog_cxt.readFile);
|
||||
t_thrd.xlog_cxt.readFile = -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
readLen = SSXLogPageRead(xlogreader, targetPagePtr, reqLen, targetRecPtr,
|
||||
xlogreader->readBuf, readTLI, NULL);
|
||||
}
|
||||
} else {
|
||||
readLen = ParallelXLogPageReadFile(xlogreader, targetPagePtr, reqLen, targetRecPtr, readTLI);
|
||||
}
|
||||
@ -959,9 +978,15 @@ XLogRecord *XLogParallelReadNextRecord(XLogReaderState *xlogreader)
|
||||
latestRecordCrc = record->xl_crc;
|
||||
latestRecordLen = record->xl_tot_len;
|
||||
ADD_ABNORMAL_POSITION(9);
|
||||
if (SS_REPLICATION_DORADO_CLUSTER) {
|
||||
t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0;
|
||||
}
|
||||
/* Great, got a record */
|
||||
return record;
|
||||
} else {
|
||||
if (SS_REPLICATION_DORADO_CLUSTER) {
|
||||
t_thrd.xlog_cxt.ssXlogReadFailedTimes++;
|
||||
}
|
||||
/* No valid record available from this source */
|
||||
t_thrd.xlog_cxt.failedSources |= t_thrd.xlog_cxt.readSource;
|
||||
|
||||
|
||||
@ -5219,23 +5219,29 @@ static void CleanupBackupHistory(void)
|
||||
* we will read xlog in path where last read success
|
||||
*/
|
||||
XLogRecord *SSXLogReadRecordErgodic(XLogReaderState *state, XLogRecPtr RecPtr,
|
||||
char **errormsg, char* dssdata, int* idList) {
|
||||
char xlogPath[MAXPGPATH];
|
||||
char **errormsg) {
|
||||
XLogRecord *record = NULL;
|
||||
errno_t errorno = 0;
|
||||
|
||||
for (int i = 0; i < DMS_MAX_INSTANCE; i++) {
|
||||
if (idList[i] == -1) {
|
||||
if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') {
|
||||
break;
|
||||
}
|
||||
errorno = snprintf_s(xlogPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s%d", dssdata, "pg_xlog", idList[i]);
|
||||
securec_check_ss(errorno, "", "");
|
||||
record = XLogReadRecord(state, RecPtr, errormsg, true, xlogPath);
|
||||
char *curPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i];
|
||||
record = XLogReadRecord(state, RecPtr, errormsg, true, curPath);
|
||||
if (record != NULL) {
|
||||
/* read success, exchange index */
|
||||
int buf = idList[i];
|
||||
idList[i] = idList[0];
|
||||
idList[0] = buf;
|
||||
if (i != 0) {
|
||||
/* read success, exchange index */
|
||||
char exPath[MAXPGPATH];
|
||||
errorno = snprintf_s(exPath, MAXPGPATH, MAXPGPATH - 1, curPath);
|
||||
securec_check_ss(errorno, "", "");
|
||||
errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i], MAXPGPATH, MAXPGPATH - 1,
|
||||
g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0]);
|
||||
securec_check_ss(errorno, "", "");
|
||||
errorno = snprintf_s(g_instance.dms_cxt.SSRecoveryInfo.xlog_list[0], MAXPGPATH, MAXPGPATH - 1, exPath);
|
||||
securec_check_ss(errorno, "", "");
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
if (t_thrd.xlog_cxt.readFile >= 0) {
|
||||
@ -5277,8 +5283,7 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in
|
||||
for (;;) {
|
||||
char *errormsg = NULL;
|
||||
if (SS_REPLICATION_DORADO_CLUSTER) {
|
||||
record = SSXLogReadRecordErgodic(xlogreader, RecPtr, &errormsg,
|
||||
g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, g_instance.dms_cxt.SSRecoveryInfo.instid_list);
|
||||
record = SSXLogReadRecordErgodic(xlogreader, RecPtr, &errormsg);
|
||||
} else {
|
||||
record = XLogReadRecord(xlogreader, RecPtr, &errormsg);
|
||||
}
|
||||
@ -9218,7 +9223,7 @@ void StartupXLOG(void)
|
||||
if (ENABLE_DMS && ENABLE_DSS) {
|
||||
SSGetRecoveryXlogPath();
|
||||
if (SS_REPLICATION_DORADO_CLUSTER) {
|
||||
SSDoradoGetInstidList();
|
||||
SSDoradoGetXlogPathList();
|
||||
}
|
||||
xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER);
|
||||
close_readFile_if_open();
|
||||
|
||||
@ -89,7 +89,7 @@ typedef struct ss_recovery_info {
|
||||
char recovery_xlog_dir[MAXPGPATH];
|
||||
int recovery_inst_id;
|
||||
volatile SSGlobalClusterState cluster_ondemand_status;
|
||||
int instid_list[DMS_MAX_INSTANCE];
|
||||
char xlog_list[DMS_MAX_INSTANCE][MAXPGPATH];;
|
||||
LWLock* update_seg_lock;
|
||||
bool new_primary_reset_walbuf_flag;
|
||||
bool ready_to_startup; // when DB start (except failover), the flag will set true
|
||||
|
||||
@ -43,7 +43,7 @@ int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XL
|
||||
int readLen);
|
||||
XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize);
|
||||
void SSGetRecoveryXlogPath();
|
||||
void SSDoradoGetInstidList();
|
||||
void SSDoradoGetXlogPathList();
|
||||
void SSUpdateReformerCtrl();
|
||||
void SSReadControlFile(int id, bool updateDmsCtx = false);
|
||||
void SSClearSegCache();
|
||||
|
||||
Reference in New Issue
Block a user