diff --git a/src/common/backend/port/sysv_sema.cpp b/src/common/backend/port/sysv_sema.cpp index 8b8b9f432..fb1aff5b2 100644 --- a/src/common/backend/port/sysv_sema.cpp +++ b/src/common/backend/port/sysv_sema.cpp @@ -494,6 +494,27 @@ bool PGSemaphoreTryLock(PGSemaphore sema) return true; } +/* + * PGSemaphoreLockTimeout + * + * Do not block obtaining locks until timeout + * Return the remaining time. 0 indicates that no semaphore is obtained within the sepcified time, otherwise, + * the semaphore is obtained within the specified time and the remaining time is returned. + */ +int PGSemaphoreLockTimeout(PGSemaphore sema, int timeout_ms) +{ + int remainingTime = timeout_ms; + do { + if (PGSemaphoreTryLock(sema)) { + break; + } + pg_usleep(1000L); + remainingTime--; + } while (remainingTime > 0); + + return remainingTime; +} + /* * @@GaussDB@@ * Brief : cancel the semphore release on shmem exit diff --git a/src/common/backend/utils/probes.d b/src/common/backend/utils/probes.d index c5d593ffc..f6ab6f5fe 100644 --- a/src/common/backend/utils/probes.d +++ b/src/common/backend/utils/probes.d @@ -30,6 +30,7 @@ provider postgresql { probe lwlock__acquire(const char *, LWLockMode); probe lwlock__release(const char *); + probe lwlock__downgrade(const char *); probe lwlock__wait__start(const char *, LWLockMode); probe lwlock__wait__done(const char *, LWLockMode); probe lwlock__condacquire(const char *, LWLockMode); diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 105b1cf82..41ae638da 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -1022,21 +1022,8 @@ bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode) { bool get_lock = false; int wait_tickets = (SS_PRIMARY_MODE) ? 500 : 5000; - int cur_tickets = 0; - - do { - get_lock = LWLockConditionalAcquire(lock, mode); - if (get_lock) { - break; - } - - pg_usleep(1000L); - cur_tickets++; - if (cur_tickets >= wait_tickets) { - break; - } - } while (true); + get_lock = LWLockAcquireTimeout(lock, mode, wait_tickets); if (!get_lock) { ereport(WARNING, (errcode(MOD_DMS), (errmsg("[SS lwlock] request LWLock:%p timeout, LWLockMode:%d, timeout:%dms", lock, mode, wait_tickets)))); diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 4b47ede19..54cd2d6e7 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -6392,8 +6392,8 @@ retry: * Because the two sessions on the standby node will hold the content lock at the shared level, * at the same time, even if one of them fails, release the lock and sleep, the other will hold * it during this time, and the MES thread from the host will never get the exclusive lock on - * this page. - * + * this page. + * * However, the session on the primary side holds the exclusive lock, which prevents the MES * for standby from taking the shared lock, which eventually leads to a deadlock. * @@ -6412,14 +6412,14 @@ retry: } else if (dms_standby_retry_read) { /* * We're on standby, and we have got the page, but we're holding an exclusive lock, - * which isn't good, so release the lock and start over. + * which isn't good, so lock need downgrade. * - * A good idea would be to add the ability to lock downgrade for LWLock. + * The lock downgrade function is only applicable for downgrading exclusive locks to shared locks. */ + Assert(mode == BUFFER_LOCK_EXCLUSIVE && origin_mode == BUFFER_LOCK_SHARE); mode = origin_mode; dms_standby_retry_read = false; - LWLockRelease(buf->content_lock); - goto retry; + LWLockDowngrade(buf->content_lock); } } diff --git a/src/gausskernel/storage/lmgr/lwlock.cpp b/src/gausskernel/storage/lmgr/lwlock.cpp index cf03ed965..37d4e1ed6 100644 --- a/src/gausskernel/storage/lmgr/lwlock.cpp +++ b/src/gausskernel/storage/lmgr/lwlock.cpp @@ -1487,6 +1487,152 @@ bool LWLockAcquire(LWLock *lock, LWLockMode mode, bool need_update_lockid) return result; } +/* + * LWLockAcquireTimeout - acquire a lightweight lock in the specified mode + * + * If the lock is not available, sleep until it is or until the timeout is + * reached. + * + * Side effect: cancel/die interrupts are held off until lock release. + */ +bool LWLockAcquireTimeout(LWLock *lock, LWLockMode mode, int timeout_ms) +{ + PGPROC *proc = t_thrd.proc; + int extraWaits = 0; + + AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE); + PRINT_LWDEBUG("LWLockAcquireTimeout", lock, mode); +#ifdef LWLOCK_STATS + lwlock_stats *lwstats = NULL; + lwstats = get_lwlock_stats_entry(lock); + + /* Count lock acquisition attempts */ + if (mode == LW_EXCLUSIVE) { + lwstats->ex_acquire_count++; + } else { + lwstats->sh_acquire_count++; + } +#endif // LWLOCK_STATS + + /* + * We can't wait if we haven't got a PGPROC. This should only occur + * during bootstrap or shared memory initialization. Put an Assert here + * to catch unsafe coding practices. + */ + Assert(!(proc == NULL && IsUnderPostmaster)); + + if (t_thrd.storage_cxt.num_held_lwlocks >= MAX_SIMUL_LWLOCKS) { + ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("too many LWLocks taken"))); + } + + remember_lwlock_acquire(lock, mode); + + HOLD_INTERRUPTS(); + + while (timeout_ms > 0) { + bool mustwait = false; + + /* + * Try to grab the lock the first time, we're not in the waitqueue + * yet/anymore. + */ + mustwait = LWLockAttemptLock(lock, mode); + if (!mustwait) { + LOG_LWDEBUG("LWLockAcquireTimeout", lock, "immediately acquired lock"); + break; // got the lock + } + + instr_stmt_report_lock(LWLOCK_WAIT_START, mode, NULL, lock->tranche); + pgstat_report_waitevent(PG_WAIT_LWLOCK | lock->tranche); + + // add to the queue + LWLockQueueSelf(lock, mode); + + mustwait = LWLockAttemptLock(lock, mode); + /* ok, grabbed the lock the second time round, need to undo queueing */ + if (!mustwait) { + LOG_LWDEBUG("LWLockAcquireTimeout", lock, "acquired, undoing queue"); + LWLockDequeueSelf(lock, mode); + pgstat_report_waitevent(WAIT_EVENT_END); + instr_stmt_report_lock(LWLOCK_WAIT_END); + break; // got the lock + } + + LOG_LWDEBUG("LWLockAcquireTimeout", lock, "waiting"); + +#ifdef LWLOCK_STATS + lwstats->block_count++; +#endif + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode); + + while (timeout_ms > 0) { + timeout_ms = PGSemaphoreLockTimeout(&proc->sem, timeout_ms); + + if (!proc->lwWaiting) { + if (!proc->lwIsVictim) { + break; + } + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK); + while (extraWaits-- > 0) { + PGSemaphoreUnlock(&proc->sem); + } + proc->lwIsVictim = false; + timeout_ms = 0; + ereport(WARNING, + (errmsg("The deadlock detection targeted us as victims, " + "but we are LWLockAcquireTimeout, " + "just trade it as 'Failed to waiting lwlock'."))); + } + extraWaits++; + } + if (timeout_ms == 0) { + LOG_LWDEBUG("LWLockAcquireTimeout", lock, "acquired, undoing queue"); + LWLockDequeueSelf(lock, mode); + } + + /* Retrying, allow LWLockRelease to release waiters again. */ + pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_RELEASE_OK); + +#ifdef LOCK_DEBUG + { + /* not waiting anymore */ + uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); + Assert(nwaiters < MAX_BACKENDS); + } +#endif + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode); + pgstat_report_waitevent(WAIT_EVENT_END); + instr_stmt_report_lock(LWLOCK_WAIT_END); + + LOG_LWDEBUG("LWLockAcquireTimeout", lock, "awakened"); + } + + /* just LWLock acquire timeout end */ + TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode); + + forget_lwlock_acquire(); + + if (timeout_ms > 0) { + /* Add lock to list of locks held by this backend */ + t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].lock = lock; + t_thrd.storage_cxt.held_lwlocks[t_thrd.storage_cxt.num_held_lwlocks].mode = mode; + t_thrd.storage_cxt.lwlock_held_times[t_thrd.storage_cxt.num_held_lwlocks] = + (u_sess->attr.attr_common.pgstat_track_activities ? GetCurrentTimestamp() : (TimestampTz)0); + t_thrd.storage_cxt.num_held_lwlocks++; + } else { + RESUME_INTERRUPTS(); + } + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while ((extraWaits--) > 0) { + PGSemaphoreUnlock(&proc->sem); + } + + return timeout_ms > 0; +} + /* * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode * @@ -1865,7 +2011,7 @@ void LWLockRelease(LWLock *lock) /* Remove lock from list of locks held. Usually, but not always, it will * be the latest-acquired lock; so search array backwards. */ - for (i = t_thrd.storage_cxt.num_held_lwlocks; --i >= 0;) { + for (i = t_thrd.storage_cxt.num_held_lwlocks - 1; i >= 0; --i) { if (lock == t_thrd.storage_cxt.held_lwlocks[i].lock) { mode = t_thrd.storage_cxt.held_lwlocks[i].mode; break; @@ -1932,6 +2078,81 @@ void LWLockRelease(LWLock *lock) RESUME_INTERRUPTS(); } +/* + * LWLockDowngrade - downgrade exclusive lock to shared lock + */ +void LWLockDowngrade(LWLock *lock) +{ + PGPROC *proc = t_thrd.proc; + LWLockMode mode = LW_EXCLUSIVE; + uint64 old_state; + /* Check if it is necessary to wake up the processes in the waiting queue. Default does not wake up */ + bool check_waiters = false; + int i; + + /* + * Ensure that the thread already holds an exclusive lock, if not, report an error and stop the operation + * The range of parameter i is [0, num_held_lwlocks - 1], num_held_lwlocks is the number of held locks. + */ + for (i = t_thrd.storage_cxt.num_held_lwlocks - 1; i >= 0; --i) { + if (lock == t_thrd.storage_cxt.held_lwlocks[i].lock) { + mode = t_thrd.storage_cxt.held_lwlocks[i].mode; + break; + } + } + if ((i < 0) || (mode != LW_EXCLUSIVE)) { + ereport(ERROR, (errcode(ERRCODE_LOCK_NOT_AVAILABLE), errmsg("lock %s not held or not exclusive mode", T_NAME(lock)))); + } + + /* if lwlock is held longer than 1min, ereport the detail and backtrace */ + TimestampTz now = (u_sess->attr.attr_common.pgstat_track_activities ? GetCurrentTimestamp() : (TimestampTz)0); + if (u_sess->attr.attr_common.pgstat_track_activities && + TimestampDifferenceExceeds(t_thrd.storage_cxt.lwlock_held_times[i], now, MSECS_PER_MIN)) { + force_backtrace_messages = true; + int old_backtrace_min_messages = u_sess->attr.attr_common.backtrace_min_messages; + u_sess->attr.attr_common.backtrace_min_messages = LOG; + ereport(LOG, ((errmsg("lwlock %s mode %d is held " + "for %ld ms longer than 1 min", T_NAME(lock), (int)(t_thrd.storage_cxt.held_lwlocks[i].mode), + now - t_thrd.storage_cxt.lwlock_held_times[i])))); + u_sess->attr.attr_common.backtrace_min_messages = old_backtrace_min_messages; + } + + PRINT_LWDEBUG("LWLockDowngrade", lock, mode); + + uint32 mask_id = LOCK_THREADID_MASK; + uint64 ref_one_by_thread = LOCK_REFCOUNT_ONE_BY_THREADID; + old_state = pg_atomic_read_u64((volatile uint64 *)&lock->state); + uint64 desired_state = 0; + /* + * Add shared lock flag bits through atomic operation comparison and exchange. + * loop until we've determined whether we could acquire the lock or not. + */ + do { + desired_state = old_state + ref_one_by_thread; + } while (!pg_atomic_compare_exchange_u8((((volatile uint8*)&lock->state) + mask_id), ((uint8*)&old_state) + mask_id, (desired_state >> (8 * mask_id)))); + + /* Remove the flag for exclusive lock */ + TsAnnotateRWLockReleased(&lock->rwlock, 1); + old_state = pg_atomic_sub_fetch_u64(&lock->state, LW_VAL_EXCLUSIVE); + + t_thrd.storage_cxt.held_lwlocks[i].mode = LW_SHARED; + LOG_LWDEBUG("LWLockDowngrade", lock, "exclusive LWLock downgrade to shared LWLock"); + + /* nobody else can have that kind of lock */ + Assert(!(old_state & LW_VAL_EXCLUSIVE)); + + /* Ensure that no other processes hold the lock, and if there are waiting processes, wake them up */ + check_waiters = + ((old_state & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) == (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)); + + if (check_waiters) { + LOG_LWDEBUG("LWLockDowngrade", lock, "waking up waiters"); + LWLockWakeup(lock); + } + + TRACE_POSTGRESQL_LWLOCK_DOWNGRADE(T_NAME(lock)); +} + /* * LWLockReleaseClearVar - release a previously acquired lock, reset variable */ diff --git a/src/include/storage/lock/lwlock.h b/src/include/storage/lock/lwlock.h index 591a1222f..3aeee95bc 100644 --- a/src/include/storage/lock/lwlock.h +++ b/src/include/storage/lock/lwlock.h @@ -409,9 +409,11 @@ extern void DumpLWLockInfo(); extern LWLock* LWLockAssign(int trancheId, int tag = 0); extern void LWLockInitialize(LWLock* lock, int tranche_id, int tag = 0); extern bool LWLockAcquire(LWLock* lock, LWLockMode mode, bool need_update_lockid = false); +extern bool LWLockAcquireTimeout(LWLock* lock, LWLockMode mode, int timeout_ms); extern bool LWLockConditionalAcquire(LWLock* lock, LWLockMode mode); extern bool LWLockAcquireOrWait(LWLock* lock, LWLockMode mode); extern void LWLockRelease(LWLock* lock); +extern void LWLockDowngrade(LWLock* lock); extern void LWLockReleaseClearVar(LWLock* lock, uint64* valptr, uint64 val); extern void LWLockReleaseAll(void); extern bool LWLockHeldByMe(LWLock* lock); diff --git a/src/include/storage/lock/pg_sema.h b/src/include/storage/lock/pg_sema.h index 5ad8536a7..54f691b49 100644 --- a/src/include/storage/lock/pg_sema.h +++ b/src/include/storage/lock/pg_sema.h @@ -81,6 +81,8 @@ extern void PGSemaphoreUnlock(PGSemaphore sema); /* Lock a semaphore only if able to do so without blocking */ extern bool PGSemaphoreTryLock(PGSemaphore sema); +extern int PGSemaphoreLockTimeout(PGSemaphore sema, int timeout_ms); + extern void cancelSemphoreRelease(void); #endif /* PG_SEMA_H */