branch-2.1: [fix](Nereids) self join not always could do colocate join #54323 (#54349)

picked from #54323
This commit is contained in:
morrySnow
2025-08-06 11:26:44 +08:00
committed by GitHub
parent 1f707d2695
commit 838648e52d
4 changed files with 295 additions and 5 deletions

View File

@ -783,6 +783,16 @@ under the License.
<artifactId>ap-loader-all</artifactId>
<version>3.0-8</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<!-- for hive-catalog-shade -->

View File

@ -288,11 +288,8 @@ public class JoinUtils {
boolean noNeedCheckColocateGroup = hitSameIndex && (leftTablePartitions.equals(rightTablePartitions))
&& (leftTablePartitions.size() <= 1);
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
if (noNeedCheckColocateGroup) {
return true;
}
if (!colocateIndex.isSameGroup(leftTableId, rightTableId)
|| colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId))) {
if (!noNeedCheckColocateGroup && (!colocateIndex.isSameGroup(leftTableId, rightTableId)
|| colocateIndex.isGroupUnstable(colocateIndex.getGroup(leftTableId)))) {
return false;
}

View File

@ -0,0 +1,272 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.util;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Env;
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.types.TinyIntType;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.List;
public class JoinUtilsTest {
@Test
public void testCouldColocateJoinForSameTable() {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
DistributionSpecHash left = new DistributionSpecHash(Lists.newArrayList(new ExprId(1)), ShuffleType.NATURAL,
1L, 1L, Collections.emptySet());
DistributionSpecHash right = new DistributionSpecHash(Lists.newArrayList(new ExprId(2)), ShuffleType.NATURAL,
1L, 1L, Collections.emptySet());
Expression leftKey1 = new SlotReference(new ExprId(1), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey1 = new SlotReference(new ExprId(2), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression leftKey2 = new SlotReference(new ExprId(3), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey2 = new SlotReference(new ExprId(4), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
List<Expression> conjuncts;
// key same with distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1));
Assertions.assertTrue(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1), new EqualTo(leftKey2, rightKey2));
Assertions.assertTrue(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and NOT have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey2), new EqualTo(leftKey2, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key not contains distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
}
@Test
public void testCouldColocateJoinForDiffTableInSameGroupAndGroupIsStable() {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
// same group and group is statble
try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
GroupId groupId = new GroupId(1L, 1L);
ColocateTableIndex colocateIndex = Mockito.mock(ColocateTableIndex.class);
Mockito.when(colocateIndex.isSameGroup(1L, 2L)).thenReturn(true);
Mockito.when(colocateIndex.getGroup(1L)).thenReturn(groupId);
Mockito.when(colocateIndex.isGroupUnstable(groupId)).thenReturn(false);
mockedEnv.when(() -> Env.getCurrentColocateIndex()).thenReturn(colocateIndex);
DistributionSpecHash left = new DistributionSpecHash(Lists.newArrayList(new ExprId(1)),
ShuffleType.NATURAL, 1L, 1L, Collections.emptySet());
DistributionSpecHash right = new DistributionSpecHash(Lists.newArrayList(new ExprId(2)),
ShuffleType.NATURAL, 2L, 2L, Collections.emptySet());
Expression leftKey1 = new SlotReference(new ExprId(1), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey1 = new SlotReference(new ExprId(2), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression leftKey2 = new SlotReference(new ExprId(3), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey2 = new SlotReference(new ExprId(4), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
List<Expression> conjuncts;
// key same with distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1));
Assertions.assertTrue(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1), new EqualTo(leftKey2, rightKey2));
Assertions.assertTrue(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and NOT have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey2), new EqualTo(leftKey2, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key not contains distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
}
}
@Test
public void testCouldColocateJoinForNotNaturalHashDstribution() {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
// same group and group is statble
try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
GroupId groupId = new GroupId(1L, 1L);
ColocateTableIndex colocateIndex = Mockito.mock(ColocateTableIndex.class);
Mockito.when(colocateIndex.isSameGroup(1L, 2L)).thenReturn(true);
Mockito.when(colocateIndex.getGroup(1L)).thenReturn(groupId);
Mockito.when(colocateIndex.isGroupUnstable(groupId)).thenReturn(false);
mockedEnv.when(() -> Env.getCurrentColocateIndex()).thenReturn(colocateIndex);
DistributionSpecHash left = new DistributionSpecHash(Lists.newArrayList(new ExprId(1)),
ShuffleType.NATURAL, 1L, 1L, Collections.emptySet());
DistributionSpecHash right = new DistributionSpecHash(Lists.newArrayList(new ExprId(2)),
ShuffleType.EXECUTION_BUCKETED, 2L, 2L, Collections.emptySet());
Expression leftKey1 = new SlotReference(new ExprId(1), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey1 = new SlotReference(new ExprId(2), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression leftKey2 = new SlotReference(new ExprId(3), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey2 = new SlotReference(new ExprId(4), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
List<Expression> conjuncts;
// key same with distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1), new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and NOT have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey2), new EqualTo(leftKey2, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key not contains distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
}
}
@Test
public void testCouldColocateJoinForDiffTableInSameGroupAndGroupIsUnstable() {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
// same group and group is statble
try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
GroupId groupId = new GroupId(1L, 1L);
ColocateTableIndex colocateIndex = Mockito.mock(ColocateTableIndex.class);
Mockito.when(colocateIndex.isSameGroup(1L, 2L)).thenReturn(true);
Mockito.when(colocateIndex.getGroup(1L)).thenReturn(groupId);
Mockito.when(colocateIndex.isGroupUnstable(groupId)).thenReturn(true);
mockedEnv.when(() -> Env.getCurrentColocateIndex()).thenReturn(colocateIndex);
DistributionSpecHash left = new DistributionSpecHash(Lists.newArrayList(new ExprId(1)),
ShuffleType.NATURAL, 1L, 1L, Collections.emptySet());
DistributionSpecHash right = new DistributionSpecHash(Lists.newArrayList(new ExprId(2)),
ShuffleType.NATURAL, 2L, 2L, Collections.emptySet());
Expression leftKey1 = new SlotReference(new ExprId(1), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey1 = new SlotReference(new ExprId(2), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression leftKey2 = new SlotReference(new ExprId(3), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey2 = new SlotReference(new ExprId(4), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
List<Expression> conjuncts;
// key same with distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1), new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and NOT have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey2), new EqualTo(leftKey2, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key not contains distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
}
}
@Test
public void testCouldColocateJoinForDiffTableNotInSameGroup() {
ConnectContext ctx = new ConnectContext();
ctx.setThreadLocalInfo();
// same group and group is statble
try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
GroupId groupId = new GroupId(1L, 1L);
ColocateTableIndex colocateIndex = Mockito.mock(ColocateTableIndex.class);
Mockito.when(colocateIndex.isSameGroup(1L, 2L)).thenReturn(true);
Mockito.when(colocateIndex.getGroup(1L)).thenReturn(groupId);
Mockito.when(colocateIndex.isGroupUnstable(groupId)).thenReturn(true);
mockedEnv.when(() -> Env.getCurrentColocateIndex()).thenReturn(colocateIndex);
DistributionSpecHash left = new DistributionSpecHash(Lists.newArrayList(new ExprId(1)), ShuffleType.NATURAL,
1L, 1L, Collections.emptySet());
DistributionSpecHash right = new DistributionSpecHash(Lists.newArrayList(new ExprId(2)), ShuffleType.NATURAL,
2L, 2L, Collections.emptySet());
Expression leftKey1 = new SlotReference(new ExprId(1), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey1 = new SlotReference(new ExprId(2), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression leftKey2 = new SlotReference(new ExprId(3), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
Expression rightKey2 = new SlotReference(new ExprId(4), "c1",
TinyIntType.INSTANCE, false, Lists.newArrayList());
List<Expression> conjuncts;
// key same with distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey1), new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key contains distribute key, and NOT have distribute key = distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey1, rightKey2), new EqualTo(leftKey2, rightKey1));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
// key not contains distribute key
conjuncts = Lists.newArrayList(new EqualTo(leftKey2, rightKey2));
Assertions.assertFalse(JoinUtils.couldColocateJoin(left, right, conjuncts));
}
}
}

View File

@ -374,6 +374,7 @@ under the License.
<semver4j.version>5.3.0</semver4j.version>
<aliyun-sdk-oss.version>3.15.0</aliyun-sdk-oss.version>
<awssdk.version>2.29.26</awssdk.version>
<mockito.version>4.11.0</mockito.version>
</properties>
<profiles>
<profile>
@ -1595,6 +1596,16 @@ under the License.
<artifactId>semver4j</artifactId>
<version>${semver4j.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>