From 997db43dda05bb899311e76f74d073dea226d323 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:16:58 +0800 Subject: [PATCH] =?UTF-8?q?branch-2.1:=20[improve](thrift)=20Config=20thri?= =?UTF-8?q?ft=5Fmax=5Fmessage=5Fsize=20for=20THREAD=5FPOOL=20and=20?= =?UTF-8?q?=E2=80=A6=20#49677=20(#49724)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cherry-picked from #49677 Co-authored-by: walter --- be/src/util/thrift_server.cpp | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/be/src/util/thrift_server.cpp b/be/src/util/thrift_server.cpp index 84173300bd..fc40ac7b87 100644 --- a/be/src/util/thrift_server.cpp +++ b/be/src/util/thrift_server.cpp @@ -82,6 +82,25 @@ private: std::shared_ptr config; }; +class ImprovedBufferedTransportFactory + : public apache::thrift::transport::TBufferedTransportFactory { + using TConfiguration = apache::thrift::TConfiguration; + using TTransport = apache::thrift::transport::TTransport; + using TBufferedTransport = apache::thrift::transport::TBufferedTransport; + +public: + ImprovedBufferedTransportFactory() + : config(std::make_shared(config::thrift_max_message_size)) {} + ~ImprovedBufferedTransportFactory() override = default; + + std::shared_ptr getTransport(std::shared_ptr trans) override { + return std::make_shared(std::move(trans), config); + } + +private: + std::shared_ptr config; +}; + // Helper class that starts a server in a separate thread, and handles // the inter-thread communication to monitor whether it started // correctly. @@ -337,8 +356,7 @@ Status ThriftServer::start() { std::make_shared(); std::shared_ptr fe_server_transport; std::shared_ptr transport_factory; - std::shared_ptr socket = - std::make_shared(_port); + std::shared_ptr socket; if (_server_type != THREADED) { thread_mgr = apache::thrift::concurrency::ThreadManager::newSimpleThreadManager( _num_worker_threads); @@ -352,6 +370,7 @@ Status ThriftServer::start() { switch (_server_type) { case NON_BLOCKING: + socket = std::make_shared(_port); if (transport_factory == nullptr) { transport_factory.reset(new apache::thrift::transport::TTransportFactory()); } @@ -366,7 +385,7 @@ Status ThriftServer::start() { BackendOptions::get_service_bind_address_without_bracket(), _port)); if (transport_factory == nullptr) { - transport_factory.reset(new apache::thrift::transport::TBufferedTransportFactory()); + transport_factory = std::make_shared(); } _server = std::make_unique( @@ -380,7 +399,7 @@ Status ThriftServer::start() { server_socket->setKeepAlive(true); if (transport_factory == nullptr) { - transport_factory.reset(new apache::thrift::transport::TBufferedTransportFactory()); + transport_factory = std::make_shared(); } _server = std::make_unique(