!6682 LWLock支持超时等锁、X锁降级机制

Merge pull request !6682 from hejiahuan11/LWLock
This commit is contained in:
opengauss_bot
2024-11-19 09:34:07 +00:00
committed by Gitee
7 changed files with 255 additions and 21 deletions

View File

@ -494,6 +494,27 @@ bool PGSemaphoreTryLock(PGSemaphore sema)
return true;
}
/*
 * PGSemaphoreLockTimeout
 *
 * Try to lock a semaphore without blocking indefinitely: poll it with
 * PGSemaphoreTryLock, sleeping via pg_usleep(1000L) between attempts, and
 * give up once the timeout budget is used up.
 *
 * sema:       semaphore to lock.
 * timeout_ms: wait budget; one unit is consumed per failed attempt. A
 *             non-positive value returns 0 at once without touching the
 *             semaphore.
 *
 * Returns the remaining budget (always > 0) when the semaphore was obtained
 * within the specified time, or 0 when it was not obtained.
 */
int PGSemaphoreLockTimeout(PGSemaphore sema, int timeout_ms)
{
    /*
     * Without a positive budget a successful try-lock could not be told apart
     * from a timeout by the return value (both would be <= 0), so the caller
     * would lose a semaphore count. Do not attempt the lock at all.
     */
    if (timeout_ms <= 0) {
        return 0;
    }

    int remainingTime = timeout_ms;
    do {
        if (PGSemaphoreTryLock(sema)) {
            /* obtained: remainingTime is still >= 1 here */
            break;
        }
        pg_usleep(1000L);
        remainingTime--;
    } while (remainingTime > 0);
    return remainingTime;
}
/*
* @@GaussDB@@
* Brief : cancel the semphore release on shmem exit

View File

@ -30,6 +30,7 @@ provider postgresql {
probe lwlock__acquire(const char *, LWLockMode);
probe lwlock__release(const char *);
/* fired by LWLockDowngrade after an exclusive lock was turned into a shared one; argument is T_NAME(lock) */
probe lwlock__downgrade(const char *);
probe lwlock__wait__start(const char *, LWLockMode);
probe lwlock__wait__done(const char *, LWLockMode);
probe lwlock__condacquire(const char *, LWLockMode);

View File

@ -1022,21 +1022,8 @@ bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode)
{
bool get_lock = false;
int wait_tickets = (SS_PRIMARY_MODE) ? 500 : 5000;
int cur_tickets = 0;
do {
get_lock = LWLockConditionalAcquire(lock, mode);
if (get_lock) {
break;
}
pg_usleep(1000L);
cur_tickets++;
if (cur_tickets >= wait_tickets) {
break;
}
} while (true);
get_lock = LWLockAcquireTimeout(lock, mode, wait_tickets);
if (!get_lock) {
ereport(WARNING, (errcode(MOD_DMS), (errmsg("[SS lwlock] request LWLock:%p timeout, LWLockMode:%d, timeout:%dms",
lock, mode, wait_tickets))));

View File

@ -6392,8 +6392,8 @@ retry:
* Because the two sessions on the standby node will hold the content lock at the shared level,
* at the same time, even if one of them fails, release the lock and sleep, the other will hold
* it during this time, and the MES thread from the host will never get the exclusive lock on
* this page.
*
* this page.
*
* However, the session on the primary side holds the exclusive lock, which prevents the MES
* for standby from taking the shared lock, which eventually leads to a deadlock.
*
@ -6412,14 +6412,14 @@ retry:
} else if (dms_standby_retry_read) {
/*
* We're on standby, and we have got the page, but we're holding an exclusive lock,
* which isn't good, so release the lock and start over.
* which isn't good, so the lock needs to be downgraded.
*
* A good idea would be to add the ability to lock downgrade for LWLock.
* The lock downgrade function is only applicable for downgrading exclusive locks to shared locks.
*/
Assert(mode == BUFFER_LOCK_EXCLUSIVE && origin_mode == BUFFER_LOCK_SHARE);
mode = origin_mode;
dms_standby_retry_read = false;
LWLockRelease(buf->content_lock);
goto retry;
LWLockDowngrade(buf->content_lock);
}
}

View File

@ -1487,6 +1487,152 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid)
return result;
}
/*
 * LWLockAcquireTimeout - acquire a lightweight lock in the specified mode,
 * waiting at most timeout_ms for it
 *
 * If the lock is not available, queue up and sleep until it is or until the
 * timeout is reached.
 *
 * lock:       lock to acquire.
 * mode:       LW_SHARED or LW_EXCLUSIVE.
 * timeout_ms: wait budget handed to PGSemaphoreLockTimeout. A non-positive
 *             value returns false without attempting the lock.
 *
 * Returns true if the lock was acquired, false if the wait timed out or the
 * deadlock detector picked this thread as a victim (reported as a WARNING,
 * not an ERROR).
 *
 * Side effect: on success, cancel/die interrupts are held off until lock
 * release. On failure they are resumed before returning.
 */
