From b4dbb087c0c0dd7e511d92d66efff2f64568bda3 Mon Sep 17 00:00:00 2001 From: Pxl Date: Wed, 6 Mar 2024 19:09:10 +0800 Subject: [PATCH] [Bug](top-n) init query_ctx runtime predicate before operators prepare #31876 --- .../pipeline_x_fragment_context.cpp | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 733cdfe2b5..daae5feecf 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -236,6 +236,18 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r request.bucket_seq_to_instance_idx, request.shuffle_idx_to_instance_idx)); } + + const auto& local_params = request.local_params[0]; + if (local_params.__isset.runtime_filter_params) { + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_params.runtime_filter_params); + } + if (local_params.__isset.topn_filter_source_node_ids) { + _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); + } else { + _query_ctx->init_runtime_predicates({0}); + } + // 4. Initialize global states in pipelines. for (PipelinePtr& pipeline : _pipelines) { pipeline->children().clear(); @@ -523,17 +535,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( // build local_runtime_filter_mgr for each instance runtime_filter_mgr = std::make_unique(request.query_id, filterparams.get()); - if (i == 0) { - if (local_params.__isset.runtime_filter_params) { - _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( - local_params.runtime_filter_params); - } - if (local_params.__isset.topn_filter_source_node_ids) { - _query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids); - } else { - _query_ctx->init_runtime_predicates({0}); - } - } + filterparams->runtime_filter_mgr = runtime_filter_mgr.get(); _runtime_filter_states.push_back(std::move(filterparams));