[fix](join) hash join should use children's output tuple ids not output tableref ids (#12261)

This commit is contained in:
starocean999
2022-09-06 09:53:45 +08:00
committed by GitHub
parent f2aa87d797
commit 86fa0e38e2
20 changed files with 195 additions and 58 deletions

View File

@ -523,16 +523,8 @@ public final class AggregateInfo extends AggregateInfoBase {
if (exprList.size() > 1) {
continue;
}
Expr expr = exprList.get(0);
if (!(expr instanceof SlotRef)) {
continue;
}
SlotRef slotRef = (SlotRef) expr;
Expr right = smap.get(slotRef);
if (right == null) {
continue;
}
slotDesc.setIsNullable(right.isNullable());
Expr srcExpr = exprList.get(0).substitute(smap);
slotDesc.setIsNullable(srcExpr.isNullable() || slotDesc.getIsNullable());
}
}

View File

@ -452,15 +452,7 @@ public class HashJoinNode extends PlanNode {
int leftNullableNumber = 0;
int rightNullableNumber = 0;
if (copyLeft) {
//cross join do not have OutputTblRefIds
List<TupleId> srcTupleIds = getChild(0) instanceof CrossJoinNode ? getChild(0).getOutputTupleIds()
: getChild(0).getOutputTblRefIds();
for (TupleDescriptor leftTupleDesc : analyzer.getDescTbl().getTupleDesc(srcTupleIds)) {
// if the child is cross join node, the only way to get the correct nullable info of its output slots
// is to check if the output tuple ids are outer joined or not.
// then pass this nullable info to hash join node will be correct.
boolean needSetToNullable =
getChild(0) instanceof CrossJoinNode && analyzer.isOuterJoined(leftTupleDesc.getId());
for (TupleDescriptor leftTupleDesc : analyzer.getDescTbl().getTupleDesc(getChild(0).getOutputTupleIds())) {
for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) {
if (!isMaterailizedByChild(leftSlotDesc, getChild(0).getOutputSmap())) {
continue;
@ -471,19 +463,13 @@ public class HashJoinNode extends PlanNode {
outputSlotDesc.setIsNullable(true);
leftNullableNumber++;
}
if (needSetToNullable) {
outputSlotDesc.setIsNullable(true);
}
srcTblRefToOutputTupleSmap.put(new SlotRef(leftSlotDesc), new SlotRef(outputSlotDesc));
}
}
}
if (copyRight) {
List<TupleId> srcTupleIds = getChild(1) instanceof CrossJoinNode ? getChild(1).getOutputTupleIds()
: getChild(1).getOutputTblRefIds();
for (TupleDescriptor rightTupleDesc : analyzer.getDescTbl().getTupleDesc(srcTupleIds)) {
boolean needSetToNullable =
getChild(1) instanceof CrossJoinNode && analyzer.isOuterJoined(rightTupleDesc.getId());
for (TupleDescriptor rightTupleDesc :
analyzer.getDescTbl().getTupleDesc(getChild(1).getOutputTupleIds())) {
for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) {
if (!isMaterailizedByChild(rightSlotDesc, getChild(1).getOutputSmap())) {
continue;
@ -494,9 +480,6 @@ public class HashJoinNode extends PlanNode {
outputSlotDesc.setIsNullable(true);
rightNullableNumber++;
}
if (needSetToNullable) {
outputSlotDesc.setIsNullable(true);
}
srcTblRefToOutputTupleSmap.put(new SlotRef(rightSlotDesc), new SlotRef(outputSlotDesc));
}
}

View File

@ -1645,15 +1645,15 @@ public class QueryPlanTest extends TestWithFeService {
sql = "SELECT a.k1, b.k2 FROM (SELECT k1 from baseall) a LEFT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)";
explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
Assert.assertTrue(explainString.contains("<slot 5>\n" + " <slot 7>"));
Assert.assertTrue(explainString.contains("<slot 5>\n" + " 999"));
sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a RIGHT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)";
explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
Assert.assertTrue(explainString.contains("<slot 5>\n" + " <slot 7>"));
Assert.assertTrue(explainString.contains("1\n" + " 999"));
sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a FULL JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)";
explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql);
Assert.assertTrue(explainString.contains("<slot 5>\n" + " <slot 7>"));
Assert.assertTrue(explainString.contains("1\n" + " 999"));
}
@Test

View File

@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_outer_join_with_grouping") {
sql """
drop table if exists table_a;
"""
sql """
drop table if exists table_b;
"""
sql """
CREATE TABLE `table_a` (
`id` bigint(20) NOT NULL COMMENT '',
`moid` int(11) REPLACE_IF_NOT_NULL NULL COMMENT '',
`sid` int(11) REPLACE_IF_NOT_NULL NULL COMMENT ''
) ENGINE=OLAP
AGGREGATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
"""
sql """
CREATE TABLE `table_b` (
`id` bigint(20) NOT NULL COMMENT '',
`name` varchar(192) NOT NULL COMMENT ''
) ENGINE=OLAP
AGGREGATE KEY(`id`, `name`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`, `name`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
"""
sql """
INSERT INTO table_a (id,moid,sid) VALUES
(2,8,9),
(3,5,7),
(6,3,6),
(8,3,6),
(1,6,1),
(4,3,7),
(7,3,2),
(9,3,9),
(5,6,4);
"""
sql """
INSERT INTO table_b (id,name) VALUES
(1,'e'),
(2,'b'),
(4,'b'),
(4,'c'),
(7,'d'),
(8,'c'),
(2,'a'),
(2,'c'),
(3,'e'),
(5,'e'),
(6,'a'),
(3,'d'),
(6,'b'),
(3,'a'),
(5,'b'),
(7,'a'),
(9,'c'),
(5,'c'),
(6,'e'),
(8,'e'),
(1,'d'),
(4,'a'),
(9,'e'),
(1,'b'),
(1,'c'),
(3,'b'),
(9,'b'),
(2,'d'),
(4,'d'),
(5,'a'),
(7,'b'),
(9,'a'),
(1,'a'),
(2,'e'),
(6,'d'),
(7,'c'),
(5,'d'),
(6,'c'),
(7,'e'),
(8,'a'),
(8,'b'),
(9,'d');
"""
sql """
with tmp_view AS
(SELECT moid,
`name`
FROM table_a m
LEFT JOIN table_b s
ON s.id = m.sid), t6 AS
(SELECT moid,
GROUP_CONCAT(distinct `name`) nlist
FROM tmp_view
GROUP BY moid)
SELECT nlist
FROM tmp_view
INNER JOIN t6
ON t6.moid=tmp_view.moid
ORDER BY nlist;
"""
sql """
with tmp_view AS
(SELECT moid,
`name`
FROM table_a m
LEFT JOIN table_b s
ON s.id = m.sid), t6 AS
(SELECT moid,
GROUP_CONCAT(distinct `name`) nlist
FROM tmp_view
GROUP BY moid)
SELECT nlist
FROM tmp_view
INNER JOIN t6
ON TRUE
ORDER BY nlist;
"""
sql """
drop table if exists table_a;
"""
sql """
drop table if exists table_b;
"""
}

