From a4f29193f6432c2e9bf6e14b17575ce7ca0067d7 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 11 Jan 2024 19:53:09 +0800 Subject: [PATCH] [pipelineX](fix) Fix incorrect runtime filter (#29860) --- be/src/exprs/runtime_filter.cpp | 2 +- be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 +-- .../src/main/java/org/apache/doris/planner/ScanNode.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index bf09adc53f..3215b842af 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -994,7 +994,7 @@ Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* Status IRuntimeFilter::publish(bool publish_local) { DCHECK(is_producer()); - if (_is_global) { + if (_is_global && _has_local_target) { std::vector filters; RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters( _filter_id, filters)); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index c9e17727c7..164b00a8d2 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -130,8 +130,7 @@ public: int task_id() const { return _index; }; void clear_blocking_state() { - if (!is_final_state(get_state()) && get_state() != PipelineTaskState::PENDING_FINISH && - _blocked_dep) { + if (!_finished && get_state() != PipelineTaskState::PENDING_FINISH && _blocked_dep) { _blocked_dep->set_ready(); _blocked_dep = nullptr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 885bb335e4..7ddf2ae1d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -721,7 +721,7 @@ public abstract class ScanNode extends PlanNode { } public boolean ignoreStorageDataDistribution(ConnectContext context) { - return !isKeySearch() && context != null + return context != null && context.getSessionVariable().isIgnoreStorageDataDistribution() && context.getSessionVariable().getEnablePipelineXEngine() && !fragment.isHasNullAwareLeftAntiJoin()