[opt](tvf) refine the class of ExternalFileTableValuedFunction (#24706)

`ExternalFileTableValuedFunction` now has 3 derived classes:

- LocalTableValuedFunction
- HdfsTableValuedFunction
- S3TableValuedFunction

All these tvfs are for reading data from file. The difference is where to read the file, eg, from HDFS or from local filesystem.

So I refine the fields and methods of these classes.
Now there are 3 kinds of properties for these TVFs:

1. File format properties

	File format properties, such as `format` and `column_separator`. These properties are common to all of these TVFs,
	so they should be analyzed in the parent class `ExternalFileTableValuedFunction`.
	
2. URI or file path

	The URI or file path property indicates the file location. For different storage systems, the format of the URI is not the same,
	so it should be analyzed in each derived class.
	
3. Other properties

	All other properties that are specific to a certain TVF,
	so they should be analyzed in each derived class.
	
There are 2 new classes:

- `FileFormatConstants`: Define some common property names or variables related to file format.
- `FileFormatUtils`: Define some util methods related to file format.

After this PR, if we want to add some common properties for all these TVFs, we only need to handle them in
`ExternalFileTableValuedFunction`, avoiding the risk of missing the handling in any one of them.

### Behavior change

1. Remove the `fs.defaultFS` property in `hdfs()`; it can be obtained from `uri`
2. Use `\t` as the default column separator of csv format, same as stream load
This commit is contained in:
Mingyu Chen
2023-10-07 12:44:04 +08:00
committed by GitHub
parent 0e615a531e
commit 727fa2c0cd
37 changed files with 982 additions and 985 deletions

View File

@ -28,8 +28,8 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.loadv2.LoadTask;
@ -1113,14 +1113,14 @@ public class DataDescription implements InsertStmt.DataDesc {
// file format
// note(tsy): for historical reason, file format here must be string type rather than TFileFormatType
if (fileFormat != null) {
if (!fileFormat.equalsIgnoreCase("parquet")
&& !fileFormat.equalsIgnoreCase(FeConstants.csv)
&& !fileFormat.equalsIgnoreCase("orc")
&& !fileFormat.equalsIgnoreCase("json")
&& !fileFormat.equalsIgnoreCase("wal")
&& !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names)
&& !fileFormat.equalsIgnoreCase(FeConstants.csv_with_names_and_types)
&& !fileFormat.equalsIgnoreCase("hive_text")) {
if (!fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_PARQUET)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_ORC)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_JSON)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_WAL)
&& !fileFormat.equalsIgnoreCase(FileFormatConstants.FORMAT_HIVE_TEXT)) {
throw new AnalysisException("File Format Type " + fileFormat + " is invalid.");
}
}

View File

