[bug](node) add dependency for set operation node (#30203)

These sinks must be completed one by one, in order; e.g., child(1) must wait for child(0) to finish building.
This commit is contained in:
zhangstar333
2024-01-29 21:21:02 +08:00
committed by yiguolei
parent f0a35f6e2d
commit f7e01ceffa
4 changed files with 90 additions and 5 deletions

View File

@ -693,7 +693,8 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
OperatorBuilderPtr sink_builder =
std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), node);
RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder));
std::vector<PipelinePtr> all_pipelines;
all_pipelines.emplace_back(build_pipeline);
for (int child_id = 1; child_id < node->children_count(); ++child_id) {
auto probe_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline));
@ -701,6 +702,9 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), child_id,
node);
RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder));
//eg: These sinks must be completed one by one in order, child(1) must wait child(0) build finish
probe_pipeline->add_dependency(all_pipelines[child_id - 1]);
all_pipelines.emplace_back(probe_pipeline);
}
OperatorBuilderPtr source_builder =

View File

@ -352,10 +352,13 @@ Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* state, int chil
bool eos) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_probe_timer);
CHECK(_build_finished) << "cannot sink probe data before build finished";
if (child_id > 1) {
CHECK(_probe_finished_children_index[child_id - 1])
<< fmt::format("child with id: {} should be probed first", child_id);
if (!_build_finished) {
return Status::RuntimeError("cannot sink probe data before build finished " +
std::to_string(child_id));
}
if (child_id > 1 && !_probe_finished_children_index[child_id - 1]) {
return Status::RuntimeError("the child with id should be probed first " +
std::to_string(child_id - 1));
}
auto probe_rows = block->rows();
if (probe_rows > 0) {

View File

@ -596,3 +596,7 @@ hell0
2.0554876421875E8 3601
5.395085565625E7 3602
-- !intersect_case --
0
1

View File

@ -329,4 +329,78 @@ suite("test_nereids_set_operation") {
union
select avg(tap), potno from dwd_daytable where potno=3602 and ddate >= '2023-08-01' group by potno limit 10;
"""
sql "DROP TABLE IF EXISTS table_22_undef_partitions2_keys3_properties4_distributed_by54"
sql """
create table table_22_undef_partitions2_keys3_properties4_distributed_by54 (
`col_int_undef_signed_not_null` int not null ,
`col_date_undef_signed_not_null` date not null ,
`pk` int,
`col_int_undef_signed` int null ,
`col_date_undef_signed` date null ,
`col_varchar_10__undef_signed` varchar(10) null ,
`col_varchar_10__undef_signed_not_null` varchar(10) not null ,
`col_varchar_1024__undef_signed` varchar(1024) null ,
`col_varchar_1024__undef_signed_not_null` varchar(1024) not null
) engine=olap
DUPLICATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
distributed by hash(pk) buckets 10
properties("replication_num" = "1");
"""
sql """
insert into table_22_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,0,2,"2023-12-10","2023-12-09","ok",'f','a','l'),(1,1,9,"2023-12-19","2023-12-10",'x',"it","is",'b'),(2,null,7,"2023-12-18","2023-12-13","on",'p',"why",'x'),(3,3,1,"2023-12-20","2023-12-12","are","his",'r',"really"),(4,7,5,"2023-12-15","2023-12-12",'e',"ok","yeah","some"),(5,8,1,"2023-12-09","2023-12-18","been","get",'i',"get"),(6,null,4,"2023-12-20","2023-12-17","as",'z',"in",'b'),(7,3,6,"2023-12-20","2023-12-10",'u',"he","like",'i'),(8,4,7,"2023-12-11","2023-12-09",'w','m',"you","in"),(9,0,1,"2023-12-12","2023-12-16",'i',"oh","because","in"),(10,null,8,"2023-12-16","2023-12-13","not",'e','n',"he"),(11,null,8,"2023-12-11","2023-12-12","that",'a',"how","all"),(12,null,4,"2023-12-15","2023-12-09","yeah","see",'n','g'),(13,null,6,"2023-12-16","2023-12-14","yeah",'f',"from",'c'),(14,8,0,"2023-12-15","2023-12-09","out","not","on",'i'),(15,2,6,"2023-12-19","2023-12-12",'d','d','c','p'),(16,null,2,"2023-12-12","2023-12-13",'p','d','s','n'),(17,6,8,"2023-12-14","2023-12-19",'l',"can't","not","could"),(18,null,0,"2023-12-18","2023-12-10",'z',"as",'j','j'),(19,null,5,"2023-12-20","2023-12-16","just",'c','h','d'),(20,9,7,"2023-12-10","2023-12-14",'l','q',"you're","why"),(21,5,9,"2023-12-14","2023-12-16","I'm",'x',"because",'i');
"""
sql "DROP TABLE IF EXISTS table_3_undef_partitions2_keys3_properties4_distributed_by54"
sql """
create table table_3_undef_partitions2_keys3_properties4_distributed_by54 (
`col_int_undef_signed_not_null` int not null ,
`col_date_undef_signed_not_null` date not null ,
`pk` int,
`col_int_undef_signed` int MIN null ,
`col_date_undef_signed` date REPLACE null ,
`col_varchar_10__undef_signed` varchar(10) REPLACE null ,
`col_varchar_10__undef_signed_not_null` varchar(10) MIN not null ,
`col_varchar_1024__undef_signed` varchar(1024) REPLACE null ,
`col_varchar_1024__undef_signed_not_null` varchar(1024) REPLACE not null
) engine=olap
AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
distributed by hash(pk) buckets 10
properties("replication_num" = "1");
"""
sql """
insert into table_3_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,5,"2023-12-13","2023-12-15",'c','c',"ok","had"),(1,null,7,"2023-12-12","2023-12-19","up",'e',"why",'c'),(2,3,2,"2023-12-15","2023-12-18","hey",'b',"as","she");
"""
sql "DROP TABLE IF EXISTS table_2_undef_partitions2_keys3_properties4_distributed_by55"
sql """
create table table_2_undef_partitions2_keys3_properties4_distributed_by55 (
`col_int_undef_signed_not_null` int not null ,
`col_date_undef_signed_not_null` date not null ,
`pk` int,
`col_int_undef_signed` int REPLACE null ,
`col_date_undef_signed` date MIN null ,
`col_varchar_10__undef_signed` varchar(10) MIN null ,
`col_varchar_10__undef_signed_not_null` varchar(10) MAX not null ,
`col_varchar_1024__undef_signed` varchar(1024) REPLACE null ,
`col_varchar_1024__undef_signed_not_null` varchar(1024) MIN not null
) engine=olap
AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
distributed by hash(pk) buckets 10
properties("replication_num" = "1");
"""
sql """
insert into table_2_undef_partitions2_keys3_properties4_distributed_by55(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,2,"2023-12-20","2023-12-11",'m','g','t',"in"),(1,1,4,"2023-12-09","2023-12-19","had",'b',"was","didn't");
"""
qt_intersect_case """
SELECT subq1.`pk` AS pk1 FROM ( ( SELECT t1.`pk` FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_3_undef_partitions2_keys3_properties4_distributed_by54 AS alias1 ON t1 . `pk` = alias1 . `pk` ) INTERSECT ( SELECT t1.`pk` FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_2_undef_partitions2_keys3_properties4_distributed_by55 AS alias2 ON t1 . `pk` = alias2 . `pk` ) ) subq1 GROUP BY subq1.`pk` order by 1 LIMIT 66666666 ;
"""
}