diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index c70d87f59e..97f7d9e573 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -125,8 +125,16 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const return _multi_cast_data_streamer->profile(); } +MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state, + OperatorXBase* parent) + : Base(state, parent), + vectorized::RuntimeFilterConsumer( + static_cast(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), + static_cast(parent)->_row_desc(), _conjuncts) {}; + Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); @@ -134,6 +142,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); } + // init profile for runtime filter + RuntimeFilterConsumer::_init_profile(profile()); return Status::OK(); } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 6377b5ef16..943c62d077 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -94,15 +94,21 @@ private: class MultiCastDataStreamerSourceOperatorX; -class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState { +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState, + public vectorized::RuntimeFilterConsumer { public: ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); using Base = PipelineXLocalState; using Parent = MultiCastDataStreamerSourceOperatorX; - MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) - : Base(state, parent) {}; - + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + RETURN_IF_ERROR(_acquire_runtime_filter()); + return Status::OK(); + } + friend class MultiCastDataStreamerSourceOperatorX; private: @@ -163,6 +169,18 @@ public: bool is_source() const override { return true; } + const std::vector& runtime_filter_descs() override { + return _t_data_stream_sink.runtime_filters; + } + + int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; } + + bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const override { + return state->get_local_state(id()) + ->template cast() + .runtime_filters_are_ready_or_timeout(); + } + private: friend class MultiCastDataStreamSourceLocalState; const int _consumer_id;