@ -27,10 +27,10 @@ import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.Util;
@ -246,11 +246,11 @@ public class OutFileClause {
fileFormatType = TFileFormatType.FORMAT_ORC;
break;
case "csv_with_names":
headerType = FeConstants.csv_with_names;
headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "csv_with_names_and_types":
headerType = FeConstants.csv_with_names_and_types;
headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
default:

View File

@ -24,9 +24,9 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.datasource.property.constants.S3Properties.Env;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import com.google.common.annotations.VisibleForTesting;
@ -145,7 +145,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
final List<String> filePaths = dataDescription.getFilePaths();
Preconditions.checkState(filePaths.size() == 1, "there should be only one file path");
final String s3FilePath = filePaths.get(0);
params.put(S3TableValuedFunction.S3_URI, s3FilePath);
params.put(S3TableValuedFunction.PROP_URI, s3FilePath);
final Map<String, String> dataDescProp = dataDescription.getProperties();
if (dataDescProp != null) {
@ -153,7 +153,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
}
final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT);
params.put(ExternalFileTableValuedFunction.FORMAT, format);
params.put(FileFormatConstants.PROP_FORMAT, format);
if (isCsvFormat(format)) {
parseSeparator(dataDescription.getColumnSeparatorObj(), params);
@ -162,7 +162,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
List<String> columnsFromPath = dataDescription.getColumnsFromPath();
if (columnsFromPath != null) {
params.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS,
params.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS,
String.join(",", columnsFromPath));
}
@ -190,7 +190,7 @@ public class S3TvfLoadStmt extends NativeInsertStmt {
} catch (AnalysisException e) {
throw new DdlException(String.format("failed to parse separator:%s", separator), e);
}
tvfParams.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, separator.getSeparator());
tvfParams.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, separator.getSeparator());
}
private static boolean isCsvFormat(String format) {

View File

@ -39,8 +39,10 @@ public class StorageBackend implements ParseNode {
if (Strings.isNullOrEmpty(path)) {
throw new AnalysisException("No destination path specified.");
}
checkUri(URI.create(path), type);
}
URI uri = URI.create(path);
public static void checkUri(URI uri, StorageBackend.StorageType type) throws AnalysisException {
String schema = uri.getScheme();
if (schema == null) {
throw new AnalysisException(

View File

@ -20,9 +20,6 @@ package org.apache.doris.common;
import org.apache.doris.persist.meta.FeMetaFormat;
public class FeConstants {
// Database and table's default configurations, we will never change them
public static short default_replication_num = 3;
// The default value of bucket setting && auto bucket without estimate_partition_size
public static int default_bucket_num = 10;
@ -39,7 +36,6 @@ public class FeConstants {
public static int heartbeat_interval_second = 5;
public static int checkpoint_interval_second = 60; // 1 minutes
public static int ip_check_interval_second = 5;
// dpp version
public static String dpp_version = "3_2_0";
@ -71,12 +67,6 @@ public class FeConstants {
public static long tablet_checker_interval_ms = 20 * 1000L;
public static long tablet_schedule_interval_ms = 1000L;
public static String csv = "csv";
public static String csv_with_names = "csv_with_names";
public static String csv_with_names_and_types = "csv_with_names_and_types";
public static String text = "hive_text";
public static String FS_PREFIX_S3 = "s3";
public static String FS_PREFIX_S3A = "s3a";
public static String FS_PREFIX_S3N = "s3n";
@ -92,6 +82,7 @@ public class FeConstants {
public static String FS_PREFIX_HDFS = "hdfs";
public static String FS_PREFIX_VIEWFS = "viewfs";
public static String FS_PREFIX_FILE = "file";
public static final String INTERNAL_DB_NAME = "__internal_schema";
public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";

View File

@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import java.util.regex.Pattern;
/**
 * Common property names, format names, default separators and type-parsing
 * patterns shared by all file-reading table valued functions
 * (local(), hdfs(), s3()) and by load/outfile statements.
 *
 * <p>This class is a pure constants holder and must not be instantiated.
 */
public class FileFormatConstants {
    // Default separators. "\t" matches stream load's default column separator;
    // "\001" (Ctrl-A) is Hive's default field delimiter for text files.
    public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
    public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
    public static final String DEFAULT_LINE_DELIMITER = "\n";

    // Supported file format names (values of the "format" property).
    public static final String FORMAT_CSV = "csv";
    public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
    public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
    public static final String FORMAT_HIVE_TEXT = "hive_text";
    public static final String FORMAT_PARQUET = "parquet";
    public static final String FORMAT_ORC = "orc";
    public static final String FORMAT_JSON = "json";
    public static final String FORMAT_AVRO = "avro";
    public static final String FORMAT_WAL = "wal";

    // Property keys accepted by the file-reading TVFs.
    public static final String PROP_FORMAT = "format";
    public static final String PROP_COLUMN_SEPARATOR = "column_separator";
    public static final String PROP_LINE_DELIMITER = "line_delimiter";
    public static final String PROP_JSON_ROOT = "json_root";
    public static final String PROP_JSON_PATHS = "jsonpaths";
    public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array";
    public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
    public static final String PROP_NUM_AS_STRING = "num_as_string";
    public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
    public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
    public static final String PROP_SKIP_LINES = "skip_lines";
    public static final String PROP_CSV_SCHEMA = "csv_schema";
    public static final String PROP_COMPRESS_TYPE = "compress_type";
    public static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys";

    // Matches "decimal(p,s)", capturing precision and scale.
    public static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
    // Matches "datetime(p)", capturing the fractional-second precision.
    public static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");

    // Constants holder: suppress the default public constructor.
    private FileFormatConstants() {
    }
}

View File

@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import com.google.common.base.Strings;
import java.util.List;
import java.util.regex.Matcher;
/**
 * Utility methods related to file formats, shared by the file-reading
 * table valued functions and load statements.
 */
public class FileFormatUtils {

    // Utility class: suppress the default public constructor.
    private FileFormatUtils() {
    }

    /**
     * Returns true if the given format name is one of the csv family
     * (csv, csv_with_names, csv_with_names_and_types) or hive_text.
     * Comparison is case-insensitive; a null format returns false.
     */
    public static boolean isCsv(String formatStr) {
        return FileFormatConstants.FORMAT_CSV.equalsIgnoreCase(formatStr)
                || FileFormatConstants.FORMAT_CSV_WITH_NAMES.equalsIgnoreCase(formatStr)
                || FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES.equalsIgnoreCase(formatStr)
                || FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr);
    }

    /**
     * Parses a csv schema string and appends the resulting columns to {@code csvSchema}.
     * The schema string looks like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)".
     * A null or empty schema string is a no-op.
     *
     * @param csvSchema   output list the parsed columns are appended to
     * @param csvSchemaStr the raw schema string, entries separated by ';', name and type by ':'
     * @throws AnalysisException if any entry is malformed, a column name is invalid,
     *                           or a column type is unsupported
     */
    // public for unit test
    public static void parseCsvSchema(List<Column> csvSchema, String csvSchemaStr)
            throws AnalysisException {
        if (Strings.isNullOrEmpty(csvSchemaStr)) {
            return;
        }
        String[] schemaStrs = csvSchemaStr.split(";");
        try {
            for (String schemaStr : schemaStrs) {
                // Tolerate spaces around the name/type, e.g. "k1 : int".
                String[] kv = schemaStr.replace(" ", "").split(":");
                if (kv.length != 2) {
                    throw new AnalysisException("invalid csv schema: " + csvSchemaStr);
                }
                String name = kv[0].toLowerCase();
                FeNameFormat.checkColumnName(name);
                csvSchema.add(parseColumn(name, kv[1].toLowerCase()));
            }
        } catch (AnalysisException e) {
            // Preserve the specific message (e.g. "invalid decimal type: ...")
            // instead of re-wrapping it behind a generic prefix.
            throw e;
        } catch (Exception e) {
            // Unexpected failures (e.g. NumberFormatException) are reported
            // as an invalid schema.
            throw new AnalysisException("invalid csv schema: " + e.getMessage());
        }
    }

    // Maps one lower-cased type string to a nullable Column with the given name.
    private static Column parseColumn(String name, String type) throws AnalysisException {
        if (type.equals("tinyint")) {
            return new Column(name, PrimitiveType.TINYINT, true);
        } else if (type.equals("smallint")) {
            return new Column(name, PrimitiveType.SMALLINT, true);
        } else if (type.equals("int")) {
            return new Column(name, PrimitiveType.INT, true);
        } else if (type.equals("bigint")) {
            return new Column(name, PrimitiveType.BIGINT, true);
        } else if (type.equals("largeint")) {
            return new Column(name, PrimitiveType.LARGEINT, true);
        } else if (type.equals("float")) {
            return new Column(name, PrimitiveType.FLOAT, true);
        } else if (type.equals("double")) {
            return new Column(name, PrimitiveType.DOUBLE, true);
        } else if (type.startsWith("decimal")) {
            // regex decimal(p, s)
            Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type);
            if (!matcher.find()) {
                throw new AnalysisException("invalid decimal type: " + type);
            }
            int precision = Integer.parseInt(matcher.group(1));
            int scale = Integer.parseInt(matcher.group(2));
            return new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
                    "");
        } else if (type.equals("date")) {
            return new Column(name, ScalarType.createDateType(), false, null, true, null, "");
        } else if (type.startsWith("datetime")) {
            int scale = 0;
            if (!type.equals("datetime")) {
                // regex datetime(s)
                Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type);
                if (!matcher.find()) {
                    throw new AnalysisException("invalid datetime type: " + type);
                }
                scale = Integer.parseInt(matcher.group(1));
            }
            return new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
        } else if (type.equals("string")) {
            return new Column(name, PrimitiveType.STRING, true);
        } else if (type.equals("boolean")) {
            return new Column(name, PrimitiveType.BOOLEAN, true);
        } else {
            throw new AnalysisException("unsupported column type: " + type);
        }
    }
}

View File

@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.ConnectContext;
@ -551,17 +550,18 @@ public class Util {
public static TFileFormatType getFileFormatTypeFromName(String formatName) {
String lowerFileFormat = Objects.requireNonNull(formatName).toLowerCase();
if (lowerFileFormat.equals("parquet")) {
if (lowerFileFormat.equals(FileFormatConstants.FORMAT_PARQUET)) {
return TFileFormatType.FORMAT_PARQUET;
} else if (lowerFileFormat.equals("orc")) {
} else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_ORC)) {
return TFileFormatType.FORMAT_ORC;
} else if (lowerFileFormat.equals("json")) {
} else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_JSON)) {
return TFileFormatType.FORMAT_JSON;
// csv/csv_with_name/csv_with_names_and_types treat as csv format
} else if (lowerFileFormat.equals(FeConstants.csv) || lowerFileFormat.equals(FeConstants.csv_with_names)
|| lowerFileFormat.equals(FeConstants.csv_with_names_and_types)
} else if (lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV)
|| lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
|| lowerFileFormat.equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)
// TODO: Add TEXTFILE to TFileFormatType to Support hive text file format.
|| lowerFileFormat.equals(FeConstants.text)) {
|| lowerFileFormat.equals(FileFormatConstants.FORMAT_HIVE_TEXT)) {
return TFileFormatType.FORMAT_CSV_PLAIN;
} else if (lowerFileFormat.equals("wal")) {
return TFileFormatType.FORMAT_WAL;

View File

@ -25,6 +25,8 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.analyzer.UnboundAlias;
@ -52,7 +54,6 @@ import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryStateException;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.tablefunction.HdfsTableValuedFunction;
import org.apache.doris.tablefunction.S3TableValuedFunction;
@ -269,7 +270,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
}
private static boolean isCsvType(Map<String, String> tvfProperties) {
return tvfProperties.get(ExternalFileTableValuedFunction.FORMAT).equalsIgnoreCase("csv");
return tvfProperties.get(FileFormatConstants.PROP_FORMAT).equalsIgnoreCase("csv");
}
/**
@ -296,11 +297,11 @@ public class LoadCommand extends Command implements ForwardWithSync {
Map<String, String> sourceProperties = dataDesc.getProperties();
if (dataDesc.getFileFieldNames().isEmpty() && isCsvType(tvfProperties)) {
String csvSchemaStr = sourceProperties.get(ExternalFileTableValuedFunction.CSV_SCHEMA);
String csvSchemaStr = sourceProperties.get(FileFormatConstants.PROP_CSV_SCHEMA);
if (csvSchemaStr != null) {
tvfProperties.put(ExternalFileTableValuedFunction.CSV_SCHEMA, csvSchemaStr);
tvfProperties.put(FileFormatConstants.PROP_CSV_SCHEMA, csvSchemaStr);
List<Column> csvSchema = new ArrayList<>();
ExternalFileTableValuedFunction.parseCsvSchema(csvSchema, sourceProperties);
FileFormatUtils.parseCsvSchema(csvSchema, csvSchemaStr);
List<NamedExpression> csvColumns = new ArrayList<>();
for (Column csvColumn : csvSchema) {
csvColumns.add(new UnboundSlot(csvColumn.getName()));
@ -440,12 +441,12 @@ public class LoadCommand extends Command implements ForwardWithSync {
String fileFormat = dataDesc.getFormatDesc().getFileFormat().orElse("csv");
if ("csv".equalsIgnoreCase(fileFormat)) {
dataDesc.getFormatDesc().getColumnSeparator().ifPresent(sep ->
tvfProperties.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, sep.getSeparator()));
tvfProperties.put(FileFormatConstants.PROP_COLUMN_SEPARATOR, sep.getSeparator()));
dataDesc.getFormatDesc().getLineDelimiter().ifPresent(sep ->
tvfProperties.put(ExternalFileTableValuedFunction.LINE_DELIMITER, sep.getSeparator()));
tvfProperties.put(FileFormatConstants.PROP_LINE_DELIMITER, sep.getSeparator()));
}
// TODO: resolve and put ExternalFileTableValuedFunction params
tvfProperties.put(ExternalFileTableValuedFunction.FORMAT, fileFormat);
tvfProperties.put(FileFormatConstants.PROP_FORMAT, fileFormat);
List<String> filePaths = dataDesc.getFilePaths();
// TODO: support multi location by union
@ -454,7 +455,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
S3Properties.convertToStdProperties(tvfProperties);
tvfProperties.keySet().removeIf(S3Properties.Env.FS_KEYS::contains);
// TODO: check file path by s3 fs list status
tvfProperties.put(S3TableValuedFunction.S3_URI, listFilePath);
tvfProperties.put(S3TableValuedFunction.PROP_URI, listFilePath);
}
final Map<String, String> dataDescProps = dataDesc.getProperties();
@ -463,7 +464,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
}
List<String> columnsFromPath = dataDesc.getColumnsFromPath();
if (columnsFromPath != null && !columnsFromPath.isEmpty()) {
tvfProperties.put(ExternalFileTableValuedFunction.PATH_PARTITION_KEYS,
tvfProperties.put(FileFormatConstants.PROP_PATH_PARTITION_KEYS,
String.join(",", columnsFromPath));
}
return tvfProperties;

View File

@ -20,7 +20,7 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
@ -65,8 +65,8 @@ public class ResultFileSink extends DataSink {
public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList<String> labels) {
this(exchNodeId, outFileClause);
if (outFileClause.getHeaderType().equals(FeConstants.csv_with_names)
|| outFileClause.getHeaderType().equals(FeConstants.csv_with_names_and_types)) {
if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
|| outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
}
headerType = outFileClause.getHeaderType();

View File

@ -316,7 +316,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
for (Split split : inputSplits) {
FileSplit fileSplit = (FileSplit) split;
TFileType locationType = getLocationType(fileSplit.getPath().toString());
setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties);
setLocationPropertiesIfNecessary(locationType, locationProperties);
TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
@ -392,7 +392,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
scanRangeLocations.size(), (System.currentTimeMillis() - start));
}
private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit,
private void setLocationPropertiesIfNecessary(TFileType locationType,
Map<String, String> locationProperties) throws UserException {
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
if (!params.isSetHdfsParams()) {
@ -479,13 +479,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected abstract Map<String, String> getLocationProperties() throws UserException;
// eg: hdfs://namenode s3://buckets
protected String getFsName(FileSplit split) {
String fullPath = split.getPath().toUri().toString();
String filePath = split.getPath().toUri().getPath();
return fullPath.replace(filePath, "");
}
protected static Optional<TFileType> getTFileType(String location) {
if (location != null && !location.isEmpty()) {
if (S3Util.isObjStorage(location)) {

View File

@ -27,9 +27,9 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.Load;
@ -131,8 +131,8 @@ public class LoadScanProvider {
private String getHeaderType(String formatType) {
if (formatType != null) {
if (formatType.equalsIgnoreCase(FeConstants.csv_with_names) || formatType.equalsIgnoreCase(
FeConstants.csv_with_names_and_types)) {
if (formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
|| formatType.equalsIgnoreCase(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
return formatType;
}
}

View File

@ -81,10 +81,6 @@ public class TVFScanNode extends FileQueryScanNode {
numNodes = backendPolicy.numBackends();
}
protected String getFsName(FileSplit split) {
return tableValuedFunction.getFsName();
}
@Override
public TFileAttributes getFileAttributes() throws UserException {
return tableValuedFunction.getFileAttributes();

View File

@ -34,12 +34,12 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.TVFScanNode;
@ -71,6 +71,7 @@ import org.apache.doris.thrift.TTextSerdeType;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -84,8 +85,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@ -93,44 +92,24 @@ import java.util.stream.Collectors;
*/
public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf {
public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class);
protected static String DEFAULT_COLUMN_SEPARATOR = ",";
protected static final String DEFAULT_LINE_DELIMITER = "\n";
public static final String FORMAT = "format";
public static final String TABLE_ID = "table_id";
public static final String COLUMN_SEPARATOR = "column_separator";
public static final String LINE_DELIMITER = "line_delimiter";
protected static final String JSON_ROOT = "json_root";
protected static final String JSON_PATHS = "jsonpaths";
protected static final String STRIP_OUTER_ARRAY = "strip_outer_array";
protected static final String READ_JSON_BY_LINE = "read_json_by_line";
protected static final String NUM_AS_STRING = "num_as_string";
protected static final String FUZZY_PARSE = "fuzzy_parse";
protected static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
protected static final String SKIP_LINES = "skip_lines";
public static final String CSV_SCHEMA = "csv_schema";
protected static final String COMPRESS_TYPE = "compress_type";
public static final String PATH_PARTITION_KEYS = "path_partition_keys";
// decimal(p,s)
private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
// datetime(p)
private static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)");
public static final String PROP_TABLE_ID = "table_id";
protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>()
.add(FORMAT)
.add(TABLE_ID)
.add(JSON_ROOT)
.add(JSON_PATHS)
.add(STRIP_OUTER_ARRAY)
.add(READ_JSON_BY_LINE)
.add(NUM_AS_STRING)
.add(FUZZY_PARSE)
.add(COLUMN_SEPARATOR)
.add(LINE_DELIMITER)
.add(TRIM_DOUBLE_QUOTES)
.add(SKIP_LINES)
.add(CSV_SCHEMA)
.add(COMPRESS_TYPE)
.add(PATH_PARTITION_KEYS)
.add(FileFormatConstants.PROP_FORMAT)
.add(FileFormatConstants.PROP_JSON_ROOT)
.add(FileFormatConstants.PROP_JSON_PATHS)
.add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY)
.add(FileFormatConstants.PROP_READ_JSON_BY_LINE)
.add(FileFormatConstants.PROP_NUM_AS_STRING)
.add(FileFormatConstants.PROP_FUZZY_PARSE)
.add(FileFormatConstants.PROP_COLUMN_SEPARATOR)
.add(FileFormatConstants.PROP_LINE_DELIMITER)
.add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES)
.add(FileFormatConstants.PROP_SKIP_LINES)
.add(FileFormatConstants.PROP_CSV_SCHEMA)
.add(FileFormatConstants.PROP_COMPRESS_TYPE)
.add(FileFormatConstants.PROP_PATH_PARTITION_KEYS)
.build();
// Columns got from file and path(if has)
@ -142,17 +121,16 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
private List<String> pathPartitionKeys;
protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
protected Map<String, String> locationProperties;
protected Map<String, String> locationProperties = Maps.newHashMap();
protected String filePath;
private TFileFormatType fileFormatType;
protected TFileFormatType fileFormatType;
private TFileCompressType compressionType;
private String headerType = "";
private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
private String lineDelimiter = DEFAULT_LINE_DELIMITER;
private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER;
private String jsonRoot = "";
private String jsonPaths = "";
private boolean stripOuterArray;
@ -181,20 +159,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
return locationProperties;
}
public List<Column> getCsvSchema() {
return csvSchema;
}
public String getFsName() {
TFileType fileType = getTFileType();
if (fileType == TFileType.FILE_HDFS) {
return locationProperties.get(HdfsResource.HADOOP_FS_NAME);
} else if (fileType == TFileType.FILE_S3) {
return locationProperties.get(S3Properties.ENDPOINT);
}
return "";
}
public List<String> getPathPartitionKeys() {
return pathPartitionKeys;
}
@ -209,24 +173,29 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
}
}
//The keys in the passed validParams map need to be lowercase.
protected void parseProperties(Map<String, String> validParams) throws AnalysisException {
String formatString = validParams.getOrDefault(FORMAT, "");
//The keys in properties map need to be lowercase.
protected Map<String, String> parseCommonProperties(Map<String, String> properties) throws AnalysisException {
// Copy the properties, because we will remove the key from properties.
Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
copiedProps.putAll(properties);
String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "");
String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
switch (formatString) {
case "csv":
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "hive_text":
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
this.DEFAULT_COLUMN_SEPARATOR = "\001";
defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE;
break;
case "csv_with_names":
this.headerType = FeConstants.csv_with_names;
this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "csv_with_names_and_types":
this.headerType = FeConstants.csv_with_names_and_types;
this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
break;
case "parquet":
@ -248,114 +217,62 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
throw new AnalysisException("format:" + formatString + " is not supported.");
}
tableId = Long.valueOf(validParams.getOrDefault(TABLE_ID, "-1")).longValue();
columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR);
tableId = Long.valueOf(getOrDefaultAndRemove(copiedProps, PROP_TABLE_ID, "-1"));
columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR,
defaultColumnSeparator);
if (Strings.isNullOrEmpty(columnSeparator)) {
throw new AnalysisException("column_separator can not be empty.");
}
columnSeparator = Separator.convertSeparator(columnSeparator);
lineDelimiter = validParams.getOrDefault(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER,
FileFormatConstants.DEFAULT_LINE_DELIMITER);
if (Strings.isNullOrEmpty(lineDelimiter)) {
throw new AnalysisException("line_delimiter can not be empty.");
}
lineDelimiter = Separator.convertSeparator(lineDelimiter);
jsonRoot = validParams.getOrDefault(JSON_ROOT, "");
jsonPaths = validParams.getOrDefault(JSON_PATHS, "");
readJsonByLine = Boolean.valueOf(validParams.get(READ_JSON_BY_LINE)).booleanValue();
stripOuterArray = Boolean.valueOf(validParams.get(STRIP_OUTER_ARRAY)).booleanValue();
numAsString = Boolean.valueOf(validParams.get(NUM_AS_STRING)).booleanValue();
fuzzyParse = Boolean.valueOf(validParams.get(FUZZY_PARSE)).booleanValue();
trimDoubleQuotes = Boolean.valueOf(validParams.get(TRIM_DOUBLE_QUOTES)).booleanValue();
skipLines = Integer.valueOf(validParams.getOrDefault(SKIP_LINES, "0")).intValue();
jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, "");
jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, "");
readJsonByLine = Boolean.valueOf(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue();
stripOuterArray = Boolean.valueOf(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue();
numAsString = Boolean.valueOf(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue();
fuzzyParse = Boolean.valueOf(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue();
trimDoubleQuotes = Boolean.valueOf(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, "")).booleanValue();
skipLines = Integer.valueOf(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue();
String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN");
try {
compressionType = Util.getFileCompressType(validParams.getOrDefault(COMPRESS_TYPE, "UNKNOWN"));
compressionType = Util.getFileCompressType(compressTypeStr);
} catch (IllegalArgumentException e) {
throw new AnalysisException("Compress type : " + validParams.get(COMPRESS_TYPE) + " is not supported.");
throw new AnalysisException("Compress type : " + compressTypeStr + " is not supported.");
}
if (formatString.equals("csv") || formatString.equals("csv_with_names")
|| formatString.equals("csv_with_names_and_types")) {
parseCsvSchema(csvSchema, validParams);
if (FileFormatUtils.isCsv(formatString)) {
FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
FileFormatConstants.PROP_CSV_SCHEMA, ""));
LOG.debug("get csv schema: {}", csvSchema);
}
pathPartitionKeys = Optional.ofNullable(validParams.get(PATH_PARTITION_KEYS))
.map(str ->
Arrays.stream(str.split(","))
.map(String::trim)
.collect(Collectors.toList()))
pathPartitionKeys = Optional.ofNullable(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null))
.map(str -> Arrays.stream(str.split(","))
.map(String::trim)
.collect(Collectors.toList()))
.orElse(Lists.newArrayList());
return copiedProps;
}
// public for unit test
public static void parseCsvSchema(List<Column> csvSchema, Map<String, String> validParams)
throws AnalysisException {
String csvSchemaStr = validParams.get(CSV_SCHEMA);
if (Strings.isNullOrEmpty(csvSchemaStr)) {
return;
}
// the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)"
String[] schemaStrs = csvSchemaStr.split(";");
try {
for (String schemaStr : schemaStrs) {
String[] kv = schemaStr.replace(" ", "").split(":");
if (kv.length != 2) {
throw new AnalysisException("invalid csv schema: " + csvSchemaStr);
}
Column column = null;
String name = kv[0].toLowerCase();
FeNameFormat.checkColumnName(name);
String type = kv[1].toLowerCase();
if (type.equals("tinyint")) {
column = new Column(name, PrimitiveType.TINYINT, true);
} else if (type.equals("smallint")) {
column = new Column(name, PrimitiveType.SMALLINT, true);
} else if (type.equals("int")) {
column = new Column(name, PrimitiveType.INT, true);
} else if (type.equals("bigint")) {
column = new Column(name, PrimitiveType.BIGINT, true);
} else if (type.equals("largeint")) {
column = new Column(name, PrimitiveType.LARGEINT, true);
} else if (type.equals("float")) {
column = new Column(name, PrimitiveType.FLOAT, true);
} else if (type.equals("double")) {
column = new Column(name, PrimitiveType.DOUBLE, true);
} else if (type.startsWith("decimal")) {
// regex decimal(p, s)
Matcher matcher = DECIMAL_TYPE_PATTERN.matcher(type);
if (!matcher.find()) {
throw new AnalysisException("invalid decimal type: " + type);
}
int precision = Integer.parseInt(matcher.group(1));
int scale = Integer.parseInt(matcher.group(2));
column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null,
"");
} else if (type.equals("date")) {
column = new Column(name, ScalarType.createDateType(), false, null, true, null, "");
} else if (type.startsWith("datetime")) {
int scale = 0;
if (!type.equals("datetime")) {
// regex datetime(s)
Matcher matcher = DATETIME_TYPE_PATTERN.matcher(type);
if (!matcher.find()) {
throw new AnalysisException("invalid datetime type: " + type);
}
scale = Integer.parseInt(matcher.group(1));
}
column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, "");
} else if (type.equals("string")) {
column = new Column(name, PrimitiveType.STRING, true);
} else if (type.equals("boolean")) {
column = new Column(name, PrimitiveType.BOOLEAN, true);
} else {
throw new AnalysisException("unsupported column type: " + type);
}
csvSchema.add(column);
}
LOG.debug("get csv schema: {}", csvSchema);
} catch (Exception e) {
throw new AnalysisException("invalid csv schema: " + e.getMessage());
}
protected String getOrDefaultAndRemove(Map<String, String> props, String key, String defaultValue) {
String value = props.getOrDefault(key, defaultValue);
props.remove(key);
return value;
}
public List<TBrokerFileStatus> getFileStatuses() {
@ -588,3 +505,4 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
}

View File

@ -25,9 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.URI;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import com.google.common.base.Strings;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -39,50 +37,41 @@ import java.util.Map;
*/
public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
public static final Logger LOG = LogManager.getLogger(HdfsTableValuedFunction.class);
public static final String NAME = "hdfs";
public static final String HDFS_URI = "uri";
// simple or kerberos
private static final String PROP_URI = "uri";
private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
.add(HDFS_URI)
.add(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
.add(HdfsResource.HADOOP_FS_NAME)
.add(HdfsResource.HADOOP_USER_NAME)
.add(HdfsResource.HADOOP_KERBEROS_PRINCIPAL)
.add(HdfsResource.HADOOP_KERBEROS_KEYTAB)
.add(HdfsResource.HADOOP_SHORT_CIRCUIT)
.add(HdfsResource.HADOOP_SOCKET_PATH)
.build();
public HdfsTableValuedFunction(Map<String, String> properties) throws AnalysisException {
init(properties);
}
private URI hdfsUri;
private void init(Map<String, String> properties) throws AnalysisException {
// 1. analyze common properties
Map<String, String> otherProps = super.parseCommonProperties(properties);
public HdfsTableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> fileParams = new CaseInsensitiveMap();
locationProperties = Maps.newHashMap();
for (String key : params.keySet()) {
String lowerKey = key.toLowerCase();
if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
fileParams.put(lowerKey, params.get(key));
} else if (LOCATION_PROPERTIES.contains(lowerKey)) {
locationProperties.put(lowerKey, params.get(key));
} else if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
// 2. analyze uri
String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null);
if (Strings.isNullOrEmpty(uriStr)) {
throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI));
}
URI uri = URI.create(uriStr);
StorageBackend.checkUri(uri, StorageType.HDFS);
filePath = uri.getScheme() + "://" + uri.getAuthority() + uri.getPath();
// 3. analyze other properties
for (String key : otherProps.keySet()) {
if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
// because HADOOP_FS_NAME contains upper and lower case
locationProperties.put(HdfsResource.HADOOP_FS_NAME, params.get(key));
locationProperties.put(HdfsResource.HADOOP_FS_NAME, otherProps.get(key));
} else {
locationProperties.put(key, params.get(key));
locationProperties.put(key, otherProps.get(key));
}
}
if (!locationProperties.containsKey(HDFS_URI)) {
throw new AnalysisException(String.format("Configuration '%s' is required.", HDFS_URI));
// If the user does not specify the HADOOP_FS_NAME, we will use the uri's scheme and authority
if (!locationProperties.containsKey(HdfsResource.HADOOP_FS_NAME)) {
locationProperties.put(HdfsResource.HADOOP_FS_NAME, uri.getScheme() + "://" + uri.getAuthority());
}
StorageBackend.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS);
hdfsUri = URI.create(locationProperties.get(HDFS_URI));
filePath = locationProperties.get(HdfsResource.HADOOP_FS_NAME) + hdfsUri.getPath();
super.parseProperties(fileParams);
// 4. parse file
parseFile();
}

