双集群build check

This commit is contained in:
chenzhikai
2023-07-18 16:42:52 +08:00
parent c61c753586
commit 5c2d39f3ad
14 changed files with 369 additions and 75 deletions

View File

@ -1014,9 +1014,9 @@ static bool ReceiveAndUnpackTarFile(PGconn* conn, PGresult* res, int rownum)
}
/* pg_control is written at the offset that belongs to the main standby (BLCKSZ * instance id) */
if (instance_config.dss.enable_dss && strcmp(filename, "+data/pg_control") == 0) {
if (ss_instance_config.dss.enable_dss && strcmp(filename, "+data/pg_control") == 0) {
pg_log(PG_WARNING, _("file size %d. \n"), r);
int main_standby_id = instance_config.dss.instance_id;
int main_standby_id = ss_instance_config.dss.instance_id;
off_t seekpos = (off_t)BLCKSZ * main_standby_id;
fseek(file, seekpos, SEEK_SET);
}
@ -1180,7 +1180,7 @@ static bool BaseBackup(const char* dirname, uint32 term)
errno_t rc = EOK;
int nRet = 0;
struct stat st;
char *dssdir = instance_config.dss.vgdata;
char *dssdir = ss_instance_config.dss.vgname;
pqsignal(SIGCHLD, BuildReaper); /* handle child termination */
/* concat file and path */
@ -1227,7 +1227,7 @@ static bool BaseBackup(const char* dirname, uint32 term)
delete_datadir(dirname);
/* delete data/ and pg_tblspc/ in dss, but keep .config */
if (instance_config.dss.enable_dss) {
if (ss_instance_config.dss.enable_dss) {
delete_datadir(dssdir);
}
show_full_build_process("clear old target dir success");
@ -1533,7 +1533,7 @@ static bool BaseBackup(const char* dirname, uint32 term)
* in order to avoid sharing the same dssserver session,
* we will not start logstreaming here
*/
if (!instance_config.dss.enable_dss) {
if (!ss_instance_config.dss.enable_dss) {
BeginGetXlogbyStream(xlogstart, timeline, sysidentifier, xlog_location, term, res);
}
@ -1657,7 +1657,7 @@ static bool BaseBackup(const char* dirname, uint32 term)
}
#endif
if (instance_config.dss.enable_dss) {
if (ss_instance_config.dss.enable_dss) {
BeginGetXlogbyStream(xlogstart, timeline, sysidentifier, xlog_location, term, res);
}
@ -1749,7 +1749,7 @@ static bool BaseBackup(const char* dirname, uint32 term)
/* fsync all data come from source */
if (!no_need_fsync) {
show_full_build_process("starting fsync all files come from source.");
if (instance_config.dss.enable_dss) {
if (ss_instance_config.dss.enable_dss) {
(void) fsync_pgdata(dssdir);
} else {
(void) fsync_pgdata(basedir);
@ -1762,7 +1762,7 @@ static bool BaseBackup(const char* dirname, uint32 term)
if (g_is_obsmode) {
backupDWFileSuccess = backup_dw_file(basedir);
} else {
if (instance_config.dss.enable_dss) {
if (ss_instance_config.dss.enable_dss) {
backupDWFileSuccess = ss_backup_dw_file(dssdir);
} else {
backupDWFileSuccess = backup_dw_file(dirname);
@ -1789,7 +1789,7 @@ static bool BaseBackup(const char* dirname, uint32 term)
return false;
}
if (instance_config.dss.enable_dss) {
if (ss_instance_config.dss.enable_dss) {
nRet = snprintf_s(tblspcPath, MAXPGPATH, MAXPGPATH, "%s/pg_tblspc", dssdir);
} else {
nRet = snprintf_s(tblspcPath, MAXPGPATH, MAXPGPATH, "%s/pg_tblspc", dirname);
@ -2370,7 +2370,7 @@ static bool ss_backup_dw_file(const char* target_dir)
/* Delete the dw file, if it exists. */
rc = snprintf_s(dw_path, PATH_MAX, PATH_MAX - 1, "%s/pg_doublewrite%d", target_dir,
instance_config.dss.instance_id);
ss_instance_config.dss.instance_id);
securec_check_ss_c(rc, "\0", "\0");
/* check whether the directory exists; if it does not, create it */
@ -2506,10 +2506,10 @@ void get_xlog_location(char (&xlog_location)[MAXPGPATH])
struct stat stbuf;
int nRet = 0;
if (instance_config.dss.enable_dss) {
char *dssdir = instance_config.dss.vgdata;
if (ss_instance_config.dss.enable_dss) {
char *dssdir = ss_instance_config.dss.vgname;
nRet = snprintf_s(xlog_location, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog%d", dssdir,
instance_config.dss.instance_id);
ss_instance_config.dss.instance_id);
} else {
nRet = snprintf_s(xlog_location, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog", basedir);
}
@ -2725,8 +2725,8 @@ bool RenameTblspcDir(char *dataDir)
return true;
}
if (instance_config.dss.enable_dss) {
char *dssdir = instance_config.dss.vgdata;
if (ss_instance_config.dss.enable_dss) {
char *dssdir = ss_instance_config.dss.vgname;
rc = snprintf_s(tblspcParentPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dssdir, "pg_tblspc");
} else {
rc = snprintf_s(tblspcParentPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", dataDir, "pg_tblspc");

View File

@ -72,7 +72,6 @@ char g_repl_uuid[MAX_VALUE_LEN] = {0};
int g_replconn_idx = -1;
int g_replication_type = -1;
bool is_cross_region_build = false;
SSInstanceConfig instance_config;
#define RT_WITH_DUMMY_STANDBY 0
#define RT_WITH_MULTI_STANDBY 1
@ -610,7 +609,7 @@ void get_conninfo(const char* filename)
}
if (build_mode == CROSS_CLUSTER_FULL_BUILD || build_mode == CROSS_CLUSTER_INC_BUILD ||
build_mode == CROSS_CLUSTER_STANDBY_FULL_BUILD) {
build_mode == CROSS_CLUSTER_STANDBY_FULL_BUILD || build_mode == BUILD_CHECK) {
/* For shared storage cluster */
conninfo_para = config_para_cross_cluster_build;
} else {
@ -1238,7 +1237,7 @@ int IsBeginWith(const char *str1, char *str2)
bool SsIsSkipPath(const char* dirname, bool needskipall)
{
if (!instance_config.dss.enable_dss) {
if (!ss_instance_config.dss.enable_dss) {
return false;
}
@ -1271,7 +1270,7 @@ bool SsIsSkipPath(const char* dirname, bool needskipall)
char instanceId[MAX_INSTANCEID_LEN] = {0};
errno_t rc = EOK;
rc = snprintf_s(instanceId, sizeof(instanceId), sizeof(instanceId) - 1, "%d",
instance_config.dss.instance_id);
ss_instance_config.dss.instance_id);
securec_check_ss_c(rc, "\0", "\0");
/* do not skip the pg_xlog directory in the file system */
if (strlen(dirname) > dirNameLen && strcmp(dirname + dirNameLen, instanceId) != 0)
@ -1575,7 +1574,7 @@ void delete_datadir(const char* dirname)
*/
if (strncmp(dirname, "+", 1) == 0 ) {
nRet = snprintf_s(xlogpath, MAXPGPATH, sizeof(xlogpath) - 1, "%s/pg_xlog%d", dirname,
instance_config.dss.instance_id);
ss_instance_config.dss.instance_id);
} else {
nRet = snprintf_s(xlogpath, MAXPGPATH, sizeof(xlogpath) - 1, "%s/pg_xlog", dirname);
}
@ -1788,7 +1787,7 @@ void fsync_pgdata(const char *pg_data)
if (is_dss_file(pg_data)) {
errorno = snprintf_s(pg_xlog, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog%d", pg_data,
instance_config.dss.instance_id);
ss_instance_config.dss.instance_id);
} else {
errorno = snprintf_s(pg_xlog, MAXPGPATH, MAXPGPATH - 1, "%s/pg_xlog", pg_data);
}

View File

@ -44,7 +44,6 @@ extern char conninfo_global[MAX_REPLNODE_NUM][MAX_VALUE_LEN];
extern int standby_recv_timeout;
extern int standby_connect_timeout; /* 120 sec = default */
extern char gaussdb_state_file[MAXPGPATH];
extern SSInstanceConfig instance_config;
void delete_datadir(const char* dirname);

View File

@ -223,7 +223,7 @@ static char* new_role = "passive";
static char start_minority_file[MAXPGPATH];
static int get_instance_id(void);
static int ss_get_primary_id(void);
static bool ss_read_dorado_config(void);
bool ss_read_config(void);
static unsigned int vote_num = 0;
static unsigned int xmode = 2;
static char postport_lock_file[MAXPGPATH];
@ -368,6 +368,7 @@ static void do_overwrite(void);
static void do_full_restore(void);
static void kill_proton_force(void);
static void SigAlarmHandler(int arg);
static bool DoBuildCheck(uint32 term);
int ExecuteCmd(const char* command, struct timeval timeout);
static int find_guc_optval(const char** optlines, const char* optname, char* optval);
@ -4373,6 +4374,10 @@ static void do_build(uint32 term)
else if (build_mode == COPY_SECURE_FILES_BUILD) {
buildSuccess = DoCopySecureFileBuild(term);
}
/* check whether a build is needed and whether an incremental build is possible */
else if (build_mode == BUILD_CHECK) {
buildSuccess = DoBuildCheck(term);
}
if (!buildSuccess) {
exit(1);
@ -4805,6 +4810,116 @@ static bool GetRemoteNodeName()
return true;
}
/*
 * @@GaussDB@@
 * Brief		: entry of the build check: asks the source for its identity and
 *                runs do_build_check() against the local data directory
 * Description	: returns true only when do_build_check() reports BUILD_SUCCESS;
 *                false when no connection could be made, IDENTIFY_SYSTEM failed
 *                or the check itself failed
 * Notes		: the replication connection (streamConn) is closed before the
 *                check starts; progname is switched to "gs_rewind"
 */
bool build_check_main(uint32 term)
{
    char sourceConnStr[MAXPGPATH] = {0};

    g_inc_fail_reason = DEFAULT_REASON;
    CheckBuildParameter();
    check_nested_pgconf();

    /* Load the connection info from the command line or the configuration file. */
    get_conninfo(pg_conf_file);

    /* Pick a connection that is usable. */
    streamConn = check_and_conn(standby_connect_timeout, standby_recv_timeout, term);
    if (streamConn == NULL) {
        pg_log(PG_WARNING, _("could not connect to server.\n"));
        g_inc_fail_reason = CONN_PRIMARY_FAIL;
        return false;
    }

    /* Build the connection string that do_build_check() uses to reach the same host. */
    const char* sourceHost = streamConn->pghost;
    if (sourceHost == NULL) {
        sourceHost = streamConn->pghostaddr;
    }
    errno_t rc = sprintf_s(sourceConnStr,
        sizeof(sourceConnStr),
        "host=%s port=%s dbname=postgres application_name=gs_rewind connect_timeout=5 rw_timeout=600",
        sourceHost,
        streamConn->pgport);
    securec_check_ss_c(rc, "\0", "\0");

    /* IDENTIFY_SYSTEM must answer with exactly one row of four fields. */
    PGresult* identifyRes = PQexec(streamConn, "IDENTIFY_SYSTEM");
    bool identified = false;
    if (PQresultStatus(identifyRes) != PGRES_TUPLES_OK) {
        pg_log(PG_WARNING, _("could not identify system: %s"), PQerrorMessage(streamConn));
    } else if (PQntuples(identifyRes) != 1 || PQnfields(identifyRes) != 4) {
        pg_log(PG_WARNING,
            _("could not identify system, got %d rows and %d fields\n"),
            PQntuples(identifyRes),
            PQnfields(identifyRes));
    } else {
        identified = true;
    }
    if (!identified) {
        PQfinish(streamConn);
        streamConn = NULL;
        PQclear(identifyRes);
        return false;
    }

    /* Field 0 is the system identifier, field 1 the timeline. */
    char* sysidentifier = pg_strdup(PQgetvalue(identifyRes, 0, 0));
    uint32 timeline = atoi(PQgetvalue(identifyRes, 0, 1));
    PQclear(identifyRes);

    /* The replication connection is not needed any more. */
    PQfinish(streamConn);
    streamConn = NULL;

    /* do_build_check() prints progname in its messages; report as gs_rewind. */
    progname = "gs_rewind";
    BuildErrorCode checkResult = do_build_check(pg_data, sourceConnStr, sysidentifier, timeline, term);
    libpqDisconnect();

    if (sysidentifier != NULL) {
        pg_free(sysidentifier);
        sysidentifier = NULL;
    }
    return (checkResult == BUILD_SUCCESS);
}
/*
 * build_mode:
 *       check: only report whether this node needs a build; the outcome is
 *              logged and returned (true when build_check_main() succeeded)
 */
static bool DoBuildCheck(uint32 term)
{
    char workDir[MAXPGPATH];

    /* The working directory is only logged, but failing to read it is fatal. */
    if (getcwd(workDir, MAXPGPATH) == NULL) {
        pg_fatal(_("could not identify current directory: %s"), gs_strerror(errno));
        exit(1);
    }
    pg_log(PG_WARNING, _("current workdir is (%s).\n"), workDir);

    check_nested_pgconf();
    replconn_num = get_replconn_number(pg_conf_file);

    const bool checkPassed = build_check_main(term);
    if (checkPassed) {
        pg_log(PG_WARNING, _("%s completed(%s).\n"), BuildModeToString(build_mode), pg_data);
    } else {
        pg_log(PG_WARNING, _("%s failed(%s), need to do full build\n"), BuildModeToString(build_mode), pg_data);
    }
    return checkPassed;
}
/*
* build_mode:
* AUTO_BUILD: do gs_rewind first, after failed 3 times, do full
@ -4962,18 +5077,18 @@ static int find_guc_optval(const char** optlines, const char* optname, char* opt
int ret;
errno_t rc = EOK;
lineno = find_gucoption(optlines, (const char*)optname, NULL, NULL, &offset, &len);
lineno = find_gucoption(optlines, (const char*)optname, NULL, NULL, &offset, &len, '\'');
if (lineno != INVALID_LINES_IDX) {
rc = strncpy_s(optval, MAX_VALUE_LEN, optlines[lineno] + offset + 1, (size_t)(Min(len - 1, MAX_VALUE_LEN) - 1));
rc = strncpy_s(optval, MAX_VALUE_LEN, optlines[lineno] + offset, (size_t)(Min(len, MAX_VALUE_LEN)));
securec_check_c(rc, "", "");
return lineno;
}
ret = snprintf_s(def_optname, sizeof(def_optname), sizeof(def_optname) - 1, "#%s", optname);
ret = snprintf_s(def_optname, sizeof(def_optname), sizeof(def_optname) - 1, "#%s", optname, '\'');
securec_check_ss_c(ret, "\0", "\0");
lineno = find_gucoption(optlines, (const char*)def_optname, NULL, NULL, &offset, &len);
if (lineno != INVALID_LINES_IDX) {
rc = strncpy_s(optval, MAX_VALUE_LEN, optlines[lineno] + offset + 1, (size_t)(Min(len - 1, MAX_VALUE_LEN) - 1));
rc = strncpy_s(optval, MAX_VALUE_LEN, optlines[lineno] + offset, (size_t)(Min(len, MAX_VALUE_LEN)));
securec_check_c(rc, "", "");
return lineno;
}
@ -5074,6 +5189,8 @@ const char *BuildModeToString(BuildMode mode)
break;
case COPY_SECURE_FILES_BUILD:
return "copy secure files build";
case BUILD_CHECK:
return "build check";
default:
return "unkwon";
break;
@ -6358,6 +6475,8 @@ int main(int argc, char** argv)
} else if (strcmp(optarg, "copy_upgrade_file") == 0) {
build_mode = COPY_SECURE_FILES_BUILD;
need_copy_upgrade_file = true;
} else if (strcmp(optarg, "check") == 0) {
build_mode = BUILD_CHECK;
}
break;
}
@ -6692,7 +6811,7 @@ int main(int argc, char** argv)
MIN_INSTANCEID, MAX_INSTANCEID );
goto Error;
}
instance_config.dss.instance_id = atoi(optarg);
ss_instance_config.dss.instance_id = atoi(optarg);
break;
case 1:
clear_backup_dir = true;
@ -6725,9 +6844,9 @@ int main(int argc, char** argv)
FREE_AND_RESET(vgdata);
FREE_AND_RESET(vgdata);
parse_vgname_args(optarg);
instance_config.dss.vgname = xstrdup(vgname);
instance_config.dss.vgdata = xstrdup(vgdata);
instance_config.dss.vglog = xstrdup(vglog);
ss_instance_config.dss.vgname = xstrdup(vgname);
ss_instance_config.dss.vgdata = xstrdup(vgdata);
ss_instance_config.dss.vglog = xstrdup(vglog);
break;
}
case 6:{
@ -6737,11 +6856,11 @@ int main(int argc, char** argv)
goto Error;
}
socketpath = xstrdup(optarg);
instance_config.dss.socketpath = xstrdup(optarg);
ss_instance_config.dss.socketpath = xstrdup(optarg);
break;
}
case 7:
instance_config.dss.enable_dss = true;
ss_instance_config.dss.enable_dss = true;
break;
case 8:{
check_input_for_security(optarg);
@ -6918,21 +7037,25 @@ int main(int argc, char** argv)
do_wait = false;
}
if (instance_config.dss.enable_dss) {
enable_dss = ss_read_config();
if (ss_instance_config.dss.enable_dss) {
// dss device init
if (dss_device_init(instance_config.dss.socketpath,
instance_config.dss.enable_dss) != DSS_SUCCESS) {
if (dss_device_init(ss_instance_config.dss.socketpath,
ss_instance_config.dss.enable_dss) != DSS_SUCCESS) {
pg_log(PG_WARNING, _("failed to init dss device\n"));
goto Error;
}
/* Prepare some g_datadir parameters */
g_datadir.instance_id = instance_config.dss.instance_id;
g_datadir.instance_id = ss_instance_config.dss.instance_id;
errno_t rc = strcpy_s(g_datadir.dss_data, strlen(instance_config.dss.vgdata) + 1, instance_config.dss.vgdata);
errno_t rc = strcpy_s(g_datadir.dss_data, strlen(ss_instance_config.dss.vgname) + 1, ss_instance_config.dss.vgname);
securec_check_c(rc, "\0", "\0");
rc = strcpy_s(g_datadir.dss_log, strlen(instance_config.dss.vglog) + 1, instance_config.dss.vglog);
if (ss_instance_config.dss.vglog == NULL) {
ss_instance_config.dss.vglog = ss_instance_config.dss.vgname;
}
rc = strcpy_s(g_datadir.dss_log, strlen(ss_instance_config.dss.vglog) + 1, ss_instance_config.dss.vglog);
securec_check_c(rc, "\0", "\0");
/* The default of XLogSegmentSize was set 16M during configure, we reassign 1G to XLogSegmentSize
@ -6940,7 +7063,7 @@ int main(int argc, char** argv)
XLogSegmentSize = DSS_XLOG_SEG_SIZE;
}
initDataPathStruct(instance_config.dss.enable_dss);
initDataPathStruct(ss_instance_config.dss.enable_dss);
SetConfigFilePath();
@ -7057,6 +7180,11 @@ int main(int argc, char** argv)
_("gs_ctl copy secure files from remote build ,datadir is %s,conn_str is \'%s\'\n"),
pg_data,
conn_str);
} else if (build_mode == BUILD_CHECK) {
pg_log(PG_PROGRESS,
_("gs_ctl build check ,datadir is %s,conn_str is \'%s\'\n"),
pg_data,
conn_str);
} else {
pg_log(PG_PROGRESS,
_("gs_ctl incremental build ,datadir is %s,conn_str is \'%s\'\n"),
@ -7076,6 +7204,10 @@ int main(int argc, char** argv)
pg_log(PG_PROGRESS,
_("gs_ctl full backup to obs ,datadir is %s\n"),
pg_data);
} else if (build_mode == BUILD_CHECK) {
pg_log(PG_PROGRESS,
_("gs_ctl build check ,datadir is %s\n"),
pg_data);
} else {
pg_log(PG_PROGRESS,
_("gs_ctl incremental build ,datadir is %s\n"),
@ -7250,11 +7382,11 @@ static int get_instance_id(void)
static int ss_get_primary_id(void)
{
if (instance_config.dss.socketpath == NULL) {
if (ss_instance_config.dss.socketpath == NULL) {
return -1;
}
if (instance_config.dss.vgname == NULL) {
if (ss_instance_config.dss.vgname == NULL) {
return -1;
}
@ -7266,10 +7398,10 @@ static int ss_get_primary_id(void)
err = memset_s(control_file_path, MAXPGPATH, 0, MAXPGPATH);
securec_check_c(err, "\0", "\0");
err = snprintf_s(control_file_path, MAXPGPATH, MAXPGPATH - 1, "%s/pg_control", instance_config.dss.vgname);
err = snprintf_s(control_file_path, MAXPGPATH, MAXPGPATH - 1, "%s/pg_control", ss_instance_config.dss.vgname);
securec_check_ss_c(err, "\0", "\0");
if (dss_device_init(instance_config.dss.socketpath, true) != DSS_SUCCESS) {
if (dss_device_init(ss_instance_config.dss.socketpath, true) != DSS_SUCCESS) {
pg_log(PG_WARNING, _("failed to init dss device\n"));
exit(1);
}
@ -7307,12 +7439,13 @@ static int ss_get_primary_id(void)
}
/*
* read dorado config, if it is dorado standby cluster,
* we will get ss_dss_conn_path and ss_dss_vg_name.
* read ss config, return enable_dss
* we will get ss_enable_dss, ss_dss_conn_path and ss_dss_vg_name.
*/
static bool ss_read_dorado_config(void)
bool ss_read_config(void)
{
char config_file[MAXPGPATH] = {0};
char enable_dss[MAXPGPATH] = {0};
char** optlines = NULL;
int ret = EOK;
@ -7320,19 +7453,19 @@ static bool ss_read_dorado_config(void)
securec_check_ss_c(ret, "\0", "\0");
config_file[MAXPGPATH - 1] = '\0';
optlines = readfile(config_file);
char cluster_run_mode[MAXPGPATH] = {0};
(void)find_guc_optval((const char**)optlines, "cluster_run_mode", cluster_run_mode);
(void)find_guc_optval((const char**)optlines, "ss_enable_dss", enable_dss);
/* this is not dorado cluster_standby, wo do not need to do anythiny else */
if(strncmp(cluster_run_mode, "cluster_standby", sizeof("cluster_standby")) != 0) {
/* ss_enable_dss is not "on": nothing else to do */
if(strncmp(enable_dss, "on", sizeof("on")) != 0) {
return false;
}
instance_config.dss.socketpath = (char*)malloc(sizeof(char) * MAXPGPATH);
instance_config.dss.vgname = (char*)malloc(sizeof(char) * MAXPGPATH);
(void)find_guc_optval((const char**)optlines, "ss_dss_conn_path", instance_config.dss.socketpath);
(void)find_guc_optval((const char**)optlines, "ss_dss_vg_name", (char*)instance_config.dss.vgname);
ss_instance_config.dss.enable_dss = true;
ss_instance_config.dss.socketpath = (char*)malloc(sizeof(char) * MAXPGPATH);
ss_instance_config.dss.vgname = (char*)malloc(sizeof(char) * MAXPGPATH);
(void)find_guc_optval((const char**)optlines, "ss_dss_conn_path", ss_instance_config.dss.socketpath);
(void)find_guc_optval((const char**)optlines, "ss_dss_vg_name", ss_instance_config.dss.vgname);
freefile(optlines);
optlines = NULL;
return true;

View File

@ -26,6 +26,7 @@
#include "catalog/catalog.h"
#include "PageCompression.h"
#include "catalog/pg_type.h"
#include "storage/file/fio_device.h"
PGconn* conn = NULL;
char source_slot_name[NAMEDATALEN] = {0};

View File

@ -26,6 +26,7 @@
#include "common/fe_memutils.h"
#include "common/build_query/build_query.h"
#include "replication/replicainternal.h"
#include "storage/file/fio_device.h"
#include <memory>
#define BLOCKSIZE (8 * 1024)

View File

@ -138,10 +138,12 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
#ifdef HAVE_INT64_TIMESTAMP
#define TIME_COUNT 60000000
#else
#define TIME_COUNT 60
#define TIME_COUNT 300
#endif
XLogRecPtr max_lsn;
char returnmsg[MAX_ERR_MSG_LENTH] = {0};
char dssdirdata[MAXPGPATH] = {0};
char* dssdir = dssdirdata;
pg_crc32 maxLsnCrc = 0;
XLogRecord* record = NULL;
XLogRecPtr searchptr;
@ -154,10 +156,17 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
TimestampTz start_time;
TimestampTz current_time;
if (ss_instance_config.dss.enable_dss) {
ret = snprintf_s(dssdirdata, MAXPGPATH, MAXPGPATH - 1, "%s/%s%d", ss_instance_config.dss.vgname, XLOGDIR, ss_instance_config.dss.instance_id);
securec_check_ss_c(ret, "", "");
} else {
dssdir = NULL;
}
/*
* local max lsn must be exists, or change to full build.
*/
max_lsn = FindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc);
max_lsn = FindMaxLSN(datadir_target, returnmsg, XLOG_READER_MAX_MSGLENTH, &maxLsnCrc, NULL, NULL, dssdir);
if (XLogRecPtrIsInvalid(max_lsn)) {
pg_fatal("find max lsn fail, errmsg:%s\n", returnmsg);
return BUILD_FATAL;
@ -177,20 +186,21 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
securec_check_ss_c(ret, "\0", "\0");
get_conninfo(pg_conf_file);
searchptr = max_lsn;
start_time = localGetCurrentTimestamp();
current_time = start_time;
while (!XLogRecPtrIsInvalid(searchptr)) {
if (current_time - start_time >= TIME_COUNT) {
pg_log(PG_FATAL,
"try 60s, could not find any common checkpoint, change to full build\n");
"try 300s, could not find any common checkpoint, need to do full build\n");
XLogReaderFree(xlogreader);
CloseXlogFile();
return BUILD_FATAL;
}
uint8 info;
record = XLogReadRecord(xlogreader, searchptr, &errormsg);
record = XLogReadRecord(xlogreader, searchptr, &errormsg, true, dssdir);
if (record == NULL) {
if (errormsg != NULL) {
pg_fatal("could not find previous WAL record at %X/%X: %s\n",
@ -245,7 +255,7 @@ BuildErrorCode findCommonCheckpoint(const char* datadir, TimeLineID tli, XLogRec
PG_CHECKBUILD_AND_RETURN();
/* no common checkpoint between target and source, need full build */
if (XLogRecPtrIsInvalid(searchptr)) {
pg_log(PG_FATAL, "could not find any common checkpoint, change to full build\n");
pg_log(PG_FATAL, "could not find any common checkpoint, must to do full build\n");
return BUILD_FATAL;
}
return BUILD_SUCCESS;

View File

@ -22,6 +22,7 @@
#include "fetch.h"
#include "file_ops.h"
#include "logging.h"
#include "storage/file/fio_device.h"
#include "access/xlog_internal.h"
#include "catalog/catversion.h"
@ -931,3 +932,130 @@ static BuildErrorCode TruncateAndRemoveXLog(XLogRecPtr endPtr, uint32 timeLine)
return BUILD_SUCCESS;
}
/*
 * Worker for do_build_check(): validates the environment, connects to the
 * source and compares the control files and checkpoints of both sides.
 *
 * It returns as soon as a step fails. No cleanup is done here; the caller
 * releases the source connection and datadir_target on every path.
 */
static BuildErrorCode do_build_check_internal(const char* pgdata, const char* connstr, uint32 term)
{
    TimeLineID lastcommontli;
    XLogRecPtr chkptrec = InvalidXLogRecPtr;
    TimeLineID chkpttli;
    XLogRecPtr chkptredo = InvalidXLogRecPtr;
    size_t size = 0;
    char* buffer = NULL;
    XLogRecPtr startrec;
    BuildErrorCode rv = BUILD_SUCCESS;

    datadir_target = pg_strdup(pgdata);
    if (connstr_source == NULL) {
        connstr_source = pg_strdup(connstr);
    }

    if (connstr_source == NULL) {
        pg_log(PG_WARNING, "%s: no source specified (--source-server)\n", progname);
        pg_log(PG_WARNING, "Try \"%s --help\" for more information.\n", progname);
        return BUILD_ERROR;
    }

    if (datadir_target == NULL) {
        pg_log(PG_WARNING, "%s: no target data directory specified (--target-pgdata)\n", progname);
        pg_log(PG_WARNING, "Try \"%s --help\" for more information.\n", progname);
        return BUILD_ERROR;
    }

    /*
     * Refuse to run as root, to avoid overwriting the ownership of files in
     * the data directory. Only root needs checking -- any other user lacks
     * the permissions to modify files in the data directory.
     */
    if (geteuid() == 0) {
        pg_log(PG_PROGRESS, "cannot be executed by \"root\"\n");
        pg_log(PG_PROGRESS, "You must run %s as the PostgreSQL superuser.\n", progname);
        exit(1);
    }

    /* Can't start new building until restore process success. */
    if (is_in_restore_process(datadir_target)) {
        pg_log(PG_PROGRESS,
            "%s: last restore process hasn't completed, "
            "can't start new building.\n",
            progname);
        return BUILD_ERROR;
    }

    /*
     * The target control file is read from the shared-storage volume group.
     * vgname stays NULL until it is configured, so fail with a clear message
     * instead of handing a NULL directory to slurpFile().
     */
    if (ss_instance_config.dss.vgname == NULL) {
        pg_log(PG_WARNING, "%s: no shared storage volume group configured, can't read target pg_control\n", progname);
        return BUILD_ERROR;
    }

    /* Connect to remote server */
    rv = libpqConnect(connstr_source);
    PG_CHECKRETURN_AND_RETURN(rv);
    rv = libpqGetParameters();
    PG_CHECKRETURN_AND_RETURN(rv);
    pg_log(PG_PROGRESS, "connect to primary success\n");

    /*
     * Ok, we have all the options and we're ready to start. Read in all the
     * information we need from both clusters.
     */
    buffer = slurpFile(ss_instance_config.dss.vgname, "pg_control", &size);
    PG_CHECKBUILD_AND_RETURN();
    digestControlFile(&ControlFile_target, buffer);
    pg_free(buffer);
    buffer = NULL;
    PG_CHECKBUILD_AND_RETURN();
    pg_log(PG_PROGRESS,
        "find last checkpoint at %X/%X and checkpoint redo at %X/%X from target control file\n",
        (uint32)(ControlFile_target.checkPoint >> 32),
        (uint32)(ControlFile_target.checkPoint),
        (uint32)(ControlFile_target.checkPointCopy.redo >> 32),
        (uint32)(ControlFile_target.checkPointCopy.redo));

    buffer = fetchFile("+data/pg_control", &size);
    PG_CHECKBUILD_AND_RETURN();
    digestControlFile(&ControlFile_source, buffer);
    pg_free(buffer);
    buffer = NULL;
    PG_CHECKBUILD_AND_RETURN();
    pg_log(PG_PROGRESS, "get primary pg_control success\n");

    /* Check if an incremental build can be performed at all */
    rv = sanityChecks();
    PG_CHECKRETURN_AND_RETURN(rv);
    pg_log(PG_PROGRESS, "sanityChecks success\n");

    lastcommontli = ControlFile_target.checkPointCopy.ThisTimeLineID;
    pg_log(PG_PROGRESS,
        "find last checkpoint at %X/%X and checkpoint redo at %X/%X from source control file\n",
        (uint32)(ControlFile_source.checkPoint >> 32),
        (uint32)(ControlFile_source.checkPoint),
        (uint32)(ControlFile_source.checkPointCopy.redo >> 32),
        (uint32)(ControlFile_source.checkPointCopy.redo));

    /* Start the search from the older of the two last checkpoints */
    startrec = ControlFile_source.checkPoint <= ControlFile_target.checkPoint ?
        ControlFile_source.checkPoint : ControlFile_target.checkPoint;

    /* Find the common checkpoint location */
    rv = findCommonCheckpoint(datadir_target, lastcommontli, startrec, &chkptrec, &chkpttli, &chkptredo, term);
    PG_CHECKRETURN_AND_RETURN(rv);
    pg_log(PG_PROGRESS, "find diverge point success\n");

    /* Target already sits on the common checkpoint: nothing to rebuild */
    if (chkptrec == ControlFile_target.checkPoint) {
        pg_log(PG_PROGRESS, "do not need to build\n");
    } else {
        pg_log(PG_PROGRESS, "need to do incremental build\n");
    }

    return BUILD_SUCCESS;
}

/*
 * Check whether the target needs a build and whether an incremental build is
 * possible, without modifying any data.
 *
 * pgdata        : target data directory
 * connstr       : connection string of the source; only used when no source
 *                 connection string has been stored yet
 * sysidentifier : not used by the check, kept for interface compatibility
 * timeline      : not used by the check, kept for interface compatibility
 * term          : passed through to findCommonCheckpoint()
 *
 * Returns BUILD_SUCCESS when a common checkpoint was found (the log states
 * whether a build is needed), otherwise the error code of the failing step.
 * The source connection and datadir_target are released on every return path.
 * The process exits when it is run as root.
 */
BuildErrorCode do_build_check(const char* pgdata, const char* connstr, char* sysidentifier, uint32 timeline, uint32 term)
{
    (void)sysidentifier;
    (void)timeline;

    BuildErrorCode rv = do_build_check_internal(pgdata, connstr, term);

    /* Disconnect from remote server, whether the check succeeded or not */
    libpqDisconnect();

    if (datadir_target != NULL) {
        free(datadir_target);
        datadir_target = NULL;
    }
    return rv;
}

View File

@ -67,6 +67,7 @@ extern BuildErrorCode targetFilemapProcess(void);
void recordReadTest(const char* datadir, XLogRecPtr ptr, TimeLineID tli);
void openDebugLog(void);
bool FindConfirmedLSN(const char* dataDir, XLogRecPtr *confirmedLsn);
BuildErrorCode do_build_check(const char* pgdata, const char* connstr, char* sysidentifier, uint32 timeline, uint32 term);
BuildErrorCode CheckConfirmedLSNOnTarget(const char *datadir, TimeLineID tli, XLogRecPtr ckptRedo, XLogRecPtr confirmedLSN,
uint32 term);
bool CheckIfEanbedSaveSlots();

View File

@ -25,6 +25,18 @@
#include "securec_check.h"
#include "tool_common.h"
/*
 * Shared-storage (DSS) configuration shared by the tools that include
 * tool_common.h. The defaults below describe an instance without DSS; the
 * fields are overwritten later, e.g. from command-line options.
 */
SSInstanceConfig ss_instance_config = {
.dss = {
/* DSS is off unless it is explicitly switched on */
.enable_dss = false,
/* -1: presumably "not set yet" -- NOTE(review): confirm against users of these ids */
.instance_id = -1,
.primaryInstId = -1,
/* volume group names and the DSS socket path are unset by default */
.vgname = NULL,
.vglog = NULL,
.vgdata = NULL,
.socketpath = NULL,
},
};
datadir_t g_datadir; /* need init when used in first time */
static void initFileDataPathStruct(datadir_t *dataDir);

View File

@ -33,6 +33,7 @@
#include "utils/memutils.h"
#include "utils/elog.h"
#include "ddes/dms/ss_dms_recovery.h"
#include "storage/file/fio_device.h"
typedef struct XLogPageReadPrivate {
const char *datadir;
@ -1268,7 +1269,11 @@ tryAgain:
securec_check_ss_c(ss_c, "", "");
#endif
ss_c = snprintf_s(xlogfpath, MAXPGPATH, MAXPGPATH - 1, "%s/" XLOGDIR "/%s", readprivate->datadir, xlogfname);
if (xlog_path != NULL) {
ss_c = snprintf_s(xlogfpath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", xlog_path, xlogfname);
} else {
ss_c = snprintf_s(xlogfpath, MAXPGPATH, MAXPGPATH - 1, "%s/" XLOGDIR "/%s", readprivate->datadir, xlogfname);
}
#ifndef FRONTEND
securec_check_ss(ss_c, "", "");
#else
@ -1313,8 +1318,8 @@ tryAgain:
return XLOG_BLCKSZ;
}
XLogRecPtr FindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc, uint32 *maxLsnLen,
TimeLineID *returnTli)
XLogRecPtr FindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *maxLsnCrc,
uint32 *maxLsnLen, TimeLineID *returnTli, char* xlog_path)
{
DIR *xlogDir = NULL;
struct dirent *dirEnt = NULL;
@ -1336,7 +1341,12 @@ XLogRecPtr FindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *
uint32 xlogReadLogSeg = -1;
errno_t rc = EOK;
rc = snprintf_s(xlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", workingPath, XLOGDIR);
if (xlog_path != NULL) {
rc = snprintf_s(xlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s", xlog_path);
} else {
rc = snprintf_s(xlogDirStr, MAXPGPATH, MAXPGPATH - 1, "%s/%s", workingPath, XLOGDIR);
}
#ifndef FRONTEND
securec_check_ss(rc, "", "");
#else
@ -1410,7 +1420,7 @@ XLogRecPtr FindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *
startLsn = (xlogReadLogSeg * XLogSegSize) + ((XLogRecPtr)xlogReadLogid * XLogSegmentsPerXLogId * XLogSegSize);
while (!XLogRecPtrIsInvalid(startLsn)) {
/* find the first valid record from the bigger xlogrecord. then break */
curLsn = XLogFindNextRecord(xlogReader, startLsn);
curLsn = XLogFindNextRecord(xlogReader, startLsn, NULL, xlogDirStr);
if (XLogRecPtrIsInvalid(curLsn)) {
if (xlogreadfd > 0) {
close(xlogreadfd);
@ -1446,7 +1456,7 @@ XLogRecPtr FindMaxLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *
/* find the max lsn. */
do {
record = XLogReadRecord(xlogReader, curLsn, &errorMsg);
record = XLogReadRecord(xlogReader, curLsn, &errorMsg, true, xlogDirStr);
if (record == NULL) {
break;
}

View File

@ -57,7 +57,7 @@ extern void XLogReaderInvalReadState(XLogReaderState* state);
extern XLogRecPtr XLogFindNextRecord(XLogReaderState* state, XLogRecPtr RecPtr, XLogRecPtr *endPtr = NULL, char* xlog_path = NULL);
extern XLogRecPtr FindMaxLSN(char* workingpath, char* returnmsg, int msg_len, pg_crc32* maxLsnCrc,
uint32 *maxLsnLen = NULL, TimeLineID *returnTli = NULL);
uint32 *maxLsnLen = NULL, TimeLineID *returnTli = NULL, char* xlog_path = NULL);
extern XLogRecPtr FindMinLSN(char *workingPath, char *returnMsg, int msgLen, pg_crc32 *minLsnCrc);
extern void CloseXlogFile(void);
extern int SimpleXLogPageRead(XLogReaderState* xlogreader, XLogRecPtr targetPagePtr, int reqLen,

View File

@ -104,7 +104,8 @@ typedef enum {
COPY_SECURE_FILES_BUILD,
CROSS_CLUSTER_FULL_BUILD,
CROSS_CLUSTER_INC_BUILD,
CROSS_CLUSTER_STANDBY_FULL_BUILD
CROSS_CLUSTER_STANDBY_FULL_BUILD,
BUILD_CHECK
} BuildMode;
typedef struct buildstate {

View File

@ -105,23 +105,22 @@ typedef struct st_datadir_t {
dw_subdatadir_t dwDir;
} datadir_t;
typedef struct DssOptions
{
/* DSS connection parameters */
typedef struct DssOptions {
bool enable_dss;
int instance_id;
const char *vgname;
int primaryInstId;
char *vgname;
char *vglog;
char *vgdata;
char *socketpath;
int primaryInstId;
} DssOptions;
typedef struct SSInstanceConfig
{
/* Shared-storage instance configuration (wraps the DSS connection parameters) */
typedef struct SSInstanceConfig {
DssOptions dss;
} SSInstanceConfig;
extern SSInstanceConfig ss_instance_config;
extern datadir_t g_datadir;
void initDataPathStruct(bool enable_dss);