diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index eecc4a1118..7b10bc7212 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -730,7 +730,11 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576"); // In most cases, it does not need to be modified. DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); -// number of brpc stream per OlapTableSinkV2 +// share brpc streams when memtable_on_sink_node = true +DEFINE_Bool(share_load_streams, "true"); +// share delta writers when memtable_on_sink_node = true +DEFINE_Bool(share_delta_writers, "true"); +// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true) DEFINE_Int32(num_streams_per_sink, "5"); // timeout for open stream sink rpc in ms DEFINE_Int64(open_stream_sink_timeout_ms, "500"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 83b7dbd95f..eefcbb9b16 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -789,7 +789,11 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes); // In most cases, it does not need to be modified. DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); -// number of brpc stream per OlapTableSinkV2 +// share brpc streams when memtable_on_sink_node = true +DECLARE_Bool(share_load_streams); +// share delta writers when memtable_on_sink_node = true +DECLARE_Bool(share_delta_writers); +// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true) DECLARE_Int32(num_streams_per_sink); // timeout for open stream sink rpc in ms DECLARE_Int64(open_stream_sink_timeout_ms); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 27986f5de3..aab83fba5f 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -29,9 +29,13 @@ #include "time.h" #include "util/debug_util.h" #include "util/time.h" +#include "vec/sink/delta_writer_v2_pool.h" +#include "vec/sink/load_stream_stub_pool.h" namespace doris { +ExecEnv::ExecEnv() = default; + ExecEnv::~ExecEnv() { destroy(); } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index fd54d44166..d52e7cd361 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -50,6 +50,10 @@ class TaskScheduler; namespace taskgroup { class TaskGroupManager; } +namespace stream_load { +class DeltaWriterV2Pool; +class LoadStreamStubPool; +} // namespace stream_load namespace io { class S3FileBufferPool; class FileCacheFactory; @@ -234,6 +238,11 @@ public: vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; } std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; } + stream_load::LoadStreamStubPool* load_stream_stub_pool() { + return _load_stream_stub_pool.get(); + } + stream_load::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); } + void wait_for_all_tasks_done(); void update_frontends(const std::vector& new_infos); @@ -257,7 +266,7 @@ public: } private: - ExecEnv() = default; + ExecEnv(); [[nodiscard]] Status _init(const std::vector& store_paths); void _destroy(); @@ -334,6 +343,8 @@ private: // To save meta info of external file, such as parquet footer. FileMetaCache* _file_meta_cache = nullptr; std::unique_ptr _memtable_memory_limiter; + std::unique_ptr _load_stream_stub_pool; + std::unique_ptr _delta_writer_v2_pool; std::unique_ptr _global_zone_cache; std::shared_mutex _zone_cache_rw_lock; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index bcdbf49801..24234a90d9 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -92,6 +92,8 @@ #include "util/timezone_utils.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" +#include "vec/sink/delta_writer_v2_pool.h" +#include "vec/sink/load_stream_stub_pool.h" #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) @@ -207,6 +209,8 @@ Status ExecEnv::_init(const std::vector& store_paths) { _group_commit_mgr = new GroupCommitMgr(this); _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); _memtable_memory_limiter = std::make_unique(); + _load_stream_stub_pool = std::make_unique(); + _delta_writer_v2_pool = std::make_unique(); _backend_client_cache->init_metrics("backend"); _frontend_client_cache->init_metrics("frontend"); @@ -543,6 +547,8 @@ void ExecEnv::destroy() { _deregister_metrics(); SAFE_DELETE(_load_channel_mgr); _memtable_memory_limiter.reset(nullptr); + _load_stream_stub_pool.reset(); + _delta_writer_v2_pool.reset(); // shared_ptr maybe no need to be reset // _brpc_iobuf_block_memory_tracker.reset(); diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp new file mode 100644 index 0000000000..c1066174a8 --- /dev/null +++ b/be/src/vec/sink/delta_writer_v2_pool.cpp @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/delta_writer_v2_pool.h" + +#include "olap/delta_writer_v2.h" + +namespace doris { +class TExpr; + +namespace stream_load { + +DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_cnt(1) {} + +DeltaWriterV2Map::~DeltaWriterV2Map() = default; + +DeltaWriterV2* DeltaWriterV2Map::get_or_create(int64_t tablet_id, + std::function creator) { + _map.lazy_emplace(tablet_id, [&](const TabletToDeltaWriterV2Map::constructor& ctor) { + ctor(tablet_id, creator()); + }); + return _map.at(tablet_id).get(); +} + +Status DeltaWriterV2Map::close() { + if (--_use_cnt > 0) { + return Status::OK(); + } + Status status = Status::OK(); + _map.for_each([&status](auto& entry) { + if (status.ok()) { + status = entry.second->close(); + } + }); + if (!status.ok()) { + return status; + } + _map.for_each([&status](auto& entry) { + if (status.ok()) { + status = entry.second->close_wait(); + } + }); + return status; +} + +void DeltaWriterV2Map::cancel(Status status) { + _map.for_each([&status](auto& entry) { entry.second->cancel_with_status(status); }); +} + +DeltaWriterV2Pool::DeltaWriterV2Pool() = default; + +DeltaWriterV2Pool::~DeltaWriterV2Pool() = default; + +std::shared_ptr DeltaWriterV2Pool::get_or_create(PUniqueId load_id) { + UniqueId id {load_id}; + std::lock_guard lock(_mutex); + std::shared_ptr map = _pool[id].lock(); + if (map) { + map->grab(); + return map; + } + auto deleter = [this](DeltaWriterV2Map* m) { + std::lock_guard lock(_mutex); + _pool.erase(m->unique_id()); + delete m; + }; + map = std::shared_ptr(new DeltaWriterV2Map(id), deleter); + _pool[id] = map; + return map; +} + +} // namespace stream_load +} // namespace doris diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h new file mode 100644 index 0000000000..d0328d7d2b --- /dev/null +++ b/be/src/vec/sink/delta_writer_v2_pool.h @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +// IWYU pragma: no_include + +#include // IWYU pragma: keep +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "util/uid_util.h" + +namespace doris { + +class DeltaWriterV2; + +namespace stream_load { + +class DeltaWriterV2Map { +public: + DeltaWriterV2Map(UniqueId load_id); + + ~DeltaWriterV2Map(); + + void grab() { ++_use_cnt; } + + // get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map + DeltaWriterV2* get_or_create(int64_t tablet_id, std::function creator); + + // close all delta writers in this DeltaWriterV2Map if there is no other users + Status close(); + + // cancel all delta writers in this DeltaWriterV2Map + void cancel(Status status); + + UniqueId unique_id() const { return _load_id; } + + size_t size() const { return _map.size(); } + +private: + using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map< + int64_t, std::unique_ptr, std::hash, std::equal_to, + std::allocator>>, 4, + std::mutex>; + + UniqueId _load_id; + TabletToDeltaWriterV2Map _map; + std::atomic _use_cnt; +}; + +class DeltaWriterV2Pool { +public: + DeltaWriterV2Pool(); + + ~DeltaWriterV2Pool(); + + std::shared_ptr get_or_create(PUniqueId load_id); + + size_t size() { + std::lock_guard lock(_mutex); + return _pool.size(); + } + +private: + std::mutex _mutex; + std::unordered_map> _pool; +}; + +} // namespace stream_load +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 63bea0b6c7..052aa1f256 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -108,6 +108,7 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, const NodeInfo& node_info, int64_t txn_id, const OlapTableSchemaParam& schema, const std::vector& tablets_for_schema, bool enable_profile) { + _num_open++; std::unique_lock lock(_mutex); if (_is_init) { return Status::OK(); @@ -188,6 +189,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector& tablets_to_commit) { + if (--_num_open > 0) { + return Status::OK(); + } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 8a1ae79c52..20cf5fc02a 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -71,8 +72,14 @@ class LoadStreamStub; struct SegmentStatistics; -using IndexToTabletSchema = std::unordered_map>; -using IndexToEnableMoW = std::unordered_map; +using IndexToTabletSchema = phmap::parallel_flat_hash_map< + int64_t, std::shared_ptr, std::hash, std::equal_to, + std::allocator>>, 4, std::mutex>; + +using IndexToEnableMoW = + phmap::parallel_flat_hash_map, std::equal_to, + std::allocator>, 4, + std::mutex>; class LoadStreamStub { private: @@ -175,6 +182,8 @@ protected: bthread::Mutex _mutex; bthread::ConditionVariable _close_cv; + std::atomic _num_open; + std::mutex _buffer_mutex; std::mutex _send_mutex; butil::IOBuf _buffer; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp new file mode 100644 index 0000000000..848d038b2f --- /dev/null +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/sink/load_stream_stub_pool.h" + +#include "vec/sink/load_stream_stub.h" + +namespace doris { +class TExpr; + +namespace stream_load { + +LoadStreamStubPool::LoadStreamStubPool() = default; + +LoadStreamStubPool::~LoadStreamStubPool() = default; +std::shared_ptr LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id, + int64_t dst_id) { + auto key = std::make_pair(UniqueId(load_id), dst_id); + std::lock_guard lock(_mutex); + std::shared_ptr streams = _pool[key].lock(); + if (streams) { + return streams; + } + int32_t num_streams = std::max(1, config::num_streams_per_sink); + auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id}); + auto deleter = [this, key](Streams* s) { + std::lock_guard lock(_mutex); + _pool.erase(key); + _template_stubs.erase(key.first); + delete s; + }; + streams = std::shared_ptr(new Streams(), deleter); + for (int32_t i = 0; i < num_streams; i++) { + // copy construct, internal tablet schema map will be shared among all stubs + streams->emplace_back(new LoadStreamStub {*it->second}); + } + _pool[key] = streams; + return streams; +} + +} // namespace stream_load +} // namespace doris diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h new file mode 100644 index 0000000000..ae550340d2 --- /dev/null +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +// IWYU pragma: no_include +#include // IWYU pragma: keep +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "common/status.h" +#include "exec/data_sink.h" +#include "exec/tablet_info.h" +#include "gutil/ref_counted.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "runtime/thread_context.h" +#include "runtime/types.h" +#include "util/countdown_latch.h" +#include "util/runtime_profile.h" +#include "util/stopwatch.hpp" +#include "vec/columns/column.h" +#include "vec/common/allocator.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" + +namespace doris { + +class LoadStreamStub; + +namespace stream_load { + +using Streams = std::vector>; + +class LoadStreamStubPool { +public: + LoadStreamStubPool(); + + ~LoadStreamStubPool(); + + std::shared_ptr get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id); + + size_t size() { + std::lock_guard lock(_mutex); + return _pool.size(); + } + + // for UT only + size_t templates_size() { + std::lock_guard lock(_mutex); + return _template_stubs.size(); + } + +private: + std::mutex _mutex; + std::unordered_map> _template_stubs; + std::unordered_map, std::weak_ptr> _pool; +}; + +} // namespace stream_load +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp index a88f05b88d..ebf50d222c 100644 --- a/be/src/vec/sink/vtablet_sink_v2.cpp +++ b/be/src/vec/sink/vtablet_sink_v2.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -54,7 +55,9 @@ #include "util/uid_util.h" #include "vec/core/block.h" #include "vec/exprs/vexpr.h" +#include "vec/sink/delta_writer_v2_pool.h" #include "vec/sink/load_stream_stub.h" +#include "vec/sink/load_stream_stub_pool.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -153,41 +156,51 @@ Status VOlapTableSinkV2::open(RuntimeState* state) { SCOPED_TIMER(_open_timer); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - _stream_pool_for_node = std::make_shared(); - _delta_writer_for_tablet = std::make_shared(); - _build_tablet_node_mapping(); - RETURN_IF_ERROR(_init_stream_pools()); - - return Status::OK(); -} - -Status VOlapTableSinkV2::_init_stream_pools() { - // stub template is for sharing internal schema map among all stubs - LoadStreamStub stub_template {_load_id, _sender_id}; - for (auto& [node_id, _] : _tablets_for_node) { - auto node_info = _nodes_info->find_node(node_id); - if (node_info == nullptr) { - return Status::InternalError("Unknown node {} in tablet location", node_id); - } - Streams& stream_pool = (*_stream_pool_for_node)[node_id]; - RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool, stub_template)); + if (config::share_delta_writers) { + _delta_writer_for_tablet = + ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id); + } else { + _delta_writer_for_tablet = std::make_shared(_load_id); } + _build_tablet_node_mapping(); + RETURN_IF_ERROR(_open_streams(state->backend_id())); + return Status::OK(); } -Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, Streams& stream_pool, - LoadStreamStub& stub_template) { - stream_pool.reserve(config::num_streams_per_sink); - for (int i = 0; i < config::num_streams_per_sink; ++i) { - // internal tablet schema map will be shared among all stubs - auto stream = std::make_unique(stub_template); +Status VOlapTableSinkV2::_open_streams(int64_t src_id) { + for (auto& [dst_id, _] : _tablets_for_node) { + auto node_info = _nodes_info->find_node(dst_id); + if (node_info == nullptr) { + return Status::InternalError("Unknown node {} in tablet location", dst_id); + } + std::shared_ptr streams; + if (config::share_load_streams) { + streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, src_id, dst_id); + } else { + int32_t num_streams = std::max(1, config::num_streams_per_sink); + streams = std::make_shared(); + LoadStreamStub template_stub {_load_id, _sender_id}; + for (int32_t i = 0; i < num_streams; i++) { + // copy construct, internal tablet schema map will be shared among all stubs + streams->emplace_back(new LoadStreamStub {template_stub}); + } + } // get tablet schema from each backend only in the 1st stream - const std::vector& tablets_for_schema = - i == 0 ? _indexes_from_node[node_info.id] : std::vector {}; - RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), node_info, - _txn_id, *_schema, tablets_for_schema, - _state->enable_profile())); - stream_pool.emplace_back(std::move(stream)); + for (auto& stream : *streams | std::ranges::views::take(1)) { + const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), + *node_info, _txn_id, *_schema, tablets_for_schema, + _state->enable_profile())); + } + // for the rest streams, open without getting tablet schema + for (auto& stream : *streams | std::ranges::views::drop(1)) { + RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), + *node_info, _txn_id, *_schema, {}, + _state->enable_profile())); + } + _streams_for_node[dst_id] = streams; } return Status::OK(); } @@ -238,7 +251,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) { return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id); } for (auto& node_id : location->node_ids) { - streams.emplace_back(_stream_pool_for_node->at(node_id)[_stream_index]); + streams.emplace_back(_streams_for_node[node_id]->at(_stream_index)); } _stream_index = (_stream_index + 1) % config::num_streams_per_sink; return Status::OK(); @@ -310,36 +323,27 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc Status VOlapTableSinkV2::_write_memtable(std::shared_ptr block, int64_t tablet_id, const Rows& rows, const Streams& streams) { - DeltaWriterV2* delta_writer = nullptr; - { - auto it = _delta_writer_for_tablet->find(tablet_id); - if (it == _delta_writer_for_tablet->end()) { - VLOG_DEBUG << "Creating DeltaWriterV2 for Tablet(tablet id: " << tablet_id - << ", index id: " << rows.index_id << ")"; - WriteRequest req; - req.partition_id = rows.partition_id; - req.index_id = rows.index_id; - req.tablet_id = tablet_id; - req.txn_id = _txn_id; - req.load_id = _load_id; - req.tuple_desc = _output_tuple_desc; - req.is_high_priority = _is_high_priority; - req.table_schema_param = _schema.get(); - for (auto& index : _schema->indexes()) { - if (index->index_id == rows.index_id) { - req.slots = &index->slots; - req.schema_hash = index->schema_hash; - break; - } + DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { + WriteRequest req; + req.partition_id = rows.partition_id; + req.index_id = rows.index_id; + req.tablet_id = tablet_id; + req.txn_id = _txn_id; + req.load_id = _load_id; + req.tuple_desc = _output_tuple_desc; + req.is_high_priority = _is_high_priority; + req.table_schema_param = _schema.get(); + for (auto& index : _schema->indexes()) { + if (index->index_id == rows.index_id) { + req.slots = &index->slots; + req.schema_hash = index->schema_hash; + break; } - DeltaWriterV2::open(&req, streams, &delta_writer, _profile); - _delta_writer_for_tablet->emplace(tablet_id, delta_writer); - } else { - VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " << tablet_id - << ", index id: " << rows.index_id << ")"; - delta_writer = it->second.get(); } - } + DeltaWriterV2* delta_writer = nullptr; + DeltaWriterV2::open(&req, streams, &delta_writer, _profile); + return delta_writer; + }); { SCOPED_TIMER(_wait_mem_limit_timer); ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); @@ -352,12 +356,10 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr bloc Status VOlapTableSinkV2::_cancel(Status status) { LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id << ", due to error: " << status; - - if (_delta_writer_for_tablet.use_count() == 1) { - std::for_each(std::begin(*_delta_writer_for_tablet), std::end(*_delta_writer_for_tablet), - [&status](auto&& entry) { entry.second->cancel_with_status(status); }); + if (_delta_writer_for_tablet) { + _delta_writer_for_tablet->cancel(status); + _delta_writer_for_tablet.reset(); } - _delta_writer_for_tablet.reset(); return Status::OK(); } @@ -382,37 +384,30 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { { SCOPED_TIMER(_close_writer_timer); - // close all delta writers - if (_delta_writer_for_tablet.use_count() == 1) { - std::for_each(std::begin(*_delta_writer_for_tablet), - std::end(*_delta_writer_for_tablet), - [](auto&& entry) { entry.second->close(); }); - std::for_each(std::begin(*_delta_writer_for_tablet), - std::end(*_delta_writer_for_tablet), - [](auto&& entry) { entry.second->close_wait(); }); - } + // close all delta writers if this is the last user + _delta_writer_for_tablet->close(); _delta_writer_for_tablet.reset(); } { // send CLOSE_LOAD to all streams, return ERROR if any - for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) { - RETURN_IF_ERROR(_close_load(stream_pool)); + for (const auto& [_, streams] : _streams_for_node) { + RETURN_IF_ERROR(_close_load(*streams)); } } { SCOPED_TIMER(_close_load_timer); - for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) { - for (const auto& stream : stream_pool) { + for (const auto& [_, streams] : _streams_for_node) { + for (const auto& stream : *streams) { stream->close_wait(); } } } std::vector tablet_commit_infos; - for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) { - for (const auto& stream : stream_pool) { + for (const auto& [node_id, streams] : _streams_for_node) { + for (const auto& stream : *streams) { for (auto tablet_id : stream->success_tablets()) { TTabletCommitInfo commit_info; commit_info.tabletId = tablet_id; @@ -424,7 +419,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), std::make_move_iterator(tablet_commit_infos.begin()), std::make_move_iterator(tablet_commit_infos.end())); - _stream_pool_for_node.reset(); + _streams_for_node.clear(); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 047377f4e2..bf983ba693 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -80,10 +80,9 @@ namespace stream_load { class OlapTableBlockConvertor; class OlapTabletFinder; class VOlapTableSinkV2; +class DeltaWriterV2Map; -using DeltaWriterForTablet = std::unordered_map>; using Streams = std::vector>; -using NodeToStreams = std::unordered_map; using NodeIdForStream = std::unordered_map; using NodePartitionTabletMapping = std::unordered_map>>; @@ -133,10 +132,7 @@ public: Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; private: - Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool, - LoadStreamStub& stub_template); - - Status _init_stream_pools(); + Status _open_streams(int64_t src_id); void _build_tablet_node_mapping(); @@ -215,9 +211,9 @@ private: std::unordered_map> _tablets_for_node; std::unordered_map> _indexes_from_node; - std::shared_ptr _stream_pool_for_node; + std::unordered_map> _streams_for_node; size_t _stream_index = 0; - std::shared_ptr _delta_writer_for_tablet; + std::shared_ptr _delta_writer_for_tablet; std::atomic _pending_streams {0}; diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp new file mode 100644 index 0000000000..dfc3276ea7 --- /dev/null +++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "vec/sink/delta_writer_v2_pool.h" + +#include + +#include "olap/delta_writer_v2.h" + +namespace doris { + +namespace stream_load { + +class DeltaWriterV2PoolTest : public testing::Test { +public: + DeltaWriterV2PoolTest() = default; + virtual ~DeltaWriterV2PoolTest() = default; +}; + +TEST_F(DeltaWriterV2PoolTest, test_pool) { + DeltaWriterV2Pool pool; + PUniqueId load_id; + load_id.set_hi(1); + load_id.set_hi(2); + PUniqueId load_id2; + load_id2.set_hi(1); + load_id2.set_hi(3); + auto map = pool.get_or_create(load_id); + auto map2 = pool.get_or_create(load_id2); + auto map3 = pool.get_or_create(load_id); + EXPECT_EQ(2, pool.size()); + EXPECT_EQ(map, map3); + EXPECT_NE(map, map2); + map.reset(); + map2.reset(); + map3.reset(); + EXPECT_EQ(0, pool.size()); +} + +TEST_F(DeltaWriterV2PoolTest, test_map) { + DeltaWriterV2Pool pool; + PUniqueId load_id; + load_id.set_hi(1); + load_id.set_hi(2); + auto map = pool.get_or_create(load_id); + EXPECT_EQ(1, pool.size()); + WriteRequest req; + auto writer = map->get_or_create(100, [&req]() { + RuntimeProfile profile("test"); + DeltaWriterV2* writer; + DeltaWriterV2::open(&req, {}, &writer, &profile); + return writer; + }); + auto writer2 = map->get_or_create(101, [&req]() { + RuntimeProfile profile("test"); + DeltaWriterV2* writer; + DeltaWriterV2::open(&req, {}, &writer, &profile); + return writer; + }); + auto writer3 = map->get_or_create(100, [&req]() { + RuntimeProfile profile("test"); + DeltaWriterV2* writer; + DeltaWriterV2::open(&req, {}, &writer, &profile); + return writer; + }); + EXPECT_EQ(2, map->size()); + EXPECT_EQ(writer, writer3); + EXPECT_NE(writer, writer2); + map.reset(); + EXPECT_EQ(0, pool.size()); +} + +} // namespace stream_load +} // namespace doris diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp new file mode 100644 index 0000000000..f1ccb70bee --- /dev/null +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "vec/sink/load_stream_stub_pool.h" + +#include + +#include "vec/sink/load_stream_stub.h" + +namespace doris { + +namespace stream_load { + +class LoadStreamStubPoolTest : public testing::Test { +public: + LoadStreamStubPoolTest() = default; + virtual ~LoadStreamStubPoolTest() = default; +}; + +TEST_F(LoadStreamStubPoolTest, test) { + LoadStreamStubPool pool; + int64_t src_id = 100; + PUniqueId load_id; + load_id.set_hi(1); + load_id.set_hi(2); + auto streams1 = pool.get_or_create(load_id, src_id, 101); + auto streams2 = pool.get_or_create(load_id, src_id, 102); + auto streams3 = pool.get_or_create(load_id, src_id, 101); + EXPECT_EQ(2, pool.size()); + EXPECT_EQ(1, pool.templates_size()); + EXPECT_EQ(streams1, streams3); + EXPECT_NE(streams1, streams2); + streams1.reset(); + streams2.reset(); + streams3.reset(); + EXPECT_EQ(0, pool.size()); + EXPECT_EQ(0, pool.templates_size()); +} + +} // namespace stream_load +} // namespace doris