!836 修复stream线程池在启动stream线程数超出proc槽位后出现的hang问题

Merge pull request !836 from 杨皓/master
This commit is contained in:
opengauss-bot
2021-03-29 11:49:07 +08:00
committed by Gitee
5 changed files with 46 additions and 1 deletions

View File

@ -9721,6 +9721,8 @@ void SetExtraThreadInfo(knl_thread_arg* arg)
case THREADPOOL_STREAM: {
t_thrd.threadpool_cxt.stream = (ThreadPoolStream*)arg->payload;
t_thrd.threadpool_cxt.group = t_thrd.threadpool_cxt.stream->GetGroup();
StreamProducer* proObj = (StreamProducer*)t_thrd.threadpool_cxt.stream->GetProducer();
SetStreamWorkerInfo(proObj);
break;
}
#ifdef ENABLE_MULTIPLE_NODES

View File

@ -524,6 +524,11 @@ void StreamNodeGroup::signalStreamThreadInNodeGroup(int signo)
int ntimes = 1;
StreamProducer* producer = (StreamProducer*)m_streamArray[i].streamObj;
/* thread is is InvalidTid means thread not start up */
if (producer->getThreadId() == InvalidTid) {
continue;
}
/*
* Signal slot must be already registered if stream thread already inited.
* If not, wait 1ms once and then recheck. Sets the maximum total wait
@ -937,7 +942,12 @@ void StreamNodeGroup::syncQuit(StreamObjStatus status)
}
if (u_sess->stream_cxt.global_obj != NULL) {
if (STREAM_ERROR == status)
#ifdef ENABLE_MULTIPLE_NODES
if (status == STREAM_ERROR)
#else
if (status == STREAM_ERROR ||
unlikely(u_sess->stream_cxt.global_obj->GetStreamQuitStatus() == STREAM_ERROR))
#endif
u_sess->stream_cxt.global_obj->MarkSyncControllerStopFlagAll();
if (StreamTopConsumerAmI()) {

View File

@ -60,6 +60,7 @@ static void HandleStreamSigjmp();
static void execute_stream_plan(StreamProducer* producer);
static void execute_stream_end(StreamProducer* producer);
static void StreamQuitAndClean(int code, Datum arg);
static void ResetStreamWorkerInfo();
/* ----------------------------------------------------------------
* StreamMain
@ -107,6 +108,10 @@ int StreamMain()
/* We can now handle ereport(ERROR) */
t_thrd.log_cxt.PG_exception_stack = &local_sigjmp_buf;
if (IS_THREAD_POOL_STREAM) {
ResetStreamWorkerInfo();
}
while (true) {
if (IS_THREAD_POOL_STREAM) {
pgstat_report_activity(STATE_IDLE, NULL);
@ -638,6 +643,12 @@ void SetStreamWorkerInfo(StreamProducer* proObj)
u_sess->stream_cxt.producer_obj->getNodeGroupIdx(), &u_sess->exec_cxt.executor_stop_flag);
}
static void ResetStreamWorkerInfo()
{
u_sess->stream_cxt.producer_obj = NULL;
u_sess->stream_cxt.global_obj = NULL;
}
static void StoreStreamSyncParam(StreamSyncParam *syncParam)
{
syncParam->TempNamespace = u_sess->catalog_cxt.myTempNamespace;

View File

@ -56,6 +56,7 @@
#include "commands/trigger.h"
#include "commands/sequence.h"
#include "catalog/pg_hashbucket_fn.h"
#include "distributelayer/streamCore.h"
#include "distributelayer/streamMain.h"
#include "executor/lightProxy.h"
#include "executor/spi.h"
@ -4122,6 +4123,13 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback)
closeAllVfds();
#endif
#ifndef ENABLE_MULTIPLE_NODES
/* mark that stream query quits in error, to avoid stream threads not quit while PortalDrop */
if (u_sess->stream_cxt.global_obj != NULL) {
u_sess->stream_cxt.global_obj->MarkStreamQuitStatus(STREAM_ERROR);
}
#endif
s->savepointList = NULL;
TwoPhaseCommit = false;

View File

@ -422,7 +422,17 @@ public:
{
return m_recursiveVfdInvalid;
}
#ifndef ENABLE_MULTIPLE_NODES
inline void MarkStreamQuitStatus(StreamObjStatus status)
{
m_quitStatus = status;
}
inline StreamObjStatus GetStreamQuitStatus()
{
return m_quitStatus;
}
#endif
/* Mark recursive vfd is invalid before aborting transaction. */
static void MarkRecursiveVfdInvalid();
@ -495,6 +505,10 @@ private:
/* Mutex for stream connect sync. */
static pthread_mutex_t m_streamConnectSyncLock;
#ifndef ENABLE_MULTIPLE_NODES
/* Mark Stream query quit status. */
StreamObjStatus m_quitStatus;
#endif
};
extern bool IsThreadProcessStreamRecursive();