[feature](move-memtable) share delta writer v2 among sinks (#24066)

This commit is contained in:
Kaijie Chen
2023-09-13 14:39:29 +08:00
committed by GitHub
parent d87b852e18
commit 563c3f75ff
15 changed files with 622 additions and 93 deletions

View File

@ -730,7 +730,11 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
// In most cases, it does not need to be modified.
DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// number of brpc stream per OlapTableSinkV2
// share brpc streams when memtable_on_sink_node = true
DEFINE_Bool(share_load_streams, "true");
// share delta writers when memtable_on_sink_node = true
DEFINE_Bool(share_delta_writers, "true");
// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true)
DEFINE_Int32(num_streams_per_sink, "5");
// timeout for open stream sink rpc in ms
DEFINE_Int64(open_stream_sink_timeout_ms, "500");

View File

@ -789,7 +789,11 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes);
// In most cases, it does not need to be modified.
DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
// number of brpc stream per OlapTableSinkV2
// share brpc streams when memtable_on_sink_node = true
DECLARE_Bool(share_load_streams);
// share delta writers when memtable_on_sink_node = true
DECLARE_Bool(share_delta_writers);
// number of brpc stream per OlapTableSinkV2 (per load if share_load_streams = true)
DECLARE_Int32(num_streams_per_sink);
// timeout for open stream sink rpc in ms
DECLARE_Int64(open_stream_sink_timeout_ms);

View File

@ -29,9 +29,13 @@
#include "time.h"
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub_pool.h"
namespace doris {
ExecEnv::ExecEnv() = default;
ExecEnv::~ExecEnv() {
destroy();
}

View File

@ -50,6 +50,10 @@ class TaskScheduler;
namespace taskgroup {
class TaskGroupManager;
}
namespace stream_load {
class DeltaWriterV2Pool;
class LoadStreamStubPool;
} // namespace stream_load
namespace io {
class S3FileBufferPool;
class FileCacheFactory;
@ -234,6 +238,11 @@ public:
vectorized::ZoneList& global_zone_cache() { return *_global_zone_cache; }
std::shared_mutex& zone_cache_rw_lock() { return _zone_cache_rw_lock; }
stream_load::LoadStreamStubPool* load_stream_stub_pool() {
return _load_stream_stub_pool.get();
}
stream_load::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }
void wait_for_all_tasks_done();
void update_frontends(const std::vector<TFrontendInfo>& new_infos);
@ -257,7 +266,7 @@ public:
}
private:
ExecEnv() = default;
ExecEnv();
[[nodiscard]] Status _init(const std::vector<StorePath>& store_paths);
void _destroy();
@ -334,6 +343,8 @@ private:
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<stream_load::LoadStreamStubPool> _load_stream_stub_pool;
std::unique_ptr<stream_load::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::unique_ptr<vectorized::ZoneList> _global_zone_cache;
std::shared_mutex _zone_cache_rw_lock;

View File

@ -92,6 +92,8 @@
#include "util/timezone_utils.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub_pool.h"
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
!defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
@ -207,6 +209,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_group_commit_mgr = new GroupCommitMgr(this);
_file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_load_stream_stub_pool = std::make_unique<stream_load::LoadStreamStubPool>();
_delta_writer_v2_pool = std::make_unique<stream_load::DeltaWriterV2Pool>();
_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
@ -543,6 +547,8 @@ void ExecEnv::destroy() {
_deregister_metrics();
SAFE_DELETE(_load_channel_mgr);
_memtable_memory_limiter.reset(nullptr);
_load_stream_stub_pool.reset();
_delta_writer_v2_pool.reset();
// shared_ptr maybe no need to be reset
// _brpc_iobuf_block_memory_tracker.reset();

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/delta_writer_v2_pool.h"
#include "olap/delta_writer_v2.h"
namespace doris {
class TExpr;
namespace stream_load {
DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), _use_cnt(1) {}
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
DeltaWriterV2* DeltaWriterV2Map::get_or_create(int64_t tablet_id,
std::function<DeltaWriterV2*()> creator) {
_map.lazy_emplace(tablet_id, [&](const TabletToDeltaWriterV2Map::constructor& ctor) {
ctor(tablet_id, creator());
});
return _map.at(tablet_id).get();
}
Status DeltaWriterV2Map::close() {
if (--_use_cnt > 0) {
return Status::OK();
}
Status status = Status::OK();
_map.for_each([&status](auto& entry) {
if (status.ok()) {
status = entry.second->close();
}
});
if (!status.ok()) {
return status;
}
_map.for_each([&status](auto& entry) {
if (status.ok()) {
status = entry.second->close_wait();
}
});
return status;
}
void DeltaWriterV2Map::cancel(Status status) {
_map.for_each([&status](auto& entry) { entry.second->cancel_with_status(status); });
}
DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id) {
UniqueId id {load_id};
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<DeltaWriterV2Map> map = _pool[id].lock();
if (map) {
map->grab();
return map;
}
auto deleter = [this](DeltaWriterV2Map* m) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(m->unique_id());
delete m;
};
map = std::shared_ptr<DeltaWriterV2Map>(new DeltaWriterV2Map(id), deleter);
_pool[id] = map;
return map;
}
} // namespace stream_load
} // namespace doris

