[Rf](exec) Support building the bloom filter exactly when its size is not calculated by NDV (#30398)

This commit is contained in:
HappenLee
2024-01-26 17:14:46 +08:00
committed by yiguolei
parent 5e4674ab66
commit 2befa75b9c
4 changed files with 15 additions and 33 deletions

View File

@ -83,21 +83,11 @@ class BloomFilterFuncBase : public FilterFuncBase {
public:
virtual ~BloomFilterFuncBase() = default;
// Initializes the bloom filter from a sizing estimate instead of an explicit length.
// @param expect_num forwarded to BloomFilterAdaptor::optimal_bit_num(); presumably the
//                   expected number of inserted elements — confirm against callers.
// @param fpp        forwarded to BloomFilterAdaptor::optimal_bit_num(); presumably the
//                   target false-positive probability — confirm against callers.
// @return the Status returned by init_with_fixed_length(filter_size).
Status init(int64_t expect_num, double fpp) {
// Let the adaptor pick the filter size for the given (expect_num, fpp) pair.
size_t filter_size = BloomFilterAdaptor::optimal_bit_num(expect_num, fpp);
// The fixed-length overload performs the actual initialization.
return init_with_fixed_length(filter_size);
}
// Stores the length that the no-argument init_with_fixed_length() forwards when it
// initializes the filter. NOTE(review): the unit of this length is not visible here — confirm.
void set_length(int64_t bloom_filter_length) { _bloom_filter_length = bloom_filter_length; }
// Sets the _build_bf_exactly flag that init_with_cardinality() branches on.
void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = build_bf_exactly; }
Status init_with_fixed_length() {
if (_build_bf_exactly) {
return Status::OK();
}
return init_with_fixed_length(_bloom_filter_length);
}
Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); }
Status init_with_cardinality(const size_t build_bf_cardinality) {
if (_build_bf_exactly) {
@ -109,10 +99,9 @@ public:
// Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8) / std::log(2))));
_bloom_filter_length = std::min(((int64_t)1) << log_filter_size, _bloom_filter_length);
return init_with_fixed_length(_bloom_filter_length);
_bloom_filter_length = (((int64_t)1) << log_filter_size);
}
return Status::OK();
return init_with_fixed_length(_bloom_filter_length);
}
Status init_with_fixed_length(int64_t bloom_filter_length) {

View File

@ -338,15 +338,17 @@ public:
return Status::OK();
}
void change_to_bloom_filter() {
void change_to_bloom_filter(bool need_init_bf = false) {
CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
<< "Can not change to bloom filter because of runtime filter type is "
<< IRuntimeFilter::to_string(_filter_type);
_is_bloomfilter = true;
BloomFilterFuncBase* bf = _context.bloom_filter_func.get();
// The bloom filter may not be initialized yet
static_cast<void>(bf->init_with_fixed_length());
insert_to_bloom_filter(bf);
if (need_init_bf) {
// The bloom filter may not be initialized yet
static_cast<void>(bf->init_with_fixed_length());
insert_to_bloom_filter(bf);
}
// release in filter
_context.hybrid_set.reset(create_set(_column_return_type));
}
@ -533,12 +535,12 @@ public:
VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id
<< ") because: in_num(" << _context.hybrid_set->size()
<< ") >= max_in_num(" << _max_in_num << ")";
change_to_bloom_filter();
change_to_bloom_filter(true);
}
} else {
VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id
<< ") because: already exist a bloom filter";
change_to_bloom_filter();
change_to_bloom_filter(true);
RETURN_IF_ERROR(_context.bloom_filter_func->merge(
wrapper->_context.bloom_filter_func.get()));
}
@ -1198,7 +1200,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
// 1. Only 1 join key
// 2. Do not have remote target (e.g. do not need to merge), or broadcast join
// 3. Bloom filter
params.build_bf_exactly = build_bf_exactly && (!_has_remote_target || _is_broadcast_join) &&
// 4. FE did not use the NDV stat to predict the bf size, only the row count; BE has a
// more exact row count stat
params.build_bf_exactly = build_bf_exactly && !desc->bloom_filter_size_calculated_by_ndv &&
(!_has_remote_target || _is_broadcast_join) &&
(_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
if (desc->__isset.bloom_filter_size_bytes) {

View File

@ -116,13 +116,6 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
}
return Status::OK();
}

View File

@ -662,11 +662,6 @@ Status HashJoinNode::alloc_resource(doris::RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_allocate_resource_timer);
RETURN_IF_ERROR(VJoinNodeBase::alloc_resource(state));
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
if (auto* bf = _runtime_filters[i]->get_bloomfilter()) {
RETURN_IF_ERROR(bf->init_with_fixed_length());
}
}
RETURN_IF_ERROR(VExpr::open(_build_expr_ctxs, state));
RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
for (auto& conjunct : _other_join_conjuncts) {