From 4ba4767eef28eca4d1c64fbf45a9c2ebc594d2e1 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Thu, 4 Jan 2024 09:12:30 +0800 Subject: [PATCH] [improvement](scan) make global runtime filter support in-list filter (#29394) --- be/src/exprs/runtime_filter.cpp | 45 ++++++++++++------- be/src/exprs/runtime_filter.h | 4 +- be/src/exprs/runtime_filter_slots.h | 5 +-- .../post/RuntimeFilterGenerator.java | 6 +-- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f52a9574bf..bf09adc53f 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -453,9 +453,11 @@ public: std::vector& push_exprs, const TExpr& probe_expr); Status merge(const RuntimePredicateWrapper* wrapper) { - bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && - (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && - wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER); + bool can_not_merge_in_or_bloom = + _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER && + (wrapper->_filter_type != RuntimeFilterType::IN_FILTER && + wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER && + wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER); bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER && _filter_type != wrapper->_filter_type; @@ -513,8 +515,15 @@ public: case RuntimeFilterType::IN_OR_BLOOM_FILTER: { auto real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER : RuntimeFilterType::IN_FILTER; + + auto other_filter_type = wrapper->_filter_type; + if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { + other_filter_type = wrapper->_is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER + : RuntimeFilterType::IN_FILTER; + } + if (real_filter_type == RuntimeFilterType::IN_FILTER) { - if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in + if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in CHECK(!wrapper->_is_ignored_in_filter) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " @@ -526,7 +535,6 @@ public: << ") >= max_in_num(" << _max_in_num << ")"; change_to_bloom_filter(); } - // in merge bloom filter } else { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; @@ -535,8 +543,7 @@ public: wrapper->_context.bloom_filter_func.get())); } } else { - if (wrapper->_filter_type == - RuntimeFilterType::IN_FILTER) { // bloom filter merge in + if (other_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in CHECK(!wrapper->_is_ignored_in_filter) << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " @@ -1157,6 +1164,14 @@ void IRuntimeFilter::set_filter_timer(std::shared_ptr_filter_type == RuntimeFilterType::IN_FILTER) { + _wrapper->_is_ignored_in_filter = true; + _wrapper->_ignored_in_filter_msg = _pool->add(new std::string(msg)); + } +} + BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { return _wrapper->get_bloomfilter(); } @@ -1344,14 +1359,14 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { if (!_is_ignored && wrapper->is_ignored_in_filter()) { - set_ignored(); - set_ignored_msg(*(wrapper->get_ignored_in_filter_msg())); + std::string* msg = wrapper->get_ignored_in_filter_msg(); + set_ignored(msg ? *msg : ""); } auto origin_type = _wrapper->get_real_type(); Status status = _wrapper->merge(wrapper); if (!_is_ignored && _wrapper->is_ignored_in_filter()) { - set_ignored(); - set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg())); + std::string* msg = _wrapper->get_ignored_in_filter_msg(); + set_ignored(msg ? *msg : ""); } if (origin_type != _wrapper->get_real_type()) { update_runtime_filter_type_to_profile(); @@ -1656,10 +1671,8 @@ bool IRuntimeFilter::is_bloomfilter() { Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { - set_ignored(); const PInFilter in_filter = param->request->in_filter(); - auto msg = param->pool->add(new std::string(in_filter.ignored_msg())); - set_ignored_msg(*msg); + set_ignored(in_filter.ignored_msg()); } std::unique_ptr wrapper; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &wrapper)); @@ -1677,10 +1690,8 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply) { if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { - set_ignored(); const PInFilter in_filter = param->request->in_filter(); - auto msg = param->pool->add(new std::string(in_filter.ignored_msg())); - set_ignored_msg(*msg); + set_ignored(in_filter.ignored_msg()); } std::unique_ptr tmp_wrapper; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 2673308ae7..fc324c1c1b 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -292,9 +292,7 @@ public: Status update_filter(const UpdateRuntimeFilterParams* param); Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply); - void set_ignored() { _is_ignored = true; } - - void set_ignored_msg(std::string& msg) { _ignored_msg = msg; } + void set_ignored(const std::string& msg); // for ut bool is_bloomfilter(); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index d539e295ae..495ac28e76 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -58,15 +58,14 @@ public: filter_id); } for (auto filter : filters) { - filter->set_ignored(); + filter->set_ignored(""); filter->signal(); } return Status::OK(); }; auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) { - runtime_filter->set_ignored(); - runtime_filter->set_ignored_msg(msg); + runtime_filter->set_ignored(msg); RETURN_IF_ERROR(runtime_filter->publish()); return Status::OK(); }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 45a14d3b63..12db27793d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -369,11 +369,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { List legalTypes = Arrays.stream(TRuntimeFilterType.values()) .filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0) .collect(Collectors.toList()); - if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) { - // If storage data distribution is ignored, we use BLOOM filter. - legalTypes.clear(); - legalTypes.add(TRuntimeFilterType.BLOOM); - } + List hashJoinConjuncts = join.getEqualToConjuncts(); for (int i = 0; i < hashJoinConjuncts.size(); i++) { EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(