!157 PITR支持恢复到指定LSN

Merge pull request !157 from victor_zhc/dev
This commit is contained in:
opengauss-bot
2020-08-29 17:07:55 +08:00
committed by Gitee
8 changed files with 118 additions and 2 deletions

View File

@ -32,7 +32,7 @@ OBJS = acl.o arrayfuncs.o array_selfuncs.o array_typanalyze.o \
rowtypes.o regexp.o regproc.o ruleutils.o selfuncs.o \
tid.o timestamp.o varbit.o varchar.o varlena.o version.o xid.o \
network.o mac.o inet_cidr_ntop.o inet_net_pton.o \
ri_triggers.o pg_lzcompress.o pg_locale.o formatting.o \
ri_triggers.o pg_lzcompress.o pg_lsn.o pg_locale.o formatting.o \
ascii.o quote.o pgstatfuncs.o encode.o dbsize.o genfile.o trigfuncs.o \
tsginidx.o tsgistidx.o tsquery.o tsquery_cleanup.o tsquery_gist.o \
tsquery_op.o tsquery_rewrite.o tsquery_util.o tsrank.o \

View File

@ -0,0 +1,51 @@
/* -----------------------------------------------------------------------
*
* PostgreSQL locale utilities
*
* Portions Copyright (c) 2002-2012, PostgreSQL Global Development Group
*
* src/backend/utils/adt/pg_lsn.c
*
* -----------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/hash.h"
#include "funcapi.h"
#include "libpq/pqformat.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#define MAXPG_LSNCOMPONENT 8
/*----------------------------------------------------------
* Formatting and conversion routines.
*---------------------------------------------------------*/
Datum pg_lsn_in(PG_FUNCTION_ARGS)
{
char* str = PG_GETARG_CSTRING(0);
int len1, len2;
uint32 id, off;
XLogRecPtr result;
/* Sanity check input format. */
len1 = strspn(str, "0123456789abcdefABCDEF");
if (len1 < 1 || len1 > MAXPG_LSNCOMPONENT || str[len1] != '/')
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"", "pg_lsn", str)));
len2 = strspn(str + len1 + 1, "0123456789abcdefABCDEF");
if (len2 < 1 || len2 > MAXPG_LSNCOMPONENT || str[len1 + 1 + len2] != '\0')
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
errmsg("invalid input syntax for type %s: \"%s\"", "pg_lsn", str)));
/* Decode result. */
id = (uint32)strtoul(str, NULL, 16);
off = (uint32)strtoul(str + len1 + 1, NULL, 16);
result = ((uint64)id << 32) | off;
PG_RETURN_LSN(result);
}

View File

@ -405,6 +405,7 @@ static void knl_t_xlog_init(knl_t_xlog_context* xlog_cxt)
xlog_cxt->recoveryTargetTime = 0;
xlog_cxt->recoveryTargetBarrierId = NULL;
xlog_cxt->recoveryTargetName = NULL;
xlog_cxt->recoveryTargetLSN = InvalidXLogRecPtr;
xlog_cxt->StandbyModeRequested = false;
xlog_cxt->PrimaryConnInfo = NULL;
xlog_cxt->TriggerFile = NULL;
@ -412,6 +413,7 @@ static void knl_t_xlog_init(knl_t_xlog_context* xlog_cxt)
xlog_cxt->recoveryTriggered = false;
xlog_cxt->recoveryStopXid = InvalidTransactionId;
xlog_cxt->recoveryStopTime = 0;
xlog_cxt->recoveryStopLSN = InvalidXLogRecPtr;
rc = memset_s(xlog_cxt->recoveryStopName, MAXFNAMELEN * sizeof(char), 0, MAXFNAMELEN * sizeof(char));
securec_check(rc, "\0", "\0");
xlog_cxt->recoveryStopAfter = false;

View File

@ -79,6 +79,8 @@
#
#recovery_target_xid = ''
#
#recovery_target_lsn = '' # e.g. '0/FFFFFFF'
#
#recovery_target_barrier = ''
#
#recovery_target_inclusive = true

View File

