diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index daae5feecf..1b717ec3c3 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -208,6 +208,17 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _runtime_state->set_total_load_streams(request.total_load_streams); _runtime_state->set_num_local_sink(request.num_local_sink); + const auto& local_params = request.local_params[0]; + if (local_params.__isset.runtime_filter_params) { + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_params.runtime_filter_params); + } + if (local_params.__isset.topn_filter_source_node_ids) { + _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); + } else { + _query_ctx->init_runtime_predicates({0}); + } + _need_local_merge = request.__isset.parallel_instances && (request.__isset.per_node_shared_scans && !request.per_node_shared_scans.empty()); @@ -237,17 +248,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r request.shuffle_idx_to_instance_idx)); } - const auto& local_params = request.local_params[0]; - if (local_params.__isset.runtime_filter_params) { - _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( - local_params.runtime_filter_params); - } - if (local_params.__isset.topn_filter_source_node_ids) { - _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); - } else { - _query_ctx->init_runtime_predicates({0}); - } - // 4. Initialize global states in pipelines. for (PipelinePtr& pipeline : _pipelines) { pipeline->children().clear();