diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 5e8af4940b..64572f0369 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -693,7 +693,8 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode OperatorBuilderPtr sink_builder = std::make_shared>(node->id(), node); RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder)); - + std::vector all_pipelines; + all_pipelines.emplace_back(build_pipeline); for (int child_id = 1; child_id < node->children_count(); ++child_id) { auto probe_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline)); @@ -701,6 +702,9 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode std::make_shared>(node->id(), child_id, node); RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder)); + //eg: These sinks must be completed one by one in order, child(1) must wait child(0) build finish + probe_pipeline->add_dependency(all_pipelines[child_id - 1]); + all_pipelines.emplace_back(probe_pipeline); } OperatorBuilderPtr source_builder = diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 75317b4c93..16c267b26d 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -352,10 +352,13 @@ Status VSetOperationNode::sink_probe(RuntimeState* state, int chil bool eos) { SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_probe_timer); - CHECK(_build_finished) << "cannot sink probe data before build finished"; - if (child_id > 1) { - CHECK(_probe_finished_children_index[child_id - 1]) - << fmt::format("child with id: {} should be probed first", child_id); + if (!_build_finished) { + return Status::RuntimeError("cannot sink probe data before build finished " + + std::to_string(child_id)); + } + if (child_id > 1 && !_probe_finished_children_index[child_id - 1]) { + return Status::RuntimeError("the child with id should be probed first " + + std::to_string(child_id - 1)); } auto probe_rows = block->rows(); if (probe_rows > 0) { diff --git a/regression-test/data/nereids_syntax_p0/set_operation.out b/regression-test/data/nereids_syntax_p0/set_operation.out index 28f286ea3d..0ce9a4b4c9 100644 --- a/regression-test/data/nereids_syntax_p0/set_operation.out +++ b/regression-test/data/nereids_syntax_p0/set_operation.out @@ -596,3 +596,7 @@ hell0 2.0554876421875E8 3601 5.395085565625E7 3602 +-- !intersect_case -- +0 +1 + diff --git a/regression-test/suites/nereids_syntax_p0/set_operation.groovy b/regression-test/suites/nereids_syntax_p0/set_operation.groovy index b23811727b..6d7f4e18ff 100644 --- a/regression-test/suites/nereids_syntax_p0/set_operation.groovy +++ b/regression-test/suites/nereids_syntax_p0/set_operation.groovy @@ -329,4 +329,78 @@ suite("test_nereids_set_operation") { union select avg(tap), potno from dwd_daytable where potno=3602 and ddate >= '2023-08-01' group by potno limit 10; """ + + sql "DROP TABLE IF EXISTS table_22_undef_partitions2_keys3_properties4_distributed_by54" + sql """ + create table table_22_undef_partitions2_keys3_properties4_distributed_by54 ( + `col_int_undef_signed_not_null` int not null , + `col_date_undef_signed_not_null` date not null , + `pk` int, + `col_int_undef_signed` int null , + `col_date_undef_signed` date null , + `col_varchar_10__undef_signed` varchar(10) null , + `col_varchar_10__undef_signed_not_null` varchar(10) not null , + `col_varchar_1024__undef_signed` varchar(1024) null , + `col_varchar_1024__undef_signed_not_null` varchar(1024) not null + ) engine=olap + DUPLICATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk) + PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21'))) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + insert into table_22_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,0,2,"2023-12-10","2023-12-09","ok",'f','a','l'),(1,1,9,"2023-12-19","2023-12-10",'x',"it","is",'b'),(2,null,7,"2023-12-18","2023-12-13","on",'p',"why",'x'),(3,3,1,"2023-12-20","2023-12-12","are","his",'r',"really"),(4,7,5,"2023-12-15","2023-12-12",'e',"ok","yeah","some"),(5,8,1,"2023-12-09","2023-12-18","been","get",'i',"get"),(6,null,4,"2023-12-20","2023-12-17","as",'z',"in",'b'),(7,3,6,"2023-12-20","2023-12-10",'u',"he","like",'i'),(8,4,7,"2023-12-11","2023-12-09",'w','m',"you","in"),(9,0,1,"2023-12-12","2023-12-16",'i',"oh","because","in"),(10,null,8,"2023-12-16","2023-12-13","not",'e','n',"he"),(11,null,8,"2023-12-11","2023-12-12","that",'a',"how","all"),(12,null,4,"2023-12-15","2023-12-09","yeah","see",'n','g'),(13,null,6,"2023-12-16","2023-12-14","yeah",'f',"from",'c'),(14,8,0,"2023-12-15","2023-12-09","out","not","on",'i'),(15,2,6,"2023-12-19","2023-12-12",'d','d','c','p'),(16,null,2,"2023-12-12","2023-12-13",'p','d','s','n'),(17,6,8,"2023-12-14","2023-12-19",'l',"can't","not","could"),(18,null,0,"2023-12-18","2023-12-10",'z',"as",'j','j'),(19,null,5,"2023-12-20","2023-12-16","just",'c','h','d'),(20,9,7,"2023-12-10","2023-12-14",'l','q',"you're","why"),(21,5,9,"2023-12-14","2023-12-16","I'm",'x',"because",'i'); + """ + + sql "DROP TABLE IF EXISTS table_3_undef_partitions2_keys3_properties4_distributed_by54" + sql """ + create table table_3_undef_partitions2_keys3_properties4_distributed_by54 ( + `col_int_undef_signed_not_null` int not null , + `col_date_undef_signed_not_null` date not null , + `pk` int, + `col_int_undef_signed` int MIN null , + `col_date_undef_signed` date REPLACE null , + `col_varchar_10__undef_signed` varchar(10) REPLACE null , + `col_varchar_10__undef_signed_not_null` varchar(10) MIN not null , + `col_varchar_1024__undef_signed` varchar(1024) REPLACE null , + `col_varchar_1024__undef_signed_not_null` varchar(1024) REPLACE not null + ) engine=olap + AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk) + PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21'))) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + insert into table_3_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,5,"2023-12-13","2023-12-15",'c','c',"ok","had"),(1,null,7,"2023-12-12","2023-12-19","up",'e',"why",'c'),(2,3,2,"2023-12-15","2023-12-18","hey",'b',"as","she"); + """ + + + sql "DROP TABLE IF EXISTS table_2_undef_partitions2_keys3_properties4_distributed_by55" + sql """ + create table table_2_undef_partitions2_keys3_properties4_distributed_by55 ( + `col_int_undef_signed_not_null` int not null , + `col_date_undef_signed_not_null` date not null , + `pk` int, + `col_int_undef_signed` int REPLACE null , + `col_date_undef_signed` date MIN null , + `col_varchar_10__undef_signed` varchar(10) MIN null , + `col_varchar_10__undef_signed_not_null` varchar(10) MAX not null , + `col_varchar_1024__undef_signed` varchar(1024) REPLACE null , + `col_varchar_1024__undef_signed_not_null` varchar(1024) MIN not null + ) engine=olap + AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk) + PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21'))) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + insert into table_2_undef_partitions2_keys3_properties4_distributed_by55(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,2,"2023-12-20","2023-12-11",'m','g','t',"in"),(1,1,4,"2023-12-09","2023-12-19","had",'b',"was","didn't"); + """ + + qt_intersect_case """ + SELECT subq1.`pk` AS pk1 FROM ( ( SELECT t1.`pk` FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_3_undef_partitions2_keys3_properties4_distributed_by54 AS alias1 ON t1 . `pk` = alias1 . `pk` ) INTERSECT ( SELECT t1.`pk` FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_2_undef_partitions2_keys3_properties4_distributed_by55 AS alias2 ON t1 . `pk` = alias2 . `pk` ) ) subq1 GROUP BY subq1.`pk` order by 1 LIMIT 66666666 ; + """ }