diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index b877555ee3..47e1e66925 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -28,8 +28,8 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; import org.apache.doris.load.loadv2.LoadTask; @@ -1113,14 +1113,14 @@ public class DataDescription implements InsertStmt.DataDesc { // file format // note(tsy): for historical reason, file format here must be string type rather than TFileFormatType if (fileFormat != null) { - if (!fileFormat.equalsIgnoreCase("parquet") - && !fileFormat.equalsIgnoreCase(FeConstants.csv) - && !fileFormat.equalsIgnoreCase("orc") - && !fileFormat.equalsIgnoreCase("json") - && !fileFormat.equalsIgnoreCase("wal") - && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names) - && !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types) - && !fileFormat.equalsIgnoreCase("hive_text")) { + if (!fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_PARQUET) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_WAL) + && !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) { throw new AnalysisException("File Format Type " + fileFormat + " is invalid."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 816cc6246f..34cb28093d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -27,10 +27,10 @@ import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.ParseUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.Util; @@ -246,11 +246,11 @@ public class OutFileClause { fileFormatType = TFileFormatType.FORMAT_ORC; break; case "csv_with_names": - headerType = FeConstants.csv_with_names; + headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES; fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "csv_with_names_and_types": - headerType = FeConstants.csv_with_names_and_types; + headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES; fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; default: diff --git 
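Note (not part of the patch): the check in DataDescription above keeps the chain of equalsIgnoreCase calls and only swaps the string literals for FileFormatConstants. An equivalent formulation with a case-insensitive set is sketched below; the class and field names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FileFormatConstants;

// Hypothetical helper, equivalent to the equalsIgnoreCase chain in DataDescription above.
class LoadFormatCheck {
    private static final Set<String> SUPPORTED = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);

    static {
        SUPPORTED.add(FileFormatConstants.FORMAT_PARQUET);
        SUPPORTED.add(FileFormatConstants.FORMAT_CSV);
        SUPPORTED.add(FileFormatConstants.FORMAT_CSV_WITH_NAMES);
        SUPPORTED.add(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES);
        SUPPORTED.add(FileFormatConstants.FORMAT_ORC);
        SUPPORTED.add(FileFormatConstants.FORMAT_JSON);
        SUPPORTED.add(FileFormatConstants.FORMAT_WAL);
        SUPPORTED.add(FileFormatConstants.FORMAT_HIVE_TEXT);
    }

    static void check(String fileFormat) throws AnalysisException {
        if (fileFormat != null && !SUPPORTED.contains(fileFormat)) {
            throw new AnalysisException("File Format Type " + fileFormat + " is invalid.");
        }
    }
}
```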
a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java index acbdb92070..ac83f26d49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java @@ -24,9 +24,9 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.datasource.property.constants.S3Properties.Env; -import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; import org.apache.doris.tablefunction.S3TableValuedFunction; import com.google.common.annotations.VisibleForTesting; @@ -145,7 +145,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt { final List filePaths = dataDescription.getFilePaths(); Preconditions.checkState(filePaths.size() == 1, "there should be only one file path"); final String s3FilePath = filePaths.get(0); - params.put(S3TableValuedFunction.S3_URI, s3FilePath); + params.put(S3TableValuedFunction.PROP_URI, s3FilePath); final Map dataDescProp = dataDescription.getProperties(); if (dataDescProp != null) { @@ -153,7 +153,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt { } final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT); - params.put(ExternalFileTableValuedFunction.FORMAT, format); + params.put(FileFormatConstants.PROP_FORMAT, format); if (isCsvFormat(format)) { parseSeparator(dataDescription.getColumnSeparatorObj(), params); @@ -162,7 +162,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt { List columnsFromPath = dataDescription.getColumnsFromPath(); if (columnsFromPath != null) { - params.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS, + params.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS, String.join(",", columnsFromPath)); } @@ -190,7 +190,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt { } catch (AnalysisException e) { throw new DdlException(String.format("failed to parse separator:%s", separator), e); } - tvfParams.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, separator.getSeparator()); + tvfParams.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, separator.getSeparator()); } private static boolean isCsvFormat(String format) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java index e881154924..e508838207 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java @@ -39,8 +39,10 @@ public class StorageBackend implements ParseNode { if (Strings.isNullOrEmpty(path)) { throw new AnalysisException("No destination path specified."); } + checkUri(URI.create(path), type); + } - URI uri = URI.create(path); + public static void checkUri(URI uri, StorageBackend.StorageType type) throws AnalysisException { String schema = uri.getScheme(); if (schema == null) { throw new AnalysisException( diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index ecd0c2f4fb..b369234898 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -20,9 +20,6 @@ package org.apache.doris.common; import org.apache.doris.persist.meta.FeMetaFormat; public class FeConstants { - // Database and table's default configurations, we will never change them - public static short default_replication_num = 3; - // The default value of bucket setting && auto bucket without estimate_partition_size public static int default_bucket_num = 10; @@ -39,7 +36,6 @@ public class FeConstants { public static int heartbeat_interval_second = 5; public static int checkpoint_interval_second = 60; // 1 minutes - public static int ip_check_interval_second = 5; // dpp version public static String dpp_version = "3_2_0"; @@ -71,12 +67,6 @@ public class FeConstants { public static long tablet_checker_interval_ms = 20 * 1000L; public static long tablet_schedule_interval_ms = 1000L; - public static String csv = "csv"; - public static String csv_with_names = "csv_with_names"; - public static String csv_with_names_and_types = "csv_with_names_and_types"; - - public static String text = "hive_text"; - public static String FS_PREFIX_S3 = "s3"; public static String FS_PREFIX_S3A = "s3a"; public static String FS_PREFIX_S3N = "s3n"; @@ -92,6 +82,7 @@ public class FeConstants { public static String FS_PREFIX_HDFS = "hdfs"; public static String FS_PREFIX_VIEWFS = "viewfs"; public static String FS_PREFIX_FILE = "file"; + public static final String INTERNAL_DB_NAME = "__internal_schema"; public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java new file mode 100644 index 0000000000..7d60222d29 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.util; + +import java.util.regex.Pattern; + +public class FileFormatConstants { + public static final String DEFAULT_COLUMN_SEPARATOR = "\t"; + public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001"; + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String FORMAT_CSV = "csv"; + public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names"; + public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types"; + public static final String FORMAT_HIVE_TEXT = "hive_text"; + public static final String FORMAT_PARQUET = "parquet"; + public static final String FORMAT_ORC = "orc"; + public static final String FORMAT_JSON = "json"; + public static final String FORMAT_AVRO = "avro"; + public static final String FORMAT_WAL = "wal"; + + public static final String PROP_FORMAT = "format"; + public static final String PROP_COLUMN_SEPARATOR = "column_separator"; + public static final String PROP_LINE_DELIMITER = "line_delimiter"; + public static final String PROP_JSON_ROOT = "json_root"; + public static final String PROP_JSON_PATHS = "jsonpaths"; + public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array"; + public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line"; + public static final String PROP_NUM_AS_STRING = "num_as_string"; + public static final String PROP_FUZZY_PARSE = "fuzzy_parse"; + public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; + public static final String PROP_SKIP_LINES = "skip_lines"; + public static final String PROP_CSV_SCHEMA = "csv_schema"; + public static final String PROP_COMPRESS_TYPE = "compress_type"; + public static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys"; + + // decimal(p,s) + public static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)"); + // datetime(p) + public static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)"); + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java new file mode 100644 index 0000000000..0b646a00b1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
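Note (not part of the patch): a quick, standalone illustration of the two regex constants defined in FileFormatConstants above; the input strings are examples only.

```java
import java.util.regex.Matcher;

import org.apache.doris.common.util.FileFormatConstants;

// Demonstrates how DECIMAL_TYPE_PATTERN and DATETIME_TYPE_PATTERN capture precision/scale.
class TypePatternDemo {
    public static void main(String[] args) {
        Matcher dec = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher("decimal(10,2)");
        if (dec.find()) {
            int precision = Integer.parseInt(dec.group(1)); // 10
            int scale = Integer.parseInt(dec.group(2));     // 2
            System.out.println(precision + "," + scale);
        }
        Matcher dt = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher("datetime(6)");
        if (dt.find()) {
            System.out.println(dt.group(1)); // 6
        }
    }
}
```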
+ +package org.apache.doris.common.util; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeNameFormat; + +import com.google.common.base.Strings; + +import java.util.List; +import java.util.regex.Matcher; + +public class FileFormatUtils { + + public static boolean isCsv(String formatStr) { + return FileFormatConstants.FORMAT_CSV.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_CSV_WITH_NAMES.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr); + } + + // public for unit test + public static void parseCsvSchema(List csvSchema, String csvSchemaStr) + throws AnalysisException { + if (Strings.isNullOrEmpty(csvSchemaStr)) { + return; + } + // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)" + String[] schemaStrs = csvSchemaStr.split(";"); + try { + for (String schemaStr : schemaStrs) { + String[] kv = schemaStr.replace(" ", "").split(":"); + if (kv.length != 2) { + throw new AnalysisException("invalid csv schema: " + csvSchemaStr); + } + Column column = null; + String name = kv[0].toLowerCase(); + FeNameFormat.checkColumnName(name); + String type = kv[1].toLowerCase(); + if (type.equals("tinyint")) { + column = new Column(name, PrimitiveType.TINYINT, true); + } else if (type.equals("smallint")) { + column = new Column(name, PrimitiveType.SMALLINT, true); + } else if (type.equals("int")) { + column = new Column(name, PrimitiveType.INT, true); + } else if (type.equals("bigint")) { + column = new Column(name, PrimitiveType.BIGINT, true); + } else if (type.equals("largeint")) { + column = new Column(name, PrimitiveType.LARGEINT, true); + } else if (type.equals("float")) { + column = new Column(name, PrimitiveType.FLOAT, true); + } else if (type.equals("double")) { + column = new Column(name, PrimitiveType.DOUBLE, true); + } else if (type.startsWith("decimal")) { + // regex decimal(p, s) + Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type); + if (!matcher.find()) { + throw new AnalysisException("invalid decimal type: " + type); + } + int precision = Integer.parseInt(matcher.group(1)); + int scale = Integer.parseInt(matcher.group(2)); + column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null, + ""); + } else if (type.equals("date")) { + column = new Column(name, ScalarType.createDateType(), false, null, true, null, ""); + } else if (type.startsWith("datetime")) { + int scale = 0; + if (!type.equals("datetime")) { + // regex datetime(s) + Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type); + if (!matcher.find()) { + throw new AnalysisException("invalid datetime type: " + type); + } + scale = Integer.parseInt(matcher.group(1)); + } + column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, ""); + } else if (type.equals("string")) { + column = new Column(name, PrimitiveType.STRING, true); + } else if (type.equals("boolean")) { + column = new Column(name, PrimitiveType.BOOLEAN, true); + } else { + throw new AnalysisException("unsupported column type: " + type); + } + csvSchema.add(column); + } + } catch (Exception e) { + throw new AnalysisException("invalid csv schema: " + e.getMessage()); + } + } +} diff --git 
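Note (not part of the patch): a usage sketch for the relocated FileFormatUtils.parseCsvSchema. The schema string is an example; only the types handled above (integer types, float/double, decimal, date/datetime, string, boolean) are accepted.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.FileFormatUtils;

// Builds a Column list from a csv_schema property value such as a TVF user would pass.
class CsvSchemaDemo {
    static List<Column> parse() throws AnalysisException {
        List<Column> csvSchema = new ArrayList<>();
        FileFormatUtils.parseCsvSchema(csvSchema, "k1:int;k2:bigint;k3:decimal(10,2);k4:datetime(6)");
        return csvSchema; // k1 INT, k2 BIGINT, k3 DECIMALV3(10,2), k4 DATETIMEV2(6)
    }
}
```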
a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java index 01198f9493..7fd02926b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.qe.ConnectContext; @@ -551,17 +550,18 @@ public class Util { public static TFileFormatType getFileFormatTypeFromName(String formatName) { String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase(); - if (lowerFileFormat.equals("parquet")) { + if (lowerFileFormat.equals(FileFormatConstants.FORMAT_PARQUET)) { return TFileFormatType.FORMAT_PARQUET; - } else if (lowerFileFormat.equals("orc")) { + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ORC)) { return TFileFormatType.FORMAT_ORC; - } else if (lowerFileFormat.equals("json")) { + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_JSON)) { return TFileFormatType.FORMAT_JSON; // csv/csv_with_name/csv_with_names_and_types treat as csv format - } else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names) - || lowerFileFormat.equals(FeConstants.csv_with_names_and_types) + } else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV) + || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + || lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES) // TODO: Add TEXTFILE to TFileFormatType to Support hive text file format. 
- || lowerFileFormat.equals(FeConstants.text)) { + || lowerFileFormat.equals(FileFormatConstants.FORMAT_HIVE_TEXT)) { return TFileFormatType.FORMAT_CSV_PLAIN; } else if (lowerFileFormat.equals("wal")) { return TFileFormatType.FORMAT_WAL; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index f02c5bd2d8..31ebcea512 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -25,6 +25,8 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.NereidsException; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.Profile; +import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.nereids.analyzer.UnboundAlias; @@ -52,7 +54,6 @@ import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryStateException; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; import org.apache.doris.tablefunction.HdfsTableValuedFunction; import org.apache.doris.tablefunction.S3TableValuedFunction; @@ -269,7 +270,7 @@ public class LoadCommand extends Command implements ForwardWithSync { } private static boolean isCsvType(Map tvfProperties) { - return tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv"); + return tvfProperties.get(FileFormatConstants.PROP_FORMAT).equalsIgnoreCase("csv"); } /** @@ -296,11 +297,11 @@ public class LoadCommand extends Command implements ForwardWithSync { Map sourceProperties = dataDesc.getProperties(); if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties)) { - String csvSchemaStr = sourceProperties.get(ExternalFileTableValuedFunction.CSV_SCHEMA); + String csvSchemaStr = sourceProperties.get(FileFormatConstants.PROP_CSV_SCHEMA); if (csvSchemaStr != null) { - tvfProperties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, csvSchemaStr); + tvfProperties.put(FileFormatConstants.PROP_CSV_SCHEMA, csvSchemaStr); List csvSchema = new ArrayList<>(); - ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, sourceProperties); + FileFormatUtils.parseCsvSchema(csvSchema, csvSchemaStr); List csvColumns = new ArrayList<>(); for (Column csvColumn : csvSchema) { csvColumns.add(new UnboundSlot(csvColumn.getName())); @@ -440,12 +441,12 @@ public class LoadCommand extends Command implements ForwardWithSync { String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv"); if ("csv".equalsIgnoreCase(fileFormat)) { dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep -> - tvfProperties.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, sep.getSeparator())); + tvfProperties.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, sep.getSeparator())); dataDesc.getFormatDesc().getLineDelimiter().ifPresent(sep -> - tvfProperties.put(ExternalFileTableValuedFunction.LINE_DELIMITER, sep.getSeparator())); + tvfProperties.put(FileFormatConstants.PROP_LINE_DELIMITER, sep.getSeparator())); } // TODO: resolve and put ExternalFileTableValuedFunction params - tvfProperties.put(ExternalFileTableValuedFunction.FORMAT, fileFormat); 
+ tvfProperties.put(FileFormatConstants.PROP_FORMAT, fileFormat); List filePaths = dataDesc.getFilePaths(); // TODO: support multi location by union @@ -454,7 +455,7 @@ public class LoadCommand extends Command implements ForwardWithSync { S3Properties.convertToStdProperties(tvfProperties); tvfProperties.keySet().removeIf(S3Properties.Env.FS_KEYS::contains); // TODO: check file path by s3 fs list status - tvfProperties.put(S3TableValuedFunction.S3_URI, listFilePath); + tvfProperties.put(S3TableValuedFunction.PROP_URI, listFilePath); } final Map dataDescProps = dataDesc.getProperties(); @@ -463,7 +464,7 @@ public class LoadCommand extends Command implements ForwardWithSync { } List columnsFromPath = dataDesc.getColumnsFromPath(); if (columnsFromPath != null && !columnsFromPath.isEmpty()) { - tvfProperties.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS, + tvfProperties.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS, String.join(",", columnsFromPath)); } return tvfProperties; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java index 6d7031f61c..d921336058 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java @@ -20,7 +20,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TupleId; -import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; @@ -65,8 +65,8 @@ public class ResultFileSink extends DataSink { public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList labels) { this(exchNodeId, outFileClause); - if (outFileClause.getHeaderType().equals(FeConstants.csv_with_names) - || outFileClause.getHeaderType().equals(FeConstants.csv_with_names_and_types)) { + if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) { header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter()); } headerType = outFileClause.getHeaderType(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 7e9078df3d..5fe573ab5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -316,7 +316,7 @@ public abstract class FileQueryScanNode extends FileScanNode { for (Split split : inputSplits) { FileSplit fileSplit = (FileSplit) split; TFileType locationType = getLocationType(fileSplit.getPath().toString()); - setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties); + setLocationPropertiesIfNecessary(locationType, locationProperties); TScanRangeLocations curLocations = newLocations(); // If fileSplit has partition values, use the values collected from hive partitions. 
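Note (not part of the patch): a sketch of the TVF property map that the LoadCommand changes above assemble for a CSV load. The keys come from the new FileFormatConstants; the values here are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.doris.common.util.FileFormatConstants;

// Illustrative only: the real map is filled from the load statement's format descriptor and paths.
class TvfPropsSketch {
    static Map<String, String> csvLoadProps() {
        Map<String, String> tvfProperties = new HashMap<>();
        tvfProperties.put(FileFormatConstants.PROP_FORMAT, "csv");
        tvfProperties.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, ",");
        tvfProperties.put(FileFormatConstants.PROP_LINE_DELIMITER, "\n");
        tvfProperties.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS, "dt,region"); // example keys
        return tvfProperties;
    }
}
```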
@@ -392,7 +392,7 @@ public abstract class FileQueryScanNode extends FileScanNode { scanRangeLocations.size(), (System.currentTimeMillis() - start)); } - private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit, + private void setLocationPropertiesIfNecessary(TFileType locationType, Map locationProperties) throws UserException { if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { if (!params.isSetHdfsParams()) { @@ -479,13 +479,6 @@ public abstract class FileQueryScanNode extends FileScanNode { protected abstract Map getLocationProperties() throws UserException; - // eg: hdfs://namenode s3://buckets - protected String getFsName(FileSplit split) { - String fullPath = split.getPath().toUri().toString(); - String filePath = split.getPath().toUri().getPath(); - return fullPath.replace(filePath, ""); - } - protected static Optional getTFileType(String location) { if (location != null && !location.isEmpty()) { if (S3Util.isObjStorage(location)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index d36958ad88..52bb119d7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -27,9 +27,9 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.Util; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; @@ -131,8 +131,8 @@ public class LoadScanProvider { private String getHeaderType(String formatType) { if (formatType != null) { - if (formatType.equalsIgnoreCase(FeConstants.csv_with_names) || formatType.equalsIgnoreCase( - FeConstants.csv_with_names_and_types)) { + if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES) + || formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) { return formatType; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java index 59f855f445..6f43608fce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java @@ -81,10 +81,6 @@ public class TVFScanNode extends FileQueryScanNode { numNodes = backendPolicy.numBackends(); } - protected String getFsName(FileSplit split) { - return tableValuedFunction.getFsName(); - } - @Override public TFileAttributes getFileAttributes() throws UserException { return tableValuedFunction.getFileAttributes(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 69b002c3cb..6f2ae5d61d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -34,12 +34,12 @@ import 
org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.external.TVFScanNode; @@ -71,6 +71,7 @@ import org.apache.doris.thrift.TTextSerdeType; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.protobuf.ByteString; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -84,8 +85,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; /** @@ -93,44 +92,24 @@ import java.util.stream.Collectors; */ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf { public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class); - protected static String DEFAULT_COLUMN_SEPARATOR = ","; - protected static final String DEFAULT_LINE_DELIMITER = "\n"; - public static final String FORMAT = "format"; - public static final String TABLE_ID = "table_id"; - public static final String COLUMN_SEPARATOR = "column_separator"; - public static final String LINE_DELIMITER = "line_delimiter"; - protected static final String JSON_ROOT = "json_root"; - protected static final String JSON_PATHS = "jsonpaths"; - protected static final String STRIP_OUTER_ARRAY = "strip_outer_array"; - protected static final String READ_JSON_BY_LINE = "read_json_by_line"; - protected static final String NUM_AS_STRING = "num_as_string"; - protected static final String FUZZY_PARSE = "fuzzy_parse"; - protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes"; - protected static final String SKIP_LINES = "skip_lines"; - public static final String CSV_SCHEMA = "csv_schema"; - protected static final String COMPRESS_TYPE = "compress_type"; - public static final String PATH_PARTITION_KEYS = "path_partition_keys"; - // decimal(p,s) - private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)"); - // datetime(p) - private static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)"); + + public static final String PROP_TABLE_ID = "table_id"; protected static final ImmutableSet FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder() - .add(FORMAT) - .add(TABLE_ID) - .add(JSON_ROOT) - .add(JSON_PATHS) - .add(STRIP_OUTER_ARRAY) - .add(READ_JSON_BY_LINE) - .add(NUM_AS_STRING) - .add(FUZZY_PARSE) - .add(COLUMN_SEPARATOR) - .add(LINE_DELIMITER) - .add(TRIM_DOUBLE_QUOTES) - .add(SKIP_LINES) - .add(CSV_SCHEMA) - .add(COMPRESS_TYPE) - .add(PATH_PARTITION_KEYS) + .add(FileFormatConstants.PROP_FORMAT) + .add(FileFormatConstants.PROP_JSON_ROOT) + .add(FileFormatConstants.PROP_JSON_PATHS) + .add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY) + 
.add(FileFormatConstants.PROP_READ_JSON_BY_LINE) + .add(FileFormatConstants.PROP_NUM_AS_STRING) + .add(FileFormatConstants.PROP_FUZZY_PARSE) + .add(FileFormatConstants.PROP_COLUMN_SEPARATOR) + .add(FileFormatConstants.PROP_LINE_DELIMITER) + .add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES) + .add(FileFormatConstants.PROP_SKIP_LINES) + .add(FileFormatConstants.PROP_CSV_SCHEMA) + .add(FileFormatConstants.PROP_COMPRESS_TYPE) + .add(FileFormatConstants.PROP_PATH_PARTITION_KEYS) .build(); // Columns got from file and path(if has) @@ -142,17 +121,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio private List pathPartitionKeys; protected List fileStatuses = Lists.newArrayList(); - protected Map locationProperties; + protected Map locationProperties = Maps.newHashMap(); protected String filePath; - - private TFileFormatType fileFormatType; + protected TFileFormatType fileFormatType; private TFileCompressType compressionType; private String headerType = ""; private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE; - private String columnSeparator = DEFAULT_COLUMN_SEPARATOR; - private String lineDelimiter = DEFAULT_LINE_DELIMITER; + private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR; + private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER; private String jsonRoot = ""; private String jsonPaths = ""; private boolean stripOuterArray; @@ -181,20 +159,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio return locationProperties; } - public List getCsvSchema() { - return csvSchema; - } - - public String getFsName() { - TFileType fileType = getTFileType(); - if (fileType == TFileType.FILE_HDFS) { - return locationProperties.get(HdfsResource.HADOOP_FS_NAME); - } else if (fileType == TFileType.FILE_S3) { - return locationProperties.get(S3Properties.ENDPOINT); - } - return ""; - } - public List getPathPartitionKeys() { return pathPartitionKeys; } @@ -209,24 +173,29 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio } } - //The keys in the passed validParams map need to be lowercase. - protected void parseProperties(Map validParams) throws AnalysisException { - String formatString = validParams.getOrDefault(FORMAT, ""); + //The keys in properties map need to be lowercase. + protected Map parseCommonProperties(Map properties) throws AnalysisException { + // Copy the properties, because we will remove the key from properties. 
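Note (not part of the patch): the new parseCommonProperties, continued below, copies the caller's map into a case-insensitive TreeMap and consumes known keys via getOrDefaultAndRemove, returning the leftovers to the subclass. A minimal standalone sketch of that pattern follows; the literal keys and defaults are illustrative.

```java
import java.util.Map;
import java.util.TreeMap;

// Mirrors the copy-then-consume pattern of parseCommonProperties/getOrDefaultAndRemove.
class ConsumePropsSketch {
    static String getOrDefaultAndRemove(Map<String, String> props, String key, String defaultValue) {
        String value = props.getOrDefault(key, defaultValue);
        props.remove(key);
        return value;
    }

    static Map<String, String> parse(Map<String, String> properties) {
        Map<String, String> copiedProps = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        copiedProps.putAll(properties);
        String format = getOrDefaultAndRemove(copiedProps, "format", "");
        String columnSeparator = getOrDefaultAndRemove(copiedProps, "column_separator", "\t");
        // ... other file-format keys are consumed the same way ...
        return copiedProps; // whatever remains belongs to the concrete TVF (uri, credentials, ...)
    }
}
```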
+ Map copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + copiedProps.putAll(properties); + + String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, ""); + String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR; switch (formatString) { case "csv": this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "hive_text": this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; - this.DEFAULT_COLUMN_SEPARATOR = "\001"; + defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR; this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE; break; case "csv_with_names": - this.headerType = FeConstants.csv_with_names; + this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES; this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "csv_with_names_and_types": - this.headerType = FeConstants.csv_with_names_and_types; + this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES; this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; break; case "parquet": @@ -248,114 +217,62 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio throw new AnalysisException("format:" + formatString + " is not supported."); } - tableId = Long.valueOf(validParams.getOrDefault(TABLE_ID, "-1")).longValue(); - columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR); + tableId = Long.valueOf(getOrDefaultAndRemove(copiedProps, PROP_TABLE_ID, "-1")); + columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR, + defaultColumnSeparator); if (Strings.isNullOrEmpty(columnSeparator)) { throw new AnalysisException("column_separator can not be empty."); } columnSeparator = Separator.convertSeparator(columnSeparator); - lineDelimiter = validParams.getOrDefault(LINE_DELIMITER, DEFAULT_LINE_DELIMITER); + lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER, + FileFormatConstants.DEFAULT_LINE_DELIMITER); if (Strings.isNullOrEmpty(lineDelimiter)) { throw new AnalysisException("line_delimiter can not be empty."); } lineDelimiter = Separator.convertSeparator(lineDelimiter); - jsonRoot = validParams.getOrDefault(JSON_ROOT, ""); - jsonPaths = validParams.getOrDefault(JSON_PATHS, ""); - readJsonByLine = Boolean.valueOf(validParams.get(READ_JSON_BY_LINE)).booleanValue(); - stripOuterArray = Boolean.valueOf(validParams.get(STRIP_OUTER_ARRAY)).booleanValue(); - numAsString = Boolean.valueOf(validParams.get(NUM_AS_STRING)).booleanValue(); - fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue(); - trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue(); - skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue(); + jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, ""); + jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, ""); + readJsonByLine = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue(); + stripOuterArray = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue(); + numAsString = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue(); + fuzzyParse = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue(); + 
trimDoubleQuotes = Boolean.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, "")).booleanValue(); + skipLines = Integer.valueOf( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue(); + String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN"); try { - compressionType = Util.getFileCompressType(validParams.getOrDefault(COMPRESS_TYPE, "UNKNOWN")); + compressionType = Util.getFileCompressType(compressTypeStr); } catch (IllegalArgumentException e) { - throw new AnalysisException("Compress type : " + validParams.get(COMPRESS_TYPE) + " is not supported."); + throw new AnalysisException("Compress type : " + compressTypeStr + " is not supported."); } - if (formatString.equals("csv") || formatString.equals("csv_with_names") - || formatString.equals("csv_with_names_and_types")) { - parseCsvSchema(csvSchema, validParams); + if (FileFormatUtils.isCsv(formatString)) { + FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps, + FileFormatConstants.PROP_CSV_SCHEMA, "")); + LOG.debug("get csv schema: {}", csvSchema); } - pathPartitionKeys = Optional.ofNullable(validParams.get(PATH_PARTITION_KEYS)) - .map(str -> - Arrays.stream(str.split(",")) - .map(String::trim) - .collect(Collectors.toList())) + + pathPartitionKeys = Optional.ofNullable( + getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null)) + .map(str -> Arrays.stream(str.split(",")) + .map(String::trim) + .collect(Collectors.toList())) .orElse(Lists.newArrayList()); + + return copiedProps; } - // public for unit test - public static void parseCsvSchema(List csvSchema, Map validParams) - throws AnalysisException { - String csvSchemaStr = validParams.get(CSV_SCHEMA); - if (Strings.isNullOrEmpty(csvSchemaStr)) { - return; - } - // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)" - String[] schemaStrs = csvSchemaStr.split(";"); - try { - for (String schemaStr : schemaStrs) { - String[] kv = schemaStr.replace(" ", "").split(":"); - if (kv.length != 2) { - throw new AnalysisException("invalid csv schema: " + csvSchemaStr); - } - Column column = null; - String name = kv[0].toLowerCase(); - FeNameFormat.checkColumnName(name); - String type = kv[1].toLowerCase(); - if (type.equals("tinyint")) { - column = new Column(name, PrimitiveType.TINYINT, true); - } else if (type.equals("smallint")) { - column = new Column(name, PrimitiveType.SMALLINT, true); - } else if (type.equals("int")) { - column = new Column(name, PrimitiveType.INT, true); - } else if (type.equals("bigint")) { - column = new Column(name, PrimitiveType.BIGINT, true); - } else if (type.equals("largeint")) { - column = new Column(name, PrimitiveType.LARGEINT, true); - } else if (type.equals("float")) { - column = new Column(name, PrimitiveType.FLOAT, true); - } else if (type.equals("double")) { - column = new Column(name, PrimitiveType.DOUBLE, true); - } else if (type.startsWith("decimal")) { - // regex decimal(p, s) - Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(type); - if (!matcher.find()) { - throw new AnalysisException("invalid decimal type: " + type); - } - int precision = Integer.parseInt(matcher.group(1)); - int scale = Integer.parseInt(matcher.group(2)); - column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null, - ""); - } else if (type.equals("date")) { - column = new Column(name, ScalarType.createDateType(), false, null, true, null, 
""); - } else if (type.startsWith("datetime")) { - int scale = 0; - if (!type.equals("datetime")) { - // regex datetime(s) - Matcher matcher = DATETIME_TYPE_PATTERN.matcher(type); - if (!matcher.find()) { - throw new AnalysisException("invalid datetime type: " + type); - } - scale = Integer.parseInt(matcher.group(1)); - } - column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, ""); - } else if (type.equals("string")) { - column = new Column(name, PrimitiveType.STRING, true); - } else if (type.equals("boolean")) { - column = new Column(name, PrimitiveType.BOOLEAN, true); - } else { - throw new AnalysisException("unsupported column type: " + type); - } - csvSchema.add(column); - } - LOG.debug("get csv schema: {}", csvSchema); - } catch (Exception e) { - throw new AnalysisException("invalid csv schema: " + e.getMessage()); - } + protected String getOrDefaultAndRemove(Map props, String key, String defaultValue) { + String value = props.getOrDefault(key, defaultValue); + props.remove(key); + return value; } public List getFileStatuses() { @@ -588,3 +505,4 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java index 55c898b29f..051706ae47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HdfsTableValuedFunction.java @@ -25,9 +25,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.URI; import org.apache.doris.thrift.TFileType; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import org.apache.commons.collections.map.CaseInsensitiveMap; +import com.google.common.base.Strings; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -39,50 +37,41 @@ import java.util.Map; */ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction { public static final Logger LOG = LogManager.getLogger(HdfsTableValuedFunction.class); - public static final String NAME = "hdfs"; - public static final String HDFS_URI = "uri"; - // simple or kerberos + private static final String PROP_URI = "uri"; - private static final ImmutableSet LOCATION_PROPERTIES = new ImmutableSet.Builder() - .add(HDFS_URI) - .add(HdfsResource.HADOOP_SECURITY_AUTHENTICATION) - .add(HdfsResource.HADOOP_FS_NAME) - .add(HdfsResource.HADOOP_USER_NAME) - .add(HdfsResource.HADOOP_KERBEROS_PRINCIPAL) - .add(HdfsResource.HADOOP_KERBEROS_KEYTAB) - .add(HdfsResource.HADOOP_SHORT_CIRCUIT) - .add(HdfsResource.HADOOP_SOCKET_PATH) - .build(); + public HdfsTableValuedFunction(Map properties) throws AnalysisException { + init(properties); + } - private URI hdfsUri; + private void init(Map properties) throws AnalysisException { + // 1. 
analyze common properties + Map otherProps = super.parseCommonProperties(properties); - public HdfsTableValuedFunction(Map params) throws AnalysisException { - Map fileParams = new CaseInsensitiveMap(); - locationProperties = Maps.newHashMap(); - for (String key : params.keySet()) { - String lowerKey = key.toLowerCase(); - if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - fileParams.put(lowerKey, params.get(key)); - } else if (LOCATION_PROPERTIES.contains(lowerKey)) { - locationProperties.put(lowerKey, params.get(key)); - } else if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) { + // 2. analyze uri + String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null); + if (Strings.isNullOrEmpty(uriStr)) { + throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI)); + } + URI uri = URI.create(uriStr); + StorageBackend.checkUri(uri, StorageType.HDFS); + filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath(); + + // 3. analyze other properties + for (String key : otherProps.keySet()) { + if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) { // because HADOOP_FS_NAME contains upper and lower case - locationProperties.put(HdfsResource.HADOOP_FS_NAME, params.get(key)); + locationProperties.put(HdfsResource.HADOOP_FS_NAME, otherProps.get(key)); } else { - locationProperties.put(key, params.get(key)); + locationProperties.put(key, otherProps.get(key)); } } - - if (!locationProperties.containsKey(HDFS_URI)) { - throw new AnalysisException(String.format("Configuration '%s' is required.", HDFS_URI)); + // If the user does not specify the HADOOP_FS_NAME, we will use the uri's scheme and authority + if (!locationProperties.containsKey(HdfsResource.HADOOP_FS_NAME)) { + locationProperties.put(HdfsResource.HADOOP_FS_NAME, uri.getScheme() + "://" + uri.getAuthority()); } - StorageBackend.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS); - hdfsUri = URI.create(locationProperties.get(HDFS_URI)); - filePath = locationProperties.get(HdfsResource.HADOOP_FS_NAME) + hdfsUri.getPath(); - - super.parseProperties(fileParams); + // 4. 
parse file parseFile(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java index 265045d7a6..5044f045c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java @@ -20,9 +20,9 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.common.AnalysisException; +import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; -import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,24 +36,15 @@ public class HttpStreamTableValuedFunction extends ExternalFileTableValuedFuncti private static final Logger LOG = LogManager.getLogger(HttpStreamTableValuedFunction.class); public static final String NAME = "http_stream"; - public HttpStreamTableValuedFunction(Map params) throws AnalysisException { - Map fileParams = new CaseInsensitiveMap(); - for (String key : params.keySet()) { - String lowerKey = key.toLowerCase(); - if (!FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - throw new AnalysisException(key + " is invalid property"); - } - fileParams.put(lowerKey, params.get(key).toLowerCase()); - } + public HttpStreamTableValuedFunction(Map properties) throws AnalysisException { + // 1. analyze common properties + super.parseCommonProperties(properties); - String formatString = fileParams.getOrDefault(FORMAT, "").toLowerCase(); - if (formatString.equals("parquet") - || formatString.equals("avro") - || formatString.equals("orc")) { - throw new AnalysisException("current http_stream does not yet support parquet, avro and orc"); + if (fileFormatType == TFileFormatType.FORMAT_PARQUET + || fileFormatType == TFileFormatType.FORMAT_AVRO + || fileFormatType == TFileFormatType.FORMAT_ORC) { + throw new AnalysisException("http_stream does not yet support parquet, avro and orc"); } - - super.parseProperties(fileParams); } // =========== implement abstract methods of ExternalFileTableValuedFunction ================= diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java index bf78faec95..1f98b055bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/LocalTableValuedFunction.java @@ -31,8 +31,6 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,42 +44,31 @@ import java.util.concurrent.TimeUnit; */ public class LocalTableValuedFunction extends ExternalFileTableValuedFunction { private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class); - public static final String NAME = "local"; - public static final String FILE_PATH = "file_path"; - public static final String BACKEND_ID = "backend_id"; + public static final String PROP_FILE_PATH = "file_path"; + public 
static final String PROP_BACKEND_ID = "backend_id"; private static final ImmutableSet LOCATION_PROPERTIES = new ImmutableSet.Builder() - .add(FILE_PATH) - .add(BACKEND_ID) + .add(PROP_FILE_PATH) + .add(PROP_BACKEND_ID) .build(); private long backendId; - public LocalTableValuedFunction(Map params) throws AnalysisException { - Map fileParams = new CaseInsensitiveMap(); - locationProperties = Maps.newHashMap(); - for (String key : params.keySet()) { - String lowerKey = key.toLowerCase(); - if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - fileParams.put(lowerKey, params.get(key)); - } else if (LOCATION_PROPERTIES.contains(lowerKey)) { - locationProperties.put(lowerKey, params.get(key)); - } else { - throw new AnalysisException(key + " is invalid property"); - } - } + public LocalTableValuedFunction(Map properties) throws AnalysisException { + // 1. analyze common properties + Map otherProps = super.parseCommonProperties(properties); + // 2. analyze location properties for (String key : LOCATION_PROPERTIES) { - if (!locationProperties.containsKey(key)) { - throw new AnalysisException(String.format("Configuration '%s' is required.", key)); + if (!otherProps.containsKey(key)) { + throw new AnalysisException(String.format("Property '%s' is required.", key)); } } + filePath = otherProps.get(PROP_FILE_PATH); + backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID)); - filePath = locationProperties.get(FILE_PATH); - backendId = Long.parseLong(locationProperties.get(BACKEND_ID)); - super.parseProperties(fileParams); - + // 3. parse file getFileListFromBackend(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java index 504730daae..9ad6232c4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -30,10 +30,9 @@ import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.thrift.TFileType; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; -import java.util.HashMap; import java.util.Map; /** @@ -49,71 +48,46 @@ import java.util.Map; */ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { public static final String NAME = "s3"; - public static final String S3_URI = "uri"; + public static final String PROP_URI = "uri"; private static final ImmutableSet DEPRECATED_KEYS = - ImmutableSet.of("access_key", "secret_key", "session_token", "region"); + ImmutableSet.of("access_key", "secret_key", "session_token", "region", + "ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN", "REGION"); - private static final ImmutableSet OPTIONAL_KEYS = - ImmutableSet.of(S3Properties.SESSION_TOKEN, PropertyConverter.USE_PATH_STYLE, S3Properties.REGION, - PATH_PARTITION_KEYS); - - private static final ImmutableSet LOCATION_PROPERTIES = ImmutableSet.builder() - .add(S3_URI) - .add(S3Properties.ENDPOINT) - .addAll(DEPRECATED_KEYS) - .addAll(S3Properties.TVF_REQUIRED_FIELDS) - .addAll(OPTIONAL_KEYS) - .build(); - - private final S3URI s3uri; - private final boolean forceVirtualHosted; private String virtualBucket = ""; - public S3TableValuedFunction(Map params) throws AnalysisException { + public S3TableValuedFunction(Map properties) throws AnalysisException { + // 1. 
analyze common properties + Map otherProps = super.parseCommonProperties(properties); - Map fileParams = new HashMap<>(); - for (Map.Entry entry : params.entrySet()) { - String key = entry.getKey(); - String lowerKey = key.toLowerCase(); - if (!LOCATION_PROPERTIES.contains(lowerKey) && !FILE_FORMAT_PROPERTIES.contains(lowerKey)) { - throw new AnalysisException("Invalid property: " + key); - } - if (DEPRECATED_KEYS.contains(lowerKey)) { - lowerKey = S3Properties.S3_PREFIX + lowerKey; - } - fileParams.put(lowerKey, entry.getValue()); + // 2. analyze uri and other properties + String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null); + if (Strings.isNullOrEmpty(uriStr)) { + throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI)); } + forwardCompatibleDeprecatedKeys(otherProps); - if (!fileParams.containsKey(S3_URI)) { - throw new AnalysisException("Missing required property: " + S3_URI); - } - - forceVirtualHosted = isVirtualHosted(fileParams); - s3uri = getS3Uri(fileParams); - final String endpoint = forceVirtualHosted - ? getEndpointAndSetVirtualBucket(params) - : s3uri.getBucketScheme(); - if (!fileParams.containsKey(S3Properties.REGION)) { + String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, "false"); + boolean forceVirtualHosted = isVirtualHosted(uriStr, Boolean.parseBoolean(usePathStyle)); + S3URI s3uri = getS3Uri(uriStr, forceVirtualHosted); + String endpoint = forceVirtualHosted + ? getEndpointAndSetVirtualBucket(s3uri, otherProps) : s3uri.getBucketScheme(); + if (!otherProps.containsKey(S3Properties.REGION)) { String region = S3Properties.getRegionOfEndpoint(endpoint); - fileParams.put(S3Properties.REGION, region); + otherProps.put(S3Properties.REGION, region); } + checkNecessaryS3Properties(otherProps); CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint, - fileParams.get(S3Properties.REGION), - fileParams.get(S3Properties.ACCESS_KEY), - fileParams.get(S3Properties.SECRET_KEY)); - if (fileParams.containsKey(S3Properties.SESSION_TOKEN)) { - credential.setSessionToken(fileParams.get(S3Properties.SESSION_TOKEN)); + otherProps.get(S3Properties.REGION), + otherProps.get(S3Properties.ACCESS_KEY), + otherProps.get(S3Properties.SECRET_KEY)); + if (otherProps.containsKey(S3Properties.SESSION_TOKEN)) { + credential.setSessionToken(otherProps.get(S3Properties.SESSION_TOKEN)); } - // set S3 location properties - // these five properties is necessary, no one can be lost. 
locationProperties = S3Properties.credentialToMap(credential); - String usePathStyle = fileParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false"); locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle); - this.locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(this.locationProperties)); - - super.parseProperties(fileParams); + locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties)); if (forceVirtualHosted) { filePath = NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM @@ -130,39 +104,59 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { } } - private String getEndpointAndSetVirtualBucket(Map params) throws AnalysisException { - Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted."); - String[] fileds = s3uri.getVirtualBucket().split("\\.", 2); - virtualBucket = fileds[0]; - if (fileds.length > 1) { + private void forwardCompatibleDeprecatedKeys(Map props) { + for (String deprecatedKey : DEPRECATED_KEYS) { + String value = props.remove(deprecatedKey); + if (!Strings.isNullOrEmpty(value)) { + props.put("s3." + deprecatedKey.toLowerCase(), value); + } + } + } + + private void checkNecessaryS3Properties(Map props) throws AnalysisException { + if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) { + throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION)); + } + if (Strings.isNullOrEmpty(props.get(S3Properties.ACCESS_KEY))) { + throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY)); + } + if (Strings.isNullOrEmpty(props.get(S3Properties.SECRET_KEY))) { + throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.SECRET_KEY)); + } + } + + private String getEndpointAndSetVirtualBucket(S3URI s3uri, Map props) + throws AnalysisException { + String[] fields = s3uri.getVirtualBucket().split("\\.", 2); + virtualBucket = fields[0]; + if (fields.length > 1) { // At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg: // uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt // s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com, // so we need separate virtualBucket and endpoint. - return fileds[1]; - } else if (params.containsKey(S3Properties.ENDPOINT)) { - return params.get(S3Properties.ENDPOINT); + return fields[1]; + } else if (props.containsKey(S3Properties.ENDPOINT)) { + return props.get(S3Properties.ENDPOINT); } else { throw new AnalysisException("can not parse endpoint, please check uri."); } } - private boolean isVirtualHosted(Map validParams) { - String originUri = validParams.getOrDefault(S3_URI, ""); - if (originUri.toLowerCase().startsWith("s3")) { + private boolean isVirtualHosted(String uri, boolean usePathStyle) { + if (uri.toLowerCase().startsWith("s3")) { // s3 protocol, default virtual-hosted style return true; } else { // not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE. 
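Note (not part of the patch): the virtual-hosted decision above can be summarized as: an s3:// URI always uses virtual-hosted style, any other scheme follows use_path_style. A standalone sketch with example URIs follows.

```java
// Mirrors the logic of S3TableValuedFunction.isVirtualHosted(uri, usePathStyle).
class VirtualHostedSketch {
    static boolean isVirtualHosted(String uri, boolean usePathStyle) {
        if (uri.toLowerCase().startsWith("s3")) {
            return true;          // s3 protocol defaults to virtual-hosted style
        }
        return !usePathStyle;     // http(s) endpoints: virtual-hosted unless use_path_style=true
    }

    public static void main(String[] args) {
        System.out.println(isVirtualHosted("s3://my_bucket/path/file.csv", false));                          // true
        System.out.println(isVirtualHosted("http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt", false)); // true
        System.out.println(isVirtualHosted("http://cos.ap-beijing.myqcloud.com/my_bucket/file.txt", true));  // false
    }
}
```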
- return !Boolean.parseBoolean(validParams.get(PropertyConverter.USE_PATH_STYLE)); + return !usePathStyle; } } - private S3URI getS3Uri(Map validParams) throws AnalysisException { + private S3URI getS3Uri(String uri, boolean forceVirtualHosted) throws AnalysisException { try { - return S3URI.create(validParams.get(S3_URI), forceVirtualHosted); + return S3URI.create(uri, forceVirtualHosted); } catch (UserException e) { - throw new AnalysisException("parse s3 uri failed, uri = " + validParams.get(S3_URI), e); + throw new AnalysisException("parse s3 uri failed, uri = " + uri, e); } } @@ -189,3 +183,4 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { return "S3TableValuedFunction"; } } + diff --git a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java index f664415e6d..e5b06bd5dd 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunctionTest.java @@ -21,6 +21,8 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.util.FileFormatConstants; +import org.apache.doris.common.util.FileFormatUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -35,12 +37,12 @@ public class ExternalFileTableValuedFunctionTest { public void testCsvSchemaParse() { Config.enable_date_conversion = true; Map properties = Maps.newHashMap(); - properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, + properties.put(FileFormatConstants.PROP_CSV_SCHEMA, "k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:bool;" + "k8:char(10);k9:varchar(20);k10:date;k11:datetime;k12:decimal(10,2)"); List csvSchema = Lists.newArrayList(); try { - ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties); + FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA)); Assert.fail(); } catch (AnalysisException e) { e.printStackTrace(); @@ -48,11 +50,11 @@ public class ExternalFileTableValuedFunctionTest { } csvSchema.clear(); - properties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, + properties.put(FileFormatConstants.PROP_CSV_SCHEMA, "k1:int;k2:bigint;k3:float;k4:double;k5:smallint;k6:tinyint;k7:boolean;" + "k8:string;k9:date;k10:datetime;k11:decimal(10, 2);k12:decimal( 38,10); k13:datetime(5)"); try { - ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, properties); + FileFormatUtils.parseCsvSchema(csvSchema, properties.get(FileFormatConstants.PROP_CSV_SCHEMA)); Assert.assertEquals(13, csvSchema.size()); Column decimalCol = csvSchema.get(10); Assert.assertEquals(10, decimalCol.getPrecision()); diff --git a/regression-test/data/load_p0/http_stream/student_with_names.csv b/regression-test/data/load_p0/http_stream/student_with_names.csv index 62d32e39f4..dda69a5c0f 100644 --- a/regression-test/data/load_p0/http_stream/student_with_names.csv +++ b/regression-test/data/load_p0/http_stream/student_with_names.csv @@ -1,11 +1,11 @@ -id,name,age -1,alice,18 -2,bob,20 -3,jack,24 -4,jackson,19 -5,liming,18 -6,luffy,20 -7,zoro,22 -8,sanzi,26 -9,wusuopu,21 -10,nami,18 \ No newline at end of file +id name age +1 alice 18 +2 bob 20 +3 jack 24 +4 jackson 19 +5 liming 18 +6 luffy 20 +7 
zoro 22 +8 sanzi 26 +9 wusuopu 21 +10 nami 18 diff --git a/regression-test/data/load_p0/http_stream/student_with_names_and_types.csv b/regression-test/data/load_p0/http_stream/student_with_names_and_types.csv index 4e88aef6d8..c80c3b4768 100644 --- a/regression-test/data/load_p0/http_stream/student_with_names_and_types.csv +++ b/regression-test/data/load_p0/http_stream/student_with_names_and_types.csv @@ -1,12 +1,12 @@ -id,name,age -INT,STRING,INT -1,alice,18 -2,bob,20 -3,jack,24 -4,jackson,19 -5,liming,18 -6,luffy,20 -7,zoro,22 -8,sanzi,26 -9,wusuopu,21 -10,nami,18 \ No newline at end of file +id name age +INT STRING INT +1 alice 18 +2 bob 20 +3 jack 24 +4 jackson 19 +5 liming 18 +6 luffy 20 +7 zoro 22 +8 sanzi 26 +9 wusuopu 21 +10 nami 18 diff --git a/regression-test/data/load_p0/http_stream/test_http_stream.csv b/regression-test/data/load_p0/http_stream/test_http_stream.csv index d5df966cda..1dfb9d34df 100644 --- a/regression-test/data/load_p0/http_stream/test_http_stream.csv +++ b/regression-test/data/load_p0/http_stream/test_http_stream.csv @@ -1,11 +1,11 @@ -10000,aa,北京,0,11,4444,5555555,41232314,3.14,123.3423,111.111,111.111,2017-10-01,2017-10-01,2017-10-01 06:00:00,2017-10-01 06:00:00 -10001,bb,北京,0,22,3333,666,2768658,5.32,123111.3242,222.222,222.222,2017-10-02,2017-10-02,2017-10-02 07:00:00,2017-10-02 07:00:00 -10002,cc,北京,1,33,2222,453,5463456,4.321,11111.23423,333.333,333.333,2017-10-03,2017-10-03,2017-10-03 17:05:45,2017-10-03 17:05:45 -10003,dd,上海,1,44,1111,-3241,-45235,1.34,54626.324,444.444,444.444,2017-10-04,2017-10-04,2017-10-04 12:59:12,2017-10-04 12:59:12 -10004,ee,成都,0,55,-9999,21342,4513456,1.22,111.33,555.555,555.555,2017-10-05,2017-10-05,2017-10-05 11:20:00,2017-10-05 11:20:00 -10005,ff,西安,0,66,8888,64562,4356,9.133,23423.45,666.666,666.666,2017-10-06,2017-10-06,2017-10-06 12:00:15,2017-10-06 12:00:15 -10006,gg,深圳,1,77,-7777,-12313342,34534,8.100,12,777.777,777.777,2017-10-07,2017-10-07,2017-10-07 13:20:22,2017-10-07 13:20:22 -10007,hh,杭州,0,88,6666,314234,43535356,34.124,324,888.888,888.888,2017-10-08,2017-10-08,2017-10-08 14:58:10,2017-10-08 14:58:10 -10008,ii,上海,1,99,-5555,1341,23434534,342.120,34234.1,999.999,999.999,2017-10-09,2017-10-09,2017-10-09 23:12:22,2017-10-09 23:12:22 -10009,jj,南京,0,11,4444,-123,53623567,11.22,324.33,111.111,111.111,2017-10-10,2017-10-10,2017-10-10 16:25:42,2017-10-10 16:25:42 -10010,kk,成都,0,22,-3333,12314,674567,13,45464.435,222.222,222.222,2017-10-11,2017-10-11,2017-10-11 17:22:24,2017-10-11 17:22:24 \ No newline at end of file +10000 aa 北京 0 11 4444 5555555 41232314 3.14 123.3423 111.111 111.111 2017-10-01 2017-10-01 2017-10-01 06:00:00 2017-10-01 06:00:00 +10001 bb 北京 0 22 3333 666 2768658 5.32 123111.3242 222.222 222.222 2017-10-02 2017-10-02 2017-10-02 07:00:00 2017-10-02 07:00:00 +10002 cc 北京 1 33 2222 453 5463456 4.321 11111.23423 333.333 333.333 2017-10-03 2017-10-03 2017-10-03 17:05:45 2017-10-03 17:05:45 +10003 dd 上海 1 44 1111 -3241 -45235 1.34 54626.324 444.444 444.444 2017-10-04 2017-10-04 2017-10-04 12:59:12 2017-10-04 12:59:12 +10004 ee 成都 0 55 -9999 21342 4513456 1.22 111.33 555.555 555.555 2017-10-05 2017-10-05 2017-10-05 11:20:00 2017-10-05 11:20:00 +10005 ff 西安 0 66 8888 64562 4356 9.133 23423.45 666.666 666.666 2017-10-06 2017-10-06 2017-10-06 12:00:15 2017-10-06 12:00:15 +10006 gg 深圳 1 77 -7777 -12313342 34534 8.100 12 777.777 777.777 2017-10-07 2017-10-07 2017-10-07 13:20:22 2017-10-07 13:20:22 +10007 hh 杭州 0 88 6666 314234 43535356 34.124 324 888.888 888.888 2017-10-08 2017-10-08 2017-10-08 14:58:10 2017-10-08 
14:58:10 +10008 ii 上海 1 99 -5555 1341 23434534 342.120 34234.1 999.999 999.999 2017-10-09 2017-10-09 2017-10-09 23:12:22 2017-10-09 23:12:22 +10009 jj 南京 0 11 4444 -123 53623567 11.22 324.33 111.111 111.111 2017-10-10 2017-10-10 2017-10-10 16:25:42 2017-10-10 16:25:42 +10010 kk 成都 0 22 -3333 12314 674567 13 45464.435 222.222 222.222 2017-10-11 2017-10-11 2017-10-11 17:22:24 2017-10-11 17:22:24 diff --git a/regression-test/data/load_p0/http_stream/test_http_stream_data_model.csv b/regression-test/data/load_p0/http_stream/test_http_stream_data_model.csv index 28a17b7e4e..20ec529155 100644 --- a/regression-test/data/load_p0/http_stream/test_http_stream_data_model.csv +++ b/regression-test/data/load_p0/http_stream/test_http_stream_data_model.csv @@ -1,11 +1,11 @@ -10000,aa,北京,21,0,1234567,北京,2017-03-11 06:01:02 -10000,aa,西安,22,0,1234567,陕西西安,2016-02-21 07:05:01 -10001,bb,天津,33,1,1234567,天津,2019-01-11 17:05:45 -10001,bb,上海,20,1,1234567,上海,2012-05-22 12:59:12 -10002,bb,上海,20,1,1234567,上海,2013-06-02 12:59:12 -10003,cc,广州,32,0,1234567,广东广州,2014-07-02 11:20:00 -10003,cc,广州,32,0,1234567,广东广州,2015-08-12 11:25:00 -10004,dd,深圳,33,1,1234567,广东深圳,2016-12-01 14:04:15 -10004,dd,杭州,47,0,1234567,浙江杭州,2017-11-23 13:26:22 -10005,dd,深圳,19,0,1234567,广东深圳,2018-10-03 12:27:22 -10005,ee,成都,21,1,1234567,四川成都,2019-09-03 11:24:22 \ No newline at end of file +10000 aa 北京 21 0 1234567 北京 2017-03-11 06:01:02 +10000 aa 西安 22 0 1234567 陕西西安 2016-02-21 07:05:01 +10001 bb 天津 33 1 1234567 天津 2019-01-11 17:05:45 +10001 bb 上海 20 1 1234567 上海 2012-05-22 12:59:12 +10002 bb 上海 20 1 1234567 上海 2013-06-02 12:59:12 +10003 cc 广州 32 0 1234567 广东广州 2014-07-02 11:20:00 +10003 cc 广州 32 0 1234567 广东广州 2015-08-12 11:25:00 +10004 dd 深圳 33 1 1234567 广东深圳 2016-12-01 14:04:15 +10004 dd 杭州 47 0 1234567 浙江杭州 2017-11-23 13:26:22 +10005 dd 深圳 19 0 1234567 广东深圳 2018-10-03 12:27:22 +10005 ee 成都 21 1 1234567 四川成都 2019-09-03 11:24:22 diff --git a/regression-test/data/load_p0/http_stream/test_http_stream_multiple_times.csv b/regression-test/data/load_p0/http_stream/test_http_stream_multiple_times.csv index 1bbcf9f4d4..1383f0b40a 100644 --- a/regression-test/data/load_p0/http_stream/test_http_stream_multiple_times.csv +++ b/regression-test/data/load_p0/http_stream/test_http_stream_multiple_times.csv @@ -1,500 +1,500 @@ -541,彭秀英,675 -542,江明,248 -543,雷秀英,146 -544,毛杰,13 -545,孟明,440 -546,许强,750 -547,李洋,707 -548,易娜,969 -549,韩强,347 -550,孔磊,880 -551,杨静,708 -552,万敏,139 -553,丁伟,668 -554,吕霞,466 -555,彭涛,176 -556,孟磊,507 -557,锺洋,945 -558,汪洋,447 -559,谭涛,369 -560,姚秀英,317 -561,熊洋,556 -562,白静,949 -563,崔军,385 -564,陆静,143 -565,孟磊,769 -566,丁娜,469 -567,崔军,258 -568,傅秀兰,231 -569,康刚,414 -570,李艳,919 -571,范艳,986 -572,林勇,72 -573,方敏,454 -574,傅涛,647 -575,程洋,996 -576,沈秀兰,155 -577,吴杰,556 -578,石伟,306 -579,顾秀英,498 -580,谭秀英,734 -581,段娟,618 -582,锺霞,686 -583,汤秀兰,694 -584,魏明,395 -585,徐丽,717 -586,周芳,462 -587,尹霞,991 -588,彭杰,885 -589,崔洋,141 -590,杜丽,849 -591,万涛,575 -592,姜洋,686 -593,邵伟,35 -594,钱娜,289 -595,罗艳,540 -596,许强,260 -597,罗明,581 -598,钱秀英,3 -599,方敏,538 -600,潘秀英,456 -601,唐强,420 -602,黎杰,331 -603,魏勇,706 -604,阎敏,881 -605,程平,57 -606,方军,487 -607,邵艳,947 -608,许娜,798 -609,蒋秀兰,552 -610,梁平,273 -611,乔勇,811 -612,宋勇,178 -613,乔杰,956 -614,贾丽,979 -615,刘霞,517 -616,江超,946 -617,周超,874 -618,姜明,74 -619,姜强,249 -620,邹强,296 -621,刘洋,424 -622,汤芳,379 -623,阎平,608 -624,万敏,802 -625,陈洋,699 -626,易伟,86 -627,马艳,562 -628,魏洋,298 -629,赵洋,489 -630,郝明,355 -631,武霞,91 -632,刘明,620 -633,毛强,518 -634,高霞,17 -635,康杰,16 -636,高艳,991 -637,文芳,626 -638,谭涛,163 -639,田桂英,427 -640,林娟,395 -641,龚静,294 -642,姜涛,125 -643,常磊,485 -644,邓静,205 
-645,姜刚,180 -646,崔杰,415 -647,孙娟,967 -648,许强,438 -649,高超,996 -650,陈勇,285 -651,叶静,689 -652,赖勇,861 -653,曾静,335 -654,乔刚,432 -655,郝桂英,510 -656,孟桂英,39 -657,任伟,707 -658,邱平,78 -659,曾洋,21 -660,汤芳,741 -661,侯芳,301 -662,顾涛,950 -663,段平,156 -664,何霞,715 -665,毛杰,40 -666,赖芳,517 -667,潘丽,812 -668,梁秀英,924 -669,邱强,237 -670,易超,778 -671,袁明,828 -672,李静,44 -673,谢敏,393 -674,汪勇,818 -675,龙平,705 -676,孔刚,718 -677,邵娜,232 -678,蒋静,17 -679,刘秀英,94 -680,郭娟,423 -681,戴明,748 -682,沈明,185 -683,蒋军,503 -684,林静,831 -685,韩艳,865 -686,毛勇,403 -687,杜娟,477 -688,乔秀英,197 -689,史静,198 -690,苏娟,400 -691,阎娜,991 -692,乔霞,30 -693,许勇,36 -694,黎磊,484 -695,李敏,741 -696,董秀兰,925 -697,石娟,685 -698,魏平,939 -699,魏刚,761 -700,马强,689 -701,万杰,799 -702,蔡杰,803 -703,邹艳,358 -704,薛艳,21 -705,夏娜,430 -706,邓军,725 -707,方军,909 -708,秦静,31 -709,黄静,263 -710,贾艳,678 -711,杨敏,664 -712,秦秀英,281 -713,黎磊,174 -714,郝洋,335 -715,刘芳,795 -716,吕娜,975 -717,丁娟,575 -718,石刚,790 -719,金秀兰,438 -720,钱刚,499 -721,段芳,720 -722,姚敏,158 -723,卢芳,234 -724,程娜,607 -725,尹伟,11 -726,宋洋,515 -727,罗刚,262 -728,曾强,406 -729,董霞,722 -730,史芳,697 -731,邹平,599 -732,胡秀英,860 -733,黎明,181 -734,邱艳,241 -735,顾敏,446 -736,易静,48 -737,梁芳,225 -738,郑秀英,493 -739,黎平,51 -740,吴明,301 -741,苏娜,900 -742,徐娟,195 -743,萧涛,742 -744,李静,487 -745,龚娜,240 -746,赵芳,664 -747,尹涛,787 -748,黄芳,293 -749,朱涛,448 -750,邵超,572 -751,曾静,622 -752,宋娜,952 -753,田超,968 -754,徐强,111 -755,薛磊,496 -756,陈磊,355 -757,彭霞,156 -758,田明,148 -759,郝勇,850 -760,陆明,162 -761,罗娟,105 -762,孔芳,958 -763,段军,392 -764,罗明,49 -765,宋芳,464 -766,林丽,163 -767,锺勇,140 -768,田平,31 -769,许强,598 -770,熊洋,774 -771,宋洋,455 -772,汤超,927 -773,郭艳,986 -774,陆娟,77 -775,邱芳,661 -776,曹洋,335 -777,宋秀英,123 -778,龚平,515 -779,张霞,664 -780,廖杰,55 -781,范芳,621 -782,刘秀英,79 -783,贺娜,600 -784,秦娟,527 -785,汤敏,204 -786,丁娜,589 -787,郝敏,547 -788,魏桂英,237 -789,曾涛,517 -790,朱伟,380 -791,武超,312 -792,彭芳,545 -793,郑军,338 -794,崔娟,238 -795,顾秀英,325 -796,万军,989 -797,邱伟,991 -798,蔡勇,184 -799,许杰,863 -800,侯静,961 -801,高秀兰,362 -802,刘芳,583 -803,尹敏,625 -804,邹平,910 -805,钱洋,40 -806,苏秀兰,845 -807,侯明,648 -808,侯超,959 -809,龙刚,527 -810,丁娟,867 -811,崔敏,167 -812,金洋,355 -813,秦平,250 -814,钱秀英,628 -815,陆霞,904 -816,唐丽,226 -817,任丽,906 -818,梁明,689 -819,陈霞,548 -820,宋敏,939 -821,石洋,666 -822,张秀英,98 -823,廖秀兰,67 -824,蔡军,96 -825,董强,442 -826,马伟,740 -827,董秀英,714 -828,魏军,403 -829,于桂英,615 -830,姚涛,140 -831,魏杰,845 -832,马平,871 -833,秦娜,606 -834,顾敏,421 -835,彭军,810 -836,贺超,212 -837,乔芳,524 -838,沈涛,502 -839,黄敏,198 -840,崔涛,917 -841,范霞,57 -842,崔洋,894 -843,雷芳,398 -844,毛勇,247 -845,郑军,241 -846,孔静,197 -847,马刚,805 -848,吴超,208 -849,毛娟,466 -850,雷丽,518 -851,陈杰,957 -852,吴娟,704 -853,郭娟,34 -854,梁明,103 -855,潘强,106 -856,杨超,394 -857,常军,537 -858,王秀英,889 -859,张超,412 -860,常敏,533 -861,毛桂英,206 -862,蔡涛,591 -863,史敏,179 -864,胡明,619 -865,易秀英,5 -866,吕军,787 -867,熊涛,53 -868,戴娜,474 -869,丁芳,553 -870,朱艳,432 -871,宋静,842 -872,吴刚,220 -873,汪丽,648 -874,戴伟,48 -875,龙伟,948 -876,汤磊,323 -877,叶伟,686 -878,侯洋,210 -879,顾超,186 -880,段秀英,147 -881,邓芳,804 -882,阎艳,20 -883,孔丽,904 -884,李娟,664 -885,吕娜,110 -886,萧勇,994 -887,罗丽,53 -888,卢艳,658 -889,彭杰,803 -890,刘洋,89 -891,余明,655 -892,史秀英,449 -893,谢刚,712 -894,孙涛,16 -895,韩敏,900 -896,田勇,456 -897,许勇,520 -898,郝涛,713 -899,魏超,643 -900,顾明,105 -901,吴强,819 -902,黎磊,633 -903,卢强,854 -904,徐秀兰,859 -905,方娟,145 -906,韩磊,764 -907,吕娟,545 -908,尹杰,361 -909,吴洋,601 -910,徐超,915 -911,贺平,575 -912,张秀英,419 -913,阎军,960 -914,魏杰,316 -915,黄强,263 -916,杜平,653 -917,贾明,510 -918,余芳,276 -919,黄敏,811 -920,罗刚,822 -921,杜敏,285 -922,黎娟,720 -923,谭刚,979 -924,顾桂英,869 -925,何娟,502 -926,何敏,24 -927,康明,649 -928,谢军,263 -929,罗芳,960 -930,唐军,695 -931,余军,462 -932,乔敏,107 -933,毛明,382 -934,赵丽,523 -935,吕霞,190 -936,康秀兰,37 -937,武勇,369 -938,刘伟,1000 -939,范静,332 -940,赖霞,269 
-941,康霞,666 -942,尹涛,988 -943,贾勇,384 -944,汪秀兰,103 -945,邵洋,410 -946,袁伟,695 -947,韩军,488 -948,龚强,654 -949,沈涛,28 -950,顾娜,465 -951,姜超,540 -952,熊涛,865 -953,刘磊,959 -954,钱敏,167 -955,卢伟,514 -956,曾强,25 -957,刘洋,136 -958,赖娟,95 -959,邵涛,510 -960,许勇,322 -961,潘勇,235 -962,杜勇,765 -963,徐平,608 -964,周娟,211 -965,曹超,137 -966,乔艳,659 -967,范霞,361 -968,汪伟,384 -969,杨秀兰,945 -970,田强,87 -971,孙超,693 -972,卢敏,292 -973,崔涛,629 -974,马洋,816 -975,薛娟,439 -976,侯敏,478 -977,田洋,790 -978,乔洋,549 -979,卢丽,716 -980,叶磊,65 -981,金杰,657 -982,郝静,505 -983,顾军,171 -984,孙艳,871 -985,田丽,108 -986,叶秀兰,128 -987,丁芳,258 -988,姚娟,875 -989,段芳,333 -990,龙勇,213 -991,白刚,865 -992,袁静,257 -993,何娜,613 -994,潘涛,130 -995,程敏,68 -996,魏艳,637 -997,顾秀英,383 -998,程强,629 -999,姚芳,123 -1000,袁秀英,695 -1001,黎刚,157 -1002,彭敏,168 -1003,邓涛,363 -1004,熊军,97 -1005,姚秀兰,651 -1006,王平,51 -1007,陈涛,142 -1008,锺敏,529 -1009,石静,407 -1010,黄秀英,842 -1011,苏军,873 -1012,马磊,585 -1013,史强,138 -1014,傅秀兰,75 -1015,孔涛,234 -1016,周娜,34 -1017,邹秀兰,832 -1018,田杰,750 -1019,韩涛,954 -1020,程桂英,181 -1021,蔡静,845 -1022,苏丽,553 -1023,江磊,65 -1024,罗艳,549 -1025,邹桂英,480 -1026,阎敏,579 -1027,段涛,421 -1028,董涛,304 -1029,孙军,118 -1030,方芳,656 -1031,林涛,418 -1032,何明,84 -1033,韩涛,926 -1034,董秀兰,955 -1035,蒋磊,464 -1036,杨秀英,257 -1037,谢伟,767 -1038,廖秀兰,631 -1039,朱秀兰,99 -1040,刘刚,841 +541 彭秀英 675 +542 江明 248 +543 雷秀英 146 +544 毛杰 13 +545 孟明 440 +546 许强 750 +547 李洋 707 +548 易娜 969 +549 韩强 347 +550 孔磊 880 +551 杨静 708 +552 万敏 139 +553 丁伟 668 +554 吕霞 466 +555 彭涛 176 +556 孟磊 507 +557 锺洋 945 +558 汪洋 447 +559 谭涛 369 +560 姚秀英 317 +561 熊洋 556 +562 白静 949 +563 崔军 385 +564 陆静 143 +565 孟磊 769 +566 丁娜 469 +567 崔军 258 +568 傅秀兰 231 +569 康刚 414 +570 李艳 919 +571 范艳 986 +572 林勇 72 +573 方敏 454 +574 傅涛 647 +575 程洋 996 +576 沈秀兰 155 +577 吴杰 556 +578 石伟 306 +579 顾秀英 498 +580 谭秀英 734 +581 段娟 618 +582 锺霞 686 +583 汤秀兰 694 +584 魏明 395 +585 徐丽 717 +586 周芳 462 +587 尹霞 991 +588 彭杰 885 +589 崔洋 141 +590 杜丽 849 +591 万涛 575 +592 姜洋 686 +593 邵伟 35 +594 钱娜 289 +595 罗艳 540 +596 许强 260 +597 罗明 581 +598 钱秀英 3 +599 方敏 538 +600 潘秀英 456 +601 唐强 420 +602 黎杰 331 +603 魏勇 706 +604 阎敏 881 +605 程平 57 +606 方军 487 +607 邵艳 947 +608 许娜 798 +609 蒋秀兰 552 +610 梁平 273 +611 乔勇 811 +612 宋勇 178 +613 乔杰 956 +614 贾丽 979 +615 刘霞 517 +616 江超 946 +617 周超 874 +618 姜明 74 +619 姜强 249 +620 邹强 296 +621 刘洋 424 +622 汤芳 379 +623 阎平 608 +624 万敏 802 +625 陈洋 699 +626 易伟 86 +627 马艳 562 +628 魏洋 298 +629 赵洋 489 +630 郝明 355 +631 武霞 91 +632 刘明 620 +633 毛强 518 +634 高霞 17 +635 康杰 16 +636 高艳 991 +637 文芳 626 +638 谭涛 163 +639 田桂英 427 +640 林娟 395 +641 龚静 294 +642 姜涛 125 +643 常磊 485 +644 邓静 205 +645 姜刚 180 +646 崔杰 415 +647 孙娟 967 +648 许强 438 +649 高超 996 +650 陈勇 285 +651 叶静 689 +652 赖勇 861 +653 曾静 335 +654 乔刚 432 +655 郝桂英 510 +656 孟桂英 39 +657 任伟 707 +658 邱平 78 +659 曾洋 21 +660 汤芳 741 +661 侯芳 301 +662 顾涛 950 +663 段平 156 +664 何霞 715 +665 毛杰 40 +666 赖芳 517 +667 潘丽 812 +668 梁秀英 924 +669 邱强 237 +670 易超 778 +671 袁明 828 +672 李静 44 +673 谢敏 393 +674 汪勇 818 +675 龙平 705 +676 孔刚 718 +677 邵娜 232 +678 蒋静 17 +679 刘秀英 94 +680 郭娟 423 +681 戴明 748 +682 沈明 185 +683 蒋军 503 +684 林静 831 +685 韩艳 865 +686 毛勇 403 +687 杜娟 477 +688 乔秀英 197 +689 史静 198 +690 苏娟 400 +691 阎娜 991 +692 乔霞 30 +693 许勇 36 +694 黎磊 484 +695 李敏 741 +696 董秀兰 925 +697 石娟 685 +698 魏平 939 +699 魏刚 761 +700 马强 689 +701 万杰 799 +702 蔡杰 803 +703 邹艳 358 +704 薛艳 21 +705 夏娜 430 +706 邓军 725 +707 方军 909 +708 秦静 31 +709 黄静 263 +710 贾艳 678 +711 杨敏 664 +712 秦秀英 281 +713 黎磊 174 +714 郝洋 335 +715 刘芳 795 +716 吕娜 975 +717 丁娟 575 +718 石刚 790 +719 金秀兰 438 +720 钱刚 499 +721 段芳 720 +722 姚敏 158 +723 卢芳 234 +724 程娜 607 +725 尹伟 11 +726 宋洋 515 +727 罗刚 262 +728 曾强 406 +729 董霞 722 +730 史芳 697 +731 邹平 599 +732 胡秀英 860 +733 黎明 
181 +734 邱艳 241 +735 顾敏 446 +736 易静 48 +737 梁芳 225 +738 郑秀英 493 +739 黎平 51 +740 吴明 301 +741 苏娜 900 +742 徐娟 195 +743 萧涛 742 +744 李静 487 +745 龚娜 240 +746 赵芳 664 +747 尹涛 787 +748 黄芳 293 +749 朱涛 448 +750 邵超 572 +751 曾静 622 +752 宋娜 952 +753 田超 968 +754 徐强 111 +755 薛磊 496 +756 陈磊 355 +757 彭霞 156 +758 田明 148 +759 郝勇 850 +760 陆明 162 +761 罗娟 105 +762 孔芳 958 +763 段军 392 +764 罗明 49 +765 宋芳 464 +766 林丽 163 +767 锺勇 140 +768 田平 31 +769 许强 598 +770 熊洋 774 +771 宋洋 455 +772 汤超 927 +773 郭艳 986 +774 陆娟 77 +775 邱芳 661 +776 曹洋 335 +777 宋秀英 123 +778 龚平 515 +779 张霞 664 +780 廖杰 55 +781 范芳 621 +782 刘秀英 79 +783 贺娜 600 +784 秦娟 527 +785 汤敏 204 +786 丁娜 589 +787 郝敏 547 +788 魏桂英 237 +789 曾涛 517 +790 朱伟 380 +791 武超 312 +792 彭芳 545 +793 郑军 338 +794 崔娟 238 +795 顾秀英 325 +796 万军 989 +797 邱伟 991 +798 蔡勇 184 +799 许杰 863 +800 侯静 961 +801 高秀兰 362 +802 刘芳 583 +803 尹敏 625 +804 邹平 910 +805 钱洋 40 +806 苏秀兰 845 +807 侯明 648 +808 侯超 959 +809 龙刚 527 +810 丁娟 867 +811 崔敏 167 +812 金洋 355 +813 秦平 250 +814 钱秀英 628 +815 陆霞 904 +816 唐丽 226 +817 任丽 906 +818 梁明 689 +819 陈霞 548 +820 宋敏 939 +821 石洋 666 +822 张秀英 98 +823 廖秀兰 67 +824 蔡军 96 +825 董强 442 +826 马伟 740 +827 董秀英 714 +828 魏军 403 +829 于桂英 615 +830 姚涛 140 +831 魏杰 845 +832 马平 871 +833 秦娜 606 +834 顾敏 421 +835 彭军 810 +836 贺超 212 +837 乔芳 524 +838 沈涛 502 +839 黄敏 198 +840 崔涛 917 +841 范霞 57 +842 崔洋 894 +843 雷芳 398 +844 毛勇 247 +845 郑军 241 +846 孔静 197 +847 马刚 805 +848 吴超 208 +849 毛娟 466 +850 雷丽 518 +851 陈杰 957 +852 吴娟 704 +853 郭娟 34 +854 梁明 103 +855 潘强 106 +856 杨超 394 +857 常军 537 +858 王秀英 889 +859 张超 412 +860 常敏 533 +861 毛桂英 206 +862 蔡涛 591 +863 史敏 179 +864 胡明 619 +865 易秀英 5 +866 吕军 787 +867 熊涛 53 +868 戴娜 474 +869 丁芳 553 +870 朱艳 432 +871 宋静 842 +872 吴刚 220 +873 汪丽 648 +874 戴伟 48 +875 龙伟 948 +876 汤磊 323 +877 叶伟 686 +878 侯洋 210 +879 顾超 186 +880 段秀英 147 +881 邓芳 804 +882 阎艳 20 +883 孔丽 904 +884 李娟 664 +885 吕娜 110 +886 萧勇 994 +887 罗丽 53 +888 卢艳 658 +889 彭杰 803 +890 刘洋 89 +891 余明 655 +892 史秀英 449 +893 谢刚 712 +894 孙涛 16 +895 韩敏 900 +896 田勇 456 +897 许勇 520 +898 郝涛 713 +899 魏超 643 +900 顾明 105 +901 吴强 819 +902 黎磊 633 +903 卢强 854 +904 徐秀兰 859 +905 方娟 145 +906 韩磊 764 +907 吕娟 545 +908 尹杰 361 +909 吴洋 601 +910 徐超 915 +911 贺平 575 +912 张秀英 419 +913 阎军 960 +914 魏杰 316 +915 黄强 263 +916 杜平 653 +917 贾明 510 +918 余芳 276 +919 黄敏 811 +920 罗刚 822 +921 杜敏 285 +922 黎娟 720 +923 谭刚 979 +924 顾桂英 869 +925 何娟 502 +926 何敏 24 +927 康明 649 +928 谢军 263 +929 罗芳 960 +930 唐军 695 +931 余军 462 +932 乔敏 107 +933 毛明 382 +934 赵丽 523 +935 吕霞 190 +936 康秀兰 37 +937 武勇 369 +938 刘伟 1000 +939 范静 332 +940 赖霞 269 +941 康霞 666 +942 尹涛 988 +943 贾勇 384 +944 汪秀兰 103 +945 邵洋 410 +946 袁伟 695 +947 韩军 488 +948 龚强 654 +949 沈涛 28 +950 顾娜 465 +951 姜超 540 +952 熊涛 865 +953 刘磊 959 +954 钱敏 167 +955 卢伟 514 +956 曾强 25 +957 刘洋 136 +958 赖娟 95 +959 邵涛 510 +960 许勇 322 +961 潘勇 235 +962 杜勇 765 +963 徐平 608 +964 周娟 211 +965 曹超 137 +966 乔艳 659 +967 范霞 361 +968 汪伟 384 +969 杨秀兰 945 +970 田强 87 +971 孙超 693 +972 卢敏 292 +973 崔涛 629 +974 马洋 816 +975 薛娟 439 +976 侯敏 478 +977 田洋 790 +978 乔洋 549 +979 卢丽 716 +980 叶磊 65 +981 金杰 657 +982 郝静 505 +983 顾军 171 +984 孙艳 871 +985 田丽 108 +986 叶秀兰 128 +987 丁芳 258 +988 姚娟 875 +989 段芳 333 +990 龙勇 213 +991 白刚 865 +992 袁静 257 +993 何娜 613 +994 潘涛 130 +995 程敏 68 +996 魏艳 637 +997 顾秀英 383 +998 程强 629 +999 姚芳 123 +1000 袁秀英 695 +1001 黎刚 157 +1002 彭敏 168 +1003 邓涛 363 +1004 熊军 97 +1005 姚秀兰 651 +1006 王平 51 +1007 陈涛 142 +1008 锺敏 529 +1009 石静 407 +1010 黄秀英 842 +1011 苏军 873 +1012 马磊 585 +1013 史强 138 +1014 傅秀兰 75 +1015 孔涛 234 +1016 周娜 34 +1017 邹秀兰 832 +1018 田杰 750 +1019 韩涛 954 +1020 程桂英 181 +1021 蔡静 845 +1022 苏丽 553 +1023 江磊 65 +1024 罗艳 549 +1025 邹桂英 480 +1026 阎敏 579 +1027 段涛 
421 +1028 董涛 304 +1029 孙军 118 +1030 方芳 656 +1031 林涛 418 +1032 何明 84 +1033 韩涛 926 +1034 董秀兰 955 +1035 蒋磊 464 +1036 杨秀英 257 +1037 谢伟 767 +1038 廖秀兰 631 +1039 朱秀兰 99 +1040 刘刚 841 diff --git a/regression-test/suites/export_p2/test_export_max_file_size.groovy b/regression-test/suites/export_p2/test_export_max_file_size.groovy index 64b4ae0186..a66ac35d02 100644 --- a/regression-test/suites/export_p2/test_export_max_file_size.groovy +++ b/regression-test/suites/export_p2/test_export_max_file_size.groovy @@ -77,8 +77,8 @@ suite("test_export_max_file_size", "p2") { insert into ${table_export_name} select * from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}", - "fs.defaultFS" = "${fs}", "hadoop.username" = "${user_name}", + "column_separator" = ",", "format" = "csv"); """ @@ -130,8 +130,8 @@ suite("test_export_max_file_size", "p2") { insert into ${table_load_name} select * from hdfs( "uri" = "${outfile_url}${j}.csv", - "fs.defaultFS" = "${fs}", "hadoop.username" = "${user_name}", + "column_separator" = ",", "format" = "csv"); """ } diff --git a/regression-test/suites/export_p2/test_export_with_hdfs.groovy b/regression-test/suites/export_p2/test_export_with_hdfs.groovy index 1fc87c2f67..e523fdf5a4 100644 --- a/regression-test/suites/export_p2/test_export_with_hdfs.groovy +++ b/regression-test/suites/export_p2/test_export_with_hdfs.groovy @@ -104,8 +104,8 @@ suite("test_export_with_hdfs", "p2") { // check data correctness order_qt_select """ select * from hdfs( "uri" = "${outfile_url}0.${file_suffix}", - "fs.defaultFS" = "${fs}", "hadoop.username" = "${user_name}", + "column_separator" = ",", "format" = "${format}"); """ } diff --git a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy index da291e89dc..8fbd923b51 100644 --- a/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy +++ b/regression-test/suites/export_p2/test_outfile_orc_max_file_size.groovy @@ -63,7 +63,6 @@ suite("test_outfile_orc_max_file_size", "p2") { insert into ${table_export_name} select * from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}${load_data_path}", - "fs.defaultFS" = "${fs}", "hadoop.username" = "${user_name}", "format" = "orc"); """ diff --git a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy index 5f856c57e2..693e8b7a06 100644 --- a/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy +++ b/regression-test/suites/external_table_p0/hive/test_different_parquet_types.groovy @@ -35,7 +35,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external logger.info("record res" + res1_2.toString()) def res1_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_byte_array/delta_byte_array.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res1_3.toString()) } @@ -58,7 +58,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external //return nothing,but no exception def res3_3 = sql """ - select * from hdfs(\"uri" = 
\"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_binary_packed/delta_binary_packed.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res3_3.toString()) } @@ -76,7 +76,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external logger.info("record res" + res4_2.toString()) def res4_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_required_column/delta_encoding_required_column.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res4_3.toString()) } @@ -95,7 +95,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external logger.info("record res" + res5_2.toString()) def res5_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/delta_encoding_optional_column/delta_encoding_optional_column.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res5_3.toString()) } @@ -114,7 +114,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external logger.info("record res" + res6_2.toString()) def res6_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/datapage_v1-snappy-compressed-checksum/datapage_v1-snappy-compressed-checksum.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res6_3.toString()) @@ -133,7 +133,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external logger.info("record res" + res7_2.toString()) def res7_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/overflow_i16_page_cnt/overflow_i16_page_cnt.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res7_3.toString()) } @@ -152,7 +152,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external 
logger.info("record res" + res8_2.toString()) def res8_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages/alltypes_tiny_pages.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res8_3.toString()) } @@ -170,7 +170,7 @@ suite("test_different_parquet_types", "p0,external,hive,external_docker,external logger.info("record res" + res9_2.toString()) def res9_3 = sql """ - select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"fs.defaultFS\" = \"hdfs://${externalEnvIp}:${hdfs_port}\",\"format\" = \"parquet\") limit 10 + select * from hdfs(\"uri" = \"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/preinstalled_data/different_types_parquet/alltypes_tiny_pages_plain/alltypes_tiny_pages_plain.parquet\",\"format\" = \"parquet\") limit 10 """ logger.info("record res" + res9_3.toString()) } diff --git a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy index 32315f60e6..61c049a0a0 100644 --- a/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_hdfs_tvf.groovy @@ -34,8 +34,8 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "csv" qt_csv_all_types """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by c1; """ @@ -43,15 +43,14 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "csv" qt_csv_student """ select cast(c1 as INT) as id, c2 as name, c3 as age from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by id; """ uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_malformat.csv" format = "csv" qt_csv_array_malformat """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "column_separator" = "|") order by c1; """ @@ -59,7 +58,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/array_normal.csv" format = "csv" qt_csv_array_normal """ select * from HDFS("uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "column_separator" = "|") order by c1; """ @@ -69,9 +67,9 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "csv" qt_csv_with_compress_type """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", + "column_separator" = ",", "compress_type" = "GZ") order by c1; """ // test csv format infer compress type @@ -79,8 +77,8 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "csv" qt_csv_infer_compress_type """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = 
"${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by c1; """ // test csv_with_names file format @@ -88,8 +86,8 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "csv_with_names" qt_csv_names """ select cast(id as INT) as id, name, age from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by id; """ // test csv_with_names_and_types file format @@ -97,8 +95,8 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "csv_with_names_and_types" qt_csv_names_types """ select cast(id as INT) as id, name, age from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", + "column_separator" = ",", "format" = "${format}") order by id; """ @@ -107,7 +105,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "parquet" qt_parquet """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}") order by s_suppkey limit 20; """ @@ -116,7 +113,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "orc" qt_orc """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}") order by p_partkey limit 20; """ @@ -126,7 +122,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_json """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -137,7 +132,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_json_limit1 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -147,7 +141,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_json_limit2 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "true", @@ -156,7 +149,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_json_limit3 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -165,7 +157,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_json_limit4 """ select * from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -177,7 +168,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_json_root """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -189,7 +179,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_json_paths """ select cast(id as INT) as id, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -201,7 +190,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { 
format = "json" qt_one_array """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "true", @@ -213,7 +201,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "json" qt_cast """ select cast(id as INT) as id, city, cast(code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -242,7 +229,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { select cast (id as INT) as id, city, cast (code as INT) as code from HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}", "strip_outer_array" = "false", @@ -258,7 +244,6 @@ suite("test_hdfs_tvf","external,hive,tvf,external_docker") { format = "parquet" qt_desc """ desc function HDFS( "uri" = "${uri}", - "fs.defaultFS"= "${defaultFS}", "hadoop.username" = "${hdfsUserName}", "format" = "${format}"); """ } finally { diff --git a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy index 40dc3c2440..d71e07487c 100644 --- a/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_hdfs_tvf_compression.groovy @@ -30,7 +30,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem qt_gz_1 """ select ${select_field} from HDFS( "uri" = "${baseUri}/dt=gzip/000000_0.gz", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -40,7 +39,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem qt_gz_2 """ desc function HDFS( "uri" = "${baseUri}/dt=gzip/000000_0.gz", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -52,7 +50,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select ${select_field} from HDFS( "uri" = "${baseUri}/dt=bzip2/000000_0.bz2", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -64,7 +61,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select ${select_field} from HDFS( "uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -75,7 +71,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select c7 from HDFS( "uri" = "${baseUri}/dt=deflate/000000_0_copy_1.deflate", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -88,7 +83,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select ${select_field} from HDFS( "uri" = "${baseUri}/dt=plain/000000_0", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -99,7 +93,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select c3,c4,c10 from HDFS( "uri" = "${baseUri}/dt=plain/000000_0", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "column_separator" = '\001', @@ -114,7 +107,6 @@ 
suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet", - "fs.defaultFS" = "${baseFs}", "format" = "parquet" ); """ @@ -124,7 +116,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.part-00000.snappy.parquet", - "fs.defaultFS" = "${baseFs}", "format" = "parquet" ); """ @@ -135,7 +126,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc", - "fs.defaultFS" = "${baseFs}", "format" = "orc" ); """ @@ -145,7 +135,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/ckbench_hits.000000_0.orc", - "fs.defaultFS" = "${baseFs}", "format" = "orc" ); """ @@ -156,7 +145,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt", - "fs.defaultFS" = "${baseFs}", "format" = "csv" ); """ @@ -166,7 +154,6 @@ suite("test_hdfs_tvf_compression", "p2,external,tvf,external_remote,external_rem select count(*) from HDFS( "uri" = "${test_data_dir}/test_data/tpcds_catalog_returns_data-m-00000.txt", - "fs.defaultFS" = "${baseFs}", "format" = "csv" ); """ diff --git a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy index ad572936ae..dcd98af2f9 100644 --- a/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_path_partition_keys.groovy @@ -27,7 +27,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_1 """ select * from HDFS( "uri" = "${baseUri}/dt1=cyw/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1" ) order by c1,c2 ; @@ -36,7 +35,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_2 """ select * from HDFS( "uri" = "${baseUri}/dt1=cyw/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1") where dt1!="cyw" order by c1,c2 limit 3; @@ -45,7 +43,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_3 """ select dt1,c1,count(*) from HDFS( "uri" = "${baseUri}/dt1=hello/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1") group by c1,dt1 order by c1; @@ -54,7 +51,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_4 """ select * from HDFS( "uri" = "${baseUri}/dt2=two/dt1=hello/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt1") order by c1; @@ -63,7 +59,6 @@ suite("test_path_partition_keys", "p2,external,tvf,external_remote,external_remo order_qt_hdfs_5 """ select * from HDFS( "uri" = "${baseUri}/dt2=two/dt1=cyw/*", - "fs.defaultFS"= "${baseFs}", "hadoop.username" = "hadoop", "format" = "csv", "path_partition_keys"="dt2,dt1"); diff --git a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy 
b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy index 57cfdb136d..279fcb5e8a 100644 --- a/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_s3_tvf_compression.groovy @@ -34,6 +34,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 20; """ @@ -47,6 +48,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") order by cast(c1 as int),c4 limit 20; """ @@ -62,6 +64,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") order by c1,c2,c3,c4,c5 limit 15; """ @@ -75,6 +78,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}") where c1!="100" order by cast(c4 as date),c1 limit 13; """ @@ -90,6 +94,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}FRAME") order by c1,c2,c3,c4,c5 limit 14; """ @@ -103,6 +108,7 @@ suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remot "s3.secret_key" = "${sk}", "REGION" = "${region}", "FORMAT" = "csv", + "column_separator" = ",", "use_path_style" = "true", "compress_type" ="${compress_type}FRAME") where c3="buHDwfGeNHfpRFdNaogneddi" order by c3,c1 limit 14; """ diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy index 853b5d2f4d..355ae73814 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy @@ -23,43 +23,41 @@ suite("test_tvf_p2", "p2,external,tvf,external_remote,external_remote_tvf") { qt_eof_check """select * from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/bad_store_sales.parquet", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}", "format" = "parquet") where ss_store_sk = 4 and ss_addr_sk is null order by ss_item_sk""" // array_ancestor_null.parquet is parquet file whose values in the array column are all nulls in a page qt_array_ancestor_null """select count(list_double_col) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/array_ancestor_null.parquet", - "format" = "parquet", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "parquet"); + """ // all_nested_types.parquet is parquet file that contains all complext types qt_nested_types_parquet """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/all_nested_types.parquet", - "format" = "parquet", - 
"fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "parquet"); + """ // all_nested_types.orc is orc file that contains all complext types qt_nested_types_orc """select count(array0), count(array1), count(array2), count(array3), count(struct0), count(struct1), count(map0) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc", - "format" = "orc", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "orc"); + """ // a row of complex type may be stored across more pages qt_row_cross_pages """select count(id), count(m1), count(m2) from hdfs( "uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet", - "format" = "parquet", - "fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")""" + "format" = "parquet"); + """ // viewfs qt_viewfs """select count(id), count(m1), count(m2) from hdfs( "uri" = "viewfs://my-cluster/ns1/catalog/tvf/parquet/row_cross_pages.parquet", "format" = "parquet", - "fs.defaultFS" = "viewfs://my-cluster", "fs.viewfs.mounttable.my-cluster.link./ns1" = "hdfs://${nameNodeHost}:${hdfsPort}/", "fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")""" } diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy index 6510571427..32b9bac9c7 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_count_p2.groovy @@ -26,8 +26,7 @@ suite("test_tvf_view_count_p2", "p2,external,tvf,external_remote,external_remote sql """use test_tvf_view_count_p2""" sql """set enable_nereids_planner=false""" sql """create view tvf_view_count as select * from hdfs ( - "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", - "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}", + "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", "hadoop.username" = "hadoop", "format"="parquet");""" diff --git a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy index 2323fcaff8..8939154bb5 100644 --- a/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy +++ b/regression-test/suites/external_table_p2/tvf/test_tvf_view_p2.groovy @@ -26,8 +26,7 @@ suite("test_tvf_view_p2", "p2,external,tvf,external_remote,external_remote_tvf") sql """use test_tvf_view_p2""" sql """set enable_fallback_to_original_planner=false""" sql """create view tvf_view as select * from hdfs ( - "uri"="hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", - "fs.defaultFS"="hdfs://${nameNodeHost}:${hdfsPort}", + "uri"="hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0", "hadoop.username" = "hadoop", "format"="parquet");""" @@ -48,8 +47,7 @@ suite("test_tvf_view_p2", "p2,external,tvf,external_remote,external_remote_tvf") } explain{ sql("select * from hdfs (\n" + - " \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}:/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" + - " \"fs.defaultFS\"=\"hdfs://${nameNodeHost}:${hdfsPort}\",\n" + + " \"uri\"=\"hdfs://${nameNodeHost}:${hdfsPort}/usr/hive/warehouse/tpch_1000_parquet.db/part/000091_0\",\n" + " \"hadoop.username\" = \"hadoop\",\n" + " \"format\"=\"parquet\")") contains("_table_valued_function_hdfs.p_partkey") diff --git 
a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy index bbfc2c30f0..d504078635 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream.groovy @@ -540,7 +540,7 @@ suite("test_http_stream", "p0") { streamLoad { set 'version', '1' set 'sql', """ - insert into ${db}.${tableName12} (id, name) select c1, c2 from http_stream("format"="csv", "line_delimiter"="||") + insert into ${db}.${tableName12} (id, name) select c1, c2 from http_stream("format"="csv", "line_delimiter"="||", "column_separator" = ",") """ time 10000 file 'test_http_stream_line_delimiter.csv' diff --git a/regression-test/suites/load_p0/http_stream/test_http_stream_compress.groovy b/regression-test/suites/load_p0/http_stream/test_http_stream_compress.groovy index 17f580263d..5ce3527892 100644 --- a/regression-test/suites/load_p0/http_stream/test_http_stream_compress.groovy +++ b/regression-test/suites/load_p0/http_stream/test_http_stream_compress.groovy @@ -46,7 +46,7 @@ suite("test_http_stream_compress", "p0") { streamLoad { set 'version', '1' set 'sql', """ - insert into ${db}.${tableName1} select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13 from http_stream("format"="csv", "compress_type"="GZ") + insert into ${db}.${tableName1} select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13 from http_stream("format"="csv", "compress_type"="GZ", "column_separator" = ",") """ time 10000 file '../stream_load/all_types.csv.gz' @@ -86,7 +86,7 @@ suite("test_http_stream_compress", "p0") { streamLoad { set 'version', '1' set 'sql', """ - insert into ${db}.${tableName2} select c1, c2, to_bitmap(c3), hll_hash(c4) from http_stream("format"="csv", "compress_type"="bz2") + insert into ${db}.${tableName2} select c1, c2, to_bitmap(c3), hll_hash(c4) from http_stream("format"="csv", "compress_type"="bz2", "column_separator" = ",") """ time 10000 file '../stream_load/bitmap_hll.csv.bz2'
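
Across the regression-test changes above, the hdfs() table-valued-function calls drop the separate "fs.defaultFS" property, since the "uri" value already carries the scheme and authority that the property used to repeat; the added "column_separator" = "," settings, and the rewrite of the bundled http_stream fixtures from comma-separated to tab-separated rows, presumably reflect that the default CSV separator is the tab character rather than a comma. A small stand-alone Java sketch of the scheme/authority derivation follows; it is illustrative only (hypothetical class name, placeholder host and port), not the code path Doris actually uses.

import java.net.URI;

public class DefaultFsSketch {
    // The default filesystem is just the scheme plus authority of the file uri,
    // e.g. hdfs://nameNodeHost:8020 for hdfs://nameNodeHost:8020/user/doris/... paths.
    static String defaultFsOf(String uri) {
        URI u = URI.create(uri);
        return u.getScheme() + "://" + u.getAuthority();
    }

    public static void main(String[] args) {
        System.out.println(
                defaultFsOf("hdfs://nameNodeHost:8020/user/doris/preinstalled_data/csv_format_test/array_normal.csv"));
        // prints: hdfs://nameNodeHost:8020
    }
}
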