[improve](fe) Support to config max msg/frame size of the thrift server (#36594)
Cherry-pick #35845
This commit is contained in:
@ -23,6 +23,7 @@ import org.apache.doris.thrift.TNetworkAddress;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TConfiguration;
|
||||
import org.apache.thrift.TProcessor;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.server.TServer;
|
||||
@ -31,10 +32,13 @@ import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.server.TThreadedSelectorServer;
|
||||
import org.apache.thrift.transport.TNonblockingServerSocket;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
@ -98,8 +102,9 @@ public class ThriftServer {
|
||||
|
||||
private void createThreadedServer() throws TTransportException {
|
||||
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(
|
||||
new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms)).protocolFactory(
|
||||
new TBinaryProtocol.Factory()).processor(processor);
|
||||
new TNonblockingServerSocket(port, Config.thrift_client_timeout_ms))
|
||||
.protocolFactory(new TBinaryProtocol.Factory())
|
||||
.processor(processor);
|
||||
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(
|
||||
Config.thrift_server_max_worker_threads, "thrift-server-pool", true);
|
||||
args.executorService(threadPoolExecutor);
|
||||
@ -111,19 +116,19 @@ public class ThriftServer {
|
||||
|
||||
if (FrontendOptions.isBindIPV6()) {
|
||||
socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
|
||||
.bindAddr(new InetSocketAddress("::0", port))
|
||||
.clientTimeout(Config.thrift_client_timeout_ms)
|
||||
.backlog(Config.thrift_backlog_num);
|
||||
.bindAddr(new InetSocketAddress("::0", port))
|
||||
.clientTimeout(Config.thrift_client_timeout_ms)
|
||||
.backlog(Config.thrift_backlog_num);
|
||||
} else {
|
||||
socketTransportArgs = new TServerSocket.ServerSocketTransportArgs()
|
||||
.bindAddr(new InetSocketAddress("0.0.0.0", port))
|
||||
.clientTimeout(Config.thrift_client_timeout_ms)
|
||||
.backlog(Config.thrift_backlog_num);
|
||||
.bindAddr(new InetSocketAddress("0.0.0.0", port))
|
||||
.clientTimeout(Config.thrift_client_timeout_ms)
|
||||
.backlog(Config.thrift_backlog_num);
|
||||
}
|
||||
|
||||
TThreadPoolServer.Args serverArgs =
|
||||
new TThreadPoolServer.Args(new TServerSocket(socketTransportArgs)).protocolFactory(
|
||||
new TBinaryProtocol.Factory()).processor(processor);
|
||||
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(new ImprovedTServerSocket(socketTransportArgs))
|
||||
.protocolFactory(new TBinaryProtocol.Factory())
|
||||
.processor(processor);
|
||||
ThreadPoolExecutor threadPoolExecutor = ThreadPoolManager.newDaemonCacheThreadPool(
|
||||
Config.thrift_server_max_worker_threads, "thrift-server-pool", true);
|
||||
serverArgs.executorService(threadPoolExecutor);
|
||||
@ -175,4 +180,38 @@ public class ThriftServer {
|
||||
public void removeConnect(TNetworkAddress clientAddress) {
|
||||
connects.remove(clientAddress);
|
||||
}
|
||||
|
||||
static class ImprovedTServerSocket extends TServerSocket {
|
||||
public ImprovedTServerSocket(ServerSocketTransportArgs args) throws TTransportException {
|
||||
super(args);
|
||||
}
|
||||
|
||||
public TSocket accept() throws TTransportException {
|
||||
ServerSocket serverSocket = getServerSocket();
|
||||
if (serverSocket == null) {
|
||||
throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
|
||||
}
|
||||
|
||||
Socket result;
|
||||
try {
|
||||
result = serverSocket.accept();
|
||||
} catch (Exception e) {
|
||||
throw new TTransportException(e);
|
||||
}
|
||||
if (result == null) {
|
||||
throw new TTransportException("Blocking server's accept() may not return NULL");
|
||||
}
|
||||
|
||||
TSocket socket = new TSocket(result);
|
||||
|
||||
TConfiguration cfg = socket.getConfiguration();
|
||||
cfg.setMaxMessageSize(Config.thrift_max_message_size);
|
||||
cfg.setMaxFrameSize(Config.thrift_max_frame_size);
|
||||
|
||||
socket.updateKnownMessageSize(0); // Since we update the configuration, reset consumed message size.
|
||||
socket.setTimeout(Config.thrift_client_timeout_ms);
|
||||
|
||||
return socket;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user