From f09db307e12e539f9232c6d67535e291e60cd08f Mon Sep 17 00:00:00 2001 From: LiHeng Date: Wed, 4 Aug 2021 16:48:04 +0800 Subject: [PATCH] fix threadpool --- .../threadpool/threadpool_controler.cpp | 7 +++--- .../threadpool/threadpool_listener.cpp | 8 ++++++ .../process/threadpool/threadpool_stream.cpp | 6 ++++- .../process/threadpool/threadpool_worker.cpp | 25 +++++++++++++++++-- 4 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/gausskernel/process/threadpool/threadpool_controler.cpp b/src/gausskernel/process/threadpool/threadpool_controler.cpp index 1ba912988..3d85eecdf 100644 --- a/src/gausskernel/process/threadpool/threadpool_controler.cpp +++ b/src/gausskernel/process/threadpool/threadpool_controler.cpp @@ -558,11 +558,10 @@ void ThreadPoolControler::ConstrainThreadNum() { /* Thread pool size should not be larger than max_connections. */ if (MAX_THREAD_POOL_SIZE > g_instance.attr.attr_network.MaxConnections) { - m_maxPoolSize = g_instance.attr.attr_network.MaxConnections; ereport(LOG, (errcode(ERRCODE_OPERATE_INVALID_PARAM), - errmsg("Thread pool size %d should not be larger than max_connections %d, " - "so reduce thread pool size to max_connections", - m_threadNum, g_instance.attr.attr_network.MaxConnections))); + errmsg("Max thread pool size %d should not be larger than max_connections %d, " + "so reduce max thread pool size to max_connections", + MAX_THREAD_POOL_SIZE, g_instance.attr.attr_network.MaxConnections))); } m_maxPoolSize = Min(MAX_THREAD_POOL_SIZE, g_instance.attr.attr_network.MaxConnections); diff --git a/src/gausskernel/process/threadpool/threadpool_listener.cpp b/src/gausskernel/process/threadpool/threadpool_listener.cpp index 53cb8fdfe..7293801c3 100644 --- a/src/gausskernel/process/threadpool/threadpool_listener.cpp +++ b/src/gausskernel/process/threadpool/threadpool_listener.cpp @@ -231,6 +231,14 @@ void ThreadPoolListener::ReaperAllSession() " encounter FATAL problems before session close."))); ExitPostmaster(1); } + /* m_sessionCount should be sum of the list length of m_idleSessionList and m_readySessionList + and worker's attached session */ + pg_memory_barrier(); + if (m_idleSessionList->IsEmpty() && m_readySessionList->IsEmpty() && + m_group->m_workerNum - m_group->m_idleWorkerNum == 0) { + ereport(WARNING, (errmsg("SessionCount should be zero when no session in this group."))); + m_group->m_sessionCount = 0; + } elem = m_idleSessionList->RemoveHead(); while (elem != NULL) { diff --git a/src/gausskernel/process/threadpool/threadpool_stream.cpp b/src/gausskernel/process/threadpool/threadpool_stream.cpp index 281c0edef..7afad7a37 100644 --- a/src/gausskernel/process/threadpool/threadpool_stream.cpp +++ b/src/gausskernel/process/threadpool/threadpool_stream.cpp @@ -130,6 +130,8 @@ void ThreadPoolStream::InitStream() SetStreamWorkerInfo(m_producer); ExtractProduerInfo(); + SetProcessingMode(InitProcessing); + /* Init GUC option for this session. */ InitializeGUCOptions(); /* Read in remaining GUC variables */ @@ -142,7 +144,9 @@ void ThreadPoolStream::InitStream() t_thrd.proc_cxt.PostInit->SetDatabaseAndUser( u_sess->stream_cxt.producer_obj->getDbName(), InvalidOid, u_sess->stream_cxt.producer_obj->getUserName()); t_thrd.proc_cxt.PostInit->InitStreamSession(); - + + SetProcessingMode(NormalProcessing); + repair_guc_variables(); RestoreStreamSyncParam(&m_producer->m_syncParam); diff --git a/src/gausskernel/process/threadpool/threadpool_worker.cpp b/src/gausskernel/process/threadpool/threadpool_worker.cpp index 001f052d7..c4352fb13 100644 --- a/src/gausskernel/process/threadpool/threadpool_worker.cpp +++ b/src/gausskernel/process/threadpool/threadpool_worker.cpp @@ -396,6 +396,13 @@ void ThreadPoolWorker::ShutDownIfNecessary() RestoreThreadVariable(); proc_exit(0); } + /* there is time window which the cancle signal has arrived but ignored by prevent signal called before, + * so we rebuild the signal status here in case that happens. */ + if (unlikely(m_currentSession != NULL && m_currentSession->status == KNL_SESS_CLOSE)) { + ereport(LOG, (errmodule(MOD_THREAD_POOL), + errmsg("Cancle signal has arrived but ignored by prevent signal called before, rebuild it."))); + t_thrd.int_cxt.ClientConnectionLost = true; + } } void ThreadPoolWorker::CleanThread() @@ -425,6 +432,7 @@ void ThreadPoolWorker::CleanThread() } InterruptPending = false; + t_thrd.int_cxt.QueryCancelPending = false; t_thrd.libpq_cxt.PqSendStart = 0; t_thrd.libpq_cxt.PqSendPointer = 0; t_thrd.libpq_cxt.PqRecvLength = 0; @@ -494,8 +502,16 @@ bool ThreadPoolWorker::AttachSessionToThread() * Since thread pool worker may start earlier than startup finishing recovery, * init xlog access if necessary. */ - (void)RecoveryInProgress(); - + PG_TRY(); + { + (void)RecoveryInProgress(); + } + PG_CATCH(); + { + /* if init xlog has error, should throw fatal this thread */ + ereport(FATAL, (errmsg("init xlog failed, throw fatal for this thread"))); + } + PG_END_TRY(); #ifdef ENABLE_QUNIT set_qunit_case_number_hook(u_sess->utils_cxt.qunit_case_number, NULL); #endif @@ -660,6 +676,8 @@ static void init_session_share_memory() static bool InitSession(knl_session_context* session) { + /* non't send ereport to client now */ + t_thrd.postgres_cxt.whereToSendOutput = DestNone; /* Switch context to Session context. */ AutoContextSwitch memSwitch(session->mcxt_group->GetMemCxtGroup(MEMORY_CONTEXT_DEFAULT)); @@ -682,6 +700,9 @@ static bool InitSession(knl_session_context* session) /* Read in remaining GUC variables */ read_nondefault_variables(); + /* now safe to ereport to client */ + t_thrd.postgres_cxt.whereToSendOutput = DestRemote; + /* Init port and connection. */ if (!InitPort(session->proc_cxt.MyProcPort)) { /* reset some status below */