Add strict mode in Routine load, Stream load and Mini load (#1677)

EmmyMiao87
2019-08-20 21:56:45 +08:00
committed by Mingyu Chen
parent 0a27ef030b
commit 978b1ee1af
18 changed files with 197 additions and 9 deletions

View File

@ -112,6 +112,7 @@ const std::string COLUMNS_KEY = "columns";
const std::string HLL_KEY = "hll";
const std::string COLUMN_SEPARATOR_KEY = "column_separator";
const std::string MAX_FILTER_RATIO_KEY = "max_filter_ratio";
const std::string STRICT_MODE_KEY = "strict_mode";
const std::string TIMEOUT_KEY = "timeout";
const char* k_100_continue = "100-continue";
@ -713,6 +714,17 @@ Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
if (ctx->timeout_second != -1) {
put_request.__set_timeout(ctx->timeout_second);
}
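// Optional strict_mode form parameter; only case-insensitive "true"/"false" are accepted.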
auto strict_mode_it = params.find(STRICT_MODE_KEY);
if (strict_mode_it != params.end()) {
std::string strict_mode_value = strict_mode_it->second;
if (boost::iequals(strict_mode_value, "false")) {
put_request.__set_strictMode(false);
} else if (boost::iequals(strict_mode_value, "true")) {
put_request.__set_strictMode(true);
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
}
}
// plan this load
TNetworkAddress master_addr = _exec_env->master_info()->network_address;

View File

@ -335,6 +335,15 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
} else {
request.__set_negative(false);
}
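// Optional strict_mode HTTP header; any value other than "true"/"false" is rejected.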
if (!http_req->header(HTTP_STRICT_MODE).empty()) {
if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
request.__set_strictMode(false);
} else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
request.__set_strictMode(true);
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
}
}
// plan this load
TNetworkAddress master_addr = _exec_env->master_info()->network_address;

View File

@ -32,6 +32,7 @@ static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
static const std::string HTTP_TIMEOUT = "timeout";
static const std::string HTTP_PARTITIONS = "partitions";
static const std::string HTTP_NEGATIVE = "negative";
static const std::string HTTP_STRICT_MODE = "strict_mode";
static const std::string HTTP_100_CONTINUE = "100-continue";

View File

@ -128,6 +128,44 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
Specific Kafka partitions to consume can be specified in `data_source_properties`. If not specified, all partitions of the subscribed topic are consumed by default.
Note that when partitions are explicitly specified, the load job no longer dynamically detects changes in Kafka partitions. When they are not specified, the partitions to consume are adjusted dynamically as Kafka partitions change.
* strict\_mode
Strict mode can be enabled for Routine load. To enable it, add ```"strict_mode" = "true"``` to job\_properties. Strict mode is enabled by default.
Strict mode means that column type conversions during loading are strictly filtered. The strict filtering policy is as follows (see the sketch after the tables below):
1. For column type conversions, if strict mode is true, erroneous data is filtered. Erroneous data here means source data that is not null but whose result becomes null after the column type conversion.
2. Strict mode has no effect on a loaded column that is generated by a function transformation.
3. For a loaded column type that carries a range restriction, strict mode also has no effect if the source data passes the type conversion but fails the range restriction. For example: if the type is decimal(1,0) and the source data is 10, it passes the type conversion but is outside the declared column range. Strict mode has no effect on such data.
#### How strict mode interacts with source data
Take a column of type TinyInt as an example:
>Note: when the column in the table allows null values to be loaded.
|source data | source data example | string to int | strict_mode | result|
|------------|---------------------|-----------------|--------------------|---------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa or 2000 | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 | 1 | true or false | correct data|
Take a column of type Decimal(1,0) as an example:
>Note: when the column in the table allows null values to be loaded.
|source data | source data example | string to decimal | strict_mode | result|
|------------|---------------------|-----------------|--------------------|--------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 or 10 | 1 or 10 | true or false | correct data|
> Note: although 10 is an out-of-range value, strict mode has no effect on it because its type satisfies the decimal requirement. 10 will eventually be filtered in other ETL processing flows, but it will not be filtered by strict mode.
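The tables above reduce to a small decision rule. The sketch below is an illustration only, not code from this commit; the class and method names are hypothetical:

```java
public class StrictModeFilter {
    // Decide whether strict mode filters a converted value, per the rules above.
    // sourceIsNull: the source field was \N
    // convertedIsNull: the value became NULL after column type conversion
    // generatedByFunction: the column is produced by a function transformation
    public static boolean isFiltered(boolean strictMode, boolean sourceIsNull,
                                     boolean convertedIsNull, boolean generatedByFunction) {
        // Rule 2: function-generated columns are never filtered by strict mode.
        if (!strictMode || generatedByFunction) {
            return false;
        }
        // Rule 1: a non-null source value that converts to NULL is invalid data.
        // Rule 3: range violations (e.g. 10 for decimal(1,0)) are left to later
        // ETL steps and are not strict mode's concern.
        return !sourceIsNull && convertedIsNull;
    }

    public static void main(String[] args) {
        // "aaa" -> TinyInt yields NULL: filtered when strict mode is on ...
        System.out.println(isFiltered(true, false, true, false));   // true
        // ... but kept as NULL when strict mode is off.
        System.out.println(isFiltered(false, false, true, false));  // false
    }
}
```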
#### Accessing an SSL-authenticated Kafka cluster

View File

@ -113,7 +113,46 @@ Because Stream load uses the HTTP protocol, all parameters related to load tasks are
columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
Here tmp_* is a placeholder that represents the two original columns in the source file.
```
+ strict\_mode
Stream load can be run in strict mode. To enable it, declare ```strict_mode=true``` in the HEADER. Strict mode is enabled by default.
Strict mode means that column type conversions during loading are strictly filtered. The strict filtering policy is as follows (a client example follows the tables below):
1. For column type conversions, if strict mode is true, erroneous data is filtered. Erroneous data here means source data that is not null but whose result becomes null after the column type conversion.
2. Strict mode has no effect on a loaded column that is generated by a function transformation.
3. For a loaded column type that carries a range restriction, strict mode also has no effect if the source data passes the type conversion but fails the range restriction. For example: if the type is decimal(1,0) and the source data is 10, it passes the type conversion but is outside the declared column range. Strict mode has no effect on such data.
#### How strict mode interacts with source data
Take a column of type TinyInt as an example:
>Note: when the column in the table allows null values to be loaded.
|source data | source data example | string to int | strict_mode | result|
|------------|---------------------|-----------------|--------------------|---------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa or 2000 | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 | 1 | true or false | correct data|
Take a column of type Decimal(1,0) as an example:
>Note: when the column in the table allows null values to be loaded.
|source data | source data example | string to decimal | strict_mode | result|
|------------|---------------------|-----------------|--------------------|--------|
|null value | \N | N/A | true or false | NULL|
|not null | aaa | NULL | true | invalid data(filtered)|
|not null | aaa | NULL | false | NULL|
|not null | 1 or 10 | 1 or 10 | true or false | correct data|
> Note: although 10 is an out-of-range value, strict mode has no effect on it because its type satisfies the decimal requirement. 10 will eventually be filtered in other ETL processing flows, but it will not be filtered by strict mode.
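For illustration, a minimal Java client sketch that sets the strict_mode header on a Stream load request. The endpoint, label, and credentials are placeholders (8030 is assumed to be the FE HTTP port), and the FE-to-BE redirect that a real client must follow is omitted:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class StreamLoadStrictMode {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; use a real FE host/port in practice.
        URL url = new URL("http://127.0.0.1:8030/api/testDb/testTbl/_stream_load");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        // Same credentials as "curl -u root" (empty password).
        String token = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + token);
        conn.setRequestProperty("label", "test_strict_mode_1");
        // Only case-insensitive "true"/"false" are accepted; anything else
        // fails with "Invalid strict mode format. Must be bool type".
        conn.setRequestProperty("strict_mode", "true");
        try (OutputStream os = conn.getOutputStream()) {
            os.write("1,beijing\n2,shanghai\n".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(conn.getResponseCode() + " " + conn.getResponseMessage());
    }
}
```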
### Return results
Because Stream load is a synchronous load method, the load result is returned to the user directly in the response to the load request.

View File

@ -60,6 +60,9 @@
hll: specifies the mapping between the HLL columns in the data and in the table. The column in the table and the column specified in the data
(if columns is not specified, the columns in the data can also be other non-HLL columns in the table) are separated by ","
Multiple hll columns are separated by ":", for example: 'hll1,cuid:hll2,device'
strict_mode: specifies whether this load uses strict mode; the default is true. In strict mode, a non-null source value whose result is NULL after column type conversion is filtered.
Specify it as 'strict_mode=false'
NOTE:
1. This load method currently performs the whole load on a single machine, so it is not suitable for loading large volumes of data.
@ -99,6 +102,9 @@
curl -u root http://host:port/api/testDb/_load_info?label=123
8. Load with strict mode disabled
curl --location-trusted -u root -T testData http://host:port/api/testDb/testTbl/_load?label=123\&strict_mode=false
## keyword
MINI, LOAD

View File

@ -111,6 +111,10 @@
The maximum number of error rows allowed within the sampling window. Must be greater than or equal to 0. The default is 0, meaning no error rows are allowed.
The sampling window is max_batch_rows * 10. If the number of error rows within the sampling window exceeds max_error_number, the routine job is paused and manual intervention is required to check for data quality problems.
Rows filtered out by the where condition do not count as error rows.
4. strict_mode
Whether to enable strict mode; enabled by default. When enabled, a non-null source value that becomes NULL after column type conversion is filtered. Specify it as "strict_mode" = "true"
5. data_source
@ -207,7 +211,7 @@
## example
1. Create a Kafka routine load task named test1 for example_tbl of example_db.
1. Create a Kafka routine load task named test1 for example_tbl of example_db. The load task runs in strict mode.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
@ -217,7 +221,8 @@
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(
@ -227,7 +232,7 @@
"kafka_offsets" = "101,0,0,200"
);
2. Load data from a Kafka cluster with SSL authentication, and set the client.id parameter.
2. Load data from a Kafka cluster with SSL authentication, and set the client.id parameter. The load task runs in non-strict mode.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
@ -237,7 +242,8 @@
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA
(

View File

@ -41,6 +41,8 @@
For example, to load into partitions p1 and p2: -H "partitions: p1, p2"
timeout: specifies the timeout of the load, in seconds. The default is 600 seconds. The settable range is 1 second to 259200 seconds.
strict_mode: specifies whether this load runs in strict mode; enabled by default. Disable it with -H "strict_mode: false".
RETURN VALUES
After the load finishes, the relevant content of this load is returned in JSON format. It currently includes the following fields
@ -88,6 +90,10 @@
7. Load into a table with HLL columns, which can be generated from columns in the table or from columns in the data
curl --location-trusted -u root -H "columns: k1, k2, v1=hll_hash(k1)" -T testData http://host:port/api/testDb/testTbl/_stream_load
8. Load data with strict mode filtering
curl --location-trusted -u root -H "strict_mode: true" -T testData http://host:port/api/testDb/testTbl/_stream_load
## keyword
STREAM,LOAD

View File

@ -105,6 +105,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
.add(MAX_BATCH_INTERVAL_SEC_PROPERTY)
.add(MAX_BATCH_ROWS_PROPERTY)
.add(MAX_BATCH_SIZE_PROPERTY)
.add(LoadStmt.STRICT_MODE)
.build();
private static final ImmutableSet<String> KAFKA_PROPERTIES_SET = new ImmutableSet.Builder<String>()
@ -131,6 +132,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private long maxBatchIntervalS = -1;
private long maxBatchRows = -1;
private long maxBatchSizeBytes = -1;
private boolean strictMode = true;
// kafka related properties
private String kafkaBrokerList;
@ -199,6 +201,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
return maxBatchSizeBytes;
}
public boolean isStrictMode() {
return strictMode;
}
public String getKafkaBrokerList() {
return kafkaBrokerList;
}
@ -308,6 +314,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
maxBatchSizeBytes = Util.getLongPropertyOrDefault(jobProperties.get(MAX_BATCH_SIZE_PROPERTY),
RoutineLoadJob.DEFAULT_MAX_BATCH_SIZE, MAX_BATCH_SIZE_PRED,
MAX_BATCH_SIZE_PROPERTY + " should be between 100MB and 1GB");
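// strict_mode is optional; when absent from job_properties, fall back to DEFAULT_STRICT_MODE (true).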
strictMode = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.STRICT_MODE),
RoutineLoadJob.DEFAULT_STRICT_MODE,
LoadStmt.STRICT_MODE + " should be a boolean");
}
private void checkDataSourceProperties() throws AnalysisException {

View File

@ -31,6 +31,7 @@ import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -278,12 +279,14 @@ public class DataDescription {
}
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column, child1);
parsedColumnExprList.add(importColumnDesc);
analyzeColumnToHadoopFunction(column, child1);
if (child1 instanceof FunctionCallExpr) {
analyzeColumnToHadoopFunction(column, child1);
}
}
}
private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throws AnalysisException {
Preconditions.checkState(child1 instanceof FunctionCallExpr);
FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1;
String functionName = functionCallExpr.getFnName().getFunction();
if (!hadoopSupportFunctionName.contains(functionName.toLowerCase())) {

View File

@ -35,5 +35,5 @@ public class FeConstants {
// general model
// Current meta data version. Use this version to write journals and image
public static int meta_version = FeMetaVersion.VERSION_58;
public static int meta_version = FeMetaVersion.VERSION_59;
}

View File

@ -126,4 +126,6 @@ public final class FeMetaVersion {
public static final int VERSION_57 = 57;
// broker load support function, persist origin stmt in broker load
public static final int VERSION_58 = 58;
// support strict mode in routine load and stream load
public static final int VERSION_59 = 59;
}

View File

@ -391,5 +391,19 @@ public class Util {
return result;
}
public static boolean getBooleanPropertyOrDefault(String valStr, boolean defaultVal, String hintMsg)
throws AnalysisException {
if (Strings.isNullOrEmpty(valStr)) {
return defaultVal;
}
// Boolean.valueOf() maps anything other than "true" to false and never
// throws, so validate the literal explicitly to enforce the hint.
if (valStr.equalsIgnoreCase("true")) {
return true;
}
if (valStr.equalsIgnoreCase("false")) {
return false;
}
throw new AnalysisException(hintMsg);
}
}
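A quick sketch of the intended contract, as illustrative calls against the method above:

```java
// Absent property: fall back to the caller's default.
Util.getBooleanPropertyOrDefault(null, true, "strict_mode should be a boolean");    // -> true
// Parsing is case-insensitive.
Util.getBooleanPropertyOrDefault("FALSE", true, "strict_mode should be a boolean"); // -> false
// Anything else raises AnalysisException carrying the hint message.
Util.getBooleanPropertyOrDefault("yes", true, "strict_mode should be a boolean");   // throws
```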

View File

@ -22,6 +22,7 @@ import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Catalog;
@ -33,6 +34,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
@ -92,6 +94,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
public static final long DEFAULT_MAX_BATCH_ROWS = 200000;
public static final long DEFAULT_MAX_BATCH_SIZE = 100 * 1024 * 1024; // 100MB
public static final boolean DEFAULT_STRICT_MODE = true;
protected static final String STAR_STRING = "*";
/*
@ -147,6 +150,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// maxErrorNum / (maxBatchRows * 10) = max error rate of routine load job
// if current error rate is more than max error rate, the job will be paused
protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional
// include strict mode
protected Map<String, String> jobProperties = Maps.newHashMap();
/*
* The following 3 variables control the max execute time of a single task.
@ -241,6 +246,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
if (stmt.getMaxBatchSize() != -1) {
this.maxBatchSizeBytes = stmt.getMaxBatchSize();
}
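// Persist strict mode alongside the other job properties; it is written out with them in write() below.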
jobProperties.put(LoadStmt.STRICT_MODE, String.valueOf(stmt.isStrictMode()));
}
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
@ -366,6 +372,14 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
return columnSeparator;
}
public boolean isStrictMode() {
String value = jobProperties.get(LoadStmt.STRICT_MODE);
if (value == null) {
return DEFAULT_STRICT_MODE;
}
return Boolean.valueOf(value);
}
public RoutineLoadProgress getProgress() {
return progress;
}
@ -1113,6 +1127,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
out.writeLong(abortedTaskNum);
Text.writeString(out, origStmt);
out.writeInt(jobProperties.size());
for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
@Override
@ -1160,6 +1179,18 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
origStmt = Text.readString(in);
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_59) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
jobProperties.put(key, value);
}
} else {
// Jobs created before VERSION_59 have no persisted strict mode; keep their original non-strict behavior.
jobProperties.put(LoadStmt.STRICT_MODE, Boolean.toString(false));
}
// parse the origin stmt to get routine load desc
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt)));
CreateRoutineLoadStmt stmt = null;

View File

@ -112,7 +112,7 @@ public class BrokerScanNode extends ScanNode {
private Table targetTable;
private BrokerDesc brokerDesc;
private List<BrokerFileGroup> fileGroups;
private boolean strictMode;
private boolean strictMode = true;
private List<List<TBrokerFileStatus>> fileStatusesList;
// file num

View File

@ -118,6 +118,7 @@ public class StreamLoadScanNode extends ScanNode {
srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor("StreamLoadScanNode");
TBrokerScanRangeParams params = new TBrokerScanRangeParams();
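// Propagate the task-level strict mode flag to the scan range params sent to the BE.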
params.setStrict_mode(streamLoadTask.isStrictMode());
// parse columns header. this contain map from input column to column of destination table
// columns: k1, k2, v1, v2=k1 + k2
@ -255,7 +256,7 @@ public class StreamLoadScanNode extends ScanNode {
if (expr == null) {
SlotDescriptor srcSlotDesc = slotDescByName.get(dstSlotDesc.getColumn().getName());
if (srcSlotDesc != null) {
destSidToSrcSidWithoutTrans.put(srcSlotDesc.getId().asInt(), dstSlotDesc.getId().asInt());
destSidToSrcSidWithoutTrans.put(dstSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
// If dest is allow null, we set source to nullable
if (dstSlotDesc.getColumn().isAllowNull()) {
srcSlotDesc.setIsNullable(true);

View File

@ -57,6 +57,7 @@ public class StreamLoadTask {
private String partitions;
private String path;
private boolean negative;
private boolean strictMode = true;
private int timeout = Config.stream_load_default_timeout_second;
public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
@ -106,6 +107,10 @@ public class StreamLoadTask {
return negative;
}
public boolean isStrictMode() {
return strictMode;
}
public int getTimeout() {
return timeout;
}
@ -143,6 +148,9 @@ public class StreamLoadTask {
if (request.isSetTimeout()) {
timeout = request.getTimeout();
}
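// strictMode keeps its default (true) unless the request sets it explicitly.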
if (request.isSetStrictMode()) {
strictMode = request.isStrictMode();
}
}
public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
@ -158,6 +166,7 @@ public class StreamLoadTask {
whereExpr = routineLoadJob.getWhereExpr();
columnSeparator = routineLoadJob.getColumnSeparator();
partitions = routineLoadJob.getPartitions() == null ? null : Joiner.on(",").join(routineLoadJob.getPartitions());
strictMode = routineLoadJob.isStrictMode();
}
private void setColumnToColumnExpr(String columns) throws UserException {

View File

@ -523,6 +523,7 @@ struct TStreamLoadPutRequest {
16: optional i64 auth_code
17: optional bool negative
18: optional i32 timeout
19: optional bool strictMode
}
struct TStreamLoadPutResult {