Revert "issue#I5KOL1 Use pthread_cond_timedwait instead of pthread_cond_wait to handle"

This reverts commit 573423cac695fd7aff6bfbb2d3c6dd3b359e3293.
This commit is contained in:
gentle_hu
2022-09-15 20:19:26 +08:00
parent ff1e91fc41
commit d7a713ab1a
4 changed files with 4 additions and 21 deletions

View File

@ -501,10 +501,6 @@ bool ThreadPoolListener::GetSessIshang(instr_time* current_time, uint64* session
return ishang;
}
bool ThreadPoolListener::hasNoReadySession() {
return (m_readySessionList->GetLength() == 0);
}
Dlelem *ThreadPoolListener::GetFreeWorker(knl_session_context* session)
{
/* only lite mode need find right threadworker,

View File

@ -75,17 +75,13 @@ int ThreadPoolStream::StartUp(int idx, StreamProducer* producer, ThreadPoolGroup
void ThreadPoolStream::WaitMission()
{
struct timespec ts;
PreventSignal();
pthread_mutex_lock(m_mutex);
while (m_producer == NULL) {
if (m_threadStatus == THREAD_EXIT) {
break;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 10; // 10s
ts.tv_nsec = 0;
pthread_cond_timedwait(m_cond, m_mutex, &ts);
pthread_cond_wait(m_cond, m_mutex);
}
pthread_mutex_unlock(m_mutex);
Assert(t_thrd.proc->pid == t_thrd.proc_cxt.MyProcPid);

View File

@ -400,7 +400,6 @@ void ThreadPoolWorker::WaitNextSession()
/* Return worker to pool unless we can get a task right now. */
ThreadPoolListener* lsn = m_group->GetListener();
Assert(lsn != NULL);
struct timespec ts;
while (true) {
/* Wait if the thread was turned into pending mode. */
@ -435,14 +434,11 @@ void ThreadPoolWorker::WaitNextSession()
WaitState oldStatus = pgstat_report_waitstatus(STATE_WAIT_COMM);
pthread_mutex_lock(m_mutex);
while (!m_currentSession && lsn->hasNoReadySession()) {
while (!m_currentSession) {
if (unlikely(m_threadStatus == THREAD_PENDING || m_threadStatus == THREAD_EXIT)) {
break;
}
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 10; // 10s
ts.tv_nsec = 0;
pthread_cond_timedwait(m_cond, m_mutex, &ts);
pthread_cond_wait(m_cond, m_mutex);
}
pthread_mutex_unlock(m_mutex);
m_group->GetListener()->RemoveWorkerFromList(this);
@ -460,14 +456,10 @@ void ThreadPoolWorker::WaitNextSession()
void ThreadPoolWorker::Pending()
{
struct timespec ts;
pg_atomic_fetch_sub_u32((volatile uint32*)&m_group->m_workerNum, 1);
pthread_mutex_lock(m_mutex);
while (m_threadStatus == THREAD_PENDING) {
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 10; // 10s
ts.tv_nsec = 0;
pthread_cond_timedwait(m_cond, m_mutex, &ts);
pthread_cond_wait(m_cond, m_mutex);
}
pthread_mutex_unlock(m_mutex);
pg_atomic_fetch_add_u32((volatile uint32*)&m_group->m_workerNum, 1);

View File

@ -52,7 +52,6 @@ public:
void ReaperAllSession();
void ShutDown() const;
bool GetSessIshang(instr_time* current_time, uint64* sessionId);
bool hasNoReadySession();
inline ThreadPoolGroup* GetGroup()
{