diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 0be3d2d80b..6bce7287e1 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -116,7 +116,7 @@ std::string AndDependency::debug_string(int indentation_level) { bool RuntimeFilterTimer::has_ready() { std::unique_lock lc(_lock); - return _runtime_filter->is_ready(); + return _is_ready; } void RuntimeFilterTimer::call_timeout() { @@ -139,6 +139,7 @@ void RuntimeFilterTimer::call_ready() { if (_parent) { _parent->sub_filters(); } + _is_ready = true; } void RuntimeFilterTimer::call_has_ready() { @@ -160,7 +161,7 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) { int32 wait_time_ms = runtime_filter->wait_time_ms(); auto filter_timer = std::make_shared( registration_time, wait_time_ms, - std::dynamic_pointer_cast(shared_from_this()), runtime_filter); + std::dynamic_pointer_cast(shared_from_this())); runtime_filter->set_filter_timer(filter_timer); ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer); } diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index 0f4f3727e6..ecf0c8188f 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -161,12 +161,10 @@ class RuntimeFilterDependency; class RuntimeFilterTimer { public: RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, - std::shared_ptr parent, - IRuntimeFilter* runtime_filter) + std::shared_ptr parent) : _parent(std::move(parent)), _registration_time(registration_time), - _wait_time_ms(wait_time_ms), - _runtime_filter(runtime_filter) {} + _wait_time_ms(wait_time_ms) {} void call_ready(); @@ -188,7 +186,7 @@ private: std::mutex _lock; const int64_t _registration_time; const int32_t _wait_time_ms; - IRuntimeFilter* _runtime_filter = nullptr; + bool _is_ready = false; }; struct RuntimeFilterTimerQueue {