View File

@ -61,9 +61,9 @@ suite("test_explain_tpch_sf_1_q10") {
check {
explainStr ->
explainStr.contains("VTOP-N\n" +
" | order by: <slot 24> <slot 23> sum(`l_extendedprice` * (1 - `l_discount`)) DESC") &&
" | order by: <slot 24> <slot 23> sum(<slot 53> * (1 - <slot 54>)) DESC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 23> sum(`l_extendedprice` * (1 - `l_discount`)))\n" +
" | output: sum(<slot 23> sum(<slot 53> * (1 - <slot 54>)))\n" +
" | group by: <slot 16> `c_custkey`, <slot 17> `c_name`, <slot 18> `c_acctbal`, <slot 19> `c_phone`, <slot 20> `n_name`, <slot 21> `c_address`, <slot 22> `c_comment`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -58,12 +58,12 @@ suite("test_explain_tpch_sf_1_q11") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 22> `\$a\$1`.`\$c\$2` DESC") &&
explainStr.contains("cross join:\n" +
" | predicates: <slot 9> sum(`ps_supplycost` * `ps_availqty`) > <slot 20> sum(`ps_supplycost` * `ps_availqty`) * 0.0001") &&
" | predicates: <slot 9> sum(<slot 31> * <slot 32>) > <slot 20> sum(<slot 43> * <slot 44>) * 0.0001") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 9> sum(`ps_supplycost` * `ps_availqty`))\n" +
" | output: sum(<slot 9> sum(<slot 31> * <slot 32>))\n" +
" | group by: <slot 8> `ps_partkey`") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 19> sum(`ps_supplycost` * `ps_availqty`))\n" +
" | output: sum(<slot 19> sum(<slot 43> * <slot 44>))\n" +
" | group by: ") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | output: sum(<slot 43> * <slot 44>)\n" +

