!121 gs_basebackup support format tar (mot is not supported currently)

Merge pull request !121 from 吴岳川/gs_basebackup
This commit is contained in:
opengauss-bot
2020-08-25 19:24:50 +08:00
committed by Gitee
16 changed files with 1750 additions and 520 deletions

View File

@ -34,7 +34,8 @@ endif
OBJS=receivelog.o streamutil.o $(WIN32RES) $(top_builddir)/src/lib/elog/elog.a $(top_builddir)/src/bin/pg_ctl/fetchmot.o \
xlogreader.o xlogreader_common.o $(WIN32RES) $(top_builddir)/src/lib/elog/elog.a $(top_builddir)/src/lib/build_query/libbuildquery.a \
$(top_builddir)/src/lib/pgcommon/libpgcommon.a \
$(top_builddir)/src/lib/hotpatch/client/libhotpatchclient.a
$(top_builddir)/src/lib/hotpatch/client/libhotpatchclient.a \
$(top_builddir)/src/common/backend/lib/string.o
all: gs_basebackup pg_receivexlog pg_recvlogical
@ -60,15 +61,19 @@ install: all installdirs
$(INSTALL_PROGRAM) gs_basebackup$(X) '$(DESTDIR)$(bindir)/gs_basebackup$(X)'
$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
$(INSTALL_PROGRAM) pg_recvlogical$(X) '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
rm -f '$(DESTDIR)$(bindir)/gs_tar$(X)'
ln -s gs_basebackup$(X) '$(DESTDIR)$(bindir)/gs_tar$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/gs_tar$(X)'
rm -f '$(DESTDIR)$(bindir)/gs_basebackup$(X)'
rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
rm -f '$(DESTDIR)$(bindir)/pg_recvlogical$(X)'
clean distclean maintainer-clean:
rm -f gs_basebackup$(X) pg_receivexlog$(X) pg_recvlogical$(X) $(OBJS) \
rm -f gs_basebackup$(X) pg_receivexlog$(X) pg_recvlogical$(X) gs_tar$(X) $(OBJS) \
pg_basebackup.o pg_receivexlog.o pg_recvlogical.o *.depend
# Be sure that the necessary archives are compiled

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,6 @@ ifneq "$(MAKECMDGOALS)" "clean"
endif
endif
endif
OBJS = binaryheap.o ilist.o dllist.o stringinfo.o bipartite_match.o hyperloglog.o
OBJS = binaryheap.o ilist.o dllist.o stringinfo.o bipartite_match.o hyperloglog.o string.o
include $(top_srcdir)/src/gausskernel/common.mk

View File

@ -0,0 +1,39 @@
/*-------------------------------------------------------------------------
*
* string.c
* string handling helpers
*
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* IDENTIFICATION
* src/common/string.c
*
* -------------------------------------------------------------------------
*/
#ifndef FRONTEND
#include "postgres.h"
#else
#include "postgres_fe.h"
#endif
#include "lib/string.h"
/*
* Returns whether the string `str' has the postfix `end'.
*/
bool pg_str_endswith(const char* str, const char* end)
{
size_t slen = strlen(str);
size_t elen = strlen(end);
/* can't be a postfix if longer */
if (elen > slen) {
return false;
}
/* compare the end of the strings */
str += slen - elen;
return strcmp(str, end) == 0;
}

View File

