[Enhancement](filter) support only min/max runtime filter in BE (#25290)
this PR #25193 have achieve about FE. eg: select count() from lineorder join supplier on lo_partkey < s_suppkey; will have a max filter after build hash table , so could use it to filter probe table data.
This commit is contained in:
@ -128,11 +128,164 @@ public:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
protected:
|
||||
T _max = type_limit<T>::min();
|
||||
T _min = type_limit<T>::max();
|
||||
// we use _empty to avoid compare twice
|
||||
bool _empty = true;
|
||||
};
|
||||
|
||||
template <class T>
|
||||
class MinNumFunc : public MinMaxNumFunc<T> {
|
||||
public:
|
||||
MinNumFunc() = default;
|
||||
~MinNumFunc() override = default;
|
||||
|
||||
void insert(const void* data) override {
|
||||
if (data == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
T val_data = *reinterpret_cast<const T*>(data);
|
||||
|
||||
if (this->_empty) {
|
||||
this->_min = val_data;
|
||||
this->_empty = false;
|
||||
return;
|
||||
}
|
||||
if (val_data < this->_min) {
|
||||
this->_min = val_data;
|
||||
}
|
||||
}
|
||||
|
||||
void insert_fixed_len(const char* data, const int* offsets, int number) override {
|
||||
if (!number) {
|
||||
return;
|
||||
}
|
||||
if (this->_empty) {
|
||||
this->_min = *((T*)data + offsets[0]);
|
||||
}
|
||||
for (int i = this->_empty; i < number; i++) {
|
||||
this->_min = std::min(this->_min, *((T*)data + offsets[i]));
|
||||
}
|
||||
this->_empty = false;
|
||||
}
|
||||
|
||||
bool find(void* data) override {
|
||||
if (data == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
T val_data = *reinterpret_cast<T*>(data);
|
||||
return val_data >= this->_min;
|
||||
}
|
||||
|
||||
Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
|
||||
if constexpr (std::is_same_v<T, StringRef>) {
|
||||
MinNumFunc<T>* other_minmax = assert_cast<MinNumFunc<T>*>(minmax_func);
|
||||
if (other_minmax->_min < this->_min) {
|
||||
auto& other_min = other_minmax->_min;
|
||||
auto str = pool->add(new std::string(other_min.data, other_min.size));
|
||||
this->_min.data = str->data();
|
||||
this->_min.size = str->length();
|
||||
}
|
||||
} else {
|
||||
MinNumFunc<T>* other_minmax = assert_cast<MinNumFunc<T>*>(minmax_func);
|
||||
if (other_minmax->_min < this->_min) {
|
||||
this->_min = other_minmax->_min;
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
//min filter the max is useless, so return nullptr directly
|
||||
void* get_max() override {
|
||||
DCHECK(false);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
Status assign(void* min_data, void* max_data) override {
|
||||
this->_min = *(T*)min_data;
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
template <class T>
|
||||
class MaxNumFunc : public MinMaxNumFunc<T> {
|
||||
public:
|
||||
MaxNumFunc() = default;
|
||||
~MaxNumFunc() override = default;
|
||||
|
||||
void insert(const void* data) override {
|
||||
if (data == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
T val_data = *reinterpret_cast<const T*>(data);
|
||||
|
||||
if (this->_empty) {
|
||||
this->_max = val_data;
|
||||
this->_empty = false;
|
||||
return;
|
||||
}
|
||||
if (val_data > this->_max) {
|
||||
this->_max = val_data;
|
||||
}
|
||||
}
|
||||
|
||||
void insert_fixed_len(const char* data, const int* offsets, int number) override {
|
||||
if (!number) {
|
||||
return;
|
||||
}
|
||||
if (this->_empty) {
|
||||
this->_max = *((T*)data + offsets[0]);
|
||||
}
|
||||
for (int i = this->_empty; i < number; i++) {
|
||||
this->_max = std::max(this->_max, *((T*)data + offsets[i]));
|
||||
}
|
||||
this->_empty = false;
|
||||
}
|
||||
|
||||
bool find(void* data) override {
|
||||
if (data == nullptr) {
|
||||
return false;
|
||||
}
|
||||
|
||||
T val_data = *reinterpret_cast<T*>(data);
|
||||
return val_data <= this->_max;
|
||||
}
|
||||
|
||||
Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
|
||||
if constexpr (std::is_same_v<T, StringRef>) {
|
||||
MinMaxNumFunc<T>* other_minmax = assert_cast<MinMaxNumFunc<T>*>(minmax_func);
|
||||
|
||||
if (other_minmax->_max > this->_max) {
|
||||
auto& other_max = other_minmax->_max;
|
||||
auto str = pool->add(new std::string(other_max.data, other_max.size));
|
||||
this->_max.data = str->data();
|
||||
this->_max.size = str->length();
|
||||
}
|
||||
} else {
|
||||
MinMaxNumFunc<T>* other_minmax = assert_cast<MinMaxNumFunc<T>*>(minmax_func);
|
||||
if (other_minmax->_max > this->_max) {
|
||||
this->_max = other_minmax->_max;
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
//max filter the min is useless, so return nullptr directly
|
||||
void* get_min() override {
|
||||
DCHECK(false);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
Status assign(void* min_data, void* max_data) override {
|
||||
this->_max = *(T*)max_data;
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -323,6 +323,8 @@ public:
|
||||
_context.hybrid_set.reset(create_set(_column_return_type));
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MIN_FILTER:
|
||||
case RuntimeFilterType::MAX_FILTER:
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
|
||||
break;
|
||||
@ -405,6 +407,8 @@ public:
|
||||
_context.hybrid_set->insert(data);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MIN_FILTER:
|
||||
case RuntimeFilterType::MAX_FILTER:
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
_context.minmax_func->insert(data);
|
||||
break;
|
||||
@ -448,6 +452,8 @@ public:
|
||||
_context.hybrid_set->insert_fixed_len(data, offsets, number);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MIN_FILTER:
|
||||
case RuntimeFilterType::MAX_FILTER:
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
_context.minmax_func->insert_fixed_len(data, offsets, number);
|
||||
break;
|
||||
@ -575,6 +581,8 @@ public:
|
||||
}
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MIN_FILTER:
|
||||
case RuntimeFilterType::MAX_FILTER:
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
static_cast<void>(
|
||||
_context.minmax_func->merge(wrapper->_context.minmax_func.get(), _pool));
|
||||
@ -1770,6 +1778,43 @@ Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
|
||||
}
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MIN_FILTER: {
|
||||
// create min filter
|
||||
vectorized::VExprSPtr min_pred;
|
||||
TExprNode min_pred_node;
|
||||
RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::GE, min_pred,
|
||||
&min_pred_node));
|
||||
vectorized::VExprSPtr min_literal;
|
||||
RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_min(),
|
||||
min_literal));
|
||||
min_pred->add_child(probe_ctx->root());
|
||||
min_pred->add_child(min_literal);
|
||||
container.push_back(
|
||||
vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred));
|
||||
vectorized::VExprContextSPtr new_probe_ctx;
|
||||
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx));
|
||||
probe_ctxs.push_back(new_probe_ctx);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MAX_FILTER: {
|
||||
vectorized::VExprSPtr max_pred;
|
||||
// create max filter
|
||||
TExprNode max_pred_node;
|
||||
RETURN_IF_ERROR(create_vbin_predicate(probe_ctx->root()->type(), TExprOpcode::LE, max_pred,
|
||||
&max_pred_node));
|
||||
vectorized::VExprSPtr max_literal;
|
||||
RETURN_IF_ERROR(create_literal(probe_ctx->root()->type(), _context.minmax_func->get_max(),
|
||||
max_literal));
|
||||
max_pred->add_child(probe_ctx->root());
|
||||
max_pred->add_child(max_literal);
|
||||
container.push_back(
|
||||
vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred));
|
||||
|
||||
vectorized::VExprContextSPtr new_probe_ctx;
|
||||
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx));
|
||||
probe_ctxs.push_back(new_probe_ctx);
|
||||
break;
|
||||
}
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
vectorized::VExprSPtr max_pred;
|
||||
// create max filter
|
||||
|
||||
@ -76,9 +76,29 @@ enum class RuntimeFilterType {
|
||||
MINMAX_FILTER = 1,
|
||||
BLOOM_FILTER = 2,
|
||||
IN_OR_BLOOM_FILTER = 3,
|
||||
BITMAP_FILTER = 4
|
||||
BITMAP_FILTER = 4,
|
||||
MIN_FILTER = 5, // only min // now only support at local
|
||||
MAX_FILTER = 6 // only max // now only support at local
|
||||
};
|
||||
|
||||
static RuntimeFilterType get_minmax_filter_type(TMinMaxRuntimeFilterType::type ttype) {
|
||||
switch (ttype) {
|
||||
case TMinMaxRuntimeFilterType::MIN: {
|
||||
return RuntimeFilterType::MIN_FILTER;
|
||||
}
|
||||
case TMinMaxRuntimeFilterType::MAX: {
|
||||
return RuntimeFilterType::MAX_FILTER;
|
||||
}
|
||||
case TMinMaxRuntimeFilterType::MIN_MAX: {
|
||||
return RuntimeFilterType::MINMAX_FILTER;
|
||||
}
|
||||
default: {
|
||||
throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
|
||||
"Invalid minmax runtime filter type!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static RuntimeFilterType get_runtime_filter_type(TRuntimeFilterType::type ttype) {
|
||||
switch (ttype) {
|
||||
case TRuntimeFilterType::BLOOM: {
|
||||
@ -189,10 +209,15 @@ public:
|
||||
_is_ignored(false),
|
||||
registration_time_(MonotonicMillis()),
|
||||
_enable_pipeline_exec(_state->enable_pipeline_exec()),
|
||||
_runtime_filter_type(get_runtime_filter_type(desc->type)),
|
||||
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
|
||||
to_string(_runtime_filter_type))),
|
||||
_profile(new RuntimeProfile(_name)) {}
|
||||
_profile(new RuntimeProfile(_name)) {
|
||||
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
|
||||
_runtime_filter_type = get_minmax_filter_type(desc->min_max_type);
|
||||
} else {
|
||||
_runtime_filter_type = get_runtime_filter_type(desc->type);
|
||||
}
|
||||
_name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
|
||||
to_string(_runtime_filter_type));
|
||||
}
|
||||
|
||||
IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc)
|
||||
: _query_ctx(query_ctx),
|
||||
@ -209,10 +234,15 @@ public:
|
||||
_is_ignored(false),
|
||||
registration_time_(MonotonicMillis()),
|
||||
_enable_pipeline_exec(query_ctx->enable_pipeline_exec()),
|
||||
_runtime_filter_type(get_runtime_filter_type(desc->type)),
|
||||
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
|
||||
to_string(_runtime_filter_type))),
|
||||
_profile(new RuntimeProfile(_name)) {}
|
||||
_profile(new RuntimeProfile(_name)) {
|
||||
if (desc->__isset.min_max_type && desc->type == TRuntimeFilterType::MIN_MAX) {
|
||||
_runtime_filter_type = get_minmax_filter_type(desc->min_max_type);
|
||||
} else {
|
||||
_runtime_filter_type = get_runtime_filter_type(desc->type);
|
||||
}
|
||||
_name = fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
|
||||
to_string(_runtime_filter_type));
|
||||
}
|
||||
|
||||
~IRuntimeFilter() = default;
|
||||
|
||||
@ -335,6 +365,12 @@ public:
|
||||
case RuntimeFilterType::BLOOM_FILTER: {
|
||||
return std::string("bloomfilter");
|
||||
}
|
||||
case RuntimeFilterType::MIN_FILTER: {
|
||||
return std::string("only_min");
|
||||
}
|
||||
case RuntimeFilterType::MAX_FILTER: {
|
||||
return std::string("only_max");
|
||||
}
|
||||
case RuntimeFilterType::MINMAX_FILTER: {
|
||||
return std::string("minmax");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user