[Chore](runtime-filter) adjust need_local_merge setting conditions (#33886)
This commit is contained in:
@ -227,9 +227,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
|
||||
_query_ctx->init_runtime_predicates({0});
|
||||
}
|
||||
|
||||
_need_local_merge =
|
||||
request.__isset.parallel_instances &&
|
||||
(request.__isset.per_node_shared_scans && !request.per_node_shared_scans.empty());
|
||||
_need_local_merge = request.__isset.parallel_instances;
|
||||
|
||||
// 2. Build pipelines with operators in this fragment.
|
||||
auto root_pipeline = add_pipeline();
|
||||
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
|
||||
@ -926,14 +925,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
|
||||
// Therefore, here we need to use a stack-like structure.
|
||||
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
|
||||
std::stringstream error_msg;
|
||||
|
||||
switch (tnode.node_type) {
|
||||
case TPlanNodeType::OLAP_SCAN_NODE: {
|
||||
op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
|
||||
RETURN_IF_ERROR(cur_pipe->add_operator(op));
|
||||
if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
}
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
op->set_ignore_data_distribution();
|
||||
}
|
||||
break;
|
||||
@ -947,10 +945,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
|
||||
"Jdbc scan node is disabled, you can change be config enable_java_support "
|
||||
"to true and restart be.");
|
||||
}
|
||||
if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
}
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
op->set_ignore_data_distribution();
|
||||
}
|
||||
break;
|
||||
@ -958,10 +954,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
|
||||
case doris::TPlanNodeType::FILE_SCAN_NODE: {
|
||||
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
|
||||
RETURN_IF_ERROR(cur_pipe->add_operator(op));
|
||||
if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
}
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
op->set_ignore_data_distribution();
|
||||
}
|
||||
break;
|
||||
@ -970,10 +964,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
|
||||
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
|
||||
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
|
||||
RETURN_IF_ERROR(cur_pipe->add_operator(op));
|
||||
if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) {
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
}
|
||||
if (request.__isset.parallel_instances) {
|
||||
cur_pipe->set_num_tasks(request.parallel_instances);
|
||||
op->set_ignore_data_distribution();
|
||||
}
|
||||
break;
|
||||
|
||||
@ -531,9 +531,6 @@ Status RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte
|
||||
bool need_local_merge,
|
||||
doris::IRuntimeFilter** producer_filter,
|
||||
bool build_bf_exactly) {
|
||||
// If runtime filter need to be local merged, `build_bf_exactly` will lead to bloom filters with
|
||||
// different size need to be merged which is not allowed.
|
||||
// So if `need_local_merge` is true, we will disable `build_bf_exactly`.
|
||||
if (desc.has_remote_targets || need_local_merge) {
|
||||
return global_runtime_filter_mgr()->register_local_merge_producer_filter(
|
||||
desc, query_options(), producer_filter, build_bf_exactly);
|
||||
|
||||
Reference in New Issue
Block a user