!3784 优化SMP并行退出机制,避免宕机或子线程残留

Merge pull request !3784 from npczwh1/smp_quitWaitCond
This commit is contained in:
opengauss_bot
2023-10-12 09:06:48 +00:00
committed by Gitee
2 changed files with 44 additions and 7 deletions

View File

@ -338,6 +338,7 @@ StreamNodeGroup::StreamNodeGroup()
m_streamNum(1),
m_createThreadNum(0),
m_streamEnter(0),
m_streamEnterCount(0),
m_canceled(false),
m_needClean(false),
m_errorStop(false),
@ -602,6 +603,8 @@ void StreamNodeGroup::cancelStreamThread()
*/
void StreamNodeGroup::quitSyncPoint()
{
int timeout = 30;
if (StreamThreadAmI() == true) {
StreamPair* pair = NULL;
AutoMutexLock streamLock(&m_mutex);
@ -609,6 +612,7 @@ void StreamNodeGroup::quitSyncPoint()
/* signal the top consumer if i am the last stream thread. */
streamLock.lock();
m_streamEnter++;
m_streamEnterCount++;
Assert(u_sess->stream_cxt.producer_obj != NULL);
pair = (u_sess->stream_cxt.producer_obj)->getPair();
@ -621,11 +625,24 @@ void StreamNodeGroup::quitSyncPoint()
Assert(m_quitWaitCond >= 0);
Assert(pair->expectThreadNum >= pair->createThreadNum);
if (m_quitWaitCond == 0)
if (m_quitWaitCond < 0) {
ereport(WARNING, (errmsg("Stream sub thread m_quitWaitCond invalid: %d. "
"To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond)));
ereport(LOG, (errmsg("Stream info, smp id: %u, m_streamEnter: %d, m_streamEnterCount: %d, "
"ThreadId: %u, m_createThreadNum: %d, m_size: %d",
u_sess->stream_cxt.smp_id, m_streamEnter, m_streamEnterCount,
(u_sess->stream_cxt.producer_obj)->getThreadId(), m_createThreadNum, m_size)));
}
if (m_quitWaitCond <= 0)
pthread_cond_broadcast(&m_cond);
else {
while (m_quitWaitCond != 0)
pthread_cond_wait(&m_cond, &m_mutex);
struct timespec ts;
while (m_quitWaitCond > 0) {
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += timeout;
ts.tv_nsec = 0;
pthread_cond_timedwait(&m_cond, &m_mutex, &ts);
}
}
streamLock.unLock();
} else if (StreamTopConsumerAmI() == true) {
@ -640,7 +657,13 @@ void StreamNodeGroup::quitSyncPoint()
Assert(m_quitWaitCond >= 0);
Assert(m_size >= m_createThreadNum);
if (m_quitWaitCond == 0)
if (m_quitWaitCond < 0) {
ereport(WARNING, (errmsg("Stream top consumer thread m_quitWaitCond invalid: %d. "
"To get backtrace detail, set backtrace_min_messages=warning.", m_quitWaitCond)));
ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d",
m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size)));
}
if (m_quitWaitCond <= 0)
pthread_cond_broadcast(&m_cond);
else {
/*
@ -668,13 +691,24 @@ void StreamNodeGroup::quitSyncPoint()
*/
CHECK_FOR_INTERRUPTS();
while (m_quitWaitCond != 0)
pthread_cond_wait(&m_cond, &m_mutex);
struct timespec ts;
while (m_quitWaitCond > 0) {
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += timeout;
ts.tv_nsec = 0;
pthread_cond_timedwait(&m_cond, &m_mutex, &ts);
}
t_thrd.int_cxt.ImmediateInterruptOK = false;
u_sess->stream_cxt.in_waiting_quit = false;
}
if (m_streamEnterCount < m_createThreadNum) {
ereport(WARNING, (errmsg("Stream top consumer thread m_streamEnterCount invalid: %d", m_streamEnterCount)));
ereport(LOG, (errmsg("Stream info, m_streamEnter: %d, m_streamEnterCount: %d, m_createThreadNum: %d, m_size: %d",
m_streamEnter, m_streamEnterCount, m_createThreadNum, m_size)));
}
streamLock.unLock();
}
} else
@ -1055,7 +1089,7 @@ void StreamNodeGroup::deInit(StreamObjStatus status)
do {
streamLock1.lock();
if (m_streamEnter == 0)
if (m_streamEnter <= 0 && m_streamEnterCount >= m_createThreadNum)
saveQuit = true;
streamLock1.unLock();

View File

@ -471,6 +471,9 @@ private:
/* A flag to indicate stream enter the quit point. */
volatile int m_streamEnter;
/* A counter to indicate stream enter the quit point. */
volatile int m_streamEnterCount;
/* Mutex and condition waiting for all thread in the node group is complete. */
pthread_mutex_t m_mutex;