[Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639)
mainly include: - brpc service adds two types of thread pools. The number of "light" and "heavy" thread pools is different Classify the interfaces of be. Those related to data transmission are classified as heavy interfaces and others as light interfaces - Add some monitoring to the thread pool, including the queue size and the number of active threads. Use these - indicators to guide the configuration of the number of threads
This commit is contained in:
@ -35,7 +35,8 @@ CONF_Int32(be_port, "9060");
|
||||
// port for brpc
|
||||
CONF_Int32(brpc_port, "8060");
|
||||
|
||||
// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
|
||||
// the number of bthreads for brpc, the default value is set to -1,
|
||||
// which means the number of bthreads is #cpu-cores
|
||||
CONF_Int32(brpc_num_threads, "-1");
|
||||
|
||||
// port to brpc server for single replica load
|
||||
@ -388,8 +389,15 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
|
||||
CONF_Int64(load_data_reserve_hours, "4");
|
||||
// log error log will be removed after this time
|
||||
CONF_mInt64(load_error_log_reserve_hours, "48");
|
||||
CONF_Int32(number_tablet_writer_threads, "16");
|
||||
CONF_Int32(number_slave_replica_download_threads, "64");
|
||||
|
||||
// be brpc interface is classified into two categories: light and heavy
|
||||
// each category has diffrent thread number
|
||||
// threads to handle heavy api interface, such as transmit_data/transmit_block etc
|
||||
CONF_Int32(brpc_heavy_work_pool_threads, "192");
|
||||
// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
|
||||
CONF_Int32(brpc_light_work_pool_threads, "32");
|
||||
CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
|
||||
CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
|
||||
|
||||
// The maximum amount of data that can be processed by a stream load
|
||||
CONF_mInt64(streaming_load_max_mb, "10240");
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -190,8 +190,13 @@ private:
|
||||
|
||||
private:
|
||||
ExecEnv* _exec_env;
|
||||
PriorityThreadPool _tablet_worker_pool;
|
||||
PriorityThreadPool _slave_replica_worker_pool;
|
||||
|
||||
// every brpc service request should put into thread pool
|
||||
// the reason see issue #16634
|
||||
// define the interface for reading and writing data as heavy interface
|
||||
// otherwise as light interface
|
||||
PriorityThreadPool _heavy_work_pool;
|
||||
PriorityThreadPool _light_work_pool;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -222,6 +222,16 @@ public:
|
||||
IntCounter* upload_rowset_count;
|
||||
IntCounter* upload_fail_count;
|
||||
|
||||
UIntGauge* light_work_pool_queue_size;
|
||||
UIntGauge* heavy_work_pool_queue_size;
|
||||
UIntGauge* heavy_work_active_threads;
|
||||
UIntGauge* light_work_active_threads;
|
||||
|
||||
UIntGauge* heavy_work_pool_max_queue_size;
|
||||
UIntGauge* light_work_pool_max_queue_size;
|
||||
UIntGauge* heavy_work_max_threads;
|
||||
UIntGauge* light_work_max_threads;
|
||||
|
||||
static DorisMetrics* instance() {
|
||||
static DorisMetrics instance;
|
||||
return &instance;
|
||||
|
||||
@ -55,7 +55,7 @@ public:
|
||||
// queue exceeds this size, subsequent calls to Offer will block until there is
|
||||
// capacity available.
|
||||
PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
|
||||
: _work_queue(queue_size), _shutdown(false), _name(name) {
|
||||
: _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
|
||||
for (int i = 0; i < num_threads; ++i) {
|
||||
_threads.create_thread(
|
||||
std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
|
||||
@ -101,6 +101,7 @@ public:
|
||||
virtual void join() { _threads.join_all(); }
|
||||
|
||||
virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
|
||||
virtual uint32_t get_active_threads() const { return _active_threads; }
|
||||
|
||||
// Blocks until the work queue is empty, and then calls shutdown to stop the worker
|
||||
// threads and Join to wait until they are finished.
|
||||
@ -136,7 +137,9 @@ private:
|
||||
while (!is_shutdown()) {
|
||||
Task task;
|
||||
if (_work_queue.blocking_get(&task)) {
|
||||
_active_threads++;
|
||||
task.work_function();
|
||||
_active_threads--;
|
||||
}
|
||||
if (_work_queue.get_size() == 0) {
|
||||
_empty_cv.notify_all();
|
||||
@ -151,6 +154,7 @@ private:
|
||||
// Set to true when threads should stop doing work and terminate.
|
||||
std::atomic<bool> _shutdown;
|
||||
std::string _name;
|
||||
std::atomic<int> _active_threads;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -298,6 +298,12 @@ curl http://be_host:webserver_port/metrics?type=json
|
||||
|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
|
||||
|`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 |
|
||||
|`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 |
|
||||
|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 |
|
||||
|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 |
|
||||
|`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | p0 |
|
||||
|`doris_be_light_work_pool_queue_size`| | Num | brpc light线程池队列最大长度,超过则阻塞提交work| | p0 |
|
||||
|`doris_be_heavy_work_active_threads`| | Num | brpc heavy线程池活跃线程数| | p0 |
|
||||
|`doris_be_light_work_active_threads`| | Num | brpc light线程池活跃线程数| | p0 |
|
||||
|
||||
### 机器监控
|
||||
|
||||
|
||||
@ -276,6 +276,7 @@ enum PCacheStatus {
|
||||
INVALID_KEY_RANGE = 6;
|
||||
DATA_OVERDUE = 7;
|
||||
EMPTY_DATA = 8;
|
||||
CANCELED = 9;
|
||||
};
|
||||
|
||||
enum CacheType {
|
||||
|
||||
Reference in New Issue
Block a user