diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index 2e1d0508ba..290f2cc3e0 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -111,7 +111,8 @@ public: std::shared_ptr get_new_client_no_cache(const std::string& host_port, const std::string& protocol = "baidu_std", - const std::string& connect_type = "") { + const std::string& connect_type = "", + const std::string& connection_group = "") { brpc::ChannelOptions options; if constexpr (std::is_same_v) { options.protocol = config::function_service_protocol; @@ -121,6 +122,9 @@ public: if (connect_type != "") { options.connection_type = connect_type; } + if (connection_group != "") { + options.connection_group = connection_group; + } options.connect_timeout_ms = 2000; options.max_retry = 10; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 155ce2de34..92670c1c93 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -175,9 +175,9 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, *request.add_tablets() = tablet; } POpenLoadStreamResponse response; - // use "pooled" connection to avoid conflicts between streaming rpc and regular rpc, - // see: https://github.com/apache/brpc/issues/392 - const auto& stub = client_cache->get_new_client_no_cache(host_port, "baidu_std", "pooled"); + // set connection_group "streaming" to distinguish with non-streaming connections + const auto& stub = + client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming"); stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique();