[Bug](top-n) init query_ctx runtime predicate before _build_pipelines #31895
This commit is contained in:
@ -208,6 +208,17 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
|
||||
_runtime_state->set_total_load_streams(request.total_load_streams);
|
||||
_runtime_state->set_num_local_sink(request.num_local_sink);
|
||||
|
||||
const auto& local_params = request.local_params[0];
|
||||
if (local_params.__isset.runtime_filter_params) {
|
||||
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
|
||||
local_params.runtime_filter_params);
|
||||
}
|
||||
if (local_params.__isset.topn_filter_source_node_ids) {
|
||||
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
|
||||
} else {
|
||||
_query_ctx->init_runtime_predicates({0});
|
||||
}
|
||||
|
||||
_need_local_merge =
|
||||
request.__isset.parallel_instances &&
|
||||
(request.__isset.per_node_shared_scans && !request.per_node_shared_scans.empty());
|
||||
@ -237,17 +248,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
|
||||
request.shuffle_idx_to_instance_idx));
|
||||
}
|
||||
|
||||
const auto& local_params = request.local_params[0];
|
||||
if (local_params.__isset.runtime_filter_params) {
|
||||
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
|
||||
local_params.runtime_filter_params);
|
||||
}
|
||||
if (local_params.__isset.topn_filter_source_node_ids) {
|
||||
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
|
||||
} else {
|
||||
_query_ctx->init_runtime_predicates({0});
|
||||
}
|
||||
|
||||
// 4. Initialize global states in pipelines.
|
||||
for (PipelinePtr& pipeline : _pipelines) {
|
||||
pipeline->children().clear();
|
||||
|
||||
Reference in New Issue
Block a user