[improvement](predicate) Cache the dict code in ComparisonPredicate (#17684)
This commit is contained in:
@ -192,6 +192,11 @@ public:
|
||||
", opposite=" + (_opposite ? "true" : "false");
|
||||
}
|
||||
|
||||
/// Some predicates need to be cloned for each segment.
/// The base implementation returns false; a predicate that must be cloned
/// overrides this to return true and also overrides clone().
|
||||
virtual bool need_to_clone() const { return false; }
|
||||
|
||||
/// Produces a copy of this predicate through the out-parameter `to`.
/// The base implementation does not support cloning: it only emits a
/// LOG(FATAL) message and never writes to `to`.
virtual void clone(ColumnPredicate** to) const { LOG(FATAL) << "clone not supported"; }
|
||||
|
||||
/// Returns the shared PredicateParams object held in _predicate_params
/// (returned by value, so the caller gets its own shared_ptr to the same object).
std::shared_ptr<PredicateParams> predicate_params() { return _predicate_params; }
|
||||
|
||||
const std::string pred_type_string(PredicateType type) {
|
||||
|
||||
@ -32,7 +32,15 @@ class ComparisonPredicateBase : public ColumnPredicate {
|
||||
public:
|
||||
using T = typename PredicatePrimitiveTypeTraits<Type>::PredicateFieldType;
|
||||
/// Builds a comparison predicate on column `column_id` that compares against `value`.
/// `opposite` (default false) is forwarded to the ColumnPredicate base.
/// In the initializer list that mentions it, _cached_code starts at
/// _InvalidateCodeValue.
/// NOTE(review): this span contains two initializer lists (one without and one
/// with _cached_code) — looks like old and new diff lines shown together;
/// confirm which one is current.
ComparisonPredicateBase(uint32_t column_id, const T& value, bool opposite = false)
|
||||
: ColumnPredicate(column_id, opposite), _value(value) {}
|
||||
: ColumnPredicate(column_id, opposite),
|
||||
_cached_code(_InvalidateCodeValue),
|
||||
_value(value) {}
|
||||
|
||||
/// Allocates a new ComparisonPredicateBase from this predicate's _column_id,
/// _value and _opposite and stores the raw pointer in `*to`.
/// Only those three constructor arguments are carried over. The object is
/// created with `new` and is not retained here, so whoever receives `*to`
/// has to manage its lifetime.
void clone(ColumnPredicate** to) const override {
|
||||
*to = new ComparisonPredicateBase(_column_id, _value, _opposite);
|
||||
}
|
||||
|
||||
/// Always returns true: this predicate asks to be cloned before use.
/// NOTE(review): presumably because the cached dictionary code is only valid
/// for one dictionary column — confirm against the callers.
bool need_to_clone() const override { return true; }
|
||||
|
||||
/// Returns PT, the predicate type this class is instantiated for.
PredicateType type() const override { return PT; }
|
||||
|
||||
@ -258,13 +266,20 @@ public:
|
||||
auto* dict_column_ptr =
|
||||
vectorized::check_and_get_column<vectorized::ColumnDictI32>(
|
||||
nested_column);
|
||||
auto dict_code = _is_range() ? dict_column_ptr->find_code_by_bound(
|
||||
_value, _is_greater(), _is_eq())
|
||||
: dict_column_ptr->find_code(_value);
|
||||
auto* data_array = dict_column_ptr->get_data().data();
|
||||
|
||||
_base_loop_vec<true, is_and>(size, flags, null_map.data(), data_array,
|
||||
dict_code);
|
||||
auto dict_code = _find_code_from_dictionary_column(*dict_column_ptr);
|
||||
do {
|
||||
if constexpr (PT == PredicateType::EQ) {
|
||||
if (dict_code == -2) {
|
||||
memset(flags, 0, size);
|
||||
break;
|
||||
}
|
||||
}
|
||||
auto* data_array = dict_column_ptr->get_data().data();
|
||||
|
||||
_base_loop_vec<true, is_and>(size, flags, null_map.data(), data_array,
|
||||
dict_code);
|
||||
} while (false);
|
||||
} else {
|
||||
LOG(FATAL) << "column_dictionary must use StringRef predicate.";
|
||||
}
|
||||
@ -281,12 +296,18 @@ public:
|
||||
if constexpr (std::is_same_v<T, StringRef>) {
|
||||
auto* dict_column_ptr =
|
||||
vectorized::check_and_get_column<vectorized::ColumnDictI32>(column);
|
||||
auto dict_code = _is_range() ? dict_column_ptr->find_code_by_bound(
|
||||
_value, _is_greater(), _is_eq())
|
||||
: dict_column_ptr->find_code(_value);
|
||||
auto* data_array = dict_column_ptr->get_data().data();
|
||||
auto dict_code = _find_code_from_dictionary_column(*dict_column_ptr);
|
||||
do {
|
||||
if constexpr (PT == PredicateType::EQ) {
|
||||
if (dict_code == -2) {
|
||||
memset(flags, 0, size);
|
||||
break;
|
||||
}
|
||||
}
|
||||
auto* data_array = dict_column_ptr->get_data().data();
|
||||
|
||||
_base_loop_vec<false, is_and>(size, flags, nullptr, data_array, dict_code);
|
||||
_base_loop_vec<false, is_and>(size, flags, nullptr, data_array, dict_code);
|
||||
} while (false);
|
||||
} else {
|
||||
LOG(FATAL) << "column_dictionary must use StringRef predicate.";
|
||||
}
|
||||
@ -461,9 +482,7 @@ private:
|
||||
auto* dict_column_ptr =
|
||||
vectorized::check_and_get_column<vectorized::ColumnDictI32>(column);
|
||||
auto* data_array = dict_column_ptr->get_data().data();
|
||||
auto dict_code = _is_range() ? dict_column_ptr->find_code_by_bound(
|
||||
_value, _operator(1, 0), _operator(1, 1))
|
||||
: dict_column_ptr->find_code(_value);
|
||||
auto dict_code = _find_code_from_dictionary_column(*dict_column_ptr);
|
||||
_base_loop_bit<is_nullable, is_and>(sel, size, flags, null_map, data_array,
|
||||
dict_code);
|
||||
} else {
|
||||
@ -507,9 +526,13 @@ private:
|
||||
auto* dict_column_ptr =
|
||||
vectorized::check_and_get_column<vectorized::ColumnDictI32>(column);
|
||||
auto* data_array = dict_column_ptr->get_data().data();
|
||||
auto dict_code = _is_range() ? dict_column_ptr->find_code_by_bound(
|
||||
_value, _is_greater(), _is_eq())
|
||||
: dict_column_ptr->find_code(_value);
|
||||
auto dict_code = _find_code_from_dictionary_column(*dict_column_ptr);
|
||||
|
||||
if constexpr (PT == PredicateType::EQ) {
|
||||
if (dict_code == -2) {
|
||||
return _opposite ? size : 0;
|
||||
}
|
||||
}
|
||||
|
||||
return _base_loop<is_nullable>(sel, size, null_map, data_array, dict_code);
|
||||
} else {
|
||||
@ -527,12 +550,23 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the dictionary code that corresponds to _value, looking it up in
/// `column` only on the first call and returning the cached result afterwards.
///
/// When _is_range() is true the lookup is column.find_code_by_bound(_value,
/// _is_greater(), _is_eq()); otherwise it is column.find_code(_value).
///
/// Once _cached_code holds something other than _InvalidateCodeValue, `column`
/// is no longer consulted, so every later call returns the same code regardless
/// of which dictionary column is passed in.
/// NOTE(review): this assumes one predicate instance only ever sees one
/// dictionary — presumably guaranteed by cloning the predicate; confirm.
/// NOTE(review): the cache is a mutable member written from a const method
/// with no synchronization visible here — confirm instances are not shared
/// across threads.
__attribute__((flatten)) int32_t _find_code_from_dictionary_column(
|
||||
const vectorized::ColumnDictI32& column) const {
|
||||
// The sentinel marks "not looked up yet"; a lookup that itself yields the
// sentinel value would be repeated on the next call.
if (UNLIKELY(_cached_code == _InvalidateCodeValue)) {
|
||||
_cached_code = _is_range() ? column.find_code_by_bound(_value, _is_greater(), _is_eq())
|
||||
: column.find_code(_value);
|
||||
}
|
||||
return _cached_code;
|
||||
}
|
||||
|
||||
/// Builds the debug label "ComparisonPredicateBase(<Type>, <PT>)", where both
/// parts come from type_to_string(). The value and the opposite flag are not
/// included in this string.
std::string _debug_string() const override {
|
||||
std::string info =
|
||||
"ComparisonPredicateBase(" + type_to_string(Type) + ", " + type_to_string(PT) + ")";
|
||||
return info;
|
||||
}
|
||||
|
||||
static constexpr int32_t _InvalidateCodeValue = std::numeric_limits<int32_t>::max();
|
||||
mutable int32_t _cached_code;
|
||||
T _value;
|
||||
};
|
||||
|
||||
|
||||
@ -157,7 +157,8 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, const Schema&
|
||||
_lazy_materialization_read(false),
|
||||
_inited(false),
|
||||
_estimate_row_size(true),
|
||||
_wait_times_estimate_row_size(10) {}
|
||||
_wait_times_estimate_row_size(10),
|
||||
_pool(new ObjectPool) {}
|
||||
|
||||
SegmentIterator::~SegmentIterator() {
|
||||
for (auto iter : _column_iterators) {
|
||||
@ -173,9 +174,18 @@ SegmentIterator::~SegmentIterator() {
|
||||
|
||||
Status SegmentIterator::init(const StorageReadOptions& opts) {
|
||||
_opts = opts;
|
||||
if (!opts.column_predicates.empty()) {
|
||||
_col_predicates = opts.column_predicates;
|
||||
|
||||
for (auto& predicate : opts.column_predicates) {
|
||||
if (predicate->need_to_clone()) {
|
||||
ColumnPredicate* cloned;
|
||||
predicate->clone(&cloned);
|
||||
_pool->add(cloned);
|
||||
_col_predicates.emplace_back(cloned);
|
||||
} else {
|
||||
_col_predicates.emplace_back(predicate);
|
||||
}
|
||||
}
|
||||
|
||||
// Read options will not change, so just resize once here
|
||||
_block_rowids.resize(_opts.block_row_max);
|
||||
if (!opts.column_predicates_except_leafnode_of_andnode.empty()) {
|
||||
|
||||
@ -397,6 +397,8 @@ private:
|
||||
// used for compaction, record selected rowids of current batch
|
||||
uint16_t _selected_size;
|
||||
vector<uint16_t> _sel_rowid_idx;
|
||||
|
||||
std::unique_ptr<ObjectPool> _pool;
|
||||
};
|
||||
|
||||
} // namespace segment_v2
|
||||
|
||||
Reference in New Issue
Block a user