diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index 11831ccdc0..6182bfe781 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -204,6 +204,8 @@ public: return Status::OK(); } + size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 0; } + void light_copy(BloomFilterFuncBase* bloomfilter_func) { auto other_func = static_cast(bloomfilter_func); _bloom_filter_alloced = other_func->_bloom_filter_alloced; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index e2ec2d57a2..2a9d7cc24f 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -348,28 +348,52 @@ public: RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, const RuntimeFilterParams* params) : _state(state), + _be_exec_version(_state->be_exec_version()), _pool(pool), _column_return_type(params->column_return_type), _filter_type(params->filter_type), - _fragment_instance_id(params->fragment_instance_id), _filter_id(params->filter_id), - _use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(), - _column_return_type)), - _use_new_hash(_state->be_exec_version() >= 2) {} + _use_batch( + IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), + _use_new_hash(_be_exec_version >= 2) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type, - RuntimeFilterType type, UniqueId fragment_instance_id, - uint32_t filter_id) + RuntimeFilterType type, uint32_t filter_id) : _state(state), + _be_exec_version(_state->be_exec_version()), _pool(pool), _column_return_type(column_type), _filter_type(type), - _fragment_instance_id(fragment_instance_id), _filter_id(filter_id), - _use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(), - _column_return_type)), - _use_new_hash(_state->be_exec_version() >= 2) {} + _use_batch( + IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), + _use_new_hash(_be_exec_version >= 2) {} + + RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, + const RuntimeFilterParams* params) + : _query_ctx(query_ctx), + _be_exec_version(_query_ctx->be_exec_version()), + _pool(pool), + _column_return_type(params->column_return_type), + _filter_type(params->filter_type), + _filter_id(params->filter_id), + _use_batch( + IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), + _use_new_hash(_be_exec_version >= 2) {} + // for a 'tmp' runtime predicate wrapper + // only could called assign method or as a param for merge + RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool, PrimitiveType column_type, + RuntimeFilterType type, uint32_t filter_id) + : _query_ctx(query_ctx), + _be_exec_version(_query_ctx->be_exec_version()), + _pool(pool), + _column_return_type(column_type), + _filter_type(type), + _filter_id(filter_id), + _use_batch( + IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)), + _use_new_hash(_be_exec_version >= 2) {} // init runtime filter wrapper // alloc memory to init runtime filter function Status init(const RuntimeFilterParams* params) { @@ -542,8 +566,7 @@ public: void insert_batch(const vectorized::ColumnPtr column, const std::vector& rows) { if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) { bitmap_filter_insert_batch(column, rows); - } else if (IRuntimeFilter::enable_use_batch(_state->be_exec_version(), - _column_return_type)) { + } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, _column_return_type)) { insert_fixed_len(column->get_raw_data().data, rows.data(), rows.size()); } else { for (int index : rows) { @@ -571,7 +594,14 @@ public: return real_filter_type; } - Status get_push_vexprs(std::vector* container, RuntimeState* state, + size_t get_bloom_filter_size() { + if (_is_bloomfilter) { + return _context.bloom_filter_func->get_size(); + } + return 0; + } + + Status get_push_vexprs(std::vector* container, vectorized::VExprContext* prob_expr); Status merge(const RuntimePredicateWrapper* wrapper) { @@ -583,7 +613,6 @@ public: _filter_type != wrapper->_filter_type; CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other) - << "fragment instance " << _fragment_instance_id.to_string() << " can not merge runtime filter(id=" << _filter_id << "), current is filter type is " << to_string(_filter_type) << ", other filter type is " << to_string(wrapper->_filter_type); @@ -593,8 +622,7 @@ public: if (_is_ignored_in_filter) { break; } else if (wrapper->_is_ignored_in_filter) { - VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() - << " ignore merge runtime filter(in filter id " << _filter_id + VLOG_DEBUG << " ignore merge runtime filter(in filter id " << _filter_id << ") because: " << *(wrapper->get_ignored_in_filter_msg()); _is_ignored_in_filter = true; @@ -608,8 +636,7 @@ public: if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { #ifdef VLOG_DEBUG_IS_ON std::stringstream msg; - msg << "fragment instance " << _fragment_instance_id.to_string() - << " ignore merge runtime filter(in filter id " << _filter_id + msg << " ignore merge runtime filter(in filter id " << _filter_id << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; _ignored_in_filter_msg = _pool->add(new std::string(msg.str())); @@ -637,22 +664,19 @@ public: if (real_filter_type == RuntimeFilterType::IN_FILTER) { if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // in merge in CHECK(!wrapper->_is_ignored_in_filter) - << "fragment instance " << _fragment_instance_id.to_string() << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " << *(wrapper->get_ignored_in_filter_msg()); _context.hybrid_set->insert(wrapper->_context.hybrid_set.get()); if (_max_in_num >= 0 && _context.hybrid_set->size() >= _max_in_num) { - VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() - << " change runtime filter to bloom filter(id=" << _filter_id + VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: in_num(" << _context.hybrid_set->size() << ") >= max_in_num(" << _max_in_num << ")"; change_to_bloom_filter(); } // in merge bloom filter } else { - VLOG_DEBUG << "fragment instance " << _fragment_instance_id.to_string() - << " change runtime filter to bloom filter(id=" << _filter_id + VLOG_DEBUG << " change runtime filter to bloom filter(id=" << _filter_id << ") because: already exist a bloom filter"; change_to_bloom_filter(); _context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get()); @@ -661,7 +685,6 @@ public: if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) { // bloom filter merge in CHECK(!wrapper->_is_ignored_in_filter) - << "fragment instance " << _fragment_instance_id.to_string() << " can not ignore merge runtime filter(in filter id " << wrapper->_filter_id << ") when used IN_OR_BLOOM_FILTER, ignore msg: " << *(wrapper->get_ignored_in_filter_msg()); @@ -1036,6 +1059,8 @@ public: private: RuntimeState* _state; + QueryContext* _query_ctx; + int _be_exec_version; ObjectPool* _pool; // When a runtime filter received from remote and it is a bloom filter, _column_return_type will be invalid. @@ -1047,7 +1072,6 @@ private: bool _is_bloomfilter = false; bool _is_ignored_in_filter = false; std::string* _ignored_in_filter_msg = nullptr; - UniqueId _fragment_instance_id; uint32_t _filter_id; // When _column_return_type is invalid, _use_batch will be always false. @@ -1063,9 +1087,16 @@ Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRunt int node_id, IRuntimeFilter** res, bool build_bf_exactly) { *res = pool->add(new IRuntimeFilter(state, pool)); (*res)->set_role(role); - UniqueId fragment_instance_id(state->fragment_instance_id()); - return (*res)->init_with_desc(desc, query_options, fragment_instance_id, node_id, - build_bf_exactly); + return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); +} + +Status IRuntimeFilter::create(QueryContext* query_ctx, ObjectPool* pool, + const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, + const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, + bool build_bf_exactly) { + *res = pool->add(new IRuntimeFilter(query_ctx, pool)); + (*res)->set_role(role); + return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); } void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) { @@ -1100,6 +1131,7 @@ Status IRuntimeFilter::publish() { if (_has_local_target) { IRuntimeFilter* consumer_filter = nullptr; // TODO: log if err + DCHECK(_state != nullptr); RETURN_IF_ERROR( _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter)); // push down @@ -1109,8 +1141,9 @@ Status IRuntimeFilter::publish() { return Status::OK(); } else { TNetworkAddress addr; + DCHECK(_state != nullptr); RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr)); - return push_to_remote(_state, &addr); + return push_to_remote(_state, &addr, _opt_remote_rf); } } @@ -1124,7 +1157,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::vector* push_ if (!_is_ignored) { _set_push_down(); _profile->add_info_string("Info", _format_status()); - return _wrapper->get_push_vexprs(push_vexprs, _state, _vprobe_ctx); + return _wrapper->get_push_vexprs(push_vexprs, _vprobe_ctx); } else { _profile->add_info_string("Info", _format_status()); return Status::OK(); @@ -1132,32 +1165,35 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::vector* push_ } Status IRuntimeFilter::get_prepared_vexprs(std::vector* vexprs, - const RowDescriptor& desc) { + const RowDescriptor& desc, RuntimeState* state) { _profile->add_info_string("Info", _format_status()); if (_is_ignored) { return Status::OK(); } - DCHECK((!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) || - (_state->enable_pipeline_exec() && + DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) || + (_enable_pipeline_exec && _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY)); DCHECK(is_consumer()); std::lock_guard guard(_inner_mutex); if (_push_down_vexprs.empty()) { - RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _state, _vprobe_ctx)); + RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _vprobe_ctx)); } - // push expr vexprs->insert(vexprs->end(), _push_down_vexprs.begin(), _push_down_vexprs.end()); return Status::OK(); } bool IRuntimeFilter::await() { DCHECK(is_consumer()); + auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000 + : _state->execution_timeout() * 1000; + auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms() + : _state->runtime_filter_wait_time_ms(); // bitmap filter is precise filter and only filter once, so it must be applied. int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER - ? _state->execution_timeout() * 1000 - : _state->runtime_filter_wait_time_ms(); - if (_state->enable_pipeline_exec()) { + ? execution_timeout + : runtime_filter_wait_time_ms; + if (_enable_pipeline_exec) { auto expected = _rf_state_atomic.load(std::memory_order_acquire); if (expected == RuntimeFilterState::NOT_READY) { if (!_rf_state_atomic.compare_exchange_strong( @@ -1203,12 +1239,16 @@ bool IRuntimeFilter::await() { bool IRuntimeFilter::is_ready_or_timeout() { DCHECK(is_consumer()); auto cur_state = _rf_state_atomic.load(std::memory_order_acquire); + auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000 + : _state->execution_timeout() * 1000; + auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms() + : _state->runtime_filter_wait_time_ms(); // bitmap filter is precise filter and only filter once, so it must be applied. int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER - ? _state->execution_timeout() * 1000 - : _state->runtime_filter_wait_time_ms(); + ? execution_timeout + : runtime_filter_wait_time_ms; int64_t ms_since_registration = MonotonicMillis() - registration_time_; - if (!_state->enable_pipeline_exec()) { + if (!_enable_pipeline_exec) { _rf_state = RuntimeFilterState::TIME_OUT; return true; } else if (is_ready()) { @@ -1245,7 +1285,7 @@ bool IRuntimeFilter::is_ready_or_timeout() { void IRuntimeFilter::signal() { DCHECK(is_consumer()); - if (_state->enable_pipeline_exec()) { + if (_enable_pipeline_exec) { _rf_state_atomic.store(RuntimeFilterState::READY); } else { std::unique_lock lock(_inner_mutex); @@ -1261,6 +1301,10 @@ void IRuntimeFilter::signal() { _profile->add_info_string("BitmapSize", std::to_string(bitmap_filter->size())); _profile->add_info_string("IsNotIn", bitmap_filter->is_not_in() ? "true" : "false"); } + if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) { + _profile->add_info_string("BloomFilterSize", + std::to_string(_wrapper->get_bloom_filter_size())); + } } BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { @@ -1268,8 +1312,7 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const { } Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_instance_id, int node_id, - bool build_bf_exactly) { + int node_id, bool build_bf_exactly) { // if node_id == -1 , it shouldn't be a consumer DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer())); @@ -1292,15 +1335,19 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue _has_remote_target = desc->has_remote_targets; _expr_order = desc->expr_order; _filter_id = desc->filter_id; + _opt_remote_rf = desc->__isset.opt_remote_rf && desc->opt_remote_rf; vectorized::VExprContext* build_ctx = nullptr; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, desc->src_expr, &build_ctx)); RuntimeFilterParams params; - params.fragment_instance_id = fragment_instance_id; params.filter_id = _filter_id; params.filter_type = _runtime_filter_type; params.column_return_type = build_ctx->root()->type().type; params.max_in_num = options->runtime_filter_max_in_num; + // We build runtime filter by exact distinct count iff three conditions are met: + // 1. Only 1 join key + // 2. Do not have remote target (e.g. do not need to merge) + // 3. Bloom filter params.build_bf_exactly = build_bf_exactly && !_has_remote_target && _runtime_filter_type == RuntimeFilterType::BLOOM_FILTER; if (desc->__isset.bloom_filter_size_bytes) { @@ -1334,7 +1381,11 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, iter->second, &_vprobe_ctx)); } - _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms)); + if (_state) { + _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms)); + } else { + _wrapper = _pool->add(new RuntimePredicateWrapper(_query_ctx, _pool, ¶ms)); + } return _wrapper->init(¶ms); } @@ -1346,6 +1397,10 @@ Status IRuntimeFilter::serialize(PPublishFilterRequest* request, void** data, in return serialize_impl(request, data, len); } +Status IRuntimeFilter::serialize(PPublishFilterRequestV2* request, void** data, int* len) { + return serialize_impl(request, data, len); +} + Status IRuntimeFilter::create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr* wrapper) { @@ -1358,6 +1413,35 @@ Status IRuntimeFilter::create_wrapper(RuntimeState* state, const UpdateRuntimeFi return _create_wrapper(state, param, pool, wrapper); } +Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx, + const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool, + std::unique_ptr* wrapper) { + int filter_type = param->request->filter_type(); + PrimitiveType column_type = PrimitiveType::INVALID_TYPE; + if (param->request->has_in_filter()) { + column_type = to_primitive_type(param->request->in_filter().column_type()); + } + wrapper->reset(new RuntimePredicateWrapper(query_ctx, pool, column_type, get_type(filter_type), + param->request->filter_id())); + + switch (filter_type) { + case PFilterType::IN_FILTER: { + DCHECK(param->request->has_in_filter()); + return (*wrapper)->assign(¶m->request->in_filter()); + } + case PFilterType::BLOOM_FILTER: { + DCHECK(param->request->has_bloom_filter()); + return (*wrapper)->assign(¶m->request->bloom_filter(), param->data); + } + case PFilterType::MINMAX_FILTER: { + DCHECK(param->request->has_minmax_filter()); + return (*wrapper)->assign(¶m->request->minmax_filter()); + } + default: + return Status::InvalidArgument("unknown filter type"); + } +} + void IRuntimeFilter::change_to_bloom_filter() { auto origin_type = _wrapper->get_real_type(); _wrapper->change_to_bloom_filter(); @@ -1379,7 +1463,6 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje column_type = to_primitive_type(param->request->in_filter().column_type()); } wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type, get_type(filter_type), - UniqueId(param->request->fragment_id()), param->request->filter_id())); switch (filter_type) { @@ -1401,11 +1484,22 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, Obje } void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { - DCHECK(parent_profile != nullptr); - _profile.reset(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, - ::doris::to_string(_runtime_filter_type)))); + if (_profile_init) { + return; + } + { + std::lock_guard guard(_inner_mutex); + if (_profile_init) { + return; + } + DCHECK(parent_profile != nullptr); + _profile.reset( + new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, + ::doris::to_string(_runtime_filter_type)))); + _profile_init = true; + } parent_profile->add_child(_profile.get(), true, nullptr); - if (!_state->enable_pipeline_exec()) { + if (!_enable_pipeline_exec) { _await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost"); } _profile->add_info_string("Info", _format_status()); @@ -1421,10 +1515,6 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { } } -void IRuntimeFilter::set_push_down_profile() { - _profile->add_info_string("HasPushDownToEngine", "true"); -} - void IRuntimeFilter::ready_for_publish() { _wrapper->ready_for_publish(); } @@ -1446,10 +1536,6 @@ Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { return status; } -const RuntimePredicateWrapper* IRuntimeFilter::get_wrapper() { - return _wrapper; -} - template void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it, void (*set_func)(PColumnValue*, const T*)) { @@ -1752,21 +1838,46 @@ Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) { return Status::OK(); } +Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param) { + int64_t start_update = MonotonicMillis(); + if (param->request->has_in_filter() && param->request->in_filter().has_ignored_msg()) { + set_ignored(); + const PInFilter in_filter = param->request->in_filter(); + auto msg = param->pool->add(new std::string(in_filter.ignored_msg())); + set_ignored_msg(*msg); + } + + std::unique_ptr tmp_wrapper; + RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_query_ctx, param, _pool, &tmp_wrapper)); + auto origin_type = _wrapper->get_real_type(); + RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get())); + if (origin_type != _wrapper->get_real_type()) { + update_runtime_filter_type_to_profile(); + } + this->signal(); + + _profile->add_info_string("MergeTime", std::to_string(param->request->merge_time()) + " ms"); + _profile->add_info_string("UpdateTime", + std::to_string(MonotonicMillis() - start_update) + " ms"); + return Status::OK(); +} + Status IRuntimeFilter::consumer_close() { DCHECK(is_consumer()); return Status::OK(); } Status RuntimePredicateWrapper::get_push_vexprs(std::vector* container, - RuntimeState* state, vectorized::VExprContext* vprob_expr) { - DCHECK(state != nullptr); DCHECK(container != nullptr); DCHECK(_pool != nullptr); DCHECK(vprob_expr->root()->type().type == _column_return_type || (is_string_type(vprob_expr->root()->type().type) && is_string_type(_column_return_type)) || - _filter_type == RuntimeFilterType::BITMAP_FILTER); + _filter_type == RuntimeFilterType::BITMAP_FILTER) + << " vprob_expr->root()->type().type: " << vprob_expr->root()->type().type + << " _column_return_type: " << _column_return_type + << " _filter_type: " << ::doris::to_string(_filter_type); auto real_filter_type = get_real_type(); switch (real_filter_type) { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index a2f6995d59..52191b5182 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -32,6 +32,7 @@ #include "runtime/define_primitive_type.h" #include "runtime/large_int_value.h" #include "runtime/primitive_type.h" +#include "runtime/query_context.h" #include "runtime/runtime_state.h" #include "runtime/types.h" #include "util/lock.h" @@ -51,6 +52,7 @@ namespace doris { class ObjectPool; class RuntimePredicateWrapper; class PPublishFilterRequest; +class PPublishFilterRequestV2; class PMergeFilterRequest; class TRuntimeFilterDesc; class RowDescriptor; @@ -106,7 +108,6 @@ struct RuntimeFilterParams { bloom_filter_size(-1), max_in_num(0), filter_id(0), - fragment_instance_id(0, 0), bitmap_filter_not_in(false) {} RuntimeFilterType filter_type; @@ -115,7 +116,6 @@ struct RuntimeFilterParams { int64_t bloom_filter_size; int32_t max_in_num; int32_t filter_id; - UniqueId fragment_instance_id; bool bitmap_filter_not_in; bool build_bf_exactly; }; @@ -129,6 +129,16 @@ struct UpdateRuntimeFilterParams { ObjectPool* pool; }; +struct UpdateRuntimeFilterParamsV2 { + UpdateRuntimeFilterParamsV2(const PPublishFilterRequestV2* req, + butil::IOBufAsZeroCopyInputStream* data_stream, + ObjectPool* obj_pool) + : request(req), data(data_stream), pool(obj_pool) {} + const PPublishFilterRequestV2* request; + butil::IOBufAsZeroCopyInputStream* data; + ObjectPool* pool; +}; + struct MergeRuntimeFilterParams { MergeRuntimeFilterParams(const PMergeFilterRequest* req, butil::IOBufAsZeroCopyInputStream* data_stream) @@ -164,7 +174,25 @@ public: _expr_order(-1), _always_true(false), _is_ignored(false), - registration_time_(MonotonicMillis()) {} + registration_time_(MonotonicMillis()), + _enable_pipeline_exec(_state->enable_pipeline_exec()) {} + + IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool) + : _query_ctx(query_ctx), + _pool(pool), + _runtime_filter_type(RuntimeFilterType::UNKNOWN_FILTER), + _filter_id(-1), + _is_broadcast_join(true), + _has_remote_target(false), + _has_local_target(false), + _rf_state(RuntimeFilterState::NOT_READY), + _rf_state_atomic(RuntimeFilterState::NOT_READY), + _role(RuntimeFilterRole::PRODUCER), + _expr_order(-1), + _always_true(false), + _is_ignored(false), + registration_time_(MonotonicMillis()), + _enable_pipeline_exec(query_ctx->enable_pipeline_exec()) {} ~IRuntimeFilter() = default; @@ -172,6 +200,10 @@ public: const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, bool build_bf_exactly = false); + static Status create(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc, + const TQueryOptions* query_options, const RuntimeFilterRole role, + int node_id, IRuntimeFilter** res, bool build_bf_exactly = false); + void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context); Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context); @@ -192,20 +224,19 @@ public: Status get_push_expr_ctxs(std::vector* push_vexprs); Status get_prepared_vexprs(std::vector* push_vexprs, - const RowDescriptor& desc); + const RowDescriptor& desc, RuntimeState* state); bool is_broadcast_join() const { return _is_broadcast_join; } bool has_remote_target() const { return _has_remote_target; } bool is_ready() const { - return (!_state->enable_pipeline_exec() && _rf_state == RuntimeFilterState::READY) || - (_state->enable_pipeline_exec() && + return (!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY) || + (_enable_pipeline_exec && _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY); } RuntimeFilterState current_state() const { - return _state->enable_pipeline_exec() ? _rf_state_atomic.load(std::memory_order_acquire) - : _rf_state; + return _enable_pipeline_exec ? _rf_state_atomic.load(std::memory_order_acquire) : _rf_state; } bool is_ready_or_timeout(); @@ -226,33 +257,34 @@ public: // init filter with desc Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options, - UniqueId fragment_id, int node_id = -1, bool build_bf_exactly = false); + int node_id = -1, bool build_bf_exactly = false); BloomFilterFuncBase* get_bloomfilter() const; // serialize _wrapper to protobuf Status serialize(PMergeFilterRequest* request, void** data, int* len); Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr); + Status serialize(PPublishFilterRequestV2* request, void** data = nullptr, int* len = nullptr); Status merge_from(const RuntimePredicateWrapper* wrapper); // for ut - const RuntimePredicateWrapper* get_wrapper(); static Status create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr* wrapper); static Status create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr* wrapper); + static Status create_wrapper(QueryContext* query_ctx, const UpdateRuntimeFilterParamsV2* param, + ObjectPool* pool, + std::unique_ptr* wrapper); void change_to_bloom_filter(); Status init_bloom_filter(const size_t build_bf_cardinality); Status update_filter(const UpdateRuntimeFilterParams* param); + Status update_filter(const UpdateRuntimeFilterParamsV2* param); void set_ignored() { _is_ignored = true; } - // for ut - bool is_ignored() const { return _is_ignored; } - void set_ignored_msg(std::string& msg) { _ignored_msg = msg; } // for ut @@ -262,21 +294,17 @@ public: Status consumer_close(); // async push runtimefilter to remote node - Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr); + Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf); Status join_rpc(); void init_profile(RuntimeProfile* parent_profile); void update_runtime_filter_type_to_profile(); - void set_push_down_profile(); - void ready_for_publish(); - std::shared_ptr get_bitmap_filter() const; - - static bool enable_use_batch(int be_exec_version, PrimitiveType type) { - return be_exec_version > 0 && (is_int_or_bool(type) || is_float_or_double(type)); + static bool enable_use_batch(bool use_batch, PrimitiveType type) { + return use_batch && (is_int_or_bool(type) || is_float_or_double(type)); } int filter_id() const { return _filter_id; } @@ -304,7 +332,7 @@ protected: } std::string _get_explain_state_string() { - if (_state->enable_pipeline_exec()) { + if (_enable_pipeline_exec) { return _rf_state_atomic.load(std::memory_order_acquire) == RuntimeFilterState::READY ? "READY" : _rf_state_atomic.load(std::memory_order_acquire) == @@ -318,11 +346,12 @@ protected: } } - RuntimeState* _state; + RuntimeState* _state = nullptr; + QueryContext* _query_ctx = nullptr; ObjectPool* _pool; // _wrapper is a runtime filter function wrapper // _wrapper should alloc from _pool - RuntimePredicateWrapper* _wrapper = nullptr; + RuntimePredicateWrapper* _wrapper; // runtime filter type RuntimeFilterType _runtime_filter_type; // runtime filter id @@ -350,7 +379,7 @@ protected: // this filter won't filter any data bool _always_true; - doris::vectorized::VExprContext* _vprobe_ctx; + doris::vectorized::VExprContext* _vprobe_ctx = nullptr; // Indicate whether runtime filter expr has been ignored bool _is_ignored; @@ -370,6 +399,12 @@ protected: /// Time in ms (from MonotonicMillis()), that the filter was registered. const int64_t registration_time_; + + const bool _enable_pipeline_exec; + + bool _profile_init = false; + + bool _opt_remote_rf; }; // avoid expose RuntimePredicateWrapper diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp index 8a5c4acb9b..829224f3f2 100644 --- a/be/src/exprs/runtime_filter_rpc.cpp +++ b/be/src/exprs/runtime_filter_rpc.cpp @@ -45,7 +45,8 @@ struct IRuntimeFilter::rpc_context { brpc::CallId cid; }; -Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) { +Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr, + bool opt_remote_rf) { DCHECK(is_producer()); DCHECK(_rpc_context == nullptr); std::shared_ptr stub( @@ -69,6 +70,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress pfragment_instance_id->set_lo(state->fragment_instance_id().lo); _rpc_context->request.set_filter_id(_filter_id); + _rpc_context->request.set_opt_remote_rf(opt_remote_rf); _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec()); _rpc_context->cntl.set_timeout_ms(1000); _rpc_context->cid = _rpc_context->cntl.call_id(); diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h index 9090ea86a3..73c2e9fba1 100644 --- a/be/src/olap/bloom_filter_predicate.h +++ b/be/src/olap/bloom_filter_predicate.h @@ -89,8 +89,7 @@ private: } } } - - } else if (IRuntimeFilter::enable_use_batch(_be_exec_version, T)) { + } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, T)) { const auto& data = reinterpret_cast< const vectorized::PredicateColumnType>*>( diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7a7ec74835..e2a8802fd9 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -18,6 +18,7 @@ #include "runtime/fragment_mgr.h" #include +#include #include #include #include @@ -640,7 +641,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } else { // This may be a first fragment request of the query. // Create the query fragments context. - query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env); + query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env, + params.query_options); query_ctx->query_id = query_id; RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl))); @@ -1155,10 +1157,65 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, return runtime_filter_mgr->update_filter(request, attach_data); } +Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, + butil::IOBufAsZeroCopyInputStream* attach_data) { + bool is_pipeline = request->has_is_pipeline() && request->is_pipeline(); + + const auto& fragment_instance_ids = request->fragment_instance_ids(); + if (fragment_instance_ids.size() > 0) { + UniqueId fragment_instance_id = fragment_instance_ids[0]; + TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); + + std::shared_ptr fragment_state; + std::shared_ptr pip_context; + + RuntimeFilterMgr* runtime_filter_mgr = nullptr; + ObjectPool* pool; + if (is_pipeline) { + std::unique_lock lock(_lock); + auto iter = _pipeline_map.find(tfragment_instance_id); + if (iter == _pipeline_map.end()) { + VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; + return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + } + pip_context = iter->second; + + DCHECK(pip_context != nullptr); + runtime_filter_mgr = + pip_context->get_runtime_state()->get_query_ctx()->runtime_filter_mgr(); + pool = &pip_context->get_query_context()->obj_pool; + } else { + std::unique_lock lock(_lock); + auto iter = _fragment_map.find(tfragment_instance_id); + if (iter == _fragment_map.end()) { + VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; + return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + } + fragment_state = iter->second; + + DCHECK(fragment_state != nullptr); + runtime_filter_mgr = fragment_state->executor() + ->runtime_state() + ->get_query_ctx() + ->runtime_filter_mgr(); + pool = &fragment_state->get_query_ctx()->obj_pool; + } + + UpdateRuntimeFilterParamsV2 params(request, attach_data, pool); + int filter_id = request->filter_id(); + IRuntimeFilter* real_filter = nullptr; + RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id, &real_filter)); + RETURN_IF_ERROR(real_filter->update_filter(¶ms)); + } + + return Status::OK(); +} + Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data) { UniqueId queryid = request->query_id(); bool is_pipeline = request->has_is_pipeline() && request->is_pipeline(); + bool opt_remote_rf = request->has_opt_remote_rf() && request->opt_remote_rf(); std::shared_ptr filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); @@ -1189,7 +1246,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, // when filter_controller->merge is still in progress fragment_state = iter->second; } - RETURN_IF_ERROR(filter_controller->merge(request, attach_data)); + RETURN_IF_ERROR(filter_controller->merge(request, attach_data, opt_remote_rf)); return Status::OK(); } diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index ad7f830a7f..8ca58ccffa 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -129,6 +129,9 @@ public: Status apply_filter(const PPublishFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data); + Status apply_filterv2(const PPublishFilterRequestV2* request, + butil::IOBufAsZeroCopyInputStream* attach_data); + Status merge_filter(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index b941f0fe86..0909702c70 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -30,6 +30,7 @@ #include "runtime/datetime_value.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_predicate.h" #include "task_group/task_group.h" #include "util/pretty_printer.h" @@ -48,8 +49,12 @@ class QueryContext { ENABLE_FACTORY_CREATOR(QueryContext); public: - QueryContext(int total_fragment_num, ExecEnv* exec_env) - : fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) { + QueryContext(int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options) + : fragment_num(total_fragment_num), + timeout_second(-1), + _exec_env(exec_env), + _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)), + _query_options(query_options) { _start_time = vectorized::VecDateTimeValue::local_time(); _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); _shared_scanner_controller.reset(new vectorized::SharedScannerController()); @@ -139,6 +144,29 @@ public: taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); } + int execution_timeout() const { + return _query_options.__isset.execution_timeout ? _query_options.execution_timeout + : _query_options.query_timeout; + } + + int32_t runtime_filter_wait_time_ms() const { + return _query_options.runtime_filter_wait_time_ms; + } + + bool enable_pipeline_exec() const { + return _query_options.__isset.enable_pipeline_engine && + _query_options.enable_pipeline_engine; + } + + int be_exec_version() const { + if (!_query_options.__isset.be_exec_version) { + return 0; + } + return _query_options.be_exec_version; + } + + RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } + public: TUniqueId query_id; DescriptorTbl* desc_tbl; @@ -186,6 +214,8 @@ private: vectorized::RuntimePredicate _runtime_predicate; taskgroup::TaskGroupPtr _task_group; + std::unique_ptr _runtime_filter_mgr; + const TQueryOptions _query_options; }; } // namespace doris diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index e956122130..b6dfa0a625 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -35,6 +35,7 @@ #include "exprs/runtime_filter.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" +#include "runtime/query_context.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "util/brpc_client_cache.h" @@ -51,10 +52,12 @@ struct async_rpc_context { RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state) : _state(state) {} +RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx) + : _query_ctx(query_ctx) {} + RuntimeFilterMgr::~RuntimeFilterMgr() {} Status RuntimeFilterMgr::init() { - DCHECK(_state->query_mem_tracker() != nullptr); _tracker = std::make_unique("RuntimeFilterMgr", ExecEnv::GetInstance()->experimental_mem_tracker()); return Status::OK(); @@ -107,20 +110,41 @@ Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role, VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role; auto iter = filter_map->find(key); - if (iter != filter_map->end()) { - return Status::InvalidArgument("filter has registed"); - } RuntimeFilterMgrVal filter_mgr_val; filter_mgr_val.role = role; - RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id, - &filter_mgr_val.filter, build_bf_exactly)); - - filter_map->emplace(key, filter_mgr_val); + if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && role == RuntimeFilterRole::CONSUMER && + desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) { + // if this runtime filter has remote target (e.g. need merge), we reuse the runtime filter between all instances + DCHECK(_query_ctx != nullptr); + if (iter != filter_map->end()) { + return Status::OK(); + } + { + std::lock_guard l(_lock); + iter = filter_map->find(key); + if (iter != filter_map->end()) { + return Status::OK(); + } + RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, &_query_ctx->obj_pool, &desc, + &options, role, node_id, &filter_mgr_val.filter, + build_bf_exactly)); + filter_map->emplace(key, filter_mgr_val); + } + } else { + DCHECK(_state != nullptr); + if (iter != filter_map->end()) { + return Status::InvalidArgument("filter has registed"); + } + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, role, node_id, + &filter_mgr_val.filter, build_bf_exactly)); + filter_map->emplace(key, filter_mgr_val); + } return Status::OK(); } + Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, butil::IOBufAsZeroCopyInputStream* data) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); @@ -158,12 +182,34 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc( cntVal->runtime_filter_desc = *runtime_filter_desc; cntVal->target_info = *target_info; cntVal->pool.reset(new ObjectPool()); - cntVal->filter = cntVal->pool->add(new IRuntimeFilter(_state, cntVal->pool.get())); + cntVal->filter = + cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool)); std::string filter_id = std::to_string(runtime_filter_desc->filter_id); // LOG(INFO) << "entity filter id:" << filter_id; - cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, - _fragment_instance_id, -1, false); + cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, -1, false); + _filter_map.emplace(filter_id, cntVal); + return Status::OK(); +} + +Status RuntimeFilterMergeControllerEntity::_init_with_desc( + const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, + const std::vector* targetv2_info, + const int producer_size) { + std::lock_guard guard(_filter_map_mutex); + std::shared_ptr cntVal = std::make_shared(); + // runtime_filter_desc and target will be released, + // so we need to copy to cntVal + cntVal->producer_size = producer_size; + cntVal->runtime_filter_desc = *runtime_filter_desc; + cntVal->targetv2_info = *targetv2_info; + cntVal->pool.reset(new ObjectPool()); + cntVal->filter = + cntVal->pool->add(new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool)); + + std::string filter_id = std::to_string(runtime_filter_desc->filter_id); + // LOG(INFO) << "entity filter id:" << filter_id; + cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options); _filter_map.emplace(filter_id, cntVal); return Status::OK(); } @@ -179,26 +225,60 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { int filter_id = filterid_to_desc.first; const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); - if (target_iter == runtime_filter_params.rid_to_target_param.end()) { + if (target_iter == runtime_filter_params.rid_to_target_param.end() && + !runtime_filter_params.__isset.rid_to_target_paramv2) { return Status::InternalError("runtime filter params meet error"); + } else if (target_iter == runtime_filter_params.rid_to_target_param.end()) { + const auto& targetv2_iter = runtime_filter_params.rid_to_target_paramv2.find(filter_id); + if (targetv2_iter == runtime_filter_params.rid_to_target_paramv2.end()) { + return Status::InternalError("runtime filter params meet error"); + } + const auto& build_iter = + runtime_filter_params.runtime_filter_builder_num.find(filter_id); + if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { + return Status::InternalError("runtime filter params meet error"); + } + _init_with_desc(&filterid_to_desc.second, &query_options, &targetv2_iter->second, + build_iter->second); + } else { + const auto& build_iter = + runtime_filter_params.runtime_filter_builder_num.find(filter_id); + if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { + return Status::InternalError("runtime filter params meet error"); + } + _init_with_desc(&filterid_to_desc.second, &query_options, &target_iter->second, + build_iter->second); } - const auto& build_iter = runtime_filter_params.runtime_filter_builder_num.find(filter_id); - if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { - return Status::InternalError("runtime filter params meet error"); + } + if (runtime_filter_params.__isset.rid_to_runtime_filter) { + for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { + int filter_id = filterid_to_desc.first; + const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); + if (target_iter == runtime_filter_params.rid_to_target_param.end()) { + return Status::InternalError("runtime filter params meet error"); + } + const auto& build_iter = + runtime_filter_params.runtime_filter_builder_num.find(filter_id); + if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { + return Status::InternalError("runtime filter params meet error"); + } + _init_with_desc(&filterid_to_desc.second, &query_options, &target_iter->second, + build_iter->second); } - _init_with_desc(&filterid_to_desc.second, &query_options, &target_iter->second, - build_iter->second); } return Status::OK(); } // merge data Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, - butil::IOBufAsZeroCopyInputStream* attach_data) { + butil::IOBufAsZeroCopyInputStream* attach_data, + bool opt_remote_rf) { + _opt_remote_rf = _opt_remote_rf && opt_remote_rf; SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); std::shared_ptr cntVal; int merged_size = 0; { + int64_t start_merge = MonotonicMillis(); std::lock_guard guard(_filter_map_mutex); auto iter = _filter_map.find(std::to_string(request->filter_id())); VLOG_ROW << "recv filter id:" << request->filter_id() << " " << request->ShortDebugString(); @@ -220,70 +300,143 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ // TODO: avoid log when we had acquired a lock VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size; DCHECK_LE(merged_size, cntVal->producer_size); + _merge_timer += (MonotonicMillis() - start_merge); if (merged_size < cntVal->producer_size) { return Status::OK(); } } if (merged_size == cntVal->producer_size) { - // prepare rpc context - using PPublishFilterRpcContext = - async_rpc_context; - std::vector> rpc_contexts; - rpc_contexts.reserve(cntVal->target_info.size()); + if (opt_remote_rf) { + DCHECK_GT(cntVal->targetv2_info.size(), 0); + DCHECK(cntVal->filter->is_bloomfilter()); + // Optimize merging phase iff: + // 1. All BE has been upgraded (e.g. _opt_remote_rf) + // 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0) + // 3. This filter is bloom filter (only bloom filter should be used for merging) + using PPublishFilterRpcContext = + async_rpc_context; + std::vector> rpc_contexts; + rpc_contexts.reserve(cntVal->targetv2_info.size()); - butil::IOBuf request_attachment; + butil::IOBuf request_attachment; - PPublishFilterRequest apply_request; - // serialize filter - void* data = nullptr; - int len = 0; - bool has_attachment = false; - RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len)); - if (data != nullptr && len > 0) { - request_attachment.append(data, len); - has_attachment = true; - } - - std::vector& targets = cntVal->target_info; - for (size_t i = 0; i < targets.size(); i++) { - rpc_contexts.emplace_back(new PPublishFilterRpcContext); - size_t cur = rpc_contexts.size() - 1; - rpc_contexts[cur]->request = apply_request; - rpc_contexts[cur]->request.set_filter_id(request->filter_id()); - rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() && - request->is_pipeline()); - *rpc_contexts[cur]->request.mutable_query_id() = request->query_id(); - if (has_attachment) { - rpc_contexts[cur]->cntl.request_attachment().append(request_attachment); + PPublishFilterRequestV2 apply_request; + // serialize filter + void* data = nullptr; + int len = 0; + bool has_attachment = false; + RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len)); + if (data != nullptr && len > 0) { + request_attachment.append(data, len); + has_attachment = true; } - rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id(); - // set fragment-id - auto request_fragment_id = rpc_contexts[cur]->request.mutable_fragment_id(); - request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi); - request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo); + std::vector& targets = cntVal->targetv2_info; + for (size_t i = 0; i < targets.size(); i++) { + rpc_contexts.emplace_back(new PPublishFilterRpcContext); + size_t cur = rpc_contexts.size() - 1; + rpc_contexts[cur]->request = apply_request; + rpc_contexts[cur]->request.set_filter_id(request->filter_id()); + rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() && + request->is_pipeline()); + rpc_contexts[cur]->request.set_merge_time(_merge_timer); + *rpc_contexts[cur]->request.mutable_query_id() = request->query_id(); + if (has_attachment) { + rpc_contexts[cur]->cntl.request_attachment().append(request_attachment); + } + rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id(); - std::shared_ptr stub( - ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( - targets[i].target_fragment_instance_addr)); - VLOG_NOTICE << "send filter " << rpc_contexts[cur]->request.filter_id() - << " to:" << targets[i].target_fragment_instance_addr.hostname << ":" - << targets[i].target_fragment_instance_addr.port - << rpc_contexts[cur]->request.ShortDebugString(); - if (stub == nullptr) { - rpc_contexts.pop_back(); - continue; + // set fragment-id + for (size_t fid = 0; fid < targets[cur].target_fragment_instance_ids.size(); + fid++) { + PUniqueId* cur_id = rpc_contexts[cur]->request.add_fragment_instance_ids(); + cur_id->set_hi(targets[cur].target_fragment_instance_ids[fid].hi); + cur_id->set_lo(targets[cur].target_fragment_instance_ids[fid].lo); + } + + std::shared_ptr stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + targets[i].target_fragment_instance_addr)); + VLOG_NOTICE << "send filter " << rpc_contexts[cur]->request.filter_id() + << " to:" << targets[i].target_fragment_instance_addr.hostname << ":" + << targets[i].target_fragment_instance_addr.port + << rpc_contexts[cur]->request.ShortDebugString(); + if (stub == nullptr) { + rpc_contexts.pop_back(); + continue; + } + stub->apply_filterv2(&rpc_contexts[cur]->cntl, &rpc_contexts[cur]->request, + &rpc_contexts[cur]->response, brpc::DoNothing()); } - stub->apply_filter(&rpc_contexts[cur]->cntl, &rpc_contexts[cur]->request, - &rpc_contexts[cur]->response, brpc::DoNothing()); - } - for (auto& rpc_context : rpc_contexts) { - brpc::Join(rpc_context->cid); - if (rpc_context->cntl.Failed()) { - LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText(); - ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - rpc_context->cntl.remote_side()); + for (auto& rpc_context : rpc_contexts) { + brpc::Join(rpc_context->cid); + if (rpc_context->cntl.Failed()) { + LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText(); + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + rpc_context->cntl.remote_side()); + } + } + } else { + // prepare rpc context + using PPublishFilterRpcContext = + async_rpc_context; + std::vector> rpc_contexts; + rpc_contexts.reserve(cntVal->target_info.size()); + + butil::IOBuf request_attachment; + + PPublishFilterRequest apply_request; + // serialize filter + void* data = nullptr; + int len = 0; + bool has_attachment = false; + RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data, &len)); + if (data != nullptr && len > 0) { + request_attachment.append(data, len); + has_attachment = true; + } + + std::vector& targets = cntVal->target_info; + for (size_t i = 0; i < targets.size(); i++) { + rpc_contexts.emplace_back(new PPublishFilterRpcContext); + size_t cur = rpc_contexts.size() - 1; + rpc_contexts[cur]->request = apply_request; + rpc_contexts[cur]->request.set_filter_id(request->filter_id()); + rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() && + request->is_pipeline()); + *rpc_contexts[cur]->request.mutable_query_id() = request->query_id(); + if (has_attachment) { + rpc_contexts[cur]->cntl.request_attachment().append(request_attachment); + } + rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id(); + + // set fragment-id + auto request_fragment_id = rpc_contexts[cur]->request.mutable_fragment_id(); + request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi); + request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo); + + std::shared_ptr stub( + ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( + targets[i].target_fragment_instance_addr)); + VLOG_NOTICE << "send filter " << rpc_contexts[cur]->request.filter_id() + << " to:" << targets[i].target_fragment_instance_addr.hostname << ":" + << targets[i].target_fragment_instance_addr.port + << rpc_contexts[cur]->request.ShortDebugString(); + if (stub == nullptr) { + rpc_contexts.pop_back(); + continue; + } + stub->apply_filter(&rpc_contexts[cur]->cntl, &rpc_contexts[cur]->request, + &rpc_contexts[cur]->response, brpc::DoNothing()); + } + for (auto& rpc_context : rpc_contexts) { + brpc::Join(rpc_context->cid); + if (rpc_context->cntl.Failed()) { + LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText(); + ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( + rpc_context->cntl.remote_side()); + } } } } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 09a760eff8..0a0a70ec7a 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -41,11 +41,14 @@ class IOBufAsZeroCopyInputStream; namespace doris { class PPublishFilterRequest; +class PPublishFilterRequestV2; class PMergeFilterRequest; class IRuntimeFilter; class MemTracker; class RuntimeState; enum class RuntimeFilterRole; +class RuntimePredicateWrapper; +class QueryContext; /// producer: /// Filter filter; @@ -63,6 +66,8 @@ class RuntimeFilterMgr { public: RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state); + RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx); + ~RuntimeFilterMgr(); Status init(); @@ -100,12 +105,14 @@ private: std::map _producer_map; RuntimeState* _state; + QueryContext* _query_ctx; std::unique_ptr _tracker; ObjectPool _pool; TNetworkAddress _merge_addr; bool _has_merge_addr; + std::mutex _lock; }; // controller -> @@ -123,8 +130,8 @@ public: const TQueryOptions& query_options); // handle merge rpc - Status merge(const PMergeFilterRequest* request, - butil::IOBufAsZeroCopyInputStream* attach_data); + Status merge(const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data, + bool opt_remote_rf); UniqueId query_id() const { return _query_id; } @@ -135,6 +142,7 @@ public: int producer_size; TRuntimeFilterDesc runtime_filter_desc; std::vector target_info; + std::vector targetv2_info; IRuntimeFilter* filter; std::unordered_set arrive_id; // fragment_instance_id ? std::shared_ptr pool; @@ -149,6 +157,11 @@ private: const std::vector* target_info, const int producer_size); + Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, + const TQueryOptions* query_options, + const std::vector* target_info, + const int producer_size); + UniqueId _query_id; UniqueId _fragment_instance_id; // protect _filter_map @@ -158,6 +171,8 @@ private: // filter-id -> val std::map> _filter_map; RuntimeState* _state; + bool _opt_remote_rf = true; + int64_t _merge_timer = 0; }; // RuntimeFilterMergeController has a map query-id -> entity diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 512438c776..0dbc26d6ac 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -829,6 +829,30 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr } } +void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController* controller, + const ::doris::PPublishFilterRequestV2* request, + ::doris::PPublishFilterResponse* response, + ::google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, controller, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + auto attachment = static_cast(controller)->request_attachment(); + butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment); + UniqueId unique_id(request->query_id()); + VLOG_NOTICE << "rpc apply_filterv2 recv"; + Status st = _exec_env->fragment_mgr()->apply_filterv2(request, &zero_copy_input_stream); + if (!st.ok()) { + LOG(WARNING) << "apply filter meet error: " << st.to_string(); + } + st.to_protobuf(response->mutable_status()); + }); + if (!ret) { + LOG(WARNING) << "fail to offer request to the work pool"; + brpc::ClosureGuard closure_guard(done); + response->mutable_status()->set_status_code(TStatusCode::CANCELLED); + response->mutable_status()->add_error_msgs("fail to offer request to the work pool"); + } +} + void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 6cfbae4fa7..22042196e2 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -123,6 +123,10 @@ public: const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) override; + void apply_filterv2(::google::protobuf::RpcController* controller, + const ::doris::PPublishFilterRequestV2* request, + ::doris::PPublishFilterResponse* response, + ::google::protobuf::Closure* done) override; void transmit_block(::google::protobuf::RpcController* controller, const ::doris::PTransmitDataParams* request, ::doris::PTransmitDataResult* response, diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index f811775d69..67ad6d29c1 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -428,6 +428,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { ADD_TIMER(_build_phase_profile, "BuildTableConvertToPartitionedTime"); _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT); _build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile, "BuildSideHashComputingTime"); + _build_runtime_filter_timer = ADD_TIMER(_build_phase_profile, "BuildRuntimeFilterTime"); // Probe phase auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true); @@ -441,7 +442,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer"); - _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime"); + _push_down_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime"); _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime"); _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT); _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index eb5bf7d240..eef8b30b90 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -288,6 +288,7 @@ private: RuntimeProfile::Counter* _probe_side_output_timer; RuntimeProfile::Counter* _build_side_compute_hash_timer; RuntimeProfile::Counter* _build_side_merge_block_timer; + RuntimeProfile::Counter* _build_runtime_filter_timer; RuntimeProfile::Counter* _build_blocks_memory_usage; RuntimeProfile::Counter* _hash_table_memory_usage; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 6ff46427ed..d13d4fc9c8 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -316,10 +316,23 @@ Status VScanNode::_register_runtime_filter() { for (int i = 0; i < filter_size; ++i) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; - RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter( - RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id())); - RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, - &runtime_filter)); + if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { + DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM && filter_desc.has_remote_targets); + // Optimize merging phase iff: + // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) + // 2. This filter is bloom filter (only bloom filter should be used for merging) + RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter( + RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(), + false)); + RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( + filter_desc.filter_id, &runtime_filter)); + } else { + RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter( + RuntimeFilterRole::CONSUMER, filter_desc, _state->query_options(), id(), + false)); + RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, + &runtime_filter)); + } _runtime_filter_ctxs.emplace_back(runtime_filter); _runtime_filter_ready_flag.emplace_back(false); } @@ -345,13 +358,6 @@ Status VScanNode::_acquire_runtime_filter(bool wait) { std::vector vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtime_filter; - // If all targets are local, scan node will use hash node's runtime filter, and we don't - // need to allocate memory again - if (runtime_filter->has_remote_target()) { - if (auto bf = runtime_filter->get_bloomfilter()) { - RETURN_IF_ERROR(bf->init_with_fixed_length()); - } - } bool ready = runtime_filter->is_ready(); if (!ready && wait) { ready = runtime_filter->await(); @@ -1329,7 +1335,8 @@ Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { ++current_arrived_rf_num; continue; } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { - _runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor); + _runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor, + _state); ++current_arrived_rf_num; _runtime_filter_ctxs[i].apply_mark = true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 58d0086ed6..fe6180eff1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -190,8 +190,12 @@ public final class RuntimeFilter { tFilter.setIsBroadcastJoin(isBroadcastJoin); tFilter.setHasLocalTargets(hasLocalTargets); tFilter.setHasRemoteTargets(hasRemoteTargets); + boolean optRemoteRf = true; for (RuntimeFilterTarget target : targets) { tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift()); + // TODO: now only support SlotRef + optRemoteRf = optRemoteRf && hasRemoteTargets && runtimeFilterType == TRuntimeFilterType.BLOOM + && target.expr instanceof SlotRef; } tFilter.setType(runtimeFilterType); tFilter.setBloomFilterSizeBytes(filterSizeBytes); @@ -199,6 +203,7 @@ public final class RuntimeFilter { tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift()); tFilter.setBitmapFilterNotIn(bitmapFilterNotIn); } + tFilter.setOptRemoteRf(optRemoteRf); return tFilter; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index e60e4122b2..8068c1a998 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -96,7 +96,7 @@ import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterParams; -import org.apache.doris.thrift.TRuntimeFilterTargetParams; +import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TScanRangeParams; @@ -3122,12 +3122,24 @@ public class Coordinator { if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) { for (Map.Entry> entry : ridToTargetParam.entrySet()) { - List targetParams = Lists.newArrayList(); + Map targetParams = new HashMap<>(); for (FRuntimeFilterTargetParam targetParam : entry.getValue()) { - targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId, - targetParam.targetFragmentInstanceAddr)); + if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) { + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } else { + targetParams.put(targetParam.targetFragmentInstanceAddr, + new TRuntimeFilterTargetParamsV2()); + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr + = targetParam.targetFragmentInstanceAddr; + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids + = new ArrayList<>(); + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } } - params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), targetParams); + params.params.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(), + new ArrayList(targetParams.values())); } for (Map.Entry entry : ridToBuilderNum.entrySet()) { params.params.runtime_filter_params.putToRuntimeFilterBuilderNum( @@ -3197,12 +3209,25 @@ public class Coordinator { if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) { for (Map.Entry> entry : ridToTargetParam.entrySet()) { - List targetParams = Lists.newArrayList(); + Map targetParams = new HashMap<>(); for (FRuntimeFilterTargetParam targetParam : entry.getValue()) { - targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId, - targetParam.targetFragmentInstanceAddr)); + if (targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) { + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } else { + targetParams.put(targetParam.targetFragmentInstanceAddr, + new TRuntimeFilterTargetParamsV2()); + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr + = targetParam.targetFragmentInstanceAddr; + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids + = new ArrayList<>(); + targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids + .add(targetParam.targetFragmentInstanceId); + } } - localParams.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(), targetParams); + + localParams.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(), + new ArrayList(targetParams.values())); } for (Map.Entry entry : ridToBuilderNum.entrySet()) { localParams.runtime_filter_params.putToRuntimeFilterBuilderNum( diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index d41a670052..f66090a77b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -481,6 +481,7 @@ message PMergeFilterRequest { optional PBloomFilter bloom_filter = 6; optional PInFilter in_filter = 7; optional bool is_pipeline = 8; + optional bool opt_remote_rf = 9; }; message PMergeFilterResponse { @@ -498,6 +499,18 @@ message PPublishFilterRequest { optional bool is_pipeline = 8; }; +message PPublishFilterRequestV2 { + required int32 filter_id = 1; + required PUniqueId query_id = 2; + repeated PUniqueId fragment_instance_ids = 3; + required PFilterType filter_type = 4; + optional PMinMaxFilter minmax_filter = 5; + optional PBloomFilter bloom_filter = 6; + optional PInFilter in_filter = 7; + optional bool is_pipeline = 8; + optional int64 merge_time = 9; +}; + message PPublishFilterResponse { required PStatus status = 1; }; @@ -639,6 +652,7 @@ service PBackendService { rpc rollback(PRollbackRequest) returns (PRollbackResult); rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse); rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse); + rpc apply_filterv2(PPublishFilterRequestV2) returns (PPublishFilterResponse); rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult); rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 111718eb1b..aa7ffc24ae 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -237,6 +237,12 @@ struct TRuntimeFilterTargetParams { 2: required Types.TNetworkAddress target_fragment_instance_addr } +struct TRuntimeFilterTargetParamsV2 { + 1: required list target_fragment_instance_ids + // The address of the instance where the fragment is expected to run + 2: required Types.TNetworkAddress target_fragment_instance_addr +} + struct TRuntimeFilterParams { // Runtime filter merge instance address 1: optional Types.TNetworkAddress runtime_filter_merge_addr @@ -250,6 +256,8 @@ struct TRuntimeFilterParams { // Number of Runtime filter producers 4: optional map runtime_filter_builder_num + + 5: optional map> rid_to_target_paramv2 } // Parameters for a single execution instance of a particular TPlanFragment diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0eaf1d61a0..3b332744a9 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1001,6 +1001,8 @@ struct TRuntimeFilterDesc { // for bitmap filter 11: optional bool bitmap_filter_not_in + + 12: optional bool opt_remote_rf; } struct TDataGenScanNode {