[fix](pipeline)group by and output is empty (#33192)
This commit is contained in:
@ -556,10 +556,10 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
|
||||
break;
|
||||
}
|
||||
case TPlanNodeType::AGGREGATION_NODE: {
|
||||
auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
|
||||
auto* agg_node = static_cast<vectorized::AggregationNode*>(node);
|
||||
auto new_pipe = add_pipeline();
|
||||
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
|
||||
if (agg_node->is_probe_expr_ctxs_empty() && node->row_desc().num_slots() == 0) {
|
||||
if (agg_node->is_probe_expr_ctxs_empty() && agg_node->agg_output_desc()->slots().empty()) {
|
||||
return Status::InternalError("Illegal aggregate node " +
|
||||
std::to_string(agg_node->id()) +
|
||||
": group by and output is empty");
|
||||
|
||||
@ -119,6 +119,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
|
||||
_is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
|
||||
_agg_data = std::make_unique<AggregatedDataVariants>();
|
||||
_agg_arena_pool = std::make_unique<Arena>();
|
||||
_intermediate_tuple_desc = descs.get_tuple_descriptor(_intermediate_tuple_id);
|
||||
_output_tuple_desc = descs.get_tuple_descriptor(_output_tuple_id);
|
||||
}
|
||||
|
||||
AggregationNode::~AggregationNode() = default;
|
||||
@ -250,8 +252,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
|
||||
_hash_table_input_counter = ADD_COUNTER(runtime_profile(), "HashTableInputCount", TUnit::UNIT);
|
||||
_max_row_size_counter = ADD_COUNTER(runtime_profile(), "MaxRowSizeInBytes", TUnit::UNIT);
|
||||
COUNTER_SET(_max_row_size_counter, (int64_t)0);
|
||||
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
|
||||
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
|
||||
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
|
||||
RETURN_IF_ERROR(VExpr::prepare(_probe_expr_ctxs, state, child(0)->row_desc()));
|
||||
|
||||
|
||||
@ -424,6 +424,8 @@ public:
|
||||
/// the preagg should pass through any rows it can't fit in its tables.
|
||||
bool _should_expand_preagg_hash_tables();
|
||||
|
||||
TupleDescriptor* agg_output_desc() { return _output_tuple_desc; }
|
||||
|
||||
protected:
|
||||
bool _is_streaming_preagg;
|
||||
bool _child_eos = false;
|
||||
|
||||
Reference in New Issue
Block a user