[feature](Load) Support MySQL Load Data (#15511)

Main subtask of [DSIP-28](https://cwiki.apache.org/confluence/display/DORIS/DSIP-028%3A+Suppot+MySQL+Load+Data).

## Problem summary
Support MySQL `LOAD DATA` syntax as shown below:
```sql
LOAD DATA
    [LOCAL]
    INFILE 'file_name'
    INTO TABLE tbl_name
    [PARTITION (partition_name [, partition_name] ...)]
    [COLUMNS TERMINATED BY 'string']
    [LINES TERMINATED BY 'string']
    [IGNORE number {LINES | ROWS}]
    [(col_name_or_user_var [, col_name_or_user_var] ...)]
    [SET (col_name={expr | DEFAULT} [, col_name={expr | DEFAULT}] ...)]
    [PROPERTIES (key1 = value1 [, key2=value2]) ]
```

For example:
```sql
LOAD DATA
LOCAL
INFILE 'local_test.file'
INTO TABLE db1.table1
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
(k1, k2, v2, v10, v11)
SET (c1=k1, c2=k2, c3=v10, c4=v11)
PROPERTIES ("auth" = "root:", "strict_mode"="true")
```

Note that in this PR the property named `auth` must be set, since stream load requires authentication. I will optimize this later.
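
For reference, `LOAD DATA LOCAL` only works when the client connection advertises `CLIENT_LOCAL_FILES`; with MySQL Connector/J this means adding `allowLoadLocalInfile=true` to the JDBC URL (the regression-test configs in this PR do exactly that). A minimal client-side sketch, assuming an FE listening on 127.0.0.1:9030 and the example table above:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MysqlLoadClientExample {
    public static void main(String[] args) throws Exception {
        // allowLoadLocalInfile=true lets the driver answer the server's
        // "please send file" packet; without it LOAD DATA LOCAL fails client-side.
        String url = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
                Statement stmt = conn.createStatement()) {
            stmt.execute("LOAD DATA LOCAL INFILE 'local_test.file' "
                    + "INTO TABLE db1.table1 "
                    + "COLUMNS TERMINATED BY '\t' "
                    + "(k1, k2, v2, v10, v11) "
                    + "PROPERTIES (\"auth\" = \"root:\", \"strict_mode\" = \"true\")");
        }
    }
}
```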
Author: huangzhaowei
Date: 2023-01-29 14:44:59 +08:00 (committed by GitHub)
Commit: c6bc0a03a4, parent: c9f66250a8
29 changed files with 1090 additions and 29 deletions

View File: ByteBufferNetworkInputStream.java (new)

@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.io;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ByteBufferNetworkInputStream extends InputStream {
private ArrayBlockingQueue<ByteArrayInputStream> queue;
private ByteArrayInputStream currentInputStream;
private volatile boolean finished = false;
private volatile boolean closed = false;
public ByteBufferNetworkInputStream() {
this(32);
}
public ByteBufferNetworkInputStream(int capacity) {
this.queue = new ArrayBlockingQueue<>(capacity);
}
public void fillByteBuffer(ByteBuffer buffer) throws IOException, InterruptedException {
if (closed) {
throw new IOException("Stream is already closed.");
}
// Wrap the packet bytes; note the length argument is buffer.remaining(), not buffer.limit().
ByteArrayInputStream inputStream = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
queue.offer(inputStream, 300, TimeUnit.SECONDS);
}
public void markFinished() {
this.finished = true;
}
private ByteArrayInputStream getNextByteArrayStream() throws IOException {
if (currentInputStream == null || currentInputStream.available() == 0) {
// Return null only after the producer has finished and the queue is drained.
while (!finished || !queue.isEmpty()) {
try {
currentInputStream = queue.poll(1, TimeUnit.SECONDS);
if (currentInputStream != null) {
return currentInputStream;
}
} catch (InterruptedException e) {
throw new IOException("Failed to get next stream");
}
}
return null;
}
return currentInputStream;
}
@Override
public int read() throws IOException {
ByteArrayInputStream stream = getNextByteArrayStream();
if (stream == null) {
return -1;
}
return stream.read();
}
public int read(byte[] b, int off, int len) throws IOException {
ByteArrayInputStream stream = getNextByteArrayStream();
if (stream == null) {
return -1;
}
return stream.read(b, off, len);
}
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
public void close() throws IOException {
closed = true;
ByteArrayInputStream stream = getNextByteArrayStream();
if (stream == null) {
return;
}
stream.close();
while (!queue.isEmpty()) {
queue.poll().close();
}
}
}
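
This class is a bounded pipe between two threads: a producer feeds it MySQL packet buffers through `fillByteBuffer` (blocking when the queue is full, which gives natural backpressure), and a consumer drains it as an ordinary `InputStream`, seeing EOF once `markFinished()` has been called and the queue is empty. A condensed usage sketch with illustrative data (the unit tests below exercise the same pattern):

```java
import org.apache.doris.common.io.ByteBufferNetworkInputStream;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;

public class PipeSketch {
    public static void main(String[] args) throws Exception {
        ByteBufferNetworkInputStream pipe = new ByteBufferNetworkInputStream(2);
        Thread producer = new Thread(() -> {
            try {
                // In MysqlLoadManager these buffers come from MysqlChannel.fetchOnePacket().
                pipe.fillByteBuffer(ByteBuffer.wrap("1\ta\n".getBytes()));
                pipe.fillByteBuffer(ByteBuffer.wrap("2\tb\n".getBytes()));
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                pipe.markFinished(); // reader sees EOF once the queue drains
            }
        });
        producer.start();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(pipe))) {
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                System.out.println(line); // prints "1<tab>a" then "2<tab>b"
            }
        }
    }
}
```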

View File: ByteBufferNetworkInputStreamTest.java (new)

@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.io;
import org.junit.Assert;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
public class ByteBufferNetworkInputStreamTest {
@Test
public void testMultiByteBuffer() throws IOException, InterruptedException {
ByteBufferNetworkInputStream inputStream = new ByteBufferNetworkInputStream(2);
inputStream.fillByteBuffer(ByteBuffer.wrap("1\t2\n".getBytes()));
inputStream.fillByteBuffer(ByteBuffer.wrap("2\t3\n".getBytes()));
inputStream.markFinished();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
Assert.assertEquals(bufferedReader.readLine(), "1\t2");
Assert.assertEquals(bufferedReader.readLine(), "2\t3");
Assert.assertNull(bufferedReader.readLine());
bufferedReader.close();
}
@Test
public void testMultiThreadByteBuffer() throws IOException, InterruptedException {
int num = 5;
ByteBufferNetworkInputStream inputStream = new ByteBufferNetworkInputStream(2);
Thread thread1 = new Thread(() -> {
try {
for (int i = 0; i < num; i++) {
inputStream.fillByteBuffer(ByteBuffer.wrap(String.format("%d\t%d\n", i, i + 1).getBytes()));
Thread.sleep(500);
}
inputStream.markFinished();
} catch (Exception e) {
e.printStackTrace();
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
int count = 0;
String line = bufferedReader.readLine();
while (line != null) {
Assert.assertEquals(line, String.format("%d\t%d", count, count + 1));
count++;
line = bufferedReader.readLine();
}
Assert.assertEquals(count, num);
} catch (Exception e) {
e.printStackTrace();
}
});
thread2.start();
thread2.join();
Assert.assertFalse(thread1.isAlive());
inputStream.close();
}
@Test
public void testMultiThreadByteBuffer2() throws IOException, InterruptedException {
int num = 5;
ByteBufferNetworkInputStream inputStream = new ByteBufferNetworkInputStream(2);
Thread thread1 = new Thread(() -> {
try {
for (int i = 0; i < num; i++) {
inputStream.fillByteBuffer(ByteBuffer.wrap(String.format("%d\t%d\n", i, i + 1).getBytes()));
}
inputStream.markFinished();
} catch (Exception e) {
e.printStackTrace();
}
});
thread1.start();
Thread thread2 = new Thread(() -> {
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
int count = 0;
String line = bufferedReader.readLine();
while (line != null) {
Assert.assertEquals(line, String.format("%d\t%d", count, count + 1));
count++;
Thread.sleep(500);
line = bufferedReader.readLine();
}
Assert.assertEquals(count, num);
} catch (Exception e) {
e.printStackTrace();
}
});
thread2.start();
thread2.join();
Assert.assertFalse(thread1.isAlive());
inputStream.close();
}
}

View File: sql_parser.cup

@ -620,7 +620,8 @@ terminal String
KW_HISTOGRAM,
KW_AUTO,
KW_PREPARE,
KW_EXECUTE;
KW_EXECUTE,
KW_LINES;
terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT, PLACEHOLDER;
terminal BITAND, BITOR, BITXOR, BITNOT;
@ -816,7 +817,7 @@ nonterminal List<String> string_list;
nonterminal List<Long> integer_list, cancel_rollup_job_id_list;
nonterminal AccessPrivilege privilege_type;
nonterminal DataDescription data_desc;
nonterminal DataDescription data_desc, mysql_data_desc;
nonterminal List<DataDescription> data_desc_list;
nonterminal LabelName job_label;
nonterminal String opt_with_label;
@ -827,7 +828,7 @@ nonterminal List<String> opt_col_list, opt_dup_keys, opt_columns_from_path;
nonterminal List<ColWithComment> opt_col_with_comment_list, col_with_comment_list;
nonterminal ColWithComment col_with_comment;
nonterminal List<Expr> opt_col_mapping_list;
nonterminal Separator opt_field_term, separator;
nonterminal Separator opt_field_term, opt_line_term, separator;
nonterminal String opt_user_role;
nonterminal TablePattern tbl_pattern;
nonterminal ResourcePattern resource_pattern;
@ -844,7 +845,7 @@ nonterminal ParseNode load_property;
nonterminal List<ParseNode> opt_load_property_list;
// Boolean
nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate;
nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local;
nonterminal String opt_from_rollup, opt_to_rollup;
nonterminal ColumnPosition opt_col_pos;
@ -2244,6 +2245,10 @@ load_stmt ::=
{:
RESULT = new LoadStmt(label, dataDescList, resource, properties);
:}
| KW_LOAD mysql_data_desc:desc opt_properties:properties
{:
RESULT = new LoadStmt(desc, properties);
:}
;
job_label ::=
@ -2351,6 +2356,24 @@ data_desc ::=
:}
;
/* MySQL LOAD DATA Statement */
mysql_data_desc ::=
KW_DATA
opt_local:clientLocal
KW_INFILE STRING_LITERAL:file
KW_INTO KW_TABLE table_name:tableName
opt_partition_names:partitionNames
opt_field_term:colSep
opt_line_term:lineDelimiter
opt_col_list:colList
opt_col_mapping_list:colMappingList
opt_properties:properties
{:
RESULT = new DataDescription(tableName, partitionNames, file, clientLocal, colList, colSep, lineDelimiter,
colMappingList, properties);
:}
;
opt_negative ::=
{:
RESULT = false;
@ -2361,6 +2384,16 @@ opt_negative ::=
:}
;
opt_local ::=
{:
RESULT = false;
:}
| KW_LOCAL
{:
RESULT = true;
:}
;
opt_field_term ::=
/* Empty */
{:
@ -2372,6 +2405,17 @@ opt_field_term ::=
:}
;
opt_line_term ::=
/* Empty */
{:
RESULT = null;
:}
| KW_LINES KW_TERMINATED KW_BY STRING_LITERAL:sep
{:
RESULT = new Separator(sep);
:}
;
separator ::=
KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep
{:
@ -7011,6 +7055,8 @@ keyword ::=
{: RESULT = id; :}
| KW_MTMV:id
{: RESULT = id; :}
| KW_LINES:id
{: RESULT = id; :}
;
// Identifier that contain keyword

View File: DataDescription.java

@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -95,11 +96,14 @@ public class DataDescription {
"substitute");
private final String tableName;
private String dbName;
private final PartitionNames partitionNames;
private final List<String> filePaths;
private final Separator columnSeparator;
private String fileFormat;
private TFileCompressType compressType = TFileCompressType.UNKNOWN;
private boolean clientLocal = false;
private final boolean isNegative;
// column names in the path
private final List<String> columnsFromPath;
@ -146,6 +150,7 @@ public class DataDescription {
private final Expr deleteCondition;
private final Map<String, String> properties;
private boolean trimDoubleQuotes = false;
private boolean isMysqlLoad = false;
public DataDescription(String tableName,
PartitionNames partitionNames,
@ -219,6 +224,37 @@ public class DataDescription {
this.properties = properties;
}
// Data desc for the MySQL client's LOAD DATA statement.
public DataDescription(TableName tableName,
PartitionNames partitionNames,
String file,
boolean clientLocal,
List<String> columns,
Separator columnSeparator,
Separator lineDelimiter,
List<Expr> columnMappingList,
Map<String, String> properties) {
this.tableName = tableName.getTbl();
this.dbName = tableName.getDb();
this.partitionNames = partitionNames;
this.filePaths = Lists.newArrayList(file);
this.clientLocal = clientLocal;
this.fileFieldNames = columns;
this.columnSeparator = columnSeparator;
this.lineDelimiter = lineDelimiter;
this.fileFormat = null;
this.columnsFromPath = null;
this.isNegative = false;
this.columnMappingList = columnMappingList;
this.precedingFilterExpr = null;
this.whereExpr = null;
this.srcTableName = null;
this.mergeType = null;
this.deleteCondition = null;
this.properties = properties;
this.isMysqlLoad = true;
}
// For stream load using external file scan node.
public DataDescription(String tableName, LoadTaskInfo taskInfo) {
this.tableName = tableName;
@ -459,6 +495,10 @@ public class DataDescription {
}
}
public String getDbName() {
return dbName;
}
public String getTableName() {
return tableName;
}
@ -497,6 +537,13 @@ public class DataDescription {
return fileFieldNames;
}
public List<Expr> getColumnMappingList() {
if (columnMappingList == null || columnMappingList.isEmpty()) {
return null;
}
return columnMappingList;
}
public String getFileFormat() {
return fileFormat;
}
@ -631,6 +678,10 @@ public class DataDescription {
return isHadoopLoad;
}
public boolean isClientLocal() {
return clientLocal;
}
public String getSrcTableName() {
return srcTableName;
}
@ -647,6 +698,10 @@ public class DataDescription {
return trimDoubleQuotes;
}
public Map<String, String> getProperties() {
return properties;
}
/*
* Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList
* Example:
@ -846,7 +901,8 @@ public class DataDescription {
Map<String, String> analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
analysisMap.putAll(properties);
if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) {
// If lineDelimiter was already assigned, do not read it from properties again.
if (lineDelimiter == null && analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) {
lineDelimiter = new Separator(analysisMap.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER));
lineDelimiter.analyze();
}
@ -896,6 +952,21 @@ public class DataDescription {
}
}
public String analyzeFullDbName(String labelDbName, Analyzer analyzer) throws AnalysisException {
if (Strings.isNullOrEmpty(labelDbName)) {
String dbName = Strings.isNullOrEmpty(getDbName()) ? analyzer.getDefaultDb() : getDbName();
this.dbName = dbName;
if (Strings.isNullOrEmpty(dbName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
return ClusterNamespace.getFullName(analyzer.getClusterName(), dbName);
} else {
// Get dbName from label.
this.dbName = ClusterNamespace.getNameFromFullName(labelDbName);
return labelDbName;
}
}
public void analyze(String fullDbName) throws AnalysisException {
if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE.");
@ -927,6 +998,10 @@ public class DataDescription {
columnSeparator.analyze();
}
if (lineDelimiter != null) {
lineDelimiter.analyze();
}
if (partitionNames != null) {
partitionNames.analyze(null);
}
@ -985,7 +1060,10 @@ public class DataDescription {
public String toSql() {
StringBuilder sb = new StringBuilder();
if (isLoadFromTable()) {
if (isMysqlLoad) {
sb.append("DATA ").append(isClientLocal() ? "LOCAL " : "");
sb.append("INFILE '").append(filePaths.get(0)).append("'");
} else if (isLoadFromTable()) {
sb.append(mergeType.toString());
sb.append(" DATA FROM TABLE ").append(srcTableName);
} else {
@ -1001,7 +1079,7 @@ public class DataDescription {
if (isNegative) {
sb.append(" NEGATIVE");
}
sb.append(" INTO TABLE ").append(tableName);
sb.append(" INTO TABLE ").append(isMysqlLoad ? dbName + "." + tableName : tableName);
if (partitionNames != null) {
sb.append(" ");
sb.append(partitionNames.toSql());
@ -1009,6 +1087,9 @@ public class DataDescription {
if (columnSeparator != null) {
sb.append(" COLUMNS TERMINATED BY ").append(columnSeparator.toSql());
}
if (lineDelimiter != null && isMysqlLoad) {
sb.append(" LINES TERMINATED BY ").append(lineDelimiter.toSql());
}
if (fileFormat != null && !fileFormat.isEmpty()) {
sb.append(" FORMAT AS '" + fileFormat + "'");
}

View File: LoadStmt.java

@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -124,6 +125,8 @@ public class LoadStmt extends DdlStmt {
private final Map<String, String> properties;
private String user;
private boolean isMysqlLoad = false;
private EtlJobType etlJobType = EtlJobType.UNKNOWN;
public static final ImmutableMap<String, Function> PROPERTIES_MAP = new ImmutableMap.Builder<String, Function>()
@ -189,6 +192,17 @@ public class LoadStmt extends DdlStmt {
})
.build();
public LoadStmt(DataDescription dataDescription, Map<String, String> properties) {
this.label = new LabelName();
this.dataDescriptions = Lists.newArrayList(dataDescription);
this.brokerDesc = null;
this.cluster = null;
this.resourceDesc = null;
this.properties = properties;
this.user = null;
this.isMysqlLoad = true;
}
public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
BrokerDesc brokerDesc, String cluster, Map<String, String> properties) {
this.label = label;
@ -326,7 +340,9 @@ public class LoadStmt extends DdlStmt {
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
label.analyze(analyzer);
if (!isMysqlLoad) {
label.analyze(analyzer);
}
if (dataDescriptions == null || dataDescriptions.isEmpty()) {
throw new AnalysisException("No data file in load statement.");
}
@ -335,15 +351,16 @@ public class LoadStmt extends DdlStmt {
// case 2: one hive table, one data description
boolean isLoadFromTable = false;
for (DataDescription dataDescription : dataDescriptions) {
if (brokerDesc == null && resourceDesc == null) {
if (brokerDesc == null && resourceDesc == null && !isMysqlLoad) {
dataDescription.setIsHadoopLoad(true);
}
dataDescription.analyze(label.getDbName());
String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
dataDescription.analyze(fullDbName);
if (dataDescription.isLoadFromTable()) {
isLoadFromTable = true;
}
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(label.getDbName());
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName);
OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName());
if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND
&& table.getKeysType() != KeysType.UNIQUE_KEYS) {
@ -370,6 +387,15 @@ public class LoadStmt extends DdlStmt {
}
}
// MySQL load has only one data desc.
if (isMysqlLoad && !dataDescriptions.get(0).isClientLocal()) {
for (String path : dataDescriptions.get(0).getFilePaths()) {
if (!new File(path).exists()) {
throw new AnalysisException("Path: " + path + " is not exists.");
}
}
}
if (resourceDesc != null) {
resourceDesc.analyze();
etlJobType = resourceDesc.getEtlJobType();
@ -383,6 +409,8 @@ public class LoadStmt extends DdlStmt {
}
} else if (brokerDesc != null) {
etlJobType = EtlJobType.BROKER;
} else if (isMysqlLoad) {
etlJobType = EtlJobType.LOCAL_FILE;
} else {
// if cluster is null, use default hadoop cluster
// if cluster is not null, use this hadoop cluster

View File: ErrorCode.java

@ -56,6 +56,8 @@ public enum ErrorCode {
+ " host '%s'"),
ERR_TABLEACCESS_DENIED_ERROR(1142, new byte[]{'4', '2', '0', '0', '0'}, "%s command denied to user '%s'@'%s' for "
+ "table '%s'"),
ERR_NOT_ALLOWED_COMMAND(1148, new byte[]{'4', '2', '0', '0', '0'}, "The used command is not allowed"
+ " with this MySQL version"),
ERR_WRONG_COLUMN_NAME(1166, new byte[]{'4', '2', '0', '0', '0'}, "Incorrect column name '%s'. Column regex is '%s'"),
ERR_UNKNOWN_SYSTEM_VARIABLE(1193, new byte[]{'H', 'Y', '0', '0', '0'}, "Unknown system variable '%s'"),
ERR_BAD_SLAVE(1200, new byte[]{'H', 'Y', '0', '0', '0'}, "The server is not configured as slave; fix in config "

View File: EtlJobType.java

@ -24,5 +24,6 @@ public enum EtlJobType {
BROKER,
DELETE,
SPARK,
LOCAL_FILE,
UNKNOWN
}

View File: LoadJobRowResult.java (new)

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
public class LoadJobRowResult {
private long records;
private long deleted;
private int skipped;
private int warnings;
public LoadJobRowResult() {
this.records = 0;
this.deleted = 0;
this.skipped = 0;
this.warnings = 0;
}
public long getRecords() {
return records;
}
public void setRecords(long records) {
this.records = records;
}
public void incRecords(long records) {
this.records += records;
}
public int getSkipped() {
return skipped;
}
public void setSkipped(int skipped) {
this.skipped = skipped;
}
public void incSkipped(int skipped) {
this.skipped += skipped;
}
public int getWarnings() {
return warnings;
}
@Override
public String toString() {
return "Records: " + records + " Deleted: " + deleted + " Skipped: " + skipped + " Warnings: " + warnings;
}
}

View File: LoadManager.java

@ -30,6 +30,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
@ -41,7 +42,9 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
@ -86,9 +89,11 @@ public class LoadManager implements Writable {
private LoadJobScheduler loadJobScheduler;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private MysqlLoadManager mysqlLoadManager;
public LoadManager(LoadJobScheduler loadJobScheduler) {
this.loadJobScheduler = loadJobScheduler;
this.mysqlLoadManager = new MysqlLoadManager();
}
/**
@ -151,6 +156,11 @@ public class LoadManager implements Writable {
}
}
public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt)
throws IOException, LoadException {
return mysqlLoadManager.executeMySqlLoadJobFromStmt(context, stmt);
}
public void replayCreateLoadJob(LoadJob loadJob) {
createLoadJob(loadJob);
LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build());

View File: MysqlLoadManager.java (new)

@ -0,0 +1,244 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.io.ByteBufferNetworkInputStream;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.load.loadv2.LoadTask.MergeType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.google.common.base.Joiner;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
public class MysqlLoadManager {
private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class);
private final ThreadPoolExecutor mysqlLoadPool;
public MysqlLoadManager() {
this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, "Mysql Load", true);
}
public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt)
throws IOException, LoadException {
LoadJobRowResult loadResult = new LoadJobRowResult();
// MySQL load has only one data desc.
DataDescription dataDesc = stmt.getDataDescriptions().get(0);
String database = dataDesc.getDbName();
String table = dataDesc.getTableName();
List<String> filePaths = dataDesc.getFilePaths();
try (final CloseableHttpClient httpclient = HttpClients.createDefault()) {
for (String file : filePaths) {
InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file);
HttpPut request = generateRequestForMySqlLoad(entity, dataDesc, database, table);
try (final CloseableHttpResponse response = httpclient.execute(request)) {
JSONObject result = JSON.parseObject(EntityUtils.toString(response.getEntity()));
if (!result.getString("Status").equalsIgnoreCase("Success")) {
LOG.warn("Execute stream load for mysql data load failed with message: " + request);
throw new LoadException(result.getString("Message"));
}
loadResult.incRecords(result.getLong("NumberLoadedRows"));
loadResult.incSkipped(result.getIntValue("NumberFilteredRows"));
}
}
}
return loadResult;
}
private InputStreamEntity getInputStreamEntity(ConnectContext context, boolean isClientLocal, String file)
throws IOException {
InputStream inputStream;
if (isClientLocal) {
// The MySQL client checks that the file exists.
replyClientForReadFile(context, file);
inputStream = new ByteBufferNetworkInputStream();
fillByteBufferAsync(context, (ByteBufferNetworkInputStream) inputStream);
} else {
// Server-side files were already checked during analyze().
inputStream = Files.newInputStream(Paths.get(file));
}
return new InputStreamEntity(inputStream, -1, ContentType.TEXT_PLAIN);
}
private void replyClientForReadFile(ConnectContext context, String path) throws IOException {
context.getSerializer().reset();
context.getSerializer().writeByte((byte) 0xfb);
context.getSerializer().writeEofString(path);
context.getMysqlChannel().sendAndFlush(context.getSerializer().toByteBuffer());
}
private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputStream inputStream) {
mysqlLoadPool.submit(() -> {
ByteBuffer buffer = null;
try {
buffer = context.getMysqlChannel().fetchOnePacket();
// The MySQL client sends an empty packet at EOF.
while (buffer != null && buffer.limit() != 0) {
inputStream.fillByteBuffer(buffer);
buffer = context.getMysqlChannel().fetchOnePacket();
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
inputStream.markFinished();
}
});
}
// public only for test
public HttpPut generateRequestForMySqlLoad(
InputStreamEntity entity,
DataDescription desc,
String database,
String table) throws LoadException {
final HttpPut httpPut = new HttpPut(selectBackendForMySqlLoad(database, table));
httpPut.addHeader("Expect", "100-continue");
httpPut.addHeader("Content-Type", "text/plain");
Map<String, String> props = desc.getProperties();
if (props != null) {
// auth
if (!props.containsKey("auth")) {
throw new LoadException("Must have auth(user:password) in properties.");
}
// TODO: use token to send request to avoid double auth.
String auth = props.get("auth");
String base64Auth = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
httpPut.addHeader("Authorization", "Basic " + base64Auth);
// max_filter_ratio
if (props.containsKey(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO)) {
String maxFilterRatio = props.get(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO);
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO, maxFilterRatio);
}
// exec_mem_limit
if (props.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
String memory = props.get(LoadStmt.EXEC_MEM_LIMIT);
httpPut.addHeader(LoadStmt.EXEC_MEM_LIMIT, memory);
}
// strict_mode
if (props.containsKey(LoadStmt.STRICT_MODE)) {
String strictMode = props.get(LoadStmt.STRICT_MODE);
httpPut.addHeader(LoadStmt.STRICT_MODE, strictMode);
}
}
// column_separator
if (desc.getColumnSeparator() != null) {
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR, desc.getColumnSeparator());
}
// line_delimiter
if (desc.getLineDelimiter() != null) {
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER, desc.getLineDelimiter());
}
// merge_type
if (!desc.getMergeType().equals(MergeType.APPEND)) {
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_MERGE_TYPE, desc.getMergeType().name());
}
// columns
if (desc.getFileFieldNames() != null) {
List<String> fields = desc.getFileFieldNames();
StringBuilder fieldString = new StringBuilder();
fieldString.append(Joiner.on(",").join(fields));
if (desc.getColumnMappingList() != null) {
fieldString.append(",");
List<String> mappings = new ArrayList<>();
for (Expr expr : desc.getColumnMappingList()) {
mappings.add(expr.toSql().replaceAll("`", ""));
}
fieldString.append(Joiner.on(",").join(mappings));
}
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMNS, fieldString.toString());
}
// partitions
if (desc.getPartitionNames() != null && !desc.getPartitionNames().getPartitionNames().isEmpty()) {
List<String> ps = desc.getPartitionNames().getPartitionNames();
String pNames = Joiner.on(",").join(ps);
if (desc.getPartitionNames().isTemp()) {
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS, pNames);
} else {
httpPut.addHeader(LoadStmt.KEY_IN_PARAM_PARTITIONS, pNames);
}
}
httpPut.setEntity(entity);
return httpPut;
}
private String selectBackendForMySqlLoad(String database, String table) throws LoadException {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
StringBuilder sb = new StringBuilder();
sb.append("http://");
sb.append(backend.getHost());
sb.append(":");
sb.append(backend.getHttpPort());
sb.append("/api/");
sb.append(database);
sb.append("/");
sb.append(table);
sb.append("/_stream_load");
return sb.toString();
}
}
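
In short, the manager re-packages the statement as a regular stream load: an HTTP PUT to `http://<be_host>:<be_http_port>/api/<db>/<table>/_stream_load`, with the `auth` property turned into a Basic `Authorization` header and the separators/columns/partitions passed as stream-load headers. A hand-built sketch of an equivalent request (host, port, and data are illustrative, and the literal header name `column_separator` is an assumed value of `LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR`):

```java
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class StreamLoadRequestSketch {
    public static HttpPut build() {
        // PROPERTIES ("auth" = "root:") becomes "Authorization: Basic cm9vdDo="
        String base64Auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));
        HttpPut put = new HttpPut("http://127.0.0.1:8040/api/db1/table1/_stream_load");
        put.addHeader("Expect", "100-continue");
        put.addHeader("Content-Type", "text/plain");
        put.addHeader("Authorization", "Basic " + base64Auth);
        put.addHeader("column_separator", "\t"); // assumed stream-load header name
        // Chunked body (length -1), exactly like the manager's InputStreamEntity usage.
        put.setEntity(new InputStreamEntity(
                new ByteArrayInputStream("1\ta\n2\tb\n".getBytes(StandardCharsets.UTF_8)),
                -1, ContentType.TEXT_PLAIN));
        return put;
    }
}
```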

View File: MysqlCapability.java

@ -74,7 +74,7 @@ public class MysqlCapability {
private static final int DEFAULT_FLAGS = Flag.CLIENT_PROTOCOL_41.getFlagBit()
| Flag.CLIENT_CONNECT_WITH_DB.getFlagBit() | Flag.CLIENT_SECURE_CONNECTION.getFlagBit()
| Flag.CLIENT_PLUGIN_AUTH.getFlagBit();
| Flag.CLIENT_PLUGIN_AUTH.getFlagBit() | Flag.CLIENT_LOCAL_FILES.getFlagBit();
public static final MysqlCapability DEFAULT_CAPABILITY = new MysqlCapability(DEFAULT_FLAGS);
private int flags;
@ -141,6 +141,10 @@ public class MysqlCapability {
return (flags & Flag.CLIENT_SESSION_TRACK.getFlagBit()) != 0;
}
public boolean supportClientLocalFile() {
return (flags & Flag.CLIENT_LOCAL_FILES.getFlagBit()) != 0;
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof MysqlCapability)) {

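`CLIENT_LOCAL_FILES` is capability bit 0x0080 in the MySQL handshake; adding it to `DEFAULT_FLAGS` is what allows a client to attempt `LOAD DATA LOCAL` at all, and `supportClientLocalFile()` is a plain bit test. A standalone illustration (flag values taken from the MySQL client/server protocol):

```java
public class CapabilityBitSketch {
    // Flag values from the MySQL client/server protocol.
    static final int CLIENT_CONNECT_WITH_DB = 0x0008;
    static final int CLIENT_LOCAL_FILES = 0x0080;
    static final int CLIENT_PROTOCOL_41 = 0x0200;

    public static void main(String[] args) {
        int flags = CLIENT_CONNECT_WITH_DB | CLIENT_LOCAL_FILES | CLIENT_PROTOCOL_41;
        // Same test as MysqlCapability.supportClientLocalFile()
        System.out.println((flags & CLIENT_LOCAL_FILES) != 0); // true
    }
}
```
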
View File: DdlExecutor.java

@ -90,7 +90,6 @@ import org.apache.doris.analysis.DropUserStmt;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.LinkDbStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.MigrateDbStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
import org.apache.doris.analysis.PauseSyncJobStmt;
@ -115,7 +114,6 @@ import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.catalog.EncryptKeyHelper;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.statistics.StatisticsRepository;
@ -170,17 +168,6 @@ public class DdlExecutor {
env.alterView((AlterViewStmt) ddlStmt);
} else if (ddlStmt instanceof CancelAlterTableStmt) {
env.cancelAlter((CancelAlterTableStmt) ddlStmt);
} else if (ddlStmt instanceof LoadStmt) {
LoadStmt loadStmt = (LoadStmt) ddlStmt;
EtlJobType jobType = loadStmt.getEtlJobType();
if (jobType == EtlJobType.UNKNOWN) {
throw new DdlException("Unknown load job type");
}
if (jobType == EtlJobType.HADOOP) {
throw new DdlException("Load job by hadoop cluster is disabled."
+ " Try using broker load. See 'help broker load;'");
}
env.getLoadManager().createLoadJobFromStmt(loadStmt);
} else if (ddlStmt instanceof CancelExportStmt) {
env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt);
} else if (ddlStmt instanceof CancelLoadStmt) {

View File: StmtExecutor.java

@ -32,6 +32,7 @@ import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.LockTablesStmt;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.OutFileClause;
@ -89,6 +90,8 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlEofPacket;
@ -582,6 +585,8 @@ public class StmtExecutor implements ProfileWriter {
// the transaction of this insert may already begin, we will abort it at outer finally block.
throw t;
}
} else if (parsedStmt instanceof LoadStmt) {
handleLoadStmt();
} else if (parsedStmt instanceof DdlStmt) {
handleDdlStmt();
} else if (parsedStmt instanceof ShowStmt) {
@ -1836,6 +1841,42 @@ public class StmtExecutor implements ProfileWriter {
context.getState().setEof();
}
private void handleLoadStmt() {
try {
LoadStmt loadStmt = (LoadStmt) parsedStmt;
EtlJobType jobType = loadStmt.getEtlJobType();
if (jobType == EtlJobType.UNKNOWN) {
throw new DdlException("Unknown load job type");
}
if (jobType == EtlJobType.HADOOP) {
throw new DdlException("Load job by hadoop cluster is disabled."
+ " Try using broker load. See 'help broker load;'");
}
LoadManager loadManager = context.getEnv().getLoadManager();
if (jobType == EtlJobType.LOCAL_FILE) {
if (!context.getCapability().supportClientLocalFile()) {
context.getState().setError(ErrorCode.ERR_NOT_ALLOWED_COMMAND,
"This client does not support loading a client-local file.");
return;
}
LoadJobRowResult submitResult = loadManager.executeMySqlLoadJobFromStmt(context, loadStmt);
context.getState().setOk(submitResult.getRecords(), submitResult.getWarnings(),
submitResult.toString());
} else {
loadManager.createLoadJobFromStmt(loadStmt);
context.getState().setOk();
}
} catch (UserException e) {
// Return a message to inform the client of what happened.
LOG.debug("DDL statement({}) process failed.", originStmt.originStmt, e);
context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
} catch (Exception e) {
// Maybe our bug
LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e);
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
}
}
private void handleDdlStmt() {
try {
DdlExecutor.execute(context.getEnv(), (DdlStmt) parsedStmt);

View File: sql_scanner.flex

@ -482,6 +482,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("auto", new Integer(SqlParserSymbols.KW_AUTO));
keywordMap.put("prepare", new Integer(SqlParserSymbols.KW_PREPARE));
keywordMap.put("execute", new Integer(SqlParserSymbols.KW_EXECUTE));
keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
}
// map from token id to token description

View File: DataDescriptionTest.java

@ -372,4 +372,45 @@ public class DataDescriptionTest {
};
desc.analyze("testDb");
}
@Test
public void testMysqlLoadData() throws AnalysisException {
TableName tbl = new TableName(null, "testDb", "testTable");
List<Expr> params = Lists.newArrayList();
params.add(new StringLiteral("day"));
params.add(new SlotRef(null, "k2"));
BinaryPredicate predicate =
new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"), new FunctionCallExpr("bitmap_dict", params));
Map<String, String> properties = Maps.newHashMap();
properties.put("line_delimiter", "abc");
DataDescription desc =
new DataDescription(tbl, new PartitionNames(false, Lists.newArrayList("p1", "p2")), "abc.txt", true,
Lists.newArrayList("k1", "k2", "v1"), new Separator("010203"), new Separator("040506"),
Lists.newArrayList(predicate), properties);
String db = desc.analyzeFullDbName(null, analyzer);
Assert.assertEquals("default_cluster:testDb", db);
Assert.assertEquals("testDb", desc.getDbName());
db = desc.analyzeFullDbName("testCluster:testDb1", analyzer);
Assert.assertEquals("testCluster:testDb1", db);
Assert.assertEquals("testDb1", desc.getDbName());
desc.analyze("testDb1");
Assert.assertEquals(1, desc.getFilePaths().size());
Assert.assertEquals("abc.txt", desc.getFilePaths().get(0));
Assert.assertEquals(2, desc.getPartitionNames().getPartitionNames().size());
Assert.assertEquals("p1", desc.getPartitionNames().getPartitionNames().get(0));
Assert.assertEquals("p2", desc.getPartitionNames().getPartitionNames().get(1));
Assert.assertEquals("040506", desc.getLineDelimiter());
Assert.assertEquals("010203", desc.getColumnSeparator());
String sql = "DATA LOCAL INFILE 'abc.txt' "
+ "INTO TABLE testDb1.testTable "
+ "PARTITIONS (p1, p2) "
+ "COLUMNS TERMINATED BY '010203' "
+ "LINES TERMINATED BY '040506' "
+ "(k1, k2, v1) "
+ "SET (`k1` = bitmap_dict('day', `k2`))";
Assert.assertEquals(sql, desc.toSql());
}
}

View File: LoadStmtTest.java

@ -32,6 +32,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.LoadTaskInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
@ -39,6 +40,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
@ -93,6 +96,9 @@ public class LoadStmtTest {
desc.getTableName();
minTimes = 0;
result = "testTbl";
desc.analyzeFullDbName("testCluster:testDb", (Analyzer) any);
minTimes = 0;
result = "testCluster:testDb";
env.getResourceMgr();
result = resourceMgr;
resourceMgr.getResource(resourceName);
@ -182,4 +188,41 @@ public class LoadStmtTest {
return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt(
new SqlParser(new SqlScanner(new StringReader(columnsSQL))))).getColumns();
}
@Test
public void testMySqlLoadData(@Injectable DataDescription desc) throws UserException, IOException {
File temp = File.createTempFile("testMySqlLoadData", ".txt");
new Expectations() {
{
desc.isClientLocal();
minTimes = 0;
result = false;
desc.getFilePaths();
minTimes = 0;
result = Lists.newArrayList(temp.getPath());
desc.toSql();
minTimes = 0;
result = "XXX";
desc.getTableName();
minTimes = 0;
result = "testTbl";
desc.analyzeFullDbName(null, (Analyzer) any);
minTimes = 0;
result = "testCluster:testDb";
desc.getMergeType();
minTimes = 0;
result = LoadTask.MergeType.APPEND;
}
};
LoadStmt stmt = new LoadStmt(desc, Maps.newHashMap());
stmt.analyze(analyzer);
Assert.assertNull(stmt.getLabel().getDbName());
Assert.assertEquals(EtlJobType.LOCAL_FILE, stmt.getEtlJobType());
}
}

View File: LoadJobRowResultTest.java (new)

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.junit.Assert;
import org.junit.Test;
public class LoadJobRowResultTest {
@Test
public void testResult() {
LoadJobRowResult result = new LoadJobRowResult();
Assert.assertEquals("Records: 0 Deleted: 0 Skipped: 0 Warnings: 0", result.toString());
result.setRecords(199);
Assert.assertEquals("Records: 199 Deleted: 0 Skipped: 0 Warnings: 0", result.toString());
result.incRecords(1);
result.setSkipped(20);
Assert.assertEquals("Records: 200 Deleted: 0 Skipped: 20 Warnings: 0", result.toString());
result.incSkipped(20);
Assert.assertEquals("Records: 200 Deleted: 0 Skipped: 40 Warnings: 0", result.toString());
Assert.assertEquals(200, result.getRecords());
Assert.assertEquals(40, result.getSkipped());
Assert.assertEquals(0, result.getWarnings());
}
}

View File: MysqlCapabilityTest.java

@ -39,4 +39,13 @@ public class MysqlCapabilityTest {
+ " | CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS | CLIENT_SESSION_TRACK | CLIENT_DEPRECATE_EOF",
capability.toString());
}
@Test
public void testDefaultFlags() {
MysqlCapability capability = MysqlCapability.DEFAULT_CAPABILITY;
Assert.assertEquals("CLIENT_CONNECT_WITH_DB | CLIENT_LOCAL_FILES | CLIENT_PROTOCOL_41"
+ " | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH",
capability.toString());
Assert.assertTrue(capability.supportClientLocalFile());
}
}

View File: regression-conf.groovy

@ -23,7 +23,8 @@ defaultDb = "regression_test"
// add useLocalSessionState so that the jdbc will not send
// init cmd like: select @@session.tx_read_only
// at each time we connect.
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true"
// add allowLoadLocalInfile so that jdbc can execute MySQL LOAD DATA from the client.
jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
jdbcUser = "root"
jdbcPassword = ""

View File: test_line_delimiter.csv (new)

@ -0,0 +1 @@
1|aaweizuo2|bbweizuo3|cc

View File: test_mysql_load.out (new)

@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
-2 -51 \N 1 \N \N \N \N \N \N \N 2 \N \N
-2 -50 \N 1 \N \N \N \N \N \N \N \N j \N
-- !sql1 --
2019 9 9 9 7.700 a 2019-09-09 1970-01-01T08:33:39 k7 9.0 9.0

View File: test_strict_mode.csv (new)

@ -0,0 +1,2 @@
-2 -50 1 \N j
-2 -51 1 2 \N

View File: test_strict_mode_fail.csv (new)

@ -0,0 +1,2 @@
ab -50 1 \N j
-2 -51 1 2 \N

View File: test_time.data (new)

@ -0,0 +1 @@
2019-09-09T10:10:10

View File: Config.groovy

@ -267,7 +267,7 @@ class Config {
if (config.jdbcUrl == null) {
//jdbcUrl needs parameter here. Refer to function: buildUrl(String dbName)
config.jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true"
config.jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true"
log.info("Set jdbcUrl to '${config.jdbcUrl}' because not specify.".toString())
}

View File: Suite.groovy

@ -322,6 +322,20 @@ class Suite implements GroovyInterceptable {
return remotePath;
}
String getLoalFilePath(String fileName) {
if (!new File(fileName).isAbsolute()) {
fileName = new File(context.dataPath, fileName).getAbsolutePath()
}
def file = new File(fileName)
if (!file.exists()) {
log.warn("Stream load input file not exists: ${file}".toString())
throw new IllegalStateException("Stream load input file not exists: ${file}");
}
def localFile = file.canonicalPath
log.info("Set stream load input: ${file.canonicalPath}".toString())
return localFile;
}
boolean enableBrokerLoad() {
String enableBrokerLoad = context.config.otherConfigs.get("enableBrokerLoad");
return (enableBrokerLoad != null && enableBrokerLoad.equals("true"));

View File: regression-conf.groovy (pipeline)

@ -20,7 +20,7 @@
// **Note**: default db will be create if not exist
defaultDb = "regression_test"
jdbcUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true"
jdbcUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLoadLocalInfile=true"
jdbcUser = "root"
jdbcPassword = ""

View File: regression-conf.groovy (pipeline)

@ -20,7 +20,7 @@
// **Note**: default db will be create if not exist
defaultDb = "regression_test"
jdbcUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true"
jdbcUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true&allowLoadLocalInfile=true"
jdbcUser = "root"
jdbcPassword = ""

View File: test_mysql_load.groovy (new)

@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_mysql_load", "p0") {
sql "show tables"
def tableName = "test_mysql_load_strict"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`k1` bigint(20) NULL,
`k2` bigint(20) NULL,
`v1` tinyint(4) SUM NULL,
`v2` tinyint(4) REPLACE NULL,
`v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
`v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
`v5` int(11) REPLACE_IF_NOT_NULL NULL,
`v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
`v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
`v8` datetime REPLACE_IF_NOT_NULL NULL,
`v9` date REPLACE_IF_NOT_NULL NULL,
`v10` char(10) REPLACE_IF_NOT_NULL NULL,
`v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
`v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
PARTITION BY RANGE(`k1`)
(PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
PARTITION partition_b VALUES [("100000"), ("1000000000")),
PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
PROPERTIES ("replication_allocation" = "tag.location.default: 1");
"""
// strict mode success
def test_strict_mode_file = getLoalFilePath "test_strict_mode.csv"
sql """
LOAD DATA
LOCAL
INFILE '${test_strict_mode_file}'
INTO TABLE ${tableName}
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
(k1, k2, v2, v10, v11)
PROPERTIES ("auth" = "root:", "strict_mode"="true");
"""
sql "sync"
qt_sql "select * from ${tableName} order by k1, k2"
// strict mode failed
test {
def test_strict_mode_fail_file = getLoalFilePath "test_strict_mode_fail.csv"
sql """
LOAD DATA
LOCAL
INFILE '${test_strict_mode_fail_file}'
INTO TABLE ${tableName}
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
(k1, k2, v2, v10, v11)
PROPERTIES ("auth" = "root:", "strict_mode"="true");
"""
exception "errCode = 2, detailMessage = [INTERNAL_ERROR]too many filtered rows"
}
// test_line_delimiter
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
`id` int(11) NULL,
`value` varchar(64) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
"""
def test_line_delimiter_file = getLoalFilePath "test_line_delimiter.csv"
sql """
LOAD DATA
LOCAL
INFILE '${test_line_delimiter_file}'
INTO TABLE ${tableName}
COLUMNS TERMINATED BY '|'
LINES TERMINATED BY 'weizuo'
(id, value)
PROPERTIES ("auth" = "root:");
"""
sql "sync"
rowCount = sql "select count(1) from ${tableName}"
assertEquals(3, rowCount[0][0])
// test load_nullable_to_not_nullable
def tableName2 = "load_nullable_to_not_nullable"
sql """ DROP TABLE IF EXISTS ${tableName2} """
sql """
CREATE TABLE IF NOT EXISTS `${tableName2}` (
k1 int(32) NOT NULL,
k2 smallint NOT NULL,
k3 int NOT NULL,
k4 bigint NOT NULL,
k5 decimal(9, 3) NOT NULL,
k6 char(5) NOT NULL,
k10 date NOT NULL,
k11 datetime NOT NULL,
k7 varchar(20) NOT NULL,
k8 double max NOT NULL,
k9 float sum NOT NULL )
AGGREGATE KEY(k1,k2,k3,k4,k5,k6,k10,k11,k7)
PARTITION BY RANGE(k2) (
PARTITION partition_a VALUES LESS THAN MAXVALUE
)
DISTRIBUTED BY HASH(k1, k2, k5)
BUCKETS 3
PROPERTIES ( "replication_allocation" = "tag.location.default: 1");
"""
def test_time_local = getLoalFilePath "test_time.data"
sql """
LOAD DATA
LOCAL
INFILE '${test_time_local}'
INTO TABLE ${tableName2}
COLUMNS TERMINATED BY '\t'
(col)
SET (k1=year(col),k2=month(col),k3=month(col),k4=day(col),k5=7.7,k6="a",k10=date(col),k11=FROM_UNIXTIME(2019,"%Y-%m-%dT%H:%i:%s"),k7="k7",k8=month(col),k9=day(col))
PROPERTIES ("auth" = "root:");
"""
order_qt_sql1 " SELECT * FROM ${tableName2}"
}