diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index 9c2f23ac1..e5640a392 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -1267,7 +1267,7 @@ void ReplicationSlotDropAtPubNode(char *slotname, bool missing_ok) } } - appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname)); + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s WAIT", quote_identifier(slotname)); res = WalReceiverFuncTable[GET_FUNC_IDX].walrcv_exec(cmd.data, 0, NULL); if (res->status != WALRCV_OK_COMMAND) { diff --git a/src/gausskernel/process/postmaster/pgstat.cpp b/src/gausskernel/process/postmaster/pgstat.cpp index 6b30c6b05..55faa5019 100644 --- a/src/gausskernel/process/postmaster/pgstat.cpp +++ b/src/gausskernel/process/postmaster/pgstat.cpp @@ -4598,6 +4598,9 @@ const char* pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_REPLICATION_ORIGIN_DROP: event_name = "ReplicationOriginDrop"; break; + case WAIT_EVENT_REPLICATION_SLOT_DROP: + event_name = "ReplicationSlotDrop"; + break; default: event_name = "unknown wait event"; break; diff --git a/src/gausskernel/storage/replication/logical/origin.cpp b/src/gausskernel/storage/replication/logical/origin.cpp index c9eab9704..826a69b14 100644 --- a/src/gausskernel/storage/replication/logical/origin.cpp +++ b/src/gausskernel/storage/replication/logical/origin.cpp @@ -299,7 +299,7 @@ restart: LWLockRelease(ReplicationOriginLock); pthread_mutex_lock(&state->originMutex); struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); + clock_gettime(CLOCK_MONOTONIC, &ts); ts.tv_sec += SECS_PER_MINUTE / 2; pgstat_report_waitevent(WAIT_EVENT_REPLICATION_ORIGIN_DROP); (void)pthread_cond_timedwait(&state->orginCV, &state->originMutex, &ts); diff --git a/src/gausskernel/storage/replication/repl_gram.y b/src/gausskernel/storage/replication/repl_gram.y index e5ae4f9ce..d84156a5c 100755 
--- a/src/gausskernel/storage/replication/repl_gram.y +++ b/src/gausskernel/storage/replication/repl_gram.y @@ -87,6 +87,7 @@ %token K_LABEL %token K_PROGRESS %token K_FAST +%token K_WAIT %token K_NOWAIT %token K_BUILDSTANDBY %token K_OBSMODE @@ -407,8 +408,17 @@ advance_catalog_xmin: DropReplicationSlotCmd *cmd; cmd = makeNode(DropReplicationSlotCmd); cmd->slotname = $2; + cmd->wait = false; $$ = (Node *) cmd; } + | K_DROP_REPLICATION_SLOT IDENT K_WAIT + { + DropReplicationSlotCmd *cmd; + cmd = makeNode(DropReplicationSlotCmd); + cmd->slotname = $2; + cmd->wait = true; + $$ = (Node *) cmd; + } ; diff --git a/src/gausskernel/storage/replication/repl_scanner.l b/src/gausskernel/storage/replication/repl_scanner.l index 8c73b9c9c..bcfc125d8 100755 --- a/src/gausskernel/storage/replication/repl_scanner.l +++ b/src/gausskernel/storage/replication/repl_scanner.l @@ -120,6 +120,7 @@ LOGICAL { return K_LOGICAL; } SLOT { return K_SLOT; } USE_SNAPSHOT { return K_USE_SNAPSHOT; } ADVANCE_CATALOG_XMIN { return K_ADVANCE_CATALOG_XMIN; } +WAIT { return K_WAIT; } "," { return ','; } ";" { return ';'; } diff --git a/src/gausskernel/storage/replication/slot.cpp b/src/gausskernel/storage/replication/slot.cpp index ced6b615c..900a833c4 100755 --- a/src/gausskernel/storage/replication/slot.cpp +++ b/src/gausskernel/storage/replication/slot.cpp @@ -110,6 +110,19 @@ void ReplicationSlotsShmemInit(void) /* everything else is zeroed by the memset above */ SpinLockInit(&slot->mutex); slot->io_in_progress_lock = LWLockAssign(LWTRANCHE_REPLICATION_SLOT); + rc = pthread_condattr_init(&slot->slotAttr); + if (rc != 0) { + elog(FATAL, "Fail to init conattr for replication slot"); + } + rc = pthread_condattr_setclock(&slot->slotAttr, CLOCK_MONOTONIC); + if (rc != 0) { + elog(FATAL, "Fail to setclock replication slot"); + } + rc = pthread_cond_init(&slot->active_cv, &slot->slotAttr); + if (rc != 0) { + elog(FATAL, "Fail to init cond for replication slot"); + } + slot->active_mutex = 
PTHREAD_MUTEX_INITIALIZER; } } } @@ -459,21 +472,30 @@ void ReplicationSlotCreate(const char *name, ReplicationSlotPersistency persiste * let somebody else try to allocate a slot. */ LWLockRelease(ReplicationSlotAllocationLock); + + /* Let everybody know we've modified this slot. */ + (void)pthread_mutex_lock(&slot->active_mutex); + pthread_cond_broadcast(&slot->active_cv); + (void)pthread_mutex_unlock(&slot->active_mutex); } /* * @param[IN] allowDrop: only used in dropping a logical replication slot. * Find a previously created slot and mark it as used by this backend. */ -void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDrop) +void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDrop, bool nowait) { ReplicationSlot *slot = NULL; int i; bool active = false; int slot_idx = -1; + struct timespec time_to_wait; +retry: Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL); + CHECK_FOR_INTERRUPTS(); + ReplicationSlotValidateName(name, ERROR); /* Search for the named slot and mark it active if we find it. */ @@ -499,18 +521,34 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDro if (slot == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); /* We allow dropping active logical replication slots on standby or for subscription in opengauss. 
*/ - if (active) { + if (active && nowait) { if (((slot->data.database != InvalidOid #ifndef ENABLE_MULTIPLE_NODES && !allowDrop #endif - ) || isDummyStandby != slot->data.isDummyStandby) && strcmp(slot->data.plugin.data, "pgoutput") != 0) + ) || isDummyStandby != slot->data.isDummyStandby)) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is already active", name))); else { ereport(WARNING, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is already active", name))); } + } else if (active && !nowait) { + (void)pthread_mutex_lock(&slot->active_mutex); + clock_gettime(CLOCK_MONOTONIC, &time_to_wait); + time_to_wait.tv_sec += SECS_PER_MINUTE / 2; + pgstat_report_waitevent(WAIT_EVENT_REPLICATION_SLOT_DROP); + (void)pthread_cond_timedwait(&slot->active_cv, &slot->active_mutex, &time_to_wait); + pgstat_report_waitevent(WAIT_EVENT_END); + (void)pthread_mutex_unlock(&slot->active_mutex); + + goto retry; } + + /* Let everybody know we've modified this slot */ + (void)pthread_mutex_lock(&slot->active_mutex); + pthread_cond_broadcast(&slot->active_cv); + (void)pthread_mutex_unlock(&slot->active_mutex); + if (slot->data.database != InvalidOid) { slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->candidate_restart_valid = InvalidXLogRecPtr; @@ -617,11 +655,6 @@ void ReplicationSlotRelease(void) * data. */ ReplicationSlotDropAcquired(); - } else { - /* Mark slot inactive. We're not freeing it, just disconnecting. */ volatile ReplicationSlot *vslot = slot; - SpinLockAcquire(&slot->mutex); - vslot->active = false; - SpinLockRelease(&slot->mutex); } /* @@ -637,6 +670,17 @@ void ReplicationSlotRelease(void) ReplicationSlotsComputeRequiredXmin(false); } + if (GET_SLOT_PERSISTENCY(slot->data) != RS_EPHEMERAL) { + /* Mark slot inactive. We're not freeing it, just disconnecting. 
*/ + volatile ReplicationSlot *vslot = slot; + SpinLockAcquire(&slot->mutex); + vslot->active = false; + SpinLockRelease(&slot->mutex); + (void)pthread_mutex_lock(&slot->active_mutex); + pthread_cond_broadcast(&slot->active_cv); + (void)pthread_mutex_unlock(&slot->active_mutex); + } + t_thrd.slot_cxt.MyReplicationSlot = NULL; /* might not have been set when we've been a plain slot */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -648,7 +692,7 @@ void ReplicationSlotRelease(void) /* * Permanently drop replication slot identified by the passed in name. */ -void ReplicationSlotDrop(const char *name, bool for_backup) +void ReplicationSlotDrop(const char *name, bool for_backup, bool nowait) { bool isLogical = false; bool is_archive_slot = false; @@ -663,7 +707,7 @@ void ReplicationSlotDrop(const char *name, bool for_backup) Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL); /* We allow dropping active logical replication slots on standby in opengauss. */ - ReplicationSlotAcquire(name, false, RecoveryInProgress()); + ReplicationSlotAcquire(name, false, RecoveryInProgress(), nowait); if (t_thrd.slot_cxt.MyReplicationSlot->archive_config != NULL) { is_archive_slot = true; } @@ -743,6 +787,10 @@ static void ReplicationSlotDropAcquired(void) vslot->active = false; SpinLockRelease(&slot->mutex); + (void)pthread_mutex_lock(&slot->active_mutex); + pthread_cond_broadcast(&slot->active_cv); + (void)pthread_mutex_unlock(&slot->active_mutex); + ereport(fail_softly ? 
WARNING : ERROR, (errcode_for_file_access(), errmsg("could not rename \"%s\" to \"%s\": %m", path, tmppath))); } @@ -761,6 +809,9 @@ static void ReplicationSlotDropAcquired(void) ReleaseArchiveSlotInfo(slot); } LWLockRelease(ReplicationSlotControlLock); + (void)pthread_mutex_lock(&slot->active_mutex); + pthread_cond_broadcast(&slot->active_cv); + (void)pthread_mutex_unlock(&slot->active_mutex); /* * Slot is dead and doesn't prevent resource removal anymore, recompute diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 65dd62500..8f0a6a74b 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -1479,7 +1479,7 @@ static void DropReplicationSlot(DropReplicationSlotCmd *cmd) if (IsLogicalSlot(cmd->slotname)) { MarkPostmasterChildNormal(); CheckPMstateAndRecoveryInProgress(); - ReplicationSlotDrop(cmd->slotname); + ReplicationSlotDrop(cmd->slotname, false, !cmd->wait); log_slot_drop(cmd->slotname); } else { ReplicationSlotDrop(cmd->slotname); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 1003562a2..33cfc0246 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -82,6 +82,7 @@ typedef struct CreateReplicationSlotCmd { typedef struct DropReplicationSlotCmd { NodeTag type; char* slotname; + bool wait; } DropReplicationSlotCmd; /* ---------------------- diff --git a/src/include/pgstat.h b/src/include/pgstat.h index b08a763bc..3164572db 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -1343,6 +1343,7 @@ typedef enum WaitEventIO { WAIT_EVENT_LOGICAL_SYNC_DATA, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE, WAIT_EVENT_REPLICATION_ORIGIN_DROP, + WAIT_EVENT_REPLICATION_SLOT_DROP, IO_EVENT_NUM = WAIT_EVENT_LOGCTRL_SLEEP - WAIT_EVENT_BUFFILE_READ + 1 // MUST be last, DO NOT use this value. 
} WaitEventIO; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index b0a666e68..53b9015c7 100755 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -153,6 +153,11 @@ typedef struct ReplicationSlot { /* is somebody performing io on this slot? */ LWLock *io_in_progress_lock; + /* Mutex and condition variable signalled when the slot's active flag changes */ + pthread_mutex_t active_mutex; + pthread_cond_t active_cv; + pthread_condattr_t slotAttr; + /* all the remaining data is only used for logical slots */ /* ---- @@ -262,8 +267,8 @@ extern void ReplicationSlotsShmemInit(void); extern void ReplicationSlotCreate(const char* name, ReplicationSlotPersistency persistency, bool isDummyStandby, Oid databaseId, XLogRecPtr restart_lsn, char* extra_content = NULL, bool encrypted = false); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char* name, bool for_backup = false); -extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool allowDrop = false); +extern void ReplicationSlotDrop(const char* name, bool for_backup = false, bool nowait = true); +extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool allowDrop = false, bool nowait = true); extern bool IsReplicationSlotActive(const char *name); extern bool IsLogicalReplicationSlot(const char *name); bool ReplicationSlotFind(const char* name);