bool LWLockAcquireTimeout(LWLock *lock, LWLockMode mode, int timeout_ms)
{
    PGPROC *proc = t_thrd.proc;
    int extraWaits = 0; /* wakeups absorbed from proc->sem that were not meant for this wait */

    AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);

    PRINT_LWDEBUG("LWLockAcquireTimeout", lock, mode);

#ifdef LWLOCK_STATS
    lwlock_stats *lwstats = NULL;
    lwstats = get_lwlock_stats_entry(lock);

    /* Count lock acquisition attempts */
    if (mode == LW_EXCLUSIVE) {
        lwstats->ex_acquire_count++;
    } else {
        lwstats->sh_acquire_count++;
    }
#endif // LWLOCK_STATS

    /*
     * We can't wait if we haven't got a PGPROC. This should only occur
     * during bootstrap or shared memory initialization. Put an Assert here
     * to catch unsafe coding practices.
     */
    Assert(!(proc == NULL && IsUnderPostmaster));

    if (t_thrd.storage_cxt.num_held_lwlocks >= MAX_SIMUL_LWLOCKS) {
        ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("too many LWLocks taken")));
    }

    remember_lwlock_acquire(lock, mode);

    HOLD_INTERRUPTS();

    /*
     * timeout_ms doubles as the loop state: it stays positive on every path
     * that leaves the loop holding the lock and is 0 on every failure path.
     */
    while (timeout_ms > 0) {
        bool mustwait = false;

        /*
         * Try to grab the lock the first time, we're not in the waitqueue
         * yet/anymore.
         */
        mustwait = LWLockAttemptLock(lock, mode);
        if (!mustwait) {
            LOG_LWDEBUG("LWLockAcquireTimeout", lock, "immediately acquired lock");
            break; // got the lock
        }

        instr_stmt_report_lock(LWLOCK_WAIT_START, mode, NULL, lock->tranche);
        pgstat_report_waitevent(PG_WAIT_LWLOCK | lock->tranche);

        // add to the queue
        LWLockQueueSelf(lock, mode);

        /* Retry after queueing so a release between the two attempts is not missed. */
        mustwait = LWLockAttemptLock(lock, mode);
        /* ok, grabbed the lock the second time round, need to undo queueing */
        if (!mustwait) {
            LOG_LWDEBUG("LWLockAcquireTimeout", lock, "acquired, undoing queue");
            LWLockDequeueSelf(lock, mode);
            pgstat_report_waitevent(WAIT_EVENT_END);
            instr_stmt_report_lock(LWLOCK_WAIT_END);
            break; // got the lock
        }

        LOG_LWDEBUG("LWLockAcquireTimeout", lock, "waiting");

#ifdef LWLOCK_STATS
        lwstats->block_count++;
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);

        /* Sleep on our semaphore until we are woken for this lock or the budget runs out. */
        while (timeout_ms > 0) {
            /* Returns 0 when the semaphore was NOT obtained, otherwise the remaining budget. */
            timeout_ms = PGSemaphoreLockTimeout(&proc->sem, timeout_ms);

            if (!proc->lwWaiting) {
                if (!proc->lwIsVictim) {
                    /* regular wakeup: go back and retry the lock */
                    break;
                }

                /* Deadlock-detector victim: give up and report it as a failed wait. */
                pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK);
                /* hand back the wakeups absorbed so far */
                while (extraWaits > 0) {
                    PGSemaphoreUnlock(&proc->sem);
                    extraWaits--;
                }
                proc->lwIsVictim = false;
                timeout_ms = 0;
                ereport(WARNING,
                    (errmsg("The deadlock detection targeted us as victims, "
                            "but we are LWLockAcquireTimeout, "
                            "just trade it as 'Failed to waiting lwlock'.")));
                break;
            }

            /*
             * Still flagged as waiting. Only count an absorbed wakeup when the
             * semaphore was really obtained; after a plain timeout nothing was
             * consumed, and counting it would make the fix-up loop at the end
             * post the semaphore one time too many.
             */
            if (timeout_ms > 0) {
                extraWaits++;
            }
        }

        if (timeout_ms == 0) {
            /* Gave up waiting: make sure we are no longer on the lock's wait queue. */
            LOG_LWDEBUG("LWLockAcquireTimeout", lock, "timed out, undoing queue");
            LWLockDequeueSelf(lock, mode);
        }

        /* Retrying, allow LWLockRelease to release waiters again. */
        pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK);

