[Enchancement](optimize) set result_size_hint to filter_block (#11972)
This commit is contained in:
@ -297,7 +297,7 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t result_si
|
||||
auto res = this->create();
|
||||
Container& res_data = res->get_data();
|
||||
|
||||
if (result_size_hint) res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
|
||||
res_data.reserve(result_size_hint > 0 ? result_size_hint : size);
|
||||
|
||||
const UInt8* filt_pos = filt.data();
|
||||
const UInt8* filt_end = filt_pos + size;
|
||||
@ -319,7 +319,7 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t result_si
|
||||
} else {
|
||||
while (mask) {
|
||||
const size_t idx = __builtin_ctzll(mask);
|
||||
res_data.push_back(data_pos[idx]);
|
||||
res_data.push_back_without_reserve(data_pos[idx]);
|
||||
mask = mask & (mask - 1);
|
||||
}
|
||||
}
|
||||
@ -329,7 +329,9 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t result_si
|
||||
}
|
||||
|
||||
while (filt_pos < filt_end) {
|
||||
if (*filt_pos) res_data.push_back(*data_pos);
|
||||
if (*filt_pos) {
|
||||
res_data.push_back_without_reserve(*data_pos);
|
||||
}
|
||||
|
||||
++filt_pos;
|
||||
++data_pos;
|
||||
|
||||
@ -29,7 +29,6 @@
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_vector.h"
|
||||
#include "vec/columns/columns_common.h"
|
||||
#include "vec/common/typeid_cast.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
@ -48,7 +47,7 @@ size_t count_bytes_in_filter(const IColumn::Filter& filt) {
|
||||
const __m128i zero16 = _mm_setzero_si128();
|
||||
const Int8* end64 = pos + filt.size() / 64 * 64;
|
||||
|
||||
for (; pos < end64; pos += 64)
|
||||
for (; pos < end64; pos += 64) {
|
||||
count += __builtin_popcountll(
|
||||
static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
|
||||
_mm_loadu_si128(reinterpret_cast<const __m128i*>(pos)), zero16))) |
|
||||
@ -61,8 +60,9 @@ size_t count_bytes_in_filter(const IColumn::Filter& filt) {
|
||||
(static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
|
||||
_mm_loadu_si128(reinterpret_cast<const __m128i*>(pos + 48)), zero16)))
|
||||
<< 48));
|
||||
}
|
||||
|
||||
/// TODO Add duff device for tail?
|
||||
/// TODO Add duff device for tail?
|
||||
#endif
|
||||
|
||||
for (; pos < end; ++pos) {
|
||||
@ -75,13 +75,17 @@ size_t count_bytes_in_filter(const IColumn::Filter& filt) {
|
||||
std::vector<size_t> count_columns_size_in_selector(IColumn::ColumnIndex num_columns,
|
||||
const IColumn::Selector& selector) {
|
||||
std::vector<size_t> counts(num_columns);
|
||||
for (auto idx : selector) ++counts[idx];
|
||||
for (auto idx : selector) {
|
||||
++counts[idx];
|
||||
}
|
||||
|
||||
return counts;
|
||||
}
|
||||
|
||||
bool memory_is_byte(const void* data, size_t size, uint8_t byte) {
|
||||
if (size == 0) return true;
|
||||
if (size == 0) {
|
||||
return true;
|
||||
}
|
||||
auto ptr = reinterpret_cast<const uint8_t*>(data);
|
||||
return *ptr == byte && memcmp(ptr, ptr + 1, size - 1) == 0;
|
||||
}
|
||||
@ -104,16 +108,16 @@ struct ResultOffsetsBuilder {
|
||||
res_offsets.reserve(result_size_hint > 0 ? result_size_hint : src_size);
|
||||
}
|
||||
|
||||
void insertOne(size_t array_size) {
|
||||
void insert_one(size_t array_size) {
|
||||
current_src_offset += array_size;
|
||||
res_offsets.push_back(current_src_offset);
|
||||
res_offsets.push_back_without_reserve(current_src_offset);
|
||||
}
|
||||
|
||||
template <size_t SIMD_BYTES>
|
||||
void insertChunk(const IColumn::Offset* src_offsets_pos, bool first,
|
||||
IColumn::Offset chunk_offset, size_t chunk_size) {
|
||||
void insert_chunk(const IColumn::Offset* src_offsets_pos, bool first,
|
||||
IColumn::Offset chunk_offset, size_t chunk_size) {
|
||||
const auto offsets_size_old = res_offsets.size();
|
||||
res_offsets.resize(offsets_size_old + SIMD_BYTES);
|
||||
res_offsets.resize_assume_reserved(offsets_size_old + SIMD_BYTES);
|
||||
memcpy(&res_offsets[offsets_size_old], src_offsets_pos,
|
||||
SIMD_BYTES * sizeof(IColumn::Offset));
|
||||
|
||||
@ -125,7 +129,9 @@ struct ResultOffsetsBuilder {
|
||||
const auto res_offsets_pos = &res_offsets[offsets_size_old];
|
||||
|
||||
/// adjust offsets
|
||||
for (size_t i = 0; i < SIMD_BYTES; ++i) res_offsets_pos[i] -= diff_offset;
|
||||
for (size_t i = 0; i < SIMD_BYTES; ++i) {
|
||||
res_offsets_pos[i] -= diff_offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
current_src_offset += chunk_size;
|
||||
@ -135,10 +141,10 @@ struct ResultOffsetsBuilder {
|
||||
struct NoResultOffsetsBuilder {
|
||||
explicit NoResultOffsetsBuilder(IColumn::Offsets*) {}
|
||||
void reserve(ssize_t, size_t) {}
|
||||
void insertOne(size_t) {}
|
||||
void insert_one(size_t) {}
|
||||
|
||||
template <size_t SIMD_BYTES>
|
||||
void insertChunk(const IColumn::Offset*, bool, IColumn::Offset, size_t) {}
|
||||
void insert_chunk(const IColumn::Offset*, bool, IColumn::Offset, size_t) {}
|
||||
};
|
||||
|
||||
template <typename T, typename ResultOffsetsBuilder>
|
||||
@ -151,15 +157,15 @@ void filter_arrays_impl_generic(const PaddedPODArray<T>& src_elems,
|
||||
LOG(FATAL) << "Size of filter doesn't match size of column.";
|
||||
}
|
||||
|
||||
constexpr int ASSUME_STRING_LENGTH = 5;
|
||||
ResultOffsetsBuilder result_offsets_builder(res_offsets);
|
||||
|
||||
if (result_size_hint) {
|
||||
result_offsets_builder.reserve(result_size_hint, size);
|
||||
result_offsets_builder.reserve(result_size_hint, size);
|
||||
|
||||
if (result_size_hint < 0)
|
||||
res_elems.reserve(src_elems.size());
|
||||
else if (result_size_hint < 1000000000 && src_elems.size() < 1000000000) /// Avoid overflow.
|
||||
res_elems.reserve((result_size_hint * src_elems.size() + size - 1) / size);
|
||||
if (result_size_hint < 0) {
|
||||
res_elems.reserve(src_elems.size() * ASSUME_STRING_LENGTH);
|
||||
} else if (result_size_hint < 1000000000 && src_elems.size() < 1000000000) { /// Avoid overflow.
|
||||
res_elems.reserve(result_size_hint * ASSUME_STRING_LENGTH);
|
||||
}
|
||||
|
||||
const UInt8* filt_pos = filt.data();
|
||||
@ -173,7 +179,7 @@ void filter_arrays_impl_generic(const PaddedPODArray<T>& src_elems,
|
||||
const auto arr_offset = offset_ptr == offsets_begin ? 0 : offset_ptr[-1];
|
||||
const auto arr_size = *offset_ptr - arr_offset;
|
||||
|
||||
result_offsets_builder.insertOne(arr_size);
|
||||
result_offsets_builder.insert_one(arr_size);
|
||||
|
||||
const auto elems_size_old = res_elems.size();
|
||||
res_elems.resize(elems_size_old + arr_size);
|
||||
@ -193,8 +199,8 @@ void filter_arrays_impl_generic(const PaddedPODArray<T>& src_elems,
|
||||
const auto chunk_offset = first ? 0 : offsets_pos[-1];
|
||||
const auto chunk_size = offsets_pos[SIMD_BYTES - 1] - chunk_offset;
|
||||
|
||||
result_offsets_builder.template insertChunk<SIMD_BYTES>(offsets_pos, first,
|
||||
chunk_offset, chunk_size);
|
||||
result_offsets_builder.template insert_chunk<SIMD_BYTES>(offsets_pos, first,
|
||||
chunk_offset, chunk_size);
|
||||
|
||||
/// copy elements for SIMD_BYTES arrays at once
|
||||
const auto elems_size_old = res_elems.size();
|
||||
@ -213,7 +219,9 @@ void filter_arrays_impl_generic(const PaddedPODArray<T>& src_elems,
|
||||
}
|
||||
|
||||
while (filt_pos < filt_end) {
|
||||
if (*filt_pos) copy_array(offsets_pos);
|
||||
if (*filt_pos) {
|
||||
copy_array(offsets_pos);
|
||||
}
|
||||
|
||||
++filt_pos;
|
||||
++offsets_pos;
|
||||
@ -259,19 +267,4 @@ INSTANTIATE(Float64)
|
||||
|
||||
#undef INSTANTIATE
|
||||
|
||||
namespace detail {
|
||||
template <typename T>
|
||||
const PaddedPODArray<T>* get_indexes_data(const IColumn& indexes) {
|
||||
auto* column = typeid_cast<const ColumnVector<T>*>(&indexes);
|
||||
if (column) return &column->get_data();
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
template const PaddedPODArray<UInt8>* get_indexes_data<UInt8>(const IColumn& indexes);
|
||||
template const PaddedPODArray<UInt16>* get_indexes_data<UInt16>(const IColumn& indexes);
|
||||
template const PaddedPODArray<UInt32>* get_indexes_data<UInt32>(const IColumn& indexes);
|
||||
template const PaddedPODArray<UInt64>* get_indexes_data<UInt64>(const IColumn& indexes);
|
||||
} // namespace detail
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -50,42 +50,4 @@ void filter_arrays_impl_only_data(const PaddedPODArray<T>& src_elems,
|
||||
const IColumn::Offsets& src_offsets, PaddedPODArray<T>& res_elems,
|
||||
const IColumn::Filter& filt, ssize_t result_size_hint);
|
||||
|
||||
namespace detail {
|
||||
template <typename T>
|
||||
const PaddedPODArray<T>* get_indexes_data(const IColumn& indexes);
|
||||
}
|
||||
|
||||
/// Check limit <= indexes->size() and call column.index_impl(const PaddedPodArray<Type> & indexes, UInt64 limit).
|
||||
template <typename Column>
|
||||
ColumnPtr select_index_impl(const Column& column, const IColumn& indexes, size_t limit) {
|
||||
if (limit == 0) limit = indexes.size();
|
||||
|
||||
if (indexes.size() < limit) {
|
||||
LOG(FATAL) << "Size of indexes is less than required.";
|
||||
}
|
||||
|
||||
if (auto* data_uint8 = detail::get_indexes_data<UInt8>(indexes))
|
||||
return column.template index_impl<UInt8>(*data_uint8, limit);
|
||||
else if (auto* data_uint16 = detail::get_indexes_data<UInt16>(indexes))
|
||||
return column.template index_impl<UInt16>(*data_uint16, limit);
|
||||
else if (auto* data_uint32 = detail::get_indexes_data<UInt32>(indexes))
|
||||
return column.template index_impl<UInt32>(*data_uint32, limit);
|
||||
else if (auto* data_uint64 = detail::get_indexes_data<UInt64>(indexes))
|
||||
return column.template index_impl<UInt64>(*data_uint64, limit);
|
||||
else {
|
||||
LOG(FATAL) << "Indexes column for IColumn::select must be ColumnUInt, got"
|
||||
<< indexes.get_name();
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
#define INSTANTIATE_INDEX_IMPL(Column) \
|
||||
template ColumnPtr Column::indexImpl<UInt8>(const PaddedPODArray<UInt8>& indexes, \
|
||||
size_t limit) const; \
|
||||
template ColumnPtr Column::indexImpl<UInt16>(const PaddedPODArray<UInt16>& indexes, \
|
||||
size_t limit) const; \
|
||||
template ColumnPtr Column::indexImpl<UInt32>(const PaddedPODArray<UInt32>& indexes, \
|
||||
size_t limit) const; \
|
||||
template ColumnPtr Column::indexImpl<UInt64>(const PaddedPODArray<UInt64>& indexes, \
|
||||
size_t limit) const;
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -23,11 +23,6 @@
|
||||
#include <fmt/format.h>
|
||||
#include <snappy.h>
|
||||
|
||||
#include <cstring>
|
||||
#include <iomanip>
|
||||
#include <iterator>
|
||||
#include <memory>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "runtime/descriptors.h"
|
||||
#include "runtime/row_batch.h"
|
||||
@ -35,26 +30,17 @@
|
||||
#include "runtime/tuple_row.h"
|
||||
#include "udf/udf.h"
|
||||
#include "util/block_compression.h"
|
||||
#include "util/simd/bits.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/columns/column_const.h"
|
||||
#include "vec/columns/column_nullable.h"
|
||||
#include "vec/columns/column_string.h"
|
||||
#include "vec/columns/column_vector.h"
|
||||
#include "vec/columns/columns_common.h"
|
||||
#include "vec/columns/columns_number.h"
|
||||
#include "vec/common/assert_cast.h"
|
||||
#include "vec/common/exception.h"
|
||||
#include "vec/common/string_ref.h"
|
||||
#include "vec/common/typeid_cast.h"
|
||||
#include "vec/data_types/data_type_bitmap.h"
|
||||
#include "vec/data_types/data_type_date.h"
|
||||
#include "vec/data_types/data_type_date_time.h"
|
||||
#include "vec/data_types/data_type_decimal.h"
|
||||
#include "vec/data_types/data_type_factory.hpp"
|
||||
#include "vec/data_types/data_type_hll.h"
|
||||
#include "vec/data_types/data_type_nullable.h"
|
||||
#include "vec/data_types/data_type_number.h"
|
||||
#include "vec/data_types/data_type_string.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
@ -628,8 +614,9 @@ void Block::update_hash(SipHash& hash) const {
|
||||
}
|
||||
}
|
||||
|
||||
void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t column_to_keep) {
|
||||
auto count = count_bytes_in_filter(filter);
|
||||
void Block::filter_block_internal(Block* block, const IColumn::Filter& filter,
|
||||
uint32_t column_to_keep) {
|
||||
size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
|
||||
if (count == 0) {
|
||||
for (size_t i = 0; i < column_to_keep; ++i) {
|
||||
std::move(*block->get_by_position(i).column).assume_mutable()->clear();
|
||||
@ -638,7 +625,7 @@ void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t
|
||||
if (count != block->rows()) {
|
||||
for (size_t i = 0; i < column_to_keep; ++i) {
|
||||
block->get_by_position(i).column =
|
||||
block->get_by_position(i).column->filter(filter, 0);
|
||||
block->get_by_position(i).column->filter(filter, count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -257,6 +257,9 @@ public:
|
||||
// copy a new block by the offset column
|
||||
Block copy_block(const std::vector<int>& column_offset) const;
|
||||
|
||||
static void filter_block_internal(Block* block, const IColumn::Filter& filter,
|
||||
uint32_t column_to_keep);
|
||||
|
||||
static Status filter_block(Block* block, int filter_column_id, int column_to_keep);
|
||||
|
||||
static void erase_useless_column(Block* block, int column_to_keep) {
|
||||
|
||||
@ -501,5 +501,13 @@ TEST(BlockTest, dump_data) {
|
||||
fill_block_with_array_string(block1);
|
||||
// Note: here we should set 'row_num' in dump_data
|
||||
EXPECT_GT(block1.dump_data(10).size(), 1);
|
||||
|
||||
vectorized::IColumn::Filter filter;
|
||||
int size = block1.rows() / 2;
|
||||
for (int i = 0; i < block1.rows(); i++) {
|
||||
filter.push_back(i % 2);
|
||||
}
|
||||
vectorized::Block::filter_block_internal(&block1, filter, block1.columns());
|
||||
EXPECT_EQ(size, block1.rows());
|
||||
}
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user