From 457de3fc55a8a17d7ba288d86a48292af155eb66 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 6 Jul 2023 16:51:32 +0800 Subject: [PATCH] [refactor](load) move find_tablet out of VOlapTableSink (#21462) --- be/src/vec/sink/vtablet_finder.cpp | 89 ++++++++++++++++++++++++++++++ be/src/vec/sink/vtablet_finder.h | 77 ++++++++++++++++++++++++++ be/src/vec/sink/vtablet_sink.cpp | 85 ++++++---------------------- be/src/vec/sink/vtablet_sink.h | 20 +------ 4 files changed, 186 insertions(+), 85 deletions(-) create mode 100644 be/src/vec/sink/vtablet_finder.cpp create mode 100644 be/src/vec/sink/vtablet_finder.h diff --git a/be/src/vec/sink/vtablet_finder.cpp b/be/src/vec/sink/vtablet_finder.cpp new file mode 100644 index 0000000000..2ee9f598b5 --- /dev/null +++ b/be/src/vec/sink/vtablet_finder.cpp @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "vec/sink/vtablet_finder.h"
+
+#include <fmt/format.h>
+
+#include <cstdint>
+#include <map>
+#include <set>
+#include <string>
+
+// IWYU pragma: no_include
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/status.h"
+#include "exec/tablet_info.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace stream_load {
+
+// Maps row `row_index` of `block` to a partition and to a tablet index inside that partition.
+// Outputs: *partition (nullptr when nothing matches), tablet_index (0 unless resolved) and
+// is_continue, set to true (never reset here) when the caller must skip the row. EndOfFile is
+// returned if append_error_msg_to_file() sets stop_processing; InternalError if num_buckets <= 0.
+// Unless the mode is FIND_TABLET_EVERY_ROW, the tablet cached in _partition_to_tablet_map is reused.
+Status OlapTabletFinder::find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
+                                     const VOlapTablePartition** partition, uint32_t& tablet_index,
+                                     bool& stop_processing, bool& is_continue) {
+    *partition = nullptr;
+    tablet_index = 0;
+    BlockRow block_row = {block, row_index};
+    if (!_vpartition->find_partition(&block_row, partition)) {
+        RETURN_IF_ERROR(state->append_error_msg_to_file(
+                []() -> std::string { return ""; },
+                [&]() -> std::string {
+                    return fmt::format("no partition for this tuple. tuple={}",
+                                       block->dump_data(row_index, 1));
+                },
+                &stop_processing));
+        _num_filtered_rows++;
+        if (stop_processing) {
+            return Status::EndOfFile("Encountered unqualified data, stop processing");
+        }
+        is_continue = true;
+        return Status::OK();
+    }
+    if (!(*partition)->is_mutable) {
+        _num_immutable_partition_filtered_rows++;
+        is_continue = true;
+        return Status::OK();
+    }
+    if ((*partition)->num_buckets <= 0) {
+        return Status::InternalError(fmt::format(
+                "num_buckets must be greater than 0, num_buckets={}", (*partition)->num_buckets));
+    }
+    _partition_ids.emplace((*partition)->id);
+    if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_ROW) {
+        tablet_index = _vpartition->find_tablet(&block_row, **partition);
+        return Status::OK();
+    }
+    const auto cached = _partition_to_tablet_map.find((*partition)->id);
+    if (cached != _partition_to_tablet_map.end()) {
+        tablet_index = cached->second;
+        return Status::OK();
+    }
+    tablet_index = _vpartition->find_tablet(&block_row, **partition);
+    _partition_to_tablet_map.emplace((*partition)->id, tablet_index);
+    return Status::OK();
+}
+
+} // namespace stream_load
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/vtablet_finder.h b/be/src/vec/sink/vtablet_finder.h
new file mode 100644
index 0000000000..97282e403a
--- /dev/null
+++ b/be/src/vec/sink/vtablet_finder.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <set>
+
+#include "common/status.h"
+#include "exec/tablet_info.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace stream_load {
+
+// Maps rows of a block to (partition, tablet index) for the olap table sink and keeps the
+// related bookkeeping: ids of the partitions that were hit and counters of the rows skipped.
+class OlapTabletFinder {
+public:
+    // How often the tablet index is computed:
+    // FIND_TABLET_EVERY_ROW   - for every row; used for both hash and random distribution
+    // FIND_TABLET_EVERY_BATCH - once per partition in each row batch; random distribution only
+    // FIND_TABLET_EVERY_SINK  - once per partition for the whole olap table sink; random only
+    enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK };
+
+    // `vpartition` is kept as a raw pointer; this class never deletes it.
+    OlapTabletFinder(VOlapTablePartitionParam* vpartition, FindTabletMode mode)
+            : _vpartition(vpartition), _find_tablet_mode(mode) {}
+    // Resolves row `row_index` of `block` into *partition and tablet_index. is_continue is set
+    // when the row must be skipped; stop_processing is reported by the error-log call it makes.
+    Status find_tablet(RuntimeState* state, vectorized::Block* block, int row_index,
+                       const VOlapTablePartition** partition, uint32_t& tablet_index,
+                       bool& stop_processing, bool& is_continue);
+
+    bool is_find_tablet_every_sink() const {
+        return _find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_SINK;
+    }
+    // Drops the cached partition -> tablet choices, but only in FIND_TABLET_EVERY_BATCH mode.
+    void clear_for_new_batch() {
+        if (_find_tablet_mode == FindTabletMode::FIND_TABLET_EVERY_BATCH) {
+            _partition_to_tablet_map.clear();
+        }
+    }
+    // True when exactly one partition currently has a cached tablet choice.
+    bool is_single_tablet() const { return _partition_to_tablet_map.size() == 1; }
+    const std::set<int64_t>& partition_ids() const { return _partition_ids; }
+    int64_t num_filtered_rows() const { return _num_filtered_rows; }
+    int64_t num_immutable_partition_filtered_rows() const {
+        return _num_immutable_partition_filtered_rows;
+    }
+
+private:
+    VOlapTablePartitionParam* _vpartition;
+    FindTabletMode _find_tablet_mode;
+    // NOTE(review): int64_t element types are assumed here - confirm against tablet_info.h.
+    std::map<int64_t, int64_t> _partition_to_tablet_map;
+    std::set<int64_t> _partition_ids;
+    int64_t _num_filtered_rows = 0;
+    int64_t _num_immutable_partition_filtered_rows = 0;
+};
+
+} // namespace stream_load
+} //
namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 568c67ce1f..ca33b75a2d 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -90,6 +90,7 @@ #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" #include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" namespace doris { class TExpr; @@ -798,7 +799,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) { } if (request.eos()) { - for (auto pid : _parent->_partition_ids) { + for (auto pid : _parent->_tablet_finder->partition_ids()) { request.add_partition_ids(pid); } @@ -1031,14 +1032,16 @@ Status VOlapTableSink::init(const TDataSink& t_sink) { // if distributed column list is empty, we can ensure that tablet is with random distribution info // and if load_to_single_tablet is set and set to true, we should find only one tablet in one partition // for the whole olap table sink + auto find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW; if (table_sink.partition.distributed_columns.empty()) { if (table_sink.__isset.load_to_single_tablet && table_sink.load_to_single_tablet) { - findTabletMode = FindTabletMode::FIND_TABLET_EVERY_SINK; + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK; } else { - findTabletMode = FindTabletMode::FIND_TABLET_EVERY_BATCH; + find_tablet_mode = OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH; } } _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, table_sink.partition)); + _tablet_finder = std::make_unique(_vpartition, find_tablet_mode); return _vpartition->init(); } @@ -1221,56 +1224,6 @@ size_t VOlapTableSink::get_pending_bytes() const { return mem_consumption; } -Status VOlapTableSink::find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, - const VOlapTablePartition** partition, uint32_t& tablet_index, - bool& stop_processing, bool& 
is_continue) { - Status status = Status::OK(); - *partition = nullptr; - tablet_index = 0; - BlockRow block_row; - block_row = {block, row_index}; - if (!_vpartition->find_partition(&block_row, partition)) { - RETURN_IF_ERROR(state->append_error_msg_to_file( - []() -> std::string { return ""; }, - [&]() -> std::string { - fmt::memory_buffer buf; - fmt::format_to(buf, "no partition for this tuple. tuple={}", - block->dump_data(row_index, 1)); - return fmt::to_string(buf); - }, - &stop_processing)); - _number_filtered_rows++; - if (stop_processing) { - return Status::EndOfFile("Encountered unqualified data, stop processing"); - } - is_continue = true; - return status; - } - if (!(*partition)->is_mutable) { - _number_immutable_partition_filtered_rows++; - is_continue = true; - return status; - } - if ((*partition)->num_buckets <= 0) { - std::stringstream ss; - ss << "num_buckets must be greater than 0, num_buckets=" << (*partition)->num_buckets; - return Status::InternalError(ss.str()); - } - _partition_ids.emplace((*partition)->id); - if (findTabletMode != FindTabletMode::FIND_TABLET_EVERY_ROW) { - if (_partition_to_tablet_map.find((*partition)->id) == _partition_to_tablet_map.end()) { - tablet_index = _vpartition->find_tablet(&block_row, **partition); - _partition_to_tablet_map.emplace((*partition)->id, tablet_index); - } else { - tablet_index = _partition_to_tablet_map[(*partition)->id]; - } - } else { - tablet_index = _vpartition->find_tablet(&block_row, **partition); - } - - return status; -} - void VOlapTableSink::_generate_row_distribution_payload( ChannelDistributionPayload& channel_to_payload, const VOlapTablePartition* partition, uint32_t tablet_index, int row_idx, size_t row_cnt) { @@ -1305,8 +1258,8 @@ Status VOlapTableSink::_single_partition_generate(RuntimeState* state, vectorize continue; } bool is_continue = false; - RETURN_IF_ERROR(find_tablet(state, block, i, &partition, tablet_index, stop_processing, - is_continue)); + 
RETURN_IF_ERROR(_tablet_finder->find_tablet(state, block, i, &partition, tablet_index, + stop_processing, is_continue)); if (is_continue) { continue; } @@ -1375,14 +1328,11 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, bool stop_processing = false; std::vector> channel_to_payload; channel_to_payload.resize(_channels.size()); - if (findTabletMode == FIND_TABLET_EVERY_BATCH) { - // Recaculate is needed - _partition_to_tablet_map.clear(); - } + _tablet_finder->clear_for_new_batch(); _row_distribution_watch.start(); auto num_rows = block->rows(); size_t partition_num = _vpartition->get_partitions().size(); - if (partition_num == 1 && findTabletMode == FindTabletMode::FIND_TABLET_EVERY_SINK) { + if (partition_num == 1 && _tablet_finder->is_find_tablet_every_sink()) { RETURN_IF_ERROR(_single_partition_generate(state, block.get(), channel_to_payload, num_rows, has_filtered_rows)); } else { @@ -1393,8 +1343,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, const VOlapTablePartition* partition = nullptr; bool is_continue = false; uint32_t tablet_index = 0; - RETURN_IF_ERROR(find_tablet(state, block.get(), i, &partition, tablet_index, - stop_processing, is_continue)); + RETURN_IF_ERROR(_tablet_finder->find_tablet( + state, block.get(), i, &partition, tablet_index, stop_processing, is_continue)); if (is_continue) { continue; } @@ -1411,7 +1361,7 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block, // Random distribution and the block belongs to a single tablet, we could optimize to append the whole // block into node channel. 
bool load_block_to_single_tablet = - !_schema->is_dynamic_schema() && _partition_to_tablet_map.size() == 1; + !_schema->is_dynamic_schema() && _tablet_finder->is_single_tablet(); if (load_block_to_single_tablet) { SCOPED_RAW_TIMER(&_filter_ns); // Filter block @@ -1606,8 +1556,8 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { COUNTER_SET(_input_rows_counter, _number_input_rows); COUNTER_SET(_output_rows_counter, _number_output_rows); - COUNTER_SET(_filtered_rows_counter, - _block_convertor->num_filtered_rows() + _number_filtered_rows); + COUNTER_SET(_filtered_rows_counter, _block_convertor->num_filtered_rows() + + _tablet_finder->num_filtered_rows()); COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); COUNTER_SET(_filter_timer, _filter_ns); @@ -1628,8 +1578,9 @@ Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { state->num_rows_load_unselected(); state->set_num_rows_load_total(num_rows_load_total); state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + - _number_filtered_rows); - state->update_num_rows_load_unselected(_number_immutable_partition_filtered_rows); + _tablet_finder->num_filtered_rows()); + state->update_num_rows_load_unselected( + _tablet_finder->num_immutable_partition_filtered_rows()); // print log of add batch time of all node, for tracing load performance easily std::stringstream ss; diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index 7551b35571..aeec8d1047 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -82,6 +82,7 @@ class RefCountClosure; namespace stream_load { class OlapTableBlockConvertor; +class OlapTabletFinder; class OpenPartitionClosure; // The counter of add_batch rpc of a single node @@ -517,10 +518,6 @@ private: ChannelDistributionPayload& channel_to_payload, size_t num_rows, bool has_filtered_rows); - Status 
find_tablet(RuntimeState* state, vectorized::Block* block, int row_index, - const VOlapTablePartition** partition, uint32_t& tablet_index, - bool& stop_processing, bool& is_continue); - void _open_partition(const VOlapTablePartition* partition); Status _cancel_channel_and_check_intolerable_failure(Status status, const std::string& err_msg, @@ -560,9 +557,7 @@ private: RuntimeProfile* _profile = nullptr; - std::set _partition_ids; - // only used for partition with random distribution - std::map _partition_to_tablet_map; + std::unique_ptr _tablet_finder; // index_channel std::vector> _channels; @@ -575,8 +570,6 @@ private: int64_t _send_data_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; - int64_t _number_filtered_rows = 0; - int64_t _number_immutable_partition_filtered_rows = 0; int64_t _filter_ns = 0; MonotonicStopWatch _row_distribution_watch; @@ -617,15 +610,6 @@ private: // User can change this config at runtime, avoid it being modified during query or loading process. bool _transfer_large_data_by_brpc = false; - // FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we - // should compute tablet index for every row - // FIND_TABLET_EVERY_BATCH is only used for random distribution info, which indicates that we should - // compute tablet index for every row batch - // FIND_TABLET_EVERY_SINK is only used for random distribution info, which indicates that we should - // only compute tablet index in the corresponding partition once for the whole time in olap table sink - enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK }; - FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW; - VOlapTablePartitionParam* _vpartition = nullptr; vectorized::VExprContextSPtrs _output_vexpr_ctxs;