From 384bb1ae8332b45d82c78b1425f5039a2086c9c5 Mon Sep 17 00:00:00 2001 From: yanghao Date: Thu, 18 Mar 2021 17:34:22 +0800 Subject: [PATCH] fix problem that stream thread pool may cause hang while exceed max_connections --- src/gausskernel/process/postmaster/postmaster.cpp | 2 ++ src/gausskernel/process/stream/streamCore.cpp | 12 +++++++++++- src/gausskernel/process/stream/streamMain.cpp | 11 +++++++++++ src/gausskernel/storage/access/transam/xact.cpp | 8 ++++++++ src/include/distributelayer/streamCore.h | 14 ++++++++++++++ 5 files changed, 46 insertions(+), 1 deletion(-) diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 19c162859..4288723e2 100755 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -9721,6 +9721,8 @@ void SetExtraThreadInfo(knl_thread_arg* arg) case THREADPOOL_STREAM: { t_thrd.threadpool_cxt.stream = (ThreadPoolStream*)arg->payload; t_thrd.threadpool_cxt.group = t_thrd.threadpool_cxt.stream->GetGroup(); + StreamProducer* proObj = (StreamProducer*)t_thrd.threadpool_cxt.stream->GetProducer(); + SetStreamWorkerInfo(proObj); break; } #ifdef ENABLE_MULTIPLE_NODES diff --git a/src/gausskernel/process/stream/streamCore.cpp b/src/gausskernel/process/stream/streamCore.cpp index 2952d0a80..1b9972696 100644 --- a/src/gausskernel/process/stream/streamCore.cpp +++ b/src/gausskernel/process/stream/streamCore.cpp @@ -524,6 +524,11 @@ void StreamNodeGroup::signalStreamThreadInNodeGroup(int signo) int ntimes = 1; StreamProducer* producer = (StreamProducer*)m_streamArray[i].streamObj; + /* thread is is InvalidTid means thread not start up */ + if (producer->getThreadId() == InvalidTid) { + continue; + } + /* * Signal slot must be already registered if stream thread already inited. * If not, wait 1ms once and then recheck. Sets the maximum total wait @@ -937,7 +942,12 @@ void StreamNodeGroup::syncQuit(StreamObjStatus status) } if (u_sess->stream_cxt.global_obj != NULL) { - if (STREAM_ERROR == status) +#ifdef ENABLE_MULTIPLE_NODES + if (status == STREAM_ERROR) +#else + if (status == STREAM_ERROR || + unlikely(u_sess->stream_cxt.global_obj->GetStreamQuitStatus() == STREAM_ERROR)) +#endif u_sess->stream_cxt.global_obj->MarkSyncControllerStopFlagAll(); if (StreamTopConsumerAmI()) { diff --git a/src/gausskernel/process/stream/streamMain.cpp b/src/gausskernel/process/stream/streamMain.cpp index 2bffd7217..6c7215205 100644 --- a/src/gausskernel/process/stream/streamMain.cpp +++ b/src/gausskernel/process/stream/streamMain.cpp @@ -60,6 +60,7 @@ static void HandleStreamSigjmp(); static void execute_stream_plan(StreamProducer* producer); static void execute_stream_end(StreamProducer* producer); static void StreamQuitAndClean(int code, Datum arg); +static void ResetStreamWorkerInfo(); /* ---------------------------------------------------------------- * StreamMain @@ -107,6 +108,10 @@ int StreamMain() /* We can now handle ereport(ERROR) */ t_thrd.log_cxt.PG_exception_stack = &local_sigjmp_buf; + if (IS_THREAD_POOL_STREAM) { + ResetStreamWorkerInfo(); + } + while (true) { if (IS_THREAD_POOL_STREAM) { pgstat_report_activity(STATE_IDLE, NULL); @@ -638,6 +643,12 @@ void SetStreamWorkerInfo(StreamProducer* proObj) u_sess->stream_cxt.producer_obj->getNodeGroupIdx(), &u_sess->exec_cxt.executor_stop_flag); } +static void ResetStreamWorkerInfo() +{ + u_sess->stream_cxt.producer_obj = NULL; + u_sess->stream_cxt.global_obj = NULL; +} + static void StoreStreamSyncParam(StreamSyncParam *syncParam) { syncParam->TempNamespace = u_sess->catalog_cxt.myTempNamespace; diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 1e3d09df3..5a5e71522 100644 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -56,6 +56,7 @@ #include "commands/trigger.h" #include "commands/sequence.h" #include "catalog/pg_hashbucket_fn.h" +#include "distributelayer/streamCore.h" #include "distributelayer/streamMain.h" #include "executor/lightProxy.h" #include "executor/spi.h" @@ -4122,6 +4123,13 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback) closeAllVfds(); #endif +#ifndef ENABLE_MULTIPLE_NODES + /* mark that stream query quits in error, to avoid stream threads not quit while PortalDrop */ + if (u_sess->stream_cxt.global_obj != NULL) { + u_sess->stream_cxt.global_obj->MarkStreamQuitStatus(STREAM_ERROR); + } +#endif + s->savepointList = NULL; TwoPhaseCommit = false; diff --git a/src/include/distributelayer/streamCore.h b/src/include/distributelayer/streamCore.h index 71ee6bbc9..2a8aae119 100644 --- a/src/include/distributelayer/streamCore.h +++ b/src/include/distributelayer/streamCore.h @@ -422,7 +422,17 @@ public: { return m_recursiveVfdInvalid; } +#ifndef ENABLE_MULTIPLE_NODES + inline void MarkStreamQuitStatus(StreamObjStatus status) + { + m_quitStatus = status; + } + inline StreamObjStatus GetStreamQuitStatus() + { + return m_quitStatus; + } +#endif /* Mark recursive vfd is invalid before aborting transaction. */ static void MarkRecursiveVfdInvalid(); @@ -495,6 +505,10 @@ private: /* Mutex for stream connect sync. */ static pthread_mutex_t m_streamConnectSyncLock; +#ifndef ENABLE_MULTIPLE_NODES + /* Mark Stream query quit status. */ + StreamObjStatus m_quitStatus; +#endif }; extern bool IsThreadProcessStreamRecursive();