[improvement](scan) make global runtime filter support in-list filter (#29394)

This commit is contained in:
Jerry Hu
2024-01-04 09:12:30 +08:00
committed by GitHub
parent 3b7d5feb84
commit 4ba4767eef
4 changed files with 32 additions and 28 deletions

View File

@ -453,9 +453,11 @@ public:
std::vector<vectorized::VExprSPtr>& push_exprs, const TExpr& probe_expr);
Status merge(const RuntimePredicateWrapper* wrapper) {
bool can_not_merge_in_or_bloom = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
(wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER);
bool can_not_merge_in_or_bloom =
_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
(wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER &&
wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER);
bool can_not_merge_other = _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER &&
_filter_type != wrapper->_filter_type;
@ -513,8 +515,15 @@ public:
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
auto real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER
: RuntimeFilterType::IN_FILTER;
auto other_filter_type = wrapper->_filter_type;
if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
other_filter_type = wrapper->_is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER
: RuntimeFilterType::IN_FILTER;
}
if (real_filter_type == RuntimeFilterType::IN_FILTER) {
if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in
if (other_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in
CHECK(!wrapper->_is_ignored_in_filter)
<< " can not ignore merge runtime filter(in filter id "
<< wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: "
@ -526,7 +535,6 @@ public:
<< ") >= max_in_num(" << _max_in_num << ")";
change_to_bloom_filter();
}
// in merge bloom filter
} else {
VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id
<< ") because: already exist a bloom filter";
@ -535,8 +543,7 @@ public:
wrapper->_context.bloom_filter_func.get()));
}
} else {
if (wrapper->_filter_type ==
RuntimeFilterType::IN_FILTER) { // bloom filter merge in
if (other_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in
CHECK(!wrapper->_is_ignored_in_filter)
<< " can not ignore merge runtime filter(in filter id "
<< wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: "
@ -1157,6 +1164,14 @@ void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTim
_filter_timer.push_back(timer);
}
// Flags this runtime filter as ignored and, for IN filters only, records the
// reason on the underlying wrapper.
//
// @param msg  text explaining why the filter is ignored; a copy is stored, so
//             the caller's string does not need to outlive this call.
void IRuntimeFilter::set_ignored(const std::string& msg) {
    _is_ignored = true;

    // Only a wrapper whose declared type is IN_FILTER carries the
    // "ignored in filter" state; every other type is left untouched.
    if (_wrapper->_filter_type != RuntimeFilterType::IN_FILTER) {
        return;
    }

    _wrapper->_is_ignored_in_filter = true;
    // The copy is handed to _pool->add(); presumably the pool takes ownership
    // of the allocation -- confirm against ObjectPool.
    auto* stored_msg = _pool->add(new std::string(msg));
    _wrapper->_ignored_in_filter_msg = stored_msg;
}
BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
return _wrapper->get_bloomfilter();
}
@ -1344,14 +1359,14 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
if (!_is_ignored && wrapper->is_ignored_in_filter()) {
set_ignored();
set_ignored_msg(*(wrapper->get_ignored_in_filter_msg()));
std::string* msg = wrapper->get_ignored_in_filter_msg();
set_ignored(msg ? *msg : "");
}
auto origin_type = _wrapper->get_real_type();
Status status = _wrapper->merge(wrapper);
if (!_is_ignored && _wrapper->is_ignored_in_filter()) {
set_ignored();
set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg()));
std::string* msg = _wrapper->get_ignored_in_filter_msg();
set_ignored(msg ? *msg : "");
}
if (origin_type != _wrapper->get_real_type()) {
update_runtime_filter_type_to_profile();
@ -1656,10 +1671,8 @@ bool IRuntimeFilter::is_bloomfilter() {
Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) {
set_ignored();
const PInFilter in_filter = param->request->in_filter();
auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
set_ignored_msg(*msg);
set_ignored(in_filter.ignored_msg());
}
std::unique_ptr<RuntimePredicateWrapper> wrapper;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, param, _pool, &wrapper));
@ -1677,10 +1690,8 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
int64_t start_apply) {
if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) {
set_ignored();
const PInFilter in_filter = param->request->in_filter();
auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
set_ignored_msg(*msg);
set_ignored(in_filter.ignored_msg());
}
std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;

View File

@ -292,9 +292,7 @@ public:
Status update_filter(const UpdateRuntimeFilterParams* param);
Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t start_apply);
void set_ignored() { _is_ignored = true; }
void set_ignored_msg(std::string& msg) { _ignored_msg = msg; }
void set_ignored(const std::string& msg);
// for ut
bool is_bloomfilter();

View File

@ -58,15 +58,14 @@ public:
filter_id);
}
for (auto filter : filters) {
filter->set_ignored();
filter->set_ignored("");
filter->signal();
}
return Status::OK();
};
auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) {
runtime_filter->set_ignored();
runtime_filter->set_ignored_msg(msg);
runtime_filter->set_ignored(msg);
RETURN_IF_ERROR(runtime_filter->publish());
return Status::OK();
};

View File

@ -369,11 +369,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
.filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) {
// If storage data distribution is ignored, we use BLOOM filter.
legalTypes.clear();
legalTypes.add(TRuntimeFilterType.BLOOM);
}
List<EqualTo> hashJoinConjuncts = join.getEqualToConjuncts();
for (int i = 0; i < hashJoinConjuncts.size(); i++) {
EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(