From de64649afa23c4162bfabc7bf77ea60ced3c37b9 Mon Sep 17 00:00:00 2001 From: chenzhikai <895543892@qq.com> Date: Thu, 14 Sep 2023 21:16:55 +0800 Subject: [PATCH] =?UTF-8?q?build=20check=E6=80=A7=E8=83=BD=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/pg_ctl/pg_build.cpp | 2 +- src/bin/pg_ctl/pg_ctl.cpp | 107 ------------------ src/bin/pg_rewind/parsexlog.cpp | 67 ++++++++++- .../storage/access/transam/xlogreader.cpp | 92 +++++---------- src/include/access/xlogreader.h | 5 +- src/include/tool_common.h | 1 + 6 files changed, 96 insertions(+), 178 deletions(-) diff --git a/src/bin/pg_ctl/pg_build.cpp b/src/bin/pg_ctl/pg_build.cpp index 451a2d017..29d3a8ea9 100755 --- a/src/bin/pg_ctl/pg_build.cpp +++ b/src/bin/pg_ctl/pg_build.cpp @@ -609,7 +609,7 @@ void get_conninfo(const char* filename) } if (build_mode == CROSS_CLUSTER_FULL_BUILD || build_mode == CROSS_CLUSTER_INC_BUILD || - build_mode == CROSS_CLUSTER_STANDBY_FULL_BUILD || build_mode == BUILD_CHECK) { + build_mode == CROSS_CLUSTER_STANDBY_FULL_BUILD || ss_instance_config.dss.enable_dss) { /* For shared storage cluster */ conninfo_para = config_para_cross_cluster_build; } else { diff --git a/src/bin/pg_ctl/pg_ctl.cpp b/src/bin/pg_ctl/pg_ctl.cpp index 57bf63823..734a011ca 100755 --- a/src/bin/pg_ctl/pg_ctl.cpp +++ b/src/bin/pg_ctl/pg_ctl.cpp @@ -221,8 +221,6 @@ static char remove_member_file[MAXPGPATH]; static char change_role_file[MAXPGPATH]; static char* new_role = "passive"; static char start_minority_file[MAXPGPATH]; -static int get_instance_id(void); -static int ss_get_primary_id(void); static int ss_get_inter_node_nums(const char* interconn_url); bool ss_read_config(void); static unsigned int vote_num = 0; @@ -7338,111 +7336,6 @@ static void free_ctl() FREE_AND_RESET(ss_instance_config.dss.vgdata); } -static int get_instance_id(void) -{ - PGconn* conn = NULL; - PGresult* res = NULL; - const char* sql_string = "show ss_instance_id;"; - char* instid = NULL; - - conn = get_connectionex(); - if (PQstatus(conn) != CONNECTION_OK) { - pg_log(PG_WARNING, _("could not connect to server: %s"), PQerrorMessage(conn)); - return -1; - } - - /* Get local role from the local server. */ - res = PQexec(conn, sql_string); - if (PQresultStatus(res) != PGRES_TUPLES_OK) { - PQclear(res); - pg_log(PG_WARNING, _("could not get local role from the local server: %s"), PQerrorMessage(conn)); - close_connection(); - conn = NULL; - return -1; - } - - if (PQnfields(res) != 1 || PQntuples(res) != 1) { - int ntuples = PQntuples(res); - int nfields = PQnfields(res); - - PQclear(res); - pg_log(PG_WARNING, - _("invalid response from primary server: " - "Expected 1 tuple with 1 fields, got %d tuples with %d fields."), - ntuples, - nfields); - close_connection(); - conn = NULL; - return -1; - } - - instid = PQgetvalue(res, 0, 0); - - PQclear(res); - close_connection(); - conn = NULL; - - return atoi(instid); -} - -static int ss_get_primary_id(void) -{ - if (ss_instance_config.dss.socketpath == NULL) { - return -1; - } - - if (ss_instance_config.dss.vgname == NULL) { - return -1; - } - - int fd = -1; - int len = 0; - int err = 0; - struct stat statbuf; - char control_file_path[MAXPGPATH]; - - err = memset_s(control_file_path, MAXPGPATH, 0, MAXPGPATH); - securec_check_c(err, "\0", "\0"); - err = snprintf_s(control_file_path, MAXPGPATH, MAXPGPATH - 1, "%s/pg_control", ss_instance_config.dss.vgname); - securec_check_ss_c(err, "\0", "\0"); - - if (dss_device_init(ss_instance_config.dss.socketpath, true) != DSS_SUCCESS) { - pg_log(PG_WARNING, _("failed to init dss device\n")); - exit(1); - } - - fd = open(control_file_path, O_RDONLY | PG_BINARY, 0); - if(fd < 0) { - pg_log(PG_WARNING, _("failed to open pg_contol\n")); - close(fd); - fd = -1; - exit(1); - } - - if (stat(control_file_path, &statbuf) < 0) { - pg_log(PG_WARNING, _("failed to stat pg_contol\n")); - close(fd); - fd = -1; - exit(1); - } - - len = statbuf.st_size; - char* tmpBuffer = (char*)malloc(len + 1); - - if ((read(fd, tmpBuffer, len)) != len) { - close(fd); - fd = -1; - pg_log(PG_WARNING, _("failed to read pg_contol\n")); - exit(1); - } - - ss_reformer_ctrl_t* reformerCtrl; - - /* Calculate the offset to obtain the primary_id of the last page */ - reformerCtrl = (ss_reformer_ctrl_t*)(tmpBuffer + REFORMER_CTL_INSTANCEID * PG_CONTROL_SIZE); - return reformerCtrl->primaryInstId; -} - static int ss_get_inter_node_nums(const char* interconn_url) { errno_t rc; diff --git a/src/bin/pg_rewind/parsexlog.cpp b/src/bin/pg_rewind/parsexlog.cpp index 24b183cf1..11937ffc8 100644 --- a/src/bin/pg_rewind/parsexlog.cpp +++ b/src/bin/pg_rewind/parsexlog.cpp @@ -26,6 +26,7 @@ #include "replication/slot.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" +#include "storage/file/fio_device.h" #include #define CONFIG_CASCADE_STANDBY "cascade_standby" @@ -131,6 +132,50 @@ XLogRecPtr readOneRecord(const char* datadir, XLogRecPtr ptr, TimeLineID tli) return endptr; } +int SSInitXlogDir(char*** xlogDirs) +{ + int xlogDirNum = 0; + *xlogDirs = (char**)malloc(SS_MAX_INST * sizeof(char*)); + if (*xlogDirs == NULL) { + return -1; + } + + for (int i = 0; i < SS_MAX_INST; i++) { + (*xlogDirs)[i] = (char*)malloc(MAXPGPATH * sizeof(char)); + if ((*xlogDirs)[i] == NULL) { + for (int j = 0; j < i; j++) { + free((*xlogDirs)[j]); + } + free(*xlogDirs); + return -1; + } + } + + DIR* dir = opendir(ss_instance_config.dss.vgname); + struct dirent* entry = NULL; + while (dir != NULL && (entry = readdir(dir)) != NULL) { + if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) { + snprintf((*xlogDirs)[xlogDirNum], MAXPGPATH, "%s/%s", ss_instance_config.dss.vgname, entry->d_name); + xlogDirNum++; + if (xlogDirNum >= SS_MAX_INST) { + break; + } + } + } + closedir(dir); + return xlogDirNum; +} + +void FreeXlogDir(char** xlogDirs) +{ + if (ss_instance_config.dss.enable_dss && xlogDirs != NULL) { + for (int i = 0; i < SS_MAX_INST; i++) { + free(xlogDirs[i]); + } + free(xlogDirs); + } +} + BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRecPtr startrec, XLogRecPtr* lastchkptrec, TimeLineID* lastchkpttli, XLogRecPtr *lastchkptredo, uint32 term) { @@ -142,7 +187,6 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec #endif XLogRecPtr max_lsn; char returnmsg[MAX_ERR_MSG_LENTH] = {0}; - char dssxlogdir[MAXPGPATH] = {0}; pg_crc32 maxLsnCrc = 0; XLogRecord* record = NULL; XLogRecPtr searchptr; @@ -154,17 +198,26 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec int ret = 0; TimestampTz start_time; TimestampTz current_time; + /* maybe have some xlog dir in enable_dss mode */ + char** xlogDirs = NULL; + int xlogDirNum = 0; /* * local max lsn must be exists, or change to full build. */ if (ss_instance_config.dss.enable_dss) { - max_lsn = SSFindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc, ss_instance_config.dss.vgname); + xlogDirNum = SSInitXlogDir(&xlogDirs); + if (xlogDirNum <= 0) { + pg_log(PG_FATAL, "init xlog dirs failed\n"); + return BUILD_FATAL; + } + max_lsn = SSFindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc, xlogDirs, xlogDirNum); } else { max_lsn = FindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc); } if (XLogRecPtrIsInvalid(max_lsn)) { pg_fatal("find max lsn fail, errmsg:%s\n", returnmsg); + FreeXlogDir(xlogDirs); return BUILD_FATAL; } pg_log(PG_PROGRESS, "find max lsn success, %s", returnmsg); @@ -174,6 +227,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &readprivate); if (xlogreader == NULL) { pg_log(PG_ERROR, "out of memory\n"); + FreeXlogDir(xlogDirs); return BUILD_ERROR; } @@ -182,7 +236,6 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec securec_check_ss_c(ret, "\0", "\0"); get_conninfo(pg_conf_file); - searchptr = max_lsn; start_time = localGetCurrentTimestamp(); current_time = start_time; @@ -192,12 +245,13 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec "try 300s, could not find any common checkpoint, need to do full build\n"); XLogReaderFree(xlogreader); CloseXlogFile(); + FreeXlogDir(xlogDirs); return BUILD_FATAL; } uint8 info; if (ss_instance_config.dss.enable_dss) { - record = XLogReadRecordFromAllDir(ss_instance_config.dss.vgname, xlogreader, searchptr, &errormsg); + record = XLogReadRecordFromAllDir(xlogDirs, xlogDirNum, xlogreader, searchptr, &errormsg); } else { record = XLogReadRecord(xlogreader, searchptr, &errormsg); } @@ -212,6 +266,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec } XLogReaderFree(xlogreader); CloseXlogFile(); + FreeXlogDir(xlogDirs); return BUILD_FATAL; } @@ -226,7 +281,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec if (xlogreader->ReadRecPtr <= startrec && XLogRecGetRmid(xlogreader) == RM_XLOG_ID && (info == XLOG_CHECKPOINT_SHUTDOWN || info == XLOG_CHECKPOINT_ONLINE)) { if (checkCommonAncestorByXlog(xlogreader->ReadRecPtr, record->xl_crc, term) == true) { - CheckPoint checkPoint; + CheckPoint checkPoint; errorno = memcpy_s(&checkPoint, sizeof(CheckPoint), XLogRecGetData(xlogreader), sizeof(CheckPoint)); securec_check_c(errorno, "", ""); *lastchkptrec = searchptr; @@ -242,6 +297,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec if (increment_return_code != BUILD_SUCCESS) { XLogReaderFree(xlogreader); CloseXlogFile(); + FreeXlogDir(xlogDirs); return increment_return_code; } } @@ -252,6 +308,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec XLogReaderFree(xlogreader); CloseXlogFile(); + FreeXlogDir(xlogDirs); PG_CHECKBUILD_AND_RETURN(); /* no common checkpoint between target and source, need full build */ if (XLogRecPtrIsInvalid(searchptr)) { diff --git a/src/gausskernel/storage/access/transam/xlogreader.cpp b/src/gausskernel/storage/access/transam/xlogreader.cpp index 966cd3911..8891a678b 100644 --- a/src/gausskernel/storage/access/transam/xlogreader.cpp +++ b/src/gausskernel/storage/access/transam/xlogreader.cpp @@ -1326,60 +1326,40 @@ tryAgain: return XLOG_BLCKSZ; } -XLogRecord* XLogReadRecordFromAllDir(char* dirPath, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg) +XLogRecord* XLogReadRecordFromAllDir(char** xlogDirs, int xlogDirNum, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg) { - DIR* dir = opendir(dirPath); - struct dirent* entry = NULL; - char xlogDirStr[MAXPGPATH]; - errno_t rc = EOK; XLogRecord* record = NULL; - - while (dir != NULL && (entry = readdir(dir)) != NULL) { - if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) { - rc = snprintf_s(xlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dirPath, entry->d_name); - securec_check_ss_c(rc, "", ""); - record = XLogReadRecord(xlogReader, curLsn, errorMsg, true, xlogDirStr); - if (record != NULL) { - break; - } else { - CLOSE_FD(xlogreadfd); - } + for (int i = 0; i < xlogDirNum; i++) { + record = XLogReadRecord(xlogReader, curLsn, errorMsg, true, xlogDirs[i]); + if (record != NULL) { + break; + } else { + CLOSE_FD(xlogreadfd); } } - (void)closedir(dir); return record; } -void FindMaxXlogFileName(char* dirPath, char* maxXLogFileName) +void SSFindMaxXlogFileName(char* maxXLogFileName, char** xlogDirs, int xlogDirNum) { - DIR* dir = opendir(dirPath); - struct dirent* entry = NULL; - char xlogDirStr[MAXPGPATH]; errno_t rc = EOK; - - while (dir != NULL && (entry = readdir(dir)) != NULL) { - if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) { - rc = snprintf_s(xlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dirPath, entry->d_name); - securec_check_ss_c(rc, "", ""); - DIR* subDir = opendir(xlogDirStr); - struct dirent* subDirEntry = NULL; - while (subDir != NULL && (subDirEntry = readdir(subDir)) != NULL) { - if (strlen(subDirEntry->d_name) == 24 && strspn(subDirEntry->d_name, "0123456789ABCDEF") == 24 && - (strlen(maxXLogFileName) == 0 || strcmp(maxXLogFileName, subDirEntry->d_name) < 0)) { - rc = strncpy_s(maxXLogFileName, MAXPGPATH, subDirEntry->d_name, strlen(subDirEntry->d_name) + 1); - securec_check(rc, "", ""); - maxXLogFileName[strlen(subDirEntry->d_name)] = '\0'; - } + for (int i = 0; i < xlogDirNum; i++) { + DIR* subDir = opendir(xlogDirs[i]); + struct dirent* subDirEntry = NULL; + while (subDir != NULL && (subDirEntry = readdir(subDir)) != NULL) { + if (strlen(subDirEntry->d_name) == 24 && strspn(subDirEntry->d_name, "0123456789ABCDEF") == 24 && + (strlen(maxXLogFileName) == 0 || strcmp(maxXLogFileName, subDirEntry->d_name) < 0)) { + rc = strncpy_s(maxXLogFileName, MAXPGPATH, subDirEntry->d_name, strlen(subDirEntry->d_name) + 1); + securec_check(rc, "", ""); + maxXLogFileName[strlen(subDirEntry->d_name)] = '\0'; } - (void)closedir(subDir); } + (void)closedir(subDir); } - (void)closedir(dir); } -XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char* dssDirStr) +XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char** xlogDirs, int xlogDirNum) { - struct dirent *entry = NULL; XLogReaderState *xlogReader = NULL; XLogPageReadPrivate readPrivate = { .datadir = NULL, @@ -1395,13 +1375,10 @@ XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 bool findValidXLogFile = false; uint32 xlogReadLogid = -1; uint32 xlogReadLogSeg = -1; - char dssXlogDirStr[MAXPGPATH]; errno_t rc = EOK; - DIR* dssDir = NULL; - bool breakLoops = false; /* Ranking xlog from large to small */ - FindMaxXlogFileName(dssDirStr, maxXLogFileName); + SSFindMaxXlogFileName(maxXLogFileName, xlogDirs, xlogDirNum); if (sscanf_s(maxXLogFileName, "%08X%08X%08X", &tli, &xlogReadLogid, &xlogReadLogSeg) != 3) { rc = snprintf_s(returnMsg, XLOG_READER_MAX_MSGLENTH, XLOG_READER_MAX_MSGLENTH - 1, @@ -1432,29 +1409,18 @@ XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 /* Start to find the max lsn from a valid xlogfile */ startLsn = (xlogReadLogSeg * XLogSegSize) + ((XLogRecPtr)xlogReadLogid * XLogSegmentsPerXLogId * XLogSegSize); - while (!XLogRecPtrIsInvalid(startLsn) && !breakLoops) { - dssDir = opendir(dssDirStr); + while (!XLogRecPtrIsInvalid(startLsn) && !findValidXLogFile) { /* find the first valid record from the bigger xlogrecord. then break */ - while (dssDir != NULL && (entry = readdir(dssDir)) != NULL) { - if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) { - rc = snprintf_s(dssXlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dssDirStr, entry->d_name); -#ifndef FRONTEND - securec_check_ss(rc, "", ""); -#else - securec_check_ss_c(rc, "", ""); -#endif - curLsn = XLogFindNextRecord(xlogReader, startLsn, NULL, dssXlogDirStr); - if (XLogRecPtrIsInvalid(curLsn)) { - CLOSE_FD(xlogreadfd); - } else { - findValidXLogFile = true; - breakLoops = true; - break; - } + for (int i = 0; i < xlogDirNum; i++) { + curLsn = XLogFindNextRecord(xlogReader, startLsn, NULL, xlogDirs[i]); + if (XLogRecPtrIsInvalid(curLsn)) { + CLOSE_FD(xlogreadfd); + } else { + findValidXLogFile = true; + break; } } startLsn = startLsn - XLogSegSize; - (void)closedir(dssDir); } CLOSE_FD(xlogreadfd); @@ -1476,7 +1442,7 @@ XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 /* find the max lsn. */ while(true) { - record = XLogReadRecordFromAllDir(dssDirStr, xlogReader, curLsn, &errorMsg); + record = XLogReadRecordFromAllDir(xlogDirs, xlogDirNum, xlogReader, curLsn, &errorMsg); if (record == NULL) { break; } diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index bbd4ded83..3fe5e5dbc 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -42,7 +42,8 @@ extern void XLogReaderFree(XLogReaderState* state); /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ extern struct XLogRecord* XLogReadRecord( XLogReaderState* state, XLogRecPtr recptr, char** errormsg, bool doDecode = true, char* xlog_path = NULL); -extern struct XLogRecord* XLogReadRecordFromAllDir(char* dirPath, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg); +extern struct XLogRecord* XLogReadRecordFromAllDir( + char** xlogDirs, int xlogDirNum, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg); extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum, XLogPhyBlock *pblk = NULL); @@ -57,7 +58,7 @@ extern void XLogRecGetVMPhysicalBlock(const XLogReaderState *record, uint8 block extern void XLogReaderInvalReadState(XLogReaderState* state); extern XLogRecPtr XLogFindNextRecord(XLogReaderState* state, XLogRecPtr RecPtr, XLogRecPtr *endPtr = NULL, char* xlog_path = NULL); -extern XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char* dssDirStr); +extern XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char** xlogDirs, int xlogDirNum); extern XLogRecPtr FindMaxLSN(char* workingpath, char* returnmsg, int msg_len, pg_crc32* maxLsnCrc, uint32 *maxLsnLen = NULL, TimeLineID *returnTli = NULL, char* xlog_path = NULL); extern XLogRecPtr FindMinLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *minLsnCrc); diff --git a/src/include/tool_common.h b/src/include/tool_common.h index 3f84091ce..ba96059d0 100644 --- a/src/include/tool_common.h +++ b/src/include/tool_common.h @@ -26,6 +26,7 @@ #include "storage/file/fio_device_com.h" #define MAXPGPATH 1024 +#define SS_MAX_INST 64 #define T_SS_XLOGDIR \ (g_enable_dss ? g_datadir.xlogDir : "pg_xlog")