View File

@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <brpc/controller.h>
#include <bthread/types.h>
#include <butil/errno.h>
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "util/uid_util.h"
namespace doris {
class DeltaWriterV2;
namespace stream_load {
class DeltaWriterV2Map {
public:
DeltaWriterV2Map(UniqueId load_id);
~DeltaWriterV2Map();
void grab() { ++_use_cnt; }
// get or create delta writer for the given tablet, memory is managed by DeltaWriterV2Map
DeltaWriterV2* get_or_create(int64_t tablet_id, std::function<DeltaWriterV2*()> creator);
// close all delta writers in this DeltaWriterV2Map if there is no other users
Status close();
// cancel all delta writers in this DeltaWriterV2Map
void cancel(Status status);
UniqueId unique_id() const { return _load_id; }
size_t size() const { return _map.size(); }
private:
using TabletToDeltaWriterV2Map = phmap::parallel_flat_hash_map<
int64_t, std::unique_ptr<DeltaWriterV2>, std::hash<int64_t>, std::equal_to<int64_t>,
std::allocator<phmap::Pair<const int64_t, std::unique_ptr<DeltaWriterV2>>>, 4,
std::mutex>;
UniqueId _load_id;
TabletToDeltaWriterV2Map _map;
std::atomic<int> _use_cnt;
};
class DeltaWriterV2Pool {
public:
DeltaWriterV2Pool();
~DeltaWriterV2Pool();
std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
return _pool.size();
}
private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
};
} // namespace stream_load
} // namespace doris

View File

@ -108,6 +108,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, bool enable_profile) {
_num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init) {
return Status::OK();
@ -188,6 +189,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
if (--_num_open > 0) {
return Status::OK();
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);

View File

@ -26,6 +26,7 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <stdint.h>
@ -71,8 +72,14 @@ class LoadStreamStub;
struct SegmentStatistics;
using IndexToTabletSchema = std::unordered_map<int64_t, std::shared_ptr<TabletSchema>>;
using IndexToEnableMoW = std::unordered_map<int64_t, bool>;
using IndexToTabletSchema = phmap::parallel_flat_hash_map<
int64_t, std::shared_ptr<TabletSchema>, std::hash<int64_t>, std::equal_to<int64_t>,
std::allocator<phmap::Pair<const int64_t, std::shared_ptr<TabletSchema>>>, 4, std::mutex>;
using IndexToEnableMoW =
phmap::parallel_flat_hash_map<int64_t, bool, std::hash<int64_t>, std::equal_to<int64_t>,
std::allocator<phmap::Pair<const int64_t, bool>>, 4,
std::mutex>;
class LoadStreamStub {
private:
@ -175,6 +182,8 @@ protected:
bthread::Mutex _mutex;
bthread::ConditionVariable _close_cv;
std::atomic<int> _num_open;
std::mutex _buffer_mutex;
std::mutex _send_mutex;
butil::IOBuf _buffer;

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
class TExpr;
namespace stream_load {
LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _pool[key].lock();
if (streams) {
return streams;
}
int32_t num_streams = std::max(1, config::num_streams_per_sink);
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id});
auto deleter = [this, key](Streams* s) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(key);
_template_stubs.erase(key.first);
delete s;
};
streams = std::shared_ptr<Streams>(new Streams(), deleter);
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all stubs
streams->emplace_back(new LoadStreamStub {*it->second});
}
_pool[key] = streams;
return streams;
}
} // namespace stream_load
} // namespace doris

