From db6c16058a026af8252eaa50e72a6ffdfc02703d Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 5 Oct 2023 22:09:59 +0800 Subject: [PATCH] [improve](move-memtable) always share load streams (#24763) --- be/src/common/config.cpp | 6 ++---- be/src/common/config.h | 6 ++---- be/src/vec/sink/load_stream_stub_pool.cpp | 2 +- be/src/vec/sink/vtablet_sink_v2.cpp | 16 +++------------- 4 files changed, 8 insertions(+), 22 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 9b4140bc29..753aef2524 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -733,12 +733,10 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576"); // In most cases, it does not need to be modified. DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); -// share brpc streams when memtable_on_sink_node = true -DEFINE_Bool(share_load_streams, "true"); // share delta writers when memtable_on_sink_node = true DEFINE_Bool(share_delta_writers, "true"); -// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true) -DEFINE_Int32(num_streams_per_sink, "5"); +// number of brpc stream per load +DEFINE_Int32(num_streams_per_load, "5"); // timeout for open stream sink rpc in ms DEFINE_Int64(open_stream_sink_timeout_ms, "500"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 1189169187..3aaed3c006 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -791,12 +791,10 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes); // In most cases, it does not need to be modified. DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); -// share brpc streams when memtable_on_sink_node = true -DECLARE_Bool(share_load_streams); // share delta writers when memtable_on_sink_node = true DECLARE_Bool(share_delta_writers); -// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true) -DECLARE_Int32(num_streams_per_sink); +// number of brpc stream per load +DECLARE_Int32(num_streams_per_load); // timeout for open stream sink rpc in ms DECLARE_Int64(open_stream_sink_timeout_ms); diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index 848d038b2f..834c152386 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -35,7 +35,7 @@ std::shared_ptr LoadStreamStubPool::get_or_create(PUniqueId load_id, in if (streams) { return streams; } - int32_t num_streams = std::max(1, config::num_streams_per_sink); + int32_t num_streams = std::max(1, config::num_streams_per_load); auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id}); auto deleter = [this, key](Streams* s) { std::lock_guard lock(_mutex); diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index 66ac604e74..45fa48cb32 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -176,18 +176,8 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) { return Status::InternalError("Unknown node {} in tablet location", dst_id); } std::shared_ptr streams; - if (config::share_load_streams) { - streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, src_id, dst_id); - } else { - int32_t num_streams = std::max(1, config::num_streams_per_sink); - streams = std::make_shared(); - LoadStreamStub template_stub {_load_id, _sender_id}; - for (int32_t i = 0; i < num_streams; i++) { - // copy construct, internal tablet schema map will be shared among all stubs - streams->emplace_back(new LoadStreamStub {template_stub}); - } - } + streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(_load_id, src_id, + dst_id); // get tablet schema from each backend only in the 1st stream for (auto& stream : *streams | std::ranges::views::take(1)) { const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; @@ -254,7 +244,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) { for (auto& node_id : location->node_ids) { streams.emplace_back(_streams_for_node[node_id]->at(_stream_index)); } - _stream_index = (_stream_index + 1) % config::num_streams_per_sink; + _stream_index = (_stream_index + 1) % config::num_streams_per_load; return Status::OK(); }