From c6bc0a03a4afd2c7d91d8ca56578a104fa0fcacd Mon Sep 17 00:00:00 2001
From: huangzhaowei
Date: Sun, 29 Jan 2023 14:44:59 +0800
Subject: [PATCH] [feature](Load) Support MySQL Load Data (#15511)

Main subtask of [DSIP-28](https://cwiki.apache.org/confluence/display/DORIS/DSIP-028%3A+Suppot+MySQL+Load+Data)

## Problem summary

Support the MySQL load syntax shown below:

```sql
LOAD DATA
[LOCAL]
INFILE 'file_name'
INTO TABLE tbl_name
[PARTITION (partition_name [, partition_name] ...)]
[COLUMNS TERMINATED BY 'string']
[LINES TERMINATED BY 'string']
[IGNORE number {LINES | ROWS}]
[(col_name_or_user_var [, col_name_or_user_var] ...)]
[SET (col_name={expr | DEFAULT} [, col_name={expr | DEFAULT}] ...)]
[PROPERTIES (key1 = value1 [, key2=value2])]
```

For example,

```sql
LOAD DATA LOCAL
INFILE 'local_test.file'
INTO TABLE db1.table1
PARTITION (partition_a, partition_b, partition_c, partition_d)
COLUMNS TERMINATED BY '\t'
(k1, k2, v2, v10, v11)
SET (c1=k1, c2=k2, c3=v10, c4=v11)
PROPERTIES ("auth" = "root:", "strict_mode"="true")
```

Note that in this PR the property named `auth` must be set, since the underlying stream load needs auth. I will optimize it later.
---
 .../io/ByteBufferNetworkInputStream.java      | 105 ++++++++
 .../io/ByteBufferNetworkInputStreamTest.java  | 122 +++++++++
 fe/fe-core/src/main/cup/sql_parser.cup        |  54 +++-
 .../doris/analysis/DataDescription.java       |  87 ++++++-
 .../org/apache/doris/analysis/LoadStmt.java   |  36 ++-
 .../org/apache/doris/common/ErrorCode.java    |   2 +
 .../org/apache/doris/load/EtlJobType.java     |   1 +
 .../apache/doris/load/LoadJobRowResult.java   |  65 +++++
 .../apache/doris/load/loadv2/LoadManager.java |  10 +
 .../doris/load/loadv2/MysqlLoadManager.java   | 244 ++++++++++++++++++
 .../apache/doris/mysql/MysqlCapability.java   |   6 +-
 .../java/org/apache/doris/qe/DdlExecutor.java |  13 -
 .../org/apache/doris/qe/StmtExecutor.java     |  41 +++
 fe/fe-core/src/main/jflex/sql_scanner.flex    |   1 +
 .../doris/analysis/DataDescriptionTest.java   |  41 +++
 .../apache/doris/analysis/LoadStmtTest.java   |  43 +++
 .../doris/load/LoadJobRowResultTest.java      |  40 +++
 .../doris/mysql/MysqlCapabilityTest.java      |   9 +
 regression-test/conf/regression-conf.groovy   |   3 +-
 .../mysql_load/test_line_delimiter.csv        |   1 +
 .../load_p0/mysql_load/test_mysql_load.out    |   8 +
 .../load_p0/mysql_load/test_strict_mode.csv   |   2 +
 .../mysql_load/test_strict_mode_fail.csv      |   2 +
 .../data/load_p0/mysql_load/test_time.data    |   1 +
 .../org/apache/doris/regression/Config.groovy |   2 +-
 .../doris/regression/suite/Suite.groovy       |  14 +
 .../pipeline/p0/conf/regression-conf.groovy   |   2 +-
 .../pipeline/p1/conf/regression-conf.groovy   |   2 +-
 .../load_p0/mysql_load/test_mysql_load.groovy | 162 ++++++++++++
 29 files changed, 1090 insertions(+), 29 deletions(-)
 create mode 100644 fe/fe-common/src/main/java/org/apache/doris/common/io/ByteBufferNetworkInputStream.java
 create mode 100644 fe/fe-common/src/test/java/org/apache/doris/common/io/ByteBufferNetworkInputStreamTest.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/LoadJobRowResult.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/load/LoadJobRowResultTest.java
 create mode 100644 regression-test/data/load_p0/mysql_load/test_line_delimiter.csv
 create mode 100644 regression-test/data/load_p0/mysql_load/test_mysql_load.out
 create mode 100644 regression-test/data/load_p0/mysql_load/test_strict_mode.csv
 create mode 100644 regression-test/data/load_p0/mysql_load/test_strict_mode_fail.csv
 create mode 100644
regression-test/data/load_p0/mysql_load/test_time.data
 create mode 100644 regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/io/ByteBufferNetworkInputStream.java b/fe/fe-common/src/main/java/org/apache/doris/common/io/ByteBufferNetworkInputStream.java
new file mode 100644
index 0000000000..41bac05329
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/io/ByteBufferNetworkInputStream.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class ByteBufferNetworkInputStream extends InputStream {
+    private ArrayBlockingQueue<ByteArrayInputStream> queue;
+    private ByteArrayInputStream currentInputStream;
+    private volatile boolean finished = false;
+    private volatile boolean closed = false;
+
+    public ByteBufferNetworkInputStream() {
+        this(32);
+    }
+
+    public ByteBufferNetworkInputStream(int capacity) {
+        this.queue = new ArrayBlockingQueue<>(capacity);
+    }
+
+    public void fillByteBuffer(ByteBuffer buffer) throws IOException, InterruptedException {
+        if (closed) {
+            throw new IOException("Stream is already closed.");
+        }
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        queue.offer(inputStream, 300, TimeUnit.SECONDS);
+    }
+
+    public void markFinished() {
+        this.finished = true;
+    }
+
+    private ByteArrayInputStream getNextByteArrayStream() throws IOException {
+        if (currentInputStream == null || currentInputStream.available() == 0) {
+            // Block until the next stream arrives, or return null once the
+            // producer has marked the stream finished and the queue is drained.
+            while (!finished || !queue.isEmpty()) {
+                try {
+                    currentInputStream = queue.poll(1, TimeUnit.SECONDS);
+                    if (currentInputStream != null) {
+                        return currentInputStream;
+                    }
+                } catch (InterruptedException e) {
+                    throw new IOException("Failed to get next stream");
+                }
+            }
+            return null;
+        }
+        return currentInputStream;
+    }
+
+    @Override
+    public int read() throws IOException {
+        ByteArrayInputStream stream = getNextByteArrayStream();
+        if (stream == null) {
+            return -1;
+        }
+
+        return stream.read();
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        ByteArrayInputStream stream = getNextByteArrayStream();
+        if (stream == null) {
+            return -1;
+        }
+        return stream.read(b, off, len);
+    }
+
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    public void close() throws IOException {
+        closed = true;
+        ByteArrayInputStream stream = getNextByteArrayStream();
+        if (stream == null) {
+            return;
+        }
+        stream.close();
+ + while (!queue.isEmpty()) { + queue.poll().close(); + } + } +} diff --git a/fe/fe-common/src/test/java/org/apache/doris/common/io/ByteBufferNetworkInputStreamTest.java b/fe/fe-common/src/test/java/org/apache/doris/common/io/ByteBufferNetworkInputStreamTest.java new file mode 100644 index 0000000000..1480cc31f5 --- /dev/null +++ b/fe/fe-common/src/test/java/org/apache/doris/common/io/ByteBufferNetworkInputStreamTest.java @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.io; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.ByteBuffer; + +public class ByteBufferNetworkInputStreamTest { + @Test + public void testMultiByteBuffer() throws IOException, InterruptedException { + ByteBufferNetworkInputStream inputStream = new ByteBufferNetworkInputStream(2); + + inputStream.fillByteBuffer(ByteBuffer.wrap("1\t2\n".getBytes())); + inputStream.fillByteBuffer(ByteBuffer.wrap("2\t3\n".getBytes())); + inputStream.markFinished(); + + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + Assert.assertEquals(bufferedReader.readLine(), "1\t2"); + Assert.assertEquals(bufferedReader.readLine(), "2\t3"); + Assert.assertNull(bufferedReader.readLine()); + bufferedReader.close(); + } + + @Test + public void testMultiThreadByteBuffer() throws IOException, InterruptedException { + int num = 5; + ByteBufferNetworkInputStream inputStream = new ByteBufferNetworkInputStream(2); + Thread thread1 = new Thread(() -> { + try { + for (int i = 0; i < num; i++) { + inputStream.fillByteBuffer(ByteBuffer.wrap(String.format("%d\t%d\n", i, i + 1).getBytes())); + Thread.sleep(500); + } + inputStream.markFinished(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread1.start(); + + + Thread thread2 = new Thread(() -> { + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + int count = 0; + String line = bufferedReader.readLine(); + while (line != null) { + Assert.assertEquals(line, String.format("%d\t%d", count, count + 1)); + count++; + line = bufferedReader.readLine(); + } + Assert.assertEquals(count, num); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread2.start(); + thread2.join(); + Assert.assertFalse(thread1.isAlive()); + inputStream.close(); + } + + + @Test + public void testMultiThreadByteBuffer2() throws IOException, InterruptedException { + int num = 5; + ByteBufferNetworkInputStream inputStream = new ByteBufferNetworkInputStream(2); + Thread thread1 = new Thread(() -> { + try { + for (int i = 0; i < num; i++) { + inputStream.fillByteBuffer(ByteBuffer.wrap(String.format("%d\t%d\n", i, i + 
1).getBytes())); + } + inputStream.markFinished(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread1.start(); + + + Thread thread2 = new Thread(() -> { + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + int count = 0; + String line = bufferedReader.readLine(); + while (line != null) { + Assert.assertEquals(line, String.format("%d\t%d", count, count + 1)); + count++; + Thread.sleep(500); + line = bufferedReader.readLine(); + } + Assert.assertEquals(count, num); + } catch (Exception e) { + e.printStackTrace(); + } + }); + thread2.start(); + thread2.join(); + Assert.assertFalse(thread1.isAlive()); + inputStream.close(); + } +} diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 48e8246dd6..c977a9b731 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -620,7 +620,8 @@ terminal String KW_HISTOGRAM, KW_AUTO, KW_PREPARE, - KW_EXECUTE; + KW_EXECUTE, + KW_LINES; terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT, PLACEHOLDER; terminal BITAND, BITOR, BITXOR, BITNOT; @@ -816,7 +817,7 @@ nonterminal List string_list; nonterminal List integer_list, cancel_rollup_job_id_list; nonterminal AccessPrivilege privilege_type; -nonterminal DataDescription data_desc; +nonterminal DataDescription data_desc, mysql_data_desc; nonterminal List data_desc_list; nonterminal LabelName job_label; nonterminal String opt_with_label; @@ -827,7 +828,7 @@ nonterminal List opt_col_list, opt_dup_keys, opt_columns_from_path; nonterminal List opt_col_with_comment_list, col_with_comment_list; nonterminal ColWithComment col_with_comment; nonterminal List opt_col_mapping_list; -nonterminal Separator opt_field_term, separator; +nonterminal Separator opt_field_term, opt_line_term, separator; nonterminal String opt_user_role; nonterminal TablePattern tbl_pattern; nonterminal ResourcePattern resource_pattern; @@ -844,7 +845,7 @@ nonterminal ParseNode load_property; nonterminal List opt_load_property_list; // Boolean -nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate; +nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local; nonterminal String opt_from_rollup, opt_to_rollup; nonterminal ColumnPosition opt_col_pos; @@ -2244,6 +2245,10 @@ load_stmt ::= {: RESULT = new LoadStmt(label, dataDescList, resource, properties); :} + | KW_LOAD mysql_data_desc:desc opt_properties:properties + {: + RESULT = new LoadStmt(desc, properties); + :} ; job_label ::= @@ -2351,6 +2356,24 @@ data_desc ::= :} ; +/* MySQL LOAD DATA Statement */ +mysql_data_desc ::= + KW_DATA + opt_local:clientLocal + KW_INFILE STRING_LITERAL:file + KW_INTO KW_TABLE table_name:tableName + opt_partition_names:partitionNames + opt_field_term:colSep + opt_line_term:lineDelimiter + opt_col_list:colList + opt_col_mapping_list:colMappingList + opt_properties:properties + {: + RESULT = new DataDescription(tableName, partitionNames, file, clientLocal, colList, colSep, lineDelimiter, + colMappingList, properties); + :} + ; + opt_negative ::= {: RESULT = false; @@ -2361,6 +2384,16 @@ opt_negative ::= :} ; +opt_local ::= + {: + RESULT = false; + :} + | KW_LOCAL + {: + RESULT = true; + :} + ; + opt_field_term ::= /* Empty */ {: @@ -2372,6 +2405,17 @@ opt_field_term ::= :} ; +opt_line_term ::= + /* Empty */ + {: + RESULT = null; + :} + | KW_LINES KW_TERMINATED KW_BY 
STRING_LITERAL:sep + {: + RESULT = new Separator(sep); + :} + ; + separator ::= KW_COLUMNS KW_TERMINATED KW_BY STRING_LITERAL:sep {: @@ -7011,6 +7055,8 @@ keyword ::= {: RESULT = id; :} | KW_MTMV:id {: RESULT = id; :} + | KW_LINES:id + {: RESULT = id; :} ; // Identifier that contain keyword diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 72b494a89d..63e4c41fc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -95,11 +96,14 @@ public class DataDescription { "substitute"); private final String tableName; + + private String dbName; private final PartitionNames partitionNames; private final List filePaths; private final Separator columnSeparator; private String fileFormat; private TFileCompressType compressType = TFileCompressType.UNKNOWN; + private boolean clientLocal = false; private final boolean isNegative; // column names in the path private final List columnsFromPath; @@ -146,6 +150,7 @@ public class DataDescription { private final Expr deleteCondition; private final Map properties; private boolean trimDoubleQuotes = false; + private boolean isMysqlLoad = false; public DataDescription(String tableName, PartitionNames partitionNames, @@ -219,6 +224,37 @@ public class DataDescription { this.properties = properties; } + // data desc for mysql client + public DataDescription(TableName tableName, + PartitionNames partitionNames, + String file, + boolean clientLocal, + List columns, + Separator columnSeparator, + Separator lineDelimiter, + List columnMappingList, + Map properties) { + this.tableName = tableName.getTbl(); + this.dbName = tableName.getDb(); + this.partitionNames = partitionNames; + this.filePaths = Lists.newArrayList(file); + this.clientLocal = clientLocal; + this.fileFieldNames = columns; + this.columnSeparator = columnSeparator; + this.lineDelimiter = lineDelimiter; + this.fileFormat = null; + this.columnsFromPath = null; + this.isNegative = false; + this.columnMappingList = columnMappingList; + this.precedingFilterExpr = null; + this.whereExpr = null; + this.srcTableName = null; + this.mergeType = null; + this.deleteCondition = null; + this.properties = properties; + this.isMysqlLoad = true; + } + // For stream load using external file scan node. 
public DataDescription(String tableName, LoadTaskInfo taskInfo) { this.tableName = tableName; @@ -459,6 +495,10 @@ public class DataDescription { } } + public String getDbName() { + return dbName; + } + public String getTableName() { return tableName; } @@ -497,6 +537,13 @@ public class DataDescription { return fileFieldNames; } + public List getColumnMappingList() { + if (columnMappingList == null || columnMappingList.isEmpty()) { + return null; + } + return columnMappingList; + } + public String getFileFormat() { return fileFormat; } @@ -631,6 +678,10 @@ public class DataDescription { return isHadoopLoad; } + public boolean isClientLocal() { + return clientLocal; + } + public String getSrcTableName() { return srcTableName; } @@ -647,6 +698,10 @@ public class DataDescription { return trimDoubleQuotes; } + public Map getProperties() { + return properties; + } + /* * Analyze parsedExprMap and columnToHadoopFunction from columns, columns from path and columnMappingList * Example: @@ -846,7 +901,8 @@ public class DataDescription { Map analysisMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); analysisMap.putAll(properties); - if (analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) { + // If lineDelimiter had assigned, do not get it from properties again. + if (lineDelimiter == null && analysisMap.containsKey(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)) { lineDelimiter = new Separator(analysisMap.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER)); lineDelimiter.analyze(); } @@ -896,6 +952,21 @@ public class DataDescription { } } + public String analyzeFullDbName(String labelDbName, Analyzer analyzer) throws AnalysisException { + if (Strings.isNullOrEmpty(labelDbName)) { + String dbName = Strings.isNullOrEmpty(getDbName()) ? analyzer.getDefaultDb() : getDbName(); + this.dbName = dbName; + if (Strings.isNullOrEmpty(dbName)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); + } + return ClusterNamespace.getFullName(analyzer.getClusterName(), dbName); + } else { + // Get dbName from label. + this.dbName = ClusterNamespace.getNameFromFullName(labelDbName); + return labelDbName; + } + } + public void analyze(String fullDbName) throws AnalysisException { if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) { throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE."); @@ -927,6 +998,10 @@ public class DataDescription { columnSeparator.analyze(); } + if (lineDelimiter != null) { + lineDelimiter.analyze(); + } + if (partitionNames != null) { partitionNames.analyze(null); } @@ -985,7 +1060,10 @@ public class DataDescription { public String toSql() { StringBuilder sb = new StringBuilder(); - if (isLoadFromTable()) { + if (isMysqlLoad) { + sb.append("DATA ").append(isClientLocal() ? "LOCAL " : ""); + sb.append("INFILE '").append(filePaths.get(0)).append("'"); + } else if (isLoadFromTable()) { sb.append(mergeType.toString()); sb.append(" DATA FROM TABLE ").append(srcTableName); } else { @@ -1001,7 +1079,7 @@ public class DataDescription { if (isNegative) { sb.append(" NEGATIVE"); } - sb.append(" INTO TABLE ").append(tableName); + sb.append(" INTO TABLE ").append(isMysqlLoad ? dbName + "." 
+ tableName : tableName); if (partitionNames != null) { sb.append(" "); sb.append(partitionNames.toSql()); @@ -1009,6 +1087,9 @@ public class DataDescription { if (columnSeparator != null) { sb.append(" COLUMNS TERMINATED BY ").append(columnSeparator.toSql()); } + if (lineDelimiter != null && isMysqlLoad) { + sb.append(" LINES TERMINATED BY ").append(lineDelimiter.toSql()); + } if (fileFormat != null && !fileFormat.isEmpty()) { sb.append(" FORMAT AS '" + fileFormat + "'"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index 3f81da06ae..60d405eb86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; +import java.io.File; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -124,6 +125,8 @@ public class LoadStmt extends DdlStmt { private final Map properties; private String user; + private boolean isMysqlLoad = false; + private EtlJobType etlJobType = EtlJobType.UNKNOWN; public static final ImmutableMap PROPERTIES_MAP = new ImmutableMap.Builder() @@ -189,6 +192,17 @@ public class LoadStmt extends DdlStmt { }) .build(); + public LoadStmt(DataDescription dataDescription, Map properties) { + this.label = new LabelName(); + this.dataDescriptions = Lists.newArrayList(dataDescription); + this.brokerDesc = null; + this.cluster = null; + this.resourceDesc = null; + this.properties = properties; + this.user = null; + this.isMysqlLoad = true; + } + public LoadStmt(LabelName label, List dataDescriptions, BrokerDesc brokerDesc, String cluster, Map properties) { this.label = label; @@ -326,7 +340,9 @@ public class LoadStmt extends DdlStmt { @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); - label.analyze(analyzer); + if (!isMysqlLoad) { + label.analyze(analyzer); + } if (dataDescriptions == null || dataDescriptions.isEmpty()) { throw new AnalysisException("No data file in load statement."); } @@ -335,15 +351,16 @@ public class LoadStmt extends DdlStmt { // case 2: one hive table, one data description boolean isLoadFromTable = false; for (DataDescription dataDescription : dataDescriptions) { - if (brokerDesc == null && resourceDesc == null) { + if (brokerDesc == null && resourceDesc == null && !isMysqlLoad) { dataDescription.setIsHadoopLoad(true); } - dataDescription.analyze(label.getDbName()); + String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer); + dataDescription.analyze(fullDbName); if (dataDescription.isLoadFromTable()) { isLoadFromTable = true; } - Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(label.getDbName()); + Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(fullDbName); OlapTable table = db.getOlapTableOrAnalysisException(dataDescription.getTableName()); if (dataDescription.getMergeType() != LoadTask.MergeType.APPEND && table.getKeysType() != KeysType.UNIQUE_KEYS) { @@ -370,6 +387,15 @@ public class LoadStmt extends DdlStmt { } } + // mysql load only have one data desc. 
+ if (isMysqlLoad && !dataDescriptions.get(0).isClientLocal()) { + for (String path : dataDescriptions.get(0).getFilePaths()) { + if (!new File(path).exists()) { + throw new AnalysisException("Path: " + path + " is not exists."); + } + } + } + if (resourceDesc != null) { resourceDesc.analyze(); etlJobType = resourceDesc.getEtlJobType(); @@ -383,6 +409,8 @@ public class LoadStmt extends DdlStmt { } } else if (brokerDesc != null) { etlJobType = EtlJobType.BROKER; + } else if (isMysqlLoad) { + etlJobType = EtlJobType.LOCAL_FILE; } else { // if cluster is null, use default hadoop cluster // if cluster is not null, use this hadoop cluster diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java index 7337438994..b65433388b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java @@ -56,6 +56,8 @@ public enum ErrorCode { + " host '%s'"), ERR_TABLEACCESS_DENIED_ERROR(1142, new byte[]{'4', '2', '0', '0', '0'}, "%s command denied to user '%s'@'%s' for " + "table '%s'"), + ERR_NOT_ALLOWED_COMMAND(1148, new byte[]{'4', '2', '0', '0', '0'}, "The used command is not allowed" + + " with this MySQL version"), ERR_WRONG_COLUMN_NAME(1166, new byte[]{'4', '2', '0', '0', '0'}, "Incorrect column name '%s'. Column regex is '%s'"), ERR_UNKNOWN_SYSTEM_VARIABLE(1193, new byte[]{'H', 'Y', '0', '0', '0'}, "Unknown system variable '%s'"), ERR_BAD_SLAVE(1200, new byte[]{'H', 'Y', '0', '0', '0'}, "The server is not configured as slave; fix in config " diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java b/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java index 824ff39a9f..d00dc21b84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/EtlJobType.java @@ -24,5 +24,6 @@ public enum EtlJobType { BROKER, DELETE, SPARK, + LOCAL_FILE, UNKNOWN } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJobRowResult.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJobRowResult.java new file mode 100644 index 0000000000..25f3491dd0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJobRowResult.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load; + +public class LoadJobRowResult { + private long records; + private long deleted; + private int skipped; + private int warnings; + + public LoadJobRowResult() { + this.records = 0; + this.deleted = 0; + this.skipped = 0; + this.warnings = 0; + } + + public long getRecords() { + return records; + } + + public void setRecords(long records) { + this.records = records; + } + + public void incRecords(long records) { + this.records += records; + } + + public int getSkipped() { + return skipped; + } + + public void setSkipped(int skipped) { + this.skipped = skipped; + } + + public void incSkipped(int skipped) { + this.skipped += skipped; + } + + public int getWarnings() { + return warnings; + } + + @Override + public String toString() { + return "Records: " + records + " Deleted: " + deleted + " Skipped: " + skipped + " Warnings: " + warnings; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index c95103a621..5949347f8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -30,6 +30,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; @@ -41,7 +42,9 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; +import org.apache.doris.load.LoadJobRowResult; import org.apache.doris.persist.CleanLabelOperationLog; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.DatabaseTransactionMgr; import org.apache.doris.transaction.TransactionState; @@ -86,9 +89,11 @@ public class LoadManager implements Writable { private LoadJobScheduler loadJobScheduler; private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private MysqlLoadManager mysqlLoadManager; public LoadManager(LoadJobScheduler loadJobScheduler) { this.loadJobScheduler = loadJobScheduler; + this.mysqlLoadManager = new MysqlLoadManager(); } /** @@ -151,6 +156,11 @@ public class LoadManager implements Writable { } } + public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt) + throws IOException, LoadException { + return mysqlLoadManager.executeMySqlLoadJobFromStmt(context, stmt); + } + public void replayCreateLoadJob(LoadJob loadJob) { createLoadJob(loadJob); LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java new file mode 100644 index 0000000000..44d9aaf139 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.LoadException; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.io.ByteBufferNetworkInputStream; +import org.apache.doris.load.LoadJobRowResult; +import org.apache.doris.load.loadv2.LoadTask.MergeType; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.BeSelectionPolicy; +import org.apache.doris.system.SystemInfoService; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.google.common.base.Joiner; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; + +public class MysqlLoadManager { + private static final Logger LOG = LogManager.getLogger(MysqlLoadManager.class); + + private final ThreadPoolExecutor mysqlLoadPool; + + public MysqlLoadManager() { + this.mysqlLoadPool = ThreadPoolManager.newDaemonCacheThreadPool(4, "Mysql Load", true); + } + + public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, LoadStmt stmt) + throws IOException, LoadException { + LoadJobRowResult loadResult = new LoadJobRowResult(); + // Mysql data load only have one data desc + DataDescription dataDesc = stmt.getDataDescriptions().get(0); + String database = dataDesc.getDbName(); + String table = dataDesc.getTableName(); + List filePaths = dataDesc.getFilePaths(); + try (final CloseableHttpClient httpclient = HttpClients.createDefault()) { + for (String file : filePaths) { + InputStreamEntity entity = getInputStreamEntity(context, dataDesc.isClientLocal(), file); + HttpPut request = generateRequestForMySqlLoad(entity, dataDesc, database, table); + try (final CloseableHttpResponse response = httpclient.execute(request)) { + JSONObject result = JSON.parseObject(EntityUtils.toString(response.getEntity())); + if (!result.getString("Status").equalsIgnoreCase("Success")) { + LOG.warn("Execute stream load for mysql data load failed with message: " + request); + throw new LoadException(result.getString("Message")); + } + 
+                    loadResult.incRecords(result.getLong("NumberLoadedRows"));
+                    loadResult.incSkipped(result.getIntValue("NumberFilteredRows"));
+                }
+            }
+        }
+        return loadResult;
+    }
+
+    private InputStreamEntity getInputStreamEntity(ConnectContext context, boolean isClientLocal, String file)
+            throws IOException {
+        InputStream inputStream;
+        if (isClientLocal) {
+            // The mysql client checks that the file exists before sending it.
+            replyClientForReadFile(context, file);
+            inputStream = new ByteBufferNetworkInputStream();
+            fillByteBufferAsync(context, (ByteBufferNetworkInputStream) inputStream);
+        } else {
+            // The server-side file has already been checked during analyze.
+            inputStream = Files.newInputStream(Paths.get(file));
+        }
+        return new InputStreamEntity(inputStream, -1, ContentType.TEXT_PLAIN);
+    }
+
+    private void replyClientForReadFile(ConnectContext context, String path) throws IOException {
+        context.getSerializer().reset();
+        context.getSerializer().writeByte((byte) 0xfb);
+        context.getSerializer().writeEofString(path);
+        context.getMysqlChannel().sendAndFlush(context.getSerializer().toByteBuffer());
+    }
+
+    private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputStream inputStream) {
+        mysqlLoadPool.submit(() -> {
+            ByteBuffer buffer = null;
+            try {
+                buffer = context.getMysqlChannel().fetchOnePacket();
+                // The MySQL client sends an empty packet at EOF.
+                while (buffer != null && buffer.limit() != 0) {
+                    inputStream.fillByteBuffer(buffer);
+                    buffer = context.getMysqlChannel().fetchOnePacket();
+                }
+            } catch (IOException | InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                inputStream.markFinished();
+            }
+        });
+    }
+
+    // public only for test
+    public HttpPut generateRequestForMySqlLoad(
+            InputStreamEntity entity,
+            DataDescription desc,
+            String database,
+            String table) throws LoadException {
+        final HttpPut httpPut = new HttpPut(selectBackendForMySqlLoad(database, table));
+
+        httpPut.addHeader("Expect", "100-continue");
+        httpPut.addHeader("Content-Type", "text/plain");
+
+        Map<String, String> props = desc.getProperties();
+        if (props != null) {
+            // auth
+            if (!props.containsKey("auth")) {
+                throw new LoadException("Must have auth(user:password) in properties.");
+            }
+            // TODO: use token to send the request to avoid double auth.
+ String auth = props.get("auth"); + String base64Auth = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8)); + httpPut.addHeader("Authorization", "Basic " + base64Auth); + + // max_filter_ratio + if (props.containsKey(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO)) { + String maxFilterRatio = props.get(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO); + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_MAX_FILTER_RATIO, maxFilterRatio); + } + + // exec_mem_limit + if (props.containsKey(LoadStmt.EXEC_MEM_LIMIT)) { + String memory = props.get(LoadStmt.EXEC_MEM_LIMIT); + httpPut.addHeader(LoadStmt.EXEC_MEM_LIMIT, memory); + } + + // strict_mode + if (props.containsKey(LoadStmt.STRICT_MODE)) { + String strictMode = props.get(LoadStmt.STRICT_MODE); + httpPut.addHeader(LoadStmt.STRICT_MODE, strictMode); + } + } + + // column_separator + if (desc.getColumnSeparator() != null) { + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR, desc.getColumnSeparator()); + } + + // line_delimiter + if (desc.getLineDelimiter() != null) { + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER, desc.getLineDelimiter()); + } + + // merge_type + if (!desc.getMergeType().equals(MergeType.APPEND)) { + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_MERGE_TYPE, desc.getMergeType().name()); + } + + // columns + if (desc.getFileFieldNames() != null) { + List fields = desc.getFileFieldNames(); + StringBuilder fieldString = new StringBuilder(); + fieldString.append(Joiner.on(",").join(fields)); + + if (desc.getColumnMappingList() != null) { + fieldString.append(","); + List mappings = new ArrayList<>(); + for (Expr expr : desc.getColumnMappingList()) { + mappings.add(expr.toSql().replaceAll("`", "")); + } + fieldString.append(Joiner.on(",").join(mappings)); + } + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_COLUMNS, fieldString.toString()); + } + + // partitions + if (desc.getPartitionNames() != null && !desc.getPartitionNames().getPartitionNames().isEmpty()) { + List ps = desc.getPartitionNames().getPartitionNames(); + String pNames = Joiner.on(",").join(ps); + if (desc.getPartitionNames().isTemp()) { + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS, pNames); + } else { + httpPut.addHeader(LoadStmt.KEY_IN_PARAM_PARTITIONS, pNames); + } + } + httpPut.setEntity(entity); + return httpPut; + } + + private String selectBackendForMySqlLoad(String database, String table) throws LoadException { + BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build(); + List backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (backendIds.isEmpty()) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + } + + Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); + if (backend == null) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + } + StringBuilder sb = new StringBuilder(); + sb.append("http://"); + sb.append(backend.getHost()); + sb.append(":"); + sb.append(backend.getHttpPort()); + sb.append("/api/"); + sb.append(database); + sb.append("/"); + sb.append(table); + sb.append("/_stream_load"); + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCapability.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCapability.java index 3b4cb409e7..3984563ca9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCapability.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCapability.java @@ 
-74,7 +74,7 @@ public class MysqlCapability { private static final int DEFAULT_FLAGS = Flag.CLIENT_PROTOCOL_41.getFlagBit() | Flag.CLIENT_CONNECT_WITH_DB.getFlagBit() | Flag.CLIENT_SECURE_CONNECTION.getFlagBit() - | Flag.CLIENT_PLUGIN_AUTH.getFlagBit(); + | Flag.CLIENT_PLUGIN_AUTH.getFlagBit() | Flag.CLIENT_LOCAL_FILES.getFlagBit(); public static final MysqlCapability DEFAULT_CAPABILITY = new MysqlCapability(DEFAULT_FLAGS); private int flags; @@ -141,6 +141,10 @@ public class MysqlCapability { return (flags & Flag.CLIENT_SESSION_TRACK.getFlagBit()) != 0; } + public boolean supportClientLocalFile() { + return (flags & Flag.CLIENT_LOCAL_FILES.getFlagBit()) != 0; + } + @Override public boolean equals(Object obj) { if (obj == null || !(obj instanceof MysqlCapability)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index b10ae7043d..4d0e873643 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -90,7 +90,6 @@ import org.apache.doris.analysis.DropUserStmt; import org.apache.doris.analysis.GrantStmt; import org.apache.doris.analysis.InstallPluginStmt; import org.apache.doris.analysis.LinkDbStmt; -import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.MigrateDbStmt; import org.apache.doris.analysis.PauseRoutineLoadStmt; import org.apache.doris.analysis.PauseSyncJobStmt; @@ -115,7 +114,6 @@ import org.apache.doris.analysis.UpdateStmt; import org.apache.doris.catalog.EncryptKeyHelper; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; -import org.apache.doris.load.EtlJobType; import org.apache.doris.load.sync.SyncJobManager; import org.apache.doris.statistics.StatisticsRepository; @@ -170,17 +168,6 @@ public class DdlExecutor { env.alterView((AlterViewStmt) ddlStmt); } else if (ddlStmt instanceof CancelAlterTableStmt) { env.cancelAlter((CancelAlterTableStmt) ddlStmt); - } else if (ddlStmt instanceof LoadStmt) { - LoadStmt loadStmt = (LoadStmt) ddlStmt; - EtlJobType jobType = loadStmt.getEtlJobType(); - if (jobType == EtlJobType.UNKNOWN) { - throw new DdlException("Unknown load job type"); - } - if (jobType == EtlJobType.HADOOP) { - throw new DdlException("Load job by hadoop cluster is disabled." - + " Try using broker load. 
See 'help broker load;'");
-        }
-        env.getLoadManager().createLoadJobFromStmt(loadStmt);
         } else if (ddlStmt instanceof CancelExportStmt) {
             env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt);
         } else if (ddlStmt instanceof CancelLoadStmt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index b14c162e4e..732062a78a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -32,6 +32,7 @@ import org.apache.doris.analysis.FloatLiteral;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.KillStmt;
 import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.analysis.LockTablesStmt;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.OutFileClause;
@@ -89,6 +90,8 @@ import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.LoadJobRowResult;
+import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlEofPacket;
@@ -582,6 +585,8 @@
                 // the transaction of this insert may already begin, we will abort it at outer finally block.
                 throw t;
             }
+        } else if (parsedStmt instanceof LoadStmt) {
+            handleLoadStmt();
         } else if (parsedStmt instanceof DdlStmt) {
             handleDdlStmt();
         } else if (parsedStmt instanceof ShowStmt) {
@@ -1836,6 +1841,42 @@
         context.getState().setEof();
     }
 
+    private void handleLoadStmt() {
+        try {
+            LoadStmt loadStmt = (LoadStmt) parsedStmt;
+            EtlJobType jobType = loadStmt.getEtlJobType();
+            if (jobType == EtlJobType.UNKNOWN) {
+                throw new DdlException("Unknown load job type");
+            }
+            if (jobType == EtlJobType.HADOOP) {
+                throw new DdlException("Load job by hadoop cluster is disabled."
+                        + " Try using broker load. See 'help broker load;'");
+            }
+            LoadManager loadManager = context.getEnv().getLoadManager();
+            if (jobType == EtlJobType.LOCAL_FILE) {
+                if (!context.getCapability().supportClientLocalFile()) {
+                    context.getState().setError(ErrorCode.ERR_NOT_ALLOWED_COMMAND, "This client does not support"
+                            + " loading a client local file.");
+                    return;
+                }
+                LoadJobRowResult submitResult = loadManager.executeMySqlLoadJobFromStmt(context, loadStmt);
+                context.getState().setOk(submitResult.getRecords(), submitResult.getWarnings(),
+                        submitResult.toString());
+            } else {
+                loadManager.createLoadJobFromStmt(loadStmt);
+                context.getState().setOk();
+            }
+        } catch (UserException e) {
+            // Return a message to tell the client what happened.
+ LOG.debug("DDL statement({}) process failed.", originStmt.originStmt, e); + context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + } catch (Exception e) { + // Maybe our bug + LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); + } + } + private void handleDdlStmt() { try { DdlExecutor.execute(context.getEnv(), (DdlStmt) parsedStmt); diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 6cf6399894..3df5f685fc 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -482,6 +482,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("auto", new Integer(SqlParserSymbols.KW_AUTO)); keywordMap.put("prepare", new Integer(SqlParserSymbols.KW_PREPARE)); keywordMap.put("execute", new Integer(SqlParserSymbols.KW_EXECUTE)); + keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES)); } // map from token id to token description diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index c8a9002236..023fdec59e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -372,4 +372,45 @@ public class DataDescriptionTest { }; desc.analyze("testDb"); } + + @Test + public void testMysqlLoadData() throws AnalysisException { + TableName tbl = new TableName(null, "testDb", "testTable"); + List params = Lists.newArrayList(); + params.add(new StringLiteral("day")); + params.add(new SlotRef(null, "k2")); + BinaryPredicate predicate = + new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"), new FunctionCallExpr("bitmap_dict", params)); + Map properties = Maps.newHashMap(); + properties.put("line_delimiter", "abc"); + DataDescription desc = + new DataDescription(tbl, new PartitionNames(false, Lists.newArrayList("p1", "p2")), "abc.txt", true, + Lists.newArrayList("k1", "k2", "v1"), new Separator("010203"), new Separator("040506"), + Lists.newArrayList(predicate), properties); + String db = desc.analyzeFullDbName(null, analyzer); + Assert.assertEquals("default_cluster:testDb", db); + Assert.assertEquals("testDb", desc.getDbName()); + db = desc.analyzeFullDbName("testCluster:testDb1", analyzer); + Assert.assertEquals("testCluster:testDb1", db); + Assert.assertEquals("testDb1", desc.getDbName()); + + desc.analyze("testDb1"); + Assert.assertEquals(1, desc.getFilePaths().size()); + Assert.assertEquals("abc.txt", desc.getFilePaths().get(0)); + + Assert.assertEquals(2, desc.getPartitionNames().getPartitionNames().size()); + Assert.assertEquals("p1", desc.getPartitionNames().getPartitionNames().get(0)); + Assert.assertEquals("p2", desc.getPartitionNames().getPartitionNames().get(1)); + + Assert.assertEquals("040506", desc.getLineDelimiter()); + Assert.assertEquals("010203", desc.getColumnSeparator()); + String sql = "DATA LOCAL INFILE 'abc.txt' " + + "INTO TABLE testDb1.testTable " + + "PARTITIONS (p1, p2) " + + "COLUMNS TERMINATED BY '010203' " + + "LINES TERMINATED BY '040506' " + + "(k1, k2, v1) " + + "SET (`k1` = bitmap_dict('day', `k2`))"; + Assert.assertEquals(sql, desc.toSql()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index 192d7c5c1f..f027f13daf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -32,6 +32,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.task.LoadTaskInfo; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import mockit.Expectations; import mockit.Injectable; import mockit.Mocked; @@ -39,6 +40,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; import java.io.StringReader; import java.util.List; @@ -93,6 +96,9 @@ public class LoadStmtTest { desc.getTableName(); minTimes = 0; result = "testTbl"; + desc.analyzeFullDbName("testCluster:testDb", (Analyzer) any); + minTimes = 0; + result = "testCluster:testDb"; env.getResourceMgr(); result = resourceMgr; resourceMgr.getResource(resourceName); @@ -182,4 +188,41 @@ public class LoadStmtTest { return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt( new SqlParser(new SqlScanner(new StringReader(columnsSQL))))).getColumns(); } + + @Test + public void testMySqlLoadData(@Injectable DataDescription desc) throws UserException, IOException { + File temp = File.createTempFile("testMySqlLoadData", ".txt"); + new Expectations() { + { + desc.isClientLocal(); + minTimes = 0; + result = false; + + desc.getFilePaths(); + minTimes = 0; + result = Lists.newArrayList(temp.getPath()); + + desc.toSql(); + minTimes = 0; + result = "XXX"; + + desc.getTableName(); + minTimes = 0; + result = "testTbl"; + + desc.analyzeFullDbName(null, (Analyzer) any); + minTimes = 0; + result = "testCluster:testDb"; + + desc.getMergeType(); + minTimes = 0; + result = LoadTask.MergeType.APPEND; + } + }; + + LoadStmt stmt = new LoadStmt(desc, Maps.newHashMap()); + stmt.analyze(analyzer); + Assert.assertNull(stmt.getLabel().getDbName()); + Assert.assertEquals(EtlJobType.LOCAL_FILE, stmt.getEtlJobType()); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/LoadJobRowResultTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/LoadJobRowResultTest.java new file mode 100644 index 0000000000..cf1268ec4d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/LoadJobRowResultTest.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load; + +import org.junit.Assert; +import org.junit.Test; + +public class LoadJobRowResultTest { + + @Test + public void testResult() { + LoadJobRowResult result = new LoadJobRowResult(); + Assert.assertEquals("Records: 0 Deleted: 0 Skipped: 0 Warnings: 0", result.toString()); + result.setRecords(199); + Assert.assertEquals("Records: 199 Deleted: 0 Skipped: 0 Warnings: 0", result.toString()); + result.incRecords(1); + result.setSkipped(20); + Assert.assertEquals("Records: 200 Deleted: 0 Skipped: 20 Warnings: 0", result.toString()); + result.incSkipped(20); + Assert.assertEquals("Records: 200 Deleted: 0 Skipped: 40 Warnings: 0", result.toString()); + Assert.assertEquals(200, result.getRecords()); + Assert.assertEquals(40, result.getSkipped()); + Assert.assertEquals(0, result.getWarnings()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlCapabilityTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlCapabilityTest.java index 1d00a1e029..270e2f4e0a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlCapabilityTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/MysqlCapabilityTest.java @@ -39,4 +39,13 @@ public class MysqlCapabilityTest { + " | CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS | CLIENT_SESSION_TRACK | CLIENT_DEPRECATE_EOF", capability.toString()); } + + @Test + public void testDefaultFlags() { + MysqlCapability capability = MysqlCapability.DEFAULT_CAPABILITY; + Assert.assertEquals("CLIENT_CONNECT_WITH_DB | CLIENT_LOCAL_FILES | CLIENT_PROTOCOL_41" + + " | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH", + capability.toString()); + Assert.assertTrue(capability.supportClientLocalFile()); + } } diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 6babba3cc4..429978646c 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -23,7 +23,8 @@ defaultDb = "regression_test" // add useLocalSessionState so that the jdbc will not send // init cmd like: select @@session.tx_read_only // at each time we connect. -jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true" +// add allowLoadLocalInfile so that the jdbc can execute mysql load data from client. +jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" jdbcUser = "root" jdbcPassword = "" diff --git a/regression-test/data/load_p0/mysql_load/test_line_delimiter.csv b/regression-test/data/load_p0/mysql_load/test_line_delimiter.csv new file mode 100644 index 0000000000..6a9e628eae --- /dev/null +++ b/regression-test/data/load_p0/mysql_load/test_line_delimiter.csv @@ -0,0 +1 @@ +1|aaweizuo2|bbweizuo3|cc diff --git a/regression-test/data/load_p0/mysql_load/test_mysql_load.out b/regression-test/data/load_p0/mysql_load/test_mysql_load.out new file mode 100644 index 0000000000..378b5a92db --- /dev/null +++ b/regression-test/data/load_p0/mysql_load/test_mysql_load.out @@ -0,0 +1,8 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +-2 -51 \N 1 \N \N \N \N \N \N \N 2 \N \N +-2 -50 \N 1 \N \N \N \N \N \N \N \N j \N + +-- !sql1 -- +2019 9 9 9 7.700 a 2019-09-09 1970-01-01T08:33:39 k7 9.0 9.0 + diff --git a/regression-test/data/load_p0/mysql_load/test_strict_mode.csv b/regression-test/data/load_p0/mysql_load/test_strict_mode.csv new file mode 100644 index 0000000000..e6481c95ef --- /dev/null +++ b/regression-test/data/load_p0/mysql_load/test_strict_mode.csv @@ -0,0 +1,2 @@ +-2 -50 1 \N j +-2 -51 1 2 \N diff --git a/regression-test/data/load_p0/mysql_load/test_strict_mode_fail.csv b/regression-test/data/load_p0/mysql_load/test_strict_mode_fail.csv new file mode 100644 index 0000000000..9ce852256a --- /dev/null +++ b/regression-test/data/load_p0/mysql_load/test_strict_mode_fail.csv @@ -0,0 +1,2 @@ +ab -50 1 \N j +-2 -51 1 2 \N diff --git a/regression-test/data/load_p0/mysql_load/test_time.data b/regression-test/data/load_p0/mysql_load/test_time.data new file mode 100644 index 0000000000..7d0430441b --- /dev/null +++ b/regression-test/data/load_p0/mysql_load/test_time.data @@ -0,0 +1 @@ +2019-09-09T10:10:10 \ No newline at end of file diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index 3220701262..6c36000029 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -267,7 +267,7 @@ class Config { if (config.jdbcUrl == null) { //jdbcUrl needs parameter here. Refer to function: buildUrl(String dbName) - config.jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true" + config.jdbcUrl = "jdbc:mysql://127.0.0.1:9030/?useLocalSessionState=true&allowLoadLocalInfile=true" log.info("Set jdbcUrl to '${config.jdbcUrl}' because not specify.".toString()) } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index e40338ed1d..d74be3947b 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -322,6 +322,20 @@ class Suite implements GroovyInterceptable { return remotePath; } + String getLoalFilePath(String fileName) { + if (!new File(fileName).isAbsolute()) { + fileName = new File(context.dataPath, fileName).getAbsolutePath() + } + def file = new File(fileName) + if (!file.exists()) { + log.warn("Stream load input file not exists: ${file}".toString()) + throw new IllegalStateException("Stream load input file not exists: ${file}"); + } + def localFile = file.canonicalPath + log.info("Set stream load input: ${file.canonicalPath}".toString()) + return localFile; + } + boolean enableBrokerLoad() { String enableBrokerLoad = context.config.otherConfigs.get("enableBrokerLoad"); return (enableBrokerLoad != null && enableBrokerLoad.equals("true")); diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index dfdf4e421d..a702de0f86 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -20,7 +20,7 @@ // **Note**: default db will be create if not exist defaultDb = "regression_test" 
-jdbcUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true"
+jdbcUrl = "jdbc:mysql://172.19.0.2:9131/?useLocalSessionState=true&allowLoadLocalInfile=true"
 jdbcUser = "root"
 jdbcPassword = ""
 
diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy
index 5aa8731fe6..455e3d8267 100644
--- a/regression-test/pipeline/p1/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p1/conf/regression-conf.groovy
@@ -20,7 +20,7 @@
 
 // **Note**: default db will be create if not exist
 defaultDb = "regression_test"
 
-jdbcUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true"
+jdbcUrl = "jdbc:mysql://172.19.0.2:9132/?useLocalSessionState=true&allowLoadLocalInfile=true"
 jdbcUser = "root"
 jdbcPassword = ""
 
diff --git a/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
new file mode 100644
index 0000000000..1b930a9002
--- /dev/null
+++ b/regression-test/suites/load_p0/mysql_load/test_mysql_load.groovy
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mysql_load", "p0") {
+    sql "show tables"
+
+    def tableName = "test_mysql_load_strict"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` bigint(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4) SUM NULL,
+            `v2` tinyint(4) REPLACE NULL,
+            `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+            `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+            `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+            `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+            `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+            `v8` datetime REPLACE_IF_NOT_NULL NULL,
+            `v9` date REPLACE_IF_NOT_NULL NULL,
+            `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+            `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+            `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+        ) ENGINE=OLAP
+        AGGREGATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`k1`)
+        (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+        PARTITION partition_b VALUES [("100000"), ("1000000000")),
+        PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+        PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // strict mode success
+    def test_strict_mode_file = getLocalFilePath "test_strict_mode.csv"
+
+    sql """
+        LOAD DATA
+        LOCAL
+        INFILE '${test_strict_mode_file}'
+        INTO TABLE ${tableName}
+        PARTITION (partition_a, partition_b, partition_c, partition_d)
+        COLUMNS TERMINATED BY '\t'
+        (k1, k2, v2, v10, v11)
+        PROPERTIES ("auth" = "root:", "strict_mode"="true");
+    """
+
+    sql "sync"
+    qt_sql "select * from ${tableName} order by k1, k2"
+
+
+    // strict mode failed
+    test {
+        def test_strict_mode_fail_file = getLocalFilePath "test_strict_mode_fail.csv"
+
+        sql """
+            LOAD DATA
+            LOCAL
+            INFILE '${test_strict_mode_fail_file}'
+            INTO TABLE ${tableName}
+            PARTITION (partition_a, partition_b, partition_c, partition_d)
+            COLUMNS TERMINATED BY '\t'
+            (k1, k2, v2, v10, v11)
+            PROPERTIES ("auth" = "root:", "strict_mode"="true");
+        """
+        exception "errCode = 2, detailMessage = [INTERNAL_ERROR]too many filtered rows"
+    }
+
+    // test_line_delimiter
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `id` int(11) NULL,
+            `value` varchar(64) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2",
+            "disable_auto_compaction" = "false"
+        );
+    """
+
+    def test_line_delimiter_file = getLocalFilePath "test_line_delimiter.csv"
+    sql """
+        LOAD DATA
+        LOCAL
+        INFILE '${test_line_delimiter_file}'
+        INTO TABLE ${tableName}
+        COLUMNS TERMINATED BY '|'
+        LINES TERMINATED BY 'weizuo'
+        (id, value)
+        PROPERTIES ("auth" = "root:");
+    """
+
+    sql "sync"
+    def rowCount = sql "select count(1) from ${tableName}"
+    assertEquals(3, rowCount[0][0])
+
+
+    // test load_nullable_to_not_nullable
+    def tableName2 = "load_nullable_to_not_nullable"
+    sql """ DROP TABLE IF EXISTS ${tableName2} """
+    sql """
+        CREATE TABLE IF NOT EXISTS `${tableName2}` (
+            k1 int(32) NOT NULL,
+            k2 smallint NOT NULL,
+            k3 int NOT NULL,
+            k4 bigint NOT NULL,
+            k5 decimal(9, 3) NOT NULL,
+            k6 char(5) NOT NULL,
+            k10 date NOT NULL,
+            k11 datetime NOT NULL,
+            k7 varchar(20) NOT NULL,
+            k8 double max NOT NULL,
+            k9 float sum NOT NULL )
+        AGGREGATE KEY(k1,k2,k3,k4,k5,k6,k10,k11,k7)
+        PARTITION BY RANGE(k2) (
+            PARTITION partition_a VALUES LESS THAN MAXVALUE
+        )
+        DISTRIBUTED BY HASH(k1, k2, k5)
+        BUCKETS 3
+        PROPERTIES ( "replication_allocation" = "tag.location.default: 1");
+    """
+
+    def test_time_local = getLocalFilePath "test_time.data"
+    sql """
+        LOAD DATA
+        LOCAL
+        INFILE '${test_time_local}'
+        INTO TABLE ${tableName2}
+        COLUMNS TERMINATED BY '\t'
+        (col)
+        SET (k1=year(col),k2=month(col),k3=month(col),k4=day(col),k5=7.7,k6="a",k10=date(col),k11=FROM_UNIXTIME(2019,"%Y-%m-%dT%H:%i:%s"),k7="k7",k8=month(col),k9=day(col))
+        PROPERTIES ("auth" = "root:");
+    """
+
+    order_qt_sql1 "SELECT * FROM ${tableName2}"
+
+}
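A worked check of the expected results above, assuming the FE's default Asia/Shanghai (UTC+8) session time zone: the single field `col = 2019-09-09T10:10:10` in test_time.data makes the SET clause produce k1 = year(col) = 2019; k2 = k3 = month(col) = 9 and k4 = day(col) = 9 (k8 and k9 repeat these as 9.0, being double and float columns); k5 = 7.7, stored as 7.700 in decimal(9, 3); k6 and k7 take the constants "a" and "k7"; k10 = date(col) = 2019-09-09; and k11 = FROM_UNIXTIME(2019, "%Y-%m-%dT%H:%i:%s") = 1970-01-01T08:33:39, i.e. 2019 seconds (33 min 39 s) past the epoch rendered in UTC+8. This is exactly the `!sql1` row in test_mysql_load.out. The line-delimiter case works the same way: its single physical input line presumably contains two 'weizuo' separators, so `LINES TERMINATED BY 'weizuo'` splits it into the three records the suite asserts.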
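A note on exercising this feature from an application: the `allowLoadLocalInfile=true` parameter added to the regression `jdbcUrl`s above is what permits the MySQL JDBC driver to send a local file to the server; without it the driver rejects `LOAD DATA LOCAL INFILE` on the client side. Below is a minimal Java sketch of that client path. It mirrors the strict-mode case from test_mysql_load.groovy, but the host, port, credentials, file path, and class name are illustrative assumptions, not part of this patch.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MysqlLoadClientSketch {
    public static void main(String[] args) throws Exception {
        // allowLoadLocalInfile=true is required on the client side; the
        // server-side handling of the statement is what this patch adds.
        String url = "jdbc:mysql://127.0.0.1:9030/regression_test"
                + "?useLocalSessionState=true&allowLoadLocalInfile=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
                Statement stmt = conn.createStatement()) {
            // Same shape as the strict-mode load in the regression suite;
            // '/path/to/test_strict_mode.csv' is a placeholder local file.
            stmt.execute("LOAD DATA LOCAL INFILE '/path/to/test_strict_mode.csv' "
                    + "INTO TABLE test_mysql_load_strict "
                    + "COLUMNS TERMINATED BY '\\t' "
                    + "(k1, k2, v2, v10, v11) "
                    + "PROPERTIES (\"auth\" = \"root:\", \"strict_mode\" = \"true\")");
        }
    }
}
```

The `\\t` in the Java literal sends the two-character escape `\t` in the SQL text, which is interpreted as a tab separator as in MySQL's own LOAD DATA syntax.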