View File

@ -20,9 +20,9 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -36,24 +36,15 @@ public class HttpStreamTableValuedFunction extends ExternalFileTableValuedFuncti
private static final Logger LOG = LogManager.getLogger(HttpStreamTableValuedFunction.class);
public static final String NAME = "http_stream";
public HttpStreamTableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> fileParams = new CaseInsensitiveMap();
for (String key : params.keySet()) {
String lowerKey = key.toLowerCase();
if (!FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
throw new AnalysisException(key + " is invalid property");
}
fileParams.put(lowerKey, params.get(key).toLowerCase());
}
public HttpStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
// 1. analyze common properties
super.parseCommonProperties(properties);
String formatString = fileParams.getOrDefault(FORMAT, "").toLowerCase();
if (formatString.equals("parquet")
|| formatString.equals("avro")
|| formatString.equals("orc")) {
throw new AnalysisException("current http_stream does not yet support parquet, avro and orc");
if (fileFormatType == TFileFormatType.FORMAT_PARQUET
|| fileFormatType == TFileFormatType.FORMAT_AVRO
|| fileFormatType == TFileFormatType.FORMAT_ORC) {
throw new AnalysisException("http_stream does not yet support parquet, avro and orc");
}
super.parseProperties(fileParams);
}
// =========== implement abstract methods of ExternalFileTableValuedFunction =================

