From 97bc7cd191fb159d7b1feb749345f0bfa65d78a0 Mon Sep 17 00:00:00 2001 From: zhaobingyu <1783692558@qq.com> Date: Wed, 7 Dec 2022 09:53:27 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B5=84=E6=BA=90=E6=B1=A0=E5=8C=96=E9=80=82?= =?UTF-8?q?=E9=85=8D=20pg=5Fresetxlog=20=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/pg_resetxlog/CMakeLists.txt | 3 + src/bin/pg_resetxlog/Makefile | 1 + src/bin/pg_resetxlog/pg_resetxlog.cpp | 295 ++++++++++++++++++++------ 3 files changed, 234 insertions(+), 65 deletions(-) diff --git a/src/bin/pg_resetxlog/CMakeLists.txt b/src/bin/pg_resetxlog/CMakeLists.txt index 8f2f3a415..f8b1a8894 100755 --- a/src/bin/pg_resetxlog/CMakeLists.txt +++ b/src/bin/pg_resetxlog/CMakeLists.txt @@ -1,4 +1,7 @@ #This is the main CMAKE for build all components. +execute_process( + COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/dss/dss_adaptor.cpp ${CMAKE_CURRENT_SOURCE_DIR}/dss_adaptor.cpp +) # pg_resetxlog bin AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} TGT_resetxlog_SRC) diff --git a/src/bin/pg_resetxlog/Makefile b/src/bin/pg_resetxlog/Makefile index 25d9810e3..35a3ffb42 100644 --- a/src/bin/pg_resetxlog/Makefile +++ b/src/bin/pg_resetxlog/Makefile @@ -23,6 +23,7 @@ ifneq "$(MAKECMDGOALS)" "clean" endif endif OBJS= pg_resetxlog.o $(WIN32RES) +OBJS += $(top_builddir)/src/gausskernel/storage/dss/dss_adaptor.o all: pg_resetxlog diff --git a/src/bin/pg_resetxlog/pg_resetxlog.cpp b/src/bin/pg_resetxlog/pg_resetxlog.cpp index 96434ad8d..2b22706cf 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.cpp +++ b/src/bin/pg_resetxlog/pg_resetxlog.cpp @@ -45,9 +45,6 @@ #include #include #include -#ifdef HAVE_GETOPT_H -#include -#endif #include "tool_common.h" #include "access/transam.h" @@ -57,6 +54,9 @@ #include "access/xlog_internal.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" +#include "getopt_long.h" +#include "storage/dss/dss_adaptor.h" +#include "storage/file/fio_device.h" extern int optind; extern char* optarg; @@ -66,6 +66,10 @@ static XLogSegNo newXlogSegNo; /* XLogSegNo of new XLOG segment */ static bool guessed = false; /* T if we had to guess at any values */ static const char* progname; +static void DssInit(void); +static void SetGlobalDssParam(void); +static int ReadNonDssControlFile(int *fd, char * buffer); +static int ReadDssControlFile(int *fd, char *buffer); static bool ReadControlFile(void); static void GuessControlValues(void); static void PrintControlValues(bool guessed); @@ -79,6 +83,16 @@ static void usage(void); #define XLOG_NAME_LENGTH 24 const uint64 FREEZE_MAX_AGE = 2000000000; +typedef struct DssOptions +{ + bool enable_dss; + int64 ss_nodeid; + char *vgname; + char *socketpath; +} DssOptions; + +/* DSS connect parameters */ +static DssOptions dss; int main(int argc, char* argv[]) { @@ -97,6 +111,14 @@ int main(int argc, char* argv[]) char* DataDir = NULL; int fd = -1; uint64 tmpValue; + int option_index; + + static struct option long_options[] = {{"enable-dss", no_argument, NULL, 1}, + {"socketpath", required_argument, NULL, 2}, + {NULL, 0, NULL, 0}}; + + /* init DSS parameters */ + DssInit(); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_resetxlog")); @@ -117,7 +139,7 @@ int main(int argc, char* argv[]) } } - while ((c = getopt(argc, argv, "fl:m:no:O:x:e:")) != -1) { + while ((c = getopt_long(argc, argv, "fl:m:no:OI:x:e:", long_options, &option_index)) != -1) { switch (c) { case 'f': force = true; @@ -202,18 +224,56 @@ int main(int argc, char* argv[]) minXlogSegNo = (uint64)log_temp * XLogSegmentsPerXLogId + seg_temp; break; + case 'I': + if (atoi(optarg) < MIN_INSTANCEID || atoi(optarg) > MAX_INSTANCEID) { + fprintf(stderr, _("%s: unexpected node id specified, valid range is %d - %d.\n"), + progname, MIN_INSTANCEID, MAX_INSTANCEID); + exit(1); + } + dss.ss_nodeid = atoi(optarg); + break; + + case 1: + dss.enable_dss = true; + break; + + case 2: + dss.socketpath = strdup(optarg); + break; + default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); } } + /* the data directory is not necessary in dss mode */ if (optind >= argc) { fprintf(stderr, _("%s: no data directory specified\n"), progname); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit(1); } + if (dss.enable_dss) { + if (dss.socketpath == NULL) { + fprintf(stderr, _("%s: socketpath cannot be NULL when enable dss\n"), progname); + exit(1); + } + if (dss.ss_nodeid == INVALID_INSTANCEID) { + fprintf(stderr, _("%s: you should give an instance id when enable dss\n"), progname); + exit(1); + } + } else { + if (dss.socketpath != NULL) { + fprintf(stderr, _("%s: socketpath cannot be set when disable dss\n"), progname); + exit(1); + } + if (dss.ss_nodeid != INVALID_INSTANCEID) { + fprintf(stderr, _("%s: node id cannot be set when disable dss\n"), progname); + exit(1); + } + } + /* * Don't allow pg_resetxlog to be run as root, to avoid overwriting the * ownership of files in the data directory. We need only check for root @@ -229,18 +289,32 @@ int main(int argc, char* argv[]) #endif DataDir = argv[optind]; - if (DataDir == NULL || strlen(DataDir) == 0) { - fprintf(stderr, _("%s: could not change directory to \"\""), progname); + fprintf(stderr, _("%s: the data directory is \"\""), progname); exit(1); } - if (chdir(DataDir) < 0) { - fprintf(stderr, _("%s: could not change directory to \"%s\": %s\n"), progname, DataDir, strerror(errno)); + if (!dss.enable_dss) { + if (chdir(DataDir) < 0) { + fprintf(stderr, _("%s: could not change directory to \"%s\": %s\n"), + progname, DataDir, strerror(errno)); + exit(1); + } + } else { + dss.vgname = strdup(DataDir); + } + + /* set DSS connect parameters */ + if (dss.enable_dss) + SetGlobalDssParam(); + + /* register for dssapi */ + if (dss_device_init(dss.socketpath, dss.enable_dss) != DSS_SUCCESS) { + fprintf(stderr, _("%s: fail to init dss device\n"), progname); exit(1); } - initDataPathStruct(false); + initDataPathStruct(dss.enable_dss); /* * Check for a postmaster lock file --- if there is one, refuse to @@ -353,6 +427,86 @@ int main(int argc, char* argv[]) return 0; } +static void DssInit(void) +{ + dss.enable_dss = false; + dss.ss_nodeid = INVALID_INSTANCEID; + dss.socketpath = NULL; + dss.vgname = NULL; +} + +static void SetGlobalDssParam(void) +{ + g_datadir.instance_id = dss.ss_nodeid; + errno_t rc = strcpy_s(g_datadir.dss_data, strlen(dss.vgname) + 1, dss.vgname); + securec_check_c(rc, "\0", "\0"); + XLogSegmentSize = DSS_XLOG_SEG_SIZE; +} + +/* + * Try to read the existing pg_control file. + */ +static int ReadNonDssControlFile(int *fd, char *buffer) +{ + int len = 0; + len = read(*fd, buffer, PG_CONTROL_SIZE); + if (len < 0) { + fprintf(stderr, _("%s: could not read file \"%s\": %s\n"), + progname, T_XLOG_CONTROL_FILE, strerror(errno)); + free(buffer); + buffer = NULL; + close(*fd); + *fd = -1; + exit(1); + } + return len; +} + +/* + * Try to read the existing pg_control file in DSS mode. + */ +static int ReadDssControlFile(int *fd, char *buffer) +{ + char *tmpDssSrc; + struct stat statbuf; + int len; + errno_t rc; + + if (stat(T_XLOG_CONTROL_FILE, &statbuf) < 0) { + fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"), + progname, T_XLOG_CONTROL_FILE, strerror(errno)); + free(buffer); + buffer = NULL; + close(*fd); + *fd = -1; + exit(1); + } + + len = statbuf.st_size; + + char *tmpBuffer = (char*)malloc(len + 1); + + if (read(*fd, tmpBuffer, len) != len) { + fprintf(stderr, _("%s: could not read file \"%s\": %s\n"), + progname, T_XLOG_CONTROL_FILE, strerror(errno)); + free(buffer); + buffer = NULL; + free(tmpBuffer); + tmpBuffer = NULL; + close(*fd); + *fd = -1; + exit(1); + } + tmpBuffer[len] = '\0'; + tmpDssSrc = tmpBuffer; + tmpDssSrc += dss.ss_nodeid * PG_CONTROL_SIZE; + rc = memcpy_s(buffer, PG_CONTROL_SIZE, tmpDssSrc, PG_CONTROL_SIZE); + securec_check_c(rc, "\0", "\0"); + free(tmpBuffer); + tmpBuffer = NULL; + return PG_CONTROL_SIZE; +} + /* * Try to read the existing pg_control file. * @@ -378,7 +532,7 @@ static bool ReadControlFile(void) progname, T_XLOG_CONTROL_FILE, strerror(errno)); - if (errno == ENOENT) + if (!dss.enable_dss && errno == ENOENT) fprintf(stderr, _("If you are sure the data directory path is correct, execute\n" " touch %s\n" @@ -395,15 +549,10 @@ static bool ReadControlFile(void) fd = -1; exit(1); } - len = read(fd, buffer, PG_CONTROL_SIZE); - if (len < 0) { - fprintf(stderr, _("%s: could not read file \"%s\": %s\n"), progname, T_XLOG_CONTROL_FILE, strerror(errno)); - free(buffer); - buffer = NULL; - close(fd); - fd = -1; - exit(1); - } + if (dss.enable_dss) + len = ReadDssControlFile(&fd, buffer); + else + len = ReadNonDssControlFile(&fd, buffer); close(fd); fd = -1; @@ -590,7 +739,8 @@ static void PrintControlValues(bool guessed) static void RewriteControlFile(void) { int fd = -1; - char buffer[PG_CONTROL_SIZE] = {0}; /* need not be aligned */ + /* need to be aligned */ + char buffer[PG_CONTROL_SIZE] __attribute__((__aligned__(ALIGNOF_BUFFER))); errno_t rc = 0; /* * Adjust fields as needed to force an empty XLOG starting at @@ -640,7 +790,8 @@ static void RewriteControlFile(void) rc = memcpy_s(buffer, PG_CONTROL_SIZE, &ControlFile, sizeof(ControlFileData)); securec_check_c(rc, "", ""); - unlink(T_XLOG_CONTROL_FILE); + if (!dss.enable_dss) + unlink(T_XLOG_CONTROL_FILE); fd = open(T_XLOG_CONTROL_FILE, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) { @@ -648,6 +799,18 @@ static void RewriteControlFile(void) exit(1); } + errno = 0; + if (dss.enable_dss) { + off_t seekpos = (off_t)BLCKSZ * dss.ss_nodeid; + if (lseek(fd, seekpos, SEEK_SET) < 0) { + fprintf(stderr, _("%s: Can not seek the node id %ld of \"%s\": %s\n"), + progname, dss.ss_nodeid, T_XLOG_CONTROL_FILE, strerror(errno)); + close(fd); + fd = -1; + exit(1); + } + } + errno = 0; if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE) { /* if write didn't set errno, assume problem is no disk space */ @@ -703,9 +866,9 @@ static void FindEndOfXLOG(void) * assume any present have been used; in most scenarios this should be * conservative, because of xlog.c's attempts to pre-create files. */ - xldir = opendir(XLOGDIR); + xldir = opendir(T_SS_XLOGDIR); if (xldir == NULL) { - fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno)); + fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno)); exit(1); } @@ -749,7 +912,7 @@ static void FindEndOfXLOG(void) #endif if (errno) { - fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno)); + fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno)); closedir(xldir); exit(1); } @@ -781,16 +944,16 @@ static void KillExistingXLOG(void) char path[MAXPGPATH] = {0}; int nRet = 0; - xldir = opendir(XLOGDIR); + xldir = opendir(T_SS_XLOGDIR); if (xldir == NULL) { - fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno)); + fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno)); exit(1); } errno = 0; while ((xlde = readdir(xldir)) != NULL) { if (strlen(xlde->d_name) == 24 && strspn(xlde->d_name, "0123456789ABCDEF") == 24) { - nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", XLOGDIR, xlde->d_name); + nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", T_SS_XLOGDIR, xlde->d_name); securec_check_ss_c(nRet, "\0", "\0"); if (unlink(path) < 0) { fprintf(stderr, _("%s: could not delete file \"%s\": %s\n"), progname, path, strerror(errno)); @@ -811,7 +974,7 @@ static void KillExistingXLOG(void) #endif if (errno) { - fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno)); + fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno)); closedir(xldir); exit(1); } @@ -826,13 +989,15 @@ static void KillExistingArchiveStatus(void) DIR* xldir = NULL; struct dirent* xlde = NULL; char path[MAXPGPATH] = {0}; + char archStatDir[MAXPGPATH] = {0}; int nRet = 0; -#define ARCHSTATDIR XLOGDIR "/archive_status" + nRet = snprintf_s(archStatDir, MAXPGPATH, MAXPGPATH - 1, "%s/%s", T_SS_XLOGDIR, "/archive_status"); + securec_check_ss_c(nRet, "\0", "\0"); - xldir = opendir(ARCHSTATDIR); + xldir = opendir(archStatDir); if (xldir == NULL) { - fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, ARCHSTATDIR, strerror(errno)); + fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, archStatDir, strerror(errno)); exit(1); } @@ -840,7 +1005,7 @@ static void KillExistingArchiveStatus(void) while ((xlde = readdir(xldir)) != NULL) { if (strspn(xlde->d_name, "0123456789ABCDEF") == 24 && (strcmp(xlde->d_name + 24, ".ready") == 0 || strcmp(xlde->d_name + 24, ".done") == 0)) { - nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", ARCHSTATDIR, xlde->d_name); + nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", archStatDir, xlde->d_name); securec_check_ss_c(nRet, "\0", "\0"); if (unlink(path) < 0) { fprintf(stderr, _("%s: could not delete file \"%s\": %s\n"), progname, path, strerror(errno)); @@ -861,7 +1026,7 @@ static void KillExistingArchiveStatus(void) #endif if (errno) { - fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, ARCHSTATDIR, strerror(errno)); + fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, archStatDir, strerror(errno)); closedir(xldir); exit(1); } @@ -874,23 +1039,18 @@ static void KillExistingArchiveStatus(void) */ static void WriteEmptyXLOG(void) { - char* buffer = NULL; XLogPageHeader page; XLogLongPageHeader longpage; XLogRecord* record = NULL; pg_crc32 crc; char path[MAXPGPATH] = {0}; int fd = -1; - int nbytes = 0; + uint64 nbytes = 0; char* recptr = NULL; errno_t rc = EOK; + /* need to be aligned */ + char buffer[PG_CONTROL_SIZE] __attribute__((__aligned__(ALIGNOF_BUFFER))); - /* Use malloc() to ensure buffer is MAXALIGNED */ - buffer = (char*)malloc(XLOG_BLCKSZ); - if (buffer == NULL) { - fprintf(stderr, _("%s: out of memory\n"), progname); - exit(1); - } page = (XLogPageHeader)buffer; rc = memset_s(buffer, XLOG_BLCKSZ, 0, XLOG_BLCKSZ); securec_check_c(rc, "\0", "\0"); @@ -928,7 +1088,8 @@ static void WriteEmptyXLOG(void) rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, - XLOGDIR "/%08X%08X%08X", + "%s/%08X%08X%08X", + T_SS_XLOGDIR, ControlFile.checkPointCopy.ThisTimeLineID, (uint32)((newXlogSegNo) / XLogSegmentsPerXLogId), (uint32)((newXlogSegNo) % XLogSegmentsPerXLogId)); @@ -939,10 +1100,6 @@ static void WriteEmptyXLOG(void) fd = open(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR); if (fd < 0) { fprintf(stderr, _("%s: could not open file \"%s\": %s\n"), progname, path, strerror(errno)); - - free(buffer); - buffer = NULL; - exit(1); } @@ -952,37 +1109,42 @@ static void WriteEmptyXLOG(void) if (errno == 0) errno = ENOSPC; fprintf(stderr, _("%s: could not write file \"%s\": %s\n"), progname, path, strerror(errno)); - free(buffer); - buffer = NULL; close(fd); fd = -1; exit(1); } - /* Fill the rest of the file with zeroes */ - rc = memset_s(buffer, XLOG_BLCKSZ, 0, XLOG_BLCKSZ); - securec_check_c(rc, "", ""); - for (nbytes = XLOG_BLCKSZ; (unsigned int)(nbytes) < XLogSegSize; nbytes += XLOG_BLCKSZ) { + if (dss.enable_dss) { + /* extend file and fill space at once to avoid performance issue */ errno = 0; - if (write(fd, buffer, XLOG_BLCKSZ) != XLOG_BLCKSZ) { - if (errno == 0) - errno = ENOSPC; + if (ftruncate(fd, XLogSegSize) != 0) { + int save_errno = errno; + /* if write didn't set errno, assume problem is no disk space */ + errno = save_errno ? save_errno : ENOSPC; fprintf(stderr, _("%s: could not write file \"%s\": %s\n"), progname, path, strerror(errno)); - - free(buffer); - buffer = NULL; close(fd); fd = -1; exit(1); } + } else { + /* Fill the rest of the file with zeroes */ + rc = memset_s(buffer, XLOG_BLCKSZ, 0, XLOG_BLCKSZ); + securec_check_c(rc, "", ""); + for (nbytes = XLOG_BLCKSZ; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ) { + errno = 0; + if (write(fd, buffer, XLOG_BLCKSZ) != XLOG_BLCKSZ) { + if (errno == 0) + errno = ENOSPC; + fprintf(stderr, _("%s: could not write file \"%s\": %s\n"), progname, path, strerror(errno)); + close(fd); + fd = -1; + exit(1); + } + } } if (fsync(fd) != 0) { fprintf(stderr, _("%s: fsync error: %s\n"), progname, strerror(errno)); - - free(buffer); - buffer = NULL; - close(fd); fd = -1; exit(1); @@ -990,8 +1152,6 @@ static void WriteEmptyXLOG(void) close(fd); fd = -1; - free(buffer); - buffer = NULL; } static void usage(void) @@ -1001,7 +1161,7 @@ static void usage(void) printf(_("Options:\n")); printf(_(" -e XIDEPOCH set next transaction ID epoch\n")); printf(_(" -f force update to be done\n")); - printf(_(" -l xlogfile force minimum WAL starting location for new transaction log\n")); + printf(_(" -l xlogfile force minimum WAL starting location for new transaction log\n")); printf(_(" -m XID set next multitransaction ID\n")); printf(_(" -n no update, just show extracted control values (for testing)\n")); printf(_(" -o OID set next OID\n")); @@ -1009,6 +1169,11 @@ static void usage(void) printf(_(" -V, --version output version information, then exit\n")); printf(_(" -x XID set next transaction ID\n")); printf(_(" -?, --help show this help, then exit\n")); + printf(_(" -I INSTANCE_ID\n")); + printf(_(" the information of specified instance\n")); + printf(_(" --enable-dss enable shared storage mode\n")); + printf(_(" --socketpath=SOCKETPATH\n")); + printf(_(" dss connect socket file path\n")); #if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS))) printf("\nReport bugs to GaussDB support.\n"); #else