[feature](function) Add ntile function (#9867)

Add ntile function.
For non-vectorized-engine, I just implemented like Impala, rewrite ntile to row_number and count.
But for vectorized-engine, I implemented WindowFunctionNTile.
This commit is contained in:
Jing Shen
2022-06-10 10:32:40 +08:00
committed by GitHub
parent a34d4b55f9
commit 4a474420c8
16 changed files with 484 additions and 4 deletions

View File

@ -63,6 +63,7 @@ import java.util.Objects;
*/
public class AnalyticExpr extends Expr {
private final static Logger LOG = LoggerFactory.getLogger(AnalyticExpr.class);
private static String NTILE = "NTILE";
private FunctionCallExpr fnCall;
private final List<Expr> partitionExprs;
@ -236,7 +237,8 @@ public class AnalyticExpr extends Expr {
return fn.functionName().equalsIgnoreCase(RANK)
|| fn.functionName().equalsIgnoreCase(DENSERANK)
|| fn.functionName().equalsIgnoreCase(ROWNUMBER);
|| fn.functionName().equalsIgnoreCase(ROWNUMBER)
|| fn.functionName().equalsIgnoreCase(NTILE);
}
static private boolean isHllAggFn(Function fn) {
@ -247,6 +249,110 @@ public class AnalyticExpr extends Expr {
return fn.functionName().equalsIgnoreCase(HLL_UNION_AGG);
}
private static boolean isNTileFn(Function fn) {
if (!isAnalyticFn(fn)) {
return false;
}
return fn.functionName().equalsIgnoreCase(NTILE);
}
/**
* Rewrite the following analytic functions: ntile().
* Returns a new Expr if the analytic expr is rewritten, returns null if it's not one
* that we want to equal.
*/
public static Expr rewrite(AnalyticExpr analyticExpr) {
Function fn = analyticExpr.getFnCall().getFn();
// TODO(zc)
// if (AnalyticExpr.isPercentRankFn(fn)) {
// return createPercentRank(analyticExpr);
// } else if (AnalyticExpr.isCumeDistFn(fn)) {
// return createCumeDist(analyticExpr);
// } else if (AnalyticExpr.isNtileFn(fn)) {
// return createNtile(analyticExpr);
// }
if (isNTileFn(fn) && !VectorizedUtil.isVectorized()) {
return createNTile(analyticExpr);
}
return null;
}
/**
* Rewrite ntile().
* The logic is translated from be class WindowFunctionNTile.
* count = bigBucketNum * (smallBucketSize + 1) + smallBucketNum * smallBucketSize
* bigBucketNum + smallBucketNum = bucketNum
*/
private static Expr createNTile(AnalyticExpr analyticExpr) {
Preconditions.checkState(AnalyticExpr.isNTileFn(analyticExpr.getFnCall().getFn()));
Expr bucketNum = analyticExpr.getChild(0);
AnalyticExpr rowNum = create("row_number", analyticExpr, true, false);
AnalyticExpr count = create("count", analyticExpr, false, false);
IntLiteral one = new IntLiteral(1);
ArithmeticExpr smallBucketSize = new ArithmeticExpr(ArithmeticExpr.Operator.INT_DIVIDE,
count, bucketNum);
ArithmeticExpr bigBucketNum = new ArithmeticExpr(ArithmeticExpr.Operator.MOD,
count, bucketNum);
ArithmeticExpr firstSmallBucketRowIndex = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY,
bigBucketNum,
new ArithmeticExpr(ArithmeticExpr.Operator.ADD,
smallBucketSize, one));
ArithmeticExpr rowIndex = new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT,
rowNum, one);
List<Expr> ifParams = new ArrayList<>();
ifParams.add(
new BinaryPredicate(BinaryPredicate.Operator.GE, rowIndex, firstSmallBucketRowIndex));
ArithmeticExpr rowInSmallBucket = new ArithmeticExpr(ArithmeticExpr.Operator.ADD,
new ArithmeticExpr(ArithmeticExpr.Operator.ADD,
bigBucketNum, one),
new ArithmeticExpr(ArithmeticExpr.Operator.INT_DIVIDE,
new ArithmeticExpr(ArithmeticExpr.Operator.SUBTRACT,
rowIndex, firstSmallBucketRowIndex),
smallBucketSize));
ArithmeticExpr rowInBigBucket = new ArithmeticExpr(ArithmeticExpr.Operator.ADD,
new ArithmeticExpr(ArithmeticExpr.Operator.INT_DIVIDE,
rowIndex,
new ArithmeticExpr(ArithmeticExpr.Operator.ADD,
smallBucketSize, one)),
one);
ifParams.add(rowInSmallBucket);
ifParams.add(rowInBigBucket);
return new FunctionCallExpr("if", ifParams);
}
/**
* Create a new Analytic Expr and associate it with a new function.
* Takes a reference analytic expression and clones the partition expressions and the
* order by expressions if 'copyOrderBy' is set and optionally reverses it if
* 'reverseOrderBy' is set. The new function that it will be associated with is
* specified by fnName.
*/
private static AnalyticExpr create(String fnName,
AnalyticExpr referenceExpr, boolean copyOrderBy, boolean reverseOrderBy) {
FunctionCallExpr fnExpr = new FunctionCallExpr(fnName, new ArrayList<>());
fnExpr.setIsAnalyticFnCall(true);
List<OrderByElement> orderByElements = null;
if (copyOrderBy) {
if (reverseOrderBy) {
orderByElements = OrderByElement.reverse(referenceExpr.getOrderByElements());
} else {
orderByElements = new ArrayList<>();
for (OrderByElement elem : referenceExpr.getOrderByElements()) {
orderByElements.add(elem.clone());
}
}
}
AnalyticExpr analyticExpr = new AnalyticExpr(fnExpr,
Expr.cloneList(referenceExpr.getPartitionExprs()), orderByElements, null);
return analyticExpr;
}
/**
* Checks that the value expr of an offset boundary of a RANGE window is compatible
* with orderingExprs (and that there's only a single ordering expr).
@ -533,6 +639,24 @@ public class AnalyticExpr extends Expr {
return;
}
if (analyticFnName.getFunction().equalsIgnoreCase(NTILE)) {
Preconditions.checkState(window == null, "Unexpected window set for ntile()");
Expr bucketExpr = getFnCall().getFnParams().exprs().get(0);
if (bucketExpr instanceof LiteralExpr && bucketExpr.getType().getPrimitiveType().isIntegerType()) {
Preconditions.checkState(((LiteralExpr) bucketExpr).getLongValue() > 0,
"Parameter n in ntile(n) should be positive.");
} else {
throw new AnalysisException("Parameter n in ntile(n) should be constant positive integer.");
}
window = new AnalyticWindow(AnalyticWindow.Type.ROWS,
new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
new Boundary(BoundaryType.CURRENT_ROW, null));
resetWindow = true;
return;
}
// Explicitly set the default arguments to lead()/lag() for BE simplicity.
// Set a window for lead(): UNBOUNDED PRECEDING to OFFSET FOLLOWING,
// Set a window for lag(): UNBOUNDED PRECEDING to OFFSET PRECEDING.

View File

@ -117,7 +117,8 @@ public class BuiltinAggregateFunction extends Function {
DENSE_RANK("DENSE_RANK", TAggregationOp.DENSE_RANK, null),
ROW_NUMBER("ROW_NUMBER", TAggregationOp.ROW_NUMBER, null),
LEAD("LEAD", TAggregationOp.LEAD, null),
FIRST_VALUE_REWRITE("FIRST_VALUE_REWRITE", null, null);
FIRST_VALUE_REWRITE("FIRST_VALUE_REWRITE", null, null),
NTILE("NTILE", TAggregationOp.NTILE, null);
private final String description;
private final TAggregationOp thriftOp;

View File

@ -521,7 +521,8 @@ public class FunctionCallExpr extends Expr {
|| fnName.getFunction().equalsIgnoreCase("row_number")
|| fnName.getFunction().equalsIgnoreCase("first_value")
|| fnName.getFunction().equalsIgnoreCase("last_value")
|| fnName.getFunction().equalsIgnoreCase("first_value_rewrite")) {
|| fnName.getFunction().equalsIgnoreCase("first_value_rewrite")
|| fnName.getFunction().equalsIgnoreCase("ntile")) {
if (!isAnalyticFnCall) {
throw new AnalysisException(fnName.getFunction() + " only used in analytic function");
}

View File

@ -1277,6 +1277,17 @@ public class SelectStmt extends QueryStmt {
return;
}
ExprSubstitutionMap rewriteSmap = new ExprSubstitutionMap();
for (Expr expr : analyticExprs) {
AnalyticExpr toRewrite = (AnalyticExpr) expr;
Expr newExpr = AnalyticExpr.rewrite(toRewrite);
if (newExpr != null) {
newExpr.analyze(analyzer);
if (!rewriteSmap.containsMappingFor(toRewrite)) {
rewriteSmap.put(toRewrite, newExpr);
}
}
}
if (rewriteSmap.size() > 0) {
// Substitute the exprs with their rewritten versions.
ArrayList<Expr> updatedAnalyticExprs =

View File

@ -36,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -2347,6 +2348,9 @@ public class FunctionSet<T> {
prefix + "17count_star_updateEPN9doris_udf15FunctionContextEPNS1_9BigIntValE",
prefix + "11count_mergeEPN9doris_udf15FunctionContextERKNS1_9BigIntValEPS4_",
null, null));
//ntile, we use rewrite sql for ntile, actually we don't really need this.
addBuiltin(AggregateFunction.createAnalyticBuiltin("ntile",
Collections.singletonList(Type.BIGINT), Type.BIGINT, Type.BIGINT, null, null, null, null, null));
//vec Rank
addBuiltin(AggregateFunction.createAnalyticBuiltin("rank",
@ -2371,6 +2375,9 @@ public class FunctionSet<T> {
prefix + "17count_star_updateEPN9doris_udf15FunctionContextEPNS1_9BigIntValE",
prefix + "11count_mergeEPN9doris_udf15FunctionContextERKNS1_9BigIntValEPS4_",
null, null, true));
//vec ntile
addBuiltin(AggregateFunction.createAnalyticBuiltin("ntile",
Collections.singletonList(Type.BIGINT), Type.BIGINT, Type.BIGINT, null, null, null, null, null, true));
for (Type t : Type.getSupportedTypes()) {
if (t.isNull()) {