Files
openGauss-server/src/include/threadpool/threadpool_group.h
2020-06-30 17:38:27 +08:00

144 lines
3.8 KiB
C++
Executable File

/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* threadpool_group.h
* Thread pool group controls listener and worker threads.
*
* IDENTIFICATION
* src/include/threadpool/threadpool_group.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef THREAD_POOL_GROUP_H
#define THREAD_POOL_GROUP_H
#include "c.h"
#include "utils/memutils.h"
#include "knl/knl_variable.h"
/*
 * Sizing constants used by thread pool status reporting.
 *
 * NOTE(review): NUM_THREADPOOL_STATUS_ELEM is presumably the number of columns
 * returned by the thread pool status view -- confirm against the code that
 * builds that view; it is not referenced inside this header.
 */
#define NUM_THREADPOOL_STATUS_ELEM  7

/* Size, in chars, of each text buffer in ThreadPoolStat (workerInfo, sessionInfo). */
#define STATUS_INFO_SIZE            256
/* Occupancy state of a single worker slot. */
typedef enum {
    WORKER_SLOT_UNUSE = 0, /* slot is not in use */
    WORKER_SLOT_INUSE = 1  /* slot is in use */
} WorkerSlotStatus;
/*
 * Bookkeeping stored for one worker slot (embedded in ThreadWorkerSentry as 'stat').
 *
 * NOTE(review): the per-field notes are inferred from names and types only;
 * confirm them against the code that updates these fields.
 */
struct WorkerStatus {
    int spawntick;               /* presumably a tick/counter related to spawning -- TODO confirm */
    TimestampTz lastSpawnTime;   /* presumably the time of the most recent spawn -- TODO confirm */
    WorkerSlotStatus slotStatus; /* WORKER_SLOT_UNUSE or WORKER_SLOT_INUSE */
};
typedef struct WorkerStatus WorkerStatus;
/*
 * Per-worker record: slot bookkeeping, a pointer to the worker object, and a
 * mutex / condition-variable pair. ThreadPoolGroup refers to these records
 * through its m_workers pointer.
 *
 * NOTE(review): what exactly m_mutex and m_cond protect or signal is not
 * visible in this header -- confirm in the implementation.
 */
struct ThreadWorkerSentry {
    WorkerStatus stat;        /* slot status and spawn bookkeeping */
    ThreadPoolWorker* worker; /* worker object associated with this record */
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
};
/*
 * Status record for one thread pool group. A pointer to it is taken by
 * ThreadPoolGroup::GetThreadPoolGroupStat().
 *
 * NOTE(review): the content/format of the two text buffers is produced by the
 * implementation and is not visible in this header.
 */
struct ThreadPoolStat {
    int groupId;
    int numaId;
    int bindCpuNum;
    int listenerNum;
    char workerInfo[STATUS_INFO_SIZE];  /* text buffer, STATUS_INFO_SIZE chars */
    char sessionInfo[STATUS_INFO_SIZE]; /* text buffer, STATUS_INFO_SIZE chars */
};
typedef struct ThreadPoolStat ThreadPoolStat;
/*
 * ThreadPoolGroup
 *
 * A thread pool group controls listener and worker threads. This header only
 * declares the class; apart from the small inline accessors below, every
 * member function is defined in the implementation file.
 *
 * ThreadPoolWorker, ThreadPoolListener and ThreadPoolScheduler are friends and
 * therefore have direct access to the private counters and tables.
 */
class ThreadPoolGroup : public BaseObject {
public:
    /* Listener of this group; also returned by GetListener(). */
    ThreadPoolListener* m_listener;

    /*
     * NOTE(review): the arguments presumably seed the similarly named m_*
     * members (m_maxWorkerNum, m_expectWorkerNum, m_groupId, m_numaId,
     * m_groupCpuNum, m_groupCpuArr) -- confirm in the constructor definition.
     */
    ThreadPoolGroup(int maxWorkerNum, int expectWorkerNum,
                    int groupId, int numaId, int cpuNum, int* cpuArr);
    ~ThreadPoolGroup();

    /* Group lifecycle and worker management (defined out of line). */
    void init(bool enableNumaDistribute);
    void ReleaseWorkerSlot(int i);
    void AddWorker(int i);
    void ShutdownWorker();
    void AddWorkerIfNecessary();
    bool EnlargeWorkers(int enlargeNum);
    void ReduceWorkers(int reduceNum);
    void ShutDownPendingWorkers();
    void WaitReady();

    /* Statistics and health checks (defined out of line). */
    float4 GetSessionPerThread();
    void GetThreadPoolGroupStat(ThreadPoolStat* stat);
    bool IsGroupHang();

    /*
     * Read-only inline accessors. They are const-qualified so that they can
     * also be called through a const ThreadPoolGroup reference or pointer;
     * calls on non-const objects are unaffected.
     */

    /* Returns m_listener. */
    inline ThreadPoolListener* GetListener() const
    {
        return m_listener;
    }

    /* Returns m_groupId. */
    inline int GetGroupId() const
    {
        return m_groupId;
    }

    /* Returns m_numaId. */
    inline int GetNumaId() const
    {
        return m_numaId;
    }

    /* True when m_sessionCount is zero or negative. */
    inline bool AllSessionClosed() const
    {
        return (m_sessionCount <= 0);
    }

    /* True when m_workerNum is zero or negative. */
    inline bool AllThreadShutDown() const
    {
        return (m_workerNum <= 0);
    }

    friend class ThreadPoolWorker;
    friend class ThreadPoolListener;
    friend class ThreadPoolScheduler;

private:
    void AttachThreadToCPU(ThreadId thread, int cpu);
    void AttachThreadToNodeLevel(ThreadId thread) const;

private:
    /*
     * threadpool_status
     * node name | group id | binding numaId | binding CpuNum | listener num |
     * expect worker | actual worker | idle worker | session number | waiting serve session |
     * run session(= actual worker - idle worker) | idle session
     */
    int m_maxWorkerNum;
    int m_defaultWorkerNum;
    volatile int m_workerNum;
    volatile int m_listenerNum;
    volatile int m_expectWorkerNum;
    volatile int m_idleWorkerNum;
    volatile int m_pendingWorkerNum;
    volatile int m_sessionCount;          /* count of all sessions */
    volatile int m_waitServeSessionCount; /* sessions waiting for a worker to serve them */
    volatile int m_processTaskCount;

    int m_groupId;
    int m_numaId;
    int m_groupCpuNum;
    int* m_groupCpuArr;

    ThreadWorkerSentry* m_workers;
    MemoryContext m_context;
    pthread_mutex_t m_mutex;

    bool m_enableNumaDistribute;
    cpu_set_t m_nodeCpuSet; /* for numa node distribution only */
};
#endif /* THREAD_POOL_GROUP_H */