diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 57e6238013..0ffe509d8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -378,6 +378,8 @@ public enum RuleType {
     STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION),
     STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION),
     STORAGE_LAYER_AGGREGATE_WITH_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION),
+    STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE(RuleTypeClass.IMPLEMENTATION),
+    STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
     COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION),
     COUNT_ON_INDEX_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
     ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 89373cc95c..b4fce67beb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -40,12 +40,15 @@ import org.apache.doris.nereids.trees.expressions.Cast;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.IsNull;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
 import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
 import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
 import org.apache.doris.nereids.trees.expressions.functions.agg.GroupConcat;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
 import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctCount;
 import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinctSum;
 import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
@@ -140,6 +143,42 @@ public class AggregateStrategies implements ImplementationRuleFactory {
                         return pushdownCountOnIndex(agg, project, filter, olapScan, ctx.cascadesContext);
                     })
             ),
+            RuleType.STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE_WITHOUT_PROJECT.build(
+                logicalAggregate(
+                        logicalFilter(
+                                logicalOlapScan().when(this::isUniqueKeyTable))
+                                .when(this::isDeleteSignOnlyFilter)
+                        )
+                        .when(agg -> enablePushDownMinMaxOnUnique())
+                        .when(agg -> agg.getGroupByExpressions().isEmpty())
+                        .when(this::hasOnlyMinMaxFunctions)
+                        .thenApply(ctx -> {
+                            LogicalAggregate<LogicalFilter<LogicalOlapScan>> agg = ctx.root;
+                            LogicalFilter<LogicalOlapScan> filter = agg.child();
+                            LogicalOlapScan olapScan = filter.child();
+                            return pushdownMinMaxOnUniqueTable(agg, null, filter, olapScan,
+                                    ctx.cascadesContext);
+                        })
+            ),
+            RuleType.STORAGE_LAYER_AGGREGATE_MINMAX_ON_UNIQUE.build(
+                logicalAggregate(
+                        logicalProject(
+                                logicalFilter(
+                                        logicalOlapScan().when(this::isUniqueKeyTable))
+                                        .when(this::isDeleteSignOnlyFilter))
+                        )
+                        .when(agg -> enablePushDownMinMaxOnUnique())
+                        .when(agg -> agg.getGroupByExpressions().isEmpty())
+                        .when(this::hasOnlyMinMaxFunctions)
+                        .thenApply(ctx -> {
+                            LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
+                            LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
+                            LogicalFilter<LogicalOlapScan> filter = project.child();
+                            LogicalOlapScan olapScan = filter.child();
+                            return pushdownMinMaxOnUniqueTable(agg, project, filter, olapScan,
+                                    ctx.cascadesContext);
+                        })
+            ),
             RuleType.STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT.build(
                 logicalAggregate(
                     logicalOlapScan()
@@ -238,6 +277,39 @@ public class AggregateStrategies implements ImplementationRuleFactory {
         );
     }
 
+    // Returns whether the session enables enable_pushdown_minmax_on_unique; false when no ConnectContext is bound.
+    private boolean enablePushDownMinMaxOnUnique() {
+        ConnectContext connectContext = ConnectContext.get();
+        return connectContext != null && connectContext.getSessionVariable().isEnablePushDownMinMaxOnUnique();
+    }
+
+    // Returns true when the scan reads a UNIQUE_KEYS table.
+    private boolean isUniqueKeyTable(LogicalOlapScan logicalScan) {
+        return logicalScan != null && logicalScan.getTable().getKeysType() == KeysType.UNIQUE_KEYS;
+    }
+
+    // Returns true when the filter holds exactly one conjunct and that conjunct's first child is a slot
+    // bound to the delete-sign column.
+    private boolean isDeleteSignOnlyFilter(LogicalFilter<? extends Plan> filter) {
+        if (filter.getConjuncts().size() != 1) {
+            return false;
+        }
+        Expression conjunct = filter.getConjuncts().iterator().next();
+        // A conjunct without children (e.g. a bare boolean slot) has no operand to inspect.
+        if (conjunct.children().isEmpty()) {
+            return false;
+        }
+        Expression firstOperand = conjunct.children().get(0);
+        return firstOperand instanceof SlotReference
+                && ((SlotReference) firstOperand).getColumn().map(Column::isDeleteSignColumn).orElse(false);
+    }
+
+    // Returns true when the aggregate has at least one function and all of them are min() or max().
+    private boolean hasOnlyMinMaxFunctions(LogicalAggregate<? extends Plan> agg) {
+        Set<AggregateFunction> funcs = agg.getAggregateFunctions();
+        return !funcs.isEmpty() && funcs.stream().allMatch(f -> f instanceof Min || f instanceof Max);
+    }
+
     private boolean enablePushDownCountOnIndex() {
         ConnectContext connectContext = ConnectContext.get();
         return connectContext != null && connectContext.getSessionVariable().isEnablePushDownCountOnIndex();
@@ -314,6 +386,71 @@ public class AggregateStrategies implements ImplementationRuleFactory {
         }
     }
 
+    // Example: select /*+SET_VAR(enable_pushdown_minmax_on_unique=true) */min(user_id) from table_unique;
+    // Wraps the scan in a PhysicalStorageLayerAggregate with PushDownAggOp.MIN_MAX (pushAggOp=MINMAX in explain)
+    // when checkWhetherPushDownMinMax accepts the aggregate; otherwise returns the aggregate unchanged.
+    private LogicalAggregate<? extends Plan> pushdownMinMaxOnUniqueTable(
+            LogicalAggregate<? extends Plan> aggregate,
+            @Nullable LogicalProject<? extends Plan> project,
+            LogicalFilter<? extends Plan> filter,
+            LogicalOlapScan olapScan,
+            CascadesContext cascadesContext) {
+        Set<AggregateFunction> aggregateFunctions = aggregate.getAggregateFunctions();
+        if (!checkWhetherPushDownMinMax(aggregateFunctions, project, olapScan.getOutput())) {
+            return aggregate;
+        }
+        PhysicalOlapScan physicalOlapScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
+                .build()
+                .transform(olapScan, cascadesContext)
+                .get(0);
+        Plan pushedFilter = filter.withChildren(ImmutableList.of(
+                new PhysicalStorageLayerAggregate(physicalOlapScan, PushDownAggOp.MIN_MAX)));
+        Plan aggChild = project == null
+                ? pushedFilter
+                : project.withChildren(ImmutableList.of(pushedFilter));
+        return aggregate.withChildren(ImmutableList.of(aggChild));
+    }
+
+    // Returns true only when every min()/max() argument is a plain slot that, after looking through the
+    // optional project, is still a plain slot whose column type is not complex, HLL or bitmap.
+    private boolean checkWhetherPushDownMinMax(Set<AggregateFunction> aggregateFunctions,
+            @Nullable LogicalProject<? extends Plan> project, List<Slot> outPutSlots) {
+        List<Expression> argumentsOfAggregateFunction = aggregateFunctions.stream()
+                .flatMap(aggregateFunction -> aggregateFunction.getArguments().stream())
+                .collect(ImmutableList.toImmutableList());
+        if (!argumentsOfAggregateFunction.stream().allMatch(SlotReference.class::isInstance)) {
+            return false;
+        }
+
+        if (project != null) {
+            // Resolve each argument to the project expression that produces it, unwrapping aliases.
+            argumentsOfAggregateFunction = Project.findProject(
+                        argumentsOfAggregateFunction, project.getProjects())
+                    .stream()
+                    .map(p -> p instanceof Alias ? p.child(0) : p)
+                    .collect(ImmutableList.toImmutableList());
+            if (!argumentsOfAggregateFunction.stream().allMatch(SlotReference.class::isInstance)) {
+                return false;
+            }
+        }
+        Set<SlotReference> aggUsedSlots = ExpressionUtils.collect(argumentsOfAggregateFunction,
+                SlotReference.class::isInstance);
+        List<SlotReference> usedSlotInTable = (List<SlotReference>) Project.findProject(aggUsedSlots,
+                outPutSlots);
+        for (SlotReference slot : usedSlotInTable) {
+            Optional<Column> column = slot.getColumn();
+            if (!column.isPresent()) {
+                // No column metadata to validate the type against, so do not push down.
+                return false;
+            }
+            PrimitiveType colType = column.get().getType().getPrimitiveType();
+            if (colType.isComplexType() || colType.isHllType() || colType.isBitmapType()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /**
      * sql: select count(*) from tbl
      *

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d594fd395e..bbe5eddaa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -480,6 +480,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String MATERIALIZED_VIEW_REWRITE_ENABLE_CONTAIN_FOREIGN_TABLE
             = "materialized_view_rewrite_enable_contain_foreign_table";
 
+    public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique";
+
     // When set use fix replica = true, the fixed replica maybe bad, try to use the health one if
     // this session variable is set to true.
     public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt";
@@ -1221,6 +1223,11 @@ public class SessionVariable implements Serializable, Writable {
             "是否启用count_on_index pushdown。", "Set whether to pushdown count_on_index."})
     public boolean enablePushDownCountOnIndex = true;
 
+    // Whether to push min/max down to the scan node of a unique table (off by default).
+    @VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE, needForward = true, description = {
+        "是否启用pushdown minmax on unique table。", "Set whether to pushdown minmax on unique table."})
+    public boolean enablePushDownMinMaxOnUnique = false;
+
     // Whether drop table when create table as select insert data appear error.
     @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
     public boolean dropTableIfCtasFailed = true;
@@ -2438,6 +2445,14 @@ public class SessionVariable implements Serializable, Writable {
         this.disableJoinReorder = disableJoinReorder;
     }
 
+    public boolean isEnablePushDownMinMaxOnUnique() {
+        return enablePushDownMinMaxOnUnique;
+    }
+
+    public void setEnablePushDownMinMaxOnUnique(boolean enablePushDownMinMaxOnUnique) {
+        this.enablePushDownMinMaxOnUnique = enablePushDownMinMaxOnUnique;
+    }
+
     /**
      * Nereids only support vectorized engine.
      *
diff --git a/regression-test/data/nereids_p0/explain/test_pushdown_explain.out b/regression-test/data/nereids_p0/explain/test_pushdown_explain.out
index 72d126351a..a5fa13665d 100644
--- a/regression-test/data/nereids_p0/explain/test_pushdown_explain.out
+++ b/regression-test/data/nereids_p0/explain/test_pushdown_explain.out
@@ -2,3 +2,123 @@
 -- !select --
 1
 
+-- !select_table_unique0 --
+1	c
+
+-- !select_table_unique0_min --
+a
+
+-- !select_table_unique1 --
+1	c
+2	e
+
+-- !select_table_unique1_max --
+g
+
+-- !select_table_unique2 --
+1	c
+2	e
+
+-- !select_table_unique2_max --
+h
+
+-- !select_table_unique3 --
+1	c
+2	e
+4	k
+
+-- !select_table_unique3_max --
+l
+
+-- !select_0 --
+1	asd	cc
+2	qwe	vvx
+3	ffsd	mnm
+4	qdf	ll
+5	cvfv	vff
+
+-- !select_1 --
+1
+
+-- !select_2 --
+5
+
+-- !select_3 --
+asd
+
+-- !select_4 --
+qwe
+
+-- !select_5 --
+cc
+
+-- !select_6 --
+vvx
+
+-- !select_00 --
+1	asd	zzz
+2	qwe	vvx
+3	ffsd	mnm
+4	qdf	ll
+5	cvfv	vff
+
+-- !select_7 --
+1
+
+-- !select_8 --
+5
+
+-- !select_9 --
+asd
+
+-- !select_10 --
+qwe
+
+-- !select_11 --
+cc
+
+-- !select_12 --
+zzz
+
+-- !select_000 --
+1	asd	zzz
+3	ffsd	mnm
+4	qdf	ll
+5	cvfv	vff
+
+-- !select_13 --
+1
+
+-- !select_14 --
+5
+
+-- !select_15 --
+asd
+
+-- !select_16 --
+qdf
+
+-- !select_17 --
+ll
+
+-- !select_18 --
+zzz
+
+-- !select_19 --
+1
+
+-- !select_20 --
+5
+
+-- !select_21 --
+asd
+
+-- !select_22 --
+qwe
+
+-- !select_23 --
+cc
+
+-- !select_24 --
+vvx
+
diff --git a/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy b/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy
index e82b524b5a..8406e0972b 100644
--- a/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy
+++ b/regression-test/suites/nereids_p0/explain/test_pushdown_explain.groovy
@@ -65,4 +65,174 @@ suite("test_pushdown_explain") {
         sql("select count(cast(lo_orderkey as bigint)) from test_lineorder;")
         contains "pushAggOp=COUNT"
     }
+
+    sql "DROP TABLE IF EXISTS table_unique0"
+    sql """ 
+        CREATE TABLE `table_unique0` (
+            `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+            `username` VARCHAR(50) NOT NULL COMMENT '\"用户昵称\"'
+        ) ENGINE=OLAP
+        UNIQUE KEY(`user_id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "disable_auto_compaction" = "true"
+        );
+    """
+
+    // set session variables
+    sql "set enable_pushdown_minmax_on_unique = true;"
+
+    sql """ insert into table_unique0 values(1,"a"); """
+    sql """ insert into table_unique0 values(1,"b"); """
+    sql """ insert into table_unique0 values(1,"c"); """
+    qt_select_table_unique0 "select * from table_unique0 order by user_id;" // 1, c
+    qt_select_table_unique0_min "select min(username) from table_unique0;" // a (not c) is read from zone map
+
+    sql """ insert into table_unique0 values(2,"g"); """
+    sql """ insert into table_unique0 values(2,"f"); """
+    sql """ insert into table_unique0 values(2,"e"); """
+    qt_select_table_unique1 "select * from table_unique0 order by user_id;" // 2, e
+    qt_select_table_unique1_max "select max(username) from table_unique0;" // g (not e) is read from zone map
+
+    sql """ insert into table_unique0 values(3,"h"); """
+    sql """ insert into table_unique0(user_id,username,__DORIS_DELETE_SIGN__) values(3,'h',1); """ // delete id = 3
+    qt_select_table_unique2 "select * from table_unique0 order by user_id;" // no user_id = 3
+    qt_select_table_unique2_max "select max(username) from table_unique0;" // h (deleted row) is read from zone map
+
+    sql """ insert into table_unique0 values(4,"l"); """
+    sql """ update table_unique0 set username = "k" where user_id = 4; """
+    qt_select_table_unique3 "select * from table_unique0 order by user_id;" // 4, k
+    qt_select_table_unique3_max "select max(username) from table_unique0;" // l (not k) is read from zone map
+
+    sql "DROP TABLE IF EXISTS table_unique"
+    sql """ 
+        CREATE TABLE `table_unique` (
+            `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+            `username` VARCHAR(50) NOT NULL COMMENT '\"用户昵称\"',
+            `val` VARCHAR(50) NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`user_id`, `username`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "disable_auto_compaction" = "true"
+        );
+    """
+    sql """ 
+        insert into table_unique values(1,"asd","cc"),(2,"qwe","vvx"),(3,"ffsd","mnm"),(4,"qdf","ll"),(5,"cvfv","vff");
+    """
+
+    sql "set enable_pushdown_minmax_on_unique = false;"
+    explain {
+        sql("select min(user_id) from table_unique;")
+        contains "pushAggOp=NONE"
+    }
+    explain {
+        sql("select max(user_id) from table_unique;")
+        contains "pushAggOp=NONE"
+    }
+    explain {
+        sql("select min(username) from table_unique;")
+        contains "pushAggOp=NONE"
+    }
+    explain {
+        sql("select max(username) from table_unique;")
+        contains "pushAggOp=NONE"
+    }
+
+
+    // set session variables
+    sql "set enable_pushdown_minmax_on_unique = true;"
+    explain {
+        sql("select min(user_id) from table_unique;")
+        contains "pushAggOp=MINMAX"
+    }
+    explain {
+        sql("select max(user_id) from table_unique;")
+        contains "pushAggOp=MINMAX"
+    }
+    explain {
+        sql("select min(username) from table_unique;")
+        contains "pushAggOp=MINMAX"
+    }
+    explain {
+        sql("select max(username) from table_unique;")
+        contains "pushAggOp=MINMAX"
+    }
+    qt_select_0 "select * from table_unique order by user_id;"
+    qt_select_1 "select min(user_id) from table_unique;"
+    qt_select_2 "select max(user_id) from table_unique;"
+    qt_select_3 "select min(username) from table_unique;"
+    qt_select_4 "select max(username) from table_unique;"
+    qt_select_5 "select min(val) from table_unique;"
+    qt_select_6 "select max(val) from table_unique;"
+    sql """
+        update table_unique set val = "zzz" where user_id = 1;
+    """
+    qt_select_00 "select * from table_unique order by user_id;"
+    qt_select_7 "select min(user_id) from table_unique;"
+    qt_select_8 "select max(user_id) from table_unique;"
+    qt_select_9 "select min(username) from table_unique;"
+    qt_select_10 "select max(username) from table_unique;"
+    qt_select_11 "select min(val) from table_unique;"
+    qt_select_12 "select max(val) from table_unique;"
+
+    sql """
+        delete from table_unique where user_id = 2;
+    """
+    qt_select_000 "select * from table_unique order by user_id;"
+    qt_select_13 "select min(user_id) from table_unique;"
+    qt_select_14 "select max(user_id) from table_unique;"
+    qt_select_15 "select min(username) from table_unique;"
+    qt_select_16 "select max(username) from table_unique;"
+    qt_select_17 "select min(val) from table_unique;"
+    qt_select_18 "select max(val) from table_unique;"
+
+
+    sql "DROP TABLE IF EXISTS table_agg"
+    sql """ 
+        CREATE TABLE `table_agg` (
+            `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+            `username` VARCHAR(50) NOT NULL COMMENT '\"用户昵称\"',
+            `val` VARCHAR(50) max NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`user_id`, `username`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "disable_auto_compaction" = "true"
+        );
+    """
+
+    sql """ 
+        insert into table_agg values(1,"asd","cc"),(2,"qwe","vvx"),(3,"ffsd","mnm"),(4,"qdf","ll"),(5,"cvfv","vff");
+    """
+
+    explain {
+        sql("select min(user_id) from table_agg;")
+        contains "pushAggOp=MINMAX"
+    }
+    explain {
+        sql("select max(user_id) from table_agg;")
+        contains "pushAggOp=MINMAX"
+    }
+    explain {
+        sql("select min(username) from table_agg;")
+        contains "pushAggOp=MINMAX"
+    }
+    explain {
+        sql("select max(username) from table_agg;")
+        contains "pushAggOp=MINMAX"
+    }
+
+    qt_select_19 "select min(user_id) from table_agg;"
+    qt_select_20 "select max(user_id) from table_agg;"
+    qt_select_21 "select min(username) from table_agg;"
+    qt_select_22 "select max(username) from table_agg;"
+    qt_select_23 "select min(val) from table_agg;"
+    qt_select_24 "select max(val) from table_agg;"
 }