Add a WAIT option to DROP_REPLICATION_SLOT

This commit is contained in:
chenxiaobin19
2023-02-16 22:18:46 +08:00
parent ca299a3b9b
commit adbb73b632
10 changed files with 87 additions and 15 deletions

View File

@ -1267,7 +1267,7 @@ void ReplicationSlotDropAtPubNode(char *slotname, bool missing_ok)
}
}
appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname));
res = WalReceiverFuncTable[GET_FUNC_IDX].walrcv_exec(cmd.data, 0, NULL);
if (res->status != WALRCV_OK_COMMAND) {

View File

@ -4598,6 +4598,9 @@ const char* pgstat_get_wait_io(WaitEventIO w)
case WAIT_EVENT_REPLICATION_ORIGIN_DROP:
event_name = "ReplicationOriginDrop";
break;
case WAIT_EVENT_REPLICATION_SLOT_DROP:
event_name = "ReplicationSlotDrop";
break;
default:
event_name = "unknown wait event";
break;

View File

@ -299,7 +299,7 @@ restart:
LWLockRelease(ReplicationOriginLock);
pthread_mutex_lock(&state->originMutex);
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
clock_gettime(CLOCK_MONOTONIC, &ts);
ts.tv_sec += SECS_PER_MINUTE / 2;
pgstat_report_waitevent(WAIT_EVENT_REPLICATION_ORIGIN_DROP);
(void)pthread_cond_timedwait(&state->orginCV, &state->originMutex, &ts);

View File

@ -87,6 +87,7 @@
%token K_LABEL
%token K_PROGRESS
%token K_FAST
%token K_WAIT
%token K_NOWAIT
%token K_BUILDSTANDBY
%token K_OBSMODE
@ -407,8 +408,17 @@ advance_catalog_xmin:
DropReplicationSlotCmd *cmd;
cmd = makeNode(DropReplicationSlotCmd);
cmd->slotname = $2;
cmd->wait = false;
$$ = (Node *) cmd;
}
| K_DROP_REPLICATION_SLOT IDENT K_WAIT
{
DropReplicationSlotCmd *cmd;
cmd = makeNode(DropReplicationSlotCmd);
cmd->slotname = $2;
cmd->wait = true;
$$ = (Node *) cmd;
}
;

View File

@ -120,6 +120,7 @@ LOGICAL { return K_LOGICAL; }
SLOT { return K_SLOT; }
USE_SNAPSHOT { return K_USE_SNAPSHOT; }
ADVANCE_CATALOG_XMIN { return K_ADVANCE_CATALOG_XMIN; }
WAIT { return K_WAIT; }
"," { return ','; }
";" { return ';'; }

View File

