diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index c404c73a07..4b2478ccf9 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -215,6 +215,9 @@ public: BrpcClientCache* brpc_internal_client_cache() const { return _internal_client_cache; } + BrpcClientCache* brpc_streaming_client_cache() const { + return _streaming_client_cache; + } BrpcClientCache* brpc_function_client_cache() const { return _function_client_cache; } @@ -392,6 +395,7 @@ private: // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. std::shared_ptr _new_load_stream_mgr; BrpcClientCache* _internal_client_cache = nullptr; + BrpcClientCache* _streaming_client_cache = nullptr; BrpcClientCache* _function_client_cache = nullptr; std::shared_ptr _stream_load_executor; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 30098c9b61..bbd6bbc944 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -233,7 +233,10 @@ Status ExecEnv::_init(const std::vector& store_paths, _load_stream_mgr = std::make_unique(num_flush_threads); _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); _internal_client_cache = new BrpcClientCache(); - _function_client_cache = new BrpcClientCache(); + _streaming_client_cache = + new BrpcClientCache("baidu_std", "single", "streaming"); + _function_client_cache = + new BrpcClientCache(config::function_service_protocol); _stream_load_executor = StreamLoadExecutor::create_shared(this); _routine_load_task_executor = new RoutineLoadTaskExecutor(this); RETURN_IF_ERROR(_routine_load_task_executor->init()); @@ -631,6 +634,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_routine_load_task_executor); // _stream_load_executor SAFE_DELETE(_function_client_cache); + SAFE_DELETE(_streaming_client_cache); SAFE_DELETE(_internal_client_cache); SAFE_DELETE(_bfd_parser); diff --git a/be/src/util/brpc_client_cache.cpp b/be/src/util/brpc_client_cache.cpp index b9135e8014..c5a6488787 100644 --- a/be/src/util/brpc_client_cache.cpp +++ b/be/src/util/brpc_client_cache.cpp @@ -25,12 +25,23 @@ namespace doris { DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_stream_endpoint_stub_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_function_endpoint_stub_count, MetricUnit::NOUNIT); template <> -BrpcClientCache::BrpcClientCache() { - REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); }); +BrpcClientCache::BrpcClientCache(std::string protocol, + std::string connection_type, + std::string connection_group) + : _protocol(protocol), + _connection_type(connection_type), + _connection_group(connection_group) { + if (connection_group == "streaming") { + REGISTER_HOOK_METRIC(brpc_stream_endpoint_stub_count, + [this]() { return _stub_map.size(); }); + } else { + REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return _stub_map.size(); }); + } } template <> @@ -39,7 +50,12 @@ BrpcClientCache::~BrpcClientCache() { } template <> -BrpcClientCache::BrpcClientCache() { +BrpcClientCache::BrpcClientCache(std::string protocol, + std::string connection_type, + std::string connection_group) + : _protocol(protocol), + _connection_type(connection_type), + _connection_group(connection_group) { REGISTER_HOOK_METRIC(brpc_function_endpoint_stub_count, [this]() { return _stub_map.size(); }); } diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index ebef80f4a6..09c92fb398 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -59,7 +59,8 @@ namespace doris { template class BrpcClientCache { public: - BrpcClientCache(); + BrpcClientCache(std::string protocol = "baidu_std", std::string connection_type = "", + std::string connection_group = ""); virtual ~BrpcClientCache(); std::shared_ptr get_client(const butil::EndPoint& endpoint) { @@ -110,20 +111,24 @@ public: } std::shared_ptr get_new_client_no_cache(const std::string& host_port, - const std::string& protocol = "baidu_std", - const std::string& connect_type = "", + const std::string& protocol = "", + const std::string& connection_type = "", const std::string& connection_group = "") { brpc::ChannelOptions options; - if constexpr (std::is_same_v) { - options.protocol = config::function_service_protocol; - } else { + if (protocol != "") { options.protocol = protocol; + } else if (_protocol != "") { + options.protocol = _protocol; } - if (connect_type != "") { - options.connection_type = connect_type; + if (connection_type != "") { + options.connection_type = connection_type; + } else if (_connection_type != "") { + options.connection_type = _connection_type; } if (connection_group != "") { options.connection_group = connection_group; + } else if (_connection_group != "") { + options.connection_group = _connection_group; } options.connect_timeout_ms = 2000; options.timeout_ms = 2000; @@ -204,6 +209,9 @@ public: private: StubMap _stub_map; + const std::string _protocol; + const std::string _connection_type; + const std::string _connection_group; }; using InternalServiceClientCache = BrpcClientCache; diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 513ef91723..567efdc9ae 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -176,6 +176,7 @@ public: UIntGauge* stream_load_pipe_count = nullptr; UIntGauge* new_stream_load_pipe_count = nullptr; UIntGauge* brpc_endpoint_stub_count = nullptr; + UIntGauge* brpc_stream_endpoint_stub_count = nullptr; UIntGauge* brpc_function_endpoint_stub_count = nullptr; UIntGauge* tablet_writer_count = nullptr; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index dc34b13e0a..c535f03214 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -183,8 +183,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, } POpenLoadStreamResponse response; // set connection_group "streaming" to distinguish with non-streaming connections - const auto& stub = - client_cache->get_new_client_no_cache(host_port, "baidu_std", "single", "streaming"); + const auto& stub = client_cache->get_client(host_port); stub->open_load_stream(&cntl, &request, &response, nullptr); for (const auto& resp : response.tablet_schemas()) { auto tablet_schema = std::make_unique(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index fecbd324c5..6013e31609 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -281,13 +281,13 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream // get tablet schema from each backend only in the 1st stream for (auto& stream : streams | std::ranges::views::take(1)) { const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, _state->enable_profile())); } // for the rest streams, open without getting tablet schema for (auto& stream : streams | std::ranges::views::drop(1)) { - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), *node_info, + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, *_schema, {}, _total_streams, idle_timeout_ms, _state->enable_profile())); }