From 563545475ee87954d9d9ea29681d030af9725c37 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Thu, 6 Jan 2022 19:08:35 +0800 Subject: [PATCH] [Optimize](Runtime Filter) Support merge in runtime filter(#7546) (#7547) Support merging IN predicates when a remote target exists (e.g. shuffle hash join). Remove the code that implicitly converts an IN predicate to a Bloom filter when a remote target exists. Close related #7546 --- be/src/exec/hash_join_node.cpp | 3 +- be/src/exec/olap_scan_node.cpp | 4 +- be/src/exprs/runtime_filter.cpp | 361 ++++++++++++++++-- be/src/exprs/runtime_filter.h | 27 +- be/src/exprs/runtime_filter_slots.h | 51 ++- be/src/runtime/runtime_filter_mgr.cpp | 21 +- be/src/runtime/runtime_filter_mgr.h | 10 +- be/test/exprs/runtime_filter_test.cpp | 172 +++++++-- docs/en/administrator-guide/runtime-filter.md | 3 +- .../administrator-guide/runtime-filter.md | 4 +- .../apache/doris/planner/RuntimeFilter.java | 2 +- .../doris/planner/RuntimeFilterGenerator.java | 16 - .../apache/doris/planner/QueryPlanTest.java | 13 +- gensrc/proto/internal_service.proto | 9 + 14 files changed, 586 insertions(+), 110 deletions(-) diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 551c4b24e8..389ca98a3c 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -86,7 +86,8 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { for (const auto& filter_desc : _runtime_filter_descs) { RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::PRODUCER, - filter_desc)); + filter_desc, + state->query_options())); } return Status::OK(); diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 5e4ba33867..ebf872043b 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -83,7 +83,9 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = 
_runtime_filter_descs[i]; RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(RuntimeFilterRole::CONSUMER, - filter_desc, id())); + filter_desc, + state->query_options(), + id())); RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter)); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 29830f2ddf..b77cf9d71e 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -167,6 +167,9 @@ PrimitiveType to_primitive_type(PColumnType type) { // PFilterType -> RuntimeFilterType RuntimeFilterType get_type(int filter_type) { switch (filter_type) { + case PFilterType::IN_FILTER: { + return RuntimeFilterType::IN_FILTER; + } case PFilterType::BLOOM_FILTER: { return RuntimeFilterType::BLOOM_FILTER; } @@ -180,6 +183,8 @@ RuntimeFilterType get_type(int filter_type) { // RuntimeFilterType -> PFilterType PFilterType get_type(RuntimeFilterType type) { switch (type) { + case RuntimeFilterType::IN_FILTER: + return PFilterType::IN_FILTER; case RuntimeFilterType::BLOOM_FILTER: return PFilterType::BLOOM_FILTER; case RuntimeFilterType::MINMAX_FILTER: @@ -316,14 +321,17 @@ public: : _tracker(tracker), _pool(pool), _column_return_type(params->column_return_type), - _filter_type(params->filter_type) {} + _filter_type(params->filter_type), + _fragment_instance_id(params->fragment_instance_id), + _filter_id(params->filter_id) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge - RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, RuntimeFilterType type) - : _tracker(tracker), _pool(pool), _filter_type(type) {} - // init runtimefilter wrapper + RuntimePredicateWrapper(MemTracker* tracker, ObjectPool* pool, RuntimeFilterType type, UniqueId fragment_instance_id, uint32_t filter_id) + : _tracker(tracker), _pool(pool), _filter_type(type), _fragment_instance_id(fragment_instance_id), _filter_id(filter_id) {} + // init 
runtime filter wrapper // alloc memory to init runtime filter function Status init(const RuntimeFilterParams* params) { + _max_in_num = params->max_in_num; switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { _hybrid_set.reset(create_set(_column_return_type)); @@ -346,6 +354,9 @@ public: void insert(const void* data) { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { + if (_is_ignored_in_filter) { + break; + } _hybrid_set->insert(data); break; } @@ -372,19 +383,21 @@ public: switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - TTypeDesc type_desc = create_type_desc(_column_return_type); - TExprNode node; - node.__set_type(type_desc); - node.__set_node_type(TExprNodeType::IN_PRED); - node.in_predicate.__set_is_not_in(false); - node.__set_opcode(TExprOpcode::FILTER_IN); - node.__isset.vector_opcode = true; - node.__set_vector_opcode(to_in_opcode(_column_return_type)); - auto in_pred = _pool->add(new InPredicate(node)); - RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release())); - in_pred->add_child(Expr::copy(_pool, prob_expr->root())); - ExprContext* ctx = _pool->add(new ExprContext(in_pred)); - container->push_back(ctx); + if (!_is_ignored_in_filter) { + TTypeDesc type_desc = create_type_desc(_column_return_type); + TExprNode node; + node.__set_type(type_desc); + node.__set_node_type(TExprNodeType::IN_PRED); + node.in_predicate.__set_is_not_in(false); + node.__set_opcode(TExprOpcode::FILTER_IN); + node.__isset.vector_opcode = true; + node.__set_vector_opcode(to_in_opcode(_column_return_type)); + auto in_pred = _pool->add(new InPredicate(node)); + RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.release())); + in_pred->add_child(Expr::copy(_pool, prob_expr->root())); + ExprContext* ctx = _pool->add(new ExprContext(in_pred)); + container->push_back(ctx); + } break; } case RuntimeFilterType::MINMAX_FILTER: { @@ -432,8 +445,35 @@ public: } switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { - DCHECK(false) << "in filter 
should't apply in shuffle join"; - return Status::InternalError("in filter should't apply in shuffle join"); + if (_is_ignored_in_filter) { + break; + } else if (wrapper->_is_ignored_in_filter) { + LOG(INFO) << "fragment instance " << _fragment_instance_id.to_string() + << " ignore merge runtime filter(in filter id " + << _filter_id << ") because: " << *(wrapper->get_ignored_in_filter_msg()); + + _is_ignored_in_filter = true; + _ignored_in_filter_msg = wrapper->_ignored_in_filter_msg; + // release in filter + _hybrid_set.reset(create_set(_column_return_type)); + break; + } + // try insert set + _hybrid_set->insert(wrapper->_hybrid_set.get()); + if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) { + std::stringstream msg; + msg << "fragment instance " << _fragment_instance_id.to_string() + << " ignore merge runtime filter(in filter id " + << _filter_id << ") because: in_num(" << _hybrid_set->size() + << ") >= max_in_num(" << _max_in_num << ")"; + _ignored_in_filter_msg = _pool->add(new std::string(msg.str())); + _is_ignored_in_filter = true; + + // release in filter + _hybrid_set.reset(create_set(_column_return_type)); + LOG(INFO) << msg.str(); + } + break; } case RuntimeFilterType::MINMAX_FILTER: { _minmax_func->merge(wrapper->_minmax_func.get(), _pool); @@ -450,6 +490,108 @@ public: return Status::OK(); } + Status assign(const PInFilter* in_filter) { + DCHECK(_tracker != nullptr); + + PrimitiveType type = to_primitive_type(in_filter->column_type()); + if (in_filter->has_ignored_msg()) { + LOG(INFO) << "Ignore in filter because: " << in_filter->ignored_msg(); + _is_ignored_in_filter = true; + _ignored_in_filter_msg = _pool->add(new std::string(in_filter->ignored_msg())); + return Status::OK(); + } + _hybrid_set.reset(create_set(type)); + switch (type) { + case TYPE_BOOLEAN: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + bool bool_val = column.boolval(); + set->insert(&bool_val); + }); + break; + } + case 
TYPE_TINYINT: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + int8_t int_val = static_cast(column.intval()); + set->insert(&int_val); + }); + break; + } + case TYPE_SMALLINT: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + int16_t int_val = static_cast(column.intval()); + set->insert(&int_val); + }); + break; + } + case TYPE_INT: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + int32_t int_val = column.intval(); + set->insert(&int_val); + }); + break; + } + case TYPE_BIGINT: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + int64_t long_val = column.longval(); + set->insert(&long_val); + }); + break; + } + case TYPE_LARGEINT: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + auto string_val = column.stringval(); + StringParser::ParseResult result; + int128_t int128_val = StringParser::string_to_int(string_val.c_str(), + string_val.length(), &result); + DCHECK(result == StringParser::PARSE_SUCCESS); + set->insert(&int128_val); + }); + break; + } + case TYPE_FLOAT: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + float float_val = static_cast(column.doubleval()); + set->insert(&float_val); + }); + break; + } + case TYPE_DOUBLE: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + double double_val = column.doubleval(); + set->insert(&double_val); + }); + break; + } + case TYPE_DATETIME: + case TYPE_DATE: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + auto &string_val_ref = column.stringval(); + DateTimeValue datetime_val; + datetime_val.from_date_str(string_val_ref.c_str(), string_val_ref.length()); + set->insert(&datetime_val); + }); + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case 
TYPE_STRING: { + batch_assign(in_filter, [](std::unique_ptr &set, PColumnValue &column, ObjectPool *pool) { + auto &string_val_ref = column.stringval(); + auto val_ptr = pool->add(new std::string(string_val_ref)); + StringValue string_val(const_cast(val_ptr->c_str()), val_ptr->length()); + set->insert(&string_val); + }); + break; + } + default: { + DCHECK(false) << "unknown type: " << type_to_string(type); + return Status::InvalidArgument("not support assign to in filter, type: " + + type_to_string(type)); + } + } + return Status::OK(); + } + // used by shuffle runtime filter // assign this filter by protobuf Status assign(const PBloomFilter* bloom_filter, const char* data) { @@ -558,6 +700,11 @@ public: return Status::InvalidArgument("not support!"); } + Status get_in_filter_iterator(HybridSetBase::IteratorBase** it) { + *it = _hybrid_set->begin(); + return Status::OK(); + } + Status get_bloom_filter_desc(char** data, int* filter_length) { return _bloomfilter_func->get_data(data, filter_length); } @@ -590,27 +737,51 @@ public: } } + bool is_ignored_in_filter() const { + return _is_ignored_in_filter; + } + + std::string* get_ignored_in_filter_msg() const { + return _ignored_in_filter_msg; + } + + void batch_assign(const PInFilter* filter, + void (*assign_func) (std::unique_ptr &_hybrid_set, PColumnValue&, ObjectPool*)) { + for (int i = 0; i < filter->values_size(); ++i) { + PColumnValue column = filter->values(i); + assign_func(_hybrid_set, column, _pool); + } + } + private: MemTracker* _tracker; ObjectPool* _pool; PrimitiveType _column_return_type; // column type RuntimeFilterType _filter_type; + int32_t _max_in_num = -1; std::unique_ptr _minmax_func; std::unique_ptr _hybrid_set; std::unique_ptr _bloomfilter_func; + bool _is_ignored_in_filter = false; + std::string *_ignored_in_filter_msg; + UniqueId _fragment_instance_id; + uint32_t _filter_id; }; Status IRuntimeFilter::create(RuntimeState* state, MemTracker* tracker, ObjectPool* pool, - const TRuntimeFilterDesc* 
desc, const RuntimeFilterRole role, - int node_id, IRuntimeFilter** res) { + const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, + const RuntimeFilterRole role, int node_id, IRuntimeFilter** res) { *res = pool->add(new IRuntimeFilter(state, tracker, pool)); (*res)->set_role(role); - return (*res)->init_with_desc(desc, node_id); + UniqueId fragment_instance_id(state->fragment_instance_id()); + return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id); } void IRuntimeFilter::insert(const void* data) { DCHECK(is_producer()); - _wrapper->insert(data); + if (!_is_ignored) { + _wrapper->insert(data); + } } Status IRuntimeFilter::publish() { @@ -688,7 +859,8 @@ void IRuntimeFilter::signal() { _effect_timer.reset(); } -Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, int node_id) { +Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, + UniqueId fragment_instance_id, int node_id) { // if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer())); @@ -712,8 +884,11 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, int node_i RETURN_IF_ERROR(Expr::create_expr_tree(_pool, desc->src_expr, &build_ctx)); RuntimeFilterParams params; + params.fragment_instance_id = fragment_instance_id; + params.filter_id = _filter_id; params.filter_type = _runtime_filter_type; params.column_return_type = build_ctx->root()->type().type; + params.max_in_num = options->runtime_filter_max_in_num; if (desc->__isset.bloom_filter_size_bytes) { params.bloom_filter_size = desc->bloom_filter_size_bytes; } @@ -756,9 +931,14 @@ template Status IRuntimeFilter::_create_wrapper(const T* param, MemTracker* tracker, ObjectPool* pool, std::unique_ptr* wrapper) { int filter_type = param->request->filter_type(); - wrapper->reset(new RuntimePredicateWrapper(tracker, pool, get_type(filter_type))); + wrapper->reset(new 
RuntimePredicateWrapper(tracker, pool, get_type(filter_type), + UniqueId(param->request->fragment_id()), param->request->filter_id())); switch (filter_type) { + case PFilterType::IN_FILTER: { + DCHECK(param->request->has_in_filter()); + return (*wrapper)->assign(¶m->request->in_filter()); + } case PFilterType::BLOOM_FILTER: { DCHECK(param->request->has_bloom_filter()); return (*wrapper)->assign(¶m->request->bloom_filter(), param->data); @@ -768,7 +948,7 @@ Status IRuntimeFilter::_create_wrapper(const T* param, MemTracker* tracker, Obje return (*wrapper)->assign(¶m->request->minmax_filter()); } default: - return Status::InvalidArgument("unknow filter type"); + return Status::InvalidArgument("unknown filter type"); } } @@ -792,14 +972,41 @@ void IRuntimeFilter::ready_for_publish() { } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { - return _wrapper->merge(wrapper); + if (!_is_ignored && wrapper->is_ignored_in_filter()) { + set_ignored(); + set_ignored_msg(*(wrapper->get_ignored_in_filter_msg())); + } + Status status = _wrapper->merge(wrapper); + if (!_is_ignored && _wrapper->is_ignored_in_filter()) { + set_ignored(); + set_ignored_msg(*(_wrapper->get_ignored_in_filter_msg())); + } + return status; +} + +const RuntimePredicateWrapper* IRuntimeFilter::get_wrapper() { + return _wrapper; +} + +template +void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it, + void (*set_func) (PColumnValue*, const T*)) { + while (it->has_next()) { + const void* void_value = it->get_value(); + auto origin_value = reinterpret_cast(void_value); + set_func(filter->add_values(), origin_value); + it->next(); + } } template Status IRuntimeFilter::_serialize(T* request, void** data, int* len) { request->set_filter_type(get_type(_runtime_filter_type)); - if (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) { + if (_runtime_filter_type == RuntimeFilterType::IN_FILTER) { + auto in_filter = request->mutable_in_filter(); + to_protobuf(in_filter); + } 
else if (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER) { RETURN_IF_ERROR(_wrapper->get_bloom_filter_desc((char**)data, len)); DCHECK(data != nullptr); request->mutable_bloom_filter()->set_filter_length(*len); @@ -813,6 +1020,98 @@ Status IRuntimeFilter::_serialize(T* request, void** data, int* len) { return Status::OK(); } +void IRuntimeFilter::to_protobuf(PInFilter* filter) { + auto column_type = _wrapper->column_type(); + filter->set_column_type(to_proto(column_type)); + + if (_is_ignored) { + filter->set_ignored_msg(_ignored_msg); + return; + } + + HybridSetBase::IteratorBase* it; + _wrapper->get_in_filter_iterator(&it); + DCHECK(it != nullptr); + + switch (column_type) { + case TYPE_BOOLEAN: { + batch_copy(filter, it, [](PColumnValue *column, const int32_t *value) { + column->set_boolval(*value); + }); + return; + } + case TYPE_TINYINT: { + batch_copy(filter, it, [](PColumnValue *column, const int8_t *value) { + column->set_intval(*value); + }); + return; + } + case TYPE_SMALLINT: { + batch_copy(filter, it, [](PColumnValue *column, const int16_t *value) { + column->set_intval(*value); + }); + return; + } + case TYPE_INT: { + batch_copy(filter, it, [](PColumnValue *column, const int32_t *value) { + column->set_intval(*value); + }); + return; + } + case TYPE_BIGINT: { + batch_copy(filter, it, [](PColumnValue *column, const int64_t *value) { + column->set_longval(*value); + }); + return; + } + case TYPE_LARGEINT: { + batch_copy(filter, it, [](PColumnValue *column, const int128_t *value) { + column->set_stringval(LargeIntValue::to_string(*value)); + }); + return; + } + case TYPE_FLOAT: { + batch_copy(filter, it, [](PColumnValue *column, const float *value) { + column->set_doubleval(*value); + }); + return; + } + case TYPE_DOUBLE: { + batch_copy(filter, it, [](PColumnValue *column, const double *value) { + column->set_doubleval(*value); + }); + return; + } + case TYPE_DATE: + case TYPE_DATETIME: { + batch_copy(filter, it, [](PColumnValue *column, const 
DateTimeValue *value) { + char convert_buffer[30]; + value->to_string(convert_buffer); + column->set_stringval(convert_buffer); + }); + return; + } + case TYPE_DECIMALV2: { + batch_copy(filter, it, [](PColumnValue *column, const DecimalV2Value *value) { + column->set_stringval(value->to_string()); + }); + return; + } + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + batch_copy(filter, it, [](PColumnValue *column, const StringValue *value) { + column->set_stringval(std::string(value->ptr, value->len)); + }); + return; + } + default: { + DCHECK(false) << "unknown type"; + break; + } + } +} + void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) { void* min_data = nullptr; void* max_data = nullptr; @@ -899,6 +1198,12 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) { } Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { + if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { + set_ignored(); + const PInFilter in_filter = param->request->in_filter(); + auto msg = param->pool->add(new std::string(in_filter.ignored_msg())); + set_ignored_msg(*msg); + } std::unique_ptr wrapper; RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(param, _mem_tracker, _pool, &wrapper)); RETURN_IF_ERROR(_wrapper->merge(wrapper.get())); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 809c132dc5..7d406f3600 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -41,6 +41,7 @@ class PPublishFilterRequest; class PMergeFilterRequest; class TRuntimeFilterDesc; class RowDescriptor; +class PInFilter; class PMinMaxFilter; class HashJoinNode; class RuntimeProfile; @@ -71,17 +72,22 @@ inline std::string to_string(RuntimeFilterType type) { enum class RuntimeFilterRole { PRODUCER = 0, CONSUMER = 1 }; struct RuntimeFilterParams { - RuntimeFilterParams() : filter_type(RuntimeFilterType::UNKNOWN_FILTER), bloom_filter_size(-1) {} + RuntimeFilterParams() : 
filter_type(RuntimeFilterType::UNKNOWN_FILTER), + bloom_filter_size(-1), filter_id(0), fragment_instance_id(0, 0) {} RuntimeFilterType filter_type; PrimitiveType column_return_type; // used in bloom filter int64_t bloom_filter_size; + int32_t max_in_num; + int32_t filter_id; + UniqueId fragment_instance_id; }; struct UpdateRuntimeFilterParams { const PPublishFilterRequest* request; const char* data; + ObjectPool *pool; }; struct MergeRuntimeFilterParams { @@ -115,7 +121,8 @@ public: ~IRuntimeFilter() = default; static Status create(RuntimeState* state, MemTracker* tracker, ObjectPool* pool, - const TRuntimeFilterDesc* desc, const RuntimeFilterRole role, int node_id, + const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, + const RuntimeFilterRole role, int node_id, IRuntimeFilter** res); // insert data to build filter @@ -166,7 +173,10 @@ public: void signal(); // init filter with desc - Status init_with_desc(const TRuntimeFilterDesc* desc, int node_id = -1); + Status init_with_desc(const TRuntimeFilterDesc* desc, + const TQueryOptions* options, + UniqueId fragment_id = UniqueId(0, 0), + int node_id = -1); // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); @@ -174,17 +184,23 @@ public: Status merge_from(const RuntimePredicateWrapper* wrapper); + // for ut + const RuntimePredicateWrapper* get_wrapper(); static Status create_wrapper(const MergeRuntimeFilterParams* param, MemTracker* tracker, ObjectPool* pool, std::unique_ptr* wrapper); static Status create_wrapper(const UpdateRuntimeFilterParams* param, MemTracker* tracker, ObjectPool* pool, std::unique_ptr* wrapper); - Status update_filter(const UpdateRuntimeFilterParams* param); void set_ignored() { _is_ignored = true; } + // for ut + bool is_ignored() { return _is_ignored; } + + void set_ignored_msg(std::string &msg) { _ignored_msg = msg; } + // consumer should call before released Status consumer_close(); @@ -200,6 +216,7 @@ public: protected: // 
serialize _wrapper to protobuf + void to_protobuf(PInFilter* filter); void to_protobuf(PMinMaxFilter* filter); template @@ -212,6 +229,7 @@ protected: RuntimeState* _state; MemTracker* _mem_tracker; ObjectPool* _pool; + int32_t _fragment_id; // _wrapper is a runtime filter function wrapper // _wrapper should alloc from _pool RuntimePredicateWrapper* _wrapper; @@ -248,6 +266,7 @@ protected: // Indicate whether runtime filter expr has been ignored bool _is_ignored; + std::string _ignored_msg = ""; // some runtime filter will generate // multiple contexts such as minmax filter diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 7f0957aea0..576cad04b3 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -38,14 +38,14 @@ public: Status init(RuntimeState* state, int64_t hash_table_size) { DCHECK(_probe_expr_context.size() == _build_expr_context.size()); - // runtime filter effect stragety + // runtime filter effect strategy // 1. we will ignore IN filter when hash_table_size is too big // 2. 
we will ignore BLOOM filter and MinMax filter when hash_table_size // is too small and IN filter has effect std::map has_in_filter; - auto ignore_filter = [state](int filter_id) { + auto ignore_local_filter = [state](int filter_id) { IRuntimeFilter* consumer_filter = nullptr; state->runtime_filter_mgr()->get_consume_filter(filter_id, &consumer_filter); DCHECK(consumer_filter != nullptr); @@ -53,6 +53,13 @@ public: consumer_filter->signal(); }; + auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string &msg) { + runtime_filter->set_ignored(); + runtime_filter->set_ignored_msg(msg); + runtime_filter->publish(); + runtime_filter->publish_finally(); + }; + for (auto& filter_desc : _runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id, @@ -62,16 +69,42 @@ public: DCHECK(runtime_filter->expr_order() < _probe_expr_context.size()); // do not create 'in filter' when hash_table size over limit - bool over_max_in_num = (hash_table_size >= state->runtime_filter_max_in_num()); + auto max_in_num = state->runtime_filter_max_in_num(); + bool over_max_in_num = (hash_table_size >= max_in_num); bool is_in_filter = (runtime_filter->type() == RuntimeFilterType::IN_FILTER); - // do not create 'bloom filter' and 'minmax filter' when 'in filter' has created - bool pass_not_in = (has_in_filter[runtime_filter->expr_order()] && - !runtime_filter->has_remote_target()); - - if (over_max_in_num == is_in_filter && (is_in_filter || pass_not_in)) { - ignore_filter(filter_desc.filter_id); + // Note: + // In the case that exist *remote target* and in filter and other filter, + // we must merge other filter whatever in filter is over the max num in current node, + // because: + // case 1: (in filter >= max num) in current node, so in filter will be ignored, + // and then other filter can be used + // case 2: (in filter < max num) in current node, we don't know whether the in filter + 
// will be ignored in merge node, so we must transfer other filter to merge node + if (!runtime_filter->has_remote_target()) { + bool exists_in_filter = has_in_filter[runtime_filter->expr_order()]; + if (is_in_filter && over_max_in_num) { + LOG(INFO) << "fragment instance " << print_id(state->fragment_instance_id()) + << " ignore runtime filter(in filter id " << filter_desc.filter_id + << ") because: in_num(" << hash_table_size + << ") >= max_in_num(" << max_in_num << ")"; + ignore_local_filter(filter_desc.filter_id); + continue; + } else if (!is_in_filter && exists_in_filter) { + // do not create 'bloom filter' and 'minmax filter' when 'in filter' has created + // because in filter is exactly filter, so it is enough to filter data + LOG(INFO) << "fragment instance " << print_id(state->fragment_instance_id()) + << " ignore runtime filter(" << to_string(runtime_filter->type()) + << " id " << filter_desc.filter_id + << ") because: already exists in filter"; + ignore_local_filter(filter_desc.filter_id); + continue; + } + } else if (is_in_filter && over_max_in_num) { + std::string msg = fmt::format("fragment instance {} ignore runtime filter(in filter id {}) because: in_num({}) >= max_in_num({})", + print_id(state->fragment_instance_id()), filter_desc.filter_id, hash_table_size, max_in_num); + ignore_remote_filter(runtime_filter, msg); continue; } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 41c81ab85d..e7b3a0c981 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -81,7 +81,7 @@ Status RuntimeFilterMgr::get_producer_filter(const int filter_id, } Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - int node_id) { + const TQueryOptions& options, int node_id) { DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) || role != RuntimeFilterRole::CONSUMER); int32_t key = desc.filter_id; @@ -102,8 +102,8 @@ Status 
RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt RuntimeFilterMgrVal filter_mgr_val; filter_mgr_val.role = role; - RETURN_IF_ERROR(IRuntimeFilter::create(_state, _tracker, &_pool, &desc, role, node_id, - &filter_mgr_val.filter)); + RETURN_IF_ERROR(IRuntimeFilter::create(_state, _tracker, &_pool, &desc, &options, + role, node_id, &filter_mgr_val.filter)); filter_map->emplace(key, filter_mgr_val); @@ -114,6 +114,7 @@ Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, con UpdateRuntimeFilterParams params; params.request = request; params.data = data; + params.pool = &_pool; int filter_id = request->filter_id(); IRuntimeFilter* real_filter = nullptr; RETURN_IF_ERROR(get_consume_filter(filter_id, &real_filter)); @@ -137,6 +138,7 @@ Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) { Status RuntimeFilterMergeControllerEntity::_init_with_desc( const TRuntimeFilterDesc* runtime_filter_desc, + const TQueryOptions* query_options, const std::vector* target_info, const int producer_size) { std::lock_guard guard(_filter_map_mutex); @@ -154,14 +156,16 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( std::string filter_id = std::to_string(runtime_filter_desc->filter_id); // LOG(INFO) << "entity filter id:" << filter_id; - cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc); + cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, _fragment_instance_id); _filter_map.emplace(filter_id, cntVal); return Status::OK(); } -Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, - const TRuntimeFilterParams& runtime_filter_params) { +Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId fragment_instance_id, + const TRuntimeFilterParams& runtime_filter_params, + const TQueryOptions& query_options) { _query_id = query_id; + _fragment_instance_id = fragment_instance_id; for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { int 
filter_id = filterid_to_desc.first; const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); @@ -172,7 +176,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { return Status::InternalError("runtime filter params meet error"); } - _init_with_desc(&filterid_to_desc.second, &target_iter->second, build_iter->second); + _init_with_desc(&filterid_to_desc.second, &query_options, &target_iter->second, build_iter->second); } return Status::OK(); } @@ -275,6 +279,7 @@ Status RuntimeFilterMergeController::add_entity( UniqueId query_id(params.params.query_id); std::string query_id_str = query_id.to_string(); auto iter = _filter_controller_map.find(query_id_str); + UniqueId fragment_instance_id = UniqueId(params.params.fragment_instance_id); if (iter == _filter_controller_map.end()) { *handle = std::shared_ptr( @@ -282,7 +287,7 @@ Status RuntimeFilterMergeController::add_entity( _filter_controller_map[query_id_str] = *handle; const TRuntimeFilterParams& filter_params = params.params.runtime_filter_params; if (params.params.__isset.runtime_filter_params) { - RETURN_IF_ERROR(handle->get()->init(query_id, filter_params)); + RETURN_IF_ERROR(handle->get()->init(query_id, fragment_instance_id, filter_params, params.query_options)); } } else { *handle = _filter_controller_map[query_id_str].lock(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index cfcbbecd18..653ce675b2 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -66,7 +66,7 @@ public: Status get_producer_filter(const int filter_id, IRuntimeFilter** producer_filter); // regist filter Status regist_filter(const RuntimeFilterRole role, const TRuntimeFilterDesc& desc, - int node_id = -1); + const TQueryOptions& options, int node_id = -1); // update filter by remote Status update_filter(const PPublishFilterRequest* request, 
const char* data); @@ -105,10 +105,12 @@ private: // the class is destroyed with the last fragment_exec. class RuntimeFilterMergeControllerEntity { public: - RuntimeFilterMergeControllerEntity() : _query_id(0, 0) {} + RuntimeFilterMergeControllerEntity() : _query_id(0, 0), _fragment_instance_id(0, 0) {} ~RuntimeFilterMergeControllerEntity() = default; - Status init(UniqueId query_id, const TRuntimeFilterParams& runtime_filter_params); + Status init(UniqueId query_id, UniqueId fragment_instance_id, + const TRuntimeFilterParams& runtime_filter_params, + const TQueryOptions& query_options); // handle merge rpc Status merge(const PMergeFilterRequest* request, const char* data); @@ -117,6 +119,7 @@ public: private: Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, + const TQueryOptions* query_options, const std::vector* target_info, const int producer_size); @@ -131,6 +134,7 @@ private: std::shared_ptr pool; }; UniqueId _query_id; + UniqueId _fragment_instance_id; // protect _filter_map std::mutex _filter_map_mutex; // TODO: convert filter id to i32 diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp index 3cd858e314..68cefabbf1 100644 --- a/be/test/exprs/runtime_filter_test.cpp +++ b/be/test/exprs/runtime_filter_test.cpp @@ -55,14 +55,15 @@ private: // std::unique_ptr _runtime_filter; }; -TEST_F(RuntimeFilterTest, runtime_filter_basic_test) { +IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptions* options, + RuntimeState* _runtime_stat, ObjectPool* _obj_pool) { TRuntimeFilterDesc desc; desc.__set_filter_id(0); desc.__set_expr_order(0); desc.__set_has_local_targets(true); desc.__set_has_remote_targets(false); desc.__set_is_broadcast_join(true); - desc.__set_type(TRuntimeFilterType::BLOOM); + desc.__set_type(type); desc.__set_bloom_filter_size_bytes(4096); // build src expr context @@ -102,40 +103,49 @@ TEST_F(RuntimeFilterTest, runtime_filter_basic_test) { 
desc.__set_planId_to_target_expr(planid_to_target_expr); } - // size_t prob_index = 0; + IRuntimeFilter* runtime_filter = nullptr; + Status status = IRuntimeFilter::create(_runtime_stat, + _runtime_stat->instance_mem_tracker().get(), _obj_pool, + &desc, options, RuntimeFilterRole::PRODUCER, -1, &runtime_filter); + assert(status.ok()); + return runtime_filter; +} + +std::vector* create_rows(ObjectPool* _obj_pool, int from, int to) { + auto& rows = *(_obj_pool->add(new std::vector(to - from + 1))); + int i = 0; + while (from + i <= to) { + std::array* data = _obj_pool->add(new std::array()); + data->at(0) = data->at(1) = from + i; + TupleRow row; + row._tuples[0] = (Tuple*)data->data(); + rows[i++] = row; + } + return &rows; +} + +void insert(IRuntimeFilter* runtime_filter, ExprContext* expr_ctx, std::vector* rows) { + for (TupleRow& row : *rows) { + void* val = expr_ctx->get_value(&row); + runtime_filter->insert(val); + } +} + +TEST_F(RuntimeFilterTest, runtime_filter_basic_test) { SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0)); ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr)); ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr)); - IRuntimeFilter* runtime_filter = nullptr; + auto tuple_rows = create_rows(&_obj_pool, 1, 1024); + auto not_exist_data = create_rows(&_obj_pool, 1025, 2048); - Status status = IRuntimeFilter::create(_runtime_stat.get(), - _runtime_stat->instance_mem_tracker().get(), &_obj_pool, - &desc, RuntimeFilterRole::PRODUCER, -1, &runtime_filter); + TQueryOptions options; + options.runtime_filter_max_in_num = 1024; - ASSERT_TRUE(status.ok()); + IRuntimeFilter* runtime_filter = + create_runtime_filter(TRuntimeFilterType::BLOOM, &options, _runtime_stat.get(), &_obj_pool); + insert(runtime_filter, build_expr_ctx, tuple_rows); - // generate data - std::array tuple_rows; - int generator_index = 0; - auto generator = [&]() { - std::array* data = _obj_pool.add(new std::array()); - data->at(0) = data->at(1) = 
generator_index++; - TupleRow row; - row._tuples[0] = (Tuple*)data->data(); - return row; - }; - std::generate(tuple_rows.begin(), tuple_rows.end(), generator); - - std::array not_exist_data; - // generate not exist data - std::generate(not_exist_data.begin(), not_exist_data.end(), generator); - - // build runtime filter - for (TupleRow& row : tuple_rows) { - void* val = build_expr_ctx->get_value(&row); - runtime_filter->insert(val); - } // get expr context from filter std::list expr_context_list; @@ -143,13 +153,111 @@ TEST_F(RuntimeFilterTest, runtime_filter_basic_test) { ASSERT_TRUE(!expr_context_list.empty()); // test data in - for (TupleRow& row : tuple_rows) { + for (TupleRow& row : *tuple_rows) { for (ExprContext* ctx : expr_context_list) { ASSERT_TRUE(ctx->get_boolean_val(&row).val); } } // test not exist data - for (TupleRow& row : not_exist_data) { + for (TupleRow& row : *not_exist_data) { + for (ExprContext* ctx : expr_context_list) { + ASSERT_FALSE(ctx->get_boolean_val(&row).val); + } + } + + // test null + for (ExprContext* ctx : expr_context_list) { + TupleRow row; + row._tuples[0] = nullptr; + ASSERT_FALSE(ctx->get_boolean_val(&row).val); + } +} + +TEST_F(RuntimeFilterTest, runtime_filter_merge_in_filter_test) { + // size_t prob_index = 0; + SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0)); + ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr)); + ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr)); + + TQueryOptions options; + options.runtime_filter_max_in_num = 1024 * 2 + 1; + + auto rows1 = create_rows(&_obj_pool, 1, 1024); + auto rows2 = create_rows(&_obj_pool, 1025, 2048); + + IRuntimeFilter* runtime_filter = + create_runtime_filter(TRuntimeFilterType::IN, &options, _runtime_stat.get(), &_obj_pool); + insert(runtime_filter, build_expr_ctx, rows1); + + IRuntimeFilter* runtime_filter2 = + create_runtime_filter(TRuntimeFilterType::IN, &options, _runtime_stat.get(), &_obj_pool); + insert(runtime_filter2, 
build_expr_ctx, rows2); + + Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper()); + ASSERT_TRUE(status.ok()); + + // get expr context from filter + + std::list expr_context_list; + ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, prob_expr_ctx).ok()); + ASSERT_TRUE(!expr_context_list.empty()); + + // test data in + for (TupleRow& row : *rows1) { + for (ExprContext* ctx : expr_context_list) { + ASSERT_TRUE(ctx->get_boolean_val(&row).val); + } + } + for (TupleRow& row : *rows2) { + for (ExprContext* ctx : expr_context_list) { + ASSERT_TRUE(ctx->get_boolean_val(&row).val); + } + } + + // test null + for (ExprContext* ctx : expr_context_list) { + TupleRow row; + row._tuples[0] = nullptr; + ASSERT_FALSE(ctx->get_boolean_val(&row).val); + } +} + +TEST_F(RuntimeFilterTest, runtime_filter_ignore_in_filter_test) { + // size_t prob_index = 0; + SlotRef* expr = _obj_pool.add(new SlotRef(TYPE_INT, 0)); + ExprContext* prob_expr_ctx = _obj_pool.add(new ExprContext(expr)); + ExprContext* build_expr_ctx = _obj_pool.add(new ExprContext(expr)); + + TQueryOptions options; + options.runtime_filter_max_in_num = 2; + + auto rows1 = create_rows(&_obj_pool, 1, 1); + auto rows2 = create_rows(&_obj_pool, 2, 2); + + IRuntimeFilter* runtime_filter = + create_runtime_filter(TRuntimeFilterType::IN, &options, _runtime_stat.get(), &_obj_pool); + insert(runtime_filter, build_expr_ctx, rows1); + + IRuntimeFilter* runtime_filter2 = + create_runtime_filter(TRuntimeFilterType::IN, &options, _runtime_stat.get(), &_obj_pool); + insert(runtime_filter2, build_expr_ctx, rows2); + + Status status = runtime_filter->merge_from(runtime_filter2->get_wrapper()); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(runtime_filter->is_ignored()); + + // get expr context from filter + + std::list expr_context_list; + ASSERT_TRUE(runtime_filter->get_push_expr_ctxs(&expr_context_list, prob_expr_ctx).ok()); + ASSERT_TRUE(expr_context_list.empty()); + + for (TupleRow& row : *rows1) { + for 
(ExprContext* ctx : expr_context_list) { + ASSERT_FALSE(ctx->get_boolean_val(&row).val); + } + } + for (TupleRow& row : *rows2) { for (ExprContext* ctx : expr_context_list) { ASSERT_FALSE(ctx->get_boolean_val(&row).val); } @@ -157,8 +265,6 @@ TEST_F(RuntimeFilterTest, runtime_filter_basic_test) { // test null for (ExprContext* ctx : expr_context_list) { - std::array* data = _obj_pool.add(new std::array()); - data->at(0) = data->at(1) = generator_index++; TupleRow row; row._tuples[0] = nullptr; ASSERT_FALSE(ctx->get_boolean_val(&row).val); diff --git a/docs/en/administrator-guide/runtime-filter.md b/docs/en/administrator-guide/runtime-filter.md index a0080686a6..549cc9e79d 100644 --- a/docs/en/administrator-guide/runtime-filter.md +++ b/docs/en/administrator-guide/runtime-filter.md @@ -136,7 +136,8 @@ set runtime_filter_type=7; - **IN predicate**: Construct IN predicate based on all the values ​​of Key listed in the join on clause on the right table, and use the constructed IN predicate to filter on the left table. Compared with Bloom Filter, the cost of construction and application is lower. The amount of data in the right table is lower. When it is less, it tends to perform better. - By default, only the number of data rows in the right table is less than 1024 will be pushed down (can be adjusted by `runtime_filter_max_in_num` in the session variable). - - Currently IN predicate does not implement a merge method, that is, it cannot be pushed down across Fragments, so currently when it is necessary to push down to the ScanNode of the left table of shuffle join, if Bloom Filter is not generated, then we will convert IN predicate to Bloom Filter for Process pushdown across Fragments, so even if the type only selects IN predicate, Bloom Filter may actually be applied; + - Currently IN predicate already implement a merge method. 
+ - When IN predicate and other filters are specified at the same time, and the filtering value of IN predicate does not reach runtime_filter_max_in_num will try to remove other filters. The reason is that IN predicate is an accurate filtering condition. Even if there is no other filter, it can filter efficiently. If it is used at the same time, other filters will do useless work. Currently, only when the producer and consumer of the runtime filter are in the same fragment can there be logic to remove the Non-IN predicate. #### 2.runtime_filter_mode Used to control the transmission range of Runtime Filter between instances. diff --git a/docs/zh-CN/administrator-guide/runtime-filter.md b/docs/zh-CN/administrator-guide/runtime-filter.md index 6f8b25180a..6da7d0ed8d 100644 --- a/docs/zh-CN/administrator-guide/runtime-filter.md +++ b/docs/zh-CN/administrator-guide/runtime-filter.md @@ -136,7 +136,9 @@ set runtime_filter_type=7; - **IN predicate**: 根据join on clause中Key列在右表上的所有值构建IN predicate,使用构建的IN predicate在左表上过滤,相比Bloom Filter构建和应用的开销更低,在右表数据量较少时往往性能更高。 - 默认只有右表数据行数少于1024才会下推(可通过session变量中的`runtime_filter_max_in_num`调整)。 - - 目前IN predicate没有实现合并方法,即无法跨Fragment下推,所以目前当需要下推给shuffle join左表的ScanNode时,如果没有生成Bloom Filter,那么我们会将IN predicate转为Bloom Filter,用于处理跨Fragment下推,所以即使类型只选择了IN predicate,实际也可能应用了Bloom Filter; + - 目前IN predicate已实现合并方法。 + - 当同时指定In predicate和其他filter,并且in的过滤数值没达到runtime_filter_max_in_num时,会尝试把其他filter去除掉。原因是In predicate是精确的过滤条件,即使没有其他filter也可以高效过滤,如果同时使用则其他filter会做无用功。目前仅在Runtime filter的生产者和消费者处于同一个fragment时才会有去除非in filter的逻辑。 +- ** #### 2.runtime_filter_mode 用于控制Runtime Filter在instance之间传输的范围。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index a368208a7c..6c504d6ccd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -431,6 +431,7 @@ public 
final class RuntimeFilter { public void assignToPlanNodes() { Preconditions.checkState(hasTargets()); builderNode.addRuntimeFilter(this); + builderNode.fragment_.setBuilderRuntimeFilterIds(getFilterId()); for (RuntimeFilterTarget target: targets) { target.node.addRuntimeFilter(this); // fragment is expected to use this filter id @@ -443,7 +444,6 @@ public final class RuntimeFilter { if (LOG.isTraceEnabled()) LOG.trace("Runtime filter: " + debugString()); assignToPlanNodes(); analyzer.putAssignedRuntimeFilter(this); - getBuilderNode().fragment_.setBuilderRuntimeFilterIds(getFilterId()); } public String debugString() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java index 7c53de3068..66fec98c11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java @@ -153,22 +153,6 @@ public final class RuntimeFilterGenerator { int numBloomFilters = 0; for (RuntimeFilter filter: filters) { filter.extractTargetsPosition(); - // When there is a remote target, the producer and consumer of the filter are not in the same fragment at - // this time, and the filter build by the producer needs to be merged. Currently, the IN filter has - // no merge logic, so replace it with Bloom Filter. - // The reason for this is that in the IN pushdown implemented by early Doris, when OlapScanNode and - // HashJoinNode are not in the same fragment, the IN filter will be pushed down to the nearest - // ExchangeNode, so that although it cannot be pushed down to the storage engine to improve performance, - // In some extreme cases, the number of rows in the hash table constructed by HashJoinNode can be reduced, - // thereby avoiding OOM. To cover the previous case (from tpcds 1T query 17), replace IN with Bloom Filter. 
- // Only when no Bloom Filter is generated, will IN be converted to Bloom Filter and pushed down. - if (filter.getType() == TRuntimeFilterType.IN && filter.hasRemoteTargets()) { - if ((runtimeFilterType & TRuntimeFilterType.BLOOM.getValue()) == 0) { - filter.setType(TRuntimeFilterType.BLOOM); - } else { - continue; - } - } if (filter.getType() == TRuntimeFilterType.BLOOM) { if (numBloomFilters >= maxNumBloomFilters) continue; ++numBloomFilters; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index b8aa9e5cd8..6323f6ee5e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -43,6 +43,7 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.load.EtlJobType; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.thrift.TRuntimeFilterType; import org.apache.doris.utframe.UtFrameUtils; import com.google.common.collect.Lists; @@ -1373,8 +1374,15 @@ public class QueryPlanTest { Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL"); Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", 7); explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); - System.out.println(explainString); Assert.assertFalse(explainString.contains("runtime filter")); + + // support merge in filter, and forbidden implicit conversion to bloom filter + queryStr = "explain select * from jointest t2 join [shuffle] jointest t1 where t1.k1 = t2.k1"; + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterMode", "GLOBAL"); + Deencapsulation.setField(connectContext.getSessionVariable(), "runtimeFilterType", TRuntimeFilterType.IN.getValue()); + explainString = 
UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); + Assert.assertTrue(explainString.contains("runtime filters: RF000[in] -> `t2`.`k1`")); + Assert.assertFalse(explainString.contains("runtime filters: RF000[bloom] -> `t2`.`k1`")); } @Test @@ -1446,7 +1454,6 @@ public class QueryPlanTest { for (String sql: sqls) { String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, sql); - System.out.println(explainString); Assert.assertTrue(explainString.contains(emptyNode)); Assert.assertFalse(explainString.contains(denseRank)); } @@ -1475,11 +1482,9 @@ public class QueryPlanTest { connectContext.setDatabase("default_cluster:test"); String sql = "SELECT dt, dis_key, COUNT(1) FROM table_unpartitioned group by dt, dis_key"; String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql); - System.out.println(explainString); Assert.assertTrue(explainString.contains("AGGREGATE (update finalize)")); sql = "SELECT dt, dis_key, COUNT(1) FROM table_partitioned group by dt, dis_key"; explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "EXPLAIN " + sql); - System.out.println(explainString); Assert.assertTrue(explainString.contains("AGGREGATE (update serialize)")); } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index ef4362573a..080c8d328c 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -340,10 +340,17 @@ message PMinMaxFilter { required PColumnValue max_val = 3; }; +message PInFilter { + required PColumnType column_type = 1; + repeated PColumnValue values = 2; + optional string ignored_msg = 3; +} + enum PFilterType { UNKNOW_FILTER = 0; BLOOM_FILTER = 1; MINMAX_FILTER = 2; + IN_FILTER = 3; }; message PMergeFilterRequest { @@ -353,6 +360,7 @@ message PMergeFilterRequest { required PFilterType filter_type = 4; optional PMinMaxFilter minmax_filter = 5; optional PBloomFilter bloom_filter = 6; + optional PInFilter in_filter = 7; }; 
message PMergeFilterResponse { @@ -366,6 +374,7 @@ message PPublishFilterRequest { required PFilterType filter_type = 4; optional PMinMaxFilter minmax_filter = 5; optional PBloomFilter bloom_filter = 6; + optional PInFilter in_filter = 7; }; message PPublishFilterResponse {