[fix](planner)Fix colocate query failed #16459

Issue Number: close #16458
Co-authored-by: wangbo36@meituan.com <wangbo36@meituan.com>
This commit is contained in:
wangbo
2023-02-14 18:51:28 +08:00
committed by GitHub
parent 4444abc828
commit acf5540a9f
3 changed files with 80 additions and 2 deletions

View File

@ -539,8 +539,14 @@ public class DistributedPlanner {
// they are naturally colocate relationship no need to check colocate group
Collection<Long> leftPartitions = leftRoot.getSelectedPartitionIds();
Collection<Long> rightPartitions = rightRoot.getSelectedPartitionIds();
boolean noNeedCheckColocateGroup = (leftTable.getId() == rightTable.getId())
&& (leftPartitions.equals(rightPartitions)) && (leftPartitions.size() <= 1);
// In unit tests, or when no partition is selected, getSelectedIndexId() returns -1; see selectMaterializedView()
boolean hitSameIndex = (leftTable.getId() == rightTable.getId())
&& (leftRoot.getSelectedIndexId() != -1 && rightRoot.getSelectedIndexId() != -1)
&& (leftRoot.getSelectedIndexId() == rightRoot.getSelectedIndexId());
boolean noNeedCheckColocateGroup = hitSameIndex && (leftPartitions.equals(rightPartitions))
&& (leftPartitions.size() <= 1);
if (!noNeedCheckColocateGroup) {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();

View File

@ -321,6 +321,10 @@ public class OlapScanNode extends ScanNode {
this.selectedIndexId = olapTable.getBaseIndexId();
}
/**
 * Returns the current value of {@code selectedIndexId} for this scan node.
 *
 * <p>NOTE(review): the planner appears to treat {@code -1} as "no index
 * selected yet" — confirm against selectMaterializedView().
 *
 * @return the selected index id as currently stored on this node
 */
public long getSelectedIndexId() {
    return this.selectedIndexId;
}
/**
* This method is mainly used to update scan range info in OlapScanNode by the
* new materialized selector.

View File

@ -81,4 +81,72 @@ suite("test_colocate_join") {
contains "4:VHASH JOIN\n | join op: INNER JOIN(COLOCATE[])[]"
contains "2:VHASH JOIN\n | join op: INNER JOIN(COLOCATE[])[]"
}
/* Test joining a table with itself when each side hits a different rollup: colocate join should be disabled. */
sql """ DROP TABLE IF EXISTS `test_query_colocate`;"""
sql """
CREATE TABLE `test_query_colocate` (
`datekey` int(11) NULL,
`rollup_1_condition` int null,
`rollup_2_condition` int null,
`sum_col1` bigint(20) SUM NULL,
`sum_col2` bigint(20) SUM NULL
) ENGINE=OLAP
AGGREGATE KEY(`datekey`,`rollup_1_condition`,`rollup_2_condition`)
COMMENT ""
PARTITION BY RANGE(`datekey`)
(PARTITION p20220102 VALUES [("20220101"), ("20220102")),
PARTITION p20220103 VALUES [("20220102"), ("20220103")))
DISTRIBUTED BY HASH(`datekey`) BUCKETS 1
rollup (
rollup_1(datekey, sum_col1),
rollup_2(datekey, sum_col2)
)
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""
sql """insert into test_query_colocate values
(20220101, 102, 200, 200, 100),
(20220101, 101, 200, 200, 100),
(20220101, 102, 202, 200, 100),
(20220101, 101, 202, 200, 100);"""
explain {
sql("select " +
" sum_col1,sum_col2 " +
"from " +
"(select datekey,sum(sum_col1) as sum_col1 from test_query_colocate where datekey=20220101 group by datekey) t1 " +
"left join " +
"(select datekey,sum(sum_col2) as sum_col2 from test_query_colocate where datekey=20220101 group by datekey) t2 " +
"on t1.datekey = t2.datekey")
contains "Tables are not in the same group"
}
sql """ DROP TABLE IF EXISTS `test_query_colocate` """
/* test no rollup is selected */
sql """ DROP TABLE IF EXISTS `tbl1`;"""
sql """ DROP TABLE IF EXISTS `tbl2`;"""
sql """
create table tbl1(k1 int, k2 varchar(32), v bigint sum) AGGREGATE KEY(k1,k2) distributed by hash(k1) buckets 1 properties('replication_num' = '1');
"""
sql """
create table tbl2(k3 int, k4 varchar(32)) DUPLICATE KEY(k3) distributed by hash(k3) buckets 1 properties('replication_num' = '1');
"""
explain {
sql("select * from tbl1 join tbl2 on tbl1.k1 = tbl2.k3")
contains "INNER JOIN"
}
sql """ DROP TABLE IF EXISTS `tbl1`;"""
sql """ DROP TABLE IF EXISTS `tbl2`;"""
}