support logical decoding on standby
This commit is contained in:
@ -49,7 +49,7 @@ static bool do_drop_slot = false;
|
||||
static char** options;
|
||||
static size_t noptions = 0;
|
||||
static bool g_change_plugin = false;
|
||||
char* plugin = "mppdb_decoding";
|
||||
static const char* plugin = NULL;
|
||||
|
||||
/* Global State */
|
||||
static int outfd = -1;
|
||||
@ -805,9 +805,8 @@ static void process_free_option(void)
|
||||
if (dbuser != NULL) {
|
||||
pfree_ext(dbuser);
|
||||
}
|
||||
if (g_change_plugin) {
|
||||
if (g_change_plugin || plugin != NULL) {
|
||||
pfree_ext(plugin);
|
||||
} else if (plugin != NULL) {
|
||||
plugin = NULL;
|
||||
}
|
||||
if (replication_slot != NULL) {
|
||||
@ -827,6 +826,7 @@ int main(int argc, char** argv)
|
||||
{
|
||||
PGresult* res = NULL;
|
||||
progname = get_progname("pg_recvlogical");
|
||||
plugin = pg_strdup("mppdb_decoding");
|
||||
int rc = 0;
|
||||
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_recvlogical"));
|
||||
|
||||
|
@ -442,6 +442,7 @@ static const TagStr g_tagStrArr[] = {{T_Invalid, "Invalid"},
|
||||
{T_CreateReplicationSlotCmd, "CreateReplicationSlotCmd"},
|
||||
{T_DropReplicationSlotCmd, "DropReplicationSlotCmd"},
|
||||
{T_StartReplicationCmd, "StartReplicationCmd"},
|
||||
{T_AdvanceReplicationCmd, "AdvanceReplicationCmd"},
|
||||
{T_StartDataReplicationCmd, "StartDataReplicationCmd"},
|
||||
{T_FetchMotCheckpointCmd, "FetchMotCheckpointCmd"},
|
||||
{T_TriggerData, "TriggerData"},
|
||||
|
@ -38,11 +38,13 @@
|
||||
#include "access/xlogdefs.h"
|
||||
#include "access/transam.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "libpq/libpq-int.h"
|
||||
|
||||
#include "replication/decode.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/reorderbuffer.h"
|
||||
#include "replication/snapbuild.h"
|
||||
#include "replication/walreceiver.h"
|
||||
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procarray.h"
|
||||
@ -901,3 +903,208 @@ void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
|
||||
}
|
||||
LWLockRelease(LogicalReplicationSlotPersistentDataLock);
|
||||
}
|
||||
|
||||
/* Connect primary to advance logical replication slot. */
|
||||
void LogicalAdvanceConnect()
|
||||
{
|
||||
char conninfoRepl[MAXCONNINFO + 75];
|
||||
char conninfo[MAXCONNINFO];
|
||||
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
|
||||
PGresult* res = NULL;
|
||||
int count = 0;
|
||||
int retryNum = 10;
|
||||
uint32 remoteSversion;
|
||||
uint32 localSversion;
|
||||
char *remotePversion = NULL;
|
||||
char *localPversion = NULL;
|
||||
uint32 remoteTerm;
|
||||
uint32 localTerm;
|
||||
errno_t rc = 0;
|
||||
int nRet = 0;
|
||||
|
||||
rc = memset_s(conninfo, MAXCONNINFO, 0, MAXCONNINFO);
|
||||
securec_check(rc, "\0", "\0");
|
||||
|
||||
/* Fetch information required to start streaming */
|
||||
rc = strncpy_s(conninfo, MAXCONNINFO, (char *)walrcv->conninfo, MAXCONNINFO - 1);
|
||||
securec_check(rc, "\0", "\0");
|
||||
|
||||
nRet = snprintf_s(conninfoRepl, sizeof(conninfoRepl), sizeof(conninfoRepl) - 1,
|
||||
"%s dbname=replication replication=true "
|
||||
"fallback_application_name=%s "
|
||||
"connect_timeout=%d",
|
||||
conninfo,
|
||||
(u_sess->attr.attr_common.application_name &&
|
||||
strlen(u_sess->attr.attr_common.application_name) > 0) ?
|
||||
u_sess->attr.attr_common.application_name : "pg_recvlogical_sender",
|
||||
u_sess->attr.attr_storage.wal_receiver_connect_timeout);
|
||||
securec_check_ss(nRet, "", "");
|
||||
|
||||
retry:
|
||||
/* 1. try to connect to primary */
|
||||
t_thrd.walsender_cxt.advancePrimaryConn = PQconnectdb(conninfoRepl);
|
||||
if (PQstatus(t_thrd.walsender_cxt.advancePrimaryConn) != CONNECTION_OK) {
|
||||
if (++count < retryNum) {
|
||||
ereport(LOG,
|
||||
(errmsg("pg_recvlogical_sender could not connect to the remote server, "
|
||||
"the connection info :%s : %s",
|
||||
conninfo, PQerrorMessage(t_thrd.walsender_cxt.advancePrimaryConn))));
|
||||
|
||||
PQfinish(t_thrd.walsender_cxt.advancePrimaryConn);
|
||||
t_thrd.walsender_cxt.advancePrimaryConn = NULL;
|
||||
|
||||
/* sleep 0.1 s */
|
||||
pg_usleep(100000L);
|
||||
goto retry;
|
||||
}
|
||||
ereport(FATAL,
|
||||
(errmsg("pg_recvlogical_sender could not connect to the remote server, "
|
||||
"we have tried %d times, the connection info :%s : %s",
|
||||
count, conninfo, PQerrorMessage(t_thrd.walsender_cxt.advancePrimaryConn))));
|
||||
}
|
||||
|
||||
/* 2. identify version */
|
||||
res = PQexec(t_thrd.walsender_cxt.advancePrimaryConn, "IDENTIFY_VERSION");
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
|
||||
PQclear(res);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("could not receive database system version and protocol "
|
||||
"version from the remote server: %s",
|
||||
PQerrorMessage(t_thrd.walsender_cxt.advancePrimaryConn))));
|
||||
|
||||
return;
|
||||
}
|
||||
if (PQnfields(res) != 3 || PQntuples(res) != 1) {
|
||||
int numTuples = PQntuples(res);
|
||||
int numFields = PQnfields(res);
|
||||
|
||||
PQclear(res);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("invalid response from remote server"),
|
||||
errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
|
||||
numTuples, numFields)));
|
||||
|
||||
return;
|
||||
}
|
||||
remoteSversion = pg_strtoint32(PQgetvalue(res, 0, 0));
|
||||
localSversion = PG_VERSION_NUM;
|
||||
remotePversion = PQgetvalue(res, 0, 1);
|
||||
localPversion = pstrdup(PG_PROTOCOL_VERSION);
|
||||
remoteTerm = pg_strtoint32(PQgetvalue(res, 0, 2));
|
||||
localTerm = Max(g_instance.comm_cxt.localinfo_cxt.term_from_file,
|
||||
g_instance.comm_cxt.localinfo_cxt.term_from_xlog);
|
||||
ereport(LOG, (errmsg("remote term[%u], local term[%u]", remoteTerm, localTerm)));
|
||||
|
||||
if (remoteSversion != localSversion ||
|
||||
strncmp(remotePversion, localPversion, strlen(PG_PROTOCOL_VERSION)) != 0) {
|
||||
PQclear(res);
|
||||
|
||||
if (remoteSversion != localSversion) {
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("database system version is different between the remote and local"),
|
||||
errdetail("The remote's system version is %u, the local's system version is %u.",
|
||||
remoteSversion, localSversion)));
|
||||
} else {
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("the remote protocal version %s is not the same as "
|
||||
"the local protocal version %s.",
|
||||
remotePversion, localPversion)));
|
||||
}
|
||||
|
||||
if (localPversion != NULL) {
|
||||
pfree(localPversion);
|
||||
localPversion = NULL;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
|
||||
/* 3. connect to primary, check remote role */
|
||||
res = PQexec(t_thrd.walsender_cxt.advancePrimaryConn, "IDENTIFY_MODE");
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
|
||||
PQclear(res);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("could not receive the ongoing mode infomation from "
|
||||
"the remote server: %s",
|
||||
PQerrorMessage(t_thrd.walsender_cxt.advancePrimaryConn))));
|
||||
|
||||
return;
|
||||
}
|
||||
if (PQnfields(res) != 1 || PQntuples(res) != 1) {
|
||||
int numTuples = PQntuples(res);
|
||||
int numFields = PQnfields(res);
|
||||
|
||||
PQclear(res);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("invalid response from remote server"),
|
||||
errdetail("Expected 1 tuple with 1 fields, got %d tuples with %d fields.",
|
||||
numTuples, numFields)));
|
||||
|
||||
return;
|
||||
}
|
||||
ServerMode remoteMode = (ServerMode)pg_strtoint32(PQgetvalue(res, 0, 0));
|
||||
if (!t_thrd.walreceiver_cxt.AmWalReceiverForFailover && remoteMode != PRIMARY_MODE) {
|
||||
PQclear(res);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("the mode of the remote server must be primary, current is %s",
|
||||
wal_get_role_string(remoteMode))));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
/* Clean the connection for advance logical replication slot. */
|
||||
void CloseLogicalAdvanceConnect()
|
||||
{
|
||||
if (t_thrd.walsender_cxt.advancePrimaryConn != NULL) {
|
||||
PQfinish(t_thrd.walsender_cxt.advancePrimaryConn);
|
||||
t_thrd.walsender_cxt.advancePrimaryConn = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Notify the primary to advance logical replication slot. */
|
||||
void NotifyPrimaryAdvance(XLogRecPtr flush)
|
||||
{
|
||||
char query[256];
|
||||
PGresult* res = NULL;
|
||||
int nRet = 0;
|
||||
|
||||
nRet = snprintf_s(query, sizeof(query), sizeof(query) - 1,
|
||||
"ADVANCE_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
|
||||
NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name),
|
||||
(uint32)(flush >> 32),
|
||||
(uint32)flush);
|
||||
|
||||
securec_check_ss_c(nRet, "\0", "\0");
|
||||
|
||||
if (t_thrd.walsender_cxt.advancePrimaryConn == NULL) {
|
||||
LogicalAdvanceConnect();
|
||||
}
|
||||
res = PQexec(t_thrd.walsender_cxt.advancePrimaryConn, query);
|
||||
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
|
||||
PQclear(res);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("could not send replication command \"%s\": %s\n",
|
||||
query, PQerrorMessage(t_thrd.walsender_cxt.advancePrimaryConn))));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (PQnfields(res) != 2 || PQntuples(res) != 1) {
|
||||
int numTuples = PQntuples(res);
|
||||
int numFields = PQnfields(res);
|
||||
|
||||
PQclear(res);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
|
||||
errmsg("invalid response from remote server"),
|
||||
errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
|
||||
numTuples, numFields)));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
PQclear(res);
|
||||
}
|
||||
|
@ -92,6 +92,7 @@
|
||||
%token K_DATA
|
||||
%token K_START_REPLICATION
|
||||
%token K_FETCH_MOT_CHECKPOINT
|
||||
%token K_ADVANCE_REPLICATION
|
||||
%token K_CREATE_REPLICATION_SLOT
|
||||
%token K_DROP_REPLICATION_SLOT
|
||||
%token K_PHYSICAL
|
||||
@ -99,7 +100,7 @@
|
||||
%token K_SLOT
|
||||
|
||||
%type <node> command
|
||||
%type <node> base_backup start_replication start_data_replication fetch_mot_checkpoint start_logical_replication identify_system identify_version identify_mode identify_consistence create_replication_slot drop_replication_slot identify_maxlsn identify_channel identify_az
|
||||
%type <node> base_backup start_replication start_data_replication fetch_mot_checkpoint start_logical_replication advance_logical_replication identify_system identify_version identify_mode identify_consistence create_replication_slot drop_replication_slot identify_maxlsn identify_channel identify_az
|
||||
%type <list> base_backup_opt_list
|
||||
%type <defelt> base_backup_opt
|
||||
%type <list> plugin_options plugin_opt_list
|
||||
@ -128,6 +129,7 @@ command:
|
||||
| start_data_replication
|
||||
| fetch_mot_checkpoint
|
||||
| start_logical_replication
|
||||
| advance_logical_replication
|
||||
| create_replication_slot
|
||||
| drop_replication_slot
|
||||
| identify_maxlsn
|
||||
@ -315,6 +317,19 @@ start_logical_replication:
|
||||
}
|
||||
;
|
||||
|
||||
/* ADVANCE_REPLICATION SLOT slot LOGICAL %X/%X */
|
||||
advance_logical_replication:
|
||||
K_ADVANCE_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR
|
||||
{
|
||||
AdvanceReplicationCmd *cmd;
|
||||
cmd = makeNode(AdvanceReplicationCmd);
|
||||
cmd->kind = REPLICATION_KIND_LOGICAL;;
|
||||
cmd->slotname = $3;
|
||||
cmd->restartpoint = $5;
|
||||
$$ = (Node *) cmd;
|
||||
}
|
||||
;
|
||||
|
||||
/* CREATE_REPLICATION_SLOT SLOT slot [%X/%X] */
|
||||
create_replication_slot:
|
||||
/* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL [init_slot_lsn] */
|
||||
|
@ -100,6 +100,7 @@ WAL { return K_WAL; }
|
||||
TABLESPACE_MAP { return K_TABLESPACE_MAP; }
|
||||
DATA { return K_DATA; }
|
||||
START_REPLICATION { return K_START_REPLICATION; }
|
||||
ADVANCE_REPLICATION { return K_ADVANCE_REPLICATION; }
|
||||
CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; }
|
||||
DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; }
|
||||
PHYSICAL { return K_PHYSICAL; }
|
||||
|
@ -417,7 +417,11 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby)
|
||||
if (slot == NULL)
|
||||
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name)));
|
||||
if (active) {
|
||||
if (slot->data.database != InvalidOid || isDummyStandby != slot->data.isDummyStandby)
|
||||
if ((slot->data.database != InvalidOid || isDummyStandby != slot->data.isDummyStandby)
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
&& !RecoveryInProgress()
|
||||
#endif
|
||||
)
|
||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is already active", name)));
|
||||
else {
|
||||
ereport(WARNING,
|
||||
@ -435,6 +439,31 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby)
|
||||
t_thrd.slot_cxt.MyReplicationSlot = slot;
|
||||
}
|
||||
|
||||
/*
|
||||
* Find out if we have an active slot by slot name
|
||||
*/
|
||||
bool IsReplicationSlotActive(const char *name)
|
||||
{
|
||||
bool active = false;
|
||||
|
||||
Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL);
|
||||
|
||||
ReplicationSlotValidateName(name, ERROR);
|
||||
|
||||
/* Search for the named slot to identify whether it is active or not. */
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
|
||||
for (int i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) {
|
||||
ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i];
|
||||
if (s->active && strcmp(name, NameStr(s->data.name)) == 0) {
|
||||
active = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
return active;
|
||||
}
|
||||
|
||||
/*
|
||||
* Find out if we have a slot by slot name
|
||||
*/
|
||||
|
@ -91,7 +91,13 @@ void log_slot_advance(const ReplicationSlotPersistentData *slotInfo)
|
||||
XLogFlush(Ptr);
|
||||
if (g_instance.attr.attr_storage.max_wal_senders > 0)
|
||||
WalSndWakeup();
|
||||
|
||||
END_CRIT_SECTION();
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
if (u_sess->attr.attr_storage.guc_synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
|
||||
SyncRepWaitForLSN(Ptr);
|
||||
#endif
|
||||
}
|
||||
|
||||
void log_slot_drop(const char *name)
|
||||
@ -829,6 +835,17 @@ void redo_slot_advance(const ReplicationSlotPersistentData *slotInfo)
|
||||
|
||||
Assert(!t_thrd.slot_cxt.MyReplicationSlot);
|
||||
|
||||
/*
|
||||
* If logical replication slot is active on the current standby, the current
|
||||
* standby notify the primary to advance the logical replication slot.
|
||||
* Thus, we do not redo the slot_advance log.
|
||||
*/
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
if (IsReplicationSlotActive(NameStr(slotInfo->name))) {
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Acquire the slot so we "own" it */
|
||||
ReplicationSlotAcquire(NameStr(slotInfo->name), false);
|
||||
rc = memcpy_s(&t_thrd.slot_cxt.MyReplicationSlot->data, sizeof(ReplicationSlotPersistentData), slotInfo,
|
||||
|
@ -160,6 +160,7 @@ static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
|
||||
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
|
||||
static void StartReplication(StartReplicationCmd *cmd);
|
||||
static void StartLogicalReplication(StartReplicationCmd *cmd);
|
||||
static void AdvanceLogicalReplication(AdvanceReplicationCmd *cmd);
|
||||
static void ProcessStandbyMessage(void);
|
||||
static void ProcessStandbyReplyMessage(void);
|
||||
static void ProcessStandbyHSFeedbackMessage(void);
|
||||
@ -347,10 +348,12 @@ int WalSenderMain(void)
|
||||
/* check PMstate and RecoveryInProgress */
|
||||
void CheckPMstateAndRecoveryInProgress(void)
|
||||
{
|
||||
#ifdef ENABLE_MULTIPLE_NODES
|
||||
if (!PMstateIsRun() || RecoveryInProgress()) {
|
||||
ereport(ERROR, (errcode(ERRCODE_LOGICAL_DECODE_ERROR),
|
||||
errmsg("can't decode in pmState is not run or recovery in progress.")));
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1384,6 +1387,115 @@ static void StartLogicalReplication(StartReplicationCmd *cmd)
|
||||
EndCommand("COPY 0", DestRemote);
|
||||
}
|
||||
|
||||
/*
|
||||
* Notify the primary to advance logical replication slot.
|
||||
*/
|
||||
static void AdvanceLogicalReplication(AdvanceReplicationCmd *cmd)
|
||||
{
|
||||
StringInfoData buf;
|
||||
XLogRecPtr flushRecPtr;
|
||||
XLogRecPtr minLsn;
|
||||
char xpos[MAXFNAMELEN];
|
||||
int rc = 0;
|
||||
|
||||
if (RecoveryInProgress()) {
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_OPERATION),
|
||||
errmsg("couldn't advance in recovery")));
|
||||
}
|
||||
|
||||
Assert(!t_thrd.slot_cxt.MyReplicationSlot);
|
||||
|
||||
/*
|
||||
* We can't move slot past what's been flushed so clamp the target
|
||||
* possition accordingly.
|
||||
*/
|
||||
flushRecPtr = GetFlushRecPtr();
|
||||
if (XLByteLT(flushRecPtr, cmd->restartpoint)) {
|
||||
cmd->restartpoint = flushRecPtr;
|
||||
}
|
||||
|
||||
if (XLogRecPtrIsInvalid(cmd->restartpoint)) {
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid target wal lsn while advancing "
|
||||
"logical replication restart lsn.")));
|
||||
}
|
||||
|
||||
/* Acquire the slot so we "own" it */
|
||||
ReplicationSlotAcquire(cmd->slotname, false);
|
||||
|
||||
Assert(OidIsValid(t_thrd.slot_cxt.MyReplicationSlot->data.database));
|
||||
|
||||
/*
|
||||
* Check if the slot is not moving backwards. Logical slots have confirmed
|
||||
* consumption up to confirmed_lsn, meaning that data older than that is
|
||||
* not available anymore.
|
||||
*/
|
||||
minLsn = t_thrd.slot_cxt.MyReplicationSlot->data.confirmed_flush;
|
||||
if (XLByteLT(cmd->restartpoint, minLsn)) {
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot move slot to %X/%X, minimum is %X/%X",
|
||||
(uint32)(cmd->restartpoint >> 32), (uint32)cmd->restartpoint,
|
||||
(uint32)(minLsn >> 32), (uint32)(minLsn))));
|
||||
}
|
||||
|
||||
LogicalConfirmReceivedLocation(cmd->restartpoint);
|
||||
ReplicationSlotMarkDirty();
|
||||
log_slot_advance(&t_thrd.slot_cxt.MyReplicationSlot->data);
|
||||
|
||||
if (log_min_messages <= DEBUG2) {
|
||||
ereport(LOG, (errmsg("AdvanceLogicalReplication, slotname = %s, endpoint = %X/%X.",
|
||||
cmd->slotname,
|
||||
(uint32)(cmd->restartpoint >> 32),
|
||||
(uint32)cmd->restartpoint)));
|
||||
}
|
||||
|
||||
rc = snprintf_s(xpos, sizeof(xpos), sizeof(xpos) - 1,
|
||||
"%X/%X", (uint32)(cmd->restartpoint >> 32), (uint32)cmd->restartpoint);
|
||||
securec_check_ss(rc, "\0", "\0");
|
||||
|
||||
pq_beginmessage(&buf, 'T');
|
||||
pq_sendint16(&buf, 2); /* 2 field */
|
||||
|
||||
/* first field: slot name */
|
||||
pq_sendstring(&buf, "slot_name"); /* col name */
|
||||
pq_sendint32(&buf, 0); /* table oid */
|
||||
pq_sendint16(&buf, 0); /* attnum */
|
||||
pq_sendint32(&buf, TEXTOID); /* type oid */
|
||||
pq_sendint16(&buf, UINT16_MAX); /* typlen */
|
||||
pq_sendint32(&buf, 0); /* typmod */
|
||||
pq_sendint16(&buf, 0); /* format code */
|
||||
|
||||
/* second field: LSN at which we became consistent */
|
||||
pq_sendstring(&buf, "confirmed_flush"); /* col name */
|
||||
pq_sendint32(&buf, 0); /* table oid */
|
||||
pq_sendint16(&buf, 0); /* attnum */
|
||||
pq_sendint32(&buf, TEXTOID); /* type oid */
|
||||
pq_sendint16(&buf, UINT16_MAX); /* typlen */
|
||||
pq_sendint32(&buf, 0); /* typmod */
|
||||
pq_sendint16(&buf, 0); /* format code */
|
||||
pq_endmessage_noblock(&buf);
|
||||
|
||||
/* Send a DataRow message */
|
||||
pq_beginmessage(&buf, 'D');
|
||||
pq_sendint16(&buf, 2); /* # of columns */
|
||||
|
||||
/* slot_name */
|
||||
pq_sendint32(&buf, strlen(cmd->slotname)); /* col1 len */
|
||||
pq_sendbytes(&buf, cmd->slotname, strlen(cmd->slotname));
|
||||
|
||||
/* consistent wal location */
|
||||
pq_sendint32(&buf, strlen(xpos)); /* col2 len */
|
||||
pq_sendbytes(&buf, xpos, strlen(xpos));
|
||||
pq_endmessage_noblock(&buf);
|
||||
|
||||
/* Send CommandComplete and ReadyForQuery messages */
|
||||
EndCommand_noblock("SELECT", DestRemote);
|
||||
ReadyForQuery_noblock(DestRemote, u_sess->attr.attr_storage.wal_sender_timeout);
|
||||
/* ReadyForQuery did pq_flush_if_available for us */
|
||||
|
||||
ReplicationSlotRelease();
|
||||
}
|
||||
|
||||
/*
|
||||
* LogicalDecodingContext 'prepare_write' callback.
|
||||
*
|
||||
@ -1687,6 +1799,16 @@ static bool cmdStringLengthCheck(const char* cmd_string)
|
||||
strncmp(cmd_string, "DROP_REPLICATION_SLOT", strlen("DROP_REPLICATION_SLOT")) == 0) {
|
||||
sub_cmd = strtok_r(comd, " ", &rm_cmd);
|
||||
slot_name = strtok_r(NULL, " ", &rm_cmd);
|
||||
/* ADVANCE_REPLICATION SLOT slotname LOGICAL %X/%X */
|
||||
} else if (cmd_length > strlen("ADVANCE_REPLICATION") &&
|
||||
strncmp(cmd_string, "ADVANCE_REPLICATION", strlen("ADVANCE_REPLICATION")) == 0) {
|
||||
sub_cmd = strtok_r(comd, " ", &rm_cmd);
|
||||
sub_cmd = strtok_r(NULL, " ", &rm_cmd);
|
||||
if (strlen(sub_cmd) != strlen("SLOT") ||
|
||||
strncmp(sub_cmd, "SLOT", strlen("SLOT")) != 0) {
|
||||
return false;
|
||||
}
|
||||
slot_name = strtok_r(NULL, " ", &rm_cmd);
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
@ -1767,6 +1889,14 @@ static void IdentifyCommand(Node* cmd_node, bool* replication_started, const cha
|
||||
break;
|
||||
}
|
||||
|
||||
case T_AdvanceReplicationCmd: {
|
||||
AdvanceReplicationCmd *cmd = (AdvanceReplicationCmd *)cmd_node;
|
||||
if (cmd->kind == REPLICATION_KIND_LOGICAL) {
|
||||
AdvanceLogicalReplication(cmd);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
#ifdef ENABLE_MOT
|
||||
case T_FetchMotCheckpointCmd:
|
||||
PerformMotCheckpointFetch();
|
||||
@ -2171,6 +2301,10 @@ static void ProcessStandbyReplyMessage(void)
|
||||
if (t_thrd.slot_cxt.MyReplicationSlot && (!XLByteEQ(reply.flush, InvalidXLogRecPtr))) {
|
||||
if (t_thrd.slot_cxt.MyReplicationSlot->data.database != InvalidOid) {
|
||||
LogicalConfirmReceivedLocation(reply.flush);
|
||||
if (RecoveryInProgress() && OidIsValid(t_thrd.slot_cxt.MyReplicationSlot->data.database)) {
|
||||
/* Notify the primary to advance logical slot location */
|
||||
NotifyPrimaryAdvance(reply.flush);
|
||||
}
|
||||
} else {
|
||||
PhysicalConfirmReceivedLocation(reply.flush);
|
||||
}
|
||||
@ -3476,6 +3610,9 @@ static void WalSndKill(int code, Datum arg)
|
||||
|
||||
Assert(walsnd != NULL);
|
||||
|
||||
/* Clean the connection for advance logical replication slot. */
|
||||
CloseLogicalAdvanceConnect();
|
||||
|
||||
/*
|
||||
* Clear MyWalSnd first; then disown the latch. This is so that signal
|
||||
* handlers won't try to touch the latch after it's no longer ours.
|
||||
|
@ -2188,6 +2188,8 @@ typedef struct knl_t_walsender_context {
|
||||
int remotePort;
|
||||
/* Have we caught up with primary? */
|
||||
bool walSndCaughtUp;
|
||||
/* Notify primary to advance logical replication slot. */
|
||||
struct pg_conn* advancePrimaryConn;
|
||||
} knl_t_walsender_context;
|
||||
|
||||
typedef struct knl_t_walreceiverfuncs_context {
|
||||
|
@ -547,6 +547,7 @@ typedef enum NodeTag {
|
||||
T_CreateReplicationSlotCmd,
|
||||
T_DropReplicationSlotCmd,
|
||||
T_StartReplicationCmd,
|
||||
T_AdvanceReplicationCmd,
|
||||
T_StartDataReplicationCmd,
|
||||
T_FetchMotCheckpointCmd,
|
||||
|
||||
|
@ -95,6 +95,17 @@ typedef struct StartReplicationCmd {
|
||||
List* options;
|
||||
} StartReplicationCmd;
|
||||
|
||||
/* ----------------------
|
||||
* ADVANCE_REPLICATION command
|
||||
* ----------------------
|
||||
*/
|
||||
typedef struct AdvanceReplicationCmd {
|
||||
NodeTag type;
|
||||
ReplicationKind kind;
|
||||
char* slotname;
|
||||
XLogRecPtr restartpoint;
|
||||
} AdvanceReplicationCmd;
|
||||
|
||||
/* ----------------------
|
||||
* START_REPLICATION(DATA) command
|
||||
* ----------------------
|
||||
|
@ -102,4 +102,6 @@ extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
|
||||
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn);
|
||||
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
|
||||
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext* ctx, RepOriginId origin_id);
|
||||
extern void CloseLogicalAdvanceConnect();
|
||||
extern void NotifyPrimaryAdvance(XLogRecPtr flush);
|
||||
#endif
|
||||
|
@ -208,6 +208,7 @@ extern void ReplicationSlotCreate(const char* name, ReplicationSlotPersistency p
|
||||
extern void ReplicationSlotPersist(void);
|
||||
extern void ReplicationSlotDrop(const char* name, bool for_backup = false);
|
||||
extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby);
|
||||
extern bool IsReplicationSlotActive(const char *name);
|
||||
bool ReplicationSlotFind(const char* name);
|
||||
extern void ReplicationSlotRelease(void);
|
||||
extern void ReplicationSlotSave(void);
|
||||
|
Reference in New Issue
Block a user