资源池化适配 pg_resetxlog 工具

This commit is contained in:
zhaobingyu
2022-12-07 09:53:27 +08:00
parent 93107e0d21
commit 97bc7cd191
3 changed files with 234 additions and 65 deletions

View File

@ -1,4 +1,7 @@
#This is the main CMAKE for build all components.
execute_process(
COMMAND ln -fs ${PROJECT_SRC_DIR}/gausskernel/storage/dss/dss_adaptor.cpp ${CMAKE_CURRENT_SOURCE_DIR}/dss_adaptor.cpp
)
# pg_resetxlog bin
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} TGT_resetxlog_SRC)

View File

@ -23,6 +23,7 @@ ifneq "$(MAKECMDGOALS)" "clean"
endif
endif
OBJS= pg_resetxlog.o $(WIN32RES)
OBJS += $(top_builddir)/src/gausskernel/storage/dss/dss_adaptor.o
all: pg_resetxlog

View File

@ -45,9 +45,6 @@
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#ifdef HAVE_GETOPT_H
#include <getopt.h>
#endif
#include "tool_common.h"
#include "access/transam.h"
@ -57,6 +54,9 @@
#include "access/xlog_internal.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "getopt_long.h"
#include "storage/dss/dss_adaptor.h"
#include "storage/file/fio_device.h"
extern int optind;
extern char* optarg;
@ -66,6 +66,10 @@ static XLogSegNo newXlogSegNo; /* XLogSegNo of new XLOG segment */
static bool guessed = false; /* T if we had to guess at any values */
static const char* progname;
static void DssInit(void);
static void SetGlobalDssParam(void);
static int ReadNonDssControlFile(int *fd, char * buffer);
static int ReadDssControlFile(int *fd, char *buffer);
static bool ReadControlFile(void);
static void GuessControlValues(void);
static void PrintControlValues(bool guessed);
@ -79,6 +83,16 @@ static void usage(void);
#define XLOG_NAME_LENGTH 24
const uint64 FREEZE_MAX_AGE = 2000000000;
typedef struct DssOptions
{
bool enable_dss;
int64 ss_nodeid;
char *vgname;
char *socketpath;
} DssOptions;
/* DSS connect parameters */
static DssOptions dss;
int main(int argc, char* argv[])
{
@ -97,6 +111,14 @@ int main(int argc, char* argv[])
char* DataDir = NULL;
int fd = -1;
uint64 tmpValue;
int option_index;
static struct option long_options[] = {{"enable-dss", no_argument, NULL, 1},
{"socketpath", required_argument, NULL, 2},
{NULL, 0, NULL, 0}};
/* init DSS parameters */
DssInit();
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_resetxlog"));
@ -117,7 +139,7 @@ int main(int argc, char* argv[])
}
}
while ((c = getopt(argc, argv, "fl:m:no:O:x:e:")) != -1) {
while ((c = getopt_long(argc, argv, "fl:m:no:OI:x:e:", long_options, &option_index)) != -1) {
switch (c) {
case 'f':
force = true;
@ -202,18 +224,56 @@ int main(int argc, char* argv[])
minXlogSegNo = (uint64)log_temp * XLogSegmentsPerXLogId + seg_temp;
break;
case 'I':
if (atoi(optarg) < MIN_INSTANCEID || atoi(optarg) > MAX_INSTANCEID) {
fprintf(stderr, _("%s: unexpected node id specified, valid range is %d - %d.\n"),
progname, MIN_INSTANCEID, MAX_INSTANCEID);
exit(1);
}
dss.ss_nodeid = atoi(optarg);
break;
case 1:
dss.enable_dss = true;
break;
case 2:
dss.socketpath = strdup(optarg);
break;
default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
}
}
/* the data directory is not necessary in dss mode */
if (optind >= argc) {
fprintf(stderr, _("%s: no data directory specified\n"), progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
}
if (dss.enable_dss) {
if (dss.socketpath == NULL) {
fprintf(stderr, _("%s: socketpath cannot be NULL when enable dss\n"), progname);
exit(1);
}
if (dss.ss_nodeid == INVALID_INSTANCEID) {
fprintf(stderr, _("%s: you should give an instance id when enable dss\n"), progname);
exit(1);
}
} else {
if (dss.socketpath != NULL) {
fprintf(stderr, _("%s: socketpath cannot be set when disable dss\n"), progname);
exit(1);
}
if (dss.ss_nodeid != INVALID_INSTANCEID) {
fprintf(stderr, _("%s: node id cannot be set when disable dss\n"), progname);
exit(1);
}
}
/*
* Don't allow pg_resetxlog to be run as root, to avoid overwriting the
* ownership of files in the data directory. We need only check for root
@ -229,18 +289,32 @@ int main(int argc, char* argv[])
#endif
DataDir = argv[optind];
if (DataDir == NULL || strlen(DataDir) == 0) {
fprintf(stderr, _("%s: could not change directory to \"<NULL>\""), progname);
fprintf(stderr, _("%s: the data directory is \"<NULL>\""), progname);
exit(1);
}
if (chdir(DataDir) < 0) {
fprintf(stderr, _("%s: could not change directory to \"%s\": %s\n"), progname, DataDir, strerror(errno));
if (!dss.enable_dss) {
if (chdir(DataDir) < 0) {
fprintf(stderr, _("%s: could not change directory to \"%s\": %s\n"),
progname, DataDir, strerror(errno));
exit(1);
}
} else {
dss.vgname = strdup(DataDir);
}
/* set DSS connect parameters */
if (dss.enable_dss)
SetGlobalDssParam();
/* register for dssapi */
if (dss_device_init(dss.socketpath, dss.enable_dss) != DSS_SUCCESS) {
fprintf(stderr, _("%s: fail to init dss device\n"), progname);
exit(1);
}
initDataPathStruct(false);
initDataPathStruct(dss.enable_dss);
/*
* Check for a postmaster lock file --- if there is one, refuse to
@ -353,6 +427,86 @@ int main(int argc, char* argv[])
return 0;
}
static void DssInit(void)
{
dss.enable_dss = false;
dss.ss_nodeid = INVALID_INSTANCEID;
dss.socketpath = NULL;
dss.vgname = NULL;
}
static void SetGlobalDssParam(void)
{
g_datadir.instance_id = dss.ss_nodeid;
errno_t rc = strcpy_s(g_datadir.dss_data, strlen(dss.vgname) + 1, dss.vgname);
securec_check_c(rc, "\0", "\0");
XLogSegmentSize = DSS_XLOG_SEG_SIZE;
}
/*
* Try to read the existing pg_control file.
*/
static int ReadNonDssControlFile(int *fd, char *buffer)
{
int len = 0;
len = read(*fd, buffer, PG_CONTROL_SIZE);
if (len < 0) {
fprintf(stderr, _("%s: could not read file \"%s\": %s\n"),
progname, T_XLOG_CONTROL_FILE, strerror(errno));
free(buffer);
buffer = NULL;
close(*fd);
*fd = -1;
exit(1);
}
return len;
}
/*
* Try to read the existing pg_control file in DSS mode.
*/
static int ReadDssControlFile(int *fd, char *buffer)
{
char *tmpDssSrc;
struct stat statbuf;
int len;
errno_t rc;
if (stat(T_XLOG_CONTROL_FILE, &statbuf) < 0) {
fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
progname, T_XLOG_CONTROL_FILE, strerror(errno));
free(buffer);
buffer = NULL;
close(*fd);
*fd = -1;
exit(1);
}
len = statbuf.st_size;
char *tmpBuffer = (char*)malloc(len + 1);
if (read(*fd, tmpBuffer, len) != len) {
fprintf(stderr, _("%s: could not read file \"%s\": %s\n"),
progname, T_XLOG_CONTROL_FILE, strerror(errno));
free(buffer);
buffer = NULL;
free(tmpBuffer);
tmpBuffer = NULL;
close(*fd);
*fd = -1;
exit(1);
}
tmpBuffer[len] = '\0';
tmpDssSrc = tmpBuffer;
tmpDssSrc += dss.ss_nodeid * PG_CONTROL_SIZE;
rc = memcpy_s(buffer, PG_CONTROL_SIZE, tmpDssSrc, PG_CONTROL_SIZE);
securec_check_c(rc, "\0", "\0");
free(tmpBuffer);
tmpBuffer = NULL;
return PG_CONTROL_SIZE;
}
/*
* Try to read the existing pg_control file.
*
@ -378,7 +532,7 @@ static bool ReadControlFile(void)
progname,
T_XLOG_CONTROL_FILE,
strerror(errno));
if (errno == ENOENT)
if (!dss.enable_dss && errno == ENOENT)
fprintf(stderr,
_("If you are sure the data directory path is correct, execute\n"
" touch %s\n"
@ -395,15 +549,10 @@ static bool ReadControlFile(void)
fd = -1;
exit(1);
}
len = read(fd, buffer, PG_CONTROL_SIZE);
if (len < 0) {
fprintf(stderr, _("%s: could not read file \"%s\": %s\n"), progname, T_XLOG_CONTROL_FILE, strerror(errno));
free(buffer);
buffer = NULL;
close(fd);
fd = -1;
exit(1);
}
if (dss.enable_dss)
len = ReadDssControlFile(&fd, buffer);
else
len = ReadNonDssControlFile(&fd, buffer);
close(fd);
fd = -1;
@ -590,7 +739,8 @@ static void PrintControlValues(bool guessed)
static void RewriteControlFile(void)
{
int fd = -1;
char buffer[PG_CONTROL_SIZE] = {0}; /* need not be aligned */
/* need to be aligned */
char buffer[PG_CONTROL_SIZE] __attribute__((__aligned__(ALIGNOF_BUFFER)));
errno_t rc = 0;
/*
* Adjust fields as needed to force an empty XLOG starting at
@ -640,7 +790,8 @@ static void RewriteControlFile(void)
rc = memcpy_s(buffer, PG_CONTROL_SIZE, &ControlFile, sizeof(ControlFileData));
securec_check_c(rc, "", "");
unlink(T_XLOG_CONTROL_FILE);
if (!dss.enable_dss)
unlink(T_XLOG_CONTROL_FILE);
fd = open(T_XLOG_CONTROL_FILE, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
@ -648,6 +799,18 @@ static void RewriteControlFile(void)
exit(1);
}
errno = 0;
if (dss.enable_dss) {
off_t seekpos = (off_t)BLCKSZ * dss.ss_nodeid;
if (lseek(fd, seekpos, SEEK_SET) < 0) {
fprintf(stderr, _("%s: Can not seek the node id %ld of \"%s\": %s\n"),
progname, dss.ss_nodeid, T_XLOG_CONTROL_FILE, strerror(errno));
close(fd);
fd = -1;
exit(1);
}
}
errno = 0;
if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE) {
/* if write didn't set errno, assume problem is no disk space */
@ -703,9 +866,9 @@ static void FindEndOfXLOG(void)
* assume any present have been used; in most scenarios this should be
* conservative, because of xlog.c's attempts to pre-create files.
*/
xldir = opendir(XLOGDIR);
xldir = opendir(T_SS_XLOGDIR);
if (xldir == NULL) {
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno));
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno));
exit(1);
}
@ -749,7 +912,7 @@ static void FindEndOfXLOG(void)
#endif
if (errno) {
fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno));
fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno));
closedir(xldir);
exit(1);
}
@ -781,16 +944,16 @@ static void KillExistingXLOG(void)
char path[MAXPGPATH] = {0};
int nRet = 0;
xldir = opendir(XLOGDIR);
xldir = opendir(T_SS_XLOGDIR);
if (xldir == NULL) {
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno));
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno));
exit(1);
}
errno = 0;
while ((xlde = readdir(xldir)) != NULL) {
if (strlen(xlde->d_name) == 24 && strspn(xlde->d_name, "0123456789ABCDEF") == 24) {
nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", XLOGDIR, xlde->d_name);
nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", T_SS_XLOGDIR, xlde->d_name);
securec_check_ss_c(nRet, "\0", "\0");
if (unlink(path) < 0) {
fprintf(stderr, _("%s: could not delete file \"%s\": %s\n"), progname, path, strerror(errno));
@ -811,7 +974,7 @@ static void KillExistingXLOG(void)
#endif
if (errno) {
fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, XLOGDIR, strerror(errno));
fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, T_SS_XLOGDIR, strerror(errno));
closedir(xldir);
exit(1);
}
@ -826,13 +989,15 @@ static void KillExistingArchiveStatus(void)
DIR* xldir = NULL;
struct dirent* xlde = NULL;
char path[MAXPGPATH] = {0};
char archStatDir[MAXPGPATH] = {0};
int nRet = 0;
#define ARCHSTATDIR XLOGDIR "/archive_status"
nRet = snprintf_s(archStatDir, MAXPGPATH, MAXPGPATH - 1, "%s/%s", T_SS_XLOGDIR, "/archive_status");
securec_check_ss_c(nRet, "\0", "\0");
xldir = opendir(ARCHSTATDIR);
xldir = opendir(archStatDir);
if (xldir == NULL) {
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, ARCHSTATDIR, strerror(errno));
fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), progname, archStatDir, strerror(errno));
exit(1);
}
@ -840,7 +1005,7 @@ static void KillExistingArchiveStatus(void)
while ((xlde = readdir(xldir)) != NULL) {
if (strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
(strcmp(xlde->d_name + 24, ".ready") == 0 || strcmp(xlde->d_name + 24, ".done") == 0)) {
nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", ARCHSTATDIR, xlde->d_name);
nRet = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", archStatDir, xlde->d_name);
securec_check_ss_c(nRet, "\0", "\0");
if (unlink(path) < 0) {
fprintf(stderr, _("%s: could not delete file \"%s\": %s\n"), progname, path, strerror(errno));
@ -861,7 +1026,7 @@ static void KillExistingArchiveStatus(void)
#endif
if (errno) {
fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, ARCHSTATDIR, strerror(errno));
fprintf(stderr, _("%s: could not read from directory \"%s\": %s\n"), progname, archStatDir, strerror(errno));
closedir(xldir);
exit(1);
}
@ -874,23 +1039,18 @@ static void KillExistingArchiveStatus(void)
*/
static void WriteEmptyXLOG(void)
{
char* buffer = NULL;
XLogPageHeader page;
XLogLongPageHeader longpage;
XLogRecord* record = NULL;
pg_crc32 crc;
char path[MAXPGPATH] = {0};
int fd = -1;
int nbytes = 0;
uint64 nbytes = 0;
char* recptr = NULL;
errno_t rc = EOK;
/* need to be aligned */
char buffer[PG_CONTROL_SIZE] __attribute__((__aligned__(ALIGNOF_BUFFER)));
/* Use malloc() to ensure buffer is MAXALIGNED */
buffer = (char*)malloc(XLOG_BLCKSZ);
if (buffer == NULL) {
fprintf(stderr, _("%s: out of memory\n"), progname);
exit(1);
}
page = (XLogPageHeader)buffer;
rc = memset_s(buffer, XLOG_BLCKSZ, 0, XLOG_BLCKSZ);
securec_check_c(rc, "\0", "\0");
@ -928,7 +1088,8 @@ static void WriteEmptyXLOG(void)
rc = snprintf_s(path,
MAXPGPATH,
MAXPGPATH - 1,
XLOGDIR "/%08X%08X%08X",
"%s/%08X%08X%08X",
T_SS_XLOGDIR,
ControlFile.checkPointCopy.ThisTimeLineID,
(uint32)((newXlogSegNo) / XLogSegmentsPerXLogId),
(uint32)((newXlogSegNo) % XLogSegmentsPerXLogId));
@ -939,10 +1100,6 @@ static void WriteEmptyXLOG(void)
fd = open(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
fprintf(stderr, _("%s: could not open file \"%s\": %s\n"), progname, path, strerror(errno));
free(buffer);
buffer = NULL;
exit(1);
}
@ -952,37 +1109,42 @@ static void WriteEmptyXLOG(void)
if (errno == 0)
errno = ENOSPC;
fprintf(stderr, _("%s: could not write file \"%s\": %s\n"), progname, path, strerror(errno));
free(buffer);
buffer = NULL;
close(fd);
fd = -1;
exit(1);
}
/* Fill the rest of the file with zeroes */
rc = memset_s(buffer, XLOG_BLCKSZ, 0, XLOG_BLCKSZ);
securec_check_c(rc, "", "");
for (nbytes = XLOG_BLCKSZ; (unsigned int)(nbytes) < XLogSegSize; nbytes += XLOG_BLCKSZ) {
if (dss.enable_dss) {
/* extend file and fill space at once to avoid performance issue */
errno = 0;
if (write(fd, buffer, XLOG_BLCKSZ) != XLOG_BLCKSZ) {
if (errno == 0)
errno = ENOSPC;
if (ftruncate(fd, XLogSegSize) != 0) {
int save_errno = errno;
/* if write didn't set errno, assume problem is no disk space */
errno = save_errno ? save_errno : ENOSPC;
fprintf(stderr, _("%s: could not write file \"%s\": %s\n"), progname, path, strerror(errno));
free(buffer);
buffer = NULL;
close(fd);
fd = -1;
exit(1);
}
} else {
/* Fill the rest of the file with zeroes */
rc = memset_s(buffer, XLOG_BLCKSZ, 0, XLOG_BLCKSZ);
securec_check_c(rc, "", "");
for (nbytes = XLOG_BLCKSZ; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ) {
errno = 0;
if (write(fd, buffer, XLOG_BLCKSZ) != XLOG_BLCKSZ) {
if (errno == 0)
errno = ENOSPC;
fprintf(stderr, _("%s: could not write file \"%s\": %s\n"), progname, path, strerror(errno));
close(fd);
fd = -1;
exit(1);
}
}
}
if (fsync(fd) != 0) {
fprintf(stderr, _("%s: fsync error: %s\n"), progname, strerror(errno));
free(buffer);
buffer = NULL;
close(fd);
fd = -1;
exit(1);
@ -990,8 +1152,6 @@ static void WriteEmptyXLOG(void)
close(fd);
fd = -1;
free(buffer);
buffer = NULL;
}
static void usage(void)
@ -1001,7 +1161,7 @@ static void usage(void)
printf(_("Options:\n"));
printf(_(" -e XIDEPOCH set next transaction ID epoch\n"));
printf(_(" -f force update to be done\n"));
printf(_(" -l xlogfile force minimum WAL starting location for new transaction log\n"));
printf(_(" -l xlogfile force minimum WAL starting location for new transaction log\n"));
printf(_(" -m XID set next multitransaction ID\n"));
printf(_(" -n no update, just show extracted control values (for testing)\n"));
printf(_(" -o OID set next OID\n"));
@ -1009,6 +1169,11 @@ static void usage(void)
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -x XID set next transaction ID\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_(" -I INSTANCE_ID\n"));
printf(_(" the information of specified instance\n"));
printf(_(" --enable-dss enable shared storage mode\n"));
printf(_(" --socketpath=SOCKETPATH\n"));
printf(_(" dss connect socket file path\n"));
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
printf("\nReport bugs to GaussDB support.\n");
#else