@ -110,6 +110,19 @@ void ReplicationSlotsShmemInit(void)
/* everything else is zeroed by the memset above */
SpinLockInit(&slot->mutex);
slot->io_in_progress_lock = LWLockAssign(LWTRANCHE_REPLICATION_SLOT);
rc = pthread_condattr_init(&slot->slotAttr);
if (rc != 0) {
elog(FATAL, "Fail to init conattr for replication slot");
}
rc = pthread_condattr_setclock(&slot->slotAttr, CLOCK_MONOTONIC);
if (rc != 0) {
elog(FATAL, "Fail to setclock replication slot");
}
rc = pthread_cond_init(&slot->active_cv, &slot->slotAttr);
if (rc != 0) {
elog(FATAL, "Fail to init cond for replication slot");
}
slot->active_mutex = PTHREAD_MUTEX_INITIALIZER;
}
}
}
@ -459,21 +472,30 @@ void ReplicationSlotCreate(const char *name, ReplicationSlotPersistency persiste
* let somebody else try to allocate a slot.
*/
LWLockRelease(ReplicationSlotAllocationLock);
/* Let everybody know we've modified this slot. */
(void)pthread_mutex_lock(&slot->active_mutex);
pthread_cond_broadcast(&slot->active_cv);
(void)pthread_mutex_unlock(&slot->active_mutex);
}
/*
* @param[IN] allowDrop: only used in dropping a logical replication slot.
* Find a previously created slot and mark it as used by this backend.
*/
void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDrop)
void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDrop, bool nowait)
{
ReplicationSlot *slot = NULL;
int i;
bool active = false;
int slot_idx = -1;
struct timespec time_to_wait;
retry:
Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL);
CHECK_FOR_INTERRUPTS();
ReplicationSlotValidateName(name, ERROR);
/* Search for the named slot and mark it active if we find it. */
@ -499,18 +521,34 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDro
if (slot == NULL)
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name)));
/* We allow dropping active logical replication slots on standby or for subscription in opengauss. */
if (active) {
if (active && nowait) {
if (((slot->data.database != InvalidOid
#ifndef ENABLE_MULTIPLE_NODES
&& !allowDrop
#endif
) || isDummyStandby != slot->data.isDummyStandby) && strcmp(slot->data.plugin.data, "pgoutput") != 0)
) || isDummyStandby != slot->data.isDummyStandby))
ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is already active", name)));
else {
ereport(WARNING,
(errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is already active", name)));
}
} else if (active && !nowait) {
(void)pthread_mutex_lock(&slot->active_mutex);
clock_gettime(CLOCK_MONOTONIC, &time_to_wait);
time_to_wait.tv_sec += SECS_PER_MINUTE / 2;
pgstat_report_waitevent(WAIT_EVENT_REPLICATION_SLOT_DROP);
(void)pthread_cond_timedwait(&slot->active_cv, &slot->active_mutex, &time_to_wait);
pgstat_report_waitevent(WAIT_EVENT_END);
(void)pthread_mutex_unlock(&slot->active_mutex);
goto retry;
}
/* Let everybody know we've modified this slot */
(void)pthread_mutex_lock(&slot->active_mutex);
pthread_cond_broadcast(&slot->active_cv);
(void)pthread_mutex_unlock(&slot->active_mutex);
if (slot->data.database != InvalidOid) {
slot->candidate_restart_lsn = InvalidXLogRecPtr;
slot->candidate_restart_valid = InvalidXLogRecPtr;
@ -617,11 +655,6 @@ void ReplicationSlotRelease(void)
* data.
*/
ReplicationSlotDropAcquired();
} else {
/* Mark slot inactive. We're not freeing it, just disconnecting. */ volatile ReplicationSlot *vslot = slot;
SpinLockAcquire(&slot->mutex);
vslot->active = false;
SpinLockRelease(&slot->mutex);
}
/*
@ -637,6 +670,17 @@ void ReplicationSlotRelease(void)
ReplicationSlotsComputeRequiredXmin(false);
}
if (GET_SLOT_PERSISTENCY(slot->data) != RS_EPHEMERAL) {
/* Mark slot inactive. We're not freeing it, just disconnecting. */
volatile ReplicationSlot *vslot = slot;
SpinLockAcquire(&slot->mutex);
vslot->active = false;
SpinLockRelease(&slot->mutex);
(void)pthread_mutex_lock(&slot->active_mutex);
pthread_cond_broadcast(&slot->active_cv);
(void)pthread_mutex_unlock(&slot->active_mutex);
}
t_thrd.slot_cxt.MyReplicationSlot = NULL;
/* might not have been set when we've been a plain slot */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
@ -648,7 +692,7 @@ void ReplicationSlotRelease(void)
/*
* Permanently drop replication slot identified by the passed in name.
*/
void ReplicationSlotDrop(const char *name, bool for_backup)
void ReplicationSlotDrop(const char *name, bool for_backup, bool nowait)
{
bool isLogical = false;
bool is_archive_slot = false;
@ -663,7 +707,7 @@ void ReplicationSlotDrop(const char *name, bool for_backup)
Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL);
/* We allow dropping active logical replication slots on standby in opengauss. */
ReplicationSlotAcquire(name, false, RecoveryInProgress());
ReplicationSlotAcquire(name, false, RecoveryInProgress(), nowait);
if (t_thrd.slot_cxt.MyReplicationSlot->archive_config != NULL) {
is_archive_slot = true;
}
@ -743,6 +787,10 @@ static void ReplicationSlotDropAcquired(void)
vslot->active = false;
SpinLockRelease(&slot->mutex);
(void)pthread_mutex_lock(&slot->active_mutex);
pthread_cond_broadcast(&slot->active_cv);
(void)pthread_mutex_unlock(&slot->active_mutex);
ereport(fail_softly ? WARNING : ERROR,
(errcode_for_file_access(), errmsg("could not rename \"%s\" to \"%s\": %m", path, tmppath)));
}
@ -761,6 +809,9 @@ static void ReplicationSlotDropAcquired(void)
ReleaseArchiveSlotInfo(slot);
}
LWLockRelease(ReplicationSlotControlLock);
(void)pthread_mutex_lock(&slot->active_mutex);
pthread_cond_broadcast(&slot->active_cv);
(void)pthread_mutex_unlock(&slot->active_mutex);
/*
* Slot is dead and doesn't prevent resource removal anymore, recompute

View File

@ -1479,7 +1479,7 @@ static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
if (IsLogicalSlot(cmd->slotname)) {
MarkPostmasterChildNormal();
CheckPMstateAndRecoveryInProgress();
ReplicationSlotDrop(cmd->slotname);
ReplicationSlotDrop(cmd->slotname, false, !cmd->wait);
log_slot_drop(cmd->slotname);
} else {
ReplicationSlotDrop(cmd->slotname);

View File

@ -82,6 +82,7 @@ typedef struct CreateReplicationSlotCmd {
typedef struct DropReplicationSlotCmd {
NodeTag type;
char* slotname;
bool wait;
} DropReplicationSlotCmd;
/* ----------------------

View File

@ -1343,6 +1343,7 @@ typedef enum WaitEventIO {
WAIT_EVENT_LOGICAL_SYNC_DATA,
WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
WAIT_EVENT_REPLICATION_ORIGIN_DROP,
WAIT_EVENT_REPLICATION_SLOT_DROP,
IO_EVENT_NUM = WAIT_EVENT_LOGCTRL_SLEEP - WAIT_EVENT_BUFFILE_READ + 1 // MUST be last, DO NOT use this value.
} WaitEventIO;

View File

@ -153,6 +153,11 @@ typedef struct ReplicationSlot {
/* is somebody performing io on this slot? */
LWLock *io_in_progress_lock;
/* Condition variable signalled when active_pid changes */
pthread_mutex_t active_mutex;
pthread_cond_t active_cv;
pthread_condattr_t slotAttr;
/* all the remaining data is only used for logical slots */
/* ----
@ -262,8 +267,8 @@ extern void ReplicationSlotsShmemInit(void);
extern void ReplicationSlotCreate(const char* name, ReplicationSlotPersistency persistency, bool isDummyStandby,
Oid databaseId, XLogRecPtr restart_lsn, char* extra_content = NULL, bool encrypted = false);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char* name, bool for_backup = false);
extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool allowDrop = false);
extern void ReplicationSlotDrop(const char* name, bool for_backup = false, bool nowait = true);
extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool allowDrop = false, bool nowait = true);
extern bool IsReplicationSlotActive(const char *name);
extern bool IsLogicalReplicationSlot(const char *name);
bool ReplicationSlotFind(const char* name);