[prune](partition)support prune partition when is auto partition with function call (#24747)

A table can now be created with automatic partitioning:
AUTO PARTITION BY RANGE date_trunc(event_day, 'day')
so each value of event_day is inserted into the partition determined by date_trunc(event_day, 'day').
e.g.: select * from partition_range where date_trunc(event_day,"day")= "2023-08-07 11:00:00";
For such a query we can prune partitions by evaluating date_trunc("2023-08-07 11:00:00","day") ourselves.
This commit is contained in:
zhangstar333
2023-10-10 20:39:43 +08:00
committed by GitHub
parent 913282b29b
commit fb3b888ff1
13 changed files with 173 additions and 124 deletions

View File

@ -46,6 +46,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Objects;
/**
@ -535,12 +536,33 @@ public class BinaryPredicate extends Predicate implements Writable {
// vectorizedAnalyze(analyzer);
}
/**
 * Substitutes {@code paramExpr} for the SlotRef argument of the first partition
 * function expression, then constant-folds the rewritten call.
 * Example: for partition expr date_trunc(event_day, 'day') and a literal
 * "2023-08-07 11:00:00", this evaluates date_trunc("2023-08-07 11:00:00", 'day').
 *
 * @param partitionExprs the table's partition expressions (function calls over one column)
 * @param paramExpr the constant expression to substitute for the column SlotRef
 * @return the folded result expression, or null when no partition function with a
 *         SlotRef argument is found
 */
public Expr invokeFunctionExpr(ArrayList<Expr> partitionExprs, Expr paramExpr) {
    for (Expr candidate : partitionExprs) {
        if (!(candidate instanceof FunctionCallExpr)) {
            continue;
        }
        // Work on a clone so the table's original partition expression stays untouched.
        FunctionCallExpr call = (FunctionCallExpr) candidate.clone();
        ArrayList<Expr> args = call.getChildren();
        for (int idx = 0; idx < args.size(); ++idx) {
            // Partition creation enforces a single SlotRef argument, so the
            // first SlotRef found is the column to substitute.
            if (args.get(idx) instanceof SlotRef) {
                call.setChild(idx, paramExpr);
                return ExpressionFunctions.INSTANCE.evalExpr(call);
            }
        }
    }
    return null;
}
/**
* If predicate is of the form "<slotref> <op> <expr>", returns expr,
* otherwise returns null. Slotref may be wrapped in a CastExpr.
 * Now that auto partition creation by function(column) is supported, the form "<function(column)> <op> <expr>"
 * must also be checked: loaded data is placed into partitions keyed by the function's result,
 * so partition pruning for a query needs to evaluate the same function.
*/
public Expr getSlotBinding(SlotId id) {
public Expr getSlotBinding(SlotId id, ArrayList<Expr> partitionExprs) {
SlotRef slotRef = null;
boolean isFunctionCallExpr = false;
// check left operand
if (getChild(0) instanceof SlotRef) {
slotRef = (SlotRef) getChild(0);
@ -548,9 +570,25 @@ public class BinaryPredicate extends Predicate implements Writable {
if (((CastExpr) getChild(0)).canHashPartition()) {
slotRef = (SlotRef) getChild(0).getChild(0);
}
} else if (getChild(0) instanceof FunctionCallExpr) {
FunctionCallExpr left = (FunctionCallExpr) getChild(0);
if (partitionExprs != null && left.findEqual(partitionExprs) != null) {
ArrayList<Expr> children = left.getChildren();
for (int i = 0; i < children.size(); ++i) {
if (children.get(i) instanceof SlotRef) {
slotRef = (SlotRef) children.get(i);
isFunctionCallExpr = true;
break;
}
}
}
}
if (slotRef != null && slotRef.getSlotId() == id) {
slotIsleft = true;
if (isFunctionCallExpr) {
return invokeFunctionExpr(partitionExprs, getChild(1));
}
return getChild(1);
}
@ -561,10 +599,25 @@ public class BinaryPredicate extends Predicate implements Writable {
if (((CastExpr) getChild(1)).canHashPartition()) {
slotRef = (SlotRef) getChild(1).getChild(0);
}
} else if (getChild(1) instanceof FunctionCallExpr) {
FunctionCallExpr left = (FunctionCallExpr) getChild(1);
if (partitionExprs != null && left.findEqual(partitionExprs) != null) {
ArrayList<Expr> children = left.getChildren();
for (int i = 0; i < children.size(); ++i) {
if (children.get(i) instanceof SlotRef) {
slotRef = (SlotRef) children.get(i);
isFunctionCallExpr = true;
break;
}
}
}
}
if (slotRef != null && slotRef.getSlotId() == id) {
slotIsleft = false;
if (isFunctionCallExpr) {
return invokeFunctionExpr(partitionExprs, getChild(0));
}
return getChild(0);
}

View File

@ -195,6 +195,24 @@ public class IntLiteral extends LiteralExpr {
return new IntLiteral(value);
}
/**
 * Re-derives this literal's type from its value when the current type is INVALID,
 * choosing the narrowest integer type that can hold the value.
 * NOTE(review): the write/read serialization path apparently does not persist
 * type info, so a deserialized literal can arrive here with INVALID type —
 * confirm against the Writable implementation.
 */
@Override
protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
    if (this.type.getPrimitiveType() != Type.INVALID.getPrimitiveType()) {
        // Type already known; nothing to repair.
        return;
    }
    if (TINY_INT_MIN <= this.value && this.value <= TINY_INT_MAX) {
        type = Type.TINYINT;
    } else if (SMALL_INT_MIN <= this.value && this.value <= SMALL_INT_MAX) {
        type = Type.SMALLINT;
    } else if (INT_MIN <= this.value && this.value <= INT_MAX) {
        type = Type.INT;
    } else if (BIG_INT_MIN <= this.value && this.value <= BIG_INT_MAX) {
        type = Type.BIGINT;
    } else {
        // Value fits no supported integer range: internal invariant violation.
        Preconditions.checkState(false, value);
    }
}
@Override
public boolean isMinValue() {
switch (type.getPrimitiveType()) {

View File

@ -46,7 +46,9 @@ public class PartitionDesc {
protected boolean isAutoCreatePartitions;
protected PartitionType type;
public static final ImmutableSet<String> RANGE_PARTITION_FUNCTIONS = new ImmutableSortedSet.Builder<String>(
String.CASE_INSENSITIVE_ORDER).add("date_trunc").add("date_ceil").add("date_floor")
String.CASE_INSENSITIVE_ORDER).add("date_trunc").add("date_ceil").add("date_floor").add("second_floor")
.add("minute_floor").add("hour_floor").add("day_floor").add("month_floor").add("year_floor")
.add("second_ceil").add("minute_ceil").add("hour_ceil").add("day_ceil").add("month_ceil").add("year_ceil")
.build();
public PartitionDesc() {}

View File

@ -57,9 +57,9 @@ public class PartitionExprUtil {
throw new AnalysisException("now range partition only support FunctionCallExpr");
}
FunctionCallExpr functionCallExpr = (FunctionCallExpr) e;
String fnName = functionCallExpr.getFnName().getFunction();
String fnName = functionCallExpr.getFnName().getFunction().toLowerCase();
String timeUnit;
int interval;
long interval;
if ("date_trunc".equalsIgnoreCase(fnName)) {
List<Expr> paramsExprs = functionCallExpr.getParams().exprs();
if (paramsExprs.size() != 2) {
@ -70,9 +70,22 @@ public class PartitionExprUtil {
throw new AnalysisException("date_trunc param of time unit is not string literal.");
}
timeUnit = ((StringLiteral) param).getStringValue().toLowerCase();
interval = 1;
interval = 1L;
} else if (PartitionDesc.RANGE_PARTITION_FUNCTIONS.contains(fnName)) {
List<Expr> paramsExprs = functionCallExpr.getParams().exprs();
if (paramsExprs.size() != 3) {
throw new AnalysisException("date_floor/date_ceil params exprs size should be 3.");
}
Expr param = paramsExprs.get(1);
if (!(param instanceof IntLiteral)) {
throw new AnalysisException("date_floor/date_ceil param of interval must be int literal.");
}
//date_floor(event_day,interval 5 day) ---> day_floor(`event_day`, 5, '0001-01-01 00:00:00')
String[] splits = fnName.split("_");
timeUnit = splits[0]; //day
interval = ((IntLiteral) param).getLongValue(); //5
} else {
throw new AnalysisException("now range partition only support date_trunc.");
throw new AnalysisException("now range partition only support date_trunc/date_floor/date_ceil.");
}
return partitionExprUtil.new FunctionIntervalInfo(timeUnit, interval);
}
@ -80,7 +93,7 @@ public class PartitionExprUtil {
public static DateLiteral getRangeEnd(DateLiteral beginTime, FunctionIntervalInfo intervalInfo)
throws AnalysisException {
String timeUnit = intervalInfo.timeUnit;
int interval = intervalInfo.interval;
long interval = intervalInfo.interval;
switch (timeUnit) {
case "year":
return beginTime.plusYears(interval);
@ -206,9 +219,9 @@ public class PartitionExprUtil {
public class FunctionIntervalInfo {
public String timeUnit;
public int interval;
public long interval;
public FunctionIntervalInfo(String timeUnit, int interval) {
public FunctionIntervalInfo(String timeUnit, long interval) {
this.timeUnit = timeUnit;
this.interval = interval;
}

View File

@ -225,6 +225,7 @@ public class SlotRef extends Expr {
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
helper.add("slotDesc", desc != null ? desc.debugString() : "null");
helper.add("col", col);
helper.add("type", type.toSql());
helper.add("label", label);
helper.add("tblName", tblName != null ? tblName.toSql() : "null");
return helper.toString();

View File

@ -537,6 +537,10 @@ public class OlapScanNode extends ScanNode {
// lazy evaluation, since stmt is a prepared statement
isFromPrepareStmt = analyzer.getPrepareStmt() != null;
if (!isFromPrepareStmt) {
if (olapTable.getPartitionInfo().enableAutomaticPartition()) {
partitionsInfo = olapTable.getPartitionInfo();
analyzerPartitionExpr(analyzer, partitionsInfo);
}
computeColumnsFilter();
computePartitionInfo();
}
@ -1117,7 +1121,7 @@ public class OlapScanNode extends ScanNode {
// Lazy evaluation
selectedIndexId = olapTable.getBaseIndexId();
// Only key columns
computeColumnsFilter(olapTable.getBaseSchemaKeyColumns());
computeColumnsFilter(olapTable.getBaseSchemaKeyColumns(), olapTable.getPartitionInfo());
computePartitionInfo();
scanBackendIds.clear();
scanTabletIds.clear();
@ -1434,6 +1438,13 @@ public class OlapScanNode extends ScanNode {
}
}
/**
 * Analyzes every partition expression of an auto-partition table so they can be
 * used later during partition pruning.
 *
 * @throws AnalysisException if any partition expression fails analysis
 */
private void analyzerPartitionExpr(Analyzer analyzer, PartitionInfo partitionInfo) throws AnalysisException {
    ArrayList<Expr> partitionExprs = partitionInfo.getPartitionExprs();
    for (int i = 0; i < partitionExprs.size(); i++) {
        partitionExprs.get(i).analyze(analyzer);
    }
}
public TupleId getTupleId() {
Preconditions.checkNotNull(desc);
return desc.getId();
@ -1631,3 +1642,4 @@ public class OlapScanNode extends ScanNode {
}
}

View File

@ -339,14 +339,15 @@ public class OlapTableSink extends DataSink {
}
}
}
boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
// for auto create partition by function expr, there is no any partition firstly,
// But this is required in thrift struct.
if (partitionIds.isEmpty()) {
if (enableAutomaticPartition && partitionIds.isEmpty()) {
partitionParam.setDistributedColumns(getDistColumns(table.getDefaultDistributionInfo()));
partitionParam.setPartitions(new ArrayList<TOlapTablePartition>());
}
ArrayList<Expr> exprs = partitionInfo.getPartitionExprs();
if (exprs != null && analyzer != null) {
if (enableAutomaticPartition && exprs != null && analyzer != null) {
Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext());
tupleDescriptor.setTable(table);
funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
@ -355,7 +356,7 @@ public class OlapTableSink extends DataSink {
}
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
}
partitionParam.setEnableAutomaticPartition(partitionInfo.enableAutomaticPartition());
partitionParam.setEnableAutomaticPartition(enableAutomaticPartition);
break;
}
case UNPARTITIONED: {

View File

@ -36,6 +36,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.NotImplementedException;
@ -61,6 +62,7 @@ import com.google.common.collect.TreeRangeSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -79,6 +81,7 @@ public abstract class ScanNode extends PlanNode {
protected String sortColumn = null;
protected Analyzer analyzer;
protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
protected PartitionInfo partitionsInfo = null;
public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType) {
super(id, desc.getId().asList(), planNodeName, statisticalType);
@ -154,23 +157,23 @@ public abstract class ScanNode extends PlanNode {
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
}
private void computeColumnFilter(Column column, SlotDescriptor slotDesc) {
private void computeColumnFilter(Column column, SlotDescriptor slotDesc, PartitionInfo partitionsInfo) {
// Set `columnFilters` all the time because `DistributionPruner` also use this.
// Maybe we could use `columnNameToRange` for `DistributionPruner` and
// only create `columnFilters` when `partition_prune_algorithm_version` is 1.
PartitionColumnFilter keyFilter = createPartitionFilter(slotDesc, conjuncts);
PartitionColumnFilter keyFilter = createPartitionFilter(slotDesc, conjuncts, partitionsInfo);
if (null != keyFilter) {
columnFilters.put(column.getName(), keyFilter);
}
ColumnRange columnRange = createColumnRange(slotDesc, conjuncts);
ColumnRange columnRange = createColumnRange(slotDesc, conjuncts, partitionsInfo);
if (columnRange != null) {
columnNameToRange.put(column.getName(), columnRange);
}
}
// TODO(ML): move it into PrunerOptimizer
public void computeColumnsFilter(List<Column> columns) {
public void computeColumnsFilter(List<Column> columns, PartitionInfo partitionsInfo) {
if (columns.size() > conjuncts.size()) {
Set<SlotRef> slotRefs = Sets.newHashSet();
for (Expr conjunct : conjuncts) {
@ -185,7 +188,7 @@ public abstract class ScanNode extends PlanNode {
if (column == null) {
continue;
}
computeColumnFilter(column, slotDesc);
computeColumnFilter(column, slotDesc, partitionsInfo);
}
} else {
for (Column column : columns) {
@ -193,20 +196,21 @@ public abstract class ScanNode extends PlanNode {
if (null == slotDesc) {
continue;
}
computeColumnFilter(column, slotDesc);
computeColumnFilter(column, slotDesc, partitionsInfo);
}
}
}
public void computeColumnsFilter() {
// for load scan node, table is null
// partitionsInfo maybe null for other scan node, eg: ExternalScanNode...
if (desc.getTable() != null) {
computeColumnsFilter(desc.getTable().getBaseSchema());
computeColumnsFilter(desc.getTable().getBaseSchema(), partitionsInfo);
}
}
public static ColumnRange createColumnRange(SlotDescriptor desc,
List<Expr> conjuncts) {
List<Expr> conjuncts, PartitionInfo partitionsInfo) {
ColumnRange result = ColumnRange.create();
for (Expr expr : conjuncts) {
if (!expr.isBound(desc.getId())) {
@ -224,7 +228,7 @@ public abstract class ScanNode extends PlanNode {
List<Range<ColumnBound>> disjunctiveRanges = Lists.newArrayList();
Set<Boolean> hasIsNull = Sets.newHashSet();
boolean allMatch = disjunctivePredicates.stream().allMatch(e -> {
ColumnRanges ranges = expressionToRanges(e, desc);
ColumnRanges ranges = expressionToRanges(e, desc, partitionsInfo);
switch (ranges.type) {
case IS_NULL:
hasIsNull.add(true);
@ -244,7 +248,7 @@ public abstract class ScanNode extends PlanNode {
}
} else {
// Try to get column filter from conjunctive predicates.
ColumnRanges ranges = expressionToRanges(expr, desc);
ColumnRanges ranges = expressionToRanges(expr, desc, partitionsInfo);
switch (ranges.type) {
case IS_NULL:
result.setHasConjunctiveIsNull(true);
@ -262,7 +266,7 @@ public abstract class ScanNode extends PlanNode {
}
public static ColumnRanges expressionToRanges(Expr expr,
SlotDescriptor desc) {
SlotDescriptor desc, PartitionInfo partitionsInfo) {
if (expr instanceof IsNullPredicate) {
IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
if (isNullPredicate.isSlotRefChildren() && !isNullPredicate.isNotNull()) {
@ -273,8 +277,10 @@ public abstract class ScanNode extends PlanNode {
List<Range<ColumnBound>> result = Lists.newArrayList();
if (expr instanceof BinaryPredicate) {
BinaryPredicate binPred = (BinaryPredicate) expr;
Expr slotBinding = binPred.getSlotBinding(desc.getId());
ArrayList<Expr> partitionExprs = (partitionsInfo != null && partitionsInfo.enableAutomaticPartition())
? partitionsInfo.getPartitionExprs()
: null;
Expr slotBinding = binPred.getSlotBinding(desc.getId(), partitionExprs);
if (slotBinding == null || !slotBinding.isConstant() || !(slotBinding instanceof LiteralExpr)) {
return ColumnRanges.createFailure();
}
@ -327,15 +333,15 @@ public abstract class ScanNode extends PlanNode {
ColumnRanges rightChildRange = null;
switch (compoundPredicate.getOp()) {
case AND:
leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc);
rightChildRange = expressionToRanges(compoundPredicate.getChild(1), desc);
leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc, partitionsInfo);
rightChildRange = expressionToRanges(compoundPredicate.getChild(1), desc, partitionsInfo);
return leftChildRange.intersectRanges(rightChildRange);
case OR:
leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc);
rightChildRange = expressionToRanges(compoundPredicate.getChild(1), desc);
leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc, partitionsInfo);
rightChildRange = expressionToRanges(compoundPredicate.getChild(1), desc, partitionsInfo);
return leftChildRange.unionRanges(rightChildRange);
case NOT:
leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc);
leftChildRange = expressionToRanges(compoundPredicate.getChild(0), desc, partitionsInfo);
return leftChildRange.complementOfRanges();
default:
throw new RuntimeException("unknown OP in compound predicate: "
@ -350,7 +356,8 @@ public abstract class ScanNode extends PlanNode {
}
}
private PartitionColumnFilter createPartitionFilter(SlotDescriptor desc, List<Expr> conjuncts) {
private PartitionColumnFilter createPartitionFilter(SlotDescriptor desc, List<Expr> conjuncts,
PartitionInfo partitionsInfo) {
PartitionColumnFilter partitionColumnFilter = null;
for (Expr expr : conjuncts) {
if (!expr.isBound(desc.getId())) {
@ -363,7 +370,11 @@ public abstract class ScanNode extends PlanNode {
continue;
}
Expr slotBinding = binPredicate.getSlotBinding(desc.getId());
ArrayList<Expr> partitionExprs = (partitionsInfo != null && partitionsInfo.enableAutomaticPartition())
? partitionsInfo.getPartitionExprs()
: null;
Expr slotBinding = binPredicate.getSlotBinding(desc.getId(), partitionExprs);
if (slotBinding == null || !slotBinding.isConstant() || !(slotBinding instanceof LiteralExpr)) {
continue;
}

View File

@ -34,13 +34,9 @@ import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.GroupByClause;
import org.apache.doris.analysis.GroupingInfo;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.InlineViewRef;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.LateralViewRef;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SetOperationStmt;
@ -1513,89 +1509,6 @@ public class SingleNodePlanner {
return tupleDesc;
}
// no need to remove?
/**
 * Builds a PartitionColumnFilter for the given slot from the conjuncts that
 * reference it. Handles three predicate shapes:
 *  - BinaryPredicate against a constant literal: sets lower/upper bounds;
 *  - InPredicate over literals: stored wholesale on the filter;
 *  - IsNullPredicate: collapses the filter to the NULL value and stops scanning.
 * Returns null when no usable predicate is found.
 * NOTE(review): a near-duplicate of this logic exists in ScanNode — this copy
 * may be dead code; verify callers before relying on it.
 */
public static PartitionColumnFilter createPartitionFilter(SlotDescriptor desc, List<Expr> conjuncts) {
    PartitionColumnFilter partitionColumnFilter = null;
    for (Expr expr : conjuncts) {
        // Skip predicates that do not reference this slot at all.
        if (!expr.isBound(desc.getId())) {
            continue;
        }
        if (expr instanceof BinaryPredicate) {
            BinaryPredicate binPredicate = (BinaryPredicate) expr;
            Expr slotBinding = binPredicate.getSlotBinding(desc.getId());
            if (slotBinding == null || !slotBinding.isConstant()) {
                continue;
            }
            // "!=" yields no contiguous range, and only literals can become bounds.
            if (binPredicate.getOp() == BinaryPredicate.Operator.NE
                    || !(slotBinding instanceof LiteralExpr)) {
                continue;
            }
            if (null == partitionColumnFilter) {
                partitionColumnFilter = new PartitionColumnFilter();
            }
            LiteralExpr literal = (LiteralExpr) slotBinding;
            BinaryPredicate.Operator op = binPredicate.getOp();
            // Normalize "literal <op> slot" into "slot <op'> literal".
            if (!binPredicate.slotIsLeft()) {
                op = op.commutative();
            }
            switch (op) {
                case EQ:
                    // Equality pins both bounds to the same value.
                    partitionColumnFilter.setLowerBound(literal, true);
                    partitionColumnFilter.setUpperBound(literal, true);
                    break;
                case LE:
                    partitionColumnFilter.setUpperBound(literal, true);
                    if (null == partitionColumnFilter.lowerBound) {
                        partitionColumnFilter.lowerBoundInclusive = true;
                    }
                    break;
                case LT:
                    partitionColumnFilter.setUpperBound(literal, false);
                    if (null == partitionColumnFilter.lowerBound) {
                        partitionColumnFilter.lowerBoundInclusive = true;
                    }
                    break;
                case GE:
                    partitionColumnFilter.setLowerBound(literal, true);
                    break;
                case GT:
                    partitionColumnFilter.setLowerBound(literal, false);
                    break;
                default:
                    // Other operators cannot be expressed as a range bound.
                    break;
            }
        } else if (expr instanceof InPredicate) {
            InPredicate inPredicate = (InPredicate) expr;
            // Only a positive IN-list of literals can drive partition pruning.
            if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) {
                continue;
            }
            if (!(inPredicate.getChild(0).unwrapExpr(false) instanceof SlotRef)) {
                // If child(0) of the in predicate is not a SlotRef,
                // then other children of in predicate should not be used as a condition for partition prune.
                continue;
            }
            if (null == partitionColumnFilter) {
                partitionColumnFilter = new PartitionColumnFilter();
            }
            partitionColumnFilter.setInPredicate(inPredicate);
        } else if (expr instanceof IsNullPredicate) {
            IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
            if (!isNullPredicate.isSlotRefChildren() || isNullPredicate.isNotNull()) {
                continue;
            }
            // If we meet a IsNull predicate on partition column, then other predicates are useless
            // eg: (xxxx) and (col is null), only the IsNull predicate has an effect on partition pruning.
            partitionColumnFilter = new PartitionColumnFilter();
            NullLiteral nullLiteral = new NullLiteral();
            partitionColumnFilter.setLowerBound(nullLiteral, true);
            partitionColumnFilter.setUpperBound(nullLiteral, true);
            break;
        }
    }
    LOG.debug("partitionColumnFilter: {}", partitionColumnFilter);
    return partitionColumnFilter;
}
/**
* Returns plan tree for an inline view ref:

View File

@ -555,7 +555,7 @@ public class StreamLoadPlanner {
if (null == slotDesc) {
continue;
}
ColumnRange columnRange = ScanNode.createColumnRange(slotDesc, conjuncts);
ColumnRange columnRange = ScanNode.createColumnRange(slotDesc, conjuncts, partitionInfo);
if (columnRange != null) {
columnNameToRange.put(column.getName(), columnRange);
}