#ifdef LOCK_DEBUG
        {
            /*
             * not waiting anymore
             * NOTE(review): on the timeout path LWLockDequeueSelf has just run;
             * confirm it does not already decrement nwaiters, otherwise this
             * decrements twice.
             */
            uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
            Assert(nwaiters < MAX_BACKENDS);
        }
#endif

        TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
        pgstat_report_waitevent(WAIT_EVENT_END);
        instr_stmt_report_lock(LWLOCK_WAIT_END);

        LOG_LWDEBUG("LWLockAcquireTimeout", lock, "awakened");
    }

    const bool acquired = (timeout_ms > 0);

    /* Only report an acquisition to the tracer when there was one. */
    if (acquired) {
        TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
    }

    forget_lwlock_acquire();

    if (acquired) {
        /* Add lock to list of locks held by this backend */
        t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].lock = lock;
        t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].mode = mode;
        t_thrd.storage_cxt.lwlock_held_times[t_thrd.storage_cxt.num_held_lwlocks] =
            (u_sess->attr.attr_common.pgstat_track_activities ? GetCurrentTimestamp() : (TimestampTz)0);
        t_thrd.storage_cxt.num_held_lwlocks++;
    } else {
        /* Nothing is held, so undo the HOLD_INTERRUPTS() done above. */
        RESUME_INTERRUPTS();
    }

    /*
     * Fix the process wait semaphore's count for any absorbed wakeups.
     */
    while ((extraWaits--) > 0) {
        PGSemaphoreUnlock(&proc->sem);
    }

    return acquired;
}
/*
* LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
*
@ -1865,7 +2011,7 @@ void LWLockRelease(LWLock *lock)
/* Remove lock from list of locks held. Usually, but not always, it will
* be the latest-acquired lock; so search array backwards. */
for (i = t_thrd.storage_cxt.num_held_lwlocks; --i >= 0;) {
for (i = t_thrd.storage_cxt.num_held_lwlocks - 1; i >= 0; --i) {
if (lock == t_thrd.storage_cxt.held_lwlocks[i].lock) {
mode = t_thrd.storage_cxt.held_lwlocks[i].mode;
break;
@ -1932,6 +2078,81 @@ void LWLockRelease(LWLock *lock)
RESUME_INTERRUPTS();
}
/*
 * LWLockDowngrade - downgrade exclusive lock to shared lock
 *
 * Converts an exclusive lock held by the current thread into a shared lock
 * without releasing it in between: the shared reference is added to the lock
 * state first, and only then is the exclusive value subtracted.
 *
 * lock: the lock to downgrade. It must be recorded in this thread's
 *       held_lwlocks array with mode LW_EXCLUSIVE; otherwise an ERROR
 *       (ERRCODE_LOCK_NOT_AVAILABLE) is raised.
 *
 * The held_lwlocks entry is switched to LW_SHARED. Waiters are woken when
 * both LW_FLAG_HAS_WAITERS and LW_FLAG_RELEASE_OK are set afterwards.
 */
void LWLockDowngrade(LWLock *lock)
{
/* NOTE(review): proc does not appear to be used below - confirm no macro depends on it */
PGPROC *proc = t_thrd.proc;
LWLockMode mode = LW_EXCLUSIVE;
uint64 old_state;
/* Check if it is necessary to wake up the processes in the waiting queue. Default does not wake up */
bool check_waiters = false;
int i;
/*
 * Ensure that the thread already holds an exclusive lock, if not, report an error and stop the operation
 * The range of parameter i is [0, num_held_lwlocks - 1], num_held_lwlocks is the number of held locks.
 * The array is searched backwards; i ends at -1 when the lock is not found.
 */
for (i = t_thrd.storage_cxt.num_held_lwlocks - 1; i >= 0; --i) {
if (lock == t_thrd.storage_cxt.held_lwlocks[i].lock) {
mode = t_thrd.storage_cxt.held_lwlocks[i].mode;
break;
}
}
if ((i < 0) || (mode != LW_EXCLUSIVE)) {
ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("lock %s not held or not exclusive mode", T_NAME(lock))));
}
/*
 * if lwlock is held longer than 1min, ereport the detail and backtrace
 * The check only runs when pgstat_track_activities is on.
 */
TimestampTz now = (u_sess->attr.attr_common.pgstat_track_activities ? GetCurrentTimestamp() : (TimestampTz)0);
if (u_sess->attr.attr_common.pgstat_track_activities &&
TimestampDifferenceExceeds(t_thrd.storage_cxt.lwlock_held_times[i], now, MSECS_PER_MIN)) {
/* NOTE(review): force_backtrace_messages is set here but not reset in this function - confirm that is intended */
force_backtrace_messages = true;
/* temporarily lower backtrace_min_messages to LOG for this one report, then restore it */
int old_backtrace_min_messages = u_sess->attr.attr_common.backtrace_min_messages;
u_sess->attr.attr_common.backtrace_min_messages = LOG;
ereport(LOG, ((errmsg("lwlock %s mode %d is held "
"for %ld ms longer than 1 min", T_NAME(lock), (int)(t_thrd.storage_cxt.held_lwlocks[i].mode),
now - t_thrd.storage_cxt.lwlock_held_times[i]))));
u_sess->attr.attr_common.backtrace_min_messages = old_backtrace_min_messages;
}
PRINT_LWDEBUG("LWLockDowngrade", lock, mode);
uint32 mask_id = LOCK_THREADID_MASK;
uint64 ref_one_by_thread = LOCK_REFCOUNT_ONE_BY_THREADID;
old_state = pg_atomic_read_u64((volatile uint64 *)&lock->state);
uint64 desired_state = 0;
/*
 * Step 1: add the shared reference while the exclusive value is still set.
 * Only the single byte at offset mask_id of the 64-bit state is
 * compare-exchanged; presumably that byte is this thread's shared
 * reference counter - confirm against LOCK_THREADID_MASK.
 * The loop repeats until the byte exchange succeeds.
 */
do {
desired_state = old_state + ref_one_by_thread;
} while (!pg_atomic_compare_exchange_u8((((volatile uint8*)&lock->state) + mask_id), ((uint8*)&old_state) + mask_id, (desired_state >> (8 * mask_id))));
/* Step 2: remove the flag for exclusive lock; old_state receives the result of the sub_fetch */
TsAnnotateRWLockReleased(&lock->rwlock, 1);
old_state = pg_atomic_sub_fetch_u64(&lock->state, LW_VAL_EXCLUSIVE);
/* record the new mode in this thread's held-locks entry */
t_thrd.storage_cxt.held_lwlocks[i].mode = LW_SHARED;
LOG_LWDEBUG("LWLockDowngrade", lock, "exclusive LWLock downgrade to shared LWLock");
/* nobody else can have that kind of lock */
Assert(!(old_state & LW_VAL_EXCLUSIVE));
/*
 * Wake waiters only when both LW_FLAG_HAS_WAITERS and LW_FLAG_RELEASE_OK are set.
 * Unlike the original wording suggested, no check is made here that the lock is
 * otherwise unheld - this thread itself still holds it in shared mode.
 */
check_waiters =
((old_state & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) == (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK));
if (check_waiters) {
LOG_LWDEBUG("LWLockDowngrade", lock, "waking up waiters");
LWLockWakeup(lock);
}
TRACE_POSTGRESQL_LWLOCK_DOWNGRADE(T_NAME(lock));
}
/*
* LWLockReleaseClearVar - release a previously acquired lock, reset variable
*/