View File

@ -59,7 +59,7 @@ suite("test_explain_tpch_sf_1_q12") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 10> <slot 7> `l_shipmode` ASC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 8> sum(CASE WHEN ((`o_orderpriority` = '1-URGENT' OR `o_orderpriority` = '2-HIGH') AND (`o_orderpriority` = '1-URGENT' OR `o_orderpriority` = '2-HIGH')) THEN 1 ELSE 0 END)), sum(<slot 9> sum(CASE WHEN `o_orderpriority` != '1-URGENT' AND `o_orderpriority` != '2-HIGH' THEN 1 ELSE 0 END))\n" +
" | output: sum(<slot 8> sum(CASE WHEN ((<slot 18> = '1-URGENT' OR <slot 18> = '2-HIGH') AND (<slot 18> = '1-URGENT' OR <slot 18> = '2-HIGH')) THEN 1 ELSE 0 END)), sum(<slot 9> sum(CASE WHEN <slot 18> != '1-URGENT' AND <slot 18> != '2-HIGH' THEN 1 ELSE 0 END))\n" +
" | group by: <slot 7> `l_shipmode`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -56,9 +56,9 @@ suite("test_explain_tpch_sf_1_q13") {
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +
" | output: count(*)\n" +
" | group by: <slot 5> count(`o_orderkey`)") &&
" | group by: <slot 5> count(<slot 15>)") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: count(<slot 5> count(`o_orderkey`))\n" +
" | output: count(<slot 5> count(<slot 15>))\n" +
" | group by: <slot 4> `c_custkey`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -41,7 +41,7 @@ suite("test_explain_tpch_sf_1_q14") {
check {
explainStr ->
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 6> sum(CASE WHEN `p_type` LIKE 'PROMO%' THEN `l_extendedprice` * (1 - `l_discount`) ELSE 0 END)), sum(<slot 7> sum(`l_extendedprice` * (1 - `l_discount`)))\n" +
" | output: sum(<slot 6> sum(CASE WHEN <slot 14> LIKE 'PROMO%' THEN <slot 10> * (1 - <slot 11>) ELSE 0 END)), sum(<slot 7> sum(<slot 10> * (1 - <slot 11>)))\n" +
" | group by: ") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | output: sum(CASE WHEN <slot 14> LIKE 'PROMO%' THEN <slot 10> * (1 - <slot 11>) ELSE 0 END), sum(<slot 10> * (1 - <slot 11>))\n" +

View File

