diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0528ac38d59..7aa2a354f2d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6542,8 +6542,18 @@ CreateCheckPoint(int flags) * according to synchronized LSNs of replication slots. The slot's LSN * might be advanced concurrently, so we call this before * CheckPointReplicationSlots() synchronizes replication slots. + * + * We acquire the Allocation lock to serialize the minimum LSN calculation + * with concurrent slot WAL reservation. This ensures that the WAL + * position being reserved is either included in the miminum LSN or is + * beyond or equal to the redo pointer of the current checkpoint (See + * ReplicationSlotReserveWal for details), thus preventing its removal by + * checkpoints. Note that this lock is required only during checkpoints + * where WAL removal is dictated by the slot's minimum LSN. */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + LWLockRelease(ReplicationSlotAllocationLock); /* * In some cases there are groups of actions that must all occur on one @@ -6716,7 +6726,10 @@ CreateCheckPoint(int flags) /* * Recalculate the current minimum LSN to be used in the WAL segment * cleanup. Then, we must synchronize the replication slots again in - * order to make this LSN safe to use. + * order to make this LSN safe to use. Here, we don't need to acquire + * the ReplicationSlotAllocationLock to serialize the minimum LSN + * computation with slot reservation as the RedoRecPtr is not updated + * after the previous computation of minimum LSN. */ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); CheckPointReplicationSlots(); @@ -7087,8 +7100,16 @@ CreateRestartPoint(int flags) * according to synchronized LSNs of replication slots. The slot's LSN * might be advanced concurrently, so we call this before * CheckPointReplicationSlots() synchronizes replication slots. + * + * We acquire the Allocation lock to serialize the minimum LSN calculation + * with concurrent slot WAL reservation. This ensures that the WAL + * position being reserved is either included in the miminum LSN or is + * beyond or equal to the redo pointer of the current checkpoint (See + * ReplicationSlotReserveWal for details). */ + LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED); slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + LWLockRelease(ReplicationSlotAllocationLock); if (log_checkpoints) LogCheckpointStart(flags, true); @@ -7178,7 +7199,10 @@ CreateRestartPoint(int flags) /* * Recalculate the current minimum LSN to be used in the WAL segment * cleanup. Then, we must synchronize the replication slots again in - * order to make this LSN safe to use. + * order to make this LSN safe to use. Here, we don't need to acquire + * the ReplicationSlotAllocationLock to serialize the minimum LSN + * computation with slot reservation as the RedoRecPtr is not updated + * after the previous computation of minimum LSN. */ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); CheckPointReplicationSlots(); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1d022b76d07..1f4aad52c4a 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1213,71 +1213,73 @@ void ReplicationSlotReserveWal(void) { ReplicationSlot *slot = MyReplicationSlot; + XLogSegNo segno; + XLogRecPtr restart_lsn; Assert(slot != NULL); Assert(slot->data.restart_lsn == InvalidXLogRecPtr); /* - * The replication slot mechanism is used to prevent removal of required - * WAL. As there is no interlock between this routine and checkpoints, WAL - * segments could concurrently be removed when a now stale return value of - * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that - * this happens we'll just retry. + * The replication slot mechanism is used to prevent the removal of + * required WAL. + * + * Acquire an exclusive lock to prevent the checkpoint process from + * concurrently computing the minimum slot LSN (see the call to + * XLogGetReplicationSlotMinimumLSN in CreateCheckPoint). This ensures + * that the WAL reserved for replication cannot be removed during a + * checkpoint. + * + * The mechanism is reliable because if WAL reservation occurs first, the + * checkpoint must wait for the restart_lsn update before determining the + * minimum non-removable LSN. On the other hand, if the checkpoint happens + * first, subsequent WAL reservations will select positions at or beyond + * the redo pointer of that checkpoint. */ - while (true) + LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE); + + /* + * For logical slots log a standby snapshot and start logical decoding at + * exactly that position. That allows the slot to start up more quickly. + * + * That's not needed (or indeed helpful) for physical slots as they'll + * start replay at the last logged checkpoint anyway. Instead return the + * location of the last redo LSN, where a base backup has to start replay + * at. + */ + if (!RecoveryInProgress() && SlotIsLogical(slot)) { - XLogSegNo segno; - XLogRecPtr restart_lsn; + XLogRecPtr flushptr; - /* - * For logical slots log a standby snapshot and start logical decoding - * at exactly that position. That allows the slot to start up more - * quickly. - * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. - */ - if (!RecoveryInProgress() && SlotIsLogical(slot)) - { - XLogRecPtr flushptr; + /* start at current insert position */ + restart_lsn = GetXLogInsertRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); - } - else - { - restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - } - - /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); - - /* - * If all required WAL is still there, great, otherwise retry. The - * slot should prevent further removal of WAL, unless there's a - * concurrent ReplicationSlotsComputeRequiredLSN() after we've written - * the new restart_lsn above, so normally we should never need to loop - * more than twice. - */ - XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); - if (XLogGetLastRemovedSegno() < segno) - break; + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); } + else + { + restart_lsn = GetRedoRecPtr(); + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + } + + /* prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + /* Checkpoint shouldn't remove the required WAL. */ + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + if (XLogGetLastRemovedSegno() >= segno) + elog(ERROR, "WAL required by replication slot %s has been removed concurrently", + NameStr(slot->data.name)); + + LWLockRelease(ReplicationSlotAllocationLock); } /*