[Feature] Add Topn udaf (#4803)

For #4674 
This is a udaf for approximate topn using Space-Saving algorithm.  At present, we can only calculate
the frequent items and their frequencies in a certain column, based on which we can implement similar
topN functions supported by Kylin in the future. 

I have also added a test to calculate the accuracy of this algorithm. The following is a rough running result.
The total amount of data is 1 million lines and follows the Zipfian distribution, where Element Cardinality
represents the data cardinality, 20X, 50X.. The value representing space_expand_rate is 20,50, which is
used to set the counter number in the space-saving algorithm

```
zf exponent = 0.5
Element cardinality	        20X        50X          100X
               1000		100%	   100%         100%
               10000		100%	   100%		100%
	       100000		100%	   100%		100%
	       500000		 94%	    98%		 99%

zf exponent = 0.6,1
Element cardinality	        20X        50X          100X
		1000		100%	   100%         100%
		10000		100%	   100%		100%
		100000		100%	   100%		100%
		500000		100%	   100%		100%

```
This commit is contained in:
Youngwb
2020-12-16 21:58:34 +08:00
committed by GitHub
parent 6afa14cda7
commit 650536d53e
15 changed files with 1115 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.ScalarFunction;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
@ -455,6 +456,30 @@ public class FunctionCallExpr extends Expr {
}
}
}
if (fnName.getFunction().equalsIgnoreCase("topn")) {
if (children.size() != 2 && children.size() != 3) {
throw new AnalysisException("topn(expr, INT [, B]) requires two or three parameters");
}
if (!getChild(1).isConstant() || !getChild(1).getType().isIntegerType()) {
throw new AnalysisException("topn requires second parameter must be a constant Integer Type: "
+ this.toSql());
}
if (getChild(1).getType() != ScalarType.INT) {
Expr e = getChild(1).castTo(ScalarType.INT);
setChild(1, e);
}
if (children.size() == 3) {
if (!getChild(2).isConstant() || !getChild(2).getType().isIntegerType()) {
throw new AnalysisException("topn requires the third parameter must be a constant Integer Type: "
+ this.toSql());
}
if (getChild(2).getType() != ScalarType.INT) {
Expr e = getChild(2).castTo(ScalarType.INT);
setChild(2, e);
}
}
}
}
// Provide better error message for some aggregate builtins. These can be

View File

@ -817,6 +817,70 @@ public class FunctionSet {
"_ZN5doris15BitmapFunctions25bitmap_intersect_finalizeINS_11StringValueEEEN9doris_udf9BigIntValEPNS3_15FunctionContextERKNS3_9StringValE")
.build();
private static final Map<Type, String> TOPN_UPDATE_SYMBOL =
ImmutableMap.<Type, String>builder()
.put(Type.BOOLEAN,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.TINYINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.SMALLINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.INT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_PNS2_9StringValE")
.put(Type.BIGINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.FLOAT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.DOUBLE,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.CHAR,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_")
.put(Type.VARCHAR,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPS3_")
.put(Type.DATE,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.DATETIME,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.DECIMAL,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.DECIMALV2,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.put(Type.LARGEINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValEPNS2_9StringValE")
.build();
private static final Map<Type, String> TOPN_UPDATE_MORE_PARAM_SYMBOL =
ImmutableMap.<Type, String>builder()
.put(Type.BOOLEAN,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10BooleanValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.TINYINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.SMALLINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11SmallIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.INT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf6IntValEEEvPNS2_15FunctionContextERKT_RKS3_SA_PNS2_9StringValE")
.put(Type.BIGINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9BigIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.FLOAT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf8FloatValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.DOUBLE,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9DoubleValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.CHAR,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_")
.put(Type.VARCHAR,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf9StringValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PS3_")
.put(Type.DATE,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.DATETIME,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11DateTimeValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.DECIMAL,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf10DecimalValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.DECIMALV2,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf12DecimalV2ValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.put(Type.LARGEINT,
"_ZN5doris13TopNFunctions11topn_updateIN9doris_udf11LargeIntValEEEvPNS2_15FunctionContextERKT_RKNS2_6IntValESB_PNS2_9StringValE")
.build();
public Function getFunction(Function desc, Function.CompareMode mode) {
List<Function> fns = functions.get(desc.functionName());
if (fns == null) {
@ -1185,6 +1249,26 @@ public class FunctionSet {
"_ZN5doris12HllFunctions13hll_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
true, false, true));
// TopN
if (TOPN_UPDATE_SYMBOL.containsKey(t)) {
addBuiltin(AggregateFunction.createBuiltin("topn",
Lists.newArrayList(t, Type.INT), Type.VARCHAR, Type.VARCHAR,
"_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE",
TOPN_UPDATE_SYMBOL.get(t),
"_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
"_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
"_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
true, false, true));
addBuiltin(AggregateFunction.createBuiltin("topn",
Lists.newArrayList(t, Type.INT, Type.INT), Type.VARCHAR, Type.VARCHAR,
"_ZN5doris13TopNFunctions9topn_initEPN9doris_udf15FunctionContextEPNS1_9StringValE",
TOPN_UPDATE_MORE_PARAM_SYMBOL.get(t),
"_ZN5doris13TopNFunctions10topn_mergeEPN9doris_udf15FunctionContextERKNS1_9StringValEPS4_",
"_ZN5doris13TopNFunctions14topn_serializeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
"_ZN5doris13TopNFunctions13topn_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
true, false, true));
}
if (STDDEV_UPDATE_SYMBOL.containsKey(t)) {
addBuiltin(AggregateFunction.createBuiltin("stddev",
Lists.newArrayList(t), STDDEV_RETTYPE_SYMBOL.get(t), Type.VARCHAR,