[fix](Nereids) ban right outer, right anti, full outer with bucket shuffle (#26529)

if left bucket has no data, we do not generate left bucket instance.
These join should reserve all right side data. But because left instance
is not exists. So right data will be discard since no dest be set.

We ban these join temporarily until we could generate all instance
for left side in Coordinator.
This commit is contained in:
morrySnow
2023-11-08 15:16:50 +08:00
committed by GitHub
parent 5d4557938a
commit f80495da83
5 changed files with 56 additions and 10 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
@ -178,6 +179,12 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
return true;
}
private boolean couldNotRightBucketShuffleJoin(JoinType joinType) {
return joinType == JoinType.RIGHT_ANTI_JOIN
|| joinType == JoinType.RIGHT_OUTER_JOIN
|| joinType == JoinType.FULL_OUTER_JOIN;
}
@Override
public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
Void context) {
@ -207,12 +214,22 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
Optional<PhysicalProperties> updatedForLeft = Optional.empty();
Optional<PhysicalProperties> updatedForRight = Optional.empty();
if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
// check colocate join with scan
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec)) {
return true;
}
return true;
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType())) {
// right anti, right outer, full outer join could not do bucket shuffle join
// TODO remove this after we refactor coordinator
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),

View File

@ -17,8 +17,9 @@ PhysicalResultSink
------------------------PhysicalProject
--------------------------hashJoin[INNER_JOIN] hashCondition=((d1.d_week_seq = d2.d_week_seq))otherCondition=()
----------------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((catalog_returns.cr_item_sk = catalog_sales.cs_item_sk) and (catalog_returns.cr_order_number = catalog_sales.cs_order_number))otherCondition=()
------------------------------PhysicalProject
--------------------------------PhysicalOlapScan[catalog_returns]
------------------------------PhysicalDistribute
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[catalog_returns]
------------------------------PhysicalDistribute
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN] hashCondition=((item.i_item_sk = catalog_sales.cs_item_sk))otherCondition=()

View File

@ -18,8 +18,9 @@ PhysicalResultSink
--------------------------hashJoin[INNER_JOIN] hashCondition=((d1.d_week_seq = d2.d_week_seq))otherCondition=()
----------------------------PhysicalDistribute
------------------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((catalog_returns.cr_item_sk = catalog_sales.cs_item_sk) and (catalog_returns.cr_order_number = catalog_sales.cs_order_number))otherCondition=()
--------------------------------PhysicalProject
----------------------------------PhysicalOlapScan[catalog_returns]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------PhysicalOlapScan[catalog_returns]
--------------------------------PhysicalDistribute
----------------------------------PhysicalProject
------------------------------------hashJoin[LEFT_OUTER_JOIN] hashCondition=((catalog_sales.cs_promo_sk = promotion.p_promo_sk))otherCondition=()

View File

@ -62,7 +62,8 @@ PhysicalResultSink
--------------------------hashAgg[LOCAL]
----------------------------PhysicalProject
------------------------------hashJoin[RIGHT_OUTER_JOIN] hashCondition=((web_sales.ws_item_sk = web_returns.wr_item_sk) and (web_sales.ws_order_number = web_returns.wr_order_number))otherCondition=()
--------------------------------PhysicalOlapScan[web_returns]
--------------------------------PhysicalDistribute
----------------------------------PhysicalOlapScan[web_returns]
--------------------------------PhysicalDistribute
----------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_web_site_sk = web_site.web_site_sk))otherCondition=()
------------------------------------hashJoin[INNER_JOIN] hashCondition=((web_sales.ws_item_sk = item.i_item_sk))otherCondition=()

View File

@ -20,6 +20,7 @@ suite("test_outer_join", "nereids_p0") {
sql "SET enable_fallback_to_original_planner=false"
def tbl1 = "test_outer_join1"
def tbl2 = "test_outer_join2"
def tbl3 = "test_outer_join3"
sql "DROP TABLE IF EXISTS ${tbl1}"
sql """
@ -37,6 +38,15 @@ suite("test_outer_join", "nereids_p0") {
DISTRIBUTED BY RANDOM BUCKETS 30
PROPERTIES ("replication_num" = "1");
"""
sql "DROP TABLE IF EXISTS ${tbl3}"
sql """
CREATE TABLE IF NOT EXISTS ${tbl3} (
c0 DECIMALV3(8,3)
)
DISTRIBUTED BY HASH (c0) BUCKETS 1 PROPERTIES ("replication_num" = "1");
"""
sql """INSERT INTO ${tbl2} (c0) VALUES ('dr'), ('x7Tq'), ('');"""
sql """INSERT INTO ${tbl1} (c0) VALUES (0.47683432698249817), (0.8864791393280029);"""
sql """INSERT INTO ${tbl1} (c0) VALUES (0.11287713050842285);"""
@ -56,6 +66,22 @@ suite("test_outer_join", "nereids_p0") {
qt_join """
SELECT * FROM ${tbl2} LEFT OUTER JOIN ${tbl1} ON (('') like ('15DScmSM')) WHERE ('abc' NOT LIKE 'abc');
"""
sql "set disable_join_reorder=true"
explain {
sql "SELECT * FROM ${tbl1} RIGHT OUTER JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0"
contains "RIGHT OUTER JOIN(PARTITIONED)"
}
explain {
sql "SELECT * FROM ${tbl1} RIGHT ANTI JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0"
contains "RIGHT ANTI JOIN(PARTITIONED)"
}
explain {
sql "SELECT * FROM ${tbl1} FULL OUTER JOIN ${tbl3} ON ${tbl1}.c0 = ${tbl3}.c0"
contains "FULL OUTER JOIN(PARTITIONED)"
}
sql "DROP TABLE IF EXISTS ${tbl1}"
sql "DROP TABLE IF EXISTS ${tbl2}"
sql "DROP TABLE IF EXISTS ${tbl3}"
}