From 293e11553641fd9743da124141ee5622fe116eb5 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 20 Apr 2023 10:06:40 +0800 Subject: [PATCH] [Improvement](bloom filter) initialize bloom filter with adaptive size (#18785) --- be/src/exprs/bloom_filter_func.h | 27 +++++++++++++++++++++++- be/src/exprs/runtime_filter.cpp | 20 +++++++++++++++--- be/src/exprs/runtime_filter.h | 6 ++++-- be/src/exprs/runtime_filter_slots.h | 6 +++++- be/src/runtime/runtime_filter_mgr.cpp | 7 +++--- be/src/runtime/runtime_filter_mgr.h | 3 ++- be/src/vec/exec/join/vhash_join_node.cpp | 9 +++++--- be/src/vec/exec/join/vhash_join_node.h | 1 + 8 files changed, 65 insertions(+), 14 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index eb8ee79e0d..d2b4ffd97c 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -92,7 +92,31 @@ public: void set_length(int64_t bloom_filter_length) { _bloom_filter_length = bloom_filter_length; } - Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); } + void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = build_bf_exactly; } + + Status init_with_fixed_length() { + if (_build_bf_exactly) { + return Status::OK(); + } else { + return init_with_fixed_length(_bloom_filter_length); + } + } + + Status init_with_cardinality(const size_t build_bf_cardinality) { + if (_build_bf_exactly) { + // Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize + constexpr double fpp = 0.05; + constexpr double k = 8; // BUCKET_WORDS + // m is the number of bits we would need to get the fpp specified + double m = -k * build_bf_cardinality / std::log(1 - std::pow(fpp, 1.0 / k)); + + // Handle case where ndv == 1 => ceil(log2(m/8)) < 0. + int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2)))); + return init_with_fixed_length(((int64_t)1) << log_filter_size); + } else { + return Status::OK(); + } + } Status init_with_fixed_length(int64_t bloom_filter_length) { if (_inited) { @@ -198,6 +222,7 @@ protected: bool _inited; std::mutex _lock; int64_t _bloom_filter_length; + bool _build_bf_exactly = false; }; template diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index fbcf08d5d5..efd72ffd0e 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -384,6 +384,7 @@ public: _is_bloomfilter = true; _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); _context.bloom_filter_func->set_length(params->bloom_filter_size); + _context.bloom_filter_func->set_build_bf_exactly(params->build_bf_exactly); return Status::OK(); } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { @@ -413,6 +414,11 @@ public: _context.hybrid_set.reset(create_set(_column_return_type)); } + Status init_bloom_filter(const size_t build_bf_cardinality) { + DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER); + return _context.bloom_filter_func->init_with_cardinality(build_bf_cardinality); + } + void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const { if (_context.hybrid_set->size() > 0) { auto it = _context.hybrid_set->begin(); @@ -1034,11 +1040,12 @@ private: Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, IRuntimeFilter** res) { + int node_id, IRuntimeFilter** res, bool build_bf_exactly) { *res = pool->add(new IRuntimeFilter(state, pool)); (*res)->set_role(role); UniqueId fragment_instance_id(state->fragment_instance_id()); - return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id); + return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id, + build_bf_exactly); } void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) { @@ -1241,7 +1248,8 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_instance_id, int node_id) { + UniqueId fragment_instance_id, int node_id, + bool build_bf_exactly) { // if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer())); @@ -1273,6 +1281,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue params.filter_type = _runtime_filter_type; params.column_return_type = build_ctx->root()->type().type; params.max_in_num = options->runtime_filter_max_in_num; + params.build_bf_exactly = build_bf_exactly && !_has_remote_target && + _runtime_filter_type == RuntimeFilterType::BLOOM_FILTER; if (desc->__isset.bloom_filter_size_bytes) { params.bloom_filter_size = desc->bloom_filter_size_bytes; } @@ -1336,6 +1346,10 @@ void IRuntimeFilter::change_to_bloom_filter() { } } +Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) { + return _wrapper->init_bloom_filter(build_bf_cardinality); +} + template Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool, std::unique_ptr* wrapper) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index a76b02080f..a2f6995d59 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -117,6 +117,7 @@ struct RuntimeFilterParams { int32_t filter_id; UniqueId fragment_instance_id; bool bitmap_filter_not_in; + bool build_bf_exactly; }; struct UpdateRuntimeFilterParams { @@ -169,7 +170,7 @@ public: static Status create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, IRuntimeFilter** res); + int node_id, IRuntimeFilter** res, bool build_bf_exactly = false); void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); @@ -225,7 +226,7 @@ public: // init filter with desc Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_id, int node_id = -1); + UniqueId fragment_id, int node_id = -1, bool build_bf_exactly = false); BloomFilterFuncBase* get_bloomfilter() const; @@ -244,6 +245,7 @@ public: ObjectPool* pool, std::unique_ptr* wrapper); void change_to_bloom_filter(); + Status init_bloom_filter(const size_t build_bf_cardinality); Status update_filter(const UpdateRuntimeFilterParams* param); void set_ignored() { _is_ignored = true; } diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 2d3b0c2c73..88fcccc47b 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -39,7 +39,7 @@ public: _build_expr_context(build_expr_ctxs), _runtime_filter_descs(runtime_filter_descs) {} - Status init(RuntimeState* state, int64_t hash_table_size) { + Status init(RuntimeState* state, int64_t hash_table_size, size_t build_bf_cardinality) { DCHECK(_probe_expr_context.size() == _build_expr_context.size()); // runtime filter effect strategy @@ -104,6 +104,10 @@ public: runtime_filter->change_to_bloom_filter(); } + if (runtime_filter->type() == RuntimeFilterType::BLOOM_FILTER) { + RETURN_IF_ERROR(runtime_filter->init_bloom_filter(build_bf_cardinality)); + } + // Note: // In the case that exist *remote target* and in filter and other filter, // we must merge other filter whatever in filter is over the max num in current node, diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 187948d1a5..e956122130 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -91,7 +91,8 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id, Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id) { + const TQueryOptions& options, int node_id, + bool build_bf_exactly) { DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) || role != RuntimeFilterRole::CONSUMER); SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); @@ -114,7 +115,7 @@ Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role, filter_mgr_val.role = role; RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id, - &filter_mgr_val.filter)); + &filter_mgr_val.filter, build_bf_exactly)); filter_map->emplace(key, filter_mgr_val); @@ -162,7 +163,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( std::string filter_id = std::to_string(runtime_filter_desc->filter_id); // LOG(INFO) << "entity filter id:" << filter_id; cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, - _fragment_instance_id); + _fragment_instance_id, -1, false); _filter_map.emplace(filter_id, cntVal); return Status::OK(); } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 6bf9454e32..09a760eff8 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -73,7 +73,8 @@ public: Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter); // regist filter Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id = -1); + const TQueryOptions& options, int node_id = -1, + bool build_bf_exactly = false); // update filter by remote Status update_filter(const PPublishFilterRequest* request, diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 5c3eb3edd9..f6bfe66525 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -217,12 +217,14 @@ struct ProcessHashTableBuild { if (emplace_result.is_inserted()) { new (&emplace_result.get_mapped()) Mapped({k, _offset}); inserted_rows.push_back(k); + _join_node->_build_bf_cardinality++; } else { _skip_rows++; }); } else if (has_runtime_filter && !build_unique) { EMPLACE_IMPL( if (emplace_result.is_inserted()) { new (&emplace_result.get_mapped()) Mapped({k, _offset}); inserted_rows.push_back(k); + _join_node->_build_bf_cardinality++; } else { emplace_result.get_mapped().insert({k, _offset}, *(_join_node->_arena)); inserted_rows.push_back(k); @@ -278,7 +280,7 @@ struct ProcessRuntimeFilterBuild { _join_node->_runtime_filter_descs)); RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init( - state, hash_table_ctx.hash_table.get_size())); + state, hash_table_ctx.hash_table.get_size(), _join_node->_build_bf_cardinality)); if (!_join_node->_runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) { { @@ -374,7 +376,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _runtime_filters.resize(_runtime_filter_descs.size()); for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options())); + RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options(), -1, + _probe_expr_ctxs.size() == 1)); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter( _runtime_filter_descs[i].filter_id, &_runtime_filters[i])); } @@ -912,7 +915,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc _runtime_filter_descs)); RETURN_IF_ERROR(_runtime_filter_slots->init( - state, arg.hash_table.get_size())); + state, arg.hash_table.get_size(), 0)); RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); _runtime_filter_slots->publish(); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 960988e718..eb5bf7d240 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -383,6 +383,7 @@ private: std::unordered_map> _inserted_rows; std::vector _runtime_filters; + size_t _build_bf_cardinality = 0; }; } // namespace vectorized } // namespace doris