View File

@ -31,8 +31,6 @@ import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -46,42 +44,31 @@ import java.util.concurrent.TimeUnit;
*/
public class LocalTableValuedFunction extends ExternalFileTableValuedFunction {
private static final Logger LOG = LogManager.getLogger(LocalTableValuedFunction.class);
public static final String NAME = "local";
public static final String FILE_PATH = "file_path";
public static final String BACKEND_ID = "backend_id";
public static final String PROP_FILE_PATH = "file_path";
public static final String PROP_BACKEND_ID = "backend_id";
private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
.add(FILE_PATH)
.add(BACKEND_ID)
.add(PROP_FILE_PATH)
.add(PROP_BACKEND_ID)
.build();
private long backendId;
public LocalTableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> fileParams = new CaseInsensitiveMap();
locationProperties = Maps.newHashMap();
for (String key : params.keySet()) {
String lowerKey = key.toLowerCase();
if (FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
fileParams.put(lowerKey, params.get(key));
} else if (LOCATION_PROPERTIES.contains(lowerKey)) {
locationProperties.put(lowerKey, params.get(key));
} else {
throw new AnalysisException(key + " is invalid property");
}
}
public LocalTableValuedFunction(Map<String, String> properties) throws AnalysisException {
// 1. analyze common properties
Map<String, String> otherProps = super.parseCommonProperties(properties);
// 2. analyze location properties
for (String key : LOCATION_PROPERTIES) {
if (!locationProperties.containsKey(key)) {
throw new AnalysisException(String.format("Configuration '%s' is required.", key));
if (!otherProps.containsKey(key)) {
throw new AnalysisException(String.format("Property '%s' is required.", key));
}
}
filePath = otherProps.get(PROP_FILE_PATH);
backendId = Long.parseLong(otherProps.get(PROP_BACKEND_ID));
filePath = locationProperties.get(FILE_PATH);
backendId = Long.parseLong(locationProperties.get(BACKEND_ID));
super.parseProperties(fileParams);
// 3. parse file
getFileListFromBackend();
}

View File

@ -30,10 +30,9 @@ import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.thrift.TFileType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
/**
@ -49,71 +48,46 @@ import java.util.Map;
*/
public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
public static final String NAME = "s3";
public static final String S3_URI = "uri";
public static final String PROP_URI = "uri";
private static final ImmutableSet<String> DEPRECATED_KEYS =
ImmutableSet.of("access_key", "secret_key", "session_token", "region");
ImmutableSet.of("access_key", "secret_key", "session_token", "region",
"ACCESS_KEY", "SECRET_KEY", "SESSION_TOKEN", "REGION");
private static final ImmutableSet<String> OPTIONAL_KEYS =
ImmutableSet.of(S3Properties.SESSION_TOKEN, PropertyConverter.USE_PATH_STYLE, S3Properties.REGION,
PATH_PARTITION_KEYS);
private static final ImmutableSet<String> LOCATION_PROPERTIES = ImmutableSet.<String>builder()
.add(S3_URI)
.add(S3Properties.ENDPOINT)
.addAll(DEPRECATED_KEYS)
.addAll(S3Properties.TVF_REQUIRED_FIELDS)
.addAll(OPTIONAL_KEYS)
.build();
private final S3URI s3uri;
private final boolean forceVirtualHosted;
private String virtualBucket = "";
public S3TableValuedFunction(Map<String, String> params) throws AnalysisException {
public S3TableValuedFunction(Map<String, String> properties) throws AnalysisException {
// 1. analyze common properties
Map<String, String> otherProps = super.parseCommonProperties(properties);
Map<String, String> fileParams = new HashMap<>();
for (Map.Entry<String, String> entry : params.entrySet()) {
String key = entry.getKey();
String lowerKey = key.toLowerCase();
if (!LOCATION_PROPERTIES.contains(lowerKey) && !FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
throw new AnalysisException("Invalid property: " + key);
}
if (DEPRECATED_KEYS.contains(lowerKey)) {
lowerKey = S3Properties.S3_PREFIX + lowerKey;
}
fileParams.put(lowerKey, entry.getValue());
// 2. analyze uri and other properties
String uriStr = getOrDefaultAndRemove(otherProps, PROP_URI, null);
if (Strings.isNullOrEmpty(uriStr)) {
throw new AnalysisException(String.format("Properties '%s' is required.", PROP_URI));
}
forwardCompatibleDeprecatedKeys(otherProps);
if (!fileParams.containsKey(S3_URI)) {
throw new AnalysisException("Missing required property: " + S3_URI);
}
forceVirtualHosted = isVirtualHosted(fileParams);
s3uri = getS3Uri(fileParams);
final String endpoint = forceVirtualHosted
? getEndpointAndSetVirtualBucket(params)
: s3uri.getBucketScheme();
if (!fileParams.containsKey(S3Properties.REGION)) {
String usePathStyle = getOrDefaultAndRemove(otherProps, PropertyConverter.USE_PATH_STYLE, "false");
boolean forceVirtualHosted = isVirtualHosted(uriStr, Boolean.parseBoolean(usePathStyle));
S3URI s3uri = getS3Uri(uriStr, forceVirtualHosted);
String endpoint = forceVirtualHosted
? getEndpointAndSetVirtualBucket(s3uri, otherProps) : s3uri.getBucketScheme();
if (!otherProps.containsKey(S3Properties.REGION)) {
String region = S3Properties.getRegionOfEndpoint(endpoint);
fileParams.put(S3Properties.REGION, region);
otherProps.put(S3Properties.REGION, region);
}
checkNecessaryS3Properties(otherProps);
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
fileParams.get(S3Properties.REGION),
fileParams.get(S3Properties.ACCESS_KEY),
fileParams.get(S3Properties.SECRET_KEY));
if (fileParams.containsKey(S3Properties.SESSION_TOKEN)) {
credential.setSessionToken(fileParams.get(S3Properties.SESSION_TOKEN));
otherProps.get(S3Properties.REGION),
otherProps.get(S3Properties.ACCESS_KEY),
otherProps.get(S3Properties.SECRET_KEY));
if (otherProps.containsKey(S3Properties.SESSION_TOKEN)) {
credential.setSessionToken(otherProps.get(S3Properties.SESSION_TOKEN));
}
// set S3 location properties
// these five properties is necessary, no one can be lost.
locationProperties = S3Properties.credentialToMap(credential);
String usePathStyle = fileParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
this.locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(this.locationProperties));
super.parseProperties(fileParams);
locationProperties.putAll(S3ClientBEProperties.getBeFSProperties(locationProperties));
if (forceVirtualHosted) {
filePath = NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM
@ -130,39 +104,59 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
}
}
private String getEndpointAndSetVirtualBucket(Map<String, String> params) throws AnalysisException {
Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted.");
String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
virtualBucket = fileds[0];
if (fileds.length > 1) {
private void forwardCompatibleDeprecatedKeys(Map<String, String> props) {
for (String deprecatedKey : DEPRECATED_KEYS) {
String value = props.remove(deprecatedKey);
if (!Strings.isNullOrEmpty(value)) {
props.put("s3." + deprecatedKey.toLowerCase(), value);
}
}
}
private void checkNecessaryS3Properties(Map<String, String> props) throws AnalysisException {
if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) {
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION));
}
if (Strings.isNullOrEmpty(props.get(S3Properties.ACCESS_KEY))) {
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY));
}
if (Strings.isNullOrEmpty(props.get(S3Properties.SECRET_KEY))) {
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.SECRET_KEY));
}
}
private String getEndpointAndSetVirtualBucket(S3URI s3uri, Map<String, String> props)
throws AnalysisException {
String[] fields = s3uri.getVirtualBucket().split("\\.", 2);
virtualBucket = fields[0];
if (fields.length > 1) {
// At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg:
// uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
// s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com,
// so we need separate virtualBucket and endpoint.
return fileds[1];
} else if (params.containsKey(S3Properties.ENDPOINT)) {
return params.get(S3Properties.ENDPOINT);
return fields[1];
} else if (props.containsKey(S3Properties.ENDPOINT)) {
return props.get(S3Properties.ENDPOINT);
} else {
throw new AnalysisException("can not parse endpoint, please check uri.");
}
}
private boolean isVirtualHosted(Map<String, String> validParams) {
String originUri = validParams.getOrDefault(S3_URI, "");
if (originUri.toLowerCase().startsWith("s3")) {
private boolean isVirtualHosted(String uri, boolean usePathStyle) {
if (uri.toLowerCase().startsWith("s3")) {
// s3 protocol, default virtual-hosted style
return true;
} else {
// not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE.
return !Boolean.parseBoolean(validParams.get(PropertyConverter.USE_PATH_STYLE));
return !usePathStyle;
}
}
private S3URI getS3Uri(Map<String, String> validParams) throws AnalysisException {
private S3URI getS3Uri(String uri, boolean forceVirtualHosted) throws AnalysisException {
try {
return S3URI.create(validParams.get(S3_URI), forceVirtualHosted);
return S3URI.create(uri, forceVirtualHosted);
} catch (UserException e) {
throw new AnalysisException("parse s3 uri failed, uri = " + validParams.get(S3_URI), e);
throw new AnalysisException("parse s3 uri failed, uri = " + uri, e);
}
}
@ -189,3 +183,4 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
return "S3TableValuedFunction";
}
}