From 28acfaed2b8c12241750702b7eb7dd86ef482df5 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Mon, 8 Apr 2024 11:12:26 +0800 Subject: [PATCH] [fix](pipeline)group by and output is empty (#33192) --- be/src/pipeline/pipeline_fragment_context.cpp | 4 ++-- be/src/vec/exec/vaggregation_node.cpp | 4 ++-- be/src/vec/exec/vaggregation_node.h | 2 ++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index a32d777788..e53492c9fa 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -556,10 +556,10 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur break; } case TPlanNodeType::AGGREGATION_NODE: { - auto* agg_node = dynamic_cast(node); + auto* agg_node = static_cast(node); auto new_pipe = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe)); - if (agg_node->is_probe_expr_ctxs_empty() && node->row_desc().num_slots() == 0) { + if (agg_node->is_probe_expr_ctxs_empty() && agg_node->agg_output_desc()->slots().empty()) { return Status::InternalError("Illegal aggregate node " + std::to_string(agg_node->id()) + ": group by and output is empty"); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index d46aa3f573..67072e2f60 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -119,6 +119,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, _is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase; _agg_data = std::make_unique(); _agg_arena_pool = std::make_unique(); + _intermediate_tuple_desc = descs.get_tuple_descriptor(_intermediate_tuple_id); + _output_tuple_desc = descs.get_tuple_descriptor(_output_tuple_id); } AggregationNode::~AggregationNode() = default; @@ -250,8 +252,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _hash_table_input_counter = ADD_COUNTER(runtime_profile(), "HashTableInputCount", TUnit::UNIT); _max_row_size_counter = ADD_COUNTER(runtime_profile(), "MaxRowSizeInBytes", TUnit::UNIT); COUNTER_SET(_max_row_size_counter, (int64_t)0); - _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); - _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); RETURN_IF_ERROR(VExpr::prepare(_probe_expr_ctxs, state, child(0)->row_desc())); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index f89bbb9d78..93723e54b8 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -424,6 +424,8 @@ public: /// the preagg should pass through any rows it can't fit in its tables. bool _should_expand_preagg_hash_tables(); + TupleDescriptor* agg_output_desc() { return _output_tuple_desc; } + protected: bool _is_streaming_preagg; bool _child_eos = false;