View File

@ -409,9 +409,11 @@ extern void DumpLWLockInfo();
extern LWLock* LWLockAssign(int trancheId, int tag = 0);
extern void LWLockInitialize(LWLock* lock, int tranche_id, int tag = 0);
extern bool LWLockAcquire(LWLock* lock, LWLockMode mode, bool need_update_lockid = false);
/* Acquire lock in the given mode, waiting at most timeout_ms; returns true only if the lock was acquired. */
extern bool LWLockAcquireTimeout(LWLock* lock, LWLockMode mode, int timeout_ms);
extern bool LWLockConditionalAcquire(LWLock* lock, LWLockMode mode);
extern bool LWLockAcquireOrWait(LWLock* lock, LWLockMode mode);
extern void LWLockRelease(LWLock* lock);
/* Turn an exclusive lock held by the caller into a shared one; raises ERROR if it is not held exclusively. */
extern void LWLockDowngrade(LWLock* lock);
extern void LWLockReleaseClearVar(LWLock* lock, uint64* valptr, uint64 val);
extern void LWLockReleaseAll(void);
extern bool LWLockHeldByMe(LWLock* lock);

View File

@ -81,6 +81,8 @@ extern void PGSemaphoreUnlock(PGSemaphore sema);
/* Lock a semaphore only if able to do so without blocking */
extern bool PGSemaphoreTryLock(PGSemaphore sema);
/*
 * Poll for a semaphore until it is obtained or the timeout_ms budget is used up.
 * For a positive timeout_ms: returns the remaining budget (> 0) if obtained, 0 if not.
 */
extern int PGSemaphoreLockTimeout(PGSemaphore sema, int timeout_ms);
extern void cancelSemphoreRelease(void);
#endif /* PG_SEMA_H */