Fix bugs when using function in both stream load request and routine load job (#1091)

Mingyu Chen
2019-05-05 20:51:30 +08:00
committed by ZHAO Chun
parent 53ae591183
commit ba78adae94
11 changed files with 66 additions and 27 deletions

View File

@@ -77,6 +77,16 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
Note that we use `"3"` here instead of `3`, even though the type of `k1` is `int`. This is because for load jobs, the column types in the source data are all `varchar`, so the type of the virtual column `xx` is also `varchar`. We therefore need to use `"3"` to get a matching signature; otherwise the `ifnull` function cannot find a function signature with parameters `(varchar, int)`, and an error will occur.
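(As a purely hypothetical illustration of this rule: a mapping such as `COLUMNS (xx, k1 = ifnull(xx, "3"))` matches the `(varchar, varchar)` signature, while `ifnull(xx, 3)` would find no matching signature.)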
As another example, suppose a user needs to load a table consisting of only one column `k1`, of type `int`, and the corresponding column in the source file needs to be processed: negative numbers should be converted to positive, and positive numbers multiplied by 100. This can be implemented with the `case when` function; the correct way to write it is as follows:
`COLUMNS (xx, case when xx < 0 then cast(-xx as varchar) else cast((xx * '100') as varchar) end)`
Note that all branches of the `case when` must ultimately be cast to `varchar` in order to get the expected result.
* where_predicates
The types of the columns in `where_predicates` are already the actual column types, so there is no need to force a cast to `varchar` as in `columns_mapping`; simply write the predicates against the actual column types.
* desired\_concurrent\_number
`desired_concurrent_number` specifies the desired concurrency of a routine load job, i.e., the maximum number of tasks of a single job that may be executing at the same time. For Kafka loads, the current actual concurrency is computed as follows:
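A minimal sketch of this kind of cap computation (the specific bounds below, Kafka partition count, alive BE count, and a global task-concurrency limit, are assumptions for illustration, not taken from this commit):

```java
// Illustrative only: names and bounds are assumptions, not actual Doris identifiers.
class ConcurrencySketch {
    static int actualConcurrency(int desiredConcurrentNumber, int kafkaPartitionNum,
                                 int aliveBackendNum, int maxTaskConcurrencyLimit) {
        // The effective concurrency cannot exceed any single one of these limits.
        int n = Math.min(desiredConcurrentNumber, kafkaPartitionNum);
        n = Math.min(n, aliveBackendNum);
        return Math.min(n, maxTaskConcurrencyLimit);
    }
}
```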

View File

@@ -198,13 +198,13 @@
Syntax:
PAUSE ROUTINE LOAD [db.]name;
PAUSE ROUTINE LOAD FOR [db.]name;
## example
1. Pause the routine load job named test1.
PAUSE ROUTINE LOAD test1;
PAUSE ROUTINE LOAD FOR test1;
## keyword
PAUSE,ROUTINE,LOAD
@@ -215,13 +215,13 @@
Syntax:
RESUME ROUTINE LOAD [db.]name;
RESUME ROUTINE LOAD FOR [db.]name;
## example
1. Resume the routine load job named test1.
RESUME ROUTINE LOAD test1;
RESUME ROUTINE LOAD FOR test1;
## keyword
RESUME,ROUTINE,LOAD
@@ -232,7 +232,7 @@
Syntax:
STOP ROUTINE LOAD [db.]name;
STOP ROUTINE LOAD FOR [db.]name;
A stopped job cannot be resumed.
@@ -240,7 +240,7 @@
1. Stop the routine load job named test1.
STOP ROUTINE LOAD test1;
STOP ROUTINE LOAD FOR test1;
## keyword
STOP,ROUTINE,LOAD

View File

@@ -1205,21 +1205,21 @@ load_property ::=
;
pause_routine_load_stmt ::=
KW_PAUSE KW_ROUTINE KW_LOAD job_label:jobLabel
KW_PAUSE KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
{:
RESULT = new PauseRoutineLoadStmt(jobLabel);
:}
;
resume_routine_load_stmt ::=
KW_RESUME KW_ROUTINE KW_LOAD job_label:jobLabel
KW_RESUME KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
{:
RESULT = new ResumeRoutineLoadStmt(jobLabel);
:}
;
stop_routine_load_stmt ::=
KW_STOP KW_ROUTINE KW_LOAD job_label:jobLabel
KW_STOP KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel
{:
RESULT = new StopRoutineLoadStmt(jobLabel);
:}

View File

@@ -136,6 +136,8 @@ public class Analyzer {
// On-clause of any such semi-join is not allowed to reference other semi-joined tuples
// except its own. Therefore, only a single semi-joined tuple can be visible at a time.
private TupleId visibleSemiJoinedTupleId_ = null;
// For situations where UDFs are not allowed.
private boolean isUDFAllowed = true;
public void setIsSubquery() {
isSubquery = true;
@@ -145,6 +147,9 @@
public boolean hasPlanHints() { return globalState.hasPlanHints; }
public void setIsWithClause() { isWithClause_ = true; }
public boolean isWithClause() { return isWithClause_; }
public void setUDFAllowed(boolean val) { this.isUDFAllowed = val; }
public boolean isUDFAllowed() { return this.isUDFAllowed; }
// state shared between all objects of an Analyzer tree
// TODO: Many maps here contain properties about tuples, e.g., whether
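Condensed from the StreamLoadPlanner hunk later in this commit, the intended use of the new flag looks like this:

```java
Analyzer analyzer = new Analyzer(Catalog.getInstance(), null);
// Stream load has no way to check UDF access privileges, so turn UDFs off;
// FunctionCallExpr then rejects any non-builtin function during analysis with
// "Does not support non-builtin functions: <name>".
analyzer.setUDFAllowed(false);
```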

View File

@@ -25,6 +25,7 @@ import org.apache.doris.thrift.TAggregateFunction;
import org.apache.doris.thrift.TAggregationOp;
import org.apache.doris.thrift.TFunction;
import org.apache.doris.thrift.TFunctionBinaryType;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
@@ -41,7 +42,7 @@ public class BuiltinAggregateFunction extends Function {
return isAnalyticFn;
}
// TODO: this is not used yet until the planner understand this.
private org.apache.doris.catalog.Type intermediateType_;
private boolean reqIntermediateTuple = false;
public boolean isReqIntermediateTuple() {
@@ -51,7 +52,7 @@ public class BuiltinAggregateFunction extends Function {
public BuiltinAggregateFunction(Operator op, ArrayList<Type> argTypes,
Type retType, org.apache.doris.catalog.Type intermediateType, boolean isAnalyticFn)
throws AnalysisException {
super(FunctionName.CreateBuiltinName(op.toString()), argTypes,
super(FunctionName.createBuiltinName(op.toString()), argTypes,
retType, false);
Preconditions.checkState(intermediateType != null);
Preconditions.checkState(op != null);

View File

@@ -17,12 +17,12 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarFunction;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TExpr;
@@ -31,10 +31,10 @@ import org.apache.doris.thrift.TExprNodeType;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class CastExpr extends Expr {
private static final Logger LOG = LogManager.getLogger(CastExpr.class);

View File

@@ -17,13 +17,12 @@
package org.apache.doris.analysis;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import org.apache.doris.catalog.*;
import org.apache.doris.catalog.AggregateFunction;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.ScalarFunction;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -32,6 +31,14 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TAggregateExpr;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TExprNodeType;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -472,7 +479,7 @@ public class FunctionCallExpr extends Expr {
Type type = getChild(0).type.getMaxResolutionType();
fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{type},
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
} else if (fnName.getFunction().equalsIgnoreCase("count_distinct")) {
Type compatibleType = this.children.get(0).getType();
for (int i = 1; i < this.children.size(); ++i) {
Type type = this.children.get(i).getType();
@@ -486,13 +493,18 @@
fn = getBuiltinFunction(analyzer, fnName.getFunction(), new Type[]{compatibleType},
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
} else {
// now first find function in builtin functions
// now first find function in built-in functions
if (Strings.isNullOrEmpty(fnName.getDb())) {
fn = getBuiltinFunction(analyzer, fnName.getFunction(), collectChildReturnTypes(),
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
}
// find user defined functions
if (fn == null) {
if (!analyzer.isUDFAllowed()) {
throw new AnalysisException("Does not support non-builtin functions: " + fnName);
}
String dbName = fnName.analyzeDb(analyzer);
if (!Strings.isNullOrEmpty(dbName)) {
// check operation privilege

View File

@@ -17,16 +17,16 @@
package org.apache.doris.analysis;
import com.google.common.base.Strings;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.PullLoadSourceInfo;
import org.apache.doris.thrift.TFunctionName;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,7 +67,7 @@ public class FunctionName implements Writable {
// Same as FunctionName but for builtins and we'll leave the case
// as is since we aren't matching by string.
public static FunctionName CreateBuiltinName(String fn) {
public static FunctionName createBuiltinName(String fn) {
FunctionName name = new FunctionName(fn);
name.fn_ = fn;
return name;

View File

@@ -24,6 +24,13 @@ public class ImportColumnDesc {
private String columnName;
private Expr expr;
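// Copy constructor: clones the expr (if present) so the copy can be analyzed independently of the original.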
public ImportColumnDesc(ImportColumnDesc other) {
this.columnName = other.columnName;
if (other.expr != null) {
this.expr = other.expr.clone();
}
}
public ImportColumnDesc(String column) {
this.columnName = column;
}

View File

@@ -72,8 +72,10 @@ public class StreamLoadPlanner {
this.db = db;
this.destTable = destTable;
this.streamLoadTask = streamLoadTask;
analyzer = new Analyzer(Catalog.getInstance(), null);
// TODO(cmy): currently we do not support UDF in stream load command.
// Because there is no way to check the privilege of accessing UDFs.
analyzer.setUDFAllowed(false);
descTable = analyzer.getDescTbl();
}

View File

@@ -189,6 +189,8 @@ public class StreamLoadScanNode extends ScanNode {
// substitute SlotRef in filter expression
Expr whereExpr = streamLoadTask.getWhereExpr();
// The where expr must be rewritten first to transform some predicates (e.g. BetweenPredicate to BinaryPredicate).
whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer);
List<SlotRef> slots = Lists.newArrayList();
whereExpr.collect(SlotRef.class, slots);