From 29ca77622fe492ec767221b7d87b6dfcfff1aa8c Mon Sep 17 00:00:00 2001 From: Pxl <952130278@qq.com> Date: Sun, 7 Nov 2021 17:40:45 +0800 Subject: [PATCH] [Refactor] Refactor part of RuntimeFilter's code (#6998) #6997 --- be/src/exec/hash_join_node.cpp | 5 +- be/src/exec/hash_join_node.h | 1 + be/src/exprs/bloomfilter_predicate.cpp | 22 ++-- be/src/exprs/hybrid_set.cpp | 19 ++-- be/src/exprs/hybrid_set.h | 16 +-- be/src/exprs/runtime_filter.cpp | 108 ++++--------------- be/src/exprs/runtime_filter.h | 46 +-------- be/src/exprs/runtime_filter_slots.h | 137 +++++++++++++++++++++++++ be/src/runtime/primitive_type.h | 4 + 9 files changed, 197 insertions(+), 161 deletions(-) create mode 100644 be/src/exprs/runtime_filter_slots.h diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 3fc0dfecf2..b1b9356e65 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -242,8 +242,7 @@ Status HashJoinNode::open(RuntimeState* state) { _runtime_filter_descs); RETURN_IF_ERROR(thread_status.get_future().get()); - RETURN_IF_ERROR(runtime_filter_slots.init(state, _pool, expr_mem_tracker().get(), - _hash_tbl->size())); + RETURN_IF_ERROR(runtime_filter_slots.init(state, _hash_tbl->size())); { SCOPED_TIMER(_push_compute_timer); auto func = [&](TupleRow* row) { runtime_filter_slots.insert(row); }; @@ -252,7 +251,7 @@ Status HashJoinNode::open(RuntimeState* state) { COUNTER_UPDATE(_build_timer, _push_compute_timer->value()); { SCOPED_TIMER(_push_down_timer); - runtime_filter_slots.publish(this); + runtime_filter_slots.publish(); } Status open_status = child(0)->open(state); RETURN_IF_ERROR(open_status); diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index 2379f7771c..f097a2c10e 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -25,6 +25,7 @@ #include "exec/exec_node.h" #include "exec/hash_table.h" +#include "exprs/runtime_filter_slots.h" #include "gen_cpp/PlanNodes_types.h" namespace doris { diff --git a/be/src/exprs/bloomfilter_predicate.cpp b/be/src/exprs/bloomfilter_predicate.cpp index 44906fd7a0..4bc7584867 100644 --- a/be/src/exprs/bloomfilter_predicate.cpp +++ b/be/src/exprs/bloomfilter_predicate.cpp @@ -40,31 +40,37 @@ IBloomFilterFuncBase* IBloomFilterFuncBase::create_bloom_filter(MemTracker* trac return new BloomFilterFunc(tracker); case TYPE_BIGINT: return new BloomFilterFunc(tracker); + case TYPE_LARGEINT: + return new BloomFilterFunc(tracker); + case TYPE_FLOAT: return new BloomFilterFunc(tracker); case TYPE_DOUBLE: return new BloomFilterFunc(tracker); + + case TYPE_DECIMALV2: + return new BloomFilterFunc(tracker); + + case TYPE_TIME: + return new BloomFilterFunc(tracker); case TYPE_DATE: return new BloomFilterFunc(tracker); case TYPE_DATETIME: return new BloomFilterFunc(tracker); - case TYPE_DECIMALV2: - return new BloomFilterFunc(tracker); - case TYPE_LARGEINT: - return new BloomFilterFunc(tracker); + case TYPE_CHAR: return new BloomFilterFunc(tracker); case TYPE_VARCHAR: return new BloomFilterFunc(tracker); case TYPE_STRING: - return new BloomFilterFunc(tracker); + return new BloomFilterFunc(tracker); + default: - return nullptr; + DCHECK(false) << "Invalid type."; } return nullptr; } - BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node) : Predicate(node), _is_prepare(false), @@ -74,7 +80,7 @@ BloomFilterPredicate::BloomFilterPredicate(const TExprNode& node) BloomFilterPredicate::~BloomFilterPredicate() { VLOG_NOTICE << "bloom filter rows:" << _filtered_rows << ",scan_rows:" << _scan_rows - << ",rate:" << (double)_filtered_rows / _scan_rows; + << ",rate:" << (double)_filtered_rows / _scan_rows; } BloomFilterPredicate::BloomFilterPredicate(const BloomFilterPredicate& other) diff --git a/be/src/exprs/hybrid_set.cpp b/be/src/exprs/hybrid_set.cpp index 4ebdabcaf0..1b3fd377b4 100644 --- a/be/src/exprs/hybrid_set.cpp +++ b/be/src/exprs/hybrid_set.cpp @@ -36,21 +36,22 @@ HybridSetBase* HybridSetBase::create_set(PrimitiveType type) { case TYPE_BIGINT: return new (std::nothrow) HybridSet(); + case TYPE_LARGEINT: + return new (std::nothrow) HybridSet<__int128>(); + case TYPE_FLOAT: return new (std::nothrow) HybridSet(); + case TYPE_TIME: case TYPE_DOUBLE: return new (std::nothrow) HybridSet(); - case TYPE_DATE: - case TYPE_DATETIME: - return new (std::nothrow) HybridSet(); - case TYPE_DECIMALV2: return new (std::nothrow) HybridSet(); - case TYPE_LARGEINT: - return new (std::nothrow) HybridSet<__int128>(); + case TYPE_DATE: + case TYPE_DATETIME: + return new (std::nothrow) HybridSet(); case TYPE_CHAR: case TYPE_VARCHAR: @@ -58,12 +59,10 @@ HybridSetBase* HybridSetBase::create_set(PrimitiveType type) { return new (std::nothrow) StringValueSet(); default: - return NULL; + DCHECK(false) << "Invalid type."; } - return NULL; + return nullptr; } } // namespace doris - -/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 47c02ead46..6947f09a58 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -35,7 +35,7 @@ class HybridSetBase { public: HybridSetBase() = default; virtual ~HybridSetBase() = default; - virtual void insert(void* data) = 0; + virtual void insert(const void* data) = 0; // use in vectorize execute engine virtual void insert(void* data, size_t) = 0; @@ -66,17 +66,18 @@ public: ~HybridSet() override = default; - void insert(void* data) override { + void insert(const void* data) override { + if (data == nullptr) return; + if (sizeof(T) >= 16) { // for largeint, it will core dump with no memcpy T value; memcpy(&value, data, sizeof(T)); _set.insert(value); } else { - _set.insert(*reinterpret_cast(data)); + _set.insert(*reinterpret_cast(data)); } } - void insert(void* data, size_t) override { insert(data); } void insert(HybridSetBase* set) override { @@ -124,12 +125,13 @@ public: ~StringValueSet() override = default; - void insert(void* data) override { - auto* value = reinterpret_cast(data); + void insert(const void* data) override { + if (data == nullptr) return; + + const auto* value = reinterpret_cast(data); std::string str_value(value->ptr, value->len); _set.insert(str_value); } - void insert(void* data, size_t size) override { std::string str_value(reinterpret_cast(data), size); _set.insert(str_value); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 446eb10e0c..17e7f3b3c1 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -43,7 +43,7 @@ namespace doris { // only used in Runtime Filter class MinMaxFuncBase { public: - virtual void insert(void* data) = 0; + virtual void insert(const void* data) = 0; virtual bool find(void* data) = 0; virtual bool is_empty() = 0; virtual void* get_max() = 0; @@ -61,9 +61,9 @@ class MinMaxNumFunc : public MinMaxFuncBase { public: MinMaxNumFunc() = default; ~MinMaxNumFunc() = default; - virtual void insert(void* data) { + virtual void insert(const void* data) { if (data == nullptr) return; - T val_data = *reinterpret_cast(data); + const T val_data = *reinterpret_cast(data); if (_empty) { _min = val_data; _max = val_data; @@ -101,7 +101,6 @@ public: _max.ptr = str->data(); _max.len = str->length(); } - } else { MinMaxNumFunc* other_minmax = static_cast*>(minmax_func); if (other_minmax->_min < _min) { @@ -151,30 +150,33 @@ MinMaxFuncBase* MinMaxFuncBase::create_minmax_filter(PrimitiveType type) { case TYPE_BIGINT: return new (std::nothrow) MinMaxNumFunc(); + case TYPE_LARGEINT: + return new (std::nothrow) MinMaxNumFunc<__int128>(); + case TYPE_FLOAT: return new (std::nothrow) MinMaxNumFunc(); + case TYPE_TIME: case TYPE_DOUBLE: return new (std::nothrow) MinMaxNumFunc(); + case TYPE_DECIMALV2: + return new (std::nothrow) MinMaxNumFunc(); + case TYPE_DATE: case TYPE_DATETIME: return new (std::nothrow) MinMaxNumFunc(); - case TYPE_DECIMALV2: - return new (std::nothrow) MinMaxNumFunc(); - - case TYPE_LARGEINT: - return new (std::nothrow) MinMaxNumFunc<__int128>(); - case TYPE_CHAR: case TYPE_VARCHAR: case TYPE_STRING: return new (std::nothrow) MinMaxNumFunc(); + default: DCHECK(false) << "Invalid type."; } - return NULL; + + return nullptr; } // PrimitiveType->TExprNodeType @@ -331,6 +333,8 @@ TTypeDesc create_type_desc(PrimitiveType type) { TScalarType scalarType; scalarType.__set_type(to_thrift(type)); scalarType.__set_len(-1); + scalarType.__set_precision(-1); + scalarType.__set_scale(-1); node_type.back().__set_scalar_type(scalarType); type_desc.__set_types(node_type); return type_desc; @@ -472,18 +476,15 @@ public: return _bloomfilter_func->init_with_fixed_length(params->bloom_filter_size); } default: - DCHECK(false); return Status::InvalidArgument("Unknown Filter type"); } return Status::OK(); } - void insert(void* data) { + void insert(const void* data) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - if (data != nullptr) { - _hybrid_set->insert(data); - } + _hybrid_set->insert(data); break; } case RuntimeFilterType::MINMAX_FILTER: { @@ -491,7 +492,6 @@ public: break; } case RuntimeFilterType::BLOOM_FILTER: { - DCHECK(_bloomfilter_func != nullptr); _bloomfilter_func->insert(data); break; } @@ -747,12 +747,12 @@ Status IRuntimeFilter::create(RuntimeState* state, MemTracker* tracker, ObjectPo return (*res)->init_with_desc(desc, node_id); } -void IRuntimeFilter::insert(void* data) { +void IRuntimeFilter::insert(const void* data) { DCHECK(is_producer()); _wrapper->insert(data); } -Status IRuntimeFilter::publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx) { +Status IRuntimeFilter::publish() { DCHECK(is_producer()); if (_has_local_target) { IRuntimeFilter* consumer_filter = nullptr; @@ -1054,74 +1054,4 @@ Status IRuntimeFilter::consumer_close() { RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default; RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default; -Status RuntimeFilterSlots::init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker, - int64_t hash_table_size) { - DCHECK(_probe_expr_context.size() == _build_expr_context.size()); - - // runtime filter effect stragety - // 1. we will ignore IN filter when hash_table_size is too big - // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size - // is too small and IN filter has effect - - std::map has_in_filter; - - auto ignore_filter = [state](int filter_id) { - IRuntimeFilter* consumer_filter = nullptr; - state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter); - DCHECK(consumer_filter != nullptr); - consumer_filter->set_ignored(); - consumer_filter->signal(); - }; - - for (auto& filter_desc : _runtime_filter_descs) { - IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, - &runtime_filter)); - DCHECK(runtime_filter != nullptr); - DCHECK(runtime_filter->expr_order() >= 0); - DCHECK(runtime_filter->expr_order() < _probe_expr_context.size()); - - if (runtime_filter->type() == RuntimeFilterType::IN_FILTER && - hash_table_size >= state->runtime_filter_max_in_num()) { - ignore_filter(filter_desc.filter_id); - continue; - } - if (has_in_filter[runtime_filter->expr_order()] && !runtime_filter->has_remote_target() && - runtime_filter->type() != RuntimeFilterType::IN_FILTER && - hash_table_size < state->runtime_filter_max_in_num()) { - ignore_filter(filter_desc.filter_id); - continue; - } - has_in_filter[runtime_filter->expr_order()] = - (runtime_filter->type() == RuntimeFilterType::IN_FILTER); - _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter); - } - - return Status::OK(); -} - -void RuntimeFilterSlots::ready_for_publish() { - for (auto& pair : _runtime_filters) { - for (auto filter : pair.second) { - filter->ready_for_publish(); - } - } -} - -void RuntimeFilterSlots::publish(HashJoinNode* hash_join_node) { - for (int i = 0; i < _probe_expr_context.size(); ++i) { - auto iter = _runtime_filters.find(i); - if (iter != _runtime_filters.end()) { - for (auto filter : iter->second) { - filter->publish(hash_join_node, _probe_expr_context[i]); - } - } - } - for (auto& pair : _runtime_filters) { - for (auto filter : pair.second) { - filter->publish_finally(); - } - } -} - } // namespace doris diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index b182c42690..809c132dc5 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -120,11 +120,11 @@ public: // insert data to build filter // only used for producer - void insert(void* data); + void insert(const void* data); // publish filter // push filter to remote node or push down it to scan_node - Status publish(HashJoinNode* hash_join_node, ExprContext* probe_ctx); + Status publish(); void publish_finally(); @@ -209,7 +209,6 @@ protected: static Status _create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool, std::unique_ptr* wrapper); -protected: RuntimeState* _state; MemTracker* _mem_tracker; ObjectPool* _pool; @@ -280,47 +279,6 @@ private: WrapperPtr _wrapper; }; -/// this class used in a hash join node -/// Provide a unified interface for other classes -class RuntimeFilterSlots { -public: - RuntimeFilterSlots(const std::vector& prob_expr_ctxs, - const std::vector& build_expr_ctxs, - const std::vector& runtime_filter_descs) - : _probe_expr_context(prob_expr_ctxs), - _build_expr_context(build_expr_ctxs), - _runtime_filter_descs(runtime_filter_descs) {} - - Status init(RuntimeState* state, ObjectPool* pool, MemTracker* tracker, - int64_t hash_table_size); - - void insert(TupleRow* row) { - for (int i = 0; i < _build_expr_context.size(); ++i) { - auto iter = _runtime_filters.find(i); - if (iter != _runtime_filters.end()) { - void* val = _build_expr_context[i]->get_value(row); - if (val != nullptr) { - for (auto filter : iter->second) { - filter->insert(val); - } - } - } - } - } - - // should call this method after insert - void ready_for_publish(); - // publish runtime filter - void publish(HashJoinNode* hash_join_node); - -private: - const std::vector& _probe_expr_context; - const std::vector& _build_expr_context; - const std::vector& _runtime_filter_descs; - // prob_contition index -> [IRuntimeFilter] - std::map> _runtime_filters; -}; - } // namespace doris #endif diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h new file mode 100644 index 0000000000..7f0957aea0 --- /dev/null +++ b/be/src/exprs/runtime_filter_slots.h @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exprs/runtime_filter.h" +#include "runtime/runtime_filter_mgr.h" +#include "runtime/runtime_state.h" + +namespace doris { + +// this class used in a hash join node +// Provide a unified interface for other classes +template +class RuntimeFilterSlotsBase { +public: + RuntimeFilterSlotsBase(const std::vector& prob_expr_ctxs, + const std::vector& build_expr_ctxs, + const std::vector& runtime_filter_descs) + : _probe_expr_context(prob_expr_ctxs), + _build_expr_context(build_expr_ctxs), + _runtime_filter_descs(runtime_filter_descs) {} + + Status init(RuntimeState* state, int64_t hash_table_size) { + DCHECK(_probe_expr_context.size() == _build_expr_context.size()); + + // runtime filter effect stragety + // 1. we will ignore IN filter when hash_table_size is too big + // 2. we will ignore BLOOM filter and MinMax filter when hash_table_size + // is too small and IN filter has effect + + std::map has_in_filter; + + auto ignore_filter = [state](int filter_id) { + IRuntimeFilter* consumer_filter = nullptr; + state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter); + DCHECK(consumer_filter != nullptr); + consumer_filter->set_ignored(); + consumer_filter->signal(); + }; + + for (auto& filter_desc : _runtime_filter_descs) { + IRuntimeFilter* runtime_filter = nullptr; + RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, + &runtime_filter)); + DCHECK(runtime_filter != nullptr); + DCHECK(runtime_filter->expr_order() >= 0); + DCHECK(runtime_filter->expr_order() < _probe_expr_context.size()); + + // do not create 'in filter' when hash_table size over limit + bool over_max_in_num = (hash_table_size >= state->runtime_filter_max_in_num()); + + bool is_in_filter = (runtime_filter->type() == RuntimeFilterType::IN_FILTER); + + // do not create 'bloom filter' and 'minmax filter' when 'in filter' has created + bool pass_not_in = (has_in_filter[runtime_filter->expr_order()] && + !runtime_filter->has_remote_target()); + + if (over_max_in_num == is_in_filter && (is_in_filter || pass_not_in)) { + ignore_filter(filter_desc.filter_id); + continue; + } + + has_in_filter[runtime_filter->expr_order()] = + (runtime_filter->type() == RuntimeFilterType::IN_FILTER); + _runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter); + } + + return Status::OK(); + } + + void insert(TupleRow* row) { + for (int i = 0; i < _build_expr_context.size(); ++i) { + auto iter = _runtime_filters.find(i); + if (iter != _runtime_filters.end()) { + void* val = _build_expr_context[i]->get_value(row); + if (val != nullptr) { + for (auto filter : iter->second) { + filter->insert(val); + } + } + } + } + } + + // should call this method after insert + void ready_for_publish() { + for (auto& pair : _runtime_filters) { + for (auto filter : pair.second) { + filter->ready_for_publish(); + } + } + } + // publish runtime filter + void publish() { + for (int i = 0; i < _probe_expr_context.size(); ++i) { + auto iter = _runtime_filters.find(i); + if (iter != _runtime_filters.end()) { + for (auto filter : iter->second) { + filter->publish(); + } + } + } + for (auto& pair : _runtime_filters) { + for (auto filter : pair.second) { + filter->publish_finally(); + } + } + } + + bool empty() { return !_runtime_filters.size(); } + +private: + const std::vector& _probe_expr_context; + const std::vector& _build_expr_context; + const std::vector& _runtime_filter_descs; + // prob_contition index -> [IRuntimeFilter] + std::map> _runtime_filters; +}; + +using RuntimeFilterSlots = RuntimeFilterSlotsBase; + +} // namespace doris diff --git a/be/src/runtime/primitive_type.h b/be/src/runtime/primitive_type.h index cdafb2ced8..36113a55cd 100644 --- a/be/src/runtime/primitive_type.h +++ b/be/src/runtime/primitive_type.h @@ -242,6 +242,10 @@ struct PrimitiveTypeTraits { using CppType = float; }; template <> +struct PrimitiveTypeTraits { + using CppType = double; +}; +template <> struct PrimitiveTypeTraits { using CppType = double; };