View File

@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <brpc/controller.h>
#include <bthread/types.h>
#include <butil/errno.h>
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "util/countdown_latch.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/common/allocator.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris {
class LoadStreamStub;
namespace stream_load {
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
class LoadStreamStubPool {
public:
LoadStreamStubPool();
~LoadStreamStubPool();
std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
return _pool.size();
}
// for UT only
size_t templates_size() {
std::lock_guard<std::mutex> lock(_mutex);
return _template_stubs.size();
}
private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> _template_stubs;
std::unordered_map<std::pair<UniqueId, int64_t>, std::weak_ptr<Streams>> _pool;
};
} // namespace stream_load
} // namespace doris

View File

@ -30,6 +30,7 @@
#include <algorithm>
#include <execution>
#include <mutex>
#include <ranges>
#include <string>
#include <unordered_map>
@ -54,7 +55,9 @@
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
@ -153,41 +156,51 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
_stream_pool_for_node = std::make_shared<NodeToStreams>();
_delta_writer_for_tablet = std::make_shared<DeltaWriterForTablet>();
_build_tablet_node_mapping();
RETURN_IF_ERROR(_init_stream_pools());
return Status::OK();
}
Status VOlapTableSinkV2::_init_stream_pools() {
// stub template is for sharing internal schema map among all stubs
LoadStreamStub stub_template {_load_id, _sender_id};
for (auto& [node_id, _] : _tablets_for_node) {
auto node_info = _nodes_info->find_node(node_id);
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", node_id);
}
Streams& stream_pool = (*_stream_pool_for_node)[node_id];
RETURN_IF_ERROR(_init_stream_pool(*node_info, stream_pool, stub_template));
if (config::share_delta_writers) {
_delta_writer_for_tablet =
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id);
} else {
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
}
_build_tablet_node_mapping();
RETURN_IF_ERROR(_open_streams(state->backend_id()));
return Status::OK();
}
Status VOlapTableSinkV2::_init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
LoadStreamStub& stub_template) {
stream_pool.reserve(config::num_streams_per_sink);
for (int i = 0; i < config::num_streams_per_sink; ++i) {
// internal tablet schema map will be shared among all stubs
auto stream = std::make_unique<LoadStreamStub>(stub_template);
Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
for (auto& [dst_id, _] : _tablets_for_node) {
auto node_info = _nodes_info->find_node(dst_id);
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location", dst_id);
}
std::shared_ptr<Streams> streams;
if (config::share_load_streams) {
streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id);
} else {
int32_t num_streams = std::max(1, config::num_streams_per_sink);
streams = std::make_shared<Streams>();
LoadStreamStub template_stub {_load_id, _sender_id};
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all stubs
streams->emplace_back(new LoadStreamStub {template_stub});
}
}
// get tablet schema from each backend only in the 1st stream
const std::vector<PTabletID>& tablets_for_schema =
i == 0 ? _indexes_from_node[node_info.id] : std::vector<PTabletID> {};
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), node_info,
_txn_id, *_schema, tablets_for_schema,
_state->enable_profile()));
stream_pool.emplace_back(std::move(stream));
for (auto& stream : *streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, tablets_for_schema,
_state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : *streams | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, {},
_state->enable_profile()));
}
_streams_for_node[dst_id] = streams;
}
return Status::OK();
}
@ -238,7 +251,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) {
return Status::InternalError("unknown tablet location, tablet id = {}", tablet_id);
}
for (auto& node_id : location->node_ids) {
streams.emplace_back(_stream_pool_for_node->at(node_id)[_stream_index]);
streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
}
_stream_index = (_stream_index + 1) % config::num_streams_per_sink;
return Status::OK();
@ -310,36 +323,27 @@ Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* input_bloc
Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> block,
int64_t tablet_id, const Rows& rows,
const Streams& streams) {
DeltaWriterV2* delta_writer = nullptr;
{
auto it = _delta_writer_for_tablet->find(tablet_id);
if (it == _delta_writer_for_tablet->end()) {
VLOG_DEBUG << "Creating DeltaWriterV2 for Tablet(tablet id: " << tablet_id
<< ", index id: " << rows.index_id << ")";
WriteRequest req;
req.partition_id = rows.partition_id;
req.index_id = rows.index_id;
req.tablet_id = tablet_id;
req.txn_id = _txn_id;
req.load_id = _load_id;
req.tuple_desc = _output_tuple_desc;
req.is_high_priority = _is_high_priority;
req.table_schema_param = _schema.get();
for (auto& index : _schema->indexes()) {
if (index->index_id == rows.index_id) {
req.slots = &index->slots;
req.schema_hash = index->schema_hash;
break;
}
DeltaWriterV2* delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
WriteRequest req;
req.partition_id = rows.partition_id;
req.index_id = rows.index_id;
req.tablet_id = tablet_id;
req.txn_id = _txn_id;
req.load_id = _load_id;
req.tuple_desc = _output_tuple_desc;
req.is_high_priority = _is_high_priority;
req.table_schema_param = _schema.get();
for (auto& index : _schema->indexes()) {
if (index->index_id == rows.index_id) {
req.slots = &index->slots;
req.schema_hash = index->schema_hash;
break;
}
DeltaWriterV2::open(&req, streams, &delta_writer, _profile);
_delta_writer_for_tablet->emplace(tablet_id, delta_writer);
} else {
VLOG_DEBUG << "Reusing DeltaWriterV2 for Tablet(tablet id: " << tablet_id
<< ", index id: " << rows.index_id << ")";
delta_writer = it->second.get();
}
}
DeltaWriterV2* delta_writer = nullptr;
DeltaWriterV2::open(&req, streams, &delta_writer, _profile);
return delta_writer;
});
{
SCOPED_TIMER(_wait_mem_limit_timer);
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
@ -352,12 +356,10 @@ Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
Status VOlapTableSinkV2::_cancel(Status status) {
LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", due to error: " << status;
if (_delta_writer_for_tablet.use_count() == 1) {
std::for_each(std::begin(*_delta_writer_for_tablet), std::end(*_delta_writer_for_tablet),
[&status](auto&& entry) { entry.second->cancel_with_status(status); });
if (_delta_writer_for_tablet) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
_delta_writer_for_tablet.reset();
return Status::OK();
}
@ -382,37 +384,30 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
{
SCOPED_TIMER(_close_writer_timer);
// close all delta writers
if (_delta_writer_for_tablet.use_count() == 1) {
std::for_each(std::begin(*_delta_writer_for_tablet),
std::end(*_delta_writer_for_tablet),
[](auto&& entry) { entry.second->close(); });
std::for_each(std::begin(*_delta_writer_for_tablet),
std::end(*_delta_writer_for_tablet),
[](auto&& entry) { entry.second->close_wait(); });
}
// close all delta writers if this is the last user
_delta_writer_for_tablet->close();
_delta_writer_for_tablet.reset();
}
{
// send CLOSE_LOAD to all streams, return ERROR if any
for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
RETURN_IF_ERROR(_close_load(stream_pool));
for (const auto& [_, streams] : _streams_for_node) {
RETURN_IF_ERROR(_close_load(*streams));
}
}
{
SCOPED_TIMER(_close_load_timer);
for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
for (const auto& stream : stream_pool) {
for (const auto& [_, streams] : _streams_for_node) {
for (const auto& stream : *streams) {
stream->close_wait();
}
}
}
std::vector<TTabletCommitInfo> tablet_commit_infos;
for (const auto& [node_id, stream_pool] : *_stream_pool_for_node) {
for (const auto& stream : stream_pool) {
for (const auto& [node_id, streams] : _streams_for_node) {
for (const auto& stream : *streams) {
for (auto tablet_id : stream->success_tablets()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet_id;
@ -424,7 +419,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
std::make_move_iterator(tablet_commit_infos.end()));
_stream_pool_for_node.reset();
_streams_for_node.clear();
// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() +

View File

@ -80,10 +80,9 @@ namespace stream_load {
class OlapTableBlockConvertor;
class OlapTabletFinder;
class VOlapTableSinkV2;
class DeltaWriterV2Map;
using DeltaWriterForTablet = std::unordered_map<int64_t, std::unique_ptr<DeltaWriterV2>>;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
using NodeToStreams = std::unordered_map<int64_t, Streams>;
using NodeIdForStream = std::unordered_map<brpc::StreamId, int64_t>;
using NodePartitionTabletMapping =
std::unordered_map<int64_t, std::unordered_map<int64_t, std::unordered_set<int64_t>>>;
@ -133,10 +132,7 @@ public:
Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override;
private:
Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool,
LoadStreamStub& stub_template);
Status _init_stream_pools();
Status _open_streams(int64_t src_id);
void _build_tablet_node_mapping();
@ -215,9 +211,9 @@ private:
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
std::shared_ptr<NodeToStreams> _stream_pool_for_node;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterForTablet> _delta_writer_for_tablet;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
std::atomic<int> _pending_streams {0};

View File

@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/delta_writer_v2_pool.h"
#include <gtest/gtest.h>
#include "olap/delta_writer_v2.h"
namespace doris {
namespace stream_load {
class DeltaWriterV2PoolTest : public testing::Test {
public:
DeltaWriterV2PoolTest() = default;
virtual ~DeltaWriterV2PoolTest() = default;
};
TEST_F(DeltaWriterV2PoolTest, test_pool) {
DeltaWriterV2Pool pool;
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
PUniqueId load_id2;
load_id2.set_hi(1);
load_id2.set_hi(3);
auto map = pool.get_or_create(load_id);
auto map2 = pool.get_or_create(load_id2);
auto map3 = pool.get_or_create(load_id);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(map, map3);
EXPECT_NE(map, map2);
map.reset();
map2.reset();
map3.reset();
EXPECT_EQ(0, pool.size());
}
TEST_F(DeltaWriterV2PoolTest, test_map) {
DeltaWriterV2Pool pool;
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
auto map = pool.get_or_create(load_id);
EXPECT_EQ(1, pool.size());
WriteRequest req;
auto writer = map->get_or_create(100, [&req]() {
RuntimeProfile profile("test");
DeltaWriterV2* writer;
DeltaWriterV2::open(&req, {}, &writer, &profile);
return writer;
});
auto writer2 = map->get_or_create(101, [&req]() {
RuntimeProfile profile("test");
DeltaWriterV2* writer;
DeltaWriterV2::open(&req, {}, &writer, &profile);
return writer;
});
auto writer3 = map->get_or_create(100, [&req]() {
RuntimeProfile profile("test");
DeltaWriterV2* writer;
DeltaWriterV2::open(&req, {}, &writer, &profile);
return writer;
});
EXPECT_EQ(2, map->size());
EXPECT_EQ(writer, writer3);
EXPECT_NE(writer, writer2);
map.reset();
EXPECT_EQ(0, pool.size());
}
} // namespace stream_load
} // namespace doris

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/load_stream_stub_pool.h"
#include <gtest/gtest.h>
#include "vec/sink/load_stream_stub.h"
namespace doris {
namespace stream_load {
class LoadStreamStubPoolTest : public testing::Test {
public:
LoadStreamStubPoolTest() = default;
virtual ~LoadStreamStubPoolTest() = default;
};
TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
int64_t src_id = 100;
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
auto streams1 = pool.get_or_create(load_id, src_id, 101);
auto streams2 = pool.get_or_create(load_id, src_id, 102);
auto streams3 = pool.get_or_create(load_id, src_id, 101);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
EXPECT_NE(streams1, streams2);
streams1.reset();
streams2.reset();
streams3.reset();
EXPECT_EQ(0, pool.size());
EXPECT_EQ(0, pool.templates_size());
}
} // namespace stream_load
} // namespace doris