@ -1301,6 +1301,7 @@ static void knl_t_walsender_init(knl_t_walsender_context* walsender_cxt)
|
||||
walsender_cxt->tmpbuf = (StringInfoData*)palloc0(sizeof(StringInfoData));
|
||||
walsender_cxt->remotePort = 0;
|
||||
walsender_cxt->walSndCaughtUp = false;
|
||||
walsender_cxt->advancePrimaryConn = NULL;
|
||||
walsender_cxt->last_check_timeout_timestamp = 0;
|
||||
}
|
||||
|
||||
|
@ -1063,15 +1063,17 @@ void CloseLogicalAdvanceConnect()
|
||||
}
|
||||
|
||||
/* Notify the primary to advance logical replication slot. */
|
||||
void NotifyPrimaryAdvance(XLogRecPtr flush)
|
||||
void NotifyPrimaryAdvance(XLogRecPtr restart, XLogRecPtr flush)
|
||||
{
|
||||
char query[256];
|
||||
PGresult* res = NULL;
|
||||
int nRet = 0;
|
||||
|
||||
nRet = snprintf_s(query, sizeof(query), sizeof(query) - 1,
|
||||
"ADVANCE_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
|
||||
"ADVANCE_REPLICATION SLOT \"%s\" LOGICAL %X/%X %X/%X",
|
||||
NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name),
|
||||
(uint32)(restart >> 32),
|
||||
(uint32)restart,
|
||||
(uint32)(flush >> 32),
|
||||
(uint32)flush);
|
||||
|
||||
|
@ -317,15 +317,16 @@ start_logical_replication:
|
||||
}
|
||||
;
|
||||
|
||||
/* ADVANCE_REPLICATION SLOT slot LOGICAL %X/%X options */
|
||||
/* ADVANCE_REPLICATION SLOT slot LOGICAL %X/%X %X/%X */
|
||||
advance_logical_replication:
|
||||
K_ADVANCE_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR
|
||||
K_ADVANCE_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR RECPTR
|
||||
{
|
||||
AdvanceReplicationCmd *cmd;
|
||||
cmd = makeNode(AdvanceReplicationCmd);
|
||||
cmd->kind = REPLICATION_KIND_LOGICAL;;
|
||||
cmd->slotname = $3;
|
||||
cmd->restartpoint = $5;
|
||||
cmd->restart_lsn = $5;
|
||||
cmd->confirmed_flush = $6;
|
||||
$$ = (Node *) cmd;
|
||||
}
|
||||
;
|
||||
|
@ -1426,11 +1426,11 @@ static void AdvanceLogicalReplication(AdvanceReplicationCmd *cmd)
|
||||
* possition accordingly.
|
||||
*/
|
||||
flushRecPtr = GetFlushRecPtr();
|
||||
if (XLByteLT(flushRecPtr, cmd->restartpoint)) {
|
||||
cmd->restartpoint = flushRecPtr;
|
||||
if (XLByteLT(flushRecPtr, cmd->confirmed_flush)) {
|
||||
cmd->confirmed_flush = flushRecPtr;
|
||||
}
|
||||
|
||||
if (XLogRecPtrIsInvalid(cmd->restartpoint)) {
|
||||
if (XLogRecPtrIsInvalid(cmd->confirmed_flush)) {
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("invalid target wal lsn while advancing "
|
||||
"logical replication restart lsn.")));
|
||||
@ -1447,26 +1447,36 @@ static void AdvanceLogicalReplication(AdvanceReplicationCmd *cmd)
|
||||
* not available anymore.
|
||||
*/
|
||||
minLsn = t_thrd.slot_cxt.MyReplicationSlot->data.confirmed_flush;
|
||||
if (XLByteLT(cmd->restartpoint, minLsn)) {
|
||||
if (XLByteLT(cmd->confirmed_flush, minLsn)) {
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot move slot to %X/%X, minimum is %X/%X",
|
||||
(uint32)(cmd->restartpoint >> 32), (uint32)cmd->restartpoint,
|
||||
(uint32)(cmd->confirmed_flush >> 32), (uint32)cmd->confirmed_flush,
|
||||
(uint32)(minLsn >> 32), (uint32)(minLsn))));
|
||||
}
|
||||
|
||||
LogicalConfirmReceivedLocation(cmd->restartpoint);
|
||||
LogicalConfirmReceivedLocation(cmd->confirmed_flush);
|
||||
|
||||
/* Advance the restart_lsn in primary. */
|
||||
volatile ReplicationSlot *slot = t_thrd.slot_cxt.MyReplicationSlot;
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
slot->data.restart_lsn = cmd->restart_lsn;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
ReplicationSlotMarkDirty();
|
||||
log_slot_advance(&t_thrd.slot_cxt.MyReplicationSlot->data);
|
||||
|
||||
if (log_min_messages <= DEBUG2) {
|
||||
ereport(LOG, (errmsg("AdvanceLogicalReplication, slotname = %s, endpoint = %X/%X.",
|
||||
ereport(LOG, (errmsg("AdvanceLogicalReplication, slotname = %s, restart_lsn = %X/%X, "
|
||||
"confirmed_flush = %X/%X.",
|
||||
cmd->slotname,
|
||||
(uint32)(cmd->restartpoint >> 32),
|
||||
(uint32)cmd->restartpoint)));
|
||||
(uint32)(cmd->restart_lsn >> 32),
|
||||
(uint32)cmd->restart_lsn,
|
||||
(uint32)(cmd->confirmed_flush >> 32),
|
||||
(uint32)cmd->confirmed_flush)));
|
||||
}
|
||||
|
||||
rc = snprintf_s(xpos, sizeof(xpos), sizeof(xpos) - 1,
|
||||
"%X/%X", (uint32)(cmd->restartpoint >> 32), (uint32)cmd->restartpoint);
|
||||
"%X/%X", (uint32)(cmd->confirmed_flush >> 32), (uint32)cmd->confirmed_flush);
|
||||
securec_check_ss(rc, "\0", "\0");
|
||||
|
||||
pq_beginmessage(&buf, 'T');
|
||||
@ -2249,7 +2259,7 @@ static void AdvanceReplicationSlot(XLogRecPtr flush)
|
||||
LogicalConfirmReceivedLocation(flush);
|
||||
if (RecoveryInProgress() && OidIsValid(t_thrd.slot_cxt.MyReplicationSlot->data.database)) {
|
||||
/* Notify the primary to advance logical slot location */
|
||||
NotifyPrimaryAdvance(flush);
|
||||
NotifyPrimaryAdvance(t_thrd.slot_cxt.MyReplicationSlot->data.restart_lsn, flush);
|
||||
}
|
||||
} else {
|
||||
PhysicalConfirmReceivedLocation(flush);
|
||||
|
@ -103,7 +103,8 @@ typedef struct AdvanceReplicationCmd {
|
||||
NodeTag type;
|
||||
ReplicationKind kind;
|
||||
char* slotname;
|
||||
XLogRecPtr restartpoint;
|
||||
XLogRecPtr restart_lsn;
|
||||
XLogRecPtr confirmed_flush;
|
||||
} AdvanceReplicationCmd;
|
||||
|
||||
/* ----------------------
|
||||
|
@ -103,5 +103,5 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRe
|
||||
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
|
||||
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext* ctx, RepOriginId origin_id);
|
||||
extern void CloseLogicalAdvanceConnect();
|
||||
extern void NotifyPrimaryAdvance(XLogRecPtr flush);
|
||||
extern void NotifyPrimaryAdvance(XLogRecPtr restart, XLogRecPtr flush);
|
||||
#endif
|
||||
|
Reference in New Issue
Block a user