fix #I5UDEI Check hang when m_readySessionList is not empty while all workers are idle

This commit is contained in:
Chunling Wang
2022-11-08 15:41:29 +08:00
parent 4106583369
commit 218df0982d
7 changed files with 79 additions and 19 deletions

View File

@ -882,7 +882,7 @@ int ThreadPoolControler::DispatchSession(Port* port)
return STATUS_ERROR;
}
/* if this group is hanged, we don't accept new session */
if (grp->IsGroupHanged()) {
if (grp->isGroupAlreadyTooBusy()) {
ereport(WARNING,
(errmodule(MOD_THREAD_POOL),
errmsg("Group[%d] is too busy to add new session for now.", grp->GetGroupId())));

View File

@ -72,7 +72,7 @@ ThreadPoolGroup::ThreadPoolGroup(int maxWorkerNum, int expectWorkerNum, int maxS
m_sessionCount(0),
m_waitServeSessionCount(0),
m_processTaskCount(0),
m_hasHanged(0),
m_isTooBusy(0),
m_groupId(groupId),
m_numaId(numaId),
m_groupCpuNum(cpuNum),
@ -372,25 +372,38 @@ void ThreadPoolGroup::ShutDownThreads()
alock.unLock();
}
bool ThreadPoolGroup::IsGroupHang()
bool ThreadPoolGroup::IsGroupTooBusy()
{
if (pg_atomic_exchange_u32((volatile uint32*)&m_processTaskCount, 0) != 0 ||
m_idleWorkerNum != 0)
return false;
bool ishang = m_listener->GetSessIshang(&m_current_time, &m_sessionId);
return ishang;
bool isTooBusy = m_listener->GetSessIshang(&m_current_time, &m_sessionId);
return isTooBusy;
}
void ThreadPoolGroup::SetGroupHanged(bool isHang)
bool ThreadPoolGroup::CheckGroupHang()
{
pg_atomic_exchange_u32((volatile uint32*)&m_hasHanged, (uint32)isHang);
if (m_waitServeSessionCount == 0 || m_idleWorkerNum < m_workerNum)
return false;
bool isHang = m_listener->GetSessIshang(&m_current_time, &m_sessionId);
if (!isHang)
return false;
m_listener->WakeupForHang();
return true;
}
bool ThreadPoolGroup::IsGroupHanged()
void ThreadPoolGroup::SetGroupTooBusy(bool isTooBusy)
{
pg_atomic_exchange_u32((volatile uint32*)&m_isTooBusy, (uint32)isTooBusy);
}
bool ThreadPoolGroup::isGroupAlreadyTooBusy()
{
pg_memory_barrier();
return m_hasHanged != 0;
return m_isTooBusy != 0;
}
void ThreadPoolGroup::AttachThreadToCPU(ThreadId thread, int cpu)

View File

@ -62,6 +62,11 @@ static void ListenerSIGUSR1Handler(SIGNAL_ARGS)
t_thrd.threadpool_cxt.listener->m_reaperAllSession = true;
}
static void ListenerSIGUSR2Handler(SIGNAL_ARGS)
{
// Do nothing but wakeup for epoll
}
static void ListenerSIGKILLHandler(SIGNAL_ARGS)
{
t_thrd.threadpool_cxt.listener->m_getKilled = true;
@ -79,7 +84,7 @@ void TpoolListenerMain(ThreadPoolListener* listener)
(void)gspqsignal(SIGQUIT, SIG_IGN);
(void)gspqsignal(SIGPIPE, SIG_IGN);
(void)gspqsignal(SIGUSR1, ListenerSIGUSR1Handler);
(void)gspqsignal(SIGUSR2, SIG_IGN);
(void)gspqsignal(SIGUSR2, ListenerSIGUSR2Handler);
(void)gspqsignal(SIGFPE, FloatExceptionHandler);
(void)gspqsignal(SIGCHLD, SIG_DFL);
(void)gspqsignal(SIGHUP, SIG_IGN);
@ -113,6 +118,7 @@ ThreadPoolListener::ThreadPoolListener(ThreadPoolGroup* group)
m_epollEvents = NULL;
m_reaperAllSession = false;
m_getKilled = false;
m_isHang = 0;
m_freeWorkerList = New(CurrentMemoryContext) DllistWithLock();
m_readySessionList = New(CurrentMemoryContext) DllistWithLock();
m_idleSessionList = New(CurrentMemoryContext) DllistWithLock();
@ -362,6 +368,10 @@ void ThreadPoolListener::WaitTask()
ReaperAllSession();
}
if (unlikely(m_isHang != 0)) {
WakeupReadySessionList();
}
/* as we specify timeout -1, so 0 will not be return, either > 0 or < 0 */
if (ENABLE_THREAD_POOL_DN_LOGICCONN) {
nevents = CommEpollWait(m_epollFd, m_epollEvents, GLOBAL_MAX_SESSION_NUM, -1);
@ -454,7 +464,7 @@ void ThreadPoolListener::DispatchSession(knl_session_context* session)
errmsg("%s remove session:%lu from idleSessionList to readySessionList",
__func__, session->session_id)));
INSTR_TIME_SET_CURRENT(session->last_access_time);
/* Add new session to the head so the connection request can be quickly processed. */
if (session->status == KNL_SESS_UNINIT) {
AddIdleSessionToHead(session);
@ -512,6 +522,28 @@ bool ThreadPoolListener::GetSessIshang(instr_time* current_time, uint64* session
return ishang;
}
void ThreadPoolListener::WakeupForHang() {
pg_atomic_exchange_u32((volatile uint32*)&m_isHang, 1);
gs_signal_send(m_tid, SIGUSR2);
}
void ThreadPoolListener::WakeupReadySessionList() {
Dlelem *elem = m_readySessionList->RemoveHead();
knl_session_context *sess = NULL;
// last time WakeupReadySession() is not finished, but m_isHang is set again
while (elem != NULL && m_group->m_idleWorkerNum > 0) {
sess = (knl_session_context *)DLE_VAL(elem);
ereport(DEBUG2,
(errmodule(MOD_THREAD_POOL),
errmsg("WakeupReadySessionList remove a session:%lu from m_readySessionList", sess->session_id)));
DispatchSession(sess);
elem = m_readySessionList->RemoveHead();
}
// m_isHang maybe set true when we do checkGroupHang again before it, now we will miss one time.
// But if group is actually hang, m_isHang will be set true again.
pg_atomic_exchange_u32((volatile uint32*)&m_isHang, 0);
}
Dlelem *ThreadPoolListener::GetFreeWorker(knl_session_context* session)
{
/* only lite mode need find right threadworker,

View File

@ -95,6 +95,7 @@ void TpoolSchedulerMain(ThreadPoolScheduler *scheduler)
reloadConfigFileIfNecessary();
scheduler->DynamicAdjustThreadPool();
scheduler->GPCScheduleCleaner(&gpc_count);
scheduler->CheckGroupHang();
g_threadPoolControler->GetSessionCtrl()->CheckSessionTimeout();
#ifndef ENABLE_MULTIPLE_NODES
g_threadPoolControler->GetSessionCtrl()->CheckIdleInTransactionSessionTimeout();
@ -157,6 +158,15 @@ void ThreadPoolScheduler::GPCScheduleCleaner(int* gpc_count)
(*gpc_count)++;
}
void ThreadPoolScheduler::CheckGroupHang()
{
for (int i = 0; i < m_groupNum; i++) {
if (pmState == PM_RUN) {
m_groups[i]->CheckGroupHang();
}
}
}
void ThreadPoolScheduler::ShutDown() const
{
if (m_tid != 0)
@ -167,12 +177,12 @@ void ThreadPoolScheduler::AdjustWorkerPool(int idx)
{
ThreadPoolGroup* group = m_groups[idx];
/* When no idle worker and no task has been processed, the system may hang. */
if (group->IsGroupHang()) {
if (group->IsGroupTooBusy()) {
m_hangTestCount[idx]++;
m_freeTestCount[idx] = 0;
EnlargeWorkerIfNecessage(idx);
} else {
group->SetGroupHanged(false);
group->SetGroupTooBusy(false);
m_hangTestCount[idx] = 0;
m_freeTestCount[idx]++;
ReduceWorkerIfNecessary(idx);
@ -208,7 +218,7 @@ void ThreadPoolScheduler::EnlargeWorkerIfNecessage(int groupIdx)
"and the thread num in pool exceed maximum, "
"so we need to close all new sessions.", MAX_HANG_TIME);
/* set flag for don't accept new session */
group->SetGroupHanged(true);
group->SetGroupTooBusy(true);
}
}

View File

@ -87,10 +87,11 @@ public:
float4 GetSessionPerThread();
void GetThreadPoolGroupStat(ThreadPoolStat* stat);
/* get ready session list check for hang */
bool IsGroupHang();
/* check for hang flag */
void SetGroupHanged(bool isHang);
bool IsGroupHanged();
bool IsGroupTooBusy();
bool CheckGroupHang();
/* check for too busy flag */
void SetGroupTooBusy(bool isTooBusy);
bool isGroupAlreadyTooBusy();
inline ThreadPoolListener* GetListener()
{
@ -150,7 +151,7 @@ private:
volatile int m_sessionCount; // all session count;
volatile int m_waitServeSessionCount; // wait for worker to server
volatile int m_processTaskCount;
volatile int m_hasHanged;
volatile int m_isTooBusy;
int m_groupId;
int m_numaId;

View File

@ -36,6 +36,7 @@ public:
ThreadPoolGroup* m_group;
volatile bool m_reaperAllSession;
bool m_getKilled;
volatile int m_isHang;
ThreadPoolListener(ThreadPoolGroup* group);
~ThreadPoolListener();
@ -52,6 +53,8 @@ public:
void ReaperAllSession();
void ShutDown() const;
bool GetSessIshang(instr_time* current_time, uint64* sessionId);
void WakeupForHang();
void WakeupReadySessionList();
inline ThreadPoolGroup* GetGroup()
{

View File

@ -32,6 +32,7 @@ public:
int StartUp();
void DynamicAdjustThreadPool();
void GPCScheduleCleaner(int* gpc_count);
void CheckGroupHang();
void ShutDown() const;
void SigHupHandler();
inline ThreadId GetThreadId()