[bug](distinct agg) fix distinct streaming agg not output all data (#32760)

Fix distinct streaming agg not outputting all data.
This commit is contained in:
zhangstar333
2024-03-26 17:34:19 +08:00
committed by yiguolei
parent ad2d20348a
commit 0a44de67bf
5 changed files with 42 additions and 1 deletions

View File

@ -17,8 +17,11 @@
#include "vec/exec/distinct_vaggregation_node.h"
#include <glog/logging.h>
#include "runtime/runtime_state.h"
#include "vec/aggregate_functions/aggregate_function_uniq.h"
#include "vec/columns/column.h"
#include "vec/exec/vaggregation_node.h"
namespace doris {
@ -64,9 +67,18 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
SCOPED_TIMER(_insert_keys_to_column_timer);
bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse();
if (mem_reuse) {
if (_stop_emplace_flag && !out_block->empty()) {
// out_block is pushed to data_queue only when its row count >= batch_size, so when _stop_emplace_flag = true the block may still hold some data.
// That data needs to be output first.
for (int i = 0; i < rows; ++i) {
_distinct_row.push_back(i);
}
}
for (int i = 0; i < key_size; ++i) {
auto output_column = out_block->get_by_position(i).column;
if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1)
if (_stop_emplace_flag && _distinct_row.empty()) {
// This means it is streaming and out_block has no data.
// Swap the column directly so the data is not inserted again; this also solves Check failed: d.column->use_count() == 1 (2 vs. 1)
out_block->replace_by_position(i, key_columns[i]->assume_mutable());
in_block->replace_by_position(result_idxs[i], output_column);
} else {

View File

@ -5,3 +5,6 @@
\N
4748
-- !sql_select_count --
67843

View File

@ -0,0 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !test_distinct_streaming_agg --
31481

View File

@ -80,4 +80,21 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){
// TODO fix compaction issue, this case could be stable
qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int) from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core' order by 1;"""
// TODO add a test case where certain columns are materialized in some files while others are not materialized (sparse)
sql """DROP TABLE IF EXISTS github_events_2"""
sql """
CREATE TABLE IF NOT EXISTS `github_events_2` (
`k` BIGINT NULL,
`v` text NULL,
INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT ''
) ENGINE = OLAP DUPLICATE KEY(`k`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`k`) BUCKETS 4 PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
sql """
insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events;
"""
qt_sql_select_count """ select count(*) from github_events_2; """
}

View File

@ -0,0 +1,5 @@
SELECT
/*+SET_VAR(batch_size=50,experimental_enable_pipeline_x_engine=false,parallel_pipeline_task_num=1,disable_streaming_preaggregations=false) */
count(distinct v)
FROM
github_events_2;