diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp index 66368fe3a1..8d4f71ec88 100644 --- a/be/src/vec/exec/distinct_vaggregation_node.cpp +++ b/be/src/vec/exec/distinct_vaggregation_node.cpp @@ -17,8 +17,11 @@ #include "vec/exec/distinct_vaggregation_node.h" +#include + #include "runtime/runtime_state.h" #include "vec/aggregate_functions/aggregate_function_uniq.h" +#include "vec/columns/column.h" #include "vec/exec/vaggregation_node.h" namespace doris { @@ -64,9 +67,18 @@ Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key( SCOPED_TIMER(_insert_keys_to_column_timer); bool mem_reuse = _make_nullable_keys.empty() && out_block->mem_reuse(); if (mem_reuse) { + if (_stop_emplace_flag && !out_block->empty()) { + // when out_block rows >= batch_size the block is pushed to data_queue, so when _stop_emplace_flag = true there may still be some data in the block + // that data needs to be output first + for (int i = 0; i < rows; ++i) { + _distinct_row.push_back(i); + } + } for (int i = 0; i < key_size; ++i) { auto output_column = out_block->get_by_position(i).column; - if (_stop_emplace_flag) { // swap the column directly, to solve Check failed: d.column->use_count() == 1 (2 vs. 1) + if (_stop_emplace_flag && _distinct_row.empty()) { + // means it's streaming and out_block has no data. + // swap the column directly, so data is not inserted again; this also solves Check failed: d.column->use_count() == 1 (2 vs. 
1) out_block->replace_by_position(i, key_columns[i]->assume_mutable()); in_block->replace_by_position(result_idxs[i], output_column); } else { diff --git a/regression-test/data/variant_github_events_p0_new/load.out b/regression-test/data/variant_github_events_p0_new/load.out index 13ce3dfca0..0aeaaeed02 100644 --- a/regression-test/data/variant_github_events_p0_new/load.out +++ b/regression-test/data/variant_github_events_p0_new/load.out @@ -5,3 +5,6 @@ \N 4748 +-- !sql_select_count -- +67843 + diff --git a/regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out b/regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out new file mode 100644 index 0000000000..20dee980ec --- /dev/null +++ b/regression-test/data/variant_github_events_p0_new/sql/test_distinct_streaming_agg.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !test_distinct_streaming_agg -- +31481 + diff --git a/regression-test/suites/variant_github_events_p0_new/load.groovy b/regression-test/suites/variant_github_events_p0_new/load.groovy index 777befbd16..0be0f205b6 100644 --- a/regression-test/suites/variant_github_events_p0_new/load.groovy +++ b/regression-test/suites/variant_github_events_p0_new/load.groovy @@ -80,4 +80,21 @@ suite("regression_test_variant_github_events_p0", "nonConcurrent"){ // TODO fix compaction issue, this case could be stable qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int) from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core' order by 1;""" // TODO add test case that some certain columns are materialized in some file while others are not materilized(sparse) + + sql """DROP TABLE IF EXISTS github_events_2""" + sql """ + CREATE TABLE IF NOT EXISTS `github_events_2` ( + `k` BIGINT NULL, + `v` text NULL, + INDEX idx_var (`v`) USING INVERTED PROPERTIES("parser" = "english") COMMENT '' + ) ENGINE = OLAP 
DUPLICATE KEY(`k`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`k`) BUCKETS 4 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + insert into github_events_2 select 1, cast(v["repo"]["name"] as string) FROM github_events; + """ + + qt_sql_select_count """ select count(*) from github_events_2; """ } diff --git a/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql b/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql new file mode 100644 index 0000000000..40854a2d53 --- /dev/null +++ b/regression-test/suites/variant_github_events_p0_new/sql/test_distinct_streaming_agg.sql @@ -0,0 +1,5 @@ +SELECT + /*+SET_VAR(batch_size=50,experimental_enable_pipeline_x_engine=false,parallel_pipeline_task_num=1,disable_streaming_preaggregations=false) */ + count(distinct v) +FROM + github_events_2; \ No newline at end of file