diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 34ca84b148..9ffcb40038 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -112,6 +112,11 @@ public: void close_a_pipeline(); + void set_merge_controller_handler( + std::shared_ptr& handler) { + _merge_controller_handler = handler; + } + virtual void add_merge_controller_handler( std::shared_ptr& handler) {} @@ -188,6 +193,10 @@ protected: std::shared_ptr _query_ctx; + // This shared ptr is never used. It is just a reference to hold the object. + // There is a weak ptr in runtime filter manager to reference this object. + std::shared_ptr _merge_controller_handler; + MonotonicStopWatch _fragment_watcher; RuntimeProfile::Counter* _start_timer = nullptr; RuntimeProfile::Counter* _prepare_timer = nullptr; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1d4802af2f..a8d29b9f9d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -719,6 +719,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, static_cast(_runtimefilter_controller.add_entity( params.params, params.params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(fragment_executor->runtime_state()))); + fragment_executor->set_merge_controller_handler(handler); { std::lock_guard lock(_lock); _fragment_instance_map.insert( @@ -806,6 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast(_runtimefilter_controller.add_entity( params.local_params[i], params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); + context->set_merge_controller_handler(handler); const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; { std::lock_guard lock(_lock); @@ -885,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, static_cast(_runtimefilter_controller.add_entity( local_params, params.query_id, params.query_options, &handler, RuntimeFilterParamsContext::create(context->get_runtime_state()))); - + context->set_merge_controller_handler(handler); { std::lock_guard lock(_lock); _pipeline_map.insert(std::make_pair(fragment_instance_id, context)); diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 5529d1ba3b..41fa6c2f81 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -134,6 +134,11 @@ public: void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; } + void set_merge_controller_handler( + std::shared_ptr& handler) { + _merge_controller_handler = handler; + } + std::shared_ptr get_query_ctx() { return _query_ctx; } TUniqueId fragment_instance_id() const { return _fragment_instance_id; } @@ -214,6 +219,9 @@ private: RuntimeProfile::Counter* _blocks_produced_counter = nullptr; RuntimeProfile::Counter* _fragment_cpu_timer = nullptr; + // This shared ptr is never used. It is just a reference to hold the object. + // There is a weak ptr in runtime filter manager to reference this object. + std::shared_ptr _merge_controller_handler; // If set the true, this plan fragment will be executed only after FE send execution start rpc. bool _need_wait_execution_trigger = false;