@ -110,12 +110,12 @@ static void CheckTablespaceOptions(const Oid tablespaceOid, Datum options);
Datum CanonicalizeTablespaceOptions(Datum datum);
#define CANONICALIZE_PATH(path) \
do { \
if (NULL != (path)) { \
path = pstrdup(path); \
canonicalize_path(path); \
} \
#define CANONICALIZE_PATH(path) \
do { \
if (NULL != (path)) { \
path = pstrdup(path); \
canonicalize_path(path); \
} \
} while (0)
/*
@ -344,7 +344,7 @@ static bool parse_maxsize(const char* value, int64* result, const char** hintmsg
endptr += 1;
val *= KB_PER_TB;
} else if (*endptr == 'P'|| *endptr == 'p') {
} else if (*endptr == 'P' || *endptr == 'p') {
if (val > MAX_PB_VALUE) {
appendStringInfo(&buf, "Value exceeds max size %ld with unit PB", MAX_PB_VALUE);
*hintmsg = buf.data;
@ -1141,7 +1141,8 @@ static void create_tablespace_directories(const char* location, const Oid tables
(errcode(ERRCODE_UNDEFINED_FILE),
errmsg("directory \"%s\" does not exist", location),
t_thrd.xlog_cxt.InRecovery ? errhint("Create this directory for the tablespace before "
"restarting the server.") : 0));
"restarting the server.")
: 0));
else
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not set permissions on directory \"%s\": %m", location)));
@ -1326,7 +1327,7 @@ void CreateExternalDirectories(const Oid tablespaceOid, Datum options)
srvOptions = (DfsSrvOptions*)palloc0(sizeof(DfsSrvOptions));
srvOptions->filesystem = DatumGetTablespaceOptionValue(options, TABLESPACE_OPTION_FILESYSTEM);
if (srvOptions->filesystem == NULL||
if (srvOptions->filesystem == NULL ||
0 != pg_strncasecmp(srvOptions->filesystem, FILESYSTEM_HDFS, strlen(srvOptions->filesystem))) {
pfree_ext(srvOptions);
return;
@ -1712,6 +1713,47 @@ bool directory_is_empty(const char* path)
return true;
}
/*
* remove_tablespace_symlink
*
* This function removes symlinks in pg_tblspc. On Windows, junction points
* act like directories so we must be able to apply rmdir. This function
* works like the symlink removal code in destroy_tablespace_directories,
* except that failure to remove is always an ERROR. But if the file doesn't
* exist at all, that's OK.
*/
void remove_tablespace_symlink(const char* linkloc)
{
struct stat st;
if (lstat(linkloc, &st) < 0) {
if (errno == ENOENT)
return;
ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", linkloc)));
}
if (S_ISDIR(st.st_mode)) {
/*
* This will fail if the directory isn't empty, but not if it's a
* junction point.
*/
if (rmdir(linkloc) < 0 && errno != ENOENT)
ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove directory \"%s\": %m", linkloc)));
}
#ifdef S_ISLNK
else if (S_ISLNK(st.st_mode)) {
if (unlink(linkloc) < 0 && errno != ENOENT)
ereport(ERROR, (errcode_for_file_access(), errmsg("could not remove symbolic link \"%s\": %m", linkloc)));
}
#endif
else {
/* Refuse to remove anything that's not a directory or symlink */
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("\"%s\" is not a directory or symbolic link", linkloc)));
}
}
/*
* Rename a tablespace
*/
@ -2551,12 +2593,10 @@ void xlog_create_tblspc(Oid ts_id, char* ts_path, bool isRelativePath)
errno_t rc = EOK;
if (t_thrd.proc_cxt.DataDir[strlen(t_thrd.proc_cxt.DataDir)] == '/') {
rc = snprintf_s(
location, len, len - 1, "%s%s/%s", t_thrd.proc_cxt.DataDir, PG_LOCATION_DIR, ts_path);
rc = snprintf_s(location, len, len - 1, "%s%s/%s", t_thrd.proc_cxt.DataDir, PG_LOCATION_DIR, ts_path);
securec_check_ss(rc, "\0", "\0");
} else {
rc = snprintf_s(
location, len, len - 1, "%s/%s/%s", t_thrd.proc_cxt.DataDir, PG_LOCATION_DIR, ts_path);
rc = snprintf_s(location, len, len - 1, "%s/%s/%s", t_thrd.proc_cxt.DataDir, PG_LOCATION_DIR, ts_path);
securec_check_ss(rc, "\0", "\0");
}
}
@ -2621,8 +2661,8 @@ void tblspc_redo(XLogReaderState* record)
errhint("You can remove the directories manually if necessary.")));
}
} else {
ereport(PANIC,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("tblspc_redo: unknown op code %u", (uint)info)));
ereport(
PANIC, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("tblspc_redo: unknown op code %u", (uint)info)));
}
t_thrd.xlog_cxt.needImmediateCkp = true;
@ -2718,7 +2758,7 @@ inline void TableSpaceUsageManager::ResetUsageSlot(TableSpaceUsageSlot* info)
* 2. the bucket must have been locked
*/
inline void TableSpaceUsageManager::ResetBucket(TableSpaceUsageBucket* bucket)
{
{
for (int counter = 0; counter < TABLESPACE_BUCKET_CONFLICT_LISTLEN; counter++) {
TableSpaceUsageManager::ResetUsageSlot(&bucket->spcUsage[counter]);
}

View File

@ -1,7 +1,7 @@
/* -------------------------------------------------------------------------
*
* xlog.cpp
* PostgreSQL transaction log manager
* PostgreSQL transaction log manager
*
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
@ -54,6 +54,8 @@
#include "access/xlogproc.h"
#include "access/parallel_recovery/dispatcher.h"
#include "commands/tablespace.h"
#include "catalog/catalog.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
@ -144,6 +146,7 @@ const char* DemoteModeDescs[] = {"unknown", "smart", "fast", "immediate"};
const int DemoteModeNum = sizeof(DemoteModeDescs) / sizeof(char*);
static const uint64 REDO_SPEED_LOG_LEN = (XLogSegSize * 16);
static const int PG_TBLSPCS = 10; /* strlen(pg_tblspcs/) */
THR_LOCAL bool redo_oldversion_xlog = false;
@ -153,7 +156,7 @@ THR_LOCAL bool redo_oldversion_xlog = false;
* future XLOG segment as long as there aren't already XLOGfileslop future
* segments; else we'll delete it. This could be made a separate GUC
* variable, but at present I think it's sufficient to hardwire it as
* 2*CheckPointSegments+1. Under normal conditions, a checkpoint will free
* 2*CheckPointSegments+1. Under normal conditions, a checkpoint will free
* no more than 2*CheckPointSegments log segments, and we want to recycle all
* of them; the +1 allows boundary cases to happen without wasting a
* delete/create-segment cycle.
@ -163,22 +166,20 @@ THR_LOCAL bool redo_oldversion_xlog = false;
/*
* GUC support
*/
struct config_enum_entry sync_method_options[] = {
{ "fsync", SYNC_METHOD_FSYNC, false },
struct config_enum_entry sync_method_options[] = {{"fsync", SYNC_METHOD_FSYNC, false},
#ifdef HAVE_FSYNC_WRITETHROUGH
{ "fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false },
{"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
#endif
#ifdef HAVE_FDATASYNC
{ "fdatasync", SYNC_METHOD_FDATASYNC, false },
{"fdatasync", SYNC_METHOD_FDATASYNC, false},
#endif
#ifdef OPEN_SYNC_FLAG
{ "open_sync", SYNC_METHOD_OPEN, false },
{"open_sync", SYNC_METHOD_OPEN, false},
#endif
#ifdef OPEN_DATASYNC_FLAG
{ "open_datasync", SYNC_METHOD_OPEN_DSYNC, false },
{"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
#endif
{ NULL, 0, false }
};
{NULL, 0, false}};
XLogRecPtr latestValidRecord = InvalidXLogRecPtr;
XLogSegNo XlogRemoveSegPrimary = InvalidXLogSegPtr;
@ -536,6 +537,7 @@ static void xlog_outrec(StringInfo buf, XLogReaderState* record);
#endif
static void pg_start_backup_callback(int code, Datum arg);
static bool read_backup_label(XLogRecPtr* checkPointLoc, bool* backupEndRequired, bool* backupFromStandby);
static bool read_tablespace_map(List** tablespaces);
static int get_sync_bit(int method);
static void ResetSlotLSNEndRecovery(StringInfo slotname);
static void ShutdownReadFileFacility(void);
@ -1042,12 +1044,12 @@ static XLogRecPtr XLogInsertRecordSingle(XLogRecData* rdata, XLogRecPtr fpw_lsn,
* record to the shared WAL buffer cache is a two-step process:
*
* 1. Reserve the right amount of space from the WAL. The current head of
* reserved space is kept in Insert->CurrBytePos, and is protected by
* insertpos_lck.
* reserved space is kept in Insert->CurrBytePos, and is protected by
* insertpos_lck.
*
* 2. Copy the record to the reserved WAL space. This involves finding the
* correct WAL buffer containing the reserved space, and copying the
* record in place. This can be done concurrently in multiple processes.
* correct WAL buffer containing the reserved space, and copying the
* record in place. This can be done concurrently in multiple processes.
*
* To keep track of which insertions are still in-progress, each concurrent
* inserter acquires an insertion lock. In addition to just indicating that
@ -1683,7 +1685,7 @@ static void WALInsertLockAcquireExclusive(void)
int nNumaNodes = g_instance.shmem_cxt.numaNodeNum;
for (int processorIndex = 0; processorIndex < nNumaNodes; processorIndex++) {
int limit = ((processorIndex < (nNumaNodes - 1)) ? g_instance.xlog_cxt.num_locks_in_group
: (g_instance.xlog_cxt.num_locks_in_group - 1));
: (g_instance.xlog_cxt.num_locks_in_group - 1));
for (i = 0; i < limit; i++) {
WALInsertLock* pInsertLock = &t_thrd.shemem_ptr_cxt.GlobalWALInsertLocks[processorIndex][i].l;
LWLockAcquire(&pInsertLock->lock, LW_EXCLUSIVE);
@ -2731,7 +2733,7 @@ static void XLogWrite(const XLogwrtRqst& WriteRqst, bool flexible)
XLByteLT(t_thrd.xlog_cxt.LogwrtResult->Flush, t_thrd.xlog_cxt.LogwrtResult->Write)) {
/*
* Could get here without iterating above loop, in which case we might
* have no open file or the wrong one. However, we do not need to
* have no open file or the wrong one. However, we do not need to
* fsync more than one file.
*/
if (u_sess->attr.attr_storage.sync_method != SYNC_METHOD_OPEN &&
@ -2799,7 +2801,7 @@ void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
/*
* If the WALWriter is sleeping, we should kick it to make it come out of
* low-power mode. Otherwise, determine whether there's a full page of
* low-power mode. Otherwise, determine whether there's a full page of
* WAL available to write.
*/
if (!sleeping) {
@ -2900,7 +2902,7 @@ XLogRecPtr XLogGetReplicationSlotMaximumLSN(void)
/*
* Reset replication slot before promote to primary.
* We should free the slotname in caller.
* We should free the slotname in caller.
*/
static void ResetSlotLSNEndRecovery(StringInfo slotname)
{
@ -3233,7 +3235,7 @@ void XLogFlush(XLogRecPtr record, bool LogicalPage)
* the whole system due to corruption on one data page. In particular, if
* the bad page is encountered again during recovery then we would be
* unable to restart the database at all! (This scenario actually
* happened in the field several times with 7.1 releases.) As of 8.4, bad
* happened in the field several times with 7.1 releases.) As of 8.4, bad
* LSNs encountered during recovery are UpdateMinRecoveryPoint's problem;
* the only time we can reach here during recovery is while flushing the
* end-of-recovery checkpoint record, and we don't expect that to have a
@ -3324,9 +3326,9 @@ bool PreInitXlogFileInternal(XLogRecPtr requestLsn)
* We normally flush only completed blocks; but if there is nothing to do on
* that basis, we check for unflushed async commits in the current incomplete
* block, and flush through the latest one of those. Thus, if async commits
* are not being used, we will flush complete blocks only. We can guarantee
* are not being used, we will flush complete blocks only. We can guarantee
* that async commits reach disk after at most three cycles; normally only
* one or two. (When flushing complete blocks, we allow XLogWrite to write
* one or two. (When flushing complete blocks, we allow XLogWrite to write
* "flexibly", meaning it can stop at the end of the buffer ring; this makes a
* difference only with very high load or long wal_writer_delay, but imposes
* one extra cycle for the worst case for async commits.)
@ -3523,7 +3525,7 @@ bool XLogNeedsFlush(XLogRecPtr record)
* log, seg: identify segment to be created/opened.
*
* *use_existent: if TRUE, OK to use a pre-existing file (else, any
* pre-existing file will be deleted). On return, TRUE if a pre-existing
* pre-existing file will be deleted). On return, TRUE if a pre-existing
* file was used.
*
* use_lock: if TRUE, acquire ControlFileLock while moving file into
@ -3598,11 +3600,11 @@ int XLogFileInit(XLogSegNo logsegno, bool* use_existent, bool use_lock)
}
/*
* Zero-fill the file. We have to do this the hard way to ensure that all
* Zero-fill the file. We have to do this the hard way to ensure that all
* the file space has really been allocated --- on platforms that allow
* "holes" in files, just seeking to the end doesn't allocate intermediate
* space. This way, we know that we have all the space and (after the
* fsync below) that all the indirect blocks are down on disk. Therefore,
* fsync below) that all the indirect blocks are down on disk. Therefore,
* fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
* log file.
*
@ -3688,10 +3690,10 @@ int XLogFileInit(XLogSegNo logsegno, bool* use_existent, bool use_lock)
* log, seg: identify segment to be created.
*
* srcTLI, srclog, srcseg: identify segment to be copied (could be from
* a different timeline)
* a different timeline)
*
* Currently this is only used during recovery, and so there are no locking
* considerations. But we should be just as tense as XLogFileInit to avoid
* considerations. But we should be just as tense as XLogFileInit to avoid
* emplacing a bogus file.
*/
static void XLogFileCopy(XLogSegNo destsegno, TimeLineID srcTLI, XLogSegNo srcsegno)
@ -4033,7 +4035,7 @@ static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources)
* the timelines listed in expectedTLIs.
*
* We expect curFileTLI on entry to be the TLI of the preceding file in
* sequence, or 0 if there was no predecessor. We do not allow curFileTLI
* sequence, or 0 if there was no predecessor. We do not allow curFileTLI
* to go backwards; this prevents us from picking up the wrong file when a
* parent timeline extends to higher segment numbers than the child we
* want to read.
@ -4089,7 +4091,7 @@ static void XLogFileClose(void)
/*
* WAL segment files will not be re-read in normal operation, so we advise
* the OS to release any cached pages. But do not do so if WAL archiving
* the OS to release any cached pages. But do not do so if WAL archiving
* or streaming is active, because archiver and walsender process could
* use the cache to read the WAL segment.
*/
@ -4730,7 +4732,7 @@ static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr endptr)
/*
* We ignore the timeline part of the XLOG segment identifiers in
* deciding whether a segment is still needed. This ensures that we
* deciding whether a segment is still needed. This ensures that we
* won't prematurely remove a segment from a parent timeline. We could
* probably be a little more proactive about removing segments of
* non-parent timelines, but that would be a whole lot more
@ -5697,7 +5699,7 @@ XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader)
* Try to read a timeline's history file.
*
* If successful, return the list of component TLIs (the given TLI followed by
* its ancestor TLIs). If we can't find the history file, assume that the
* its ancestor TLIs). If we can't find the history file, assume that the
* timeline has no parents, and return a list of just the specified timeline
* ID.
*/
@ -5924,7 +5926,7 @@ static TimeLineID findNewestTimeLine(TimeLineID startTLI)
* I/O routines for pg_control
*
* *ControlFile is a buffer in shared memory that holds an image of the
* contents of pg_control. WriteControlFile() initializes pg_control
* contents of pg_control. WriteControlFile() initializes pg_control
* given a preloaded buffer, ReadControlFile() loads the buffer from
* the pg_control file (during postmaster or standalone-backend startup),
* and UpdateControlFile() rewrites pg_control after we modify xlog state.
@ -6436,7 +6438,7 @@ bool check_wal_buffers(int* newval, void** extra, GucSource source)
if (*newval == -1) {
/*
* If we haven't yet changed the boot_val default of -1, just let it
* be. We'll fix it when XLOGShmemSize is called.
* be. We'll fix it when XLOGShmemSize is called.
*/
if (g_instance.attr.attr_storage.XLOGbuffers == -1) {
return true;
@ -6526,8 +6528,8 @@ void XLOGShmemInit(void)
if (foundCFile || foundXLog) {
/* both should be present or neither */
if (unlikely(!(foundCFile && foundXLog))) {
ereport(PANIC, (errmsg("Control File(%d) and XLOG ctl(%d) should be present both.",
foundCFile, foundXLog)));
ereport(
PANIC, (errmsg("Control File(%d) and XLOG ctl(%d) should be present both.", foundCFile, foundXLog)));
}
/* Initialize local copy of WALInsertLocks and register the tranche */
@ -7039,7 +7041,7 @@ static void readRecoveryCommandFile(void)
/*
* If user specified recovery_target_timeline, validate it or compute the
* "latest" value. We can't do this until after we've gotten the restore
* "latest" value. We can't do this until after we've gotten the restore
* command and set InArchiveRecovery, because we need to fetch timeline
* history files from the archive.
*/
@ -7648,19 +7650,20 @@ int GetXLogReceiptSource()
* Note that text field supplied is a parameter name and does not require
* translation
*/
#define RecoveryRequiresIntParameter(param_name, currValue, minValue) do { \
if ((currValue) < (minValue)) { \
ereport(ERROR, \
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), \
errmsg("hot standby is not possible because " \
"%s = %d is a lower setting than on the master server " \
"(its value was %d)", \
param_name, \
currValue, \
minValue), \
errhint("You might need to increase the value of %s ", param_name))); \
} \
} while (0)
#define RecoveryRequiresIntParameter(param_name, currValue, minValue) \
do { \
if ((currValue) < (minValue)) { \
ereport(ERROR, \
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), \
errmsg("hot standby is not possible because " \
"%s = %d is a lower setting than on the master server " \
"(its value was %d)", \
param_name, \
currValue, \
minValue), \
errhint("You might need to increase the value of %s ", param_name))); \
} \
} while (0)
/*
* Check to see if required parameters are set high enough on this server
@ -7797,31 +7800,32 @@ void ResourceManagerStop(void)
}
}
#define RecoveryXlogReader(_oldXlogReader, _xlogreader) do { \
if (get_real_recovery_parallelism() > 1) { \
if (GetRedoWorkerCount() > 0) { \
errno_t errorno; \
(_oldXlogReader)->ReadRecPtr = (_xlogreader)->ReadRecPtr; \
(_oldXlogReader)->EndRecPtr = (_xlogreader)->EndRecPtr; \
(_oldXlogReader)->readSegNo = (_xlogreader)->readSegNo; \
(_oldXlogReader)->readOff = (_xlogreader)->readOff; \
(_oldXlogReader)->readLen = (_xlogreader)->readLen; \
(_oldXlogReader)->readPageTLI = (_xlogreader)->readPageTLI; \
(_oldXlogReader)->curReadOff = (_xlogreader)->curReadOff; \
(_oldXlogReader)->curReadSegNo = (_xlogreader)->curReadSegNo; \
(_oldXlogReader)->currRecPtr = (_xlogreader)->currRecPtr; \
(_oldXlogReader)->latestPagePtr = (_xlogreader)->latestPagePtr; \
(_oldXlogReader)->latestPageTLI = (_xlogreader)->latestPageTLI; \
(_oldXlogReader)->isPRProcess = false; \
errorno = memcpy_s( \
(_oldXlogReader)->readBuf, XLOG_BLCKSZ, (_xlogreader)->readBuf, (_oldXlogReader)->readLen); \
securec_check(errorno, "", ""); \
ResetDecoder(_oldXlogReader); \
(_xlogreader) = (_oldXlogReader); \
} \
EndDispatcherContext(); \
} \
} while (0)
#define RecoveryXlogReader(_oldXlogReader, _xlogreader) \
do { \
if (get_real_recovery_parallelism() > 1) { \
if (GetRedoWorkerCount() > 0) { \
errno_t errorno; \
(_oldXlogReader)->ReadRecPtr = (_xlogreader)->ReadRecPtr; \
(_oldXlogReader)->EndRecPtr = (_xlogreader)->EndRecPtr; \
(_oldXlogReader)->readSegNo = (_xlogreader)->readSegNo; \
(_oldXlogReader)->readOff = (_xlogreader)->readOff; \
(_oldXlogReader)->readLen = (_xlogreader)->readLen; \
(_oldXlogReader)->readPageTLI = (_xlogreader)->readPageTLI; \
(_oldXlogReader)->curReadOff = (_xlogreader)->curReadOff; \
(_oldXlogReader)->curReadSegNo = (_xlogreader)->curReadSegNo; \
(_oldXlogReader)->currRecPtr = (_xlogreader)->currRecPtr; \
(_oldXlogReader)->latestPagePtr = (_xlogreader)->latestPagePtr; \
(_oldXlogReader)->latestPageTLI = (_xlogreader)->latestPageTLI; \
(_oldXlogReader)->isPRProcess = false; \
errorno = memcpy_s( \
(_oldXlogReader)->readBuf, XLOG_BLCKSZ, (_xlogreader)->readBuf, (_oldXlogReader)->readLen); \
securec_check(errorno, "", ""); \
ResetDecoder(_oldXlogReader); \
(_xlogreader) = (_oldXlogReader); \
} \
EndDispatcherContext(); \
} \
} while (0)
static void EndRedoXlog()
{
@ -7847,6 +7851,7 @@ void StartupXLOG(void)
bool DBStateShutdown = false;
bool reachedStopPoint = false;
bool haveBackupLabel = false;
bool haveTblspcMap = false;
XLogRecPtr RecPtr, checkPointLoc, EndOfLog;
XLogSegNo endLogSegNo;
XLogRecord* record = NULL;
@ -7859,6 +7864,7 @@ void StartupXLOG(void)
bool RecoveryByPending = false; /* recovery caused by pending mode */
bool ArchiveRecoveryByPending = false; /* archive recovery caused by pending mode */
bool AbnormalShutdown = true;
struct stat st;
errno_t rcm = 0;
TransactionId latestCompletedXid;
bool wasCheckpoint = false;
@ -7943,7 +7949,7 @@ void StartupXLOG(void)
/* delete xlogtemp files. */
remove_xlogtemp_files();
/*
* Clear out any old relcache cache files. This is *necessary* if we do
* Clear out any old relcache cache files. This is *necessary* if we do
* any WAL replay, since that would probably result in the cache files
* being out of sync with database reality. In theory we could leave them
* in place if the database had been cleanly shut down, but it seems
@ -8058,6 +8064,7 @@ void StartupXLOG(void)
startupInitRoachBackup();
g_instance.comm_cxt.predo_cxt.redoPf.redo_start_time = GetCurrentTimestamp();
if (read_backup_label(&checkPointLoc, &backupEndRequired, &backupFromStandby)) {
List* tablespaces = NIL;
/*
* Archive recovery was requested, and thanks to the backup label file,
* we know how far we need to replay to reach consistency. Enter
@ -8110,9 +8117,63 @@ void StartupXLOG(void)
t_thrd.proc_cxt.DataDir)));
wasShutdown = false; /* keep compiler quiet */
}
/* read the tablespace_map file if present and create symlinks. */
if (read_tablespace_map(&tablespaces)) {
ListCell* lc;
errno_t rc = EOK;
foreach (lc, tablespaces) {
tablespaceinfo* ti = (tablespaceinfo*)lfirst(lc);
int length = PG_TBLSPCS + strlen(ti->oid) + 1;
char* linkloc = (char*)palloc0(length);
rc = snprintf_s(linkloc, length, length - 1, "pg_tblspc/%s", ti->oid);
securec_check_ss_c(rc, "", "");
/*
* Remove the existing symlink if any and Create the symlink
* under PGDATA.
*/
remove_tablespace_symlink(linkloc);
if (symlink(ti->path, linkloc) < 0) {
pfree(linkloc);
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not create symbolic link \"%s\": %m", linkloc)));
}
pfree(linkloc);
pfree(ti->oid);
pfree(ti->path);
pfree(ti);
}
/* set flag to delete it later */
haveTblspcMap = true;
}
/* set flag to delete it later */
haveBackupLabel = true;
} else {
/*
* If tablespace_map file is present without backup_label file, there
* is no use of such file. There is no harm in retaining it, but it
* is better to get rid of the map file so that we don't have any
* redundant file in data directory and it will avoid any sort of
* confusion. It seems prudent though to just rename the file out
* of the way rather than delete it completely, also we ignore any
* error that occurs in rename operation as even if map file is
* present without backup_label file, it is harmless.
*/
if (stat(TABLESPACE_MAP, &st) == 0) {
unlink(TABLESPACE_MAP_OLD);
if (durable_rename(TABLESPACE_MAP, TABLESPACE_MAP_OLD, DEBUG1) == 0)
ereport(LOG,
(errmsg("ignoring file \"%s\" because no file \"%s\" exists", TABLESPACE_MAP, BACKUP_LABEL_FILE),
errdetail("File \"%s\" was renamed to \"%s\".", TABLESPACE_MAP, TABLESPACE_MAP_OLD)));
else
ereport(LOG,
(errmsg("ignoring \"%s\" file because no \"%s\" file exists", TABLESPACE_MAP, BACKUP_LABEL_FILE),
errdetail("Could not rename file \"%s\" to \"%s\": %m.", TABLESPACE_MAP, TABLESPACE_MAP_OLD)));
}
/*
* It's possible that archive recovery was requested, but we don't
* know how far we need to replay the WAL before we reach consistency.
@ -8487,6 +8548,17 @@ void StartupXLOG(void)
unlink(BACKUP_LABEL_OLD);
durable_rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD, FATAL);
}
/*
* If there was a tablespace_map file, it's done its job and the
* symlinks have been created. We must get rid of the map file so
* that if we crash during recovery, we don't create symlinks again.
* It seems prudent though to just rename the file out of the way
* rather than delete it completely.
*/
if (haveTblspcMap) {
unlink(TABLESPACE_MAP_OLD);
durable_rename(TABLESPACE_MAP, TABLESPACE_MAP_OLD, FATAL);
}
/* Check that the GUCs used to generate the WAL allow recovery */
CheckRequiredParameterValues(DBStateShutdown);
@ -8890,7 +8962,7 @@ void StartupXLOG(void)
* recovery to force fetching the files (which would be required at end of
* recovery, e.g., timeline history file) from archive or pg_xlog.
*/
t_thrd.xlog_cxt.StandbyMode = false;
t_thrd.xlog_cxt.StandbyMode = false;
/*
* Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL.
@ -9184,7 +9256,7 @@ void StartupXLOG(void)
}
/*
* All done. Allow backends to write WAL. (Although the bool flag is
* All done. Allow backends to write WAL. (Although the bool flag is
* probably atomic in itself, we use the info_lck here to ensure that
* there are no race conditions concerning visibility of other recent
* updates to shared memory.)
@ -9378,7 +9450,7 @@ bool RecoveryInProgress(void)
/*
* Initialize TimeLineID and RedoRecPtr when we discover that recovery
* is finished. InitPostgres() relies upon this behaviour to ensure
* that InitXLOGAccess() is called at backend startup. (If you change
* that InitXLOGAccess() is called at backend startup. (If you change
* this, see also LocalSetXLogInsertAllowed.)
*/
if (!t_thrd.xlog_cxt.LocalRecoveryInProgress) {
@ -9639,8 +9711,7 @@ void GetFullPageWriteInfo(XLogFPWInfo* fpwInfo_p)
{
bool incremental = g_instance.attr.attr_storage.enableIncrementalCheckpoint;
fpwInfo_p->redoRecPtr = t_thrd.xlog_cxt.RedoRecPtr;
fpwInfo_p->doPageWrites = t_thrd.xlog_cxt.doPageWrites &&
(!incremental || (!dw_enabled() && incremental));
fpwInfo_p->doPageWrites = t_thrd.xlog_cxt.doPageWrites && (!incremental || (!dw_enabled() && incremental));
fpwInfo_p->forcePageWrites = t_thrd.shemem_ptr_cxt.XLogCtl->FpwBeforeFirstCkpt && !IsInitdb &&
(!incremental || (!dw_enabled() && incremental));
@ -9944,13 +10015,13 @@ void CkptWaitFlushIfneed(XLogRecPtr redoLsn)
* Perform a checkpoint --- either during shutdown, or on-the-fly
*
* flags is a bitwise OR of the following:
* CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
* CHECKPOINT_END_OF_RECOVERY: checkpoint is for end of WAL recovery.
* CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
* ignoring checkpoint_completion_target parameter.
* CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occurred
* since the last one (implied by CHECKPOINT_IS_SHUTDOWN or
* CHECKPOINT_END_OF_RECOVERY).
* CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
* CHECKPOINT_END_OF_RECOVERY: checkpoint is for end of WAL recovery.
* CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
* ignoring checkpoint_completion_target parameter.
* CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occurred
* since the last one (implied by CHECKPOINT_IS_SHUTDOWN or
* CHECKPOINT_END_OF_RECOVERY).
*
* Note: flags contains other bits, of interest here only for logging purposes.
* In particular note that this routine is synchronous and does not pay
@ -10071,7 +10142,7 @@ void CreateCheckPoint(int flags)
/*
* If this isn't a shutdown or forced checkpoint, and we have not inserted
* any XLOG records since the start of the last checkpoint, skip the
* checkpoint. The idea here is to avoid inserting duplicate checkpoints
* checkpoint. The idea here is to avoid inserting duplicate checkpoints
* when the system is idle. That wastes log space, and more importantly it
* exposes us to possible loss of both current and previous checkpoint
* records if the machine crashes just as we're writing the update.
@ -10135,7 +10206,7 @@ void CreateCheckPoint(int flags)
if (XLByteEQ(curMinRecLSN, t_thrd.xlog_cxt.RedoRecPtr)) {
WALInsertLockRelease();
LWLockRelease(CheckpointLock);
LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
if (u_sess->attr.attr_storage.log_pagewriter) {
@ -10143,11 +10214,11 @@ void CreateCheckPoint(int flags)
(errmodule(MOD_INCRE_CKPT),
errmsg("Checkpoint meets prev checkpoint lsn is %08X/%08X, now min rec lsn is %08X/%08X, "
"checkpoint flag is %d",
(uint32)(t_thrd.xlog_cxt.RedoRecPtr >> XLOG_LSN_SWAP),
(uint32)t_thrd.xlog_cxt.RedoRecPtr,
(uint32)(curMinRecLSN >> XLOG_LSN_SWAP),
(uint32)curMinRecLSN,
flags)));
(uint32)(t_thrd.xlog_cxt.RedoRecPtr >> XLOG_LSN_SWAP),
(uint32)t_thrd.xlog_cxt.RedoRecPtr,
(uint32)(curMinRecLSN >> XLOG_LSN_SWAP),
(uint32)curMinRecLSN,
flags)));
}
if (dw_enabled()) {
smgrsync_with_absorption();
@ -10195,7 +10266,7 @@ void CreateCheckPoint(int flags)
* the buffer flush work. Those XLOG records are logically after the
* checkpoint, even though physically before it. Got that?
*/
if (!doFullCheckpoint && (dw_enabled() || !Insert->fullPageWrites)) {
if (!doFullCheckpoint && (dw_enabled() || !Insert->fullPageWrites)) {
/*
* Incremental Checkpoint use queue first page recLSN, when the dirty page queue is empty,
* choose the dirty page queue recLSN. Dirty page queue lsn has computed redo ptr when
@ -10414,11 +10485,9 @@ void CreateCheckPoint(int flags)
*/
XLogBeginInsert();
errno_t rcm = memcpy_s(&checkPointNew, sizeof(CheckPoint), &checkPoint, sizeof(CheckPoint));
securec_check(rcm, "", "");
if (IsBootstrapProcessingMode() ||
!COMMITSEQNO_IS_COMMITTED(t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo)) {
if (IsBootstrapProcessingMode() || !COMMITSEQNO_IS_COMMITTED(t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo)) {
checkPointNew.next_csn = COMMITSEQNO_FIRST_NORMAL + 1;
} else {
checkPointNew.next_csn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo;
@ -10616,7 +10685,7 @@ static void RecoveryRestartPoint(const CheckPoint* checkPoint, XLogRecPtr record
bool IsRestartPointSafe(const XLogRecPtr checkPoint)
{
/*
* Is it safe to restartpoint? We must ask each of the resource managers
* Is it safe to restartpoint? We must ask each of the resource managers
* whether they have any partial state information that might prevent a
* correct restart from this point. If so, we skip this opportunity, but
* return at the next checkpoint record for another try.
@ -11053,7 +11122,7 @@ void XLogPutNextOid(Oid nextOid)
* We need not flush the NEXTOID record immediately, because any of the
* just-allocated OIDs could only reach disk as part of a tuple insert or
* update that would have its own XLOG record that must follow the NEXTOID
* record. Therefore, the standard buffer LSN interlock applied to those
* record. Therefore, the standard buffer LSN interlock applied to those
* records will ensure no such OID reaches disk before the NEXTOID record
* does.
*
@ -11692,7 +11761,7 @@ static int get_sync_bit(int method)
/*
* Optimize writes by bypassing kernel cache with O_DIRECT when using
* O_SYNC/O_FSYNC and O_DSYNC. But only if archiving and streaming are
* O_SYNC/O_FSYNC and O_DSYNC. But only if archiving and streaming are
* disabled, otherwise the archive command or walsender process will read
* the WAL soon after writing it, which is guaranteed to cause a physical
* read if we bypassed the kernel cache. We also skip the
@ -11903,7 +11972,8 @@ void startupInitRoachBackup(void)
* Every successfully started non-exclusive backup must
* be stopped by calling do_pg_stop_backup() or do_pg_abort_backup().
*/
XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfile)
XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfile, DIR* tblspcdir,
char** tblspcmapfile, List** tablespaces, bool infotbssize, bool needtblspcmapfile)
{
bool exclusive = (labelfile == NULL);
bool backup_started_in_recovery = false;
@ -11916,6 +11986,7 @@ XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfi
struct stat stat_buf;
FILE* fp = NULL;
StringInfoData labelfbuf;
StringInfoData tblspc_mapfbuf;
errno_t errorno = EOK;
gstrace_entry(GS_TRC_ID_do_pg_start_backup);
@ -11959,7 +12030,7 @@ XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfi
* during an on-line backup even if not doing so at other times, because
* it's quite possible for the backup dump to obtain a "torn" (partially
* written) copy of a database page if it reads the page concurrently with
* our write to the same page. This can be fixed as long as the first
* our write to the same page. This can be fixed as long as the first
* write to the page in the WAL sequence is a full-page write. Hence, we
* turn on forcePageWrites and then force a CHECKPOINT, to ensure there
* are no dirty pages in shared memory that might get dumped while the
@ -11998,6 +12069,9 @@ XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfi
PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum)BoolGetDatum(exclusive));
{
bool gotUniqueStartpoint = false;
struct dirent* de;
tablespaceinfo* ti;
int datadirpathlen;
/*
* Force an XLOG file switch before the checkpoint, to ensure that the
@ -12005,7 +12079,7 @@ XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfi
* old timeline IDs. That would otherwise happen if you called
* pg_start_backup() right after restoring from a PITR archive: the
* first WAL segment containing the startup checkpoint has pages in
* the beginning with the old timeline ID. That can cause trouble at
* the beginning with the old timeline ID. That can cause trouble at
* recovery: we won't have a history file covering the old timeline if
* pg_xlog directory was not included in the base backup and the WAL
* archive was cleared too before starting the backup.
@ -12028,7 +12102,7 @@ XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfi
bool checkpointfpw = false;
/*
* Force a CHECKPOINT. Aside from being necessary to prevent torn
* Force a CHECKPOINT. Aside from being necessary to prevent torn
* page problems, this guarantees that two successive backup runs
* will have different checkpoint positions and hence different
* history file names, even if nothing happened in between.
@ -12123,6 +12197,72 @@ XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfi
(uint32)((_logSegNo) % XLogSegmentsPerXLogId));
securec_check_ss(errorno, "", "");
/*
* Construct tablespace_map file
*/
initStringInfo(&tblspc_mapfbuf);
datadirpathlen = strlen(t_thrd.proc_cxt.DataDir);
/* Collect information about all tablespaces */
while ((de = ReadDir(tblspcdir, "pg_tblspc")) != NULL) {
char fullpath[MAXPGPATH + PG_TBLSPCS];
char linkpath[MAXPGPATH];
char* relpath = NULL;
int rllen;
errno_t errorno = EOK;
errorno = memset_s(fullpath, MAXPGPATH, '\0', MAXPGPATH);
securec_check(errorno, "", "");
errorno = memset_s(linkpath, MAXPGPATH, '\0', MAXPGPATH);
securec_check(errorno, "", "");
/* Skip special stuff */
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
continue;
snprintf(fullpath, sizeof(fullpath), "pg_tblspc/%s", de->d_name);
#if defined(HAVE_READLINK) || defined(WIN32)
rllen = readlink(fullpath, linkpath, sizeof(linkpath));
if (rllen < 0) {
ereport(WARNING, (errmsg("could not read symbolic link \"%s\": %m", fullpath)));
continue;
} else if (rllen >= (int)sizeof(linkpath)) {
ereport(WARNING, (errmsg("symbolic link \"%s\" target is too long", fullpath)));
continue;
}
linkpath[rllen] = '\0';
/*
* Relpath holds the relative path of the tablespace directory
* when it's located within PGDATA, or NULL if it's located
* elsewhere.
*/
if (rllen > datadirpathlen && strncmp(linkpath, t_thrd.proc_cxt.DataDir, datadirpathlen) == 0 &&
IS_DIR_SEP(linkpath[datadirpathlen]))
relpath = linkpath + datadirpathlen + 1;
ti = (tablespaceinfo*)palloc(sizeof(tablespaceinfo));
ti->oid = pstrdup(de->d_name);
ti->path = pstrdup(linkpath);
ti->relativePath = relpath ? pstrdup(relpath) : NULL;
ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
if (tablespaces)
*tablespaces = lappend(*tablespaces, ti);
appendStringInfo(&tblspc_mapfbuf, "%s %s\n", ti->oid, ti->path);
#else
/*
* If the platform does not have symbolic links, it should not be
* possible to have tablespaces - clearly somebody else created
* them. Warn about it and ignore.
*/
ereport(WARNING,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("tablespaces are not supported on this platform")));
#endif
}
/*
* Construct backup label file
*/
@ -12179,8 +12319,35 @@ XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfi
ereport(ERROR, (errcode_for_file_access(), errmsg("could not write file \"%s\": %m", fileName)));
}
pfree(labelfbuf.data);
/* Write backup tablespace_map file. */
if (tblspc_mapfbuf.len > 0) {
if (stat(TABLESPACE_MAP, &stat_buf) != 0) {
if (errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", TABLESPACE_MAP)));
} else
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress"),
errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
TABLESPACE_MAP)));
fp = AllocateFile(TABLESPACE_MAP, "w");
if (!fp)
ereport(
ERROR, (errcode_for_file_access(), errmsg("could not create file \"%s\": %m", TABLESPACE_MAP)));
if (fwrite(tblspc_mapfbuf.data, tblspc_mapfbuf.len, 1, fp) != 1 || fflush(fp) != 0 ||
pg_fsync(fileno(fp)) != 0 || ferror(fp) || FreeFile(fp))
ereport(
ERROR, (errcode_for_file_access(), errmsg("could not write file \"%s\": %m", TABLESPACE_MAP)));
}
pfree(tblspc_mapfbuf.data);
} else {
*labelfile = labelfbuf.data;
if (tblspc_mapfbuf.len > 0)
*tblspcmapfile = tblspc_mapfbuf.data;
}
}
PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum)BoolGetDatum(exclusive));
@ -13461,7 +13628,7 @@ XLogRecPtr GetXLogWriteRecPtr(void)
*
* If we see a backup_label during recovery, we assume that we are recovering
* from a backup dump file, and we therefore roll forward from the checkpoint
* identified by the label file, NOT what pg_control says. This avoids the
* identified by the label file, NOT what pg_control says. This avoids the
* problem that pg_control might have been archived one or more checkpoints
* later than the start of the dump, and so if we rely on it as the start
* point, we will fail to restore a consistent database state.
@ -13547,6 +13714,74 @@ static bool read_backup_label(XLogRecPtr* checkPointLoc, bool* backupEndRequired
return true;
}
/*
* read_tablespace_map: check to see if a tablespace_map file is present
*
* If we see a tablespace_map file during recovery, we assume that we are
* recovering from a backup dump file, and we therefore need to create symlinks
* as per the information present in tablespace_map file.
*
* Returns TRUE if a tablespace_map file was found (and fills the link
* information for all the tablespace links present in file); returns FALSE
* if not.
*/
static bool read_tablespace_map(List** tablespaces)
{
tablespaceinfo* ti;
FILE* lfp;
char tbsoid[MAXPGPATH];
char* tbslinkpath;
char str[MAXPGPATH];
int ch, prev_ch = -1, i = 0, n;
/*
* See if tablespace_map file is present
*/
lfp = AllocateFile(TABLESPACE_MAP, "r");
if (!lfp) {
if (errno != ENOENT)
ereport(FATAL, (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", TABLESPACE_MAP)));
return false; /* it's not there, all is fine */
}
/*
* Read and parse the link name and path lines from tablespace_map file
* (this code is pretty crude, but we are not expecting any variability in
* the file format). While taking backup we embed escape character '\\'
* before newline in tablespace path, so that during reading of
* tablespace_map file, we could distinguish newline in tablespace path
* and end of line. Now while reading tablespace_map file, remove the
* escape character that has been added in tablespace path during backup.
*/
while ((ch = fgetc(lfp)) != EOF) {
if ((ch == '\n' || ch == '\r') && prev_ch != '\\') {
str[i] = '\0';
if (sscanf(str, "%s %n", tbsoid, &n) != 1)
ereport(FATAL,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("invalid data in file \"%s\"", TABLESPACE_MAP)));
tbslinkpath = str + n;
i = 0;
ti = (tablespaceinfo*)palloc(sizeof(tablespaceinfo));
ti->oid = pstrdup(tbsoid);
ti->path = pstrdup(tbslinkpath);
*tablespaces = lappend(*tablespaces, ti);
continue;
} else if ((ch == '\n' || ch == '\r') && prev_ch == '\\')
str[i - 1] = ch;
else
str[i++] = ch;
prev_ch = ch;
}
if (ferror(lfp) || FreeFile(lfp))
ereport(FATAL, (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", TABLESPACE_MAP)));
return true;
}
/* * Error context callback for errors occurring during rm_redo(). */
void rm_redo_error_callback(void* arg)
{
@ -13596,11 +13831,39 @@ void CancelBackup(void)
ereport(LOG,
(errmsg("online backup mode canceled"),
errdetail("\"%s\" was renamed to \"%s\".", BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
return;
}
/* if the tablespace_map file is not there, return */
if (stat(TABLESPACE_MAP, &stat_buf) < 0) {
ereport(LOG,
(errmsg("online backup mode canceled"),
errdetail("File \"%s\" was renamed to \"%s\".", BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
return;
}
/* remove leftover file from previously canceled backup if it exists */
unlink(TABLESPACE_MAP_OLD);
if (durable_rename(TABLESPACE_MAP, TABLESPACE_MAP_OLD, DEBUG1) == 0) {
ereport(LOG,
(errmsg("online backup mode canceled"),
errdetail("Files \"%s\" and \"%s\" were renamed to "
"\"%s\" and \"%s\", respectively.",
BACKUP_LABEL_FILE,
TABLESPACE_MAP,
BACKUP_LABEL_OLD,
TABLESPACE_MAP_OLD)));
} else {
ereport(WARNING,
(errcode_for_file_access(),
errmsg("online backup mode was not canceled"),
errdetail("Could not rename \"%s\" to \"%s\": %m.", BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
errmsg("online backup mode canceled"),
errdetail("File \"%s\" was renamed to \"%s\", but "
"file \"%s\" could not be renamed to \"%s\": %m.",
BACKUP_LABEL_FILE,
BACKUP_LABEL_OLD,
TABLESPACE_MAP,
TABLESPACE_MAP_OLD)));
}
}
@ -13983,8 +14246,8 @@ retry:
RecoveryFromDummyStandby()
? (GetWalRcvDummyStandbySyncPercent() == SYNC_DUMMY_STANDBY_END &&
(g_instance.attr.attr_storage.enable_mix_replication
? true
: GetDataRcvDummyStandbySyncPercent() == SYNC_DUMMY_STANDBY_END))
? true
: GetDataRcvDummyStandbySyncPercent() == SYNC_DUMMY_STANDBY_END))
: true;
if (CheckForFailoverTrigger() && dummy_status &&
@ -14167,7 +14430,7 @@ retry:
ereport(LOG,
(errmsg("request xlog stream at %X/%X.",
fetching_ckpt ? (uint32)(t_thrd.xlog_cxt.RedoStartLSN >> 32)
: (uint32)(targetRecPtr >> 32),
: (uint32)(targetRecPtr >> 32),
fetching_ckpt ? (uint32)t_thrd.xlog_cxt.RedoStartLSN : (uint32)targetRecPtr)));
ShutdownWalRcv();
t_thrd.xlog_cxt.receivedUpto = 0;
@ -14225,7 +14488,7 @@ retry:
ereport(LOG,
(errmsg("request xlog stream at %X/%X.",
fetching_ckpt ? (uint32)(t_thrd.xlog_cxt.RedoStartLSN >> 32)
: (uint32)(targetRecPtr >> 32),
: (uint32)(targetRecPtr >> 32),
fetching_ckpt ? (uint32)t_thrd.xlog_cxt.RedoStartLSN : (uint32)targetRecPtr)));
ShutdownWalRcv();
t_thrd.xlog_cxt.receivedUpto = 0;
@ -14441,9 +14704,9 @@ triggered:
* in the current WAL page, previously read by XLogPageRead().
*
* 'emode' is the error mode that would be used to report a file-not-found
* or legitimate end-of-WAL situation. Generally, we use it as-is, but if
* or legitimate end-of-WAL situation. Generally, we use it as-is, but if
* we're retrying the exact same record that we've tried previously, only
* complain the first time to keep the noise down. However, we only do when
* complain the first time to keep the noise down. However, we only do when
* reading from pg_xlog, because we don't expect any invalid records in archive
* or in records streamed from master. Files in the archive should be complete,
* and we should never hit the end of WAL because we stop and wait for more WAL
@ -14821,7 +15084,7 @@ void SetWalWriterSleeping(bool sleeping)
/*
* CloseXlogFilesAtThreadExit
* Close opened xlog files at thread exit time
* Close opened xlog files at thread exit time
*/
void CloseXlogFilesAtThreadExit(void)
{

View File

@ -70,12 +70,13 @@ Datum pg_start_backup(PG_FUNCTION_ARGS)
bool fast = PG_GETARG_BOOL(1);
char* backupidstr = NULL;
XLogRecPtr startpoint;
DIR *dir;
char startxlogstr[MAXFNAMELEN];
errno_t errorno = EOK;
backupidstr = text_to_cstring(backupid);
startpoint = do_pg_start_backup(backupidstr, fast, NULL);
dir = AllocateDir("pg_tblspc");
startpoint = do_pg_start_backup(backupidstr, fast, NULL, dir, NULL, NULL, false, true);
errorno = snprintf_s(startxlogstr,
sizeof(startxlogstr),

View File

@ -53,6 +53,7 @@ typedef struct {
bool fastcheckpoint;
bool nowait;
bool includewal;
bool sendtblspcmapfile;
} basebackup_options;
#define BUILD_PATH_LEN 2560 /* (MAXPGPATH*2 + 512) */
@ -70,14 +71,15 @@ const int MATCH_SIX = 6;
XLogRecPtr XlogCopyStartPtr = InvalidXLogRecPtr;
static int64 sendDir(const char* path, int basepathlen, bool sizeonly, List* tablespaces, bool skipmot = true);
static int64 sendTablespace(const char* path, bool sizeonly);
static int64 sendDir(
const char* path, int basepathlen, bool sizeonly, List* tablespaces, bool sendtblspclinks, bool skipmot = true);
static bool sendFile(char* readfilename, char* tarfilename, struct stat* statbuf, bool missing_ok);
static void sendFileWithContent(const char* filename, const char* content);
static void _tarWriteHeader(const char* filename, const char* linktarget, struct stat* statbuf);
static void send_int8_string(StringInfoData* buf, int64 intval);
static void SendBackupHeader(List* tablespaces);
static void SendMotCheckpointHeader(const char* path);
static int CompareWalFileNames(const void* a, const void* b);
static void base_backup_cleanup(int code, Datum arg);
static void perform_base_backup(basebackup_options* opt, DIR* tblspcdir);
static void parse_basebackup_options(List* options, basebackup_options* opt);
@ -218,11 +220,20 @@ static void perform_base_backup(basebackup_options* opt, DIR* tblspcdir)
XLogRecPtr endptr;
XLogRecPtr minlsn;
char* labelfile = NULL;
char* tblspc_map_file = NULL;
int datadirpathlen;
List* tablespaces = NIL;
datadirpathlen = strlen(t_thrd.proc_cxt.DataDir);
startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
startptr = do_pg_start_backup(opt->label,
opt->fastcheckpoint,
&labelfile,
tblspcdir,
&tblspc_map_file,
&tablespaces,
opt->progress,
opt->sendtblspcmapfile);
/* Get the slot minimum LSN */
ReplicationSlotsComputeRequiredXmin(false);
ReplicationSlotsComputeRequiredLSN(NULL);
@ -247,74 +258,12 @@ static void perform_base_backup(basebackup_options* opt, DIR* tblspcdir)
PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum)0);
{
List* tablespaces = NIL;
ListCell* lc = NULL;
struct dirent* de;
tablespaceinfo* ti = NULL;
/* Collect information about all tablespaces */
while ((de = ReadDir(tblspcdir, "pg_tblspc")) != NULL) {
char fullpath[MAXPGPATH];
char linkpath[MAXPGPATH];
char* relpath = NULL;
int rllen;
errno_t errorno = EOK;
int nRet = 0;
errorno = memset_s(fullpath, MAXPGPATH, '\0', MAXPGPATH);
securec_check(errorno, "", "");
errorno = memset_s(linkpath, MAXPGPATH, '\0', MAXPGPATH);
securec_check(errorno, "", "");
/* Skip special stuff */
if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
continue;
nRet = snprintf_s(fullpath, MAXPGPATH, MAXPGPATH - 1, "pg_tblspc/%s", de->d_name);
securec_check_ss(nRet, "", "");
#if defined(HAVE_READLINK) || defined(WIN32)
rllen = readlink(fullpath, linkpath, sizeof(linkpath));
if (rllen < 0) {
ereport(WARNING, (errmsg("could not read symbolic link \"%s\": %m", fullpath)));
continue;
} else if (rllen >= (int)sizeof(linkpath)) {
ereport(WARNING, (errmsg("symbolic link \"%s\" target is too long", fullpath)));
continue;
}
linkpath[rllen] = '\0';
/*
* Relpath holds the relative path of the tablespace directory
* when it's located within PGDATA, or NULL if it's located
* elsewhere.
*/
if (rllen > datadirpathlen && strncmp(linkpath, t_thrd.proc_cxt.DataDir, datadirpathlen) == 0 &&
IS_DIR_SEP(linkpath[datadirpathlen]))
relpath = linkpath + datadirpathlen + 1;
ti = (tablespaceinfo*)palloc(sizeof(tablespaceinfo));
ti->oid = pstrdup(de->d_name);
ti->path = pstrdup(linkpath);
ti->relativePath = relpath ? pstrdup(relpath) : NULL;
ti->size = opt->progress ? sendTablespace(fullpath, true) : -1;
tablespaces = lappend(tablespaces, ti);
#else
/*
* If the platform does not have symbolic links, it should not be
* possible to have tablespaces - clearly somebody else created
* them. Warn about it and ignore.
*/
ereport(WARNING,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("tablespaces are not supported on this platform")));
#endif
}
/* Add a node for the base directory at the end */
ti = (tablespaceinfo*)palloc0(sizeof(tablespaceinfo));
ti->size = opt->progress ? sendDir(".", 1, true, tablespaces) : -1;
ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
tablespaces = (List*)lappend(tablespaces, ti);
/* Send tablespace header */
@ -344,8 +293,12 @@ static void perform_base_backup(basebackup_options* opt, DIR* tblspcdir)
/* Skip the tablespace if it's created in GAUSSDATA */
sendTablespace(iterti->path, false);
} else {
/* data dir */
sendDir(".", 1, false, tablespaces);
/* Then the tablespace_map file, if required... */
if (tblspc_map_file && opt->sendtblspcmapfile) {
sendFileWithContent(TABLESPACE_MAP, tblspc_map_file);
sendDir(".", 1, false, tablespaces, false);
} else
sendDir(".", 1, false, tablespaces, true);
}
/* In the main tar, include pg_control last. */
@ -391,13 +344,228 @@ static void perform_base_backup(basebackup_options* opt, DIR* tblspcdir)
endptr = do_pg_stop_backup(labelfile, !opt->nowait);
SendXlogRecPtrResult(endptr);
if (opt->includewal) {
/*
* We've left the last tar file "open", so we can now append the
* required WAL files to it.
*/
char pathbuf[MAXPGPATH];
XLogSegNo segno;
XLogSegNo startsegno;
XLogSegNo endsegno;
struct stat statbuf;
List* historyFileList = NIL;
List* walFileList = NIL;
char** walFiles;
int nWalFiles;
char firstoff[MAXFNAMELEN];
char lastoff[MAXFNAMELEN];
DIR* dir;
struct dirent* de;
int i;
ListCell* lc;
TimeLineID tli;
/*
* I'd rather not worry about timelines here, so scan pg_xlog and
* include all WAL files in the range between 'startptr' and 'endptr',
* regardless of the timeline the file is stamped with. If there are
* some spurious WAL files belonging to timelines that don't belong in
* this server's history, they will be included too. Normally there
* shouldn't be such files, but if there are, there's little harm in
* including them.
*/
XLByteToSeg(startptr, startsegno);
XLogFileName(firstoff, t_thrd.xlog_cxt.ThisTimeLineID, startsegno);
XLByteToPrevSeg(endptr, endsegno);
XLogFileName(lastoff, t_thrd.xlog_cxt.ThisTimeLineID, endsegno);
dir = AllocateDir("pg_xlog");
if (!dir) {
ereport(ERROR, (errmsg("could not open directory \"%s\": %m", "pg_xlog")));
}
while ((de = ReadDir(dir, "pg_xlog")) != NULL) {
/* Does it look like a WAL segment, and is it in the range? */
if (strlen(de->d_name) == 24 && strspn(de->d_name, "0123456789ABCDEF") == 24 &&
strcmp(de->d_name + 8, firstoff + 8) >= 0 && strcmp(de->d_name + 8, lastoff + 8) <= 0) {
walFileList = lappend(walFileList, pstrdup(de->d_name));
} else if (strlen(de->d_name) == 8 + strlen(".history") && strspn(de->d_name, "0123456789ABCDEF") == 8 &&
strcmp(de->d_name + 8, ".history") == 0) {
/* Does it look like a timeline history file? */
historyFileList = lappend(historyFileList, pstrdup(de->d_name));
}
}
FreeDir(dir);
/*
* Before we go any further, check that none of the WAL segments we
* need were removed.
*/
CheckXLogRemoved(startsegno, t_thrd.xlog_cxt.ThisTimeLineID);
/*
* Put the WAL filenames into an array, and sort. We send the files in
* order from oldest to newest, to reduce the chance that a file is
* recycled before we get a chance to send it over.
*/
nWalFiles = list_length(walFileList);
walFiles = (char**)palloc0(nWalFiles * sizeof(char*));
i = 0;
foreach (lc, walFileList) {
walFiles[i++] = (char*)lfirst(lc);
}
qsort(walFiles, nWalFiles, sizeof(char*), CompareWalFileNames);
/*
* There must be at least one xlog file in the pg_xlog directory,
* since we are doing backup-including-xlog.
*/
if (nWalFiles < 1) {
ereport(ERROR, (errmsg("could not find any WAL files")));
}
/*
* Sanity check: the first and last segment should cover startptr and
* endptr, with no gaps in between.
*/
XLogFromFileName(walFiles[0], &tli, &segno);
if (segno != startsegno) {
char startfname[MAXFNAMELEN];
XLogFileName(startfname, t_thrd.xlog_cxt.ThisTimeLineID, startsegno);
ereport(ERROR, (errmsg("could not find WAL file \"%s\"", startfname)));
}
for (i = 0; i < nWalFiles; i++) {
XLogSegNo currsegno = segno;
XLogSegNo nextsegno = segno + 1;
XLogFromFileName(walFiles[i], &tli, &segno);
if (!(nextsegno == segno || currsegno == segno)) {
char nextfname[MAXFNAMELEN];
XLogFileName(nextfname, t_thrd.xlog_cxt.ThisTimeLineID, nextsegno);
ereport(ERROR, (errmsg("could not find WAL file \"%s\"", nextfname)));
}
}
if (segno != endsegno) {
char endfname[MAXFNAMELEN];
XLogFileName(endfname, t_thrd.xlog_cxt.ThisTimeLineID, endsegno);
ereport(ERROR, (errmsg("could not find WAL file \"%s\"", endfname)));
}
/* Ok, we have everything we need. Send the WAL files. */
for (i = 0; i < nWalFiles; i++) {
FILE* fp;
char buf[TAR_SEND_SIZE];
size_t cnt;
pgoff_t len = 0;
snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFiles[i]);
XLogFromFileName(walFiles[i], &tli, &segno);
fp = AllocateFile(pathbuf, "rb");
if (fp == NULL) {
int save_errno = errno;
/*
* Most likely reason for this is that the file was already
* removed by a checkpoint, so check for that to get a better
* error message.
*/
CheckXLogRemoved(segno, tli);
errno = save_errno;
ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", pathbuf)));
}
if (fstat(fileno(fp), &statbuf) != 0) {
ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf)));
}
if (statbuf.st_size != XLogSegSize) {
CheckXLogRemoved(segno, tli);
ereport(ERROR, (errcode_for_file_access(), errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
}
/* send the WAL file itself */
_tarWriteHeader(pathbuf, NULL, &statbuf);
while ((cnt = fread(buf, 1, Min((uint32)sizeof(buf), XLogSegSize - len), fp)) > 0) {
CheckXLogRemoved(segno, tli);
/* Send the chunk as a CopyData message */
if (pq_putmessage('d', buf, cnt)) {
ereport(ERROR, (errmsg("base backup could not send data, aborting backup")));
}
len += cnt;
if (len == XLogSegSize)
break;
}
if (len != XLogSegSize) {
CheckXLogRemoved(segno, tli);
ereport(ERROR, (errcode_for_file_access(), errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
}
/* XLogSegSize is a multiple of 512, so no need for padding */
FreeFile(fp);
/*
* Mark file as archived, otherwise files can get archived again
* after promotion of a new node. This is in line with
* walreceiver.c always doing a XLogArchiveForceDone() after a
* complete segment.
*/
StatusFilePath(pathbuf, walFiles[i], ".done");
sendFileWithContent(pathbuf, "");
}
/*
* Send timeline history files too. Only the latest timeline history
* file is required for recovery, and even that only if there happens
* to be a timeline switch in the first WAL segment that contains the
* checkpoint record, or if we're taking a base backup from a standby
* server and the target timeline changes while the backup is taken.
* But they are small and highly useful for debugging purposes, so
* better include them all, always.
*/
foreach (lc, historyFileList) {
char* fname = (char*)lfirst(lc);
snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
if (lstat(pathbuf, &statbuf) != 0)
ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf)));
sendFile(pathbuf, pathbuf, &statbuf, false);
/* unconditionally mark file as archived */
StatusFilePath(pathbuf, fname, ".done");
sendFileWithContent(pathbuf, "");
}
/* Send CopyDone message for the last tar file */
pq_putemptymessage('c');
}
SendXlogRecPtrResult(endptr);
LWLockAcquire(FullBuildXlogCopyStartPtrLock, LW_EXCLUSIVE);
XlogCopyStartPtr = InvalidXLogRecPtr;
LWLockRelease(FullBuildXlogCopyStartPtrLock);
}
/*
* list_sort comparison function, to compare log/seg portion of WAL segment
* filenames, ignoring the timeline portion.
*/
static int CompareWalFileNames(const void* a, const void* b)
{
char* fna = *((char**)a);
char* fnb = *((char**)b);
return strcmp(fna + 8, fnb + 8);
}
/*
* Called when ERROR or FATAL happens in PerformMotCheckpointFetch() after
* we have started the operation - make sure we end it!
@ -430,23 +598,17 @@ void PerformMotCheckpointFetch()
}
if (getcwd(cwd, sizeof(cwd)) == NULL) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not get current work dir : %m")));
ereport(ERROR, (errcode_for_file_access(), errmsg("could not get current work dir : %m")));
}
chkptDir = MOTCheckpointFetchDirName();
if (chkptDir == NULL) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not get mot checkpoint dir : %m")));
ereport(ERROR, (errcode_for_file_access(), errmsg("could not get mot checkpoint dir : %m")));
}
workingDir = MOTCheckpointFetchWorkingDir();
if (workingDir == NULL) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not get mot checkpoint working dir : %m")));
ereport(ERROR, (errcode_for_file_access(), errmsg("could not get mot checkpoint working dir : %m")));
}
if (strncmp(cwd, workingDir, strlen(workingDir) - 1) == 0) {
@ -482,14 +644,13 @@ void PerformMotCheckpointFetch()
struct stat statbuf;
if (lstat(ctrlFilePath, &statbuf) != 0) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not stat mot control file \"%s\": %m", ctrlFilePath)));
(errcode_for_file_access(), errmsg("could not stat mot control file \"%s\": %m", ctrlFilePath)));
}
sendFile((char*)ctrlFilePath, (char*)ctrlFilePath, &statbuf, false);
/* send the checkpoint dir */
sendDir(fullChkptDir, 1, false, NIL, false);
sendDir(fullChkptDir, 1, false, NIL, false, false);
/* CopyDone */
pq_putemptymessage_noblock('c');
@ -510,6 +671,7 @@ static void parse_basebackup_options(List* options, basebackup_options* opt)
bool o_fast = false;
bool o_nowait = false;
bool o_wal = false;
bool o_tablespace_map = false;
errno_t rc = 0;
rc = memset_s(opt, sizeof(*opt), 0, sizeof(*opt));
@ -542,6 +704,11 @@ static void parse_basebackup_options(List* options, basebackup_options* opt)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
opt->includewal = true;
o_wal = true;
} else if (strcmp(defel->defname, "tablespace_map") == 0) {
if (o_tablespace_map)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
opt->sendtblspcmapfile = true;
o_tablespace_map = true;
} else
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("option \"%s\" not recognized", defel->defname)));
}
@ -722,7 +889,7 @@ static void SendMotCheckpointHeader(const char* path)
if (path != NULL) {
/* Send one datarow message */
pq_beginmessage(&buf, 'D');
pq_sendint16(&buf, 1); /* number of columns */
pq_sendint16(&buf, 1); /* number of columns */
pq_sendint32(&buf, strlen(path)); /* length */
pq_sendbytes(&buf, path, strlen(path));
pq_endmessage_noblock(&buf);
@ -818,7 +985,7 @@ static void sendFileWithContent(const char* filename, const char* content)
*
* Only used to send auxiliary tablespaces, not GAUSSDATA.
*/
static int64 sendTablespace(const char* path, bool sizeonly)
int64 sendTablespace(const char* path, bool sizeonly)
{
int64 size = 0;
char pathbuf[MAXPGPATH] = {0};
@ -857,7 +1024,7 @@ static int64 sendTablespace(const char* path, bool sizeonly)
size = 512; /* Size of the header just added */
/* Send all the files in the tablespace version directory */
size += sendDir(pathbuf, strlen(path), sizeonly, NIL);
size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
return size;
}
@ -870,7 +1037,8 @@ static int64 sendTablespace(const char* path, bool sizeonly)
* Omit any directory in the tablespaces list, to avoid backing up
* tablespaces twice when they were created inside PGDATA.
*/
static int64 sendDir(const char* path, int basepathlen, bool sizeonly, List* tablespaces, bool skipmot)
static int64 sendDir(
const char* path, int basepathlen, bool sizeonly, List* tablespaces, bool sendtblspclinks, bool skipmot)
{
DIR* dir = NULL;
struct dirent* de = NULL;
@ -1070,9 +1238,50 @@ static int64 sendDir(const char* path, int basepathlen, bool sizeonly, List* tab
}
}
size += 512; /* Size of the header just added */
if (!sizeonly) {
#ifndef WIN32
if (S_ISLNK(statbuf.st_mode)) {
#else
if (pgwin32_is_junction(pathbuf)) {
#endif
#if defined(HAVE_READLINK) || defined(WIN32)
char linkpath[MAXPGPATH] = {0};
int rllen;
rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
if (rllen < 0)
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not read symbolic link \"%s\": %m", pathbuf)));
if (rllen >= (int)sizeof(linkpath))
ereport(ERROR,
(errcode(ERRCODE_NAME_TOO_LONG),
errmsg("symbolic link \"%s\" target is too long", pathbuf)));
linkpath[MAXPGPATH - 1] = '\0';
_tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf);
#else
/*
* If the platform does not have symbolic links, it should not be
* possible to have tablespaces - clearly somebody else created
* them. Warn about it and ignore.
*/
ereport(WARNING,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("tablespaces are not supported on this platform")));
continue;
#endif /* HAVE_READLINK */
} else if (S_ISDIR(statbuf.st_mode)) {
/*
* Also send archive_status directory (by hackishly reusing
* statbuf from above ...).
*/
statbuf.st_mode = S_IFDIR | S_IRWXU;
_tarWriteHeader("pg_xlog/archive_status", NULL, &statbuf);
}
}
size += 512; /* Size of the header just added */
continue; /* don't recurse into pg_xlog */
}
/* Allow symbolic links in pg_tblspc only */
if (strcmp(path, "./pg_tblspc") == 0 &&
#ifndef WIN32
@ -1137,8 +1346,13 @@ static int64 sendDir(const char* path, int basepathlen, bool sizeonly, List* tab
break;
}
}
/*
* skip sending directories inside pg_tblspc, if not required.
*/
if (strcmp(pathbuf, "./pg_tblspc") == 0 && !sendtblspclinks)
skip_this_dir = true;
if (!skip_this_dir)
size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces);
size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
} else if (S_ISREG(statbuf.st_mode)) {
bool sent = false;
@ -1191,19 +1405,16 @@ bool check_base_path(const char* fname, int* segNo)
if (nmatch == MATCH_FOUR) {
return false;
}
nmatch = sscanf_s(fname, "base/%u/%u_b%d.%u",
&rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname, "base/%u/%u_b%d.%u", &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
if (nmatch == MATCH_THREE || nmatch == MATCH_FOUR) {
return true;
}
nmatch = sscanf_s(fname, "base/%u/%u_b%d_vm.%u",
&rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname, "base/%u/%u_b%d_vm.%u", &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
if (nmatch == MATCH_THREE || nmatch == MATCH_FOUR) {
return true;
}
nmatch = sscanf_s(fname, "base/%u/%u_b%d_fsm.%u",
&rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname, "base/%u/%u_b%d_fsm.%u", &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
if (nmatch == MATCH_THREE || nmatch == MATCH_FOUR) {
return true;
}
@ -1248,20 +1459,41 @@ bool check_rel_tblspac_path(const char* fname, int* segNo)
return false;
}
nmatch = sscanf_s(fname, "pg_tblspc/%u/%[^/]/%u/%u_b%d.%u",
&rnode.spcNode, buf, sizeof(buf), &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname,
"pg_tblspc/%u/%[^/]/%u/%u_b%d.%u",
&rnode.spcNode,
buf,
sizeof(buf),
&rnode.dbNode,
&rnode.relNode,
&rnode.bucketNode,
segNo);
if (nmatch == MATCH_FIVE || nmatch == MATCH_SIX) {
return true;
}
nmatch = sscanf_s(fname, "pg_tblspc/%u/%[^/]/%u/%u_b%d_fsm.%u",
&rnode.spcNode, buf, sizeof(buf), &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname,
"pg_tblspc/%u/%[^/]/%u/%u_b%d_fsm.%u",
&rnode.spcNode,
buf,
sizeof(buf),
&rnode.dbNode,
&rnode.relNode,
&rnode.bucketNode,
segNo);
if (nmatch == MATCH_FIVE || nmatch == MATCH_SIX) {
return true;
}
nmatch = sscanf_s(fname, "pg_tblspc/%u/%[^/]/%u/%u_b%d_vm.%u",
&rnode.spcNode, buf, sizeof(buf), &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname,
"pg_tblspc/%u/%[^/]/%u/%u_b%d_vm.%u",
&rnode.spcNode,
buf,
sizeof(buf),
&rnode.dbNode,
&rnode.relNode,
&rnode.bucketNode,
segNo);
if (nmatch == MATCH_FIVE || nmatch == MATCH_SIX) {
return true;
}
@ -1320,20 +1552,38 @@ bool check_abs_tblspac_path(const char* fname, int* segNo)
if (nmatch == MATCH_SIX) {
return false;
}
nmatch = sscanf_s(fname, "PG_9.2_201611171_%[^/]/%u/%u_b%d.%u",
buf, sizeof(buf), &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname,
"PG_9.2_201611171_%[^/]/%u/%u_b%d.%u",
buf,
sizeof(buf),
&rnode.dbNode,
&rnode.relNode,
&rnode.bucketNode,
segNo);
if (nmatch == MATCH_FIVE || nmatch == MATCH_SIX) {
return true;
}
nmatch = sscanf_s(fname, "PG_9.2_201611171_%[^/]/%u/%u_b%d_fsm.%u",
buf, sizeof(buf), &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname,
"PG_9.2_201611171_%[^/]/%u/%u_b%d_fsm.%u",
buf,
sizeof(buf),
&rnode.dbNode,
&rnode.relNode,
&rnode.bucketNode,
segNo);
if (nmatch == MATCH_FIVE || nmatch == MATCH_SIX) {
return true;
}
nmatch = sscanf_s(fname, "PG_9.2_201611171_%[^/]/%u/%u_b%d_vm.%u",
buf, sizeof(buf), &rnode.dbNode, &rnode.relNode, &rnode.bucketNode, segNo);
nmatch = sscanf_s(fname,
"PG_9.2_201611171_%[^/]/%u/%u_b%d_vm.%u",
buf,
sizeof(buf),
&rnode.dbNode,
&rnode.relNode,
&rnode.bucketNode,
segNo);
if (nmatch == MATCH_FIVE || nmatch == MATCH_SIX) {
return true;
}
@ -1450,7 +1700,7 @@ static bool sendFile(char* readfilename, char* tarfilename, struct stat* statbuf
ereport(DEBUG1, (errmsg("sendFile, filename is %s, isNeedCheck is %d", readfilename, isNeedCheck)));
/* make sure data file size is integer multiple of BLCKSZ and change statbuf if needed */
if(isNeedCheck) {
if (isNeedCheck) {
statbuf->st_size = statbuf->st_size - (statbuf->st_size % BLCKSZ);
}
@ -1461,20 +1711,21 @@ static bool sendFile(char* readfilename, char* tarfilename, struct stat* statbuf
if (t_thrd.walsender_cxt.walsender_ready_to_stop)
ereport(ERROR, (errcode_for_file_access(), errmsg("base backup receive stop message, aborting backup")));
recheck:
if (cnt != (size_t) Min(TAR_SEND_SIZE, statbuf->st_size - len)) {
if (cnt != (size_t)Min(TAR_SEND_SIZE, statbuf->st_size - len)) {
if (ferror(fp)) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read file \"%s\": %m", readfilename)));
ereport(ERROR, (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", readfilename)));
}
}
if (g_instance.attr.attr_storage.enableIncrementalCheckpoint && isNeedCheck) {
/* len and cnt must be integer multiple of BLCKSZ. */
if (len % BLCKSZ != 0 || cnt % BLCKSZ != 0) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("base backup file length cannot be divisibed by 8k: file %s, len %ld, cnt %ld, aborting backup",
readfilename, len, cnt)));
(errcode_for_file_access(),
errmsg("base backup file length cannot be divisibed by 8k: file %s, len %ld, cnt %ld, aborting "
"backup",
readfilename,
len,
cnt)));
}
for (check_loc = 0; (unsigned int)(check_loc) < cnt; check_loc += BLCKSZ) {
blkno = len / BLCKSZ + check_loc / BLCKSZ + (segNo * ((BlockNumber)RELSEG_SIZE));
@ -1496,9 +1747,12 @@ static bool sendFile(char* readfilename, char* tarfilename, struct stat* statbuf
goto recheck;
} else if (cnt > 0 && retryCnt == MAX_RETRY_LIMIT) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("base backup cheksum failed in file \"%s\"(computed: %d, recorded: %d), aborting backup",
readfilename, checksum, phdr->pd_checksum)));
(errcode_for_file_access(),
errmsg("base backup cheksum failed in file \"%s\"(computed: %d, recorded: %d), "
"aborting backup",
readfilename,
checksum,
phdr->pd_checksum)));
} else {
retryCnt = 0;
break;

View File

@ -86,6 +86,7 @@
%token K_FAST
%token K_NOWAIT
%token K_WAL
%token K_TABLESPACE_MAP
%token K_DATA
%token K_START_REPLICATION
%token K_FETCH_MOT_CHECKPOINT
@ -202,7 +203,7 @@ identify_channel:
;
/*
* BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
* BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [TABLESPACE_MAP]
*/
base_backup:
K_BASE_BACKUP base_backup_opt_list
@ -242,6 +243,11 @@ base_backup_opt:
$$ = makeDefElem("nowait",
(Node *)makeInteger(TRUE));
}
| K_TABLESPACE_MAP
{
$$ = makeDefElem("tablespace_map",
(Node *)makeInteger(TRUE));
}
;
/*

View File

@ -95,6 +95,7 @@ LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }
PROGRESS { return K_PROGRESS; }
WAL { return K_WAL; }
TABLESPACE_MAP { return K_TABLESPACE_MAP; }
DATA { return K_DATA; }
START_REPLICATION { return K_START_REPLICATION; }
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }

View File

@ -5,6 +5,7 @@
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* src/include/access/xlog.h
*/
@ -21,7 +22,6 @@
#include "access/parallel_recovery/redo_item.h"
#include "access/htup.h"
/* Sync methods */
#define SYNC_METHOD_FSYNC 0
#define SYNC_METHOD_FDATASYNC 1
@ -94,12 +94,7 @@ typedef enum {
} RecoveryTargetType;
/* WAL levels */
typedef enum WalLevel {
WAL_LEVEL_MINIMAL = 0,
WAL_LEVEL_ARCHIVE,
WAL_LEVEL_HOT_STANDBY,
WAL_LEVEL_LOGICAL
} WalLevel;
typedef enum WalLevel { WAL_LEVEL_MINIMAL = 0, WAL_LEVEL_ARCHIVE, WAL_LEVEL_HOT_STANDBY, WAL_LEVEL_LOGICAL } WalLevel;
#define XLogArchivingActive() \
(u_sess->attr.attr_common.XLogArchiveMode && g_instance.attr.attr_storage.wal_level >= WAL_LEVEL_ARCHIVE)
@ -148,7 +143,7 @@ extern const int DemoteModeNum;
0x0002 /* Like shutdown checkpoint, \
* but issued at end of WAL \
* recovery */
#define CHECKPOINT_FILE_SYNC 0x0004 /* File sync */
#define CHECKPOINT_FILE_SYNC 0x0004 /* File sync */
#define CHECKPOINT_IMMEDIATE 0x0008 /* Do it without delays */
#define CHECKPOINT_FORCE 0x0010 /* Force even if no activity */
/* These are important to RequestCheckpoint */
@ -163,7 +158,6 @@ extern const int DemoteModeNum;
#define LAZY_BACKWRITE 0x0400 /* lazy backwrite */
#define PAGERANGE_BACKWRITE 0x0800 /* PageRangeBackWrite */
/* Checkpoint statistics */
typedef struct CheckpointStatsData {
TimestampTz ckpt_start_t; /* start of checkpoint */
@ -199,7 +193,7 @@ typedef struct XLogFPWInfo {
struct XLogRecData;
/*
/*
* Shared-memory data structures for XLOG control
*
* LogwrtRqst indicates a byte position that we need to write and/or fsync
@ -350,7 +344,8 @@ extern XLogRecPtr GetDDLDelayStartPtr(void);
/*
* Starting/stopping a base backup
*/
extern XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfile);
extern XLogRecPtr do_pg_start_backup(const char* backupidstr, bool fast, char** labelfile, DIR* tblspcdir,
char** tblspcmapfile, List** tablespaces, bool infotbssize, bool needtblspcmapfile);
extern void startupInitRoachBackup(void);
extern void set_start_backup_flag(bool startFlag);
extern bool get_startBackup_flag(void);
@ -381,7 +376,6 @@ extern void WaitCheckpointSync(void);
void GetRecoveryLatch();
void ReLeaseRecoveryLatch();
extern XLogRecPtr XlogRemoveSegPrimary;
/* File path names (all relative to $PGDATA) */
@ -408,5 +402,7 @@ typedef struct delayddlrange {
#define ENABLE_DDL_DELAY_TIMEOUT 60000
#define ROACH_BARRIER_PREFIX "roach_barrier_"
#endif /* XLOG_H */
#define TABLESPACE_MAP "tablespace_map"
#define TABLESPACE_MAP_OLD "tablespace_map.old"
#endif /* XLOG_H */

View File

@ -1,87 +1,87 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* GaussAlarm_client.h
*
*
*
* IDENTIFICATION
* src/include/alarm/GaussAlarm_client.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef __GAUSS_ALARM_CLENT_H__
#define __GAUSS_ALARM_CLENT_H__
#ifdef __cplusplus
#if __cplusplus
extern "C" {
#endif
#endif
#ifndef CM_ALARM_TYPE_MACRO
/* �澯���� */
typedef enum tagCM_ALARM_TYPE {
ALM_ALARM_TYPE_FAULT = 0, /* ���� */
// ALM_ALARM_TYPE_EVENT = 1, /* �¼� */
ALM_ALARM_TYPE_RESUME = 2, /* �ָ� */
ALM_ALARM_TYPE_OPLOG = 3, /* ������־ */
ALM_ALARM_TYPE_EVENT = 4, /* �¼�--> ������־ */
ALM_ALARM_TYPE_DELETE = 5, /* ɾ�� */
} CM_ALARM_TYPE;
#define CM_ALARM_TYPE_MACRO
#endif /* #ifndef CM_ALARM_TYPE_MACRO */
#ifndef CM_SUBSYSTEM_ID_MACRO
/* �澯ģ��ID���� */
typedef enum tagCM_SUBSYSTEM_ID {
SUBSYSID_CM = 0, /* CM */
SUBSYSID_PRO = 1, /* protocol */
SUBSYSID_TS = 2, /* TS� */
SUBSYSID_CA = 3, /* CA */
SUBSYSID_MDS = 4, /* MDS */
SUBSYSID_DS = 5, /* DS */
SUBSYSID_DLM = 6, /* BASE DLM */
SUBSYSID_MONC = 7, /* MONC */
SUBSYSID_TRNS = 8, /* TRNS */
SUBSYSID_NVCACHE = 9, /* NVCACHE */
SUBSYSID_PMA = 10, /* PMA */
SUBSYSID_BASE = 11, /* BASE */
SUBSYSID_MONS = 12, /* MONS */
SUBSYSID_NOFS = 13, /* NOFS */
SUBSYSID_SQL = 20, /* ��˹���ݿ� */
SUBSYSID_HD = 21, /* hadoop */
SUBSYSID_MDM = 22, /* �ļ�ϵͳԪ���ݹ��� */
SUBSYSID_BUTT /* ����ֵ */
} CM_SUBSYSTEM_ID;
#define CM_SUBSYSTEM_ID_MACRO
#endif /*# ifndef CM_SUBSYSTEM_ID_MACRO */
/**
����Ʒ���͸澯.
alarmMsg�����󳤶�Ϊ1024���������ֻᱻ�ض�
����ֵ�� 0 �ɹ�����0 ʧ��
*/
int Gauss_alarm_report(int moduleID, long long alarmID, CM_ALARM_TYPE type, char* alarmMsg, int msgLength);
#ifdef __cplusplus
#if __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* GaussAlarm_client.h
*
*
*
* IDENTIFICATION
* src/include/alarm/GaussAlarm_client.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef __GAUSS_ALARM_CLENT_H__
#define __GAUSS_ALARM_CLENT_H__
#ifdef __cplusplus
#if __cplusplus
extern "C" {
#endif
#endif
#ifndef CM_ALARM_TYPE_MACRO
/* �澯���� */
typedef enum tagCM_ALARM_TYPE {
ALM_ALARM_TYPE_FAULT = 0, /* ���� */
// ALM_ALARM_TYPE_EVENT = 1, /* �¼� */
ALM_ALARM_TYPE_RESUME = 2, /* �ָ� */
ALM_ALARM_TYPE_OPLOG = 3, /* ������־ */
ALM_ALARM_TYPE_EVENT = 4, /* �¼�--> ������־ */
ALM_ALARM_TYPE_DELETE = 5, /* ɾ�� */
} CM_ALARM_TYPE;
#define CM_ALARM_TYPE_MACRO
#endif /* #ifndef CM_ALARM_TYPE_MACRO */
#ifndef CM_SUBSYSTEM_ID_MACRO
/* �澯ģ��ID���� */
typedef enum tagCM_SUBSYSTEM_ID {
SUBSYSID_CM = 0, /* CM */
SUBSYSID_PRO = 1, /* protocol */
SUBSYSID_TS = 2, /* TS� */
SUBSYSID_CA = 3, /* CA */
SUBSYSID_MDS = 4, /* MDS */
SUBSYSID_DS = 5, /* DS */
SUBSYSID_DLM = 6, /* BASE DLM */
SUBSYSID_MONC = 7, /* MONC */
SUBSYSID_TRNS = 8, /* TRNS */
SUBSYSID_NVCACHE = 9, /* NVCACHE */
SUBSYSID_PMA = 10, /* PMA */
SUBSYSID_BASE = 11, /* BASE */
SUBSYSID_MONS = 12, /* MONS */
SUBSYSID_NOFS = 13, /* NOFS */
SUBSYSID_SQL = 20, /* ��˹���ݿ� */
SUBSYSID_HD = 21, /* hadoop */
SUBSYSID_MDM = 22, /* �ļ�ϵͳԪ���ݹ��� */
SUBSYSID_BUTT /* ����ֵ */
} CM_SUBSYSTEM_ID;
#define CM_SUBSYSTEM_ID_MACRO
#endif /*# ifndef CM_SUBSYSTEM_ID_MACRO */
/**
����Ʒ���͸澯.
alarmMsg�����󳤶�Ϊ1024���������ֻᱻ�ض�
����ֵ�� 0 �ɹ�����0 ʧ��
*/
int Gauss_alarm_report(int moduleID, long long alarmID, CM_ALARM_TYPE type, char* alarmMsg, int msgLength);
#ifdef __cplusplus
#if __cplusplus
}
#endif
#endif
#endif //__GAUSS_ALARM_CLENT_H__

View File

@ -126,6 +126,7 @@ extern Oid get_tablespace_oid(const char* tablespacename, bool missing_ok);
extern char* get_tablespace_name(Oid spc_oid);
extern bool directory_is_empty(const char* path);
extern void remove_tablespace_symlink(const char *linkloc);
extern void check_create_dir(char* location);
extern void tblspc_redo(XLogReaderState* rptr);

15
src/include/lib/string.h Normal file
View File

@ -0,0 +1,15 @@
/*
* string.h
* string handling helpers
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/common/string.h
*/
#ifndef COMMON_STRING_H
#define COMMON_STRING_H
extern bool pg_str_endswith(const char* str, const char* end);
#endif /* COMMON_STRING_H */

View File

@ -33,6 +33,7 @@ typedef struct {
extern XLogRecPtr XlogCopyStartPtr;
extern void SendBaseBackup(BaseBackupCmd* cmd);
extern int64 sendTablespace(const char* path, bool sizeonly);
extern bool is_row_data_file(const char* filePath, int* segNo);
/* ut test */

View File

@ -235,6 +235,18 @@ extern void TimestampDifference(TimestampTz start_time, TimestampTz stop_time, l
extern int ComputeTimeStamp(TimestampTz start);
extern bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec);
/*
* Prototypes for functions to deal with integer timestamps, when the native
* format is float timestamps.
*/
#ifndef HAVE_INT64_TIMESTAMP
extern int64 GetCurrentIntegerTimestamp(void);
extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
#else
#define GetCurrentIntegerTimestamp() GetCurrentTimestamp()
#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
#endif
extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
extern pg_time_t timestamptz_to_time_t(TimestampTz t);