[Improvement](bloom filter) initialize bloom filter with adaptive size (#18785)
This commit is contained in:
@ -92,7 +92,31 @@ public:
|
||||
|
||||
void set_length(int64_t bloom_filter_length) { _bloom_filter_length = bloom_filter_length; }
|
||||
|
||||
Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); }
|
||||
void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = build_bf_exactly; }
|
||||
|
||||
Status init_with_fixed_length() {
|
||||
if (_build_bf_exactly) {
|
||||
return Status::OK();
|
||||
} else {
|
||||
return init_with_fixed_length(_bloom_filter_length);
|
||||
}
|
||||
}
|
||||
|
||||
Status init_with_cardinality(const size_t build_bf_cardinality) {
|
||||
if (_build_bf_exactly) {
|
||||
// Use the same algorithm as org.apache.doris.planner.RuntimeFilter#calculateFilterSize
|
||||
constexpr double fpp = 0.05;
|
||||
constexpr double k = 8; // BUCKET_WORDS
|
||||
// m is the number of bits we would need to get the fpp specified
|
||||
double m = -k * build_bf_cardinality / std::log(1 - std::pow(fpp, 1.0 / k));
|
||||
|
||||
// Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
|
||||
int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2))));
|
||||
return init_with_fixed_length(((int64_t)1) << log_filter_size);
|
||||
} else {
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
Status init_with_fixed_length(int64_t bloom_filter_length) {
|
||||
if (_inited) {
|
||||
@ -198,6 +222,7 @@ protected:
|
||||
bool _inited;
|
||||
std::mutex _lock;
|
||||
int64_t _bloom_filter_length;
|
||||
bool _build_bf_exactly = false;
|
||||
};
|
||||
|
||||
template <class T>
|
||||
|
||||
@ -384,6 +384,7 @@ public:
|
||||
_is_bloomfilter = true;
|
||||
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
|
||||
_context.bloom_filter_func->set_length(params->bloom_filter_size);
|
||||
_context.bloom_filter_func->set_build_bf_exactly(params->build_bf_exactly);
|
||||
return Status::OK();
|
||||
}
|
||||
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
|
||||
@ -413,6 +414,11 @@ public:
|
||||
_context.hybrid_set.reset(create_set(_column_return_type));
|
||||
}
|
||||
|
||||
Status init_bloom_filter(const size_t build_bf_cardinality) {
|
||||
DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER);
|
||||
return _context.bloom_filter_func->init_with_cardinality(build_bf_cardinality);
|
||||
}
|
||||
|
||||
void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
|
||||
if (_context.hybrid_set->size() > 0) {
|
||||
auto it = _context.hybrid_set->begin();
|
||||
@ -1034,11 +1040,12 @@ private:
|
||||
|
||||
Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc,
|
||||
const TQueryOptions* query_options, const RuntimeFilterRole role,
|
||||
int node_id, IRuntimeFilter** res) {
|
||||
int node_id, IRuntimeFilter** res, bool build_bf_exactly) {
|
||||
*res = pool->add(new IRuntimeFilter(state, pool));
|
||||
(*res)->set_role(role);
|
||||
UniqueId fragment_instance_id(state->fragment_instance_id());
|
||||
return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id);
|
||||
return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id,
|
||||
build_bf_exactly);
|
||||
}
|
||||
|
||||
void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
|
||||
@ -1241,7 +1248,8 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
|
||||
UniqueId fragment_instance_id, int node_id) {
|
||||
UniqueId fragment_instance_id, int node_id,
|
||||
bool build_bf_exactly) {
|
||||
// if node_id == -1 , it shouldn't be a consumer
|
||||
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
|
||||
|
||||
@ -1273,6 +1281,8 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
|
||||
params.filter_type = _runtime_filter_type;
|
||||
params.column_return_type = build_ctx->root()->type().type;
|
||||
params.max_in_num = options->runtime_filter_max_in_num;
|
||||
params.build_bf_exactly = build_bf_exactly && !_has_remote_target &&
|
||||
_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER;
|
||||
if (desc->__isset.bloom_filter_size_bytes) {
|
||||
params.bloom_filter_size = desc->bloom_filter_size_bytes;
|
||||
}
|
||||
@ -1336,6 +1346,10 @@ void IRuntimeFilter::change_to_bloom_filter() {
|
||||
}
|
||||
}
|
||||
|
||||
Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) {
|
||||
return _wrapper->init_bloom_filter(build_bf_cardinality);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool,
|
||||
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
|
||||
|
||||
@ -117,6 +117,7 @@ struct RuntimeFilterParams {
|
||||
int32_t filter_id;
|
||||
UniqueId fragment_instance_id;
|
||||
bool bitmap_filter_not_in;
|
||||
bool build_bf_exactly;
|
||||
};
|
||||
|
||||
struct UpdateRuntimeFilterParams {
|
||||
@ -169,7 +170,7 @@ public:
|
||||
|
||||
static Status create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc,
|
||||
const TQueryOptions* query_options, const RuntimeFilterRole role,
|
||||
int node_id, IRuntimeFilter** res);
|
||||
int node_id, IRuntimeFilter** res, bool build_bf_exactly = false);
|
||||
|
||||
void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
|
||||
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);
|
||||
@ -225,7 +226,7 @@ public:
|
||||
|
||||
// init filter with desc
|
||||
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
|
||||
UniqueId fragment_id, int node_id = -1);
|
||||
UniqueId fragment_id, int node_id = -1, bool build_bf_exactly = false);
|
||||
|
||||
BloomFilterFuncBase* get_bloomfilter() const;
|
||||
|
||||
@ -244,6 +245,7 @@ public:
|
||||
ObjectPool* pool,
|
||||
std::unique_ptr<RuntimePredicateWrapper>* wrapper);
|
||||
void change_to_bloom_filter();
|
||||
Status init_bloom_filter(const size_t build_bf_cardinality);
|
||||
Status update_filter(const UpdateRuntimeFilterParams* param);
|
||||
|
||||
void set_ignored() { _is_ignored = true; }
|
||||
|
||||
@ -39,7 +39,7 @@ public:
|
||||
_build_expr_context(build_expr_ctxs),
|
||||
_runtime_filter_descs(runtime_filter_descs) {}
|
||||
|
||||
Status init(RuntimeState* state, int64_t hash_table_size) {
|
||||
Status init(RuntimeState* state, int64_t hash_table_size, size_t build_bf_cardinality) {
|
||||
DCHECK(_probe_expr_context.size() == _build_expr_context.size());
|
||||
|
||||
// runtime filter effect strategy
|
||||
@ -104,6 +104,10 @@ public:
|
||||
runtime_filter->change_to_bloom_filter();
|
||||
}
|
||||
|
||||
if (runtime_filter->type() == RuntimeFilterType::BLOOM_FILTER) {
|
||||
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(build_bf_cardinality));
|
||||
}
|
||||
|
||||
// Note:
|
||||
// In the case that exist *remote target* and in filter and other filter,
|
||||
// we must merge other filter whatever in filter is over the max num in current node,
|
||||
|
||||
@ -91,7 +91,8 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
|
||||
|
||||
Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role,
|
||||
const TRuntimeFilterDesc& desc,
|
||||
const TQueryOptions& options, int node_id) {
|
||||
const TQueryOptions& options, int node_id,
|
||||
bool build_bf_exactly) {
|
||||
DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) ||
|
||||
role != RuntimeFilterRole::CONSUMER);
|
||||
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
|
||||
@ -114,7 +115,7 @@ Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role,
|
||||
filter_mgr_val.role = role;
|
||||
|
||||
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id,
|
||||
&filter_mgr_val.filter));
|
||||
&filter_mgr_val.filter, build_bf_exactly));
|
||||
|
||||
filter_map->emplace(key, filter_mgr_val);
|
||||
|
||||
@ -162,7 +163,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
|
||||
std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
|
||||
// LOG(INFO) << "entity filter id:" << filter_id;
|
||||
cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options,
|
||||
_fragment_instance_id);
|
||||
_fragment_instance_id, -1, false);
|
||||
_filter_map.emplace(filter_id, cntVal);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -73,7 +73,8 @@ public:
|
||||
Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter);
|
||||
// regist filter
|
||||
Status register_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc,
|
||||
const TQueryOptions& options, int node_id = -1);
|
||||
const TQueryOptions& options, int node_id = -1,
|
||||
bool build_bf_exactly = false);
|
||||
|
||||
// update filter by remote
|
||||
Status update_filter(const PPublishFilterRequest* request,
|
||||
|
||||
@ -217,12 +217,14 @@ struct ProcessHashTableBuild {
|
||||
if (emplace_result.is_inserted()) {
|
||||
new (&emplace_result.get_mapped()) Mapped({k, _offset});
|
||||
inserted_rows.push_back(k);
|
||||
_join_node->_build_bf_cardinality++;
|
||||
} else { _skip_rows++; });
|
||||
} else if (has_runtime_filter && !build_unique) {
|
||||
EMPLACE_IMPL(
|
||||
if (emplace_result.is_inserted()) {
|
||||
new (&emplace_result.get_mapped()) Mapped({k, _offset});
|
||||
inserted_rows.push_back(k);
|
||||
_join_node->_build_bf_cardinality++;
|
||||
} else {
|
||||
emplace_result.get_mapped().insert({k, _offset}, *(_join_node->_arena));
|
||||
inserted_rows.push_back(k);
|
||||
@ -278,7 +280,7 @@ struct ProcessRuntimeFilterBuild {
|
||||
_join_node->_runtime_filter_descs));
|
||||
|
||||
RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init(
|
||||
state, hash_table_ctx.hash_table.get_size()));
|
||||
state, hash_table_ctx.hash_table.get_size(), _join_node->_build_bf_cardinality));
|
||||
|
||||
if (!_join_node->_runtime_filter_slots->empty() && !_join_node->_inserted_rows.empty()) {
|
||||
{
|
||||
@ -374,7 +376,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
_runtime_filters.resize(_runtime_filter_descs.size());
|
||||
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
|
||||
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
|
||||
RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options()));
|
||||
RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i], state->query_options(), -1,
|
||||
_probe_expr_ctxs.size() == 1));
|
||||
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
|
||||
_runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
|
||||
}
|
||||
@ -912,7 +915,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
|
||||
_runtime_filter_descs));
|
||||
|
||||
RETURN_IF_ERROR(_runtime_filter_slots->init(
|
||||
state, arg.hash_table.get_size()));
|
||||
state, arg.hash_table.get_size(), 0));
|
||||
RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context(
|
||||
_shared_hash_table_context));
|
||||
_runtime_filter_slots->publish();
|
||||
|
||||
@ -383,6 +383,7 @@ private:
|
||||
std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
|
||||
|
||||
std::vector<IRuntimeFilter*> _runtime_filters;
|
||||
size_t _build_bf_cardinality = 0;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user