[Refactor](predicate) Unify Conditions and ColumnPredicate (#11985)
This commit is contained in:
@ -49,7 +49,6 @@ add_library(Olap STATIC
|
||||
memtable_flush_executor.cpp
|
||||
merger.cpp
|
||||
null_predicate.cpp
|
||||
olap_cond.cpp
|
||||
olap_meta.cpp
|
||||
olap_server.cpp
|
||||
options.cpp
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#include "block_column_predicate.h"
|
||||
|
||||
#include "olap/row_block2.h"
|
||||
#include "olap/rowset/segment_v2/bloom_filter.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -55,6 +56,15 @@ void SingleColumnBlockPredicate::evaluate_and(vectorized::MutableColumns& block,
|
||||
_predicate->evaluate_and(*column, sel, selected_size, flags);
|
||||
}
|
||||
|
||||
bool SingleColumnBlockPredicate::evaluate_and(
|
||||
const std::pair<WrapperField*, WrapperField*>& statistic) const {
|
||||
return _predicate->evaluate_and(statistic);
|
||||
}
|
||||
|
||||
// Bloom-filter check for a single-column block predicate: forwards the filter
// to the wrapped ColumnPredicate and returns its answer unchanged.
bool SingleColumnBlockPredicate::evaluate_and(const segment_v2::BloomFilter* bf) const {
    const bool verdict = _predicate->evaluate_and(bf);
    return verdict;
}
|
||||
|
||||
void SingleColumnBlockPredicate::evaluate_or(vectorized::MutableColumns& block, uint16_t* sel,
|
||||
uint16_t selected_size, bool* flags) const {
|
||||
auto column_id = _predicate->column_id();
|
||||
@ -198,6 +208,25 @@ void AndBlockColumnPredicate::evaluate_and(vectorized::MutableColumns& block, ui
|
||||
}
|
||||
}
|
||||
|
||||
bool AndBlockColumnPredicate::evaluate_and(
|
||||
const std::pair<WrapperField*, WrapperField*>& statistic) const {
|
||||
for (auto block_column_predicate : _block_column_predicate_vec) {
|
||||
if (!block_column_predicate->evaluate_and(statistic)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Conjunction over a bloom filter: returns true only when every child
// predicate accepts the filter. Evaluation stops at the first child that
// rejects it; with no children the result is true.
bool AndBlockColumnPredicate::evaluate_and(const segment_v2::BloomFilter* bf) const {
    for (const auto& child : _block_column_predicate_vec) {
        if (child->evaluate_and(bf)) {
            continue;
        }
        return false;
    }
    return true;
}
|
||||
|
||||
void AndBlockColumnPredicate::evaluate_or(RowBlockV2* block, uint16_t selected_size,
|
||||
bool* flags) const {
|
||||
if (num_of_column_predicate() == 1) {
|
||||
|
||||
@ -23,6 +23,10 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
namespace segment_v2 {
|
||||
class BloomFilter;
|
||||
}
|
||||
|
||||
// Block Column Predicate evaluates column predicates on RowBlockV2 and supports combining them with OR and AND
|
||||
// Block Column Predicate will replace column predicate as a unified external vectorized interface
|
||||
// in the future
|
||||
@ -57,6 +61,17 @@ public:
|
||||
|
||||
// Vectorized evaluation hook. The base implementation has an empty body, so it
// leaves `flags` untouched unless a subclass overrides it.
virtual void evaluate_vec(vectorized::MutableColumns& block, uint16_t size, bool* flags) const {}
|
||||
|
||||
// Base-class fallback for evaluating against a (first, second) statistic pair.
// Calling it logs at FATAL severity, so subclasses that support this check are
// expected to override it. The `return true` follows the log statement
// (presumably unreachable if the logger aborts on FATAL -- confirm).
virtual bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const {
    LOG(FATAL) << "should not reach here";
    return true;
}
|
||||
|
||||
// Base-class fallback for evaluating against a bloom filter. Calling it logs at
// FATAL severity, so subclasses that support this check are expected to
// override it. The `return true` follows the log statement (presumably
// unreachable if the logger aborts on FATAL -- confirm).
virtual bool evaluate_and(const segment_v2::BloomFilter* bf) const {
    LOG(FATAL) << "should not reach here";
    return true;
}
|
||||
// Whether this block predicate can be evaluated against a bloom filter.
// The base-class default is "no"; subclasses override to opt in.
virtual bool can_do_bloom_filter() const { return false; }
|
||||
};
|
||||
|
||||
class SingleColumnBlockPredicate : public BlockColumnPredicate {
|
||||
@ -79,11 +94,15 @@ public:
|
||||
uint16_t selected_size) const override;
|
||||
void evaluate_and(vectorized::MutableColumns& block, uint16_t* sel, uint16_t selected_size,
|
||||
bool* flags) const override;
|
||||
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override;
|
||||
bool evaluate_and(const segment_v2::BloomFilter* bf) const override;
|
||||
void evaluate_or(vectorized::MutableColumns& block, uint16_t* sel, uint16_t selected_size,
|
||||
bool* flags) const override;
|
||||
|
||||
void evaluate_vec(vectorized::MutableColumns& block, uint16_t size, bool* flags) const override;
|
||||
|
||||
// Bloom-filter capability is decided entirely by the wrapped column predicate.
bool can_do_bloom_filter() const override { return _predicate->can_do_bloom_filter(); }
|
||||
|
||||
private:
|
||||
const ColumnPredicate* _predicate;
|
||||
};
|
||||
@ -158,6 +177,19 @@ public:
|
||||
bool* flags) const override;
|
||||
|
||||
void evaluate_vec(vectorized::MutableColumns& block, uint16_t size, bool* flags) const override;
|
||||
|
||||
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override;
|
||||
|
||||
bool evaluate_and(const segment_v2::BloomFilter* bf) const override;
|
||||
|
||||
bool can_do_bloom_filter() const override {
|
||||
for (auto& pred : _block_column_predicate_vec) {
|
||||
if (!pred->can_do_bloom_filter()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
} //namespace doris
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
#include "olap/column_block.h"
|
||||
#include "olap/rowset/segment_v2/bitmap_index_reader.h"
|
||||
#include "olap/rowset/segment_v2/bloom_filter.h"
|
||||
#include "olap/selection_vector.h"
|
||||
#include "vec/columns/column.h"
|
||||
|
||||
@ -96,6 +97,14 @@ public:
|
||||
// OR-style evaluation hook over a column. The base implementation has an empty
// body, so `flags` is left untouched unless a subclass overrides it.
virtual void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
                         bool* flags) const {}
|
||||
|
||||
// Default statistic-based check: always answers true, i.e. the base predicate
// never rules anything out from the (first, second) statistic pair.
virtual bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const { return true; }
|
||||
|
||||
// Default bloom-filter check: always answers true, i.e. never rules anything out.
// NOTE(review): the parameter type is spelled `BloomFilter` here, while the
// overriding predicates spell it `segment_v2::BloomFilter` -- confirm both
// names resolve to the same type so the `override`s match.
virtual bool evaluate_and(const BloomFilter* bf) const { return true; }
|
||||
|
||||
// Whether this predicate can be evaluated against a bloom filter.
// The base-class default is "no"; subclasses override to opt in.
virtual bool can_do_bloom_filter() const { return false; }
|
||||
|
||||
// used to evaluate pre read column in lazy materialization
|
||||
// now only support integer/float
|
||||
// a vectorized eval way
|
||||
|
||||
@ -20,6 +20,8 @@
|
||||
#include <cstdint>
|
||||
|
||||
#include "olap/column_predicate.h"
|
||||
#include "olap/rowset/segment_v2/bloom_filter.h"
|
||||
#include "olap/wrapper_field.h"
|
||||
#include "vec/columns/column_dictionary.h"
|
||||
|
||||
namespace doris {
|
||||
@ -199,6 +201,78 @@ public:
|
||||
_evaluate_bit<true>(column, sel, size, flags);
|
||||
}
|
||||
|
||||
// Expands to a `return` statement that compares one end of `statistic` against
// _value through _operator. ELE names the pair member to read (`first` or
// `second`). For TYPE_DATE only sizeof(uint24_t) bytes of the cell are copied
// into a zero-initialised T before comparing; every other type is read in place.
#define COMPARE_TO_MIN_OR_MAX(ELE)                                                        \
    if constexpr (Type == TYPE_DATE) {                                                    \
        T date_bound = 0;                                                                 \
        memcpy((char*)(&date_bound), statistic.ELE->cell_ptr(), sizeof(uint24_t));        \
        return _operator(date_bound, _value);                                             \
    } else {                                                                              \
        return _operator(*reinterpret_cast<const T*>(statistic.ELE->cell_ptr()), _value); \
    }
|
||||
|
||||
// Decides from a (min, max) statistic pair whether the comparison could hold
// for some value in that range: `statistic.first` is read as the minimum and
// `statistic.second` as the maximum. A null minimum always yields true.
//   EQ     -> true when min <= _value <= max
//   NE     -> false only when min == _value == max
//   LT/LE  -> _operator(min, _value)
//   GT/GE  -> _operator(max, _value)
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
    if (statistic.first->is_null()) {
        return true;
    }

    // Reads one bound as a T. For TYPE_DATE only sizeof(uint24_t) bytes are copied
    // from the cell into a zero-initialised T; other types are read in place.
    auto load_bound = [](WrapperField* field) -> T {
        if constexpr (Type == TYPE_DATE) {
            T widened = 0;
            memcpy(reinterpret_cast<char*>(&widened), field->cell_ptr(), sizeof(uint24_t));
            return widened;
        } else {
            return *reinterpret_cast<const T*>(field->cell_ptr());
        }
    };

    if constexpr (PT == PredicateType::EQ) {
        const bool value_in_range = load_bound(statistic.first) <= _value &&
                                    load_bound(statistic.second) >= _value;
        return _operator(value_in_range, true);
    } else if constexpr (PT == PredicateType::NE) {
        const bool range_is_exactly_value = load_bound(statistic.first) == _value &&
                                            load_bound(statistic.second) == _value;
        return _operator(range_is_exactly_value, true);
    } else if constexpr (PT == PredicateType::LT || PT == PredicateType::LE) {
        return _operator(load_bound(statistic.first), _value);
    } else {
        static_assert(PT == PredicateType::GT || PT == PredicateType::GE);
        return _operator(load_bound(statistic.second), _value);
    }
}
|
||||
|
||||
// Probes the bloom filter with _value. Only the EQ instantiation performs a
// probe; any other predicate type logs FATAL and returns true.
// StringValue operands are probed with their (ptr, len); TYPE_DATE probes
// sizeof(uint24_t) bytes of _value; every other type probes sizeof(_value) bytes.
bool evaluate_and(const segment_v2::BloomFilter* bf) const override {
    if constexpr (PT != PredicateType::EQ) {
        LOG(FATAL) << "Bloom filter is not supported by predicate type.";
        return true;
    } else if constexpr (std::is_same_v<T, StringValue>) {
        return bf->test_bytes(_value.ptr, _value.len);
    } else {
        // test_bytes is handed a non-const pointer, hence the const_cast.
        char* raw_value = const_cast<char*>(reinterpret_cast<const char*>(&_value));
        if constexpr (Type == TYPE_DATE) {
            return bf->test_bytes(raw_value, sizeof(uint24_t));
        } else {
            return bf->test_bytes(raw_value, sizeof(_value));
        }
    }
}
|
||||
|
||||
// Only the EQ instantiation of this predicate reports bloom-filter support.
bool can_do_bloom_filter() const override { return PT == PredicateType::EQ; }
|
||||
|
||||
void evaluate_or(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
|
||||
bool* flags) const override {
|
||||
_evaluate_bit<false>(column, sel, size, flags);
|
||||
|
||||
@ -29,7 +29,6 @@
|
||||
|
||||
#include "gen_cpp/olap_file.pb.h"
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/olap_cond.h"
|
||||
#include "olap/predicate_creator.h"
|
||||
#include "olap/tablet.h"
|
||||
#include "olap/utils.h"
|
||||
@ -255,12 +254,6 @@ Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tabl
|
||||
Version(delete_condition.version(), delete_condition.version()));
|
||||
DeleteConditions temp;
|
||||
temp.filter_version = delete_condition.version();
|
||||
temp.del_cond = new (std::nothrow) Conditions(tablet_schema);
|
||||
|
||||
if (temp.del_cond == nullptr) {
|
||||
LOG(FATAL) << "fail to malloc Conditions. size=" << sizeof(Conditions);
|
||||
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
|
||||
}
|
||||
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
|
||||
TCondition condition;
|
||||
if (!_parse_condition(sub_predicate, &condition)) {
|
||||
@ -269,11 +262,6 @@ Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tabl
|
||||
}
|
||||
condition.__set_column_unique_id(
|
||||
delete_pred_related_schema->column(condition.column_name).unique_id());
|
||||
Status res = temp.del_cond->append_condition(condition);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to append condition.res = " << res;
|
||||
return res;
|
||||
}
|
||||
auto predicate =
|
||||
parse_to_predicate(tablet_schema, condition, _predicate_mem_pool.get(), true);
|
||||
if (predicate != nullptr) {
|
||||
@ -294,11 +282,6 @@ Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tabl
|
||||
for (const auto& value : in_predicate.values()) {
|
||||
condition.condition_values.push_back(value);
|
||||
}
|
||||
Status res = temp.del_cond->append_condition(condition);
|
||||
if (!res.ok()) {
|
||||
LOG(WARNING) << "fail to append condition.res = " << res;
|
||||
return res;
|
||||
}
|
||||
temp.column_predicate_vec.push_back(
|
||||
parse_to_predicate(tablet_schema, condition, _predicate_mem_pool.get(), true));
|
||||
}
|
||||
@ -311,24 +294,12 @@ Status DeleteHandler::init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tabl
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
std::vector<int64_t> DeleteHandler::get_conds_version() {
|
||||
std::vector<int64_t> conds_version;
|
||||
for (const auto& cond : _del_conds) {
|
||||
conds_version.push_back(cond.filter_version);
|
||||
}
|
||||
|
||||
return conds_version;
|
||||
}
|
||||
|
||||
void DeleteHandler::finalize() {
|
||||
if (!_is_inited) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& cond : _del_conds) {
|
||||
cond.del_cond->finalize();
|
||||
delete cond.del_cond;
|
||||
|
||||
for (auto pred : cond.column_predicate_vec) {
|
||||
delete pred;
|
||||
}
|
||||
@ -339,12 +310,11 @@ void DeleteHandler::finalize() {
|
||||
}
|
||||
|
||||
void DeleteHandler::get_delete_conditions_after_version(
|
||||
int64_t version, std::vector<const Conditions*>* delete_conditions,
|
||||
AndBlockColumnPredicate* and_block_column_predicate_ptr) const {
|
||||
int64_t version, AndBlockColumnPredicate* and_block_column_predicate_ptr,
|
||||
std::unordered_map<int32_t, std::vector<const ColumnPredicate*>>* col_id_to_del_predicates)
|
||||
const {
|
||||
for (auto& del_cond : _del_conds) {
|
||||
if (del_cond.filter_version > version) {
|
||||
delete_conditions->emplace_back(del_cond.del_cond);
|
||||
|
||||
// now, only query support delete column predicate operator
|
||||
if (!del_cond.column_predicate_vec.empty()) {
|
||||
if (del_cond.column_predicate_vec.size() == 1) {
|
||||
@ -352,16 +322,33 @@ void DeleteHandler::get_delete_conditions_after_version(
|
||||
new SingleColumnBlockPredicate(del_cond.column_predicate_vec[0]);
|
||||
and_block_column_predicate_ptr->add_column_predicate(
|
||||
single_column_block_predicate);
|
||||
if (col_id_to_del_predicates->count(
|
||||
del_cond.column_predicate_vec[0]->column_id()) < 1) {
|
||||
col_id_to_del_predicates->insert(
|
||||
{del_cond.column_predicate_vec[0]->column_id(),
|
||||
std::vector<const ColumnPredicate*> {}});
|
||||
}
|
||||
(*col_id_to_del_predicates)[del_cond.column_predicate_vec[0]->column_id()]
|
||||
.push_back(del_cond.column_predicate_vec[0]);
|
||||
} else {
|
||||
auto or_column_predicate = new OrBlockColumnPredicate();
|
||||
|
||||
// build or_column_predicate
|
||||
std::for_each(del_cond.column_predicate_vec.cbegin(),
|
||||
del_cond.column_predicate_vec.cend(),
|
||||
[&or_column_predicate](const ColumnPredicate* predicate) {
|
||||
or_column_predicate->add_column_predicate(
|
||||
new SingleColumnBlockPredicate(predicate));
|
||||
});
|
||||
std::for_each(
|
||||
del_cond.column_predicate_vec.cbegin(),
|
||||
del_cond.column_predicate_vec.cend(),
|
||||
[&or_column_predicate,
|
||||
col_id_to_del_predicates](const ColumnPredicate* predicate) {
|
||||
if (col_id_to_del_predicates->count(predicate->column_id()) < 1) {
|
||||
col_id_to_del_predicates->insert(
|
||||
{predicate->column_id(),
|
||||
std::vector<const ColumnPredicate*> {}});
|
||||
}
|
||||
(*col_id_to_del_predicates)[predicate->column_id()].push_back(
|
||||
predicate);
|
||||
or_column_predicate->add_column_predicate(
|
||||
new SingleColumnBlockPredicate(predicate));
|
||||
});
|
||||
and_block_column_predicate_ptr->add_column_predicate(or_column_predicate);
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,7 +29,6 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
class Conditions;
|
||||
class RowCursor;
|
||||
class Tablet;
|
||||
class TabletReader;
|
||||
@ -37,8 +36,7 @@ class TabletSchema;
|
||||
|
||||
// Represent a delete condition.
|
||||
struct DeleteConditions {
|
||||
int64_t filter_version = 0; // The version of this condition
|
||||
Conditions* del_cond = nullptr; // The delete condition
|
||||
int64_t filter_version = 0; // The version of this condition
|
||||
std::vector<const ColumnPredicate*> column_predicate_vec;
|
||||
};
|
||||
|
||||
@ -94,23 +92,15 @@ public:
|
||||
Status init(std::shared_ptr<Tablet> tablet, TabletSchemaSPtr tablet_schema,
|
||||
const std::vector<DeletePredicatePB>& delete_conditions, int64_t version);
|
||||
|
||||
// Return the delete conditions' size.
|
||||
size_t conditions_num() const { return _del_conds.size(); }
|
||||
|
||||
// True when no delete conditions are held in _del_conds.
bool empty() const { return _del_conds.empty(); }
|
||||
|
||||
// Return all the versions of the delete conditions.
|
||||
std::vector<int64_t> get_conds_version();
|
||||
|
||||
// Release an instance of this class.
|
||||
void finalize();
|
||||
|
||||
// Return all the delete conditions.
|
||||
const std::vector<DeleteConditions>& get_delete_conditions() const { return _del_conds; }
|
||||
|
||||
void get_delete_conditions_after_version(
|
||||
int64_t version, std::vector<const Conditions*>* delete_conditions,
|
||||
AndBlockColumnPredicate* and_block_column_predicate_ptr) const;
|
||||
int64_t version, AndBlockColumnPredicate* and_block_column_predicate_ptr,
|
||||
std::unordered_map<int32_t, std::vector<const ColumnPredicate*>>*
|
||||
col_id_to_del_predicates) const;
|
||||
|
||||
private:
|
||||
// Use regular expression to extract 'column_name', 'op' and 'operands'
|
||||
|
||||
@ -25,7 +25,10 @@
|
||||
|
||||
#include "decimal12.h"
|
||||
#include "olap/column_predicate.h"
|
||||
#include "olap/rowset/segment_v2/bloom_filter.h"
|
||||
#include "olap/wrapper_field.h"
|
||||
#include "runtime/string_value.h"
|
||||
#include "runtime/type_limit.h"
|
||||
#include "uint24.h"
|
||||
#include "vec/columns/column_dictionary.h"
|
||||
#include "vec/core/types.h"
|
||||
@ -80,9 +83,41 @@ template <PrimitiveType Type, PredicateType PT>
|
||||
class InListPredicateBase : public ColumnPredicate {
|
||||
public:
|
||||
using T = typename PredicatePrimitiveTypeTraits<Type>::PredicateFieldType;
|
||||
template <typename ConditionType, typename ConvertFunc>
|
||||
InListPredicateBase(uint32_t column_id, const ConditionType& conditions,
|
||||
const ConvertFunc& convert, bool is_opposite = false,
|
||||
const TabletColumn* col = nullptr, MemPool* pool = nullptr)
|
||||
: ColumnPredicate(column_id, is_opposite),
|
||||
_min_value(type_limit<T>::max()),
|
||||
_max_value(type_limit<T>::min()) {
|
||||
for (const auto& condition : conditions) {
|
||||
T tmp;
|
||||
if constexpr (Type == TYPE_STRING || Type == TYPE_CHAR) {
|
||||
tmp = convert(*col, condition, pool);
|
||||
} else if constexpr (Type == TYPE_DECIMAL32 || Type == TYPE_DECIMAL64 ||
|
||||
Type == TYPE_DECIMAL128) {
|
||||
tmp = convert(*col, condition);
|
||||
} else {
|
||||
tmp = convert(condition);
|
||||
}
|
||||
_values.insert(tmp);
|
||||
if (tmp > _max_value) {
|
||||
_max_value = tmp;
|
||||
}
|
||||
if (tmp < _min_value) {
|
||||
_min_value = tmp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Only for test
|
||||
InListPredicateBase(uint32_t column_id, phmap::flat_hash_set<T>&& values,
|
||||
T min_value = type_limit<T>::min(), T max_value = type_limit<T>::max(),
|
||||
bool is_opposite = false)
|
||||
: ColumnPredicate(column_id, is_opposite), _values(std::move(values)) {}
|
||||
: ColumnPredicate(column_id, is_opposite),
|
||||
_values(std::move(values)),
|
||||
_min_value(min_value),
|
||||
_max_value(max_value) {}
|
||||
|
||||
// Reports the predicate kind this class was instantiated with (template parameter PT).
PredicateType type() const override { return PT; }
|
||||
|
||||
@ -181,6 +216,54 @@ public:
|
||||
LOG(FATAL) << "IColumn not support in_list_predicate.evaluate_or now.";
|
||||
}
|
||||
|
||||
// Decides from a (min, max) statistic pair whether any list value could fall in
// that range: `statistic.first` is read as the minimum, `statistic.second` as
// the maximum. A null minimum always yields true. For IN_LIST the result is
// whether [min, max] overlaps [_min_value, _max_value]; every other predicate
// type returns true.
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
    if (statistic.first->is_null()) {
        return true;
    }

    if constexpr (PT != PredicateType::IN_LIST) {
        return true;
    } else {
        // Reads one bound as a T. For TYPE_DATE only sizeof(uint24_t) bytes are
        // copied from the cell into a zero-initialised T.
        auto load_bound = [](WrapperField* field) -> T {
            if constexpr (Type == TYPE_DATE) {
                T widened = 0;
                memcpy(reinterpret_cast<char*>(&widened), field->cell_ptr(), sizeof(uint24_t));
                return widened;
            } else {
                return *reinterpret_cast<const T*>(field->cell_ptr());
            }
        };
        return load_bound(statistic.first) <= _max_value &&
               load_bound(statistic.second) >= _min_value;
    }
}
|
||||
|
||||
// Probes the bloom filter with each value in the list and returns true as soon
// as one probe hits; returns false when none do (including an empty list).
// Only the IN_LIST instantiation performs probes; any other predicate type logs
// FATAL and returns true.
// StringValue entries are probed with their (ptr, len); TYPE_DATE probes
// sizeof(uint24_t) bytes of the value; other types probe sizeof(value) bytes.
bool evaluate_and(const segment_v2::BloomFilter* bf) const override {
    if constexpr (PT == PredicateType::IN_LIST) {
        // Iterate by const reference rather than copying every element; the
        // const_cast only adapts to test_bytes' pointer parameter.
        for (const auto& value : _values) {
            if constexpr (std::is_same_v<T, StringValue>) {
                if (bf->test_bytes(value.ptr, value.len)) {
                    return true;
                }
            } else {
                char* raw_value = const_cast<char*>(reinterpret_cast<const char*>(&value));
                if constexpr (Type == TYPE_DATE) {
                    if (bf->test_bytes(raw_value, sizeof(uint24_t))) {
                        return true;
                    }
                } else {
                    if (bf->test_bytes(raw_value, sizeof(value))) {
                        return true;
                    }
                }
            }
        }
        return false;
    } else {
        LOG(FATAL) << "Bloom filter is not supported by predicate type.";
        return true;
    }
}
|
||||
|
||||
// Only the IN_LIST instantiation of this predicate reports bloom-filter support.
bool can_do_bloom_filter() const override { return PT == PredicateType::IN_LIST; }
|
||||
|
||||
private:
|
||||
template <typename LeftT, typename RightT>
|
||||
bool _operator(const LeftT& lhs, const RightT& rhs) const {
|
||||
@ -325,6 +408,8 @@ private:
|
||||
|
||||
phmap::flat_hash_set<T> _values;
|
||||
mutable std::vector<vectorized::UInt8> _value_in_dict_flags;
|
||||
T _min_value;
|
||||
T _max_value;
|
||||
};
|
||||
|
||||
} //namespace doris
|
||||
|
||||
@ -31,7 +31,6 @@ namespace doris {
|
||||
class RowCursor;
|
||||
class RowBlockV2;
|
||||
class Schema;
|
||||
class Conditions;
|
||||
class ColumnPredicate;
|
||||
|
||||
class StorageReadOptions {
|
||||
@ -64,14 +63,6 @@ public:
|
||||
// used by short key index to filter row blocks
|
||||
std::vector<KeyRange> key_ranges;
|
||||
|
||||
// reader's column predicates, nullptr if not existed.
|
||||
// used by column index to filter pages and rows
|
||||
// TODO use vector<ColumnPredicate*> instead
|
||||
const Conditions* conditions = nullptr;
|
||||
|
||||
// delete conditions used by column index to filter pages
|
||||
std::vector<const Conditions*> delete_conditions;
|
||||
|
||||
// For unique-key merge-on-write, the effect is similar to delete_conditions
|
||||
// that filters out rows that are deleted in realtime.
|
||||
// For a particular row, if delete_bitmap.contains(rowid) means that row is
|
||||
@ -83,9 +74,9 @@ public:
|
||||
std::make_shared<AndBlockColumnPredicate>();
|
||||
// reader's column predicate, nullptr if not existed
|
||||
// used to filter rows in row block
|
||||
// TODO(hkp): refactor the column predicate framework
|
||||
// to unify Conditions and ColumnPredicate
|
||||
std::vector<ColumnPredicate*> column_predicates;
|
||||
std::unordered_map<int32_t, std::shared_ptr<AndBlockColumnPredicate>> col_id_to_predicates;
|
||||
std::unordered_map<int32_t, std::vector<const ColumnPredicate*>> col_id_to_del_predicates;
|
||||
|
||||
// REQUIRED (null is not allowed)
|
||||
OlapReaderStatistics* stats = nullptr;
|
||||
|
||||
@ -22,6 +22,8 @@
|
||||
#include <roaring/roaring.hh>
|
||||
|
||||
#include "olap/column_predicate.h"
|
||||
#include "olap/rowset/segment_v2/bloom_filter.h"
|
||||
#include "olap/wrapper_field.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
@ -49,6 +51,25 @@ public:
|
||||
void evaluate_and(const vectorized::IColumn& column, const uint16_t* sel, uint16_t size,
|
||||
bool* flags) const override;
|
||||
|
||||
// Statistic-based check for IS NULL / IS NOT NULL.
// When _is_null is set, the result is whether `statistic.first` is null;
// otherwise it is whether `statistic.second` is non-null.
bool evaluate_and(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
    return _is_null ? statistic.first->is_null() : !statistic.second->is_null();
}
|
||||
|
||||
// Bloom-filter check. When _is_null is set, the filter is probed with a null
// pointer and length 0. Otherwise the call logs FATAL and returns true.
bool evaluate_and(const segment_v2::BloomFilter* bf) const override {
    if (!_is_null) {
        LOG(FATAL) << "Bloom filter is not supported by predicate type: is_null=" << _is_null;
        return true;
    }
    return bf->test_bytes(nullptr, 0);
}
|
||||
|
||||
// Bloom-filter evaluation is reported as available only when _is_null is set.
bool can_do_bloom_filter() const override { return _is_null; }
|
||||
|
||||
void evaluate_vec(const vectorized::IColumn& column, uint16_t size, bool* flags) const override;
|
||||
|
||||
private:
|
||||
|
||||
@ -1,572 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "olap/olap_cond.h"
|
||||
|
||||
#include <thrift/protocol/TDebugProtocol.h>
|
||||
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/olap_define.h"
|
||||
#include "olap/utils.h"
|
||||
#include "olap/wrapper_field.h"
|
||||
|
||||
using std::nothrow;
|
||||
using std::pair;
|
||||
using std::string;
|
||||
using std::vector;
|
||||
|
||||
using doris::ColumnStatistics;
|
||||
|
||||
//This file is mainly used to process query conditions and delete conditions sent by users. Logically, both can be divided into three layers
|
||||
//Condition->Condcolumn->Cond
|
||||
//Condition represents a single condition sent by the user
|
||||
//Condcolumn represents the collection of all conditions on a column.
|
||||
//Cond represents a single condition on a column.
|
||||
//For query conditions, the conditions of each level are logical AND relationships
|
||||
//There are different conditions for delete. The relationship between Cond and Condcolumn is logical AND, and the relationship between Conditions is logical OR.
|
||||
|
||||
//Specific to the realization.
|
||||
//eval is used to filter query conditions, including the filtering of heap row, block, and version. Which layer is used depends on the specific calling place.
|
||||
// 1. There is no filter condition to filter rows separately, this part is carried out in the query layer.
|
||||
// 2. The filter block is in the SegmentReader.
|
||||
// 3. Filter version in Reader. Call delta_pruing_filter
|
||||
//
|
||||
//del_eval is used to filter deletion conditions, including the filtering of heap block and version, but this filtering has one more state than eval, that is, partial filtering.
|
||||
// 1. The filtering of rows is in DeleteHandler.
|
||||
// This part directly calls delete_condition_eval to achieve, and internally calls the eval function, because the filtering of row does not involve partial filtering.
|
||||
// 2. The filter block is in the SegmentReader, call del_eval directly
|
||||
// 3. The filter version is actually in Reader, call rowset_pruning_filter
|
||||
|
||||
namespace doris {
|
||||
|
||||
#define MAX_OP_STR_LENGTH 3
|
||||
|
||||
// Maps an operator string to its CondOp value.
// Strings longer than MAX_OP_STR_LENGTH, and any string not listed below, map
// to OP_NULL. "is" is matched case-insensitively; every other operator is
// matched exactly. ">>" and "<<" are accepted as spellings of ">" and "<".
static CondOp parse_op_type(const string& op) {
    if (op.size() > MAX_OP_STR_LENGTH) {
        return OP_NULL;
    }

    if (op == "=") return OP_EQ;
    if (0 == strcasecmp(op.c_str(), "is")) return OP_IS;
    if (op == "!=") return OP_NE;
    if (op == "*=") return OP_IN;
    if (op == "!*=") return OP_NOT_IN;
    if (op == ">=") return OP_GE;
    if (op == ">>" || op == ">") return OP_GT;
    if (op == "<=") return OP_LE;
    if (op == "<<" || op == "<") return OP_LT;

    return OP_NULL;
}
|
||||
|
||||
// Frees operand_field and every field stored in operand_set.
// min_value_field / max_value_field are only reset to nullptr here, not deleted.
Cond::~Cond() {
    delete operand_field;
    for (auto* owned_operand : operand_set) {
        delete owned_operand;
    }
    max_value_field = nullptr;
    min_value_field = nullptr;
}
|
||||
|
||||
// Initialises this Cond from a thrift condition for the given column.
//
// The operator string is parsed first. The call fails with
// OLAP_ERR_INPUT_PARAMETER_ERROR when the operator is unknown, or when an
// operator other than IN / NOT IN does not carry exactly one value.
//   - IS:          operand_field is marked null if the value is "NULL"
//                  (case-insensitive), otherwise marked not-null.
//   - IN / NOT IN: each value is parsed into its own field and stored in
//                  operand_set; min_value_field / max_value_field point at the
//                  smallest / largest field seen. Duplicate values are logged
//                  and dropped.
//   - otherwise:   the single value is parsed into operand_field.
// A failed field allocation returns OLAP_ERR_INPUT_PARAMETER_ERROR; a failed
// string conversion returns the Status produced by from_string.
Status Cond::init(const TCondition& tcond, const TabletColumn& column) {
    // Parse op type
    op = parse_op_type(tcond.condition_op);
    const bool takes_value_list = (op == OP_IN || op == OP_NOT_IN);
    if (op == OP_NULL || (!takes_value_list && tcond.condition_values.size() != 1)) {
        LOG(WARNING) << "Condition op type is invalid. [name=" << tcond.column_name << ", op=" << op
                     << ", size=" << tcond.condition_values.size() << "]";
        return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
    }

    // Allocates a field sized for `text`; logs and yields nullptr on failure.
    auto allocate_field = [&](const auto& text) {
        std::unique_ptr<WrapperField> field(WrapperField::create(column, text.length()));
        if (field == nullptr) {
            LOG(WARNING) << "Create field failed. [name=" << tcond.column_name
                         << ", operand=" << text.c_str() << ", op_type=" << op << "]";
        }
        return field;
    };

    // Parses `text` into `field`; logs when the conversion fails.
    auto fill_from_text = [&](WrapperField* field, const auto& text) {
        Status parsed = field->from_string(text, column.precision(), column.frac());
        if (!parsed.ok()) {
            LOG(WARNING) << "Convert from string failed. [name=" << tcond.column_name
                         << ", operand=" << text.c_str() << ", op_type=" << op << "]";
        }
        return parsed;
    };

    if (!takes_value_list) {
        // Single-operand operators, including 'is null' / 'is not null'.
        DCHECK_EQ(tcond.condition_values.size(), 1);
        const auto& operand = *tcond.condition_values.begin();
        auto field = allocate_field(operand);
        if (field == nullptr) {
            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
        }
        if (op == OP_IS) {
            if (strcasecmp(operand.c_str(), "NULL") == 0) {
                field->set_null();
            } else {
                field->set_not_null();
            }
        } else {
            Status res = fill_from_text(field.get(), operand);
            if (!res.ok()) {
                return res;
            }
        }
        operand_field = field.release();
        return Status::OK();
    }

    DCHECK(op == OP_IN || op == OP_NOT_IN);
    DCHECK(!tcond.condition_values.empty());
    for (auto& operand : tcond.condition_values) {
        auto field = allocate_field(operand);
        if (field == nullptr) {
            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
        }
        Status res = fill_from_text(field.get(), operand);
        if (!res.ok()) {
            return res;
        }

        if (min_value_field == nullptr || field->cmp(min_value_field) < 0) {
            min_value_field = field.get();
        }
        if (max_value_field == nullptr || field->cmp(max_value_field) > 0) {
            max_value_field = field.get();
        }

        const bool inserted = operand_set.insert(field.get()).second;
        if (inserted) {
            // Normal case: operand_set now holds the pointer, so stop managing it here.
            field.release();
        } else {
            // Duplicated: the unique_ptr deletes the field when it goes out of scope.
            LOG(WARNING) << "Duplicate operand in in-predicate.[condition=" << operand << "]";
        }
    }

    return Status::OK();
}
|
||||
|
||||
// Evaluates this single-column condition against a (first, second) statistic
// pair and reports whether the range may contain matching values.
// Returns true (cannot rule out) when either statistic pointer is null, which
// can happen for Varchar columns, and when the op is not IS and the first
// statistic is null. NE and NOT IN always return true. An unrecognised op
// returns false.
bool Cond::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
    WrapperField* const lower = statistic.first;
    WrapperField* const upper = statistic.second;

    if (lower == nullptr || upper == nullptr) {
        return true;
    }
    if (op != OP_IS && lower->is_null()) {
        return true;
    }

    switch (op) {
    case OP_EQ:
        return operand_field->cmp(lower) >= 0 && operand_field->cmp(upper) <= 0;
    case OP_LT:
        return operand_field->cmp(lower) > 0;
    case OP_LE:
        return operand_field->cmp(lower) >= 0;
    case OP_GT:
        return operand_field->cmp(upper) < 0;
    case OP_GE:
        return operand_field->cmp(upper) <= 0;
    case OP_IN:
        return min_value_field->cmp(upper) <= 0 && max_value_field->cmp(lower) >= 0;
    case OP_NE:
    case OP_NOT_IN:
        return true;
    case OP_IS:
        return operand_field->is_null() ? lower->is_null() : !upper->is_null();
    default:
        break;
    }

    return false;
}
|
||||
|
||||
// Classifies how a [min, max] statistic (stat.first / stat.second) relates to this
// delete condition:
//   DEL_SATISFIED          - every value in the range matches the condition
//   DEL_NOT_SATISFIED      - no value in the range matches the condition
//   DEL_PARTIAL_SATISFIED  - some values may match, or nothing can be concluded
int Cond::del_eval(const std::pair<WrapperField*, WrapperField*>& stat) const {
    WrapperField* const lower = stat.first;
    WrapperField* const upper = stat.second;

    // Statistics may be missing (e.g. not recorded for string columns at block
    // level); in that case no decision can be made.
    if (lower == nullptr || upper == nullptr) {
        return DEL_PARTIAL_SATISFIED;
    }

    // For every operator except OP_IS a null minimum ends the evaluation:
    // all-null range -> nothing matches, mixed range -> undecided.
    if (op != OP_IS && lower->is_null()) {
        return upper->is_null() ? DEL_NOT_SATISFIED : DEL_PARTIAL_SATISFIED;
    }

    switch (op) {
    case OP_EQ: {
        const int vs_lower = operand_field->cmp(lower);
        const int vs_upper = operand_field->cmp(upper);
        if (vs_lower == 0 && vs_upper == 0) {
            // min == max == operand: the whole range equals the operand.
            return DEL_SATISFIED;
        }
        if (vs_lower >= 0 && vs_upper <= 0) {
            return DEL_PARTIAL_SATISFIED;
        }
        return DEL_NOT_SATISFIED;
    }
    case OP_NE: {
        const int vs_lower = operand_field->cmp(lower);
        const int vs_upper = operand_field->cmp(upper);
        if (vs_lower == 0 && vs_upper == 0) {
            return DEL_NOT_SATISFIED;
        }
        if (vs_lower >= 0 && vs_upper <= 0) {
            return DEL_PARTIAL_SATISFIED;
        }
        // The operand lies outside the range, so every value differs from it.
        return DEL_SATISFIED;
    }
    case OP_LT: {
        if (operand_field->cmp(lower) <= 0) {
            return DEL_NOT_SATISFIED;
        }
        return operand_field->cmp(upper) > 0 ? DEL_SATISFIED : DEL_PARTIAL_SATISFIED;
    }
    case OP_LE: {
        if (operand_field->cmp(lower) < 0) {
            return DEL_NOT_SATISFIED;
        }
        return operand_field->cmp(upper) >= 0 ? DEL_SATISFIED : DEL_PARTIAL_SATISFIED;
    }
    case OP_GT: {
        if (operand_field->cmp(upper) >= 0) {
            return DEL_NOT_SATISFIED;
        }
        return operand_field->cmp(lower) < 0 ? DEL_SATISFIED : DEL_PARTIAL_SATISFIED;
    }
    case OP_GE: {
        if (operand_field->cmp(upper) > 0) {
            return DEL_NOT_SATISFIED;
        }
        return operand_field->cmp(lower) <= 0 ? DEL_SATISFIED : DEL_PARTIAL_SATISFIED;
    }
    case OP_IN: {
        if (lower->cmp(upper) == 0) {
            // Single-valued range: decided by membership in the IN list.
            const bool listed = operand_set.find(lower) != operand_set.end();
            return listed ? DEL_SATISFIED : DEL_NOT_SATISFIED;
        }
        const bool overlaps =
                min_value_field->cmp(upper) <= 0 && max_value_field->cmp(lower) >= 0;
        return overlaps ? DEL_PARTIAL_SATISFIED : DEL_NOT_SATISFIED;
    }
    case OP_NOT_IN: {
        if (lower->cmp(upper) == 0) {
            const bool unlisted = operand_set.find(lower) == operand_set.end();
            return unlisted ? DEL_SATISFIED : DEL_NOT_SATISFIED;
        }
        // When there is no intersection, all entries in the range should be deleted.
        const bool disjoint =
                min_value_field->cmp(upper) > 0 || max_value_field->cmp(lower) < 0;
        return disjoint ? DEL_SATISFIED : DEL_PARTIAL_SATISFIED;
    }
    case OP_IS: {
        const bool lower_null = lower->is_null();
        const bool upper_null = upper->is_null();
        if (!lower_null && upper_null) {
            // A non-null minimum with a null maximum is treated as impossible.
            CHECK(false) << "It will not happen when the stat's min is not null and max is null";
            return DEL_NOT_SATISFIED;
        }
        if (lower_null && !upper_null) {
            // Nulls and non-nulls are mixed: undecided for both IS NULL and IS NOT NULL.
            return DEL_PARTIAL_SATISFIED;
        }
        // Here both bounds agree: the range is either all null or all non-null.
        const bool all_null = lower_null;
        const bool wants_null = operand_field->is_null();
        return wants_null == all_null ? DEL_SATISFIED : DEL_NOT_SATISFIED;
    }
    default:
        LOG(WARNING) << "Not supported operation: " << op;
        break;
    }
    return DEL_NOT_SATISFIED;
}
|
||||
|
||||
bool Cond::eval(const BloomFilter& bf) const {
|
||||
switch (op) {
|
||||
case OP_EQ: {
|
||||
bool existed = false;
|
||||
if (operand_field->is_string_type()) {
|
||||
Slice* slice = (Slice*)(operand_field->ptr());
|
||||
existed = bf.test_bytes(slice->data, slice->size);
|
||||
} else {
|
||||
existed = bf.test_bytes(operand_field->ptr(), operand_field->size());
|
||||
}
|
||||
return existed;
|
||||
}
|
||||
case OP_IN: {
|
||||
FieldSet::const_iterator it = operand_set.begin();
|
||||
for (; it != operand_set.end(); ++it) {
|
||||
bool existed = false;
|
||||
if ((*it)->is_string_type()) {
|
||||
Slice* slice = (Slice*)((*it)->ptr());
|
||||
existed = bf.test_bytes(slice->data, slice->size);
|
||||
} else {
|
||||
existed = bf.test_bytes((*it)->ptr(), (*it)->size());
|
||||
}
|
||||
if (existed) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
case OP_IS: {
|
||||
// IS [NOT] nullptr can only used in to filter IS nullptr predicate.
|
||||
if (operand_field->is_null()) {
|
||||
return bf.test_bytes(nullptr, 0);
|
||||
}
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Bloom-filter check of this condition against a segment_v2 bloom filter.
// Returns false only when the filter reports that none of the condition's operands
// is present; returns true whenever the filter cannot rule the data out, including
// for every operator other than OP_EQ / OP_IN / OP_IS.
//
// `bf` must not be null; it is dereferenced unconditionally.
bool Cond::eval(const segment_v2::BloomFilter* bf) const {
    // Probes the filter for one operand. String operands are read through a Slice,
    // so the bytes tested are the slice payload rather than the field buffer itself.
    const auto may_contain = [bf](const WrapperField* field) {
        if (field->is_string_type()) {
            const Slice* payload = reinterpret_cast<const Slice*>(field->ptr());
            return bf->test_bytes(payload->data, payload->size);
        }
        return bf->test_bytes(field->ptr(), field->size());
    };

    switch (op) {
    case OP_EQ:
        return may_contain(operand_field);
    case OP_IN:
        // One possible hit among the IN operands is enough to keep the data.
        for (const WrapperField* candidate : operand_set) {
            if (may_contain(candidate)) {
                return true;
            }
        }
        return false;
    case OP_IS:
        // The null probe only answers "may a null be present?". That can rule out an
        // IS NULL condition, but a positive answer says nothing about whether non-null
        // values exist as well, so IS NOT NULL must never be filtered here. The
        // previous `is_null() == test_bytes(nullptr, 0)` form returned false for
        // IS NOT NULL whenever a null was reported, dropping data that may still hold
        // non-null values. This now matches the `const BloomFilter&` overload.
        if (operand_field->is_null()) {
            return bf->test_bytes(nullptr, 0);
        }
        return true;
    default:
        break;
    }

    return true;
}
|
||||
|
||||
// Deletes every Cond stored in _conds (they are heap-allocated in add_cond()).
CondColumn::~CondColumn() {
    for (Cond* owned : _conds) {
        delete owned;
    }
}
|
||||
|
||||
// PRECONDITION 1. index is valid; 2. at least has one operand
|
||||
Status CondColumn::add_cond(const TCondition& tcond, const TabletColumn& column) {
|
||||
std::unique_ptr<Cond> cond(new Cond());
|
||||
auto res = cond->init(tcond, column);
|
||||
if (!res.ok()) {
|
||||
return res;
|
||||
}
|
||||
_conds.push_back(cond.release());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Evaluates every condition of this column against the given statistic.
// The conditions are combined with AND: the result is false as soon as one
// condition's eval() returns false, and true otherwise (also when there are none).
bool CondColumn::eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
    for (const Cond* cond : _conds) {
        const bool may_match = cond->eval(statistic);
        if (!may_match) {
            return false;
        }
    }
    return true;
}
|
||||
|
||||
// Combines the delete verdicts of all conditions on this column (AND relationship):
//   - no conditions at all, or any condition not satisfied -> DEL_NOT_SATISFIED
//   - otherwise, any condition only partially satisfied    -> DEL_PARTIAL_SATISFIED
//   - every condition fully satisfied                      -> DEL_SATISFIED
int CondColumn::del_eval(const std::pair<WrapperField*, WrapperField*>& statistic) const {
    // An empty condition list never satisfies a delete.
    if (_conds.empty()) {
        return DEL_NOT_SATISFIED;
    }

    bool saw_partial = false;
    for (const Cond* cond : _conds) {
        const int verdict = cond->del_eval(statistic);
        if (verdict == DEL_SATISFIED) {
            continue;
        }
        if (verdict != DEL_PARTIAL_SATISFIED) {
            // One unsatisfied condition decides the whole conjunction.
            return DEL_NOT_SATISFIED;
        }
        saw_partial = true;
    }

    return saw_partial ? DEL_PARTIAL_SATISFIED : DEL_SATISFIED;
}
|
||||
|
||||
bool CondColumn::eval(const BloomFilter& bf) const {
|
||||
for (auto& each_cond : _conds) {
|
||||
if (!each_cond->eval(bf)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Evaluates every condition of this column against the segment_v2 bloom filter.
// Returns false as soon as one condition's eval() returns false, true otherwise.
bool CondColumn::eval(const segment_v2::BloomFilter* bf) const {
    for (const Cond* cond : _conds) {
        const bool may_match = cond->eval(bf);
        if (!may_match) {
            return false;
        }
    }
    return true;
}
|
||||
|
||||
// Registers `tcond` under the CondColumn of the column it refers to, creating that
// CondColumn on first use.
// - Returns an OLAP_ERR_INPUT_PARAMETER_ERROR status when the schema has no field for
//   tcond.column_unique_id.
// - Conditions on DOUBLE / FLOAT columns are silently skipped with Status::OK().
// - Otherwise returns the result of CondColumn::add_cond().
Status Conditions::append_condition(const TCondition& tcond) {
    DCHECK(_schema != nullptr);

    const int32_t field_pos = _schema->field_index(tcond.column_unique_id);
    if (field_pos < 0) {
        LOG(WARNING) << "fail to get field index, field name=" << tcond.column_name;
        return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
    }

    const TabletColumn& column = _schema->column(field_pos);
    const auto column_type = column.type();
    const bool is_floating_point =
            column_type == OLAP_FIELD_TYPE_DOUBLE || column_type == OLAP_FIELD_TYPE_FLOAT;
    if (is_floating_point) {
        return Status::OK();
    }

    // _columns is keyed by the column's unique id.
    CondColumn* target = nullptr;
    const auto existing = _columns.find(column.unique_id());
    if (existing != _columns.end()) {
        target = existing->second;
    } else {
        target = new CondColumn(*_schema, field_pos);
        _columns[column.unique_id()] = target;
    }

    return target->add_cond(tcond, column);
}
|
||||
|
||||
// Looks up the CondColumn registered under the given column unique id.
// Returns nullptr when no condition was appended for that id.
CondColumn* Conditions::get_column(int32_t uid) const {
    const auto found = _columns.find(uid);
    return found == _columns.end() ? nullptr : found->second;
}
|
||||
|
||||
} // namespace doris
|
||||
@ -1,184 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <unordered_set>
|
||||
#include <vector>
|
||||
|
||||
#include "gen_cpp/PaloInternalService_types.h"
|
||||
#include "gen_cpp/column_data_file.pb.h"
|
||||
#include "olap/bloom_filter.hpp"
|
||||
#include "olap/field.h"
|
||||
#include "olap/row_cursor.h"
|
||||
#include "olap/rowset/segment_v2/bloom_filter.h"
|
||||
#include "olap/stream_index_common.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
class WrapperField;
|
||||
struct RowCursorCell;
|
||||
|
||||
// Operator of a single column condition. The numeric values are spelled out
// explicitly and must not be changed.
enum CondOp {
    // invalid op
    OP_NULL = -1,
    // equal
    OP_EQ = 0,
    // not equal
    OP_NE = 1,
    // less than
    OP_LT = 2,
    // less or equal
    OP_LE = 3,
    // greater than
    OP_GT = 4,
    // greater or equal
    OP_GE = 5,
    // in
    OP_IN = 6,
    // is null or not null
    OP_IS = 7,
    // not in
    OP_NOT_IN = 8
};
|
||||
|
||||
// Hash functor for IN set
|
||||
struct FieldHash {
|
||||
size_t operator()(const WrapperField* field) const { return field->hash_code(); }
|
||||
};
|
||||
|
||||
// Equal function for IN set
|
||||
struct FieldEqual {
|
||||
bool operator()(const WrapperField* left, const WrapperField* right) const {
|
||||
return left->cmp(right) == 0;
|
||||
}
|
||||
};
|
||||
|
||||
// 条件二元组,描述了一个条件的操作类型和操作数(1个或者多个)
|
||||
struct Cond {
|
||||
public:
|
||||
Cond() = default;
|
||||
~Cond();
|
||||
|
||||
Status init(const TCondition& tcond, const TabletColumn& column);
|
||||
|
||||
// 用一行数据的指定列同条件进行比较,如果符合过滤条件,
|
||||
// 即按照此条件,行应被过滤掉,则返回true,否则返回false
|
||||
bool eval(const KeyRange& statistic) const;
|
||||
|
||||
// 通过单列上的单个删除条件对version进行过滤
|
||||
int del_eval(const KeyRange& stat) const;
|
||||
|
||||
// 通过单列上BloomFilter对block进行过滤
|
||||
bool eval(const BloomFilter& bf) const;
|
||||
bool eval(const segment_v2::BloomFilter* bf) const;
|
||||
|
||||
bool can_do_bloom_filter() const { return op == OP_EQ || op == OP_IN || op == OP_IS; }
|
||||
|
||||
CondOp op = OP_NULL;
|
||||
// valid when op is not OP_IN and OP_NOT_IN
|
||||
WrapperField* operand_field = nullptr;
|
||||
// valid when op is OP_IN or OP_NOT_IN
|
||||
typedef std::unordered_set<const WrapperField*, FieldHash, FieldEqual> FieldSet;
|
||||
FieldSet operand_set;
|
||||
// valid when op is OP_IN or OP_NOT_IN, represents the minimum or maximum value of in elements
|
||||
WrapperField* min_value_field = nullptr;
|
||||
WrapperField* max_value_field = nullptr;
|
||||
};
|
||||
|
||||
// 所有归属于同一列上的条件二元组,聚合在一个CondColumn上
|
||||
class CondColumn {
|
||||
public:
|
||||
CondColumn(const TabletSchema& tablet_schema, int32_t index) : _col_index(index) {
|
||||
_is_key = tablet_schema.column(_col_index).is_key();
|
||||
}
|
||||
~CondColumn();
|
||||
|
||||
Status add_cond(const TCondition& tcond, const TabletColumn& column);
|
||||
|
||||
// Return true if the rowset should be pruned
|
||||
bool eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;
|
||||
|
||||
// Whether the rowset satisfied delete condition
|
||||
int del_eval(const std::pair<WrapperField*, WrapperField*>& statistic) const;
|
||||
|
||||
// 通过一列上的所有BloomFilter索引信息对block进行过滤
|
||||
// Return true if the block should be filtered out
|
||||
bool eval(const BloomFilter& bf) const;
|
||||
|
||||
// Return true if the block should be filtered out
|
||||
bool eval(const segment_v2::BloomFilter* bf) const;
|
||||
|
||||
bool can_do_bloom_filter() const {
|
||||
for (auto& cond : _conds) {
|
||||
if (cond->can_do_bloom_filter()) {
|
||||
// if any cond can do bloom filter
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool is_key() const { return _is_key; }
|
||||
|
||||
const std::vector<Cond*>& conds() const { return _conds; }
|
||||
|
||||
private:
|
||||
friend class Conditions;
|
||||
|
||||
bool _is_key = false;
|
||||
int32_t _col_index = 0;
|
||||
// Conds in _conds are in 'AND' relationship
|
||||
std::vector<Cond*> _conds;
|
||||
};
|
||||
|
||||
// 一次请求所关联的条件
|
||||
class Conditions {
|
||||
public:
|
||||
// Key: field index of condition's column
|
||||
// Value: CondColumn object
|
||||
// col_unique_id --> CondColumn
|
||||
typedef std::map<int32_t, CondColumn*> CondColumns;
|
||||
|
||||
Conditions(TabletSchemaSPtr schema) : _schema(schema) {}
|
||||
~Conditions() { finalize(); }
|
||||
|
||||
void finalize() {
|
||||
for (auto& it : _columns) {
|
||||
delete it.second;
|
||||
}
|
||||
_columns.clear();
|
||||
}
|
||||
bool empty() const { return _columns.empty(); }
|
||||
|
||||
// 如果成功,则_columns中增加一项,如果失败则无视此condition,同时输出日志
|
||||
// 对于下列情况,将不会被处理
|
||||
// 1. column不属于key列
|
||||
// 2. column类型是double, float
|
||||
Status append_condition(const TCondition& condition);
|
||||
|
||||
const CondColumns& columns() const { return _columns; }
|
||||
|
||||
CondColumn* get_column(int32_t col_unique_id) const;
|
||||
|
||||
private:
|
||||
bool _cond_column_is_key_or_duplicate(const CondColumn* cc) const {
|
||||
return cc->is_key() || _schema->keys_type() == KeysType::DUP_KEYS;
|
||||
}
|
||||
|
||||
private:
|
||||
TabletSchemaSPtr _schema = nullptr;
|
||||
// CondColumns in _index_conds are in 'AND' relationship
|
||||
CondColumns _columns; // list of condition column
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(Conditions);
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
@ -23,8 +23,8 @@
|
||||
#include "olap/comparison_predicate.h"
|
||||
#include "olap/in_list_predicate.h"
|
||||
#include "olap/null_predicate.h"
|
||||
#include "olap/olap_cond.h"
|
||||
#include "olap/tablet_schema.h"
|
||||
#include "runtime/type_limit.h"
|
||||
#include "util/date_func.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
@ -46,11 +46,7 @@ public:
|
||||
ColumnPredicate* create(const TabletColumn& column, int index, const ConditionType& conditions,
|
||||
bool opposite, MemPool* pool) override {
|
||||
if constexpr (PredicateTypeTraits::is_list(PT)) {
|
||||
phmap::flat_hash_set<CppType> values;
|
||||
for (const auto& condition : conditions) {
|
||||
values.insert(convert(condition));
|
||||
}
|
||||
return new InListPredicateBase<Type, PT>(index, std::move(values), opposite);
|
||||
return new InListPredicateBase<Type, PT>(index, conditions, convert, opposite);
|
||||
} else {
|
||||
static_assert(PredicateTypeTraits::is_comparison(PT));
|
||||
return new ComparisonPredicateBase<Type, PT>(index, convert(conditions), opposite);
|
||||
@ -58,7 +54,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
CppType convert(const std::string& condition) {
|
||||
static CppType convert(const std::string& condition) {
|
||||
CppType value = 0;
|
||||
std::from_chars(condition.data(), condition.data() + condition.size(), value);
|
||||
return value;
|
||||
@ -72,11 +68,7 @@ public:
|
||||
ColumnPredicate* create(const TabletColumn& column, int index, const ConditionType& conditions,
|
||||
bool opposite, MemPool* pool) override {
|
||||
if constexpr (PredicateTypeTraits::is_list(PT)) {
|
||||
phmap::flat_hash_set<CppType> values;
|
||||
for (const auto& condition : conditions) {
|
||||
values.insert(convert(column, condition));
|
||||
}
|
||||
return new InListPredicateBase<Type, PT>(index, std::move(values), opposite);
|
||||
return new InListPredicateBase<Type, PT>(index, conditions, convert, opposite, &column);
|
||||
} else {
|
||||
static_assert(PredicateTypeTraits::is_comparison(PT));
|
||||
return new ComparisonPredicateBase<Type, PT>(index, convert(column, conditions),
|
||||
@ -85,7 +77,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
CppType convert(const TabletColumn& column, const std::string& condition) {
|
||||
static CppType convert(const TabletColumn& column, const std::string& condition) {
|
||||
StringParser::ParseResult result = StringParser::ParseResult::PARSE_SUCCESS;
|
||||
// return CppType value cast from int128_t
|
||||
return StringParser::string_to_decimal<int128_t>(
|
||||
@ -96,16 +88,11 @@ private:
|
||||
template <PrimitiveType Type, PredicateType PT, typename ConditionType>
|
||||
class StringPredicateCreator : public PredicateCreator<ConditionType> {
|
||||
public:
|
||||
StringPredicateCreator(bool should_padding) : _should_padding(should_padding) {};
|
||||
|
||||
ColumnPredicate* create(const TabletColumn& column, int index, const ConditionType& conditions,
|
||||
bool opposite, MemPool* pool) override {
|
||||
if constexpr (PredicateTypeTraits::is_list(PT)) {
|
||||
phmap::flat_hash_set<StringValue> values;
|
||||
for (const auto& condition : conditions) {
|
||||
values.insert(convert(column, condition, pool));
|
||||
}
|
||||
return new InListPredicateBase<Type, PT>(index, std::move(values), opposite);
|
||||
return new InListPredicateBase<Type, PT>(index, conditions, convert, opposite, &column,
|
||||
pool);
|
||||
} else {
|
||||
static_assert(PredicateTypeTraits::is_comparison(PT));
|
||||
return new ComparisonPredicateBase<Type, PT>(index, convert(column, conditions, pool),
|
||||
@ -114,10 +101,10 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
bool _should_padding;
|
||||
StringValue convert(const TabletColumn& column, const std::string& condition, MemPool* pool) {
|
||||
static StringValue convert(const TabletColumn& column, const std::string& condition,
|
||||
MemPool* pool) {
|
||||
size_t length = condition.length();
|
||||
if (_should_padding) {
|
||||
if constexpr (Type == TYPE_CHAR) {
|
||||
length = std::max(static_cast<size_t>(column.length()), length);
|
||||
}
|
||||
|
||||
@ -139,11 +126,7 @@ public:
|
||||
ColumnPredicate* create(const TabletColumn& column, int index, const ConditionType& conditions,
|
||||
bool opposite, MemPool* pool) override {
|
||||
if constexpr (PredicateTypeTraits::is_list(PT)) {
|
||||
phmap::flat_hash_set<CppType> values;
|
||||
for (const auto& condition : conditions) {
|
||||
values.insert(_convert(condition));
|
||||
}
|
||||
return new InListPredicateBase<Type, PT>(index, std::move(values), opposite);
|
||||
return new InListPredicateBase<Type, PT>(index, conditions, _convert, opposite);
|
||||
} else {
|
||||
static_assert(PredicateTypeTraits::is_comparison(PT));
|
||||
return new ComparisonPredicateBase<Type, PT>(index, _convert(conditions), opposite);
|
||||
@ -190,11 +173,11 @@ inline std::unique_ptr<PredicateCreator<ConditionType>> get_creator(const FieldT
|
||||
return std::make_unique<DecimalPredicateCreator<TYPE_DECIMAL128, PT, ConditionType>>();
|
||||
}
|
||||
case OLAP_FIELD_TYPE_CHAR: {
|
||||
return std::make_unique<StringPredicateCreator<TYPE_CHAR, PT, ConditionType>>(true);
|
||||
return std::make_unique<StringPredicateCreator<TYPE_CHAR, PT, ConditionType>>();
|
||||
}
|
||||
case OLAP_FIELD_TYPE_VARCHAR:
|
||||
case OLAP_FIELD_TYPE_STRING: {
|
||||
return std::make_unique<StringPredicateCreator<TYPE_STRING, PT, ConditionType>>(false);
|
||||
return std::make_unique<StringPredicateCreator<TYPE_STRING, PT, ConditionType>>();
|
||||
}
|
||||
case OLAP_FIELD_TYPE_DATE: {
|
||||
return std::make_unique<CustomPredicateCreator<TYPE_DATE, PT, ConditionType>>(
|
||||
|
||||
@ -82,6 +82,7 @@ std::string TabletReader::KeysParam::to_string() const {
|
||||
|
||||
TabletReader::~TabletReader() {
|
||||
VLOG_NOTICE << "merged rows:" << _merged_rows;
|
||||
_delete_handler.finalize();
|
||||
|
||||
for (auto pred : _col_predicates) {
|
||||
delete pred;
|
||||
@ -203,10 +204,6 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
|
||||
_reader_context.return_columns = &_return_columns;
|
||||
_reader_context.read_orderby_key_columns =
|
||||
_orderby_key_columns.size() > 0 ? &_orderby_key_columns : nullptr;
|
||||
_reader_context.load_bf_columns = &_load_bf_columns;
|
||||
_reader_context.load_bf_all_columns = &_load_bf_all_columns;
|
||||
_reader_context.conditions = _conditions.get();
|
||||
_reader_context.all_conditions = _all_conditions.get();
|
||||
_reader_context.predicates = &_col_predicates;
|
||||
_reader_context.value_predicates = &_value_col_predicates;
|
||||
_reader_context.lower_bound_keys = &_keys_param.start_keys;
|
||||
@ -241,7 +238,6 @@ Status TabletReader::_init_params(const ReaderParams& read_params) {
|
||||
_tablet_schema = read_params.tablet_schema;
|
||||
|
||||
_init_conditions_param(read_params);
|
||||
_init_load_bf_columns(read_params);
|
||||
|
||||
Status res = _init_delete_condition(read_params);
|
||||
if (!res.ok()) {
|
||||
@ -438,8 +434,6 @@ Status TabletReader::_init_orderby_keys_param(const ReaderParams& read_params) {
|
||||
}
|
||||
|
||||
void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
|
||||
_conditions = std::make_unique<Conditions>(_tablet_schema);
|
||||
_all_conditions = std::make_unique<Conditions>(_tablet_schema);
|
||||
for (auto& condition : read_params.conditions) {
|
||||
// These conditions is passed from OlapScannode, but not set column unique id here, so that set it here because it
|
||||
// is too complicated to modify related interface
|
||||
@ -454,11 +448,7 @@ void TabletReader::_init_conditions_param(const ReaderParams& read_params) {
|
||||
_value_col_predicates.push_back(predicate);
|
||||
} else {
|
||||
_col_predicates.push_back(predicate);
|
||||
Status status = _conditions->append_condition(tmp_cond);
|
||||
DCHECK_EQ(Status::OK(), status);
|
||||
}
|
||||
Status status = _all_conditions->append_condition(tmp_cond);
|
||||
DCHECK_EQ(Status::OK(), status);
|
||||
}
|
||||
}
|
||||
|
||||
@ -495,66 +485,6 @@ ColumnPredicate* TabletReader::_parse_to_predicate(const FunctionFilter& functio
|
||||
function_filter._string_param);
|
||||
}
|
||||
|
||||
void TabletReader::_init_load_bf_columns(const ReaderParams& read_params) {
|
||||
_init_load_bf_columns(read_params, _conditions.get(), &_load_bf_columns);
|
||||
_init_load_bf_columns(read_params, _all_conditions.get(), &_load_bf_all_columns);
|
||||
}
|
||||
|
||||
void TabletReader::_init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions,
|
||||
std::set<uint32_t>* load_bf_columns) {
|
||||
// add all columns with condition to load_bf_columns
|
||||
for (const auto& cond_column : conditions->columns()) {
|
||||
int32_t column_id = _tablet_schema->field_index(cond_column.first);
|
||||
if (!_tablet_schema->column(column_id).is_bf_column()) {
|
||||
continue;
|
||||
}
|
||||
for (const auto& cond : cond_column.second->conds()) {
|
||||
if (cond->op == OP_EQ ||
|
||||
(cond->op == OP_IN && cond->operand_set.size() < MAX_OP_IN_FIELD_NUM)) {
|
||||
load_bf_columns->insert(column_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove columns which have same value between start_key and end_key
|
||||
int min_scan_key_len = _tablet_schema->num_columns();
|
||||
for (const auto& start_key : read_params.start_key) {
|
||||
min_scan_key_len = std::min(min_scan_key_len, static_cast<int>(start_key.size()));
|
||||
}
|
||||
for (const auto& end_key : read_params.end_key) {
|
||||
min_scan_key_len = std::min(min_scan_key_len, static_cast<int>(end_key.size()));
|
||||
}
|
||||
|
||||
int max_equal_index = -1;
|
||||
for (int i = 0; i < read_params.start_key.size(); ++i) {
|
||||
int j = 0;
|
||||
for (; j < min_scan_key_len; ++j) {
|
||||
if (read_params.start_key[i].get_value(j) != read_params.end_key[i].get_value(j)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (max_equal_index < j - 1) {
|
||||
max_equal_index = j - 1;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < max_equal_index; ++i) {
|
||||
load_bf_columns->erase(i);
|
||||
}
|
||||
|
||||
// remove the max_equal_index column when it's not varchar
|
||||
// or longer than number of short key fields
|
||||
if (max_equal_index == -1) {
|
||||
return;
|
||||
}
|
||||
FieldType type = _tablet_schema->column(max_equal_index).type();
|
||||
if ((type != OLAP_FIELD_TYPE_VARCHAR && type != OLAP_FIELD_TYPE_STRING) ||
|
||||
max_equal_index + 1 > _tablet->num_short_key_columns()) {
|
||||
load_bf_columns->erase(max_equal_index);
|
||||
}
|
||||
}
|
||||
|
||||
Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
|
||||
if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
|
||||
return Status::OK();
|
||||
|
||||
@ -22,7 +22,6 @@
|
||||
#include "exprs/bloomfilter_predicate.h"
|
||||
#include "exprs/function_filter.h"
|
||||
#include "olap/delete_handler.h"
|
||||
#include "olap/olap_cond.h"
|
||||
#include "olap/row_cursor.h"
|
||||
#include "olap/rowset/rowset_reader.h"
|
||||
#include "olap/tablet.h"
|
||||
@ -174,16 +173,10 @@ protected:
|
||||
|
||||
Status _init_return_columns(const ReaderParams& read_params);
|
||||
|
||||
void _init_load_bf_columns(const ReaderParams& read_params);
|
||||
void _init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions,
|
||||
std::set<uint32_t>* load_bf_columns);
|
||||
|
||||
TabletSharedPtr tablet() { return _tablet; }
|
||||
const TabletSchema& tablet_schema() { return *_tablet_schema; }
|
||||
|
||||
std::unique_ptr<MemPool> _predicate_mem_pool;
|
||||
std::set<uint32_t> _load_bf_columns;
|
||||
std::set<uint32_t> _load_bf_all_columns;
|
||||
std::vector<uint32_t> _return_columns;
|
||||
// used for special optimization for query : ORDER BY key [ASC|DESC] LIMIT n
|
||||
// columns for orderby keys
|
||||
@ -198,11 +191,6 @@ protected:
|
||||
KeysParam _keys_param;
|
||||
std::vector<bool> _is_lower_keys_included;
|
||||
std::vector<bool> _is_upper_keys_included;
|
||||
// contains condition on key columns in agg or unique table or all column in dup tables
|
||||
std::unique_ptr<Conditions> _conditions;
|
||||
// contains _conditions and condition on value columns, used for push down
|
||||
// conditions to base rowset of unique table
|
||||
std::unique_ptr<Conditions> _all_conditions;
|
||||
std::vector<ColumnPredicate*> _col_predicates;
|
||||
std::vector<ColumnPredicate*> _value_col_predicates;
|
||||
DeleteHandler _delete_handler;
|
||||
|
||||
@ -50,7 +50,6 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
|
||||
// convert RowsetReaderContext to StorageReadOptions
|
||||
StorageReadOptions read_options;
|
||||
read_options.stats = _stats;
|
||||
read_options.conditions = read_context->conditions;
|
||||
if (read_context->lower_bound_keys != nullptr) {
|
||||
for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) {
|
||||
read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i),
|
||||
@ -65,10 +64,10 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
|
||||
// or predicates when it is not inited.
|
||||
if (read_context->delete_handler != nullptr) {
|
||||
read_context->delete_handler->get_delete_conditions_after_version(
|
||||
_rowset->end_version(), &read_options.delete_conditions,
|
||||
read_options.delete_condition_predicates.get());
|
||||
_rowset->end_version(), read_options.delete_condition_predicates.get(),
|
||||
&read_options.col_id_to_del_predicates);
|
||||
// if del cond is not empty, schema may be different in multiple rowset
|
||||
can_reuse_schema = read_options.delete_conditions.empty();
|
||||
can_reuse_schema = read_options.col_id_to_del_predicates.empty();
|
||||
}
|
||||
|
||||
if (!can_reuse_schema || _context->reuse_input_schema == nullptr) {
|
||||
@ -103,6 +102,15 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
|
||||
read_options.column_predicates.insert(read_options.column_predicates.end(),
|
||||
read_context->predicates->begin(),
|
||||
read_context->predicates->end());
|
||||
for (auto pred : *(read_context->predicates)) {
|
||||
if (read_options.col_id_to_predicates.count(pred->column_id()) < 1) {
|
||||
read_options.col_id_to_predicates.insert(
|
||||
{pred->column_id(), std::make_shared<AndBlockColumnPredicate>()});
|
||||
}
|
||||
auto single_column_block_predicate = new SingleColumnBlockPredicate(pred);
|
||||
read_options.col_id_to_predicates[pred->column_id()]->add_column_predicate(
|
||||
single_column_block_predicate);
|
||||
}
|
||||
}
|
||||
// Take a delete-bitmap for each segment, the bitmap contains all deletes
|
||||
// until the max read version, which is read_context->version.second
|
||||
@ -123,9 +131,15 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
|
||||
read_options.column_predicates.insert(read_options.column_predicates.end(),
|
||||
read_context->value_predicates->begin(),
|
||||
read_context->value_predicates->end());
|
||||
}
|
||||
if (read_context->all_conditions != nullptr && !read_context->all_conditions->empty()) {
|
||||
read_options.conditions = read_context->all_conditions;
|
||||
for (auto pred : *(read_context->value_predicates)) {
|
||||
if (read_options.col_id_to_predicates.count(pred->column_id()) < 1) {
|
||||
read_options.col_id_to_predicates.insert(
|
||||
{pred->column_id(), std::make_shared<AndBlockColumnPredicate>()});
|
||||
}
|
||||
auto single_column_block_predicate = new SingleColumnBlockPredicate(pred);
|
||||
read_options.col_id_to_predicates[pred->column_id()]->add_column_predicate(
|
||||
single_column_block_predicate);
|
||||
}
|
||||
}
|
||||
}
|
||||
read_options.use_page_cache = read_context->use_page_cache;
|
||||
|
||||
@ -25,7 +25,6 @@
|
||||
namespace doris {
|
||||
|
||||
class RowCursor;
|
||||
class Conditions;
|
||||
class DeleteBitmap;
|
||||
class DeleteHandler;
|
||||
class TabletSchema;
|
||||
@ -42,14 +41,6 @@ struct RowsetReaderContext {
|
||||
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
|
||||
// projection columns: the set of columns rowset reader should return
|
||||
const std::vector<uint32_t>* return_columns = nullptr;
|
||||
// columns to load bloom filter index
|
||||
// including columns in "=" or "in" conditions
|
||||
const std::set<uint32_t>* load_bf_columns = nullptr;
|
||||
const std::set<uint32_t>* load_bf_all_columns = nullptr;
|
||||
// column filter conditions by delete sql
|
||||
const Conditions* conditions = nullptr;
|
||||
// value column predicate in UNIQUE table
|
||||
const Conditions* all_conditions = nullptr;
|
||||
// column name -> column predicate
|
||||
// adding column_name for predicate to make use of column selectivity
|
||||
const std::vector<ColumnPredicate*>* predicates = nullptr;
|
||||
|
||||
@ -26,6 +26,7 @@
|
||||
#include "olap/rowset/segment_v2/page_io.h"
|
||||
#include "olap/rowset/segment_v2/page_pointer.h" // for PagePointer
|
||||
#include "olap/types.h" // for TypeInfo
|
||||
#include "olap/wrapper_field.h"
|
||||
#include "util/block_compression.h"
|
||||
#include "util/rle_encoding.h" // for RleDecoder
|
||||
#include "vec/columns/column.h"
|
||||
@ -159,19 +160,19 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
|
||||
return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
|
||||
}
|
||||
|
||||
Status ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column,
|
||||
CondColumn* delete_condition,
|
||||
RowRanges* row_ranges) {
|
||||
Status ColumnReader::get_row_ranges_by_zone_map(
|
||||
const AndBlockColumnPredicate* col_predicates,
|
||||
std::vector<const ColumnPredicate*>* delete_predicates, RowRanges* row_ranges) {
|
||||
RETURN_IF_ERROR(_ensure_index_loaded());
|
||||
|
||||
std::vector<uint32_t> page_indexes;
|
||||
RETURN_IF_ERROR(_get_filtered_pages(cond_column, delete_condition, &page_indexes));
|
||||
RETURN_IF_ERROR(_get_filtered_pages(col_predicates, delete_predicates, &page_indexes));
|
||||
RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, row_ranges));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool ColumnReader::match_condition(CondColumn* cond) const {
|
||||
if (_zone_map_index_meta == nullptr || cond == nullptr) {
|
||||
bool ColumnReader::match_condition(const AndBlockColumnPredicate* col_predicates) const {
|
||||
if (_zone_map_index_meta == nullptr) {
|
||||
return true;
|
||||
}
|
||||
FieldType type = _type_info->type();
|
||||
@ -180,7 +181,7 @@ bool ColumnReader::match_condition(CondColumn* cond) const {
|
||||
_parse_zone_map(_zone_map_index_meta->segment_zone_map(), min_value.get(), max_value.get());
|
||||
|
||||
return _zone_map_match_condition(_zone_map_index_meta->segment_zone_map(), min_value.get(),
|
||||
max_value.get(), cond);
|
||||
max_value.get(), col_predicates);
|
||||
}
|
||||
|
||||
void ColumnReader::_parse_zone_map(const ZoneMapPB& zone_map, WrapperField* min_value_container,
|
||||
@ -205,19 +206,20 @@ void ColumnReader::_parse_zone_map(const ZoneMapPB& zone_map, WrapperField* min_
|
||||
bool ColumnReader::_zone_map_match_condition(const ZoneMapPB& zone_map,
|
||||
WrapperField* min_value_container,
|
||||
WrapperField* max_value_container,
|
||||
CondColumn* cond) const {
|
||||
const AndBlockColumnPredicate* col_predicates) const {
|
||||
if (!zone_map.has_not_null() && !zone_map.has_null()) {
|
||||
return false; // no data in this zone
|
||||
}
|
||||
|
||||
if (cond == nullptr || zone_map.pass_all()) {
|
||||
if (zone_map.pass_all() || min_value_container == nullptr || max_value_container == nullptr) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return cond->eval({min_value_container, max_value_container});
|
||||
return col_predicates->evaluate_and({min_value_container, max_value_container});
|
||||
}
|
||||
|
||||
Status ColumnReader::_get_filtered_pages(CondColumn* cond_column, CondColumn* delete_condition,
|
||||
Status ColumnReader::_get_filtered_pages(const AndBlockColumnPredicate* col_predicates,
|
||||
std::vector<const ColumnPredicate*>* delete_predicates,
|
||||
std::vector<uint32_t>* page_indexes) {
|
||||
FieldType type = _type_info->type();
|
||||
const std::vector<ZoneMapPB>& zone_maps = _zone_map_index->page_zone_maps();
|
||||
@ -230,12 +232,15 @@ Status ColumnReader::_get_filtered_pages(CondColumn* cond_column, CondColumn* de
|
||||
} else {
|
||||
_parse_zone_map(zone_maps[i], min_value.get(), max_value.get());
|
||||
if (_zone_map_match_condition(zone_maps[i], min_value.get(), max_value.get(),
|
||||
cond_column)) {
|
||||
col_predicates)) {
|
||||
bool should_read = true;
|
||||
if (delete_condition != nullptr) {
|
||||
int state = delete_condition->del_eval({min_value.get(), max_value.get()});
|
||||
if (state == DEL_SATISFIED) {
|
||||
should_read = false;
|
||||
if (delete_predicates != nullptr) {
|
||||
for (auto del_pred : *delete_predicates) {
|
||||
if (min_value.get() == nullptr || max_value.get() == nullptr ||
|
||||
del_pred->evaluate_and({min_value.get(), max_value.get()})) {
|
||||
should_read = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (should_read) {
|
||||
@ -261,7 +266,7 @@ Status ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>& page_ind
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status ColumnReader::get_row_ranges_by_bloom_filter(CondColumn* cond_column,
|
||||
Status ColumnReader::get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
|
||||
RowRanges* row_ranges) {
|
||||
RETURN_IF_ERROR(_ensure_index_loaded());
|
||||
RowRanges bf_row_ranges;
|
||||
@ -284,7 +289,7 @@ Status ColumnReader::get_row_ranges_by_bloom_filter(CondColumn* cond_column,
|
||||
for (auto& pid : page_ids) {
|
||||
std::unique_ptr<BloomFilter> bf;
|
||||
RETURN_IF_ERROR(bf_iter->read_bloom_filter(pid, &bf));
|
||||
if (cond_column->eval(bf.get())) {
|
||||
if (col_predicates->evaluate_and(bf.get())) {
|
||||
bf_row_ranges.add(RowRange(_ordinal_index->get_first_ordinal(pid),
|
||||
_ordinal_index->get_last_ordinal(pid) + 1));
|
||||
}
|
||||
@ -846,21 +851,20 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter)
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FileColumnIterator::get_row_ranges_by_zone_map(CondColumn* cond_column,
|
||||
CondColumn* delete_condition,
|
||||
RowRanges* row_ranges) {
|
||||
Status FileColumnIterator::get_row_ranges_by_zone_map(
|
||||
const AndBlockColumnPredicate* col_predicates,
|
||||
std::vector<const ColumnPredicate*>* delete_predicates, RowRanges* row_ranges) {
|
||||
if (_reader->has_zone_map()) {
|
||||
RETURN_IF_ERROR(
|
||||
_reader->get_row_ranges_by_zone_map(cond_column, delete_condition, row_ranges));
|
||||
_reader->get_row_ranges_by_zone_map(col_predicates, delete_predicates, row_ranges));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status FileColumnIterator::get_row_ranges_by_bloom_filter(CondColumn* cond_column,
|
||||
RowRanges* row_ranges) {
|
||||
if (cond_column != nullptr && cond_column->can_do_bloom_filter() &&
|
||||
_reader->has_bloom_filter_index()) {
|
||||
RETURN_IF_ERROR(_reader->get_row_ranges_by_bloom_filter(cond_column, row_ranges));
|
||||
Status FileColumnIterator::get_row_ranges_by_bloom_filter(
|
||||
const AndBlockColumnPredicate* col_predicates, RowRanges* row_ranges) {
|
||||
if (col_predicates->can_do_bloom_filter() && _reader->has_bloom_filter_index()) {
|
||||
RETURN_IF_ERROR(_reader->get_row_ranges_by_bloom_filter(col_predicates, row_ranges));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -21,11 +21,13 @@
|
||||
#include <cstdint> // for uint32_t
|
||||
#include <memory> // for unique_ptr
|
||||
|
||||
#include "bloom_filter_index_reader.h"
|
||||
#include "common/logging.h"
|
||||
#include "common/status.h" // for Status
|
||||
#include "gen_cpp/segment_v2.pb.h" // for ColumnMetaPB
|
||||
#include "io/fs/file_reader.h"
|
||||
#include "olap/olap_cond.h" // for CondColumn
|
||||
#include "olap/block_column_predicate.h"
|
||||
#include "olap/column_predicate.h"
|
||||
#include "olap/rowset/segment_v2/bitmap_index_reader.h" // for BitmapIndexReader
|
||||
#include "olap/rowset/segment_v2/common.h"
|
||||
#include "olap/rowset/segment_v2/ordinal_page_index.h" // for OrdinalPageIndexIterator
|
||||
@ -120,16 +122,18 @@ public:
|
||||
// Check if this column could match `col_predicates' using the segment zone map.
|
||||
// Since segment zone map is stored in metadata, this function is fast without I/O.
|
||||
// Return true if the segment zone map is absent or `col_predicates' could be satisfied, false otherwise.
|
||||
bool match_condition(CondColumn* cond) const;
|
||||
bool match_condition(const AndBlockColumnPredicate* col_predicates) const;
|
||||
|
||||
// get row ranges with zone map
|
||||
// - col_predicates holds the user's query predicates on this column
|
||||
// - delete_predicates is the list of delete predicates on this column (may be nullptr)
|
||||
Status get_row_ranges_by_zone_map(CondColumn* cond_column, CondColumn* delete_condition,
|
||||
Status get_row_ranges_by_zone_map(const AndBlockColumnPredicate* col_predicates,
|
||||
std::vector<const ColumnPredicate*>* delete_predicates,
|
||||
RowRanges* row_ranges);
|
||||
|
||||
// get row ranges with bloom filter index
|
||||
Status get_row_ranges_by_bloom_filter(CondColumn* cond_column, RowRanges* row_ranges);
|
||||
Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
|
||||
RowRanges* row_ranges);
|
||||
|
||||
PagePointer get_dict_page_pointer() const { return _meta.dict_page(); }
|
||||
|
||||
@ -169,12 +173,14 @@ private:
|
||||
Status _load_bloom_filter_index(bool use_page_cache, bool kept_in_memory);
|
||||
|
||||
bool _zone_map_match_condition(const ZoneMapPB& zone_map, WrapperField* min_value_container,
|
||||
WrapperField* max_value_container, CondColumn* cond) const;
|
||||
WrapperField* max_value_container,
|
||||
const AndBlockColumnPredicate* col_predicates) const;
|
||||
|
||||
void _parse_zone_map(const ZoneMapPB& zone_map, WrapperField* min_value_container,
|
||||
WrapperField* max_value_container) const;
|
||||
|
||||
Status _get_filtered_pages(CondColumn* cond_column, CondColumn* delete_conditions,
|
||||
Status _get_filtered_pages(const AndBlockColumnPredicate* col_predicates,
|
||||
std::vector<const ColumnPredicate*>* delete_predicates,
|
||||
std::vector<uint32_t>* page_indexes);
|
||||
|
||||
Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges);
|
||||
@ -256,12 +262,14 @@ public:
|
||||
|
||||
virtual ordinal_t get_current_ordinal() const = 0;
|
||||
|
||||
virtual Status get_row_ranges_by_zone_map(CondColumn* cond_column, CondColumn* delete_condition,
|
||||
RowRanges* row_ranges) {
|
||||
virtual Status get_row_ranges_by_zone_map(
|
||||
const AndBlockColumnPredicate* col_predicates,
|
||||
std::vector<const ColumnPredicate*>* delete_predicates, RowRanges* row_ranges) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual Status get_row_ranges_by_bloom_filter(CondColumn* cond_column, RowRanges* row_ranges) {
|
||||
virtual Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
|
||||
RowRanges* row_ranges) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -298,10 +306,12 @@ public:
|
||||
// get row ranges by zone map
|
||||
// - col_predicates holds the user's query predicates on this column
|
||||
// - delete_predicates is the list of delete predicates on this column (may be nullptr)
|
||||
Status get_row_ranges_by_zone_map(CondColumn* cond_column, CondColumn* delete_condition,
|
||||
Status get_row_ranges_by_zone_map(const AndBlockColumnPredicate* col_predicates,
|
||||
std::vector<const ColumnPredicate*>* delete_predicates,
|
||||
RowRanges* row_ranges) override;
|
||||
|
||||
Status get_row_ranges_by_bloom_filter(CondColumn* cond_column, RowRanges* row_ranges) override;
|
||||
Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
|
||||
RowRanges* row_ranges) override;
|
||||
|
||||
ParsedPage* get_current_page() { return &_page; }
|
||||
|
||||
|
||||
@ -79,19 +79,22 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
|
||||
std::unique_ptr<RowwiseIterator>* iter) {
|
||||
read_options.stats->total_segment_number++;
|
||||
// trying to prune the current segment by segment-level zone map
|
||||
if (read_options.conditions != nullptr) {
|
||||
for (auto& column_condition : read_options.conditions->columns()) {
|
||||
int32_t column_unique_id = column_condition.first;
|
||||
if (_column_readers.count(column_unique_id) < 1 ||
|
||||
!_column_readers.at(column_unique_id)->has_zone_map()) {
|
||||
continue;
|
||||
}
|
||||
if (!_column_readers.at(column_unique_id)->match_condition(column_condition.second)) {
|
||||
// any condition not satisfied, return.
|
||||
iter->reset(new EmptySegmentIterator(schema));
|
||||
read_options.stats->filtered_segment_number++;
|
||||
return Status::OK();
|
||||
}
|
||||
for (auto& entry : read_options.col_id_to_predicates) {
|
||||
int32_t column_id = entry.first;
|
||||
// schema change
|
||||
if (_tablet_schema->num_columns() <= column_id) {
|
||||
continue;
|
||||
}
|
||||
int32_t uid = read_options.tablet_schema->column(column_id).unique_id();
|
||||
if (_column_readers.count(uid) < 1 || !_column_readers.at(uid)->has_zone_map()) {
|
||||
continue;
|
||||
}
|
||||
if (read_options.col_id_to_predicates.count(column_id) > 0 &&
|
||||
!_column_readers.at(uid)->match_condition(entry.second.get())) {
|
||||
// any condition not satisfied, return.
|
||||
iter->reset(new EmptySegmentIterator(schema));
|
||||
read_options.stats->filtered_segment_number++;
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -288,7 +288,8 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
|
||||
RETURN_IF_ERROR(_apply_bitmap_index());
|
||||
|
||||
if (!_row_bitmap.isEmpty() &&
|
||||
(_opts.conditions != nullptr || !_opts.delete_conditions.empty())) {
|
||||
(!_opts.col_id_to_predicates.empty() ||
|
||||
_opts.delete_condition_predicates->num_of_column_predicate() > 0)) {
|
||||
RowRanges condition_row_ranges = RowRanges::create_single(_segment->num_rows());
|
||||
RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
|
||||
size_t pre_size = _row_bitmap.cardinality();
|
||||
@ -302,22 +303,20 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
|
||||
}
|
||||
|
||||
Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row_ranges) {
|
||||
std::set<int32_t> uids;
|
||||
if (_opts.conditions != nullptr) {
|
||||
for (auto& column_condition : _opts.conditions->columns()) {
|
||||
uids.insert(column_condition.first);
|
||||
}
|
||||
std::set<int32_t> cids;
|
||||
for (auto& entry : _opts.col_id_to_predicates) {
|
||||
cids.insert(entry.first);
|
||||
}
|
||||
|
||||
// first filter data by bloom filter index
|
||||
// the bloom filter index only uses the query column predicates
|
||||
RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
|
||||
for (auto& uid : uids) {
|
||||
for (auto& cid : cids) {
|
||||
// get row ranges by bf index of this column,
|
||||
RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows());
|
||||
CondColumn* column_cond = _opts.conditions->get_column(uid);
|
||||
RETURN_IF_ERROR(_column_iterators[uid]->get_row_ranges_by_bloom_filter(
|
||||
column_cond, &column_bf_row_ranges));
|
||||
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
|
||||
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->get_row_ranges_by_bloom_filter(
|
||||
_opts.col_id_to_predicates[cid].get(), &column_bf_row_ranges));
|
||||
RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges);
|
||||
}
|
||||
size_t pre_size = condition_row_ranges->count();
|
||||
@ -326,40 +325,21 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
|
||||
|
||||
RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
|
||||
// second filter data by zone map
|
||||
for (auto& uid : uids) {
|
||||
for (auto& cid : cids) {
|
||||
// get row ranges by zone map of this column,
|
||||
RowRanges column_row_ranges = RowRanges::create_single(num_rows());
|
||||
CondColumn* column_cond = nullptr;
|
||||
if (_opts.conditions != nullptr) {
|
||||
column_cond = _opts.conditions->get_column(uid);
|
||||
}
|
||||
RETURN_IF_ERROR(_column_iterators[uid]->get_row_ranges_by_zone_map(column_cond, nullptr,
|
||||
&column_row_ranges));
|
||||
DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
|
||||
RETURN_IF_ERROR(_column_iterators[_schema.unique_id(cid)]->get_row_ranges_by_zone_map(
|
||||
_opts.col_id_to_predicates[cid].get(),
|
||||
_opts.col_id_to_del_predicates.count(cid) > 0
|
||||
? &(_opts.col_id_to_del_predicates[cid])
|
||||
: nullptr,
|
||||
&column_row_ranges));
|
||||
// intersect different columns' row ranges to get final row ranges by zone map
|
||||
RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges,
|
||||
&zone_map_row_ranges);
|
||||
}
|
||||
|
||||
// final filter data with delete conditions
|
||||
for (auto& delete_condition : _opts.delete_conditions) {
|
||||
RowRanges delete_condition_row_ranges = RowRanges::create_single(0);
|
||||
for (auto& delete_column_condition : delete_condition->columns()) {
|
||||
const int32_t uid = delete_column_condition.first;
|
||||
CondColumn* column_cond = nullptr;
|
||||
if (_opts.conditions != nullptr) {
|
||||
column_cond = _opts.conditions->get_column(uid);
|
||||
}
|
||||
RowRanges single_delete_condition_row_ranges = RowRanges::create_single(num_rows());
|
||||
RETURN_IF_ERROR(_column_iterators[uid]->get_row_ranges_by_zone_map(
|
||||
column_cond, delete_column_condition.second,
|
||||
&single_delete_condition_row_ranges));
|
||||
RowRanges::ranges_union(delete_condition_row_ranges, single_delete_condition_row_ranges,
|
||||
&delete_condition_row_ranges);
|
||||
}
|
||||
RowRanges::ranges_intersection(zone_map_row_ranges, delete_condition_row_ranges,
|
||||
&zone_map_row_ranges);
|
||||
}
|
||||
|
||||
pre_size = condition_row_ranges->count();
|
||||
RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
|
||||
condition_row_ranges);
|
||||
|
||||
@ -25,7 +25,6 @@
|
||||
#include "io/fs/file_reader.h"
|
||||
#include "io/fs/file_system.h"
|
||||
#include "olap/olap_common.h"
|
||||
#include "olap/olap_cond.h"
|
||||
#include "olap/rowset/segment_v2/common.h"
|
||||
#include "olap/rowset/segment_v2/row_ranges.h"
|
||||
#include "olap/rowset/segment_v2/segment.h"
|
||||
|
||||
@ -970,15 +970,6 @@ TEST_F(TestDeleteHandler, InitSuccess) {
|
||||
// Get delete conditions which version <= 5
|
||||
res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 5);
|
||||
EXPECT_EQ(Status::OK(), res);
|
||||
EXPECT_EQ(4, _delete_handler.conditions_num());
|
||||
std::vector<int64_t> conds_version = _delete_handler.get_conds_version();
|
||||
EXPECT_EQ(4, conds_version.size());
|
||||
sort(conds_version.begin(), conds_version.end());
|
||||
EXPECT_EQ(2, conds_version[0]);
|
||||
EXPECT_EQ(3, conds_version[1]);
|
||||
EXPECT_EQ(4, conds_version[2]);
|
||||
EXPECT_EQ(5, conds_version[3]);
|
||||
|
||||
_delete_handler.finalize();
|
||||
}
|
||||
|
||||
@ -1011,7 +1002,6 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) {
|
||||
// Use version 4 to load all delete conditions stored in the header (in this case, only condition 1)
|
||||
res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 4);
|
||||
EXPECT_EQ(Status::OK(), res);
|
||||
EXPECT_EQ(1, _delete_handler.conditions_num());
|
||||
|
||||
// Build one row of test data
|
||||
std::vector<string> data_str;
|
||||
@ -1096,7 +1086,6 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
|
||||
// Use version 4 to load all delete conditions stored in the meta (in this case, only condition 1)
|
||||
res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 4);
|
||||
EXPECT_EQ(Status::OK(), res);
|
||||
EXPECT_EQ(3, _delete_handler.conditions_num());
|
||||
|
||||
std::vector<string> data_str;
|
||||
data_str.push_back("4");
|
||||
@ -1159,7 +1148,6 @@ TEST_F(TestDeleteHandler, FilterDataVersion) {
|
||||
// Use version 4 to load all delete conditions stored in the meta (condition 1 and condition 2)
|
||||
res = _delete_handler.init(tablet, tablet->tablet_schema(), tablet->delete_predicates(), 4);
|
||||
EXPECT_EQ(Status::OK(), res);
|
||||
EXPECT_EQ(2, _delete_handler.conditions_num());
|
||||
|
||||
// Build one row of test data
|
||||
std::vector<string> data_str;
|
||||
|
||||
@ -552,177 +552,6 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
|
||||
}
|
||||
},
|
||||
&segment);
|
||||
|
||||
// reader with condition
|
||||
{
|
||||
Schema schema(tablet_schema);
|
||||
OlapReaderStatistics stats;
|
||||
// test empty segment iterator
|
||||
{
|
||||
// the first two page will be read by this condition
|
||||
TCondition condition;
|
||||
condition.__set_column_name("3");
|
||||
condition.__set_condition_op("<");
|
||||
std::vector<std::string> vals = {"2"};
|
||||
condition.__set_condition_values(vals);
|
||||
std::shared_ptr<Conditions> conditions(new Conditions(tablet_schema));
|
||||
condition.__set_column_unique_id(
|
||||
tablet_schema->column(condition.column_name).unique_id());
|
||||
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
|
||||
|
||||
StorageReadOptions read_opts;
|
||||
read_opts.stats = &stats;
|
||||
read_opts.tablet_schema = tablet_schema;
|
||||
read_opts.conditions = conditions.get();
|
||||
|
||||
std::unique_ptr<RowwiseIterator> iter;
|
||||
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
|
||||
|
||||
RowBlockV2 block(schema, 1);
|
||||
|
||||
EXPECT_TRUE(iter->next_batch(&block).is_end_of_file());
|
||||
EXPECT_EQ(0, block.num_rows());
|
||||
}
|
||||
// scan all rows
|
||||
{
|
||||
TCondition condition;
|
||||
condition.__set_column_name("2");
|
||||
condition.__set_condition_op("<");
|
||||
std::vector<std::string> vals = {"100"};
|
||||
condition.__set_condition_values(vals);
|
||||
std::shared_ptr<Conditions> conditions(new Conditions(tablet_schema));
|
||||
condition.__set_column_unique_id(
|
||||
tablet_schema->column(condition.column_name).unique_id());
|
||||
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
|
||||
|
||||
StorageReadOptions read_opts;
|
||||
read_opts.stats = &stats;
|
||||
read_opts.tablet_schema = tablet_schema;
|
||||
read_opts.conditions = conditions.get();
|
||||
|
||||
std::unique_ptr<RowwiseIterator> iter;
|
||||
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
|
||||
|
||||
RowBlockV2 block(schema, 1024);
|
||||
|
||||
// only first page will be read because of zone map
|
||||
int left = 16 * 1024;
|
||||
|
||||
int rowid = 0;
|
||||
while (left > 0) {
|
||||
int rows_read = left > 1024 ? 1024 : left;
|
||||
block.clear();
|
||||
EXPECT_TRUE(iter->next_batch(&block).ok());
|
||||
EXPECT_EQ(DEL_NOT_SATISFIED, block.delete_state());
|
||||
EXPECT_EQ(rows_read, block.num_rows());
|
||||
left -= rows_read;
|
||||
|
||||
for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
|
||||
auto cid = block.schema()->column_ids()[j];
|
||||
auto column_block = block.column_block(j);
|
||||
for (int i = 0; i < rows_read; ++i) {
|
||||
int rid = rowid + i;
|
||||
EXPECT_FALSE(column_block.is_null(i));
|
||||
EXPECT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i))
|
||||
<< "rid:" << rid << ", i:" << i;
|
||||
}
|
||||
}
|
||||
rowid += rows_read;
|
||||
}
|
||||
EXPECT_EQ(16 * 1024, rowid);
|
||||
EXPECT_TRUE(iter->next_batch(&block).is_end_of_file());
|
||||
EXPECT_EQ(0, block.num_rows());
|
||||
}
|
||||
// test zone map with query predicate and delete predicate
|
||||
{
|
||||
// the first two page will be read by this condition
|
||||
TCondition condition;
|
||||
condition.__set_column_name("2");
|
||||
condition.__set_condition_op("<");
|
||||
std::vector<std::string> vals = {"165000"};
|
||||
condition.__set_condition_values(vals);
|
||||
std::shared_ptr<Conditions> conditions(new Conditions(tablet_schema));
|
||||
condition.__set_column_unique_id(
|
||||
tablet_schema->column(condition.column_name).unique_id());
|
||||
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
|
||||
|
||||
// the second page read will be pruned by the following delete predicate
|
||||
TCondition delete_condition;
|
||||
delete_condition.__set_column_name("2");
|
||||
delete_condition.__set_condition_op("=");
|
||||
std::vector<std::string> vals2 = {"164001"};
|
||||
delete_condition.__set_condition_values(vals2);
|
||||
std::shared_ptr<Conditions> delete_conditions(new Conditions(tablet_schema));
|
||||
delete_condition.__set_column_unique_id(
|
||||
tablet_schema->column(delete_condition.column_name).unique_id());
|
||||
EXPECT_EQ(Status::OK(), delete_conditions->append_condition(delete_condition));
|
||||
|
||||
StorageReadOptions read_opts;
|
||||
read_opts.stats = &stats;
|
||||
read_opts.tablet_schema = tablet_schema;
|
||||
read_opts.conditions = conditions.get();
|
||||
read_opts.delete_conditions.push_back(delete_conditions.get());
|
||||
|
||||
std::unique_ptr<RowwiseIterator> iter;
|
||||
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
|
||||
|
||||
RowBlockV2 block(schema, 1024);
|
||||
|
||||
// so the first page will be read because of zone map
|
||||
int left = 16 * 1024;
|
||||
|
||||
int rowid = 0;
|
||||
while (left > 0) {
|
||||
int rows_read = left > 1024 ? 1024 : left;
|
||||
block.clear();
|
||||
auto s = iter->next_batch(&block);
|
||||
EXPECT_TRUE(s.ok()) << s.to_string();
|
||||
EXPECT_EQ(rows_read, block.num_rows());
|
||||
EXPECT_EQ(DEL_NOT_SATISFIED, block.delete_state());
|
||||
left -= rows_read;
|
||||
|
||||
for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
|
||||
auto cid = block.schema()->column_ids()[j];
|
||||
auto column_block = block.column_block(j);
|
||||
for (int i = 0; i < rows_read; ++i) {
|
||||
int rid = rowid + i;
|
||||
EXPECT_FALSE(column_block.is_null(i));
|
||||
EXPECT_EQ(rid * 10 + cid, *(int*)column_block.cell_ptr(i))
|
||||
<< "rid:" << rid << ", i:" << i;
|
||||
}
|
||||
}
|
||||
rowid += rows_read;
|
||||
}
|
||||
EXPECT_EQ(16 * 1024, rowid);
|
||||
EXPECT_TRUE(iter->next_batch(&block).is_end_of_file());
|
||||
EXPECT_EQ(0, block.num_rows());
|
||||
}
|
||||
// test bloom filter
|
||||
{
|
||||
StorageReadOptions read_opts;
|
||||
read_opts.stats = &stats;
|
||||
read_opts.tablet_schema = tablet_schema;
|
||||
TCondition condition;
|
||||
condition.__set_column_name("2");
|
||||
condition.__set_condition_op("=");
|
||||
// 102 is not in page 1
|
||||
std::vector<std::string> vals = {"102"};
|
||||
condition.__set_condition_values(vals);
|
||||
std::shared_ptr<Conditions> conditions(new Conditions(tablet_schema));
|
||||
condition.__set_column_unique_id(
|
||||
tablet_schema->column(condition.column_name).unique_id());
|
||||
condition.__set_column_unique_id(
|
||||
tablet_schema->column(condition.column_name).unique_id());
|
||||
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
|
||||
read_opts.conditions = conditions.get();
|
||||
std::unique_ptr<RowwiseIterator> iter;
|
||||
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
|
||||
|
||||
RowBlockV2 block(schema, 1024);
|
||||
EXPECT_TRUE(iter->next_batch(&block).is_end_of_file());
|
||||
EXPECT_EQ(0, block.num_rows());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(SegmentReaderWriterTest, estimate_segment_size) {
|
||||
@ -1051,92 +880,6 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
|
||||
EXPECT_TRUE(st.is_end_of_file());
|
||||
EXPECT_EQ(0, block.num_rows());
|
||||
}
|
||||
|
||||
// test char zone_map query hit; should read whole page
|
||||
{
|
||||
TCondition condition;
|
||||
condition.__set_column_name("1");
|
||||
condition.__set_condition_op(">");
|
||||
std::vector<std::string> vals = {"100"};
|
||||
condition.__set_condition_values(vals);
|
||||
std::shared_ptr<Conditions> conditions(new Conditions(tablet_schema));
|
||||
condition.__set_column_unique_id(
|
||||
tablet_schema->column(condition.column_name).unique_id());
|
||||
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
|
||||
|
||||
StorageReadOptions read_opts;
|
||||
read_opts.stats = &stats;
|
||||
read_opts.tablet_schema = tablet_schema;
|
||||
read_opts.conditions = conditions.get();
|
||||
|
||||
std::unique_ptr<RowwiseIterator> iter;
|
||||
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
|
||||
|
||||
RowBlockV2 block(schema, 1024);
|
||||
int left = 4 * 1024;
|
||||
int rowid = 0;
|
||||
|
||||
while (left > 0) {
|
||||
int rows_read = left > 1024 ? 1024 : left;
|
||||
block.clear();
|
||||
st = iter->next_batch(&block);
|
||||
EXPECT_TRUE(st.ok());
|
||||
EXPECT_EQ(DEL_NOT_SATISFIED, block.delete_state());
|
||||
EXPECT_EQ(rows_read, block.num_rows());
|
||||
left -= rows_read;
|
||||
|
||||
for (int j = 0; j < block.schema()->column_ids().size(); ++j) {
|
||||
auto cid = block.schema()->column_ids()[j];
|
||||
auto column_block = block.column_block(j);
|
||||
for (int i = 0; i < rows_read; ++i) {
|
||||
int rid = rowid + i;
|
||||
EXPECT_FALSE(column_block.is_null(i));
|
||||
|
||||
const Slice* actual =
|
||||
reinterpret_cast<const Slice*>(column_block.cell_ptr(i));
|
||||
Slice expect;
|
||||
set_column_value_by_type(tablet_schema->_cols[j]._type, rid * 10 + cid,
|
||||
reinterpret_cast<char*>(&expect), &pool,
|
||||
tablet_schema->_cols[j]._length);
|
||||
EXPECT_EQ(expect.to_string(), actual->to_string())
|
||||
<< "rid:" << rid << ", i:" << i;
|
||||
;
|
||||
}
|
||||
}
|
||||
rowid += rows_read;
|
||||
}
|
||||
EXPECT_EQ(4 * 1024, rowid);
|
||||
st = iter->next_batch(&block);
|
||||
EXPECT_TRUE(st.is_end_of_file());
|
||||
EXPECT_EQ(0, block.num_rows());
|
||||
}
|
||||
|
||||
// test char zone_map query miss; col < -1
|
||||
{
|
||||
TCondition condition;
|
||||
condition.__set_column_name("1");
|
||||
condition.__set_condition_op("<");
|
||||
std::vector<std::string> vals = {"-2"};
|
||||
condition.__set_condition_values(vals);
|
||||
std::shared_ptr<Conditions> conditions(new Conditions(tablet_schema));
|
||||
condition.__set_column_unique_id(
|
||||
tablet_schema->column(condition.column_name).unique_id());
|
||||
EXPECT_EQ(Status::OK(), conditions->append_condition(condition));
|
||||
|
||||
StorageReadOptions read_opts;
|
||||
read_opts.stats = &stats;
|
||||
read_opts.tablet_schema = tablet_schema;
|
||||
read_opts.conditions = conditions.get();
|
||||
|
||||
std::unique_ptr<RowwiseIterator> iter;
|
||||
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
|
||||
|
||||
RowBlockV2 block(schema, 1024);
|
||||
|
||||
st = iter->next_batch(&block);
|
||||
EXPECT_TRUE(st.is_end_of_file());
|
||||
EXPECT_EQ(0, block.num_rows());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user