diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 5d49aaf408..c62ed04e58 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -227,9 +227,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _query_ctx->init_runtime_predicates({0}); } - _need_local_merge = - request.__isset.parallel_instances && - (request.__isset.per_node_shared_scans && !request.per_node_shared_scans.empty()); + _need_local_merge = request.__isset.parallel_instances; + // 2. Build pipelines with operators in this fragment. auto root_pipeline = add_pipeline(); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines( @@ -926,14 +925,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); std::stringstream error_msg; + switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); - if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) { - if (request.__isset.parallel_instances) { - cur_pipe->set_num_tasks(request.parallel_instances); - } + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); } break; @@ -947,10 +945,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN "Jdbc scan node is disabled, you can change be config enable_java_support " "to true and restart be."); } - if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) { - if (request.__isset.parallel_instances) { - cur_pipe->set_num_tasks(request.parallel_instances); - } + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); } break; @@ -958,10 +954,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case doris::TPlanNodeType::FILE_SCAN_NODE: { op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); - if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) { - if (request.__isset.parallel_instances) { - cur_pipe->set_num_tasks(request.parallel_instances); - } + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); } break; @@ -970,10 +964,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::ES_HTTP_SCAN_NODE: { op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); - if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) { - if (request.__isset.parallel_instances) { - cur_pipe->set_num_tasks(request.parallel_instances); - } + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); op->set_ignore_data_distribution(); } break; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index a541b2d2ad..75d06adc56 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -531,9 +531,6 @@ Status RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte bool need_local_merge, doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) { - // If runtime filter need to be local merged, `build_bf_exactly` will lead to bloom filters with - // different size need to be merged which is not allowed. - // So if `need_local_merge` is true, we will disable `build_bf_exactly`. if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_local_merge_producer_filter( desc, query_options(), producer_filter, build_bf_exactly);