branch-2.1: [improve](thrift) Config thrift_max_message_size for THREAD_POOL and … #49677 (#49724)

Cherry-picked from #49677

Co-authored-by: walter <maochuan@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-04-01 17:16:58 +08:00
committed by GitHub
parent e898dbbba0
commit 997db43dda

View File

@ -82,6 +82,25 @@ private:
std::shared_ptr<TConfiguration> config;
};
class ImprovedBufferedTransportFactory
: public apache::thrift::transport::TBufferedTransportFactory {
using TConfiguration = apache::thrift::TConfiguration;
using TTransport = apache::thrift::transport::TTransport;
using TBufferedTransport = apache::thrift::transport::TBufferedTransport;
public:
ImprovedBufferedTransportFactory()
: config(std::make_shared<TConfiguration>(config::thrift_max_message_size)) {}
~ImprovedBufferedTransportFactory() override = default;
std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
return std::make_shared<TBufferedTransport>(std::move(trans), config);
}
private:
std::shared_ptr<TConfiguration> config;
};
// Helper class that starts a server in a separate thread, and handles
// the inter-thread communication to monitor whether it started
// correctly.
@ -337,8 +356,7 @@ Status ThriftServer::start() {
std::make_shared<apache::thrift::concurrency::ThreadFactory>();
std::shared_ptr<apache::thrift::transport::TServerTransport> fe_server_transport;
std::shared_ptr<apache::thrift::transport::TTransportFactory> transport_factory;
std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket> socket =
std::make_shared<ImprovedNonblockingServerSocket>(_port);
std::shared_ptr<apache::thrift::transport::TNonblockingServerSocket> socket;
if (_server_type != THREADED) {
thread_mgr = apache::thrift::concurrency::ThreadManager::newSimpleThreadManager(
_num_worker_threads);
@ -352,6 +370,7 @@ Status ThriftServer::start() {
switch (_server_type) {
case NON_BLOCKING:
socket = std::make_shared<ImprovedNonblockingServerSocket>(_port);
if (transport_factory == nullptr) {
transport_factory.reset(new apache::thrift::transport::TTransportFactory());
}
@ -366,7 +385,7 @@ Status ThriftServer::start() {
BackendOptions::get_service_bind_address_without_bracket(), _port));
if (transport_factory == nullptr) {
transport_factory.reset(new apache::thrift::transport::TBufferedTransportFactory());
transport_factory = std::make_shared<ImprovedBufferedTransportFactory>();
}
_server = std::make_unique<apache::thrift::server::TThreadPoolServer>(
@ -380,7 +399,7 @@ Status ThriftServer::start() {
server_socket->setKeepAlive(true);
if (transport_factory == nullptr) {
transport_factory.reset(new apache::thrift::transport::TBufferedTransportFactory());
transport_factory = std::make_shared<ImprovedBufferedTransportFactory>();
}
_server = std::make_unique<apache::thrift::server::TThreadedServer>(