@ -101,6 +101,7 @@
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
@ -6987,6 +6988,27 @@ static void readRecoveryCommandFile(void)
}
ereport(DEBUG2, (errmsg_internal("recovery_target_name = '%s'", t_thrd.xlog_cxt.recoveryTargetName)));
} else if (strcmp(item->name, "recovery_target_lsn") == 0) {
/*
* if recovery_target_xid or recovery_target_name or recovery_target_time
* specified, then this overrides recovery_target_lsn
*/
if (t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_XID ||
t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_NAME ||
t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_TIME) {
continue;
}
t_thrd.xlog_cxt.recoveryTarget = RECOVERY_TARGET_LSN;
/*
* Convert the LSN string given by the user to XLogRecPtr form.
*/
t_thrd.xlog_cxt.recoveryTargetLSN = DatumGetLSN(DirectFunctionCall3(
pg_lsn_in, CStringGetDatum(item->value), ObjectIdGetDatum(InvalidOid), Int32GetDatum(-1)));
ereport(DEBUG2,
(errmsg_internal("recovery_target_lsn = '%X/%X'",
(uint32)(t_thrd.xlog_cxt.recoveryTargetLSN >> 32),
(uint32)t_thrd.xlog_cxt.recoveryTargetLSN)));
} else if (strcmp(item->name, "recovery_target_inclusive") == 0) {
// does nothing if a recovery_target is not also set
if (!parse_bool(item->value, &t_thrd.xlog_cxt.recoveryTargetInclusive)) {
@ -7362,6 +7384,32 @@ static bool recoveryStopsHere(XLogReaderState* record, bool* includeThis)
return false;
}
/* Check if target LSN has been reached */
if (t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_LSN &&
record->ReadRecPtr >= t_thrd.xlog_cxt.recoveryTargetLSN) {
*includeThis = t_thrd.xlog_cxt.recoveryTargetInclusive;
t_thrd.xlog_cxt.recoveryStopAfter = *includeThis;
t_thrd.xlog_cxt.recoveryStopXid = InvalidTransactionId;
t_thrd.xlog_cxt.recoveryStopLSN = record->ReadRecPtr;
t_thrd.xlog_cxt.recoveryStopTime = 0;
t_thrd.xlog_cxt.recoveryStopName[0] = '\0';
if (t_thrd.xlog_cxt.recoveryStopAfter) {
ereport(LOG,
(errmsg("recovery stopping after WAL location (LSN) \"%X/%X\"",
(uint32)(t_thrd.xlog_cxt.recoveryStopLSN >> 32),
(uint32)t_thrd.xlog_cxt.recoveryStopLSN)));
} else {
ereport(LOG,
(errmsg("recovery stopping before WAL location (LSN) \"%X/%X\"",
(uint32)(t_thrd.xlog_cxt.recoveryStopLSN >> 32),
(uint32)t_thrd.xlog_cxt.recoveryStopLSN)));
}
return true;
}
/* Do we have a PITR target at all? */
if (t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_UNSET) {
// Save timestamp of latest transaction commit/abort if this is a
@ -7442,6 +7490,7 @@ static bool recoveryStopsHere(XLogReaderState* record, bool* includeThis)
t_thrd.xlog_cxt.recoveryStopXid = XLogRecGetXid(record);
t_thrd.xlog_cxt.recoveryStopTime = recordXtime;
t_thrd.xlog_cxt.recoveryStopAfter = *includeThis;
t_thrd.xlog_cxt.recoveryStopLSN = InvalidXLogRecPtr;
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT) {
if (t_thrd.xlog_cxt.recoveryStopAfter)
@ -8034,6 +8083,11 @@ void StartupXLOG(void)
#endif
else if (t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_NAME) {
ereport(LOG, (errmsg("starting point-in-time recovery to \"%s\"", t_thrd.xlog_cxt.recoveryTargetName)));
} else if (t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_LSN) {
ereport(LOG,
(errmsg("starting point-in-time recovery to WAL location (LSN) \"%X/%X\"",
(uint32)(t_thrd.xlog_cxt.recoveryTargetLSN >> 32),
(uint32)t_thrd.xlog_cxt.recoveryTargetLSN)));
} else {
ereport(LOG, (errmsg("starting archive recovery")));
}

View File

@ -86,7 +86,8 @@ typedef enum {
RECOVERY_TARGET_UNSET,
RECOVERY_TARGET_XID,
RECOVERY_TARGET_TIME,
RECOVERY_TARGET_NAME
RECOVERY_TARGET_NAME,
RECOVERY_TARGET_LSN
#ifdef PGXC
,
RECOVERY_TARGET_BARRIER

View File

@ -453,6 +453,7 @@ typedef struct knl_t_xlog_context {
TimestampTz recoveryTargetTime;
char* recoveryTargetBarrierId;
char* recoveryTargetName;
XLogRecPtr recoveryTargetLSN;
/* options taken from recovery.conf for XLOG streaming */
bool StandbyModeRequested;
@ -468,6 +469,7 @@ typedef struct knl_t_xlog_context {
/* if recoveryStopsHere returns true, it saves actual stop xid/time/name here */
TransactionId recoveryStopXid;
TimestampTz recoveryStopTime;
XLogRecPtr recoveryStopLSN;
char recoveryStopName[MAXFNAMELEN];
bool recoveryStopAfter;

View File

@ -1502,6 +1502,10 @@ extern void encryptBlockOrCUData(
extern void decryptBlockOrCUData(
const char* cipherText, const size_t cipherLength, char* plainText, size_t* plainLength);
extern bool isEncryptedCluster();
/* pg_lsn.cpp */
extern Datum pg_lsn_in(PG_FUNCTION_ARGS);
// template function implementation
//
/* tsdb */