From 190debaac92cade94e3101cb673bfb09f0c34472 Mon Sep 17 00:00:00 2001 From: zzzxl <33418555+zzzxl1993@users.noreply.github.com> Date: Tue, 20 Jun 2023 20:29:39 +0800 Subject: [PATCH] [Improvement](load) single partition load optimize (#20876) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. When the table has a single partition, the partition and tablet are not looked up for each row of data. 2. This optimization applies only to tables that use DISTRIBUTED BY RANDOM. --- be/src/vec/sink/vtablet_sink.cpp | 94 ++++++++++++++++++++++++++------ be/src/vec/sink/vtablet_sink.h | 3 + 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index e8871bae40..25bb88dfb7 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -36,9 +36,11 @@ #include #include +#include #include #include #include +#include // IWYU pragma: no_include #include "common/compiler_util.h" // IWYU pragma: keep @@ -1276,6 +1278,56 @@ void VOlapTableSink::_generate_row_distribution_payload( } } +Status VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorized::Block* block, + ChannelDistributionPayload& channel_to_payload, + size_t num_rows, int32_t filtered_rows) { + const VOlapTablePartition* partition = nullptr; + uint32_t tablet_index = 0; + bool stop_processing = false; + for (int32_t i = 0; i < num_rows; ++i) { + if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) { + continue; + } + bool is_continue = false; + RETURN_IF_ERROR(find_tablet(state, block, i, &partition, tablet_index, stop_processing, + is_continue)); + if (is_continue) { + continue; + } + if (config::enable_lazy_open_partition) { + _open_partition(partition); + } + break; + } + for (int j = 0; j < partition->indexes.size(); ++j) { + auto tid = partition->indexes[j].tablets[tablet_index]; + auto it = _channels[j]->_channels_by_tablet.find(tid); + DCHECK(it != _channels[j]->_channels_by_tablet.end()) + << "unknown 
tablet, tablet_id=" << tablet_index; + int64_t row_cnt = 0; + for (const auto& channel : it->second) { + if (channel_to_payload[j].count(channel.get()) < 1) { + channel_to_payload[j].insert( + {channel.get(), Payload {std::unique_ptr( + new vectorized::IColumn::Selector()), + std::vector()}}); + } + auto& selector = channel_to_payload[j][channel.get()].first; + auto& tablet_ids = channel_to_payload[j][channel.get()].second; + for (int32_t i = 0; i < num_rows; ++i) { + if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) { + continue; + } + selector->push_back(i); + } + tablet_ids.resize(selector->size(), tid); + row_cnt = selector->size(); + } + _number_output_rows += row_cnt; + } + return Status::OK(); +} + Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, bool eos) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); Status status = Status::OK(); @@ -1328,24 +1380,30 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, _partition_to_tablet_map.clear(); } _row_distribution_watch.start(); - for (int i = 0; i < num_rows; ++i) { - if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) { - continue; - } - const VOlapTablePartition* partition = nullptr; - bool is_continue = false; - uint32_t tablet_index = 0; - RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing, - is_continue)); - if (is_continue) { - continue; - } - // each row - _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, 1); - // open partition - if (config::enable_lazy_open_partition) { - // aysnc open operation,don't block send operation - _open_partition(partition); + size_t partition_num = _vpartition->get_partitions().size(); + if (partition_num == 1 && findTabletMode == FindTabletMode::FIND_TABLET_EVERY_SINK) { + RETURN_IF_ERROR(_single_partition_generate(state, &block, channel_to_payload, num_rows, + filtered_rows)); + } else { + for (int i = 0; i < num_rows; ++i) { + 
if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) { + continue; + } + const VOlapTablePartition* partition = nullptr; + bool is_continue = false; + uint32_t tablet_index = 0; + RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing, + is_continue)); + if (is_continue) { + continue; + } + // each row + _generate_row_distribution_payload(channel_to_payload, partition, tablet_index, i, 1); + // open partition + if (config::enable_lazy_open_partition) { + // aysnc open operation,don't block send operation + _open_partition(partition); + } } } _row_distribution_watch.stop(); diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 1e91f4247f..6a6d2f6038 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -490,6 +490,9 @@ private: void _generate_row_distribution_payload(ChannelDistributionPayload& payload, const VOlapTablePartition* partition, uint32_t tablet_index, int row_idx, size_t row_cnt); + Status _single_partition_generate(RuntimeState* state, vectorized::Block* block, + ChannelDistributionPayload& channel_to_payload, + size_t num_rows, int32_t filtered_rows); // make input data valid for OLAP table // return number of invalid/filtered rows.