From 218df0982d90a7c9e1c21193c95d2d858d115ad2 Mon Sep 17 00:00:00 2001 From: Chunling Wang Date: Tue, 8 Nov 2022 15:41:29 +0800 Subject: [PATCH] fix #I5UDEI Check hang when m_readySessionList is not empty while all workers are idle --- .../threadpool/threadpool_controler.cpp | 2 +- .../process/threadpool/threadpool_group.cpp | 29 ++++++++++----- .../threadpool/threadpool_listener.cpp | 36 +++++++++++++++++-- .../threadpool/threadpool_scheduler.cpp | 16 +++++++-- src/include/threadpool/threadpool_group.h | 11 +++--- src/include/threadpool/threadpool_listener.h | 3 ++ src/include/threadpool/threadpool_scheduler.h | 1 + 7 files changed, 79 insertions(+), 19 deletions(-) diff --git a/src/gausskernel/process/threadpool/threadpool_controler.cpp b/src/gausskernel/process/threadpool/threadpool_controler.cpp index cf51834e7..33e7ea11a 100644 --- a/src/gausskernel/process/threadpool/threadpool_controler.cpp +++ b/src/gausskernel/process/threadpool/threadpool_controler.cpp @@ -882,7 +882,7 @@ int ThreadPoolControler::DispatchSession(Port* port) return STATUS_ERROR; } /* if this group is hanged, we don't accept new session */ - if (grp->IsGroupHanged()) { + if (grp->isGroupAlreadyTooBusy()) { ereport(WARNING, (errmodule(MOD_THREAD_POOL), errmsg("Group[%d] is too busy to add new session for now.", grp->GetGroupId()))); diff --git a/src/gausskernel/process/threadpool/threadpool_group.cpp b/src/gausskernel/process/threadpool/threadpool_group.cpp index 1fa52aa1c..f45371f86 100644 --- a/src/gausskernel/process/threadpool/threadpool_group.cpp +++ b/src/gausskernel/process/threadpool/threadpool_group.cpp @@ -72,7 +72,7 @@ ThreadPoolGroup::ThreadPoolGroup(int maxWorkerNum, int expectWorkerNum, int maxS m_sessionCount(0), m_waitServeSessionCount(0), m_processTaskCount(0), - m_hasHanged(0), + m_isTooBusy(0), m_groupId(groupId), m_numaId(numaId), m_groupCpuNum(cpuNum), @@ -372,25 +372,38 @@ void ThreadPoolGroup::ShutDownThreads() alock.unLock(); } -bool ThreadPoolGroup::IsGroupHang() +bool ThreadPoolGroup::IsGroupTooBusy() { if (pg_atomic_exchange_u32((volatile uint32*)&m_processTaskCount, 0) != 0 || m_idleWorkerNum != 0) return false; - bool ishang = m_listener->GetSessIshang(&m_current_time, &m_sessionId); - return ishang; + bool isTooBusy = m_listener->GetSessIshang(&m_current_time, &m_sessionId); + return isTooBusy; } -void ThreadPoolGroup::SetGroupHanged(bool isHang) +bool ThreadPoolGroup::CheckGroupHang() { - pg_atomic_exchange_u32((volatile uint32*)&m_hasHanged, (uint32)isHang); + if (m_waitServeSessionCount == 0 || m_idleWorkerNum < m_workerNum) + return false; + + bool isHang = m_listener->GetSessIshang(&m_current_time, &m_sessionId); + if (!isHang) + return false; + + m_listener->WakeupForHang(); + return true; } -bool ThreadPoolGroup::IsGroupHanged() +void ThreadPoolGroup::SetGroupTooBusy(bool isTooBusy) +{ + pg_atomic_exchange_u32((volatile uint32*)&m_isTooBusy, (uint32)isTooBusy); +} + +bool ThreadPoolGroup::isGroupAlreadyTooBusy() { pg_memory_barrier(); - return m_hasHanged != 0; + return m_isTooBusy != 0; } void ThreadPoolGroup::AttachThreadToCPU(ThreadId thread, int cpu) diff --git a/src/gausskernel/process/threadpool/threadpool_listener.cpp b/src/gausskernel/process/threadpool/threadpool_listener.cpp index eb92d7daa..dc62f596b 100644 --- a/src/gausskernel/process/threadpool/threadpool_listener.cpp +++ b/src/gausskernel/process/threadpool/threadpool_listener.cpp @@ -62,6 +62,11 @@ static void ListenerSIGUSR1Handler(SIGNAL_ARGS) t_thrd.threadpool_cxt.listener->m_reaperAllSession = true; } +static void ListenerSIGUSR2Handler(SIGNAL_ARGS) +{ + // Do nothing but wakeup for epoll +} + static void ListenerSIGKILLHandler(SIGNAL_ARGS) { t_thrd.threadpool_cxt.listener->m_getKilled = true; @@ -79,7 +84,7 @@ void TpoolListenerMain(ThreadPoolListener* listener) (void)gspqsignal(SIGQUIT, SIG_IGN); (void)gspqsignal(SIGPIPE, SIG_IGN); (void)gspqsignal(SIGUSR1, ListenerSIGUSR1Handler); - (void)gspqsignal(SIGUSR2, SIG_IGN); + (void)gspqsignal(SIGUSR2, ListenerSIGUSR2Handler); (void)gspqsignal(SIGFPE, FloatExceptionHandler); (void)gspqsignal(SIGCHLD, SIG_DFL); (void)gspqsignal(SIGHUP, SIG_IGN); @@ -113,6 +118,7 @@ ThreadPoolListener::ThreadPoolListener(ThreadPoolGroup* group) m_epollEvents = NULL; m_reaperAllSession = false; m_getKilled = false; + m_isHang = 0; m_freeWorkerList = New(CurrentMemoryContext) DllistWithLock(); m_readySessionList = New(CurrentMemoryContext) DllistWithLock(); m_idleSessionList = New(CurrentMemoryContext) DllistWithLock(); @@ -362,6 +368,10 @@ void ThreadPoolListener::WaitTask() ReaperAllSession(); } + if (unlikely(m_isHang != 0)) { + WakeupReadySessionList(); + } + /* as we specify timeout -1, so 0 will not be return, either > 0 or < 0 */ if (ENABLE_THREAD_POOL_DN_LOGICCONN) { nevents = CommEpollWait(m_epollFd, m_epollEvents, GLOBAL_MAX_SESSION_NUM, -1); @@ -454,7 +464,7 @@ void ThreadPoolListener::DispatchSession(knl_session_context* session) errmsg("%s remove session:%lu from idleSessionList to readySessionList", __func__, session->session_id))); INSTR_TIME_SET_CURRENT(session->last_access_time); - + /* Add new session to the head so the connection request can be quickly processed. */ if (session->status == KNL_SESS_UNINIT) { AddIdleSessionToHead(session); @@ -512,6 +522,28 @@ bool ThreadPoolListener::GetSessIshang(instr_time* current_time, uint64* session return ishang; } +void ThreadPoolListener::WakeupForHang() { + pg_atomic_exchange_u32((volatile uint32*)&m_isHang, 1); + gs_signal_send(m_tid, SIGUSR2); +} + +void ThreadPoolListener::WakeupReadySessionList() { + Dlelem *elem = m_readySessionList->RemoveHead(); + knl_session_context *sess = NULL; + // last time WakeupReadySession() is not finished, but m_isHang is set again + while (elem != NULL && m_group->m_idleWorkerNum > 0) { + sess = (knl_session_context *)DLE_VAL(elem); + ereport(DEBUG2, + (errmodule(MOD_THREAD_POOL), + errmsg("WakeupReadySessionList remove a session:%lu from m_readySessionList", sess->session_id))); + DispatchSession(sess); + elem = m_readySessionList->RemoveHead(); + } + // m_isHang maybe set true when we do checkGroupHang again before it, now we will miss one time. + // But if group is actually hang, m_isHang will be set true again. + pg_atomic_exchange_u32((volatile uint32*)&m_isHang, 0); +} + Dlelem *ThreadPoolListener::GetFreeWorker(knl_session_context* session) { /* only lite mode need find right threadworker, diff --git a/src/gausskernel/process/threadpool/threadpool_scheduler.cpp b/src/gausskernel/process/threadpool/threadpool_scheduler.cpp index a1080f693..4e3a0f931 100644 --- a/src/gausskernel/process/threadpool/threadpool_scheduler.cpp +++ b/src/gausskernel/process/threadpool/threadpool_scheduler.cpp @@ -95,6 +95,7 @@ void TpoolSchedulerMain(ThreadPoolScheduler *scheduler) reloadConfigFileIfNecessary(); scheduler->DynamicAdjustThreadPool(); scheduler->GPCScheduleCleaner(&gpc_count); + scheduler->CheckGroupHang(); g_threadPoolControler->GetSessionCtrl()->CheckSessionTimeout(); #ifndef ENABLE_MULTIPLE_NODES g_threadPoolControler->GetSessionCtrl()->CheckIdleInTransactionSessionTimeout(); @@ -157,6 +158,15 @@ void ThreadPoolScheduler::GPCScheduleCleaner(int* gpc_count) (*gpc_count)++; } +void ThreadPoolScheduler::CheckGroupHang() +{ + for (int i = 0; i < m_groupNum; i++) { + if (pmState == PM_RUN) { + m_groups[i]->CheckGroupHang(); + } + } +} + void ThreadPoolScheduler::ShutDown() const { if (m_tid != 0) @@ -167,12 +177,12 @@ void ThreadPoolScheduler::AdjustWorkerPool(int idx) { ThreadPoolGroup* group = m_groups[idx]; /* When no idle worker and no task has been processed, the system may hang. */ - if (group->IsGroupHang()) { + if (group->IsGroupTooBusy()) { m_hangTestCount[idx]++; m_freeTestCount[idx] = 0; EnlargeWorkerIfNecessage(idx); } else { - group->SetGroupHanged(false); + group->SetGroupTooBusy(false); m_hangTestCount[idx] = 0; m_freeTestCount[idx]++; ReduceWorkerIfNecessary(idx); @@ -208,7 +218,7 @@ void ThreadPoolScheduler::EnlargeWorkerIfNecessage(int groupIdx) "and the thread num in pool exceed maximum, " "so we need to close all new sessions.", MAX_HANG_TIME); /* set flag for don't accept new session */ - group->SetGroupHanged(true); + group->SetGroupTooBusy(true); } } diff --git a/src/include/threadpool/threadpool_group.h b/src/include/threadpool/threadpool_group.h index a097ec8ef..a7ff35928 100644 --- a/src/include/threadpool/threadpool_group.h +++ b/src/include/threadpool/threadpool_group.h @@ -87,10 +87,11 @@ public: float4 GetSessionPerThread(); void GetThreadPoolGroupStat(ThreadPoolStat* stat); /* get ready session list check for hang */ - bool IsGroupHang(); - /* check for hang flag */ - void SetGroupHanged(bool isHang); - bool IsGroupHanged(); + bool IsGroupTooBusy(); + bool CheckGroupHang(); + /* check for too busy flag */ + void SetGroupTooBusy(bool isTooBusy); + bool isGroupAlreadyTooBusy(); inline ThreadPoolListener* GetListener() { @@ -150,7 +151,7 @@ private: volatile int m_sessionCount; // all session count; volatile int m_waitServeSessionCount; // wait for worker to server volatile int m_processTaskCount; - volatile int m_hasHanged; + volatile int m_isTooBusy; int m_groupId; int m_numaId; diff --git a/src/include/threadpool/threadpool_listener.h b/src/include/threadpool/threadpool_listener.h index 11f50aa4c..cc8ad0676 100644 --- a/src/include/threadpool/threadpool_listener.h +++ b/src/include/threadpool/threadpool_listener.h @@ -36,6 +36,7 @@ public: ThreadPoolGroup* m_group; volatile bool m_reaperAllSession; bool m_getKilled; + volatile int m_isHang; ThreadPoolListener(ThreadPoolGroup* group); ~ThreadPoolListener(); @@ -52,6 +53,8 @@ public: void ReaperAllSession(); void ShutDown() const; bool GetSessIshang(instr_time* current_time, uint64* sessionId); + void WakeupForHang(); + void WakeupReadySessionList(); inline ThreadPoolGroup* GetGroup() { diff --git a/src/include/threadpool/threadpool_scheduler.h b/src/include/threadpool/threadpool_scheduler.h index 4760be6fd..f067f4fb6 100644 --- a/src/include/threadpool/threadpool_scheduler.h +++ b/src/include/threadpool/threadpool_scheduler.h @@ -32,6 +32,7 @@ public: int StartUp(); void DynamicAdjustThreadPool(); void GPCScheduleCleaner(int* gpc_count); + void CheckGroupHang(); void ShutDown() const; void SigHupHandler(); inline ThreadId GetThreadId()