From 573423cac695fd7aff6bfbb2d3c6dd3b359e3293 Mon Sep 17 00:00:00 2001 From: Chunling Wang Date: Thu, 28 Jul 2022 18:28:43 +0800 Subject: [PATCH] issue#I5KOL1 Use pthread_cond_timedwait instead of pthread_cond_wait to handle m_readySessionList when all workers are idle --- .../process/threadpool/threadpool_listener.cpp | 4 ++++ .../process/threadpool/threadpool_stream.cpp | 6 +++++- .../process/threadpool/threadpool_worker.cpp | 14 +++++++++++--- src/include/threadpool/threadpool_listener.h | 1 + 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/gausskernel/process/threadpool/threadpool_listener.cpp b/src/gausskernel/process/threadpool/threadpool_listener.cpp index e3caab1a0..20ceff9a8 100644 --- a/src/gausskernel/process/threadpool/threadpool_listener.cpp +++ b/src/gausskernel/process/threadpool/threadpool_listener.cpp @@ -500,6 +500,10 @@ bool ThreadPoolListener::GetSessIshang(instr_time* current_time, uint64* session return ishang; } +bool ThreadPoolListener::hasNoReadySession() { + return (m_readySessionList->GetLength() == 0); +} + Dlelem *ThreadPoolListener::GetFreeWorker(knl_session_context* session) { /* only lite mode need find right threadworker, diff --git a/src/gausskernel/process/threadpool/threadpool_stream.cpp b/src/gausskernel/process/threadpool/threadpool_stream.cpp index 44234a822..0bf66858f 100644 --- a/src/gausskernel/process/threadpool/threadpool_stream.cpp +++ b/src/gausskernel/process/threadpool/threadpool_stream.cpp @@ -75,13 +75,17 @@ int ThreadPoolStream::StartUp(int idx, StreamProducer* producer, ThreadPoolGroup void ThreadPoolStream::WaitMission() { + struct timespec ts; PreventSignal(); pthread_mutex_lock(m_mutex); while (m_producer == NULL) { if (m_threadStatus == THREAD_EXIT) { break; } - pthread_cond_wait(m_cond, m_mutex); + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 10; // 10s + ts.tv_nsec = 0; + pthread_cond_timedwait(m_cond, m_mutex, &ts); } pthread_mutex_unlock(m_mutex); Assert(t_thrd.proc->pid == t_thrd.proc_cxt.MyProcPid); diff --git a/src/gausskernel/process/threadpool/threadpool_worker.cpp b/src/gausskernel/process/threadpool/threadpool_worker.cpp index 44cd0042f..039d53fa2 100644 --- a/src/gausskernel/process/threadpool/threadpool_worker.cpp +++ b/src/gausskernel/process/threadpool/threadpool_worker.cpp @@ -380,6 +380,7 @@ void ThreadPoolWorker::WaitNextSession() /* Return worker to pool unless we can get a task right now. */ ThreadPoolListener* lsn = m_group->GetListener(); Assert(lsn != NULL); + struct timespec ts; while (true) { /* Wait if the thread was turned into pending mode. */ @@ -406,11 +407,14 @@ void ThreadPoolWorker::WaitNextSession() WaitState oldStatus = pgstat_report_waitstatus(STATE_WAIT_COMM); pthread_mutex_lock(m_mutex); - while (!m_currentSession) { + while (!m_currentSession && lsn->hasNoReadySession()) { if (unlikely(m_threadStatus == THREAD_PENDING || m_threadStatus == THREAD_EXIT)) { break; } - pthread_cond_wait(m_cond, m_mutex); + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 10; // 10s + ts.tv_nsec = 0; + pthread_cond_timedwait(m_cond, m_mutex, &ts); } pthread_mutex_unlock(m_mutex); m_group->GetListener()->RemoveWorkerFromList(this); @@ -428,10 +432,14 @@ void ThreadPoolWorker::WaitNextSession() void ThreadPoolWorker::Pending() { + struct timespec ts; pg_atomic_fetch_sub_u32((volatile uint32*)&m_group->m_workerNum, 1); pthread_mutex_lock(m_mutex); while (m_threadStatus == THREAD_PENDING) { - pthread_cond_wait(m_cond, m_mutex); + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 10; // 10s + ts.tv_nsec = 0; + pthread_cond_timedwait(m_cond, m_mutex, &ts); } pthread_mutex_unlock(m_mutex); pg_atomic_fetch_add_u32((volatile uint32*)&m_group->m_workerNum, 1); diff --git a/src/include/threadpool/threadpool_listener.h b/src/include/threadpool/threadpool_listener.h index addfbfc0d..65709fbcb 100644 --- a/src/include/threadpool/threadpool_listener.h +++ b/src/include/threadpool/threadpool_listener.h @@ -52,6 +52,7 @@ public: void ReaperAllSession(); void ShutDown() const; bool GetSessIshang(instr_time* current_time, uint64* sessionId); + bool hasNoReadySession(); inline ThreadPoolGroup* GetGroup() {