diff --git a/src/gausskernel/process/stream/streamCore.cpp b/src/gausskernel/process/stream/streamCore.cpp index 40903ce83..19c34ef8c 100755 --- a/src/gausskernel/process/stream/streamCore.cpp +++ b/src/gausskernel/process/stream/streamCore.cpp @@ -338,6 +338,7 @@ StreamNodeGroup::StreamNodeGroup() m_streamNum(1), m_createThreadNum(0), m_streamEnter(0), + m_streamEnterCount(0), m_canceled(false), m_needClean(false), m_errorStop(false), @@ -602,6 +603,8 @@ void StreamNodeGroup::cancelStreamThread() */ void StreamNodeGroup::quitSyncPoint() { + int timeout = 30; + if (StreamThreadAmI() == true) { StreamPair* pair = NULL; AutoMutexLock streamLock(&m_mutex); @@ -609,6 +612,7 @@ void StreamNodeGroup::quitSyncPoint() /* signal the top consumer if i am the last stream thread. */ streamLock.lock(); m_streamEnter++; + m_streamEnterCount++; Assert(u_sess->stream_cxt.producer_obj != NULL); pair = (u_sess->stream_cxt.producer_obj)->getPair(); @@ -621,11 +625,24 @@ void StreamNodeGroup::quitSyncPoint() Assert(m_quitWaitCond >= 0); Assert(pair->expectThreadNum >= pair->createThreadNum); - if (m_quitWaitCond == 0) + if (m_quitWaitCond < 0) { + ereport(WARNING, (errmsg("Stream sub thread m_quitWaitCond invalid: %d. " + "To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond))); + ereport(LOG, (errmsg("Stream info, smp id: %u, m_streamEnter: %d, m_streamEnterCount: %d, " + "ThreadId: %u, m_createThreadNum: %d, m_size: %d", + u_sess->stream_cxt.smp_id, m_streamEnter, m_streamEnterCount, + (u_sess->stream_cxt.producer_obj)->getThreadId(), m_createThreadNum, m_size))); + } + if (m_quitWaitCond <= 0) pthread_cond_broadcast(&m_cond); else { - while (m_quitWaitCond != 0) - pthread_cond_wait(&m_cond, &m_mutex); + struct timespec ts; + while (m_quitWaitCond > 0) { + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += timeout; + ts.tv_nsec = 0; + pthread_cond_timedwait(&m_cond, &m_mutex, &ts); + } } streamLock.unLock(); } else if (StreamTopConsumerAmI() == true) { @@ -640,7 +657,13 @@ void StreamNodeGroup::quitSyncPoint() Assert(m_quitWaitCond >= 0); Assert(m_size >= m_createThreadNum); - if (m_quitWaitCond == 0) + if (m_quitWaitCond < 0) { + ereport(WARNING, (errmsg("Stream top consumer thread m_quitWaitCond invalid: %d. " + "To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond))); + ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d", + m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size))); + } + if (m_quitWaitCond <= 0) pthread_cond_broadcast(&m_cond); else { /* @@ -668,13 +691,24 @@ void StreamNodeGroup::quitSyncPoint() */ CHECK_FOR_INTERRUPTS(); - while (m_quitWaitCond != 0) - pthread_cond_wait(&m_cond, &m_mutex); + struct timespec ts; + while (m_quitWaitCond > 0) { + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += timeout; + ts.tv_nsec = 0; + pthread_cond_timedwait(&m_cond, &m_mutex, &ts); + } t_thrd.int_cxt.ImmediateInterruptOK = false; u_sess->stream_cxt.in_waiting_quit = false; } + if (m_streamEnterCount < m_createThreadNum) { + ereport(WARNING, (errmsg("Stream top consumer thread m_streamEnterCount invalid: %d", m_streamEnterCount))); + ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d", + m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size))); + } + streamLock.unLock(); } } else @@ -1055,7 +1089,7 @@ void StreamNodeGroup::deInit(StreamObjStatus status) do { streamLock1.lock(); - if (m_streamEnter == 0) + if (m_streamEnter <= 0 && m_streamEnterCount >= m_createThreadNum) saveQuit = true; streamLock1.unLock(); diff --git a/src/include/distributelayer/streamCore.h b/src/include/distributelayer/streamCore.h index f356afd3c..d899cbf01 100755 --- a/src/include/distributelayer/streamCore.h +++ b/src/include/distributelayer/streamCore.h @@ -471,6 +471,9 @@ private: /* A flag to indicate stream enter the quit point. */ volatile int m_streamEnter; + /* A counter to indicate stream enter the quit point. */ + volatile int m_streamEnterCount; + /* Mutex and condition waiting for all thread in the node group is complete. */ pthread_mutex_t m_mutex;