@ -49,7 +49,7 @@ suite("test_explain_tpch_sf_1_q15") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 23> `s_suppkey` ASC") &&
explainStr.contains("join op: LEFT SEMI JOIN(BROADCAST)[The src data has been redistributed]\n" +
" | equal join conjunct: <slot 33> = <slot 17> max(`total_revenue`)") &&
" | equal join conjunct: <slot 33> = <slot 17> max(<slot 13> sum(`l_extendedprice` * (1 - `l_discount`)))") &&
explainStr.contains("vec output tuple id: 12") &&
explainStr.contains("output slot ids: 34 35 36 37 39 \n" +
" | hash output slot ids: 33 28 29 30 31 ") &&
@ -62,7 +62,7 @@ suite("test_explain_tpch_sf_1_q15") {
explainStr.contains("TABLE: supplier(supplier), PREAGGREGATION: ON\n" +
" runtime filters: RF000[in_or_bloom] -> `s_suppkey`") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: max(<slot 16> max(`total_revenue`))\n" +
" | output: max(<slot 16> max(<slot 13> sum(`l_extendedprice` * (1 - `l_discount`))))\n" +
" | group by: ") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | output: max(<slot 13> sum(`l_extendedprice` * (1 - `l_discount`)))\n" +

View File

@ -44,7 +44,7 @@ suite("test_explain_tpch_sf_1_q17") {
check {
explainStr ->
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 12> sum(`l_extendedprice`))\n" +
" | output: sum(<slot 12> sum(<slot 21>))\n" +
" | group by: ") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | output: sum(<slot 21>)\n" +

View File

@ -64,7 +64,7 @@ suite("test_explain_tpch_sf_1_q18") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 24> <slot 22> `o_totalprice` DESC, <slot 25> <slot 21> `o_orderdate` ASC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 23> sum(`l_quantity`))\n" +
" | output: sum(<slot 23> sum(<slot 53>))\n" +
" | group by: <slot 18> `c_name`, <slot 19> `c_custkey`, <slot 20> `o_orderkey`, <slot 21> `o_orderdate`, <slot 22> `o_totalprice`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -63,7 +63,7 @@ suite("test_explain_tpch_sf_1_q19") {
check {
explainStr ->
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 10> sum(`l_extendedprice` * (1 - `l_discount`)))\n" +
" | output: sum(<slot 10> sum(<slot 12> * (1 - <slot 13>)))\n" +
" | group by: ") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | output: sum(<slot 12> * (1 - <slot 13>))\n" +

View File

