From e3d2425d47264e885d55629c2b25b15c29f74f18 Mon Sep 17 00:00:00 2001 From: Pxl Date: Mon, 4 Dec 2023 11:03:22 +0800 Subject: [PATCH] [Improvement](join) remove insert_indices_from_join and special judge for -1 (#27779) remove insert_indices_from_join and special judge for -1 --- be/src/olap/delta_writer.cpp | 2 +- be/src/olap/delta_writer.h | 2 +- be/src/olap/delta_writer_v2.cpp | 2 +- be/src/olap/delta_writer_v2.h | 2 +- be/src/olap/memtable.cpp | 6 +- be/src/olap/memtable.h | 2 +- be/src/olap/memtable_writer.cpp | 2 +- be/src/olap/memtable_writer.h | 2 +- be/src/olap/tablet.cpp | 2 +- .../pipeline/exec/exchange_sink_operator.cpp | 4 +- .../exec/nested_loop_join_probe_operator.cpp | 2 +- .../local_exchange/local_exchanger.cpp | 2 +- .../local_exchange/local_exchanger.h | 2 +- be/src/runtime/tablets_channel.cpp | 7 +- be/src/vec/columns/column.h | 13 +--- be/src/vec/columns/column_array.cpp | 23 ++----- be/src/vec/columns/column_array.h | 7 +- be/src/vec/columns/column_complex.h | 25 +------ be/src/vec/columns/column_const.h | 9 +-- be/src/vec/columns/column_decimal.h | 16 +---- be/src/vec/columns/column_dictionary.h | 9 +-- be/src/vec/columns/column_dummy.h | 4 +- .../vec/columns/column_fixed_length_object.h | 35 ++-------- be/src/vec/columns/column_map.cpp | 23 ++----- be/src/vec/columns/column_map.h | 6 +- be/src/vec/columns/column_nothing.h | 5 -- be/src/vec/columns/column_nullable.cpp | 14 +--- be/src/vec/columns/column_nullable.h | 6 +- be/src/vec/columns/column_object.cpp | 17 +---- be/src/vec/columns/column_object.h | 7 +- be/src/vec/columns/column_string.cpp | 67 ++++--------------- be/src/vec/columns/column_string.h | 7 +- be/src/vec/columns/column_struct.cpp | 15 +---- be/src/vec/columns/column_struct.h | 7 +- be/src/vec/columns/column_vector.cpp | 27 +------- be/src/vec/columns/column_vector.h | 6 +- be/src/vec/columns/predicate_column.h | 9 +-- be/src/vec/core/block.cpp | 3 +- be/src/vec/core/block.h | 2 +- .../exec/join/process_hash_table_probe_impl.h | 8 +-- .../vec/exec/join/vnested_loop_join_node.cpp | 2 +- be/src/vec/exec/scan/pip_scanner_context.h | 8 +-- be/src/vec/sink/vdata_stream_sender.cpp | 8 +-- be/src/vec/sink/vdata_stream_sender.h | 10 +-- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +- 45 files changed, 106 insertions(+), 333 deletions(-) diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 9b80e4ec3e..310ffacad9 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -108,7 +108,7 @@ Status DeltaWriter::append(const vectorized::Block* block) { return write(block, {}, true); } -Status DeltaWriter::write(const vectorized::Block* block, const std::vector& row_idxs, +Status DeltaWriter::write(const vectorized::Block* block, const std::vector& row_idxs, bool is_append) { if (UNLIKELY(row_idxs.empty() && !is_append)) { return Status::OK(); diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index d4519c36a7..d7e351a168 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -67,7 +67,7 @@ public: Status init(); - Status write(const vectorized::Block* block, const std::vector& row_idxs, + Status write(const vectorized::Block* block, const std::vector& row_idxs, bool is_append = false); Status append(const vectorized::Block* block); diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp index 6f6dd939a4..fb5db340be 100644 --- a/be/src/olap/delta_writer_v2.cpp +++ b/be/src/olap/delta_writer_v2.cpp @@ -140,7 +140,7 @@ Status DeltaWriterV2::append(const vectorized::Block* block) { return write(block, {}, true); } -Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector& row_idxs, +Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector& row_idxs, bool is_append) { if (UNLIKELY(row_idxs.empty() && !is_append)) { return Status::OK(); diff --git a/be/src/olap/delta_writer_v2.h b/be/src/olap/delta_writer_v2.h index 8a102c5706..f0581bf56a 100644 --- a/be/src/olap/delta_writer_v2.h +++ b/be/src/olap/delta_writer_v2.h @@ -70,7 +70,7 @@ public: Status init(); - Status write(const vectorized::Block* block, const std::vector& row_idxs, + Status write(const vectorized::Block* block, const std::vector& row_idxs, bool is_append = false); Status append(const vectorized::Block* block); diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 093c241573..fa7afe4ccd 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -166,7 +166,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r *_pblock, -1); } -void MemTable::insert(const vectorized::Block* input_block, const std::vector& row_idxs, +void MemTable::insert(const vectorized::Block* input_block, const std::vector& row_idxs, bool is_append) { SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get()); vectorized::Block target_block = *input_block; @@ -239,7 +239,7 @@ void MemTable::_aggregate_two_row_in_block(vectorized::MutableBlock& mutable_blo } void MemTable::_put_into_output(vectorized::Block& in_block) { SCOPED_RAW_TIMER(&_stat.put_into_output_ns); - std::vector row_pos_vec; + std::vector row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits::max()); row_pos_vec.reserve(in_block.rows()); for (int i = 0; i < _row_in_blocks.size(); i++) { @@ -330,7 +330,7 @@ void MemTable::_sort_by_cluster_keys() { in_block = mutable_block.to_block(); SCOPED_RAW_TIMER(&_stat.put_into_output_ns); - std::vector row_pos_vec; + std::vector row_pos_vec; DCHECK(in_block.rows() <= std::numeric_limits::max()); row_pos_vec.reserve(in_block.rows()); for (int i = 0; i < row_in_blocks.size(); i++) { diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 04ad022c82..9a171b8ad8 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -180,7 +180,7 @@ public: _flush_mem_tracker->consumption(); } // insert tuple from (row_pos) to (row_pos+num_rows) - void insert(const vectorized::Block* block, const std::vector& row_idxs, + void insert(const vectorized::Block* block, const std::vector& row_idxs, bool is_append = false); void shrink_memtable_by_agg(); diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp index a782901f44..0098ff6f8b 100644 --- a/be/src/olap/memtable_writer.cpp +++ b/be/src/olap/memtable_writer.cpp @@ -88,7 +88,7 @@ Status MemTableWriter::append(const vectorized::Block* block) { return write(block, {}, true); } -Status MemTableWriter::write(const vectorized::Block* block, const std::vector& row_idxs, +Status MemTableWriter::write(const vectorized::Block* block, const std::vector& row_idxs, bool is_append) { if (UNLIKELY(row_idxs.empty() && !is_append)) { return Status::OK(); diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index 1893e2cd6a..a6132aa887 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -71,7 +71,7 @@ public: std::shared_ptr partial_update_info, bool unique_key_mow = false); - Status write(const vectorized::Block* block, const std::vector& row_idxs, + Status write(const vectorized::Block* block, const std::vector& row_idxs, bool is_append = false); Status append(const vectorized::Block* block); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index bbd57cf1b5..21d390650a 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2912,7 +2912,7 @@ void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& output_b << " r_pos: " << r->_row_pos; return value < 0; }); - std::vector row_pos_vec; + std::vector row_pos_vec; row_pos_vec.reserve(in_block.rows()); for (int i = 0; i < row_in_blocks.size(); i++) { row_pos_vec.emplace_back(row_in_blocks[i]->_row_pos); diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 8065ae3082..8143e71a4a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -473,9 +473,9 @@ Status ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch int num_channels, const HashValueType* __restrict channel_ids, int rows, vectorized::Block* block, bool eos) { - std::vector channel2rows[num_channels]; + std::vector channel2rows[num_channels]; - for (int i = 0; i < rows; i++) { + for (uint32_t i = 0; i < rows; i++) { channel2rows[channel_ids[i]].emplace_back(i); } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 371e010614..a3d24aa061 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -255,7 +255,7 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB .data(); const auto num_rows = cur_block.rows(); - std::vector selector(num_rows); + std::vector selector(num_rows); size_t selector_idx = 0; for (size_t j = 0; j < num_rows; j++) { if constexpr (IsSemi) { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 13a3f23222..3487f3bcbf 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -82,7 +82,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest LocalExchangeSinkLocalState& local_state) { auto& data_queue = _data_queue; const auto rows = block->rows(); - auto row_idx = std::make_shared>(rows); + auto row_idx = std::make_shared>(rows); { local_state._partition_rows_histogram.assign(_num_instances + 1, 0); for (size_t i = 0; i < rows; ++i) { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index b7acff688f..6a9bebd7b4 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -49,7 +49,7 @@ class LocalExchangeSinkLocalState; class ShuffleExchanger final : public Exchanger { using PartitionedBlock = std::pair, - std::tuple>, size_t, size_t>>; + std::tuple>, size_t, size_t>>; public: ENABLE_FACTORY_CREATOR(ShuffleExchanger); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 1dc7cf5afa..5508bc3005 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -496,8 +496,9 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, return Status::OK(); } - std::unordered_map /* row index */> tablet_to_rowidxs; - for (int i = 0; i < request.tablet_ids_size(); ++i) { + std::unordered_map /* row index */> + tablet_to_rowidxs; + for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) { if (request.is_single_tablet_block()) { break; } @@ -509,7 +510,7 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, } auto it = tablet_to_rowidxs.find(tablet_id); if (it == tablet_to_rowidxs.end()) { - tablet_to_rowidxs.emplace(tablet_id, std::initializer_list {i}); + tablet_to_rowidxs.emplace(tablet_id, std::initializer_list {i}); } else { it->second.emplace_back(i); } diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index 80c5803bff..b2cf72b016 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -240,17 +240,8 @@ public: /// Appends a batch elements from other column with the same type /// indices_begin + indices_end represent the row indices of column src - /// Warning: - /// if *indices == -1 means the row is null - virtual void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) = 0; - - /// Appends a batch elements from other column with the same type - /// indices_begin + indices_end represent the row indices of column src - /// Warning: - /// if *indices == 0 means the row is null, only use in outer join, do not use in any other place - virtual void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) = 0; + virtual void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) = 0; /// Appends data located in specified memory chunk if it is possible (throws an exception if it cannot be implemented). /// Is used to optimize some computations (in aggregation, for example). diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 98fb480dd1..c936c28b88 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -797,25 +797,10 @@ size_t ColumnArray::filter_nullable(const Filter& filter) { return result_size; } -void ColumnArray::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == -1) { - ColumnArray::insert_default(); - } else { - ColumnArray::insert_from(src, *x); - } - } -} - -void ColumnArray::insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) { - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == 0) { - ColumnArray::insert_default(); - } else { - ColumnArray::insert_from(src, *x); - } +void ColumnArray::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) { + for (const auto* x = indices_begin; x != indices_end; ++x) { + ColumnArray::insert_from(src, *x); } } diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 95fd463334..01ce3bcfe2 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -219,11 +219,8 @@ public: callback(data); } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override; void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 0) override { DCHECK(size() > self_row); diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h index fb89740d85..fc8268ec79 100644 --- a/be/src/vec/columns/column_complex.h +++ b/be/src/vec/columns/column_complex.h @@ -184,33 +184,14 @@ public: data.insert(data.end(), st, ed); } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { - const Self& src_vec = assert_cast(src); - auto new_size = indices_end - indices_begin; - - for (int i = 0; i < new_size; ++i) { - auto offset = *(indices_begin + i); - if (offset == -1) { - data.emplace_back(T {}); - } else { - data.emplace_back(src_vec.get_element(offset)); - } - } - } - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { const Self& src_vec = assert_cast(src); auto new_size = indices_end - indices_begin; for (uint32_t i = 0; i < new_size; ++i) { auto offset = *(indices_begin + i); - if (offset == 0) { - data.emplace_back(T {}); - } else { - data.emplace_back(src_vec.get_element(offset)); - } + data.emplace_back(src_vec.get_element(offset)); } } diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 280d2de834..8d03087cc3 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -111,13 +111,8 @@ public: s += length; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { - s += (indices_end - indices_begin); - } - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { s += (indices_end - indices_begin); } diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index b61753146f..dfdfbb0d6b 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -119,20 +119,8 @@ public: data.push_back(assert_cast(src).get_data()[n]); } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { - auto origin_size = size(); - auto new_size = indices_end - indices_begin; - data.resize(origin_size + new_size); - const T* src_data = reinterpret_cast(src.get_raw_data().data); - - for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = src_data[indices_begin[i]]; - } - } - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { auto origin_size = size(); auto new_size = indices_end - indices_begin; data.resize(origin_size + new_size); diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h index d2374811e1..95238bf4f2 100644 --- a/be/src/vec/columns/column_dictionary.h +++ b/be/src/vec/columns/column_dictionary.h @@ -77,16 +77,11 @@ public: LOG(FATAL) << "insert_range_from not supported in ColumnDictionary"; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { LOG(FATAL) << "insert_indices_from not supported in ColumnDictionary"; } - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override { - LOG(FATAL) << "insert_indices_from_join not supported in ColumnDictionary"; - } - void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported in ColumnDictionary"; } void update_hash_with_value(size_t n, SipHash& hash) const override { diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h index 790c135889..a2d76ade56 100644 --- a/be/src/vec/columns/column_dummy.h +++ b/be/src/vec/columns/column_dummy.h @@ -80,8 +80,8 @@ public: s += length; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { s += (indices_end - indices_begin); } diff --git a/be/src/vec/columns/column_fixed_length_object.h b/be/src/vec/columns/column_fixed_length_object.h index e6b1db8bf5..a817a4ae05 100644 --- a/be/src/vec/columns/column_fixed_length_object.h +++ b/be/src/vec/columns/column_fixed_length_object.h @@ -81,30 +81,8 @@ public: return res; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { - const Self& src_vec = assert_cast(src); - auto origin_size = size(); - auto new_size = indices_end - indices_begin; - if (_item_size == 0) { - _item_size = src_vec._item_size; - } - DCHECK(_item_size == src_vec._item_size) << "dst and src should have the same _item_size"; - resize(origin_size + new_size); - - for (int i = 0; i < new_size; ++i) { - int offset = indices_begin[i]; - if (offset > -1) { - memcpy(&_data[(origin_size + i) * _item_size], &src_vec._data[offset * _item_size], - _item_size); - } else { - memset(&_data[(origin_size + i) * _item_size], 0, _item_size); - } - } - } - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { const Self& src_vec = assert_cast(src); auto origin_size = size(); auto new_size = indices_end - indices_begin; @@ -115,13 +93,8 @@ public: resize(origin_size + new_size); for (uint32_t i = 0; i < new_size; ++i) { - auto offset = indices_begin[i]; - if (offset) { - memcpy(&_data[(origin_size + i) * _item_size], &src_vec._data[offset * _item_size], - _item_size); - } else { - memset(&_data[(origin_size + i) * _item_size], 0, _item_size); - } + memcpy(&_data[(origin_size + i) * _item_size], + &src_vec._data[indices_begin[i] * _item_size], _item_size); } } diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index 82e8c0a911..d4b64f8c16 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -185,25 +185,10 @@ void ColumnMap::insert_from(const IColumn& src_, size_t n) { get_offsets().push_back(get_offsets().back() + size); } -void ColumnMap::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == -1) { - ColumnMap::insert_default(); - } else { - ColumnMap::insert_from(src, *x); - } - } -} - -void ColumnMap::insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) { - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == 0) { - ColumnMap::insert_default(); - } else { - ColumnMap::insert_from(src, *x); - } +void ColumnMap::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) { + for (const auto* x = indices_begin; x != indices_end; ++x) { + ColumnMap::insert_from(src, *x); } } diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 1cb3dd0c73..752de2e10c 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -127,11 +127,9 @@ public: Permutation& res) const override { LOG(FATAL) << "get_permutation not implemented"; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override; void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector) const override { diff --git a/be/src/vec/columns/column_nothing.h b/be/src/vec/columns/column_nothing.h index 8874bb6e7a..8a10eec8b6 100644 --- a/be/src/vec/columns/column_nothing.h +++ b/be/src/vec/columns/column_nothing.h @@ -39,11 +39,6 @@ public: bool structure_equals(const IColumn& rhs) const override { return typeid(rhs) == typeid(ColumnNothing); } - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override { - LOG(FATAL) << "insert_indices_from_join not supported in ColumnNothing"; - } }; } // namespace doris::vectorized diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index 3553e9823d..ecf330bead 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -294,8 +294,8 @@ void ColumnNullable::insert_range_from(const IColumn& src, size_t start, size_t _has_null |= simd::contain_byte(src_null_map_data.data() + start, length, 1); } -void ColumnNullable::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { +void ColumnNullable::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) { const auto& src_concrete = assert_cast(src); get_nested_column().insert_indices_from(src_concrete.get_nested_column(), indices_begin, indices_end); @@ -304,16 +304,6 @@ void ColumnNullable::insert_indices_from(const IColumn& src, const int* indices_ _need_update_has_null = true; } -void ColumnNullable::insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) { - const auto& src_concrete = assert_cast(src); - get_nested_column().insert_indices_from_join(src_concrete.get_nested_column(), indices_begin, - indices_end); - _get_null_map_column().insert_indices_from_join(src_concrete.get_null_map_column(), - indices_begin, indices_end); - _need_update_has_null = true; -} - void ColumnNullable::insert(const Field& x) { if (x.is_null()) { get_nested_column().insert_default(); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 365400a669..10b0951ab8 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -121,10 +121,8 @@ public: void deserialize_vec(std::vector& keys, size_t num_rows) override; void insert_range_from(const IColumn& src, size_t start, size_t length) override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override; void insert(const Field& x) override; void insert_from(const IColumn& src, size_t n) override; diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index 1dd4f7c74c..78ff31a55d 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -1422,20 +1422,9 @@ void ColumnObject::append_data_by_selector(MutableColumnPtr& res, return append_data_by_selector_impl(res, selector); } -void ColumnObject::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x == -1) { - ColumnObject::insert_default(); - } else { - ColumnObject::insert_from(src, *x); - } - } -} - -void ColumnObject::insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) { - for (auto x = indices_begin; x != indices_end; ++x) { +void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) { + for (const auto* x = indices_begin; x != indices_end; ++x) { ColumnObject::insert_from(src, *x); } } diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 7e8be7e6d8..8efec7ad12 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -369,11 +369,8 @@ public: void append_data_by_selector(MutableColumnPtr& res, const IColumn::Selector& selector) const override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override; // May throw execption void try_insert(const Field& field); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 2d009e2a08..424a8717e1 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -124,10 +124,10 @@ void ColumnString::insert_range_from(const IColumn& src, size_t start, size_t le } } -void ColumnString::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { - const ColumnString& src_str = assert_cast(src); - auto src_offset_data = src_str.offsets.data(); +void ColumnString::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) { + const auto& src_str = assert_cast(src); + const auto* src_offset_data = src_str.offsets.data(); auto old_char_size = chars.size(); size_t total_chars_size = old_char_size; @@ -136,65 +136,24 @@ void ColumnString::insert_indices_from(const IColumn& src, const int* indices_be offsets.resize(offsets.size() + indices_end - indices_begin); auto* dst_offsets_data = offsets.data(); - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x != -1) { - total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1]; - } + for (const auto* x = indices_begin; x != indices_end; ++x) { + total_chars_size += src_offset_data[*x] - src_offset_data[int(*x) - 1]; dst_offsets_data[dst_offsets_pos++] = total_chars_size; } check_chars_length(total_chars_size, offsets.size()); chars.resize(total_chars_size); - auto* src_data_ptr = src_str.chars.data(); + const auto* src_data_ptr = src_str.chars.data(); auto* dst_data_ptr = chars.data(); size_t dst_chars_pos = old_char_size; - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x != -1) { - const size_t size_to_append = src_offset_data[*x] - src_offset_data[*x - 1]; - const size_t offset = src_offset_data[*x - 1]; - memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos, - src_data_ptr + offset, size_to_append); - dst_chars_pos += size_to_append; - } - } -} - -void ColumnString::insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) { - const ColumnString& src_str = assert_cast(src); - auto src_offset_data = src_str.offsets.data(); - - auto old_char_size = chars.size(); - size_t total_chars_size = old_char_size; - - auto dst_offsets_pos = offsets.size(); - offsets.resize(offsets.size() + indices_end - indices_begin); - auto* dst_offsets_data = offsets.data(); - - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x != 0) { - total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1]; - } - dst_offsets_data[dst_offsets_pos++] = total_chars_size; - } - check_chars_length(total_chars_size, offsets.size()); - - chars.resize(total_chars_size); - - auto* src_data_ptr = src_str.chars.data(); - auto* dst_data_ptr = chars.data(); - - size_t dst_chars_pos = old_char_size; - for (auto x = indices_begin; x != indices_end; ++x) { - if (*x != 0) { - const size_t size_to_append = src_offset_data[*x] - src_offset_data[*x - 1]; - const size_t offset = src_offset_data[*x - 1]; - memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos, - src_data_ptr + offset, size_to_append); - dst_chars_pos += size_to_append; - } + for (const auto* x = indices_begin; x != indices_end; ++x) { + const size_t size_to_append = src_offset_data[*x] - src_offset_data[int(*x) - 1]; + const size_t offset = src_offset_data[int(*x) - 1]; + memcpy_small_allow_read_write_overflow15(dst_data_ptr + dst_chars_pos, + src_data_ptr + offset, size_to_append); + dst_chars_pos += size_to_append; } } diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 191c6a95cf..e6b27f2005 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -484,11 +484,8 @@ public: void insert_range_from(const IColumn& src, size_t start, size_t length) override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override; ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; size_t filter(const Filter& filter) override; diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 3502fdf581..5a89b5d754 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -225,23 +225,14 @@ void ColumnStruct::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTy } } -void ColumnStruct::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { - const ColumnStruct& src_concrete = assert_cast(src); +void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) { + const auto& src_concrete = assert_cast(src); for (size_t i = 0; i < columns.size(); ++i) { columns[i]->insert_indices_from(src_concrete.get_column(i), indices_begin, indices_end); } } -void ColumnStruct::insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) { - const ColumnStruct& src_concrete = assert_cast(src); - for (size_t i = 0; i < columns.size(); ++i) { - columns[i]->insert_indices_from_join(src_concrete.get_column(i), indices_begin, - indices_end); - } -} - void ColumnStruct::insert_range_from(const IColumn& src, size_t start, size_t length) { const size_t tuple_size = columns.size(); for (size_t i = 0; i < tuple_size; ++i) { diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index 499fb8444f..e2da7fd644 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -121,11 +121,8 @@ public: uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; - - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override; void get_permutation(bool reverse, size_t limit, int nan_direction_hint, Permutation& res) const override { diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index a825e07d5f..65b1b6308e 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -366,31 +366,8 @@ void ColumnVector::insert_range_from(const IColumn& src, size_t start, size_t } template -void ColumnVector::insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) { - auto origin_size = size(); - auto new_size = indices_end - indices_begin; - data.resize(origin_size + new_size); - - const T* src_data = reinterpret_cast(src.get_raw_data().data); - - if constexpr (std::is_same_v) { - // nullmap : indices_begin[i] == -1 means is null at the here, set true here - for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = (indices_begin[i] == -1) + - (indices_begin[i] != -1) * src_data[indices_begin[i]]; - } - } else { - // real data : indices_begin[i] == -1 what at is meaningless - for (int i = 0; i < new_size; ++i) { - data[origin_size + i] = src_data[indices_begin[i]]; - } - } -} - -template -void ColumnVector::insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) { +void ColumnVector::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) { auto origin_size = size(); auto new_size = indices_end - indices_begin; data.resize(origin_size + new_size); diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index cb1edddb52..5319902758 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -386,11 +386,9 @@ public: void insert_range_from(const IColumn& src, size_t start, size_t length) override; - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override; - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override; void fill(const value_type& element, size_t num) { auto old_size = data.size(); auto new_size = old_size + num; diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h index 79f445b08d..c42f0a3322 100644 --- a/be/src/vec/columns/predicate_column.h +++ b/be/src/vec/columns/predicate_column.h @@ -126,16 +126,11 @@ public: LOG(FATAL) << "insert_range_from not supported in PredicateColumnType"; } - void insert_indices_from(const IColumn& src, const int* indices_begin, - const int* indices_end) override { + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, + const uint32_t* indices_end) override { LOG(FATAL) << "insert_indices_from not supported in PredicateColumnType"; } - void insert_indices_from_join(const IColumn& src, const uint32_t* indices_begin, - const uint32_t* indices_end) override { - LOG(FATAL) << "insert_indices_from_join not supported in PredicateColumnType"; - } - void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported in PredicateColumnType"; } diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 723dc3ac63..195134d029 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -950,7 +950,8 @@ void MutableBlock::add_row(const Block* block, int row) { } } -void MutableBlock::add_rows(const Block* block, const int* row_begin, const int* row_end) { +void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, + const uint32_t* row_end) { DCHECK_LE(columns(), block->columns()); const auto& block_data = block->get_columns_with_type_and_name(); for (size_t i = 0; i < _columns.size(); ++i) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 6c7fa80cb8..ec2cf249b2 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -563,7 +563,7 @@ public: void swap(MutableBlock&& other) noexcept; void add_row(const Block* block, int row); - void add_rows(const Block* block, const int* row_begin, const int* row_end); + void add_rows(const Block* block, const uint32_t* row_begin, const uint32_t* row_end); void add_rows(const Block* block, size_t row_begin, size_t length); void add_rows(const Block* block, std::vector rows); diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 38f8b3a558..c8f2ae0e55 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -76,8 +76,8 @@ void ProcessHashTableProbe::build_side_output_column( for (int i = 0; i < _right_col_len; i++) { const auto& column = *_build_block->safe_get_by_position(i).column; if (output_slot_flags[i]) { - mcol[i + _right_col_idx]->insert_indices_from_join(column, _build_indexs.data(), - _build_indexs.data() + size); + mcol[i + _right_col_idx]->insert_indices_from(column, _build_indexs.data(), + _build_indexs.data() + size); } else { mcol[i + _right_col_idx]->insert_many_defaults(size); } @@ -365,8 +365,8 @@ Status ProcessHashTableProbe::process_data_in_hashtable( } for (size_t j = 0; j < _right_col_len; ++j) { const auto& column = *_build_block->safe_get_by_position(j).column; - mcol[j + _right_col_idx]->insert_indices_from_join(column, _build_indexs.data(), - _build_indexs.data() + block_size); + mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(), + _build_indexs.data() + block_size); } // just resize the left table column in case with other conjunct to make block size is not zero diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index a5305a4b53..f321463985 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -409,7 +409,7 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s .data(); const auto num_rows = cur_block.rows(); - std::vector selector(num_rows); + std::vector selector(num_rows); size_t selector_idx = 0; for (size_t j = 0; j < num_rows; j++) { if constexpr (IsSemi) { diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 8e4ab5c22b..681fc09739 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -126,8 +126,8 @@ public: hashes[i] = hashes[i] % element_size; } - std::vector channel2rows[element_size]; - for (int i = 0; i < rows; i++) { + std::vector channel2rows[element_size]; + for (uint32_t i = 0; i < rows; i++) { channel2rows[hashes[i]].emplace_back(i); } @@ -234,10 +234,10 @@ private: std::vector> _colocate_block_mutexs; void _add_rows_colocate_blocks(vectorized::Block* block, int loc, - const std::vector& rows) { + const std::vector& rows) { int row_wait_add = rows.size(); const int batch_size = _batch_size; - const int* begin = &rows[0]; + const uint32_t* begin = rows.data(); std::lock_guard l(*_colocate_block_mutexs[loc]); while (row_wait_add > 0) { diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 5bfec60d1f..4d55e0fcb0 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -222,7 +222,7 @@ Status Channel::send_remote_block(PBlock* block, bool eos, Status exec_s } template -Status Channel::add_rows(Block* block, const std::vector& rows, bool eos) { +Status Channel::add_rows(Block* block, const std::vector& rows, bool eos) { if (_fragment_instance_id.lo == -1) { return Status::OK(); } @@ -713,7 +713,7 @@ BlockSerializer::BlockSerializer(Parent* parent, bool is_local) template Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers, bool* serialized, bool eos, - const std::vector* rows) { + const std::vector* rows) { if (_mutable_block == nullptr) { SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); _mutable_block = MutableBlock::create_unique(block->clone_empty()); @@ -722,9 +722,9 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest { SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker()); if (rows) { - if (rows->size() > 0) { + if (!rows->empty()) { SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer()); - const int* begin = &(*rows)[0]; + const auto* begin = rows->data(); _mutable_block->add_rows(block, begin, begin + rows->size()); } } else if (!block->empty()) { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index d25385d6b2..75a3bfd86a 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -78,7 +78,7 @@ class BlockSerializer { public: BlockSerializer(Parent* parent, bool is_local = true); Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized, - bool eos, const std::vector* rows = nullptr); + bool eos, const std::vector* rows = nullptr); Status serialize_block(PBlock* dest, int num_receivers = 1); Status serialize_block(const Block* src, PBlock* dest, int num_receivers = 1); @@ -277,7 +277,7 @@ public: return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); } - virtual Status add_rows(Block* block, const std::vector& row, bool eos); + virtual Status add_rows(Block* block, const std::vector& row, bool eos); virtual Status send_current_block(bool eos, Status exec_status); @@ -412,9 +412,9 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channe int num_channels, const HashValueType* __restrict channel_ids, int rows, Block* block, bool eos) { - std::vector channel2rows[num_channels]; + std::vector channel2rows[num_channels]; - for (int i = 0; i < rows; i++) { + for (uint32_t i = 0; i < rows; i++) { channel2rows[channel_ids[i]].emplace_back(i); } @@ -503,7 +503,7 @@ public: return Status::OK(); } - Status add_rows(Block* block, const std::vector& rows, bool eos) override { + Status add_rows(Block* block, const std::vector& rows, bool eos) override { if (Channel::_fragment_instance_id.lo == -1) { return Status::OK(); } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 69918b37a6..e2b069db3b 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -93,7 +93,7 @@ using Streams = std::vector>; struct Rows { int64_t partition_id; int64_t index_id; - std::vector row_idxes; + std::vector row_idxes; }; using RowsForTablet = std::unordered_map;