diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 820af4e7a..21e3eade1 100644 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1301,6 +1301,7 @@ static void knl_t_walsender_init(knl_t_walsender_context* walsender_cxt) walsender_cxt->tmpbuf = (StringInfoData*)palloc0(sizeof(StringInfoData)); walsender_cxt->remotePort = 0; walsender_cxt->walSndCaughtUp = false; + walsender_cxt->advancePrimaryConn = NULL; walsender_cxt->last_check_timeout_timestamp = 0; } diff --git a/src/gausskernel/storage/replication/logical/logical.cpp b/src/gausskernel/storage/replication/logical/logical.cpp index 85eaffb9b..127c6e69d 100644 --- a/src/gausskernel/storage/replication/logical/logical.cpp +++ b/src/gausskernel/storage/replication/logical/logical.cpp @@ -1063,15 +1063,17 @@ void CloseLogicalAdvanceConnect() } /* Notify the primary to advance logical replication slot. */ -void NotifyPrimaryAdvance(XLogRecPtr flush) +void NotifyPrimaryAdvance(XLogRecPtr restart, XLogRecPtr flush) { char query[256]; PGresult* res = NULL; int nRet = 0; nRet = snprintf_s(query, sizeof(query), sizeof(query) - 1, - "ADVANCE_REPLICATION SLOT \"%s\" LOGICAL %X/%X", + "ADVANCE_REPLICATION SLOT \"%s\" LOGICAL %X/%X %X/%X", NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name), + (uint32)(restart >> 32), + (uint32)restart, (uint32)(flush >> 32), (uint32)flush); diff --git a/src/gausskernel/storage/replication/repl_gram.y b/src/gausskernel/storage/replication/repl_gram.y index 1045b72e6..de6c23ef3 100755 --- a/src/gausskernel/storage/replication/repl_gram.y +++ b/src/gausskernel/storage/replication/repl_gram.y @@ -317,15 +317,16 @@ start_logical_replication: } ; -/* ADVANCE_REPLICATION SLOT slot LOGICAL %X/%X options */ +/* ADVANCE_REPLICATION SLOT slot LOGICAL %X/%X %X/%X */ advance_logical_replication: - K_ADVANCE_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR + K_ADVANCE_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR RECPTR { AdvanceReplicationCmd *cmd; cmd = makeNode(AdvanceReplicationCmd); cmd->kind = REPLICATION_KIND_LOGICAL;; cmd->slotname = $3; - cmd->restartpoint = $5; + cmd->restart_lsn = $5; + cmd->confirmed_flush = $6; $$ = (Node *) cmd; } ; diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 181a938da..6987d6761 100644 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -1426,11 +1426,11 @@ static void AdvanceLogicalReplication(AdvanceReplicationCmd *cmd) * possition accordingly. */ flushRecPtr = GetFlushRecPtr(); - if (XLByteLT(flushRecPtr, cmd->restartpoint)) { - cmd->restartpoint = flushRecPtr; + if (XLByteLT(flushRecPtr, cmd->confirmed_flush)) { + cmd->confirmed_flush = flushRecPtr; } - if (XLogRecPtrIsInvalid(cmd->restartpoint)) { + if (XLogRecPtrIsInvalid(cmd->confirmed_flush)) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid target wal lsn while advancing " "logical replication restart lsn."))); @@ -1447,26 +1447,36 @@ static void AdvanceLogicalReplication(AdvanceReplicationCmd *cmd) * not available anymore. */ minLsn = t_thrd.slot_cxt.MyReplicationSlot->data.confirmed_flush; - if (XLByteLT(cmd->restartpoint, minLsn)) { + if (XLByteLT(cmd->confirmed_flush, minLsn)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot move slot to %X/%X, minimum is %X/%X", - (uint32)(cmd->restartpoint >> 32), (uint32)cmd->restartpoint, + (uint32)(cmd->confirmed_flush >> 32), (uint32)cmd->confirmed_flush, (uint32)(minLsn >> 32), (uint32)(minLsn)))); } - LogicalConfirmReceivedLocation(cmd->restartpoint); + LogicalConfirmReceivedLocation(cmd->confirmed_flush); + + /* Advance the restart_lsn in primary. */ + volatile ReplicationSlot *slot = t_thrd.slot_cxt.MyReplicationSlot; + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = cmd->restart_lsn; + SpinLockRelease(&slot->mutex); + ReplicationSlotMarkDirty(); log_slot_advance(&t_thrd.slot_cxt.MyReplicationSlot->data); if (log_min_messages <= DEBUG2) { - ereport(LOG, (errmsg("AdvanceLogicalReplication, slotname = %s, endpoint = %X/%X.", + ereport(LOG, (errmsg("AdvanceLogicalReplication, slotname = %s, restart_lsn = %X/%X, " + "confirmed_flush = %X/%X.", cmd->slotname, - (uint32)(cmd->restartpoint >> 32), - (uint32)cmd->restartpoint))); + (uint32)(cmd->restart_lsn >> 32), + (uint32)cmd->restart_lsn, + (uint32)(cmd->confirmed_flush >> 32), + (uint32)cmd->confirmed_flush))); } rc = snprintf_s(xpos, sizeof(xpos), sizeof(xpos) - 1, - "%X/%X", (uint32)(cmd->restartpoint >> 32), (uint32)cmd->restartpoint); + "%X/%X", (uint32)(cmd->confirmed_flush >> 32), (uint32)cmd->confirmed_flush); securec_check_ss(rc, "\0", "\0"); pq_beginmessage(&buf, 'T'); @@ -2249,7 +2259,7 @@ static void AdvanceReplicationSlot(XLogRecPtr flush) LogicalConfirmReceivedLocation(flush); if (RecoveryInProgress() && OidIsValid(t_thrd.slot_cxt.MyReplicationSlot->data.database)) { /* Notify the primary to advance logical slot location */ - NotifyPrimaryAdvance(flush); + NotifyPrimaryAdvance(t_thrd.slot_cxt.MyReplicationSlot->data.restart_lsn, flush); } } else { PhysicalConfirmReceivedLocation(flush); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 0d7d90581..20cfdb95b 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -103,7 +103,8 @@ typedef struct AdvanceReplicationCmd { NodeTag type; ReplicationKind kind; char* slotname; - XLogRecPtr restartpoint; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_flush; } AdvanceReplicationCmd; /* ---------------------- diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 6ed8dd051..909f3af5a 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -103,5 +103,5 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRe extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext* ctx, RepOriginId origin_id); extern void CloseLogicalAdvanceConnect(); -extern void NotifyPrimaryAdvance(XLogRecPtr flush); +extern void NotifyPrimaryAdvance(XLogRecPtr restart, XLogRecPtr flush); #endif