@ -72,9 +72,9 @@ suite("test_explain_tpch_sf_1_q2") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 32> `s_acctbal` DESC, <slot 33> `n_name` ASC, <slot 34> `s_name` ASC, <slot 35> `p_partkey` ASC") &&
explainStr.contains("join op: LEFT SEMI JOIN(BROADCAST)[The src data has been redistributed]\n" +
" | equal join conjunct: <slot 78> = <slot 10> min(`ps_supplycost`)\n" +
" | equal join conjunct: <slot 78> = <slot 10> min(<slot 109>)\n" +
" | equal join conjunct: <slot 81> = <slot 9> `ps_partkey`\n" +
" | runtime filters: RF000[in_or_bloom] <- <slot 10> min(`ps_supplycost`), RF001[in_or_bloom] <- <slot 9> `ps_partkey`") &&
" | runtime filters: RF000[in_or_bloom] <- <slot 10> min(<slot 109>), RF001[in_or_bloom] <- <slot 9> `ps_partkey`") &&
explainStr.contains("vec output tuple id: 19") &&
explainStr.contains("output slot ids: 121 122 125 126 127 128 129 132 \n" +
" | hash output slot ids: 81 82 85 86 87 88 89 92 ") &&
@ -105,7 +105,7 @@ suite("test_explain_tpch_sf_1_q2") {
explainStr.contains("TABLE: partsupp(partsupp), PREAGGREGATION: ON\n" +
" runtime filters: RF000[in_or_bloom] -> <slot 13>, RF004[in_or_bloom] -> <slot 24>, RF005[in_or_bloom] -> `ps_partkey`") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: min(<slot 10> min(`ps_supplycost`))\n" +
" | output: min(<slot 10> min(<slot 109>))\n" +
" | group by: <slot 9> `ps_partkey`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -65,7 +65,7 @@ suite("test_explain_tpch_sf_1_q22") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 32> <slot 29> `cntrycode` ASC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: count(<slot 30> count(*)), sum(<slot 31> sum(`c_acctbal`))\n" +
" | output: count(<slot 30> count(*)), sum(<slot 31> sum(<slot 37>))\n" +
" | group by: <slot 29> `cntrycode`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -52,9 +52,9 @@ suite("test_explain_tpch_sf_1_q3") {
check {
explainStr ->
explainStr.contains("VTOP-N\n" +
" | order by: <slot 14> <slot 13> sum(`l_extendedprice` * (1 - `l_discount`)) DESC, <slot 15> <slot 11> `o_orderdate` ASC") &&
" | order by: <slot 14> <slot 13> sum(<slot 27> * (1 - <slot 28>)) DESC, <slot 15> <slot 11> `o_orderdate` ASC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 13> sum(`l_extendedprice` * (1 - `l_discount`)))\n" +
" | output: sum(<slot 13> sum(<slot 27> * (1 - <slot 28>)))\n" +
" | group by: <slot 10> `l_orderkey`, <slot 11> `o_orderdate`, <slot 12> `o_shippriority`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -53,9 +53,9 @@ suite("test_explain_tpch_sf_1_q5") {
check {
explainStr ->
explainStr.contains("VTOP-N\n" +
" | order by: <slot 18> <slot 17> sum(`l_extendedprice` * (1 - `l_discount`)) DESC") &&
" | order by: <slot 18> <slot 17> sum(<slot 61> * (1 - <slot 62>)) DESC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 17> sum(`l_extendedprice` * (1 - `l_discount`)))\n" +
" | output: sum(<slot 17> sum(<slot 61> * (1 - <slot 62>)))\n" +
" | group by: <slot 16> `n_name`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -69,7 +69,7 @@ suite("test_explain_tpch_sf_1_q7") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 23> <slot 19> `supp_nation` ASC, <slot 24> <slot 20> `cust_nation` ASC, <slot 25> <slot 21> `l_year` ASC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 22> sum(`volume`))\n" +
" | output: sum(<slot 22> sum(<slot 68> * (1 - <slot 69>)))\n" +
" | group by: <slot 19> `supp_nation`, <slot 20> `cust_nation`, <slot 21> `l_year`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -68,7 +68,7 @@ suite("test_explain_tpch_sf_1_q8") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 26> <slot 23> `o_year` ASC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 24> sum(CASE WHEN `nation` = 'BRAZIL' THEN `volume` ELSE 0 END)), sum(<slot 25> sum(`volume`))\n" +
" | output: sum(<slot 24> sum(CASE WHEN <slot 117> = 'BRAZIL' THEN <slot 105> * (1 - <slot 106>) ELSE 0 END)), sum(<slot 25> sum(<slot 105> * (1 - <slot 106>)))\n" +
" | group by: <slot 23> `o_year`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +

View File

@ -62,7 +62,7 @@ suite("test_explain_tpch_sf_1_q9") {
explainStr.contains("VTOP-N\n" +
" | order by: <slot 23> <slot 20> `nation` ASC, <slot 24> <slot 21> `o_year` DESC") &&
explainStr.contains("VAGGREGATE (merge finalize)\n" +
" | output: sum(<slot 22> sum(`amount`))\n" +
" | output: sum(<slot 22> sum(<slot 73> * (1 - <slot 74>) - <slot 81> * <slot 75>))\n" +
" | group by: <slot 20> `nation`, <slot 21> `o_year`") &&
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +