From ba78adae947f3cefcb7f74478e2cf86c72b331f1 Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Sun, 5 May 2019 20:51:30 +0800
Subject: [PATCH] Fix bugs when using function in both stream load request and
 routine load job (#1091)

---
 .../load-data/routine-load-manual.md          | 10 +++++++
 .../Data Manipulation/routine_load.md         | 12 ++++----
 fe/src/main/cup/sql_parser.cup                |  6 ++--
 .../org/apache/doris/analysis/Analyzer.java   |  5 ++++
 .../analysis/BuiltinAggregateFunction.java    |  5 ++--
 .../org/apache/doris/analysis/CastExpr.java   |  6 ++--
 .../doris/analysis/FunctionCallExpr.java      | 30 +++++++++++++------
 .../apache/doris/analysis/FunctionName.java   |  6 ++--
 .../doris/analysis/ImportColumnDesc.java      |  7 +++++
 .../doris/planner/StreamLoadPlanner.java      |  4 ++-
 .../doris/planner/StreamLoadScanNode.java     |  2 ++
 11 files changed, 66 insertions(+), 27 deletions(-)

diff --git a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md
index f6b20913b6..ad90ac2e15 100644
--- a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md
+++ b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md
@@ -77,6 +77,16 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new tasks, or
 
     Note that we use `"3"` here instead of `3`, even though the type of `k1` is `int`. For a load job, the column types in the source data are all `varchar`, so the type of the virtual column `xx` is also `varchar`. Therefore we have to use `"3"` for the match; otherwise the `ifnull` function cannot find a function signature with arguments `(varchar, int)` and an error will occur.
 
+    As another example, suppose the user needs to load a table that contains only the column `k1`, whose type is `int`, and the corresponding column in the source file has to be transformed: negative values are converted to positive ones, while non-negative values have 100 added to them. This can be implemented with the `case when` function, and the correct way to write it is:
+
+    `COLUMNS (xx, case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)`
+
+    Note that all the parameters inside the `case when` have to end up being converted to varchar in order to get the expected result.
+
+* where_predicates
+
+    The columns referenced in `where_predicates` already have their actual column types, so there is no need to force them to varchar as is done in `columns_mapping`. Simply write the predicates according to the actual column types.
+
 * desired\_concurrent\_number
 
     `desired_concurrent_number` specifies the desired concurrency of a routine load job, i.e. at most how many tasks of one job may run at the same time. For Kafka loads, the current actual concurrency is calculated as follows:
diff --git a/docs/help/Contents/Data Manipulation/routine_load.md b/docs/help/Contents/Data Manipulation/routine_load.md
index da0aeafbb4..11a06d7a8b 100644
--- a/docs/help/Contents/Data Manipulation/routine_load.md
+++ b/docs/help/Contents/Data Manipulation/routine_load.md
@@ -198,13 +198,13 @@
 
     Syntax:
 
-        PAUSE ROUTINE LOAD [db.]name;
+        PAUSE ROUTINE LOAD FOR [db.]name;
 
 ## example
 
 1. Pause the routine load job named test1.
 
-        PAUSE ROUTINE LOAD test1;
+        PAUSE ROUTINE LOAD FOR test1;
 
 ## keyword
     PAUSE,ROUTINE,LOAD
@@ -215,13 +215,13 @@
 
     Syntax:
 
-        RESUME ROUTINE LOAD [db.]name;
+        RESUME ROUTINE LOAD FOR [db.]name;
 
 ## example
 
 1. Resume the routine load job named test1.
 
-        RESUME ROUTINE LOAD test1;
+        RESUME ROUTINE LOAD FOR test1;
 
 ## keyword
     RESUME,ROUTINE,LOAD
@@ -232,7 +232,7 @@
     Syntax:
 
-        STOP ROUTINE LOAD [db.]name;
+        STOP ROUTINE LOAD FOR [db.]name;
 
     A stopped job cannot be resumed.
 
 ## example
@@ -240,7 +240,7 @@
 
 1. Stop the routine load job named test1.
 
-        STOP ROUTINE LOAD test1;
+        STOP ROUTINE LOAD FOR test1;
 
 ## keyword
     STOP,ROUTINE,LOAD
diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index ffab7e0147..1476aa84bb 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -1205,21 +1205,21 @@ load_property ::=
     ;
 
 pause_routine_load_stmt ::=
-    KW_PAUSE KW_ROUTINE KW_LOAD job_label:jobLabel
+    KW_PAUSE KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
     {:
         RESULT = new PauseRoutineLoadStmt(jobLabel);
     :}
     ;
 
 resume_routine_load_stmt ::=
-    KW_RESUME KW_ROUTINE KW_LOAD job_label:jobLabel
+    KW_RESUME KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
     {:
         RESULT = new ResumeRoutineLoadStmt(jobLabel);
     :}
     ;
 
 stop_routine_load_stmt ::=
-    KW_STOP KW_ROUTINE KW_LOAD job_label:jobLabel
+    KW_STOP KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
     {:
         RESULT = new StopRoutineLoadStmt(jobLabel);
     :}
diff --git a/fe/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/src/main/java/org/apache/doris/analysis/Analyzer.java
index b5916e4020..6947be7c71 100644
--- a/fe/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -136,6 +136,8 @@ public class Analyzer {
     // On-clause of any such semi-join is not allowed to reference other semi-joined tuples
     // except its own. Therefore, only a single semi-joined tuple can be visible at a time.
     private TupleId visibleSemiJoinedTupleId_ = null;
+    // For situations where UDFs are not allowed.
+    private boolean isUDFAllowed = true;
 
     public void setIsSubquery() {
         isSubquery = true;
@@ -145,6 +147,9 @@ public class Analyzer {
     public boolean hasPlanHints() { return globalState.hasPlanHints; }
     public void setIsWithClause() { isWithClause_ = true; }
     public boolean isWithClause() { return isWithClause_; }
+
+    public void setUDFAllowed(boolean val) { this.isUDFAllowed = val; }
+    public boolean isUDFAllowed() { return this.isUDFAllowed; }
 
     // state shared between all objects of an Analyzer tree
     // TODO: Many maps here contain properties about tuples, e.g., whether
diff --git a/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java b/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java
index 8e581460d6..925ddf9a05 100644
--- a/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java
+++ b/fe/src/main/java/org/apache/doris/analysis/BuiltinAggregateFunction.java
@@ -25,6 +25,7 @@ import org.apache.doris.thrift.TAggregateFunction;
 import org.apache.doris.thrift.TAggregationOp;
 import org.apache.doris.thrift.TFunction;
 import org.apache.doris.thrift.TFunctionBinaryType;
+
 import com.google.common.base.Preconditions;
 
 import java.util.ArrayList;
@@ -41,7 +42,7 @@ public class BuiltinAggregateFunction extends Function {
         return isAnalyticFn;
     }
     // TODO: this is not used yet until the planner understand this.
-    private org.apache.doris.catalog.Type intermediateType_; 
+    private org.apache.doris.catalog.Type intermediateType_;
     private boolean reqIntermediateTuple = false;
 
     public boolean isReqIntermediateTuple() {
@@ -51,7 +52,7 @@ public class BuiltinAggregateFunction extends Function {
     public BuiltinAggregateFunction(Operator op, ArrayList<Type> argTypes, Type retType,
             org.apache.doris.catalog.Type intermediateType, boolean isAnalyticFn)
             throws AnalysisException {
-        super(FunctionName.CreateBuiltinName(op.toString()), argTypes,
+        super(FunctionName.createBuiltinName(op.toString()), argTypes,
                 retType, false);
         Preconditions.checkState(intermediateType != null);
         Preconditions.checkState(op != null);
diff --git a/fe/src/main/java/org/apache/doris/analysis/CastExpr.java b/fe/src/main/java/org/apache/doris/analysis/CastExpr.java
index da35ea8b6e..b87550509d 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CastExpr.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CastExpr.java
@@ -17,12 +17,12 @@
 
 package org.apache.doris.analysis;
 
-import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Function;
 import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarFunction;
+import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.thrift.TExpr;
@@ -31,10 +31,10 @@ import org.apache.doris.thrift.TExprNodeType;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.base.Preconditions;
-
 import com.google.common.collect.Lists;
-import org.apache.logging.log4j.Logger;
+
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class CastExpr extends Expr {
     private static final Logger LOG = LogManager.getLogger(CastExpr.class);
diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
index b8de91f823..0591c65220 100644
--- a/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java
@@ -17,13 +17,12 @@
 
 package org.apache.doris.analysis;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSortedSet;
-import org.apache.doris.catalog.*;
+import org.apache.doris.catalog.AggregateFunction;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Function;
+import org.apache.doris.catalog.ScalarFunction;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -32,6 +31,14 @@ import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TAggregateExpr;
 import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TExprNodeType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -472,7 +479,7 @@ public class FunctionCallExpr extends Expr {
             Type type = getChild(0).type.getMaxResolutionType();
             fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{type},
                     Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
-        } else if (fnName.getFunction().equalsIgnoreCase("count_distinct")) { 
+        } else if (fnName.getFunction().equalsIgnoreCase("count_distinct")) {
             Type compatibleType = this.children.get(0).getType();
             for (int i = 1; i < this.children.size(); ++i) {
                 Type type = this.children.get(i).getType();
@@ -486,13 +493,18 @@ public class FunctionCallExpr extends Expr {
             fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{compatibleType},
                     Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
         } else {
-            // now first find function in builtin functions
+            // now first find function in built-in functions
             if (Strings.isNullOrEmpty(fnName.getDb())) {
                 fn = getBuiltinFunction(analyzer, fnName.getFunction(), collectChildReturnTypes(),
                         Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
             }
 
+            // find user defined functions
             if (fn == null) {
+                if (!analyzer.isUDFAllowed()) {
+                    throw new AnalysisException("Does not support non-builtin functions: " + fnName);
+                }
+
                 String dbName = fnName.analyzeDb(analyzer);
                 if (!Strings.isNullOrEmpty(dbName)) {
                     // check operation privilege
diff --git a/fe/src/main/java/org/apache/doris/analysis/FunctionName.java b/fe/src/main/java/org/apache/doris/analysis/FunctionName.java
index e61fa3f002..bafc853110 100644
--- a/fe/src/main/java/org/apache/doris/analysis/FunctionName.java
+++ b/fe/src/main/java/org/apache/doris/analysis/FunctionName.java
@@ -17,16 +17,16 @@
 
 package org.apache.doris.analysis;
 
-import com.google.common.base.Strings;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
-import org.apache.doris.load.PullLoadSourceInfo;
 import org.apache.doris.thrift.TFunctionName;
 
+import com.google.common.base.Strings;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -67,7 +67,7 @@ public class FunctionName implements Writable {
 
     // Same as FunctionName but for builtins and we'll leave the case
     // as is since we aren't matching by string.
-    public static FunctionName CreateBuiltinName(String fn) {
+    public static FunctionName createBuiltinName(String fn) {
         FunctionName name = new FunctionName(fn);
         name.fn_ = fn;
         return name;
diff --git a/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java b/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
index 1274fa71cf..9ab656c69d 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
@@ -24,6 +24,13 @@ public class ImportColumnDesc {
     private String columnName;
     private Expr expr;
 
+    public ImportColumnDesc(ImportColumnDesc other) {
+        this.columnName = other.columnName;
+        if (other.expr != null) {
+            this.expr = other.expr.clone();
+        }
+    }
+
     public ImportColumnDesc(String column) {
         this.columnName = column;
     }
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 5bb2d4762d..249f0f09bf 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -72,8 +72,10 @@ public class StreamLoadPlanner {
         this.db = db;
         this.destTable = destTable;
         this.streamLoadTask = streamLoadTask;
-        analyzer = new Analyzer(Catalog.getInstance(), null);
 
+        // TODO(cmy): currently we do not support UDF in the stream load command,
+        // because there is no way to check the privilege of accessing a UDF.
+        analyzer.setUDFAllowed(false);
         descTable = analyzer.getDescTbl();
     }
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index a9eaac5013..bc03f2395a 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -189,6 +189,8 @@ public class StreamLoadScanNode extends ScanNode {
 
         // substitute SlotRef in filter expression
         Expr whereExpr = streamLoadTask.getWhereExpr();
+        // the where expr must be rewritten first to transform some predicates (e.g. BetweenPredicate to BinaryPredicate)
+        whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
 
         List<SlotRef> slots = Lists.newArrayList();
         whereExpr.collect(SlotRef.class, slots);
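
Appended note on the UDF guard introduced by this patch: the mechanism is spread over three files. Analyzer carries an isUDFAllowed flag, StreamLoadPlanner clears it because UDF access privileges cannot be checked for a stream load, and FunctionCallExpr rejects any non-builtin function while the flag is off. The minimal, self-contained Java sketch below shows the same pattern in isolation. The names UdfGuardSketch, MiniAnalyzer, FunctionResolver and resolve are illustrative stand-ins that do not exist in Doris; only setUDFAllowed/isUDFAllowed mirror the methods actually added to Analyzer, and the builtin list is an arbitrary placeholder.

import java.util.Set;

// Simplified stand-ins for the real Doris classes; not actual Doris APIs.
public class UdfGuardSketch {

    static class AnalysisException extends Exception {
        AnalysisException(String msg) { super(msg); }
    }

    // Mirrors Analyzer.setUDFAllowed()/isUDFAllowed() from the patch.
    static class MiniAnalyzer {
        private boolean udfAllowed = true;
        void setUDFAllowed(boolean val) { this.udfAllowed = val; }
        boolean isUDFAllowed() { return udfAllowed; }
    }

    // Mirrors the check added to FunctionCallExpr.analyzeImpl(): look up the
    // builtin first; only fall back to UDFs when the analyzer allows them.
    static class FunctionResolver {
        private static final Set<String> BUILTINS = Set.of("ifnull", "cast", "case");

        static String resolve(MiniAnalyzer analyzer, String fnName) throws AnalysisException {
            if (BUILTINS.contains(fnName.toLowerCase())) {
                return "builtin:" + fnName;
            }
            if (!analyzer.isUDFAllowed()) {
                // Same behavior as the patch: reject non-builtin functions outright.
                throw new AnalysisException("Does not support non-builtin functions: " + fnName);
            }
            // A real implementation would search the database for a UDF here.
            return "udf:" + fnName;
        }
    }

    public static void main(String[] args) throws AnalysisException {
        MiniAnalyzer streamLoadAnalyzer = new MiniAnalyzer();
        // As StreamLoadPlanner does: disable UDFs because their privileges cannot be checked.
        streamLoadAnalyzer.setUDFAllowed(false);

        // Builtin functions still resolve normally.
        System.out.println(FunctionResolver.resolve(streamLoadAnalyzer, "ifnull"));

        // Any non-builtin function is rejected, as in the patched FunctionCallExpr.
        try {
            FunctionResolver.resolve(streamLoadAnalyzer, "my_udf");
        } catch (AnalysisException e) {
            System.out.println(e.getMessage());
        }
    }
}

Under these assumptions, a routine load or stream load request that only uses builtin functions (ifnull, cast, case when, and so on) analyzes as before, while one that references a user-defined function fails analysis with the error message shown above instead of silently bypassing the privilege check.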