diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index f5e822e257..397f86a369 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -185,10 +185,7 @@ public: return _bloom_filter->contain_null(); } - void set_contain_null() { - DCHECK(_bloom_filter); - _bloom_filter->set_contain_null(); - } + void set_contain_null() { _bloom_filter->set_contain_null(); } size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 0; } diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h index 96e0c3f879..ba5fabe509 100644 --- a/be/src/exprs/hybrid_set.h +++ b/be/src/exprs/hybrid_set.h @@ -192,6 +192,7 @@ public: insert(value); iter->next(); } + _contains_null |= set->_contains_null; } virtual int size() = 0; @@ -231,6 +232,9 @@ public: }; virtual IteratorBase* begin() = 0; + + bool contain_null() const { return _contains_null && _null_aware; } + bool _contains_null = false; }; template @@ -268,10 +272,12 @@ public: void insert(const void* data) override { if (data == nullptr) { + _contains_null = true; return; } _set.insert(*reinterpret_cast(data)); } + void insert(void* data, size_t /*unused*/) override { insert(data); } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { @@ -288,6 +294,8 @@ public: for (size_t i = start; i < size; i++) { if (!nullmap[i]) { _set.insert(*(data + i)); + } else { + _contains_null = true; } } } else { @@ -392,6 +400,7 @@ public: void insert(const void* data) override { if (data == nullptr) { + _contains_null = true; return; } @@ -401,8 +410,12 @@ public: } void insert(void* data, size_t size) override { - std::string str_value(reinterpret_cast(data), size); - _set.insert(str_value); + if (data == nullptr) { + insert(nullptr); + } else { + std::string str_value(reinterpret_cast(data), size); + _set.insert(str_value); + } } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { @@ -417,6 +430,8 @@ public: for (size_t i = start; i < nullable->size(); i++) { if (!nullmap[i]) { _set.insert(col.get_data_at(i).to_string()); + } else { + _contains_null = true; } } } else { @@ -534,6 +549,7 @@ public: void insert(const void* data) override { if (data == nullptr) { + _contains_null = true; return; } @@ -543,8 +559,12 @@ public: } void insert(void* data, size_t size) override { - StringRef sv(reinterpret_cast(data), size); - _set.insert(sv); + if (data == nullptr) { + insert(nullptr); + } else { + StringRef sv(reinterpret_cast(data), size); + _set.insert(sv); + } } void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) override { @@ -559,6 +579,8 @@ public: for (size_t i = start; i < nullable->size(); i++) { if (!nullmap[i]) { _set.insert(col.get_data_at(i)); + } else { + _contains_null = true; } } } else { diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index e8e169e8b9..963bcae7b0 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -280,15 +280,16 @@ class RuntimePredicateWrapper { public: RuntimePredicateWrapper(ObjectPool* pool, const RuntimeFilterParams* params) : RuntimePredicateWrapper(pool, params->column_return_type, params->filter_type, - params->filter_id) {}; + params->filter_id, params->build_bf_exactly) {}; // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge RuntimePredicateWrapper(ObjectPool* pool, PrimitiveType column_type, RuntimeFilterType type, - uint32_t filter_id) + uint32_t filter_id, bool build_bf_exactly = false) : _pool(pool), _column_return_type(column_type), _filter_type(type), - _filter_id(filter_id) {} + _filter_id(filter_id), + _build_bf_exactly(build_bf_exactly) {} // init runtime filter wrapper // alloc memory to init runtime filter function @@ -297,6 +298,7 @@ public: switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { _context.hybrid_set.reset(create_set(_column_return_type)); + _context.hybrid_set->set_null_aware(params->null_aware); break; } case RuntimeFilterType::MIN_FILTER: @@ -315,6 +317,7 @@ public: } case RuntimeFilterType::IN_OR_BLOOM_FILTER: { _context.hybrid_set.reset(create_set(_column_return_type)); + _context.hybrid_set->set_null_aware(params->null_aware); _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type)); _context.bloom_filter_func->set_length(params->bloom_filter_size); _context.bloom_filter_func->set_build_bf_exactly(params->build_bf_exactly); @@ -362,6 +365,9 @@ public: it->next(); } } + if (_context.hybrid_set->contain_null()) { + bloom_filter->set_contain_null(); + } } BloomFilterFuncBase* get_bloomfilter() const { return _context.bloom_filter_func.get(); } @@ -428,7 +434,7 @@ public: _context.bitmap_filter_func->insert_many(bitmaps); } - RuntimeFilterType get_real_type() { + RuntimeFilterType get_real_type() const { auto real_filter_type = _filter_type; if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { real_filter_type = _is_bloomfilter ? RuntimeFilterType::BLOOM_FILTER @@ -437,7 +443,7 @@ public: return real_filter_type; } - size_t get_bloom_filter_size() { + size_t get_bloom_filter_size() const { if (_is_bloomfilter) { return _context.bloom_filter_func->get_size(); } @@ -516,7 +522,7 @@ public: } else { VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; - RETURN_IF_ERROR(change_to_bloom_filter()); + RETURN_IF_ERROR(change_to_bloom_filter(!_build_bf_exactly)); RETURN_IF_ERROR(_context.bloom_filter_func->merge( wrapper->_context.bloom_filter_func.get())); } @@ -541,7 +547,7 @@ public: return Status::OK(); } - Status assign(const PInFilter* in_filter) { + Status assign(const PInFilter* in_filter, bool contain_null) { if (in_filter->has_ignored_msg()) { VLOG_DEBUG << "Ignore in filter(id=" << _filter_id << ") because: " << in_filter->ignored_msg(); @@ -552,6 +558,11 @@ public: PrimitiveType type = to_primitive_type(in_filter->column_type()); _context.hybrid_set.reset(create_set(type)); + if (contain_null) { + _context.hybrid_set->set_null_aware(true); + _context.hybrid_set->insert((const void*)nullptr); + } + switch (type) { case TYPE_BOOLEAN: { batch_assign(in_filter, [](std::shared_ptr& set, PColumnValue& column, @@ -882,7 +893,14 @@ public: bool is_bloomfilter() const { return _is_bloomfilter; } bool contain_null() const { - return _is_bloomfilter && _context.bloom_filter_func->contain_null(); + if (_is_bloomfilter) { + return _context.bloom_filter_func->contain_null(); + } + if (_context.hybrid_set) { + DCHECK(get_real_type() == RuntimeFilterType::IN_FILTER); + return _context.hybrid_set->contain_null(); + } + return false; } bool is_ignored() const { return _ignored; } @@ -931,6 +949,7 @@ private: bool _ignored = false; std::string _ignored_msg; uint32_t _filter_id; + bool _build_bf_exactly; }; Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool, @@ -1315,7 +1334,7 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, switch (filter_type) { case PFilterType::IN_FILTER: { DCHECK(param->request->has_in_filter()); - return (*wrapper)->assign(¶m->request->in_filter()); + return (*wrapper)->assign(¶m->request->in_filter(), param->request->contain_null()); } case PFilterType::BLOOM_FILTER: { DCHECK(param->request->has_bloom_filter()); @@ -1358,7 +1377,7 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, switch (filter_type) { case PFilterType::IN_FILTER: { DCHECK(param->request->has_in_filter()); - return (*wrapper)->assign(¶m->request->in_filter()); + return (*wrapper)->assign(¶m->request->in_filter(), param->request->contain_null()); } case PFilterType::BLOOM_FILTER: { DCHECK(param->request->has_bloom_filter()); @@ -1742,17 +1761,19 @@ Status RuntimePredicateWrapper::get_push_exprs( << " _filter_type: " << IRuntimeFilter::to_string(_filter_type); auto real_filter_type = get_real_type(); + bool null_aware = contain_null(); switch (real_filter_type) { case RuntimeFilterType::IN_FILTER: { TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN); type_desc.__set_is_nullable(false); TExprNode node; node.__set_type(type_desc); - node.__set_node_type(TExprNodeType::IN_PRED); + node.__set_node_type(null_aware ? TExprNodeType::NULL_AWARE_IN_PRED + : TExprNodeType::IN_PRED); node.in_predicate.__set_is_not_in(false); node.__set_opcode(TExprOpcode::FILTER_IN); node.__set_is_nullable(false); - auto in_pred = vectorized::VDirectInPredicate::create_shared(node); + auto in_pred = vectorized::VDirectInPredicate::create_shared(node, null_aware); in_pred->set_filter(_context.hybrid_set); in_pred->add_child(probe_ctx->root()); auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(node, in_pred); diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 987034a76f..c10e7777bd 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -804,7 +804,7 @@ Status ScanLocalState::_normalize_not_in_and_not_eq_predicate( HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); auto fn_name = std::string(""); - if (!is_fixed_range && state->null_in_set) { + if (!is_fixed_range && state->hybrid_set->contain_null()) { _eos = true; _scan_dependency->set_ready(); } diff --git a/be/src/runtime/types.cpp b/be/src/runtime/types.cpp index 10a6b47f84..14ba4b2ceb 100644 --- a/be/src/runtime/types.cpp +++ b/be/src/runtime/types.cpp @@ -66,7 +66,7 @@ TypeDescriptor::TypeDescriptor(const std::vector& types, int* idx) DCHECK_LT(*idx, types.size() - 1); type = TYPE_ARRAY; contains_nulls.reserve(1); - // here should compatible with fe 1.2, because use contains_null in contains_nulls + // here should compatible with fe 1.2, because use contain_null in contains_nulls if (node.__isset.contains_nulls) { DCHECK_EQ(node.contains_nulls.size(), 1); contains_nulls.push_back(node.contains_nulls[0]); @@ -94,7 +94,7 @@ TypeDescriptor::TypeDescriptor(const std::vector& types, int* idx) break; } case TTypeNodeType::MAP: { - //TODO(xy): handle contains_null[0] for key and [1] for value + //TODO(xy): handle contain_null[0] for key and [1] for value DCHECK(!node.__isset.scalar_type); DCHECK_LT(*idx, types.size() - 2); DCHECK_EQ(node.contains_nulls.size(), 2); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 4c0129b2bd..5f8e6d8aa4 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -864,7 +864,7 @@ Status VScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprConte HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); auto fn_name = std::string(""); - if (!is_fixed_range && state->null_in_set) { + if (!is_fixed_range && state->hybrid_set->contain_null()) { _eos = true; } while (iter->has_next()) { diff --git a/be/src/vec/exprs/vdirect_in_predicate.h b/be/src/vec/exprs/vdirect_in_predicate.h index fbba76de61..9b3d861b3b 100644 --- a/be/src/vec/exprs/vdirect_in_predicate.h +++ b/be/src/vec/exprs/vdirect_in_predicate.h @@ -26,8 +26,11 @@ class VDirectInPredicate final : public VExpr { ENABLE_FACTORY_CREATOR(VDirectInPredicate); public: - VDirectInPredicate(const TExprNode& node) - : VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate") {} + VDirectInPredicate(const TExprNode& node, bool null_aware = false) + : VExpr(node), + _filter(nullptr), + _expr_name("direct_in_predicate"), + _null_aware(null_aware) {} ~VDirectInPredicate() override = default; Status prepare(RuntimeState* state, const RowDescriptor& row_desc, @@ -93,7 +96,7 @@ public: private: std::shared_ptr _filter; - bool _null_aware = false; std::string _expr_name; + bool _null_aware; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/functions/in.h b/be/src/vec/functions/in.h index 9fa182caf0..1bfbf7eb2d 100644 --- a/be/src/vec/functions/in.h +++ b/be/src/vec/functions/in.h @@ -59,9 +59,6 @@ namespace doris::vectorized { struct InState { bool use_set = true; - - // only use in null in set - bool null_in_set = false; std::unique_ptr hybrid_set; }; @@ -125,17 +122,16 @@ public: state->hybrid_set.reset( create_set(context->get_arg_type(0)->type, get_size_with_out_null(context))); } + state->hybrid_set->set_null_aware(true); + for (int i = 1; i < context->get_num_args(); ++i) { const auto& const_column_ptr = context->get_constant_col(i); if (const_column_ptr != nullptr) { auto const_data = const_column_ptr->column_ptr->get_data_at(0); - if (const_data.data == nullptr) { - state->null_in_set = true; - } else { - state->hybrid_set->insert((void*)const_data.data, const_data.size); - } + state->hybrid_set->insert((void*)const_data.data, const_data.size); } else { state->use_set = false; + state->hybrid_set.reset(); break; } } @@ -181,7 +177,7 @@ public: nested_col_ptr); } - if (!in_state->null_in_set) { + if (!in_state->hybrid_set->contain_null()) { for (size_t i = 0; i < input_rows_count; ++i) { vec_null_map_to[i] = null_map[i]; } @@ -200,7 +196,7 @@ public: search_hash_set(in_state, input_rows_count, vec_res, materialized_column.get()); } - if (in_state->null_in_set) { + if (in_state->hybrid_set->contain_null()) { for (size_t i = 0; i < input_rows_count; ++i) { vec_null_map_to[i] = negative == vec_res[i]; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 9f71bda0b4..03dc70e653 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -135,10 +135,8 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi } else { // null safe equal runtime filter only support bloom filter EqualPredicate eq = (EqualPredicate) builderNode.getHashJoinConjuncts().get(exprOrder); - if (eq instanceof NullSafeEqual && type == TRuntimeFilterType.IN_OR_BLOOM) { - type = TRuntimeFilterType.BLOOM; - } - if (eq instanceof NullSafeEqual && type != TRuntimeFilterType.BLOOM) { + if (eq instanceof NullSafeEqual && type == TRuntimeFilterType.MIN_MAX + || type == TRuntimeFilterType.BITMAP) { return false; } filter = new RuntimeFilter(generator.getNextId(), diff --git a/gensrc/thrift/Exprs.thrift b/gensrc/thrift/Exprs.thrift index 4311bd44b2..e3503f3a77 100644 --- a/gensrc/thrift/Exprs.thrift +++ b/gensrc/thrift/Exprs.thrift @@ -77,6 +77,9 @@ enum TExprNodeType { IPV4_LITERAL, IPV6_LITERAL + + // only used in runtime filter + NULL_AWARE_IN_PRED, } //enum TAggregationOp {