build check性能优化

This commit is contained in:
chenzhikai
2023-09-14 21:16:55 +08:00
parent a2484b7b0d
commit de64649afa
6 changed files with 96 additions and 178 deletions

View File

@ -609,7 +609,7 @@ void get_conninfo(const char* filename)
}
if (build_mode == CROSS_CLUSTER_FULL_BUILD || build_mode == CROSS_CLUSTER_INC_BUILD ||
build_mode == CROSS_CLUSTER_STANDBY_FULL_BUILD || build_mode == BUILD_CHECK) {
build_mode == CROSS_CLUSTER_STANDBY_FULL_BUILD || ss_instance_config.dss.enable_dss) {
/* For shared storage cluster */
conninfo_para = config_para_cross_cluster_build;
} else {

View File

@ -221,8 +221,6 @@ static char remove_member_file[MAXPGPATH];
static char change_role_file[MAXPGPATH];
static char* new_role = "passive";
static char start_minority_file[MAXPGPATH];
static int get_instance_id(void);
static int ss_get_primary_id(void);
static int ss_get_inter_node_nums(const char* interconn_url);
bool ss_read_config(void);
static unsigned int vote_num = 0;
@ -7338,111 +7336,6 @@ static void free_ctl()
FREE_AND_RESET(ss_instance_config.dss.vgdata);
}
static int get_instance_id(void)
{
PGconn* conn = NULL;
PGresult* res = NULL;
const char* sql_string = "show ss_instance_id;";
char* instid = NULL;
conn = get_connectionex();
if (PQstatus(conn) != CONNECTION_OK) {
pg_log(PG_WARNING, _("could not connect to server: %s"), PQerrorMessage(conn));
return -1;
}
/* Get local role from the local server. */
res = PQexec(conn, sql_string);
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
PQclear(res);
pg_log(PG_WARNING, _("could not get local role from the local server: %s"), PQerrorMessage(conn));
close_connection();
conn = NULL;
return -1;
}
if (PQnfields(res) != 1 || PQntuples(res) != 1) {
int ntuples = PQntuples(res);
int nfields = PQnfields(res);
PQclear(res);
pg_log(PG_WARNING,
_("invalid response from primary server: "
"Expected 1 tuple with 1 fields, got %d tuples with %d fields."),
ntuples,
nfields);
close_connection();
conn = NULL;
return -1;
}
instid = PQgetvalue(res, 0, 0);
PQclear(res);
close_connection();
conn = NULL;
return atoi(instid);
}
static int ss_get_primary_id(void)
{
if (ss_instance_config.dss.socketpath == NULL) {
return -1;
}
if (ss_instance_config.dss.vgname == NULL) {
return -1;
}
int fd = -1;
int len = 0;
int err = 0;
struct stat statbuf;
char control_file_path[MAXPGPATH];
err = memset_s(control_file_path, MAXPGPATH, 0, MAXPGPATH);
securec_check_c(err, "\0", "\0");
err = snprintf_s(control_file_path, MAXPGPATH, MAXPGPATH - 1, "%s/pg_control", ss_instance_config.dss.vgname);
securec_check_ss_c(err, "\0", "\0");
if (dss_device_init(ss_instance_config.dss.socketpath, true) != DSS_SUCCESS) {
pg_log(PG_WARNING, _("failed to init dss device\n"));
exit(1);
}
fd = open(control_file_path, O_RDONLY | PG_BINARY, 0);
if(fd < 0) {
pg_log(PG_WARNING, _("failed to open pg_contol\n"));
close(fd);
fd = -1;
exit(1);
}
if (stat(control_file_path, &statbuf) < 0) {
pg_log(PG_WARNING, _("failed to stat pg_contol\n"));
close(fd);
fd = -1;
exit(1);
}
len = statbuf.st_size;
char* tmpBuffer = (char*)malloc(len + 1);
if ((read(fd, tmpBuffer, len)) != len) {
close(fd);
fd = -1;
pg_log(PG_WARNING, _("failed to read pg_contol\n"));
exit(1);
}
ss_reformer_ctrl_t* reformerCtrl;
/* Calculate the offset to obtain the primary_id of the last page */
reformerCtrl = (ss_reformer_ctrl_t*)(tmpBuffer + REFORMER_CTL_INSTANCEID * PG_CONTROL_SIZE);
return reformerCtrl->primaryInstId;
}
static int ss_get_inter_node_nums(const char* interconn_url)
{
errno_t rc;

View File

@ -26,6 +26,7 @@
#include "replication/slot.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "storage/file/fio_device.h"
#include <stdlib.h>
#define CONFIG_CASCADE_STANDBY "cascade_standby"
@ -131,6 +132,50 @@ XLogRecPtr readOneRecord(const char* datadir, XLogRecPtr ptr, TimeLineID tli)
return endptr;
}
int SSInitXlogDir(char*** xlogDirs)
{
int xlogDirNum = 0;
*xlogDirs = (char**)malloc(SS_MAX_INST * sizeof(char*));
if (*xlogDirs == NULL) {
return -1;
}
for (int i = 0; i < SS_MAX_INST; i++) {
(*xlogDirs)[i] = (char*)malloc(MAXPGPATH * sizeof(char));
if ((*xlogDirs)[i] == NULL) {
for (int j = 0; j < i; j++) {
free((*xlogDirs)[j]);
}
free(*xlogDirs);
return -1;
}
}
DIR* dir = opendir(ss_instance_config.dss.vgname);
struct dirent* entry = NULL;
while (dir != NULL && (entry = readdir(dir)) != NULL) {
if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) {
snprintf((*xlogDirs)[xlogDirNum], MAXPGPATH, "%s/%s", ss_instance_config.dss.vgname, entry->d_name);
xlogDirNum++;
if (xlogDirNum >= SS_MAX_INST) {
break;
}
}
}
closedir(dir);
return xlogDirNum;
}
void FreeXlogDir(char** xlogDirs)
{
if (ss_instance_config.dss.enable_dss && xlogDirs != NULL) {
for (int i = 0; i < SS_MAX_INST; i++) {
free(xlogDirs[i]);
}
free(xlogDirs);
}
}
BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRecPtr startrec, XLogRecPtr* lastchkptrec,
TimeLineID* lastchkpttli, XLogRecPtr *lastchkptredo, uint32 term)
{
@ -142,7 +187,6 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
#endif
XLogRecPtr max_lsn;
char returnmsg[MAX_ERR_MSG_LENTH] = {0};
char dssxlogdir[MAXPGPATH] = {0};
pg_crc32 maxLsnCrc = 0;
XLogRecord* record = NULL;
XLogRecPtr searchptr;
@ -154,17 +198,26 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
int ret = 0;
TimestampTz start_time;
TimestampTz current_time;
/* maybe have some xlog dir in enable_dss mode */
char** xlogDirs = NULL;
int xlogDirNum = 0;
/*
* local max lsn must be exists, or change to full build.
*/
if (ss_instance_config.dss.enable_dss) {
max_lsn = SSFindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc, ss_instance_config.dss.vgname);
xlogDirNum = SSInitXlogDir(&xlogDirs);
if (xlogDirNum <= 0) {
pg_log(PG_FATAL, "init xlog dirs failed\n");
return BUILD_FATAL;
}
max_lsn = SSFindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc, xlogDirs, xlogDirNum);
} else {
max_lsn = FindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc);
}
if (XLogRecPtrIsInvalid(max_lsn)) {
pg_fatal("find max lsn fail, errmsg:%s\n", returnmsg);
FreeXlogDir(xlogDirs);
return BUILD_FATAL;
}
pg_log(PG_PROGRESS, "find max lsn success, %s", returnmsg);
@ -174,6 +227,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &readprivate);
if (xlogreader == NULL) {
pg_log(PG_ERROR, "out of memory\n");
FreeXlogDir(xlogDirs);
return BUILD_ERROR;
}
@ -182,7 +236,6 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
securec_check_ss_c(ret, "\0", "\0");
get_conninfo(pg_conf_file);
searchptr = max_lsn;
start_time = localGetCurrentTimestamp();
current_time = start_time;
@ -192,12 +245,13 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
"try 300s, could not find any common checkpoint, need to do full build\n");
XLogReaderFree(xlogreader);
CloseXlogFile();
FreeXlogDir(xlogDirs);
return BUILD_FATAL;
}
uint8 info;
if (ss_instance_config.dss.enable_dss) {
record = XLogReadRecordFromAllDir(ss_instance_config.dss.vgname, xlogreader, searchptr, &errormsg);
record = XLogReadRecordFromAllDir(xlogDirs, xlogDirNum, xlogreader, searchptr, &errormsg);
} else {
record = XLogReadRecord(xlogreader, searchptr, &errormsg);
}
@ -212,6 +266,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
}
XLogReaderFree(xlogreader);
CloseXlogFile();
FreeXlogDir(xlogDirs);
return BUILD_FATAL;
}
@ -226,7 +281,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
if (xlogreader->ReadRecPtr <= startrec && XLogRecGetRmid(xlogreader) == RM_XLOG_ID &&
(info == XLOG_CHECKPOINT_SHUTDOWN || info == XLOG_CHECKPOINT_ONLINE)) {
if (checkCommonAncestorByXlog(xlogreader->ReadRecPtr, record->xl_crc, term) == true) {
CheckPoint checkPoint;
CheckPoint checkPoint;
errorno = memcpy_s(&checkPoint, sizeof(CheckPoint), XLogRecGetData(xlogreader), sizeof(CheckPoint));
securec_check_c(errorno, "", "");
*lastchkptrec = searchptr;
@ -242,6 +297,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
if (increment_return_code != BUILD_SUCCESS) {
XLogReaderFree(xlogreader);
CloseXlogFile();
FreeXlogDir(xlogDirs);
return increment_return_code;
}
}
@ -252,6 +308,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
XLogReaderFree(xlogreader);
CloseXlogFile();
FreeXlogDir(xlogDirs);
PG_CHECKBUILD_AND_RETURN();
/* no common checkpoint between target and source, need full build */
if (XLogRecPtrIsInvalid(searchptr)) {

View File

@ -1326,60 +1326,40 @@ tryAgain:
return XLOG_BLCKSZ;
}
XLogRecord* XLogReadRecordFromAllDir(char* dirPath, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg)
XLogRecord* XLogReadRecordFromAllDir(char** xlogDirs, int xlogDirNum, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg)
{
DIR* dir = opendir(dirPath);
struct dirent* entry = NULL;
char xlogDirStr[MAXPGPATH];
errno_t rc = EOK;
XLogRecord* record = NULL;
while (dir != NULL && (entry = readdir(dir)) != NULL) {
if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) {
rc = snprintf_s(xlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dirPath, entry->d_name);
securec_check_ss_c(rc, "", "");
record = XLogReadRecord(xlogReader, curLsn, errorMsg, true, xlogDirStr);
if (record != NULL) {
break;
} else {
CLOSE_FD(xlogreadfd);
}
for (int i = 0; i < xlogDirNum; i++) {
record = XLogReadRecord(xlogReader, curLsn, errorMsg, true, xlogDirs[i]);
if (record != NULL) {
break;
} else {
CLOSE_FD(xlogreadfd);
}
}
(void)closedir(dir);
return record;
}
void FindMaxXlogFileName(char* dirPath, char* maxXLogFileName)
void SSFindMaxXlogFileName(char* maxXLogFileName, char** xlogDirs, int xlogDirNum)
{
DIR* dir = opendir(dirPath);
struct dirent* entry = NULL;
char xlogDirStr[MAXPGPATH];
errno_t rc = EOK;
while (dir != NULL && (entry = readdir(dir)) != NULL) {
if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) {
rc = snprintf_s(xlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dirPath, entry->d_name);
securec_check_ss_c(rc, "", "");
DIR* subDir = opendir(xlogDirStr);
struct dirent* subDirEntry = NULL;
while (subDir != NULL && (subDirEntry = readdir(subDir)) != NULL) {
if (strlen(subDirEntry->d_name) == 24 && strspn(subDirEntry->d_name, "0123456789ABCDEF") == 24 &&
(strlen(maxXLogFileName) == 0 || strcmp(maxXLogFileName, subDirEntry->d_name) < 0)) {
rc = strncpy_s(maxXLogFileName, MAXPGPATH, subDirEntry->d_name, strlen(subDirEntry->d_name) + 1);
securec_check(rc, "", "");
maxXLogFileName[strlen(subDirEntry->d_name)] = '\0';
}
for (int i = 0; i < xlogDirNum; i++) {
DIR* subDir = opendir(xlogDirs[i]);
struct dirent* subDirEntry = NULL;
while (subDir != NULL && (subDirEntry = readdir(subDir)) != NULL) {
if (strlen(subDirEntry->d_name) == 24 && strspn(subDirEntry->d_name, "0123456789ABCDEF") == 24 &&
(strlen(maxXLogFileName) == 0 || strcmp(maxXLogFileName, subDirEntry->d_name) < 0)) {
rc = strncpy_s(maxXLogFileName, MAXPGPATH, subDirEntry->d_name, strlen(subDirEntry->d_name) + 1);
securec_check(rc, "", "");
maxXLogFileName[strlen(subDirEntry->d_name)] = '\0';
}
(void)closedir(subDir);
}
(void)closedir(subDir);
}
(void)closedir(dir);
}
XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char* dssDirStr)
XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char** xlogDirs, int xlogDirNum)
{
struct dirent *entry = NULL;
XLogReaderState *xlogReader = NULL;
XLogPageReadPrivate readPrivate = {
.datadir = NULL,
@ -1395,13 +1375,10 @@ XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32
bool findValidXLogFile = false;
uint32 xlogReadLogid = -1;
uint32 xlogReadLogSeg = -1;
char dssXlogDirStr[MAXPGPATH];
errno_t rc = EOK;
DIR* dssDir = NULL;
bool breakLoops = false;
/* Ranking xlog from large to small */
FindMaxXlogFileName(dssDirStr, maxXLogFileName);
SSFindMaxXlogFileName(maxXLogFileName, xlogDirs, xlogDirNum);
if (sscanf_s(maxXLogFileName, "%08X%08X%08X", &tli, &xlogReadLogid, &xlogReadLogSeg) != 3) {
rc = snprintf_s(returnMsg, XLOG_READER_MAX_MSGLENTH, XLOG_READER_MAX_MSGLENTH - 1,
@ -1432,29 +1409,18 @@ XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32
/* Start to find the max lsn from a valid xlogfile */
startLsn = (xlogReadLogSeg * XLogSegSize) + ((XLogRecPtr)xlogReadLogid * XLogSegmentsPerXLogId * XLogSegSize);
while (!XLogRecPtrIsInvalid(startLsn) && !breakLoops) {
dssDir = opendir(dssDirStr);
while (!XLogRecPtrIsInvalid(startLsn) && !findValidXLogFile) {
/* find the first valid record from the bigger xlogrecord. then break */
while (dssDir != NULL && (entry = readdir(dssDir)) != NULL) {
if (strncmp(entry->d_name, "pg_xlog", strlen("pg_xlog")) == 0) {
rc = snprintf_s(dssXlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dssDirStr, entry->d_name);
#ifndef FRONTEND
securec_check_ss(rc, "", "");
#else
securec_check_ss_c(rc, "", "");
#endif
curLsn = XLogFindNextRecord(xlogReader, startLsn, NULL, dssXlogDirStr);
if (XLogRecPtrIsInvalid(curLsn)) {
CLOSE_FD(xlogreadfd);
} else {
findValidXLogFile = true;
breakLoops = true;
break;
}
for (int i = 0; i < xlogDirNum; i++) {
curLsn = XLogFindNextRecord(xlogReader, startLsn, NULL, xlogDirs[i]);
if (XLogRecPtrIsInvalid(curLsn)) {
CLOSE_FD(xlogreadfd);
} else {
findValidXLogFile = true;
break;
}
}
startLsn = startLsn - XLogSegSize;
(void)closedir(dssDir);
}
CLOSE_FD(xlogreadfd);
@ -1476,7 +1442,7 @@ XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32
/* find the max lsn. */
while(true) {
record = XLogReadRecordFromAllDir(dssDirStr, xlogReader, curLsn, &errorMsg);
record = XLogReadRecordFromAllDir(xlogDirs, xlogDirNum, xlogReader, curLsn, &errorMsg);
if (record == NULL) {
break;
}

View File

@ -42,7 +42,8 @@ extern void XLogReaderFree(XLogReaderState* state);
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
extern struct XLogRecord* XLogReadRecord(
XLogReaderState* state, XLogRecPtr recptr, char** errormsg, bool doDecode = true, char* xlog_path = NULL);
extern struct XLogRecord* XLogReadRecordFromAllDir(char* dirPath, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg);
extern struct XLogRecord* XLogReadRecordFromAllDir(
char** xlogDirs, int xlogDirNum, XLogReaderState *xlogReader, XLogRecPtr curLsn, char** errorMsg);
extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileNode *rnode, ForkNumber *forknum,
BlockNumber *blknum, XLogPhyBlock *pblk = NULL);
@ -57,7 +58,7 @@ extern void XLogRecGetVMPhysicalBlock(const XLogReaderState *record, uint8 block
extern void XLogReaderInvalReadState(XLogReaderState* state);
extern XLogRecPtr XLogFindNextRecord(XLogReaderState* state, XLogRecPtr RecPtr, XLogRecPtr *endPtr = NULL, char* xlog_path = NULL);
extern XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char* dssDirStr);
extern XLogRecPtr SSFindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, char** xlogDirs, int xlogDirNum);
extern XLogRecPtr FindMaxLSN(char* workingpath, char* returnmsg, int msg_len, pg_crc32* maxLsnCrc,
uint32 *maxLsnLen = NULL, TimeLineID *returnTli = NULL, char* xlog_path = NULL);
extern XLogRecPtr FindMinLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *minLsnCrc);

View File

@ -26,6 +26,7 @@
#include "storage/file/fio_device_com.h"
#define MAXPGPATH 1024
#define SS_MAX_INST 64
#define T_SS_XLOGDIR \
(g_enable_dss ? g_datadir.xlogDir : "pg_xlog")