[improve](move-memtable) always share load streams (#24763)
This commit is contained in:
@ -733,12 +733,10 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
|
||||
// In most cases, it does not need to be modified.
|
||||
DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
|
||||
|
||||
// share brpc streams when memtable_on_sink_node = true
|
||||
DEFINE_Bool(share_load_streams, "true");
|
||||
// share delta writers when memtable_on_sink_node = true
|
||||
DEFINE_Bool(share_delta_writers, "true");
|
||||
// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true)
|
||||
DEFINE_Int32(num_streams_per_sink, "5");
|
||||
// number of brpc stream per load
|
||||
DEFINE_Int32(num_streams_per_load, "5");
|
||||
// timeout for open stream sink rpc in ms
|
||||
DEFINE_Int64(open_stream_sink_timeout_ms, "500");
|
||||
|
||||
|
||||
@ -791,12 +791,10 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
|
||||
// In most cases, it does not need to be modified.
|
||||
DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
|
||||
|
||||
// share brpc streams when memtable_on_sink_node = true
|
||||
DECLARE_Bool(share_load_streams);
|
||||
// share delta writers when memtable_on_sink_node = true
|
||||
DECLARE_Bool(share_delta_writers);
|
||||
// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true)
|
||||
DECLARE_Int32(num_streams_per_sink);
|
||||
// number of brpc stream per load
|
||||
DECLARE_Int32(num_streams_per_load);
|
||||
// timeout for open stream sink rpc in ms
|
||||
DECLARE_Int64(open_stream_sink_timeout_ms);
|
||||
|
||||
|
||||
@ -35,7 +35,7 @@ std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, in
|
||||
if (streams) {
|
||||
return streams;
|
||||
}
|
||||
int32_t num_streams = std::max(1, config::num_streams_per_sink);
|
||||
int32_t num_streams = std::max(1, config::num_streams_per_load);
|
||||
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id});
|
||||
auto deleter = [this, key](Streams* s) {
|
||||
std::lock_guard<std::mutex> lock(_mutex);
|
||||
|
||||
@ -176,18 +176,8 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
|
||||
return Status::InternalError("Unknown node {} in tablet location", dst_id);
|
||||
}
|
||||
std::shared_ptr<Streams> streams;
|
||||
if (config::share_load_streams) {
|
||||
streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
|
||||
_load_id, src_id, dst_id);
|
||||
} else {
|
||||
int32_t num_streams = std::max(1, config::num_streams_per_sink);
|
||||
streams = std::make_shared<Streams>();
|
||||
LoadStreamStub template_stub {_load_id, _sender_id};
|
||||
for (int32_t i = 0; i < num_streams; i++) {
|
||||
// copy construct, internal tablet schema map will be shared among all stubs
|
||||
streams->emplace_back(new LoadStreamStub {template_stub});
|
||||
}
|
||||
}
|
||||
streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(_load_id, src_id,
|
||||
dst_id);
|
||||
// get tablet schema from each backend only in the 1st stream
|
||||
for (auto& stream : *streams | std::ranges::views::take(1)) {
|
||||
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
|
||||
@ -254,7 +244,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) {
|
||||
for (auto& node_id : location->node_ids) {
|
||||
streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
|
||||
}
|
||||
_stream_index = (_stream_index + 1) % config::num_streams_per_sink;
|
||||
_stream_index = (_stream_index + 1) % config::num_streams_per_load;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user