[Bug](top-n) init query_ctx runtime predicate before operators prepare #31876

This commit is contained in:
Pxl
2024-03-06 19:09:10 +08:00
committed by GitHub
parent 6ea5218ee8
commit b4dbb087c0

View File

@ -236,6 +236,18 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
}
const auto& local_params = request.local_params[0];
if (local_params.__isset.runtime_filter_params) {
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
local_params.runtime_filter_params);
}
if (local_params.__isset.topn_filter_source_node_ids) {
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
} else {
_query_ctx->init_runtime_predicates({0});
}
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
pipeline->children().clear();
@ -523,17 +535,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
// build local_runtime_filter_mgr for each instance
runtime_filter_mgr =
std::make_unique<RuntimeFilterMgr>(request.query_id, filterparams.get());
if (i == 0) {
if (local_params.__isset.runtime_filter_params) {
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
local_params.runtime_filter_params);
}
if (local_params.__isset.topn_filter_source_node_ids) {
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
} else {
_query_ctx->init_runtime_predicates({0});
}
}
filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
_runtime_filter_states.push_back(std::move(filterparams));