[fix] fix wrong result of spill agg with limit (#35403)

This commit is contained in:
TengJianPing
2024-05-27 17:07:34 +08:00
committed by yiguolei
parent 7058b31edd
commit d8eefd0be8
5 changed files with 50 additions and 10 deletions

View File

@ -142,7 +142,8 @@ Status AggSinkLocalState::open(RuntimeState* state) {
_should_limit_output = p._limit != -1 && // has limit
(!p._have_conjuncts) && // no having conjunct
p._needs_finalize; // agg's finalize step
p._needs_finalize && // agg's finalize step
!Base::_shared_state->enable_spill;
}
// move _create_agg_status to open not in during prepare,
// because during prepare and open thread is not the same one,
@ -459,7 +460,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
_places.data(), _agg_arena_pool));
}
if (_should_limit_output && !Base::_shared_state->enable_spill) {
if (_should_limit_output) {
_reach_limit = _get_hash_table_size() >=
Base::_parent->template cast<AggSinkOperatorX>()._limit;
if (_reach_limit &&

View File

@ -525,11 +525,6 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
_shared_state->agg_arena_pool.get(), rows);
}
}
if (_should_limit_output) {
_reach_limit = _get_hash_table_size() >=
Base::_parent->template cast<AggSourceOperatorX>()._limit;
}
}
return Status::OK();

View File

@ -114,9 +114,6 @@ protected:
};
executor _executor;
bool _should_limit_output = false;
bool _reach_limit = false;
};
class AggSourceOperatorX : public OperatorX<AggLocalState> {

View File

@ -0,0 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !aggregate_spill --
5 1

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("aggregate_spill") {
sql """
set enable_agg_spill = true;
"""
sql """
set enable_force_spill = true;
"""
sql """
set min_revocable_mem = 1;
"""
sql """
set parallel_pipeline_task_num = 4;
"""
sql """
drop table if exists aggregate_spill_test;
"""
sql """
CREATE TABLE `aggregate_spill_test` (k1 int, k2 int replace) distributed by hash(k1) properties("replication_num"="1");
"""
sql """
insert into aggregate_spill_test values(1, 1), (2, 1), (3, 1), (4, 1), (5, 1);
"""
qt_aggregate_spill """
select count(), k2 from aggregate_spill_test group by k2 limit 1;
"""
}