add m_streamEnterCount, optimize main thread quit function

This commit is contained in:
zhangwh
2023-07-13 16:51:01 +08:00
parent 67100fa4b9
commit 449b17d9c1
2 changed files with 18 additions and 5 deletions

View File

@ -338,6 +338,7 @@ StreamNodeGroup::StreamNodeGroup()
m_streamNum(1),
m_createThreadNum(0),
m_streamEnter(0),
m_streamEnterCount(0),
m_canceled(false),
m_needClean(false),
m_errorStop(false),
@ -611,6 +612,7 @@ void StreamNodeGroup::quitSyncPoint()
/* signal the top consumer if i am the last stream thread. */
streamLock.lock();
m_streamEnter++;
m_streamEnterCount++;
Assert(u_sess->stream_cxt.producer_obj != NULL);
pair = (u_sess->stream_cxt.producer_obj)->getPair();
@ -626,8 +628,10 @@ void StreamNodeGroup::quitSyncPoint()
if (m_quitWaitCond < 0) {
ereport(WARNING, (errmsg("Stream sub thread m_quitWaitCond invalid: %d. "
"To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond)));
ereport(LOG, (errmsg("Stream info, smp id: %u, m_streamEnter: %d, ThreadId: %u, m_createThreadNum: %d, m_size: %d",
u_sess->stream_cxt.smp_id, m_streamEnter, (u_sess->stream_cxt.producer_obj)->getThreadId(), m_createThreadNum, m_size)));
ereport(LOG, (errmsg("Stream info, smp id: %u, m_streamEnter: %d, m_streamEnterCount: %d, "
"ThreadId: %u, m_createThreadNum: %d, m_size: %d",
u_sess->stream_cxt.smp_id, m_streamEnter, m_streamEnterCount,
(u_sess->stream_cxt.producer_obj)->getThreadId(), m_createThreadNum, m_size)));
}
if (m_quitWaitCond <= 0)
pthread_cond_broadcast(&m_cond);
@ -656,8 +660,8 @@ void StreamNodeGroup::quitSyncPoint()
if (m_quitWaitCond < 0) {
ereport(WARNING, (errmsg("Stream top consumer thread m_quitWaitCond invalid: %d. "
"To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond)));
ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_createThreadNum: %d, m_size: %d",
m_streamEnter, m_createThreadNum, m_size)));
ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d",
m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size)));
}
if (m_quitWaitCond <= 0)
pthread_cond_broadcast(&m_cond);
@ -699,6 +703,12 @@ void StreamNodeGroup::quitSyncPoint()
u_sess->stream_cxt.in_waiting_quit = false;
}
if (m_streamEnterCount < m_createThreadNum) {
ereport(WARNING, (errmsg("Stream top consumer thread m_streamEnterCount invalid: %d", m_streamEnterCount)));
ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d",
m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size)));
}
streamLock.unLock();
}
} else
@ -1079,7 +1089,7 @@ void StreamNodeGroup::deInit(StreamObjStatus status)
do {
streamLock1.lock();
if (m_streamEnter == 0)
if (m_streamEnter <= 0 && m_streamEnterCount >= m_createThreadNum)
saveQuit = true;
streamLock1.unLock();

View File

@ -471,6 +471,9 @@ private:
/* A flag to indicate stream enter the quit point. */
volatile int m_streamEnter;
/* A counter to indicate stream enter the quit point. */
volatile int m_streamEnterCount;
/* Mutex and condition waiting for all thread in the node group is complete. */
pthread_mutex_t m_mutex;