[Enhancement](FE) use customized grpc threadpool to get better metric for grpc from FE to BE (#13983)
Previously in Doris FE, there is no specific thread pool for grpc-client-channel, by default the underlying netty logic would use one dynamic unbounded cache threadpool. The workload for this grpc threadpool is unseen. Use ThreadpoolMgr to create one customized threadpool to get Prometheus-compatible metric data.
This commit is contained in:
@ -167,6 +167,12 @@ Default:100
|
||||
|
||||
the max txn number which bdbje can rollback when trying to rejoin the group
|
||||
|
||||
### grpc_threadmgr_threads_nums
|
||||
|
||||
Default: 4096
|
||||
|
||||
Num of thread to handle grpc events in grpc_threadmgr.
|
||||
|
||||
#### `bdbje_replica_ack_timeout_second`
|
||||
|
||||
Default:10 (s)
|
||||
|
||||
@ -173,7 +173,13 @@ Doris 元数据将保存在这里。 强烈建议将此目录的存储为:
|
||||
|
||||
元数据会同步写入到多个 Follower FE,这个参数用于控制 Master FE 等待 Follower FE 发送 ack 的超时时间。当写入的数据较大时,可能 ack 时间较长,如果超时,会导致写元数据失败,FE 进程退出。此时可以适当调大这个参数。
|
||||
|
||||
#### `bdbje_lock_timeout_second`
|
||||
### grpc_threadmgr_threads_nums
|
||||
|
||||
默认值: 4096
|
||||
|
||||
在grpc_threadmgr中处理grpc events的线程数量。
|
||||
|
||||
#### `bdbje_lock_timeout_second`>>>>>>> 1b46f49ad0 (use customed threadpool instead of the default threadpool of grpc java to get better metrics)
|
||||
|
||||
默认值:1
|
||||
|
||||
|
||||
@ -1510,6 +1510,12 @@ public class Config extends ConfigBase {
|
||||
@ConfField
|
||||
public static int grpc_max_message_size_bytes = 2147483647; // 2GB
|
||||
|
||||
/**
|
||||
* num of thread to handle grpc events in grpc_threadmgr
|
||||
*/
|
||||
@ConfField
|
||||
public static int grpc_threadmgr_threads_nums = 4096;
|
||||
|
||||
/**
|
||||
* Used to set minimal number of replication per tablet.
|
||||
*/
|
||||
|
||||
@ -36,6 +36,7 @@ import io.opentelemetry.context.Context;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -49,9 +50,10 @@ public class BackendServiceClient {
|
||||
private final ManagedChannel channel;
|
||||
private final long execPlanTimeout;
|
||||
|
||||
public BackendServiceClient(TNetworkAddress address) {
|
||||
public BackendServiceClient(TNetworkAddress address, Executor executor) {
|
||||
this.address = address;
|
||||
channel = NettyChannelBuilder.forAddress(address.getHostname(), address.getPort())
|
||||
.executor(executor)
|
||||
.flowControlWindow(Config.grpc_max_message_size_bytes)
|
||||
.keepAliveWithoutCalls(true)
|
||||
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.rpc;
|
||||
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
|
||||
@ -38,6 +39,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
@ -47,6 +49,9 @@ public class BackendServiceProxy {
|
||||
// use exclusive lock to make sure only one thread can add or remove client from serviceMap.
|
||||
// use concurrent map to allow access serviceMap in multi thread.
|
||||
private ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private Executor grpcThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(Config.grpc_threadmgr_threads_nums,
|
||||
"grpc_thread_pool", true);
|
||||
private final Map<TNetworkAddress, BackendServiceClient> serviceMap;
|
||||
|
||||
public BackendServiceProxy() {
|
||||
@ -99,7 +104,7 @@ public class BackendServiceProxy {
|
||||
try {
|
||||
service = serviceMap.get(address);
|
||||
if (service == null) {
|
||||
service = new BackendServiceClient(address);
|
||||
service = new BackendServiceClient(address, grpcThreadPool);
|
||||
serviceMap.put(address, service);
|
||||
}
|
||||
return service;
|
||||
|
||||
Reference in New Issue
Block a user