[fix](broker-load) hdfs or bos path parser not support glob exprs (#8390)

This commit is contained in:
Zhengguo Yang
2022-03-12 20:10:05 +08:00
committed by GitHub
parent 23d0e7b4f9
commit ebbe6f650c
8 changed files with 345 additions and 122 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.URI;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.proto.FunctionService;
@ -228,7 +229,7 @@ public class CreateFunctionStmt extends DdlStmt {
AggregateFunction.AggregateFunctionBuilder builder = AggregateFunction.AggregateFunctionBuilder.createUdfBuilder();
builder.name(functionName).argsType(argsDef.getArgTypes()).retType(returnType.getType()).
hasVarArgs(argsDef.isVariadic()).intermediateType(intermediateType.getType()).objectFile(objectFile);
hasVarArgs(argsDef.isVariadic()).intermediateType(intermediateType.getType()).location(URI.create(objectFile));
String initFnSymbol = properties.get(INIT_KEY);
if (initFnSymbol == null) {
throw new AnalysisException("No 'init_fn' in properties");
@ -288,10 +289,11 @@ public class CreateFunctionStmt extends DdlStmt {
throw new AnalysisException("check function [" + symbol + "] failed: " + response.getStatus());
}
}
URI location = URI.create(objectFile);
function = ScalarFunction.createUdf(binaryType,
functionName, argsDef.getArgTypes(),
returnType.getType(), argsDef.isVariadic(),
objectFile, symbol, prepareFnSymbol, closeFnSymbol);
location, symbol, prepareFnSymbol, closeFnSymbol);
function.setChecksum(checksum);
}

View File

@ -31,6 +31,7 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.URI;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@ -42,8 +43,6 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -233,18 +232,18 @@ public class ExportStmt extends StatementBase {
throw new AnalysisException("No dest path specified.");
}
try {
URI uri = new URI(path);
String schema = uri.getScheme();
if (type == StorageBackend.StorageType.BROKER) {
if (schema == null || (!schema.equalsIgnoreCase("bos") && !schema.equalsIgnoreCase("afs")
URI uri = URI.create(path);
String schema = uri.getScheme();
if (type == StorageBackend.StorageType.BROKER) {
if (schema == null || (!schema.equalsIgnoreCase("bos") && !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs"))) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
throw new AnalysisException("Invalid export path. please use valid 'S3://' path.");
}
throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' " +
"path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
throw new AnalysisException("Invalid export path. please use valid 'S3://' path.");
}
} else if (type == StorageBackend.StorageType.HDFS) {
if (schema == null || !schema.equalsIgnoreCase("hdfs")) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://' path.");
@ -256,9 +255,6 @@ public class ExportStmt extends StatementBase {
}
path = path.substring(OutFileClause.LOCAL_FILE_PREFIX.length() - 1);
}
} catch (URISyntaxException e) {
throw new AnalysisException("Invalid path format. " + e.getMessage());
}
return path;
}

View File

@ -1,83 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.common.AnalysisException;
import com.google.common.base.Preconditions;
/**
 * Represents an HDFS URI in a SQL statement.
 *
 * <p>The location handed to the constructor is trimmed and stored. {@link #analyze(Analyzer)}
 * must succeed before {@link #getPath()} may be called; until then {@link #toString()} falls
 * back to the raw location.
 */
public class HdfsURI {
    // Trimmed location string as supplied by the caller.
    private final String location;

    // Set during analysis; stays null until analyze() has succeeded.
    private String uriPath;

    /**
     * @param location raw location text; must not be null, surrounding whitespace is removed
     */
    public HdfsURI(String location) {
        Preconditions.checkNotNull(location);
        this.location = location.trim();
    }

    /**
     * Returns the analyzed path.
     *
     * @throws NullPointerException if {@link #analyze(Analyzer)} has not been run successfully
     */
    public String getPath() {
        Preconditions.checkNotNull(uriPath);
        return uriPath;
    }

    /**
     * Validates the location and records it as the analyzed path.
     *
     * @param analyzer currently unused
     * @throws AnalysisException if the trimmed location is empty
     */
    public void analyze(Analyzer analyzer) throws AnalysisException {
        if (location.isEmpty()) {
            throw new AnalysisException("URI path cannot be empty.");
        }
        // TODO(dhc): the disabled original implementation resolved the location to a Hadoop
        // Path here: it rejected non-absolute paths and file systems other than
        // DistributedFileSystem, fully qualified the path, and checked the user's access
        // privilege on it. None of that is active; the location is accepted verbatim.
        uriPath = location;
    }

    @Override
    public String toString() {
        // If uriPath is null (this HdfsURI has not been analyzed yet) just return the raw
        // location string the caller passed in.
        return uriPath == null ? location : uriPath;
    }

    /** Returns the trimmed location exactly as it was supplied, analyzed or not. */
    public String getLocation() {
        return location;
    }
}

View File

@ -26,7 +26,7 @@ import com.google.gson.Gson;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.HdfsURI;
import org.apache.doris.common.util.URI;
import org.apache.doris.thrift.TAggregateFunction;
import org.apache.doris.thrift.TFunction;
import org.apache.doris.thrift.TFunctionBinaryType;
@ -95,7 +95,7 @@ public class AggregateFunction extends Function {
public AggregateFunction(FunctionName fnName, List<Type> argTypes,
Type retType, Type intermediateType,
HdfsURI location, String updateFnSymbol, String initFnSymbol,
URI location, String updateFnSymbol, String initFnSymbol,
String serializeFnSymbol, String mergeFnSymbol, String getValueFnSymbol,
String removeFnSymbol, String finalizeFnSymbol) {
this(fnName, argTypes, retType, intermediateType, location, updateFnSymbol, initFnSymbol, serializeFnSymbol,
@ -136,7 +136,7 @@ public class AggregateFunction extends Function {
public AggregateFunction(FunctionName fnName, List<Type> argTypes,
Type retType, Type intermediateType,
HdfsURI location, String updateFnSymbol, String initFnSymbol,
URI location, String updateFnSymbol, String initFnSymbol,
String serializeFnSymbol, String mergeFnSymbol, String getValueFnSymbol,
String removeFnSymbol, String finalizeFnSymbol, boolean vectorized) {
this(fnName, argTypes, retType, intermediateType, false, location, updateFnSymbol, initFnSymbol, serializeFnSymbol,
@ -145,7 +145,7 @@ public class AggregateFunction extends Function {
public AggregateFunction(FunctionName fnName, List<Type> argTypes,
Type retType, Type intermediateType, boolean hasVarArgs,
HdfsURI location, String updateFnSymbol, String initFnSymbol,
URI location, String updateFnSymbol, String initFnSymbol,
String serializeFnSymbol, String mergeFnSymbol, String getValueFnSymbol,
String removeFnSymbol, String finalizeFnSymbol, boolean vectorized) {
// only `count` is always not nullable, other aggregate function is always nullable
@ -280,12 +280,12 @@ public class AggregateFunction extends Function {
// Used to create UDAF
public AggregateFunction(FunctionName fnName, Type[] argTypes,
Type retType, boolean hasVarArgs, Type intermediateType, String location,
Type retType, boolean hasVarArgs, Type intermediateType, URI location,
String initFnSymbol, String updateFnSymbol, String mergeFnSymbol,
String serializeFnSymbol, String finalizeFnSymbol,
String getValueFnSymbol, String removeFnSymbol) {
super(fnName, Arrays.asList(argTypes), retType, hasVarArgs);
this.setLocation(new HdfsURI(location));
this.setLocation(location);
this.intermediateType = (intermediateType.equals(retType)) ? null : intermediateType;
this.updateFnSymbol = updateFnSymbol;
this.initFnSymbol = initFnSymbol;
@ -307,7 +307,7 @@ public class AggregateFunction extends Function {
Type retType;
boolean hasVarArgs;
Type intermediateType;
String objectFile;
URI location;
String initFnSymbol;
String updateFnSymbol;
String serializeFnSymbol;
@ -349,8 +349,8 @@ public class AggregateFunction extends Function {
return this;
}
public AggregateFunctionBuilder objectFile(String objectFile) {
this.objectFile = objectFile;
public AggregateFunctionBuilder location(URI location) {
this.location = location;
return this;
}
@ -391,7 +391,7 @@ public class AggregateFunction extends Function {
public AggregateFunction build() {
AggregateFunction fn = new AggregateFunction(name, argTypes, retType, hasVarArgs, intermediateType,
objectFile, initFnSymbol, updateFnSymbol, mergeFnSymbol,
location, initFnSymbol, updateFnSymbol, mergeFnSymbol,
serializeFnSymbol, finalizeFnSymbol,
getValueFnSymbol, removeFnSymbol);
fn.setBinaryType(binaryType);

View File

@ -20,9 +20,10 @@ package org.apache.doris.catalog;
import static org.apache.doris.common.io.IOUtils.writeOptionString;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.HdfsURI;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.URI;
import org.apache.doris.thrift.TFunction;
import org.apache.doris.thrift.TFunctionBinaryType;
@ -113,7 +114,7 @@ public class Function implements Writable {
// Absolute path in HDFS for the binary that contains this function.
// e.g. /udfs/udfs.jar
private HdfsURI location;
private URI location;
private TFunctionBinaryType binaryType;
protected NullableMode nullableMode = NullableMode.DEPEND_ON_ARGUMENT;
@ -190,11 +191,11 @@ public class Function implements Writable {
return argTypes.length;
}
public HdfsURI getLocation() {
public URI getLocation() {
return location;
}
public void setLocation(HdfsURI loc) {
public void setLocation(URI loc) {
location = loc;
}
@ -448,7 +449,7 @@ public class Function implements Writable {
fn.setName(name.toThrift());
fn.setBinaryType(binaryType);
if (location != null) {
fn.setHdfsLocation(location.toString());
fn.setHdfsLocation(location.getLocation());
}
fn.setArgTypes(Type.toThrift(argTypes));
fn.setRetType(getReturnType().toThrift());
@ -635,7 +636,7 @@ public class Function implements Writable {
// write library URL
String libUrl = "";
if (location != null) {
libUrl = location.toString();
libUrl = location.getLocation();
}
writeOptionString(output, libUrl);
writeOptionString(output, checksum);
@ -661,7 +662,13 @@ public class Function implements Writable {
boolean hasLocation = input.readBoolean();
if (hasLocation) {
location = new HdfsURI(Text.readString(input));
String locationStr = Text.readString(input);
try {
location = URI.create(locationStr);
} catch (AnalysisException e) {
LOG.warn("failed to parse location:" + locationStr);
}
}
boolean hasChecksum = input.readBoolean();
if (hasChecksum) {

View File

@ -19,8 +19,8 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.HdfsURI;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.URI;
import org.apache.doris.thrift.TFunction;
import org.apache.doris.thrift.TFunctionBinaryType;
import org.apache.doris.thrift.TScalarFunction;
@ -77,7 +77,7 @@ public class ScalarFunction extends Function {
}
public ScalarFunction(FunctionName fnName, List<Type> argTypes,
Type retType, HdfsURI location, String symbolName, String initFnSymbol,
Type retType, URI location, String symbolName, String initFnSymbol,
String closeFnSymbol) {
super(fnName, argTypes, retType, false);
setLocation(location);
@ -315,13 +315,13 @@ public class ScalarFunction extends Function {
TFunctionBinaryType binaryType,
FunctionName name, Type[] args,
Type returnType, boolean isVariadic,
String objectFile, String symbol, String prepareFnSymbol, String closeFnSymbol) {
URI location, String symbol, String prepareFnSymbol, String closeFnSymbol) {
ScalarFunction fn = new ScalarFunction(name, Arrays.asList(args), returnType, isVariadic, binaryType,
true, false);
fn.symbolName = symbol;
fn.prepareFnSymbol = prepareFnSymbol;
fn.closeFnSymbol = closeFnSymbol;
fn.setLocation(new HdfsURI(objectFile));
fn.setLocation(location);
return fn;
}

View File

@ -0,0 +1,204 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.common.AnalysisException;
import org.apache.commons.lang.StringUtils;
import org.apache.parquet.Strings;
import java.util.TreeMap;
/**
 * A lenient URI splitter used instead of {@code java.net.URI} so that locations containing
 * characters such as <code>{</code>, <code>}</code> and <code>*</code> (glob expressions) can
 * still be broken into scheme, authority, path, query and fragment.
 *
 * <p>The parser is purely textual: the location is cut at {@code "://"}, then at the first
 * {@code "/"}, then at {@code "#"} and {@code "?"}. No character validation or percent-decoding
 * is performed. Consequences worth knowing:
 * <ul>
 *   <li>{@code "?"} and {@code "#"} are always treated as query / fragment delimiters, so a
 *       glob that uses {@code "?"} as a wildcard ends the path at that character.</li>
 *   <li>Without a scheme, text before the first {@code "/"} is still parsed as an authority
 *       (for example {@code "host:9000/dir"} yields host {@code "host"} and path
 *       {@code "/dir"}).</li>
 *   <li>A host containing more than one {@code ":"} (e.g. a bracketed IPv6 literal) is
 *       rejected.</li>
 * </ul>
 *
 * <p>Instances are created with {@link #create(String)}; all parsing happens in the constructor
 * and the object is not modified afterwards by this class.
 */
public class URI {
    private static final String SCHEME_DELIM = "://";
    private static final String AUTH_DELIM = "@";
    private static final String PATH_DELIM = "/";
    // Used as a regular expression by String.split, hence the escaping.
    private static final String QUERY_DELIM = "\\?";
    private static final String FRAGMENT_DELIM = "#";
    private static final String PASS_DELIM = ":";
    private static final String PORT_DELIM = ":";
    private static final String QUERY_ITEM_DELIM = "&";
    private static final String QUERY_KV_DELIM = "=";

    // The trimmed input, returned unchanged by getLocation().
    private final String location;
    private String scheme;
    private String authority;
    private String host;
    // -1 means "no port given".
    private int port = -1;
    private String userInfo;
    private String userName;
    private String passWord;
    private String path = "";
    private String query;
    private String fragment;
    // Stays null when the location has no (non-blank) query part.
    private TreeMap<String, String> queryMap;

    private URI(String location) throws AnalysisException {
        if (Strings.isNullOrEmpty(location)) {
            throw new AnalysisException("location can not be null");
        }
        this.location = location.trim();
        parse();
    }

    /**
     * Parses {@code location} into its components.
     *
     * @param location the text to parse; must be neither null nor empty
     * @return the parsed URI
     * @throws AnalysisException if the location is null/empty or cannot be split into
     *                           well-formed components (including a non-numeric port)
     */
    public static URI create(String location) throws AnalysisException {
        return new URI(location);
    }

    /** Returns the trimmed text this URI was created from. */
    public String getLocation() {
        return location;
    }

    /** Returns the text before {@code "://"}, or null if the location has no scheme. */
    public String getScheme() {
        return scheme;
    }

    /** Returns the raw {@code [userinfo@]host[:port]} text, or null if none was parsed. */
    public String getAuthority() {
        return authority;
    }

    /** Returns the host, or null if no authority was parsed. */
    public String getHost() {
        return host;
    }

    /** Returns the port, or -1 if the authority does not specify one. */
    public int getPort() {
        return port;
    }

    /** Returns the text before {@code "@"} in the authority, or null if absent. */
    public String getUserInfo() {
        return userInfo;
    }

    /** Returns the user name part of the user info, or null if there is no user info. */
    public String getUserName() {
        return userName;
    }

    /** Returns the password part of the user info, or null if none was given. */
    public String getPassWord() {
        return passWord;
    }

    /** Returns the path; never null, empty when the location has no path. */
    public String getPath() {
        return path;
    }

    /** Returns the text after {@code "?"} (without the fragment), or null if absent. */
    public String getQuery() {
        return query;
    }

    /** Returns the text after {@code "#"}, or null if absent. */
    public String getFragment() {
        return fragment;
    }

    /**
     * Returns the query parameters sorted by key, or null when the location has no non-blank
     * query. The returned map is the internal instance, not a copy.
     */
    public TreeMap<String, String> getQueryMap() {
        return queryMap;
    }

    /** Splits {@link #location} into scheme, authority, path, query and fragment. */
    private void parse() throws AnalysisException {
        String[] schemeSplit = this.location.split(SCHEME_DELIM);
        String hierarchical;
        if (schemeSplit.length == 1) {
            hierarchical = schemeSplit[0];
        } else if (schemeSplit.length == 2) {
            scheme = schemeSplit[0];
            hierarchical = schemeSplit[1];
        } else {
            // Covers both "no content at all" and more than one "://".
            throw new AnalysisException("Invalid uri: " + this.location);
        }

        // With a limit of 2 the result always has one or two elements.
        String[] authoritySplit = hierarchical.split(PATH_DELIM, 2);
        String pathQueryFragment;
        // True when a "/" separated the authority from the rest; that "/" belongs to the path
        // and has to be put back, even when the authority itself is empty ("file:///tmp/a").
        boolean slashConsumed;
        if (authoritySplit.length == 1) {
            if (!StringUtils.isBlank(scheme)) {
                // "scheme://authority" with no path at all.
                authority = authoritySplit[0];
                parseAuthority(authority);
                return;
            }
            // No scheme and no "/": the whole text is a relative path.
            pathQueryFragment = authoritySplit[0];
            slashConsumed = false;
        } else {
            authority = authoritySplit[0];
            parseAuthority(authority);
            pathQueryFragment = authoritySplit[1];
            slashConsumed = true;
        }

        String[] fragSplit = pathQueryFragment.split(FRAGMENT_DELIM);
        if (fragSplit.length == 2) {
            fragment = fragSplit[1];
        }
        // split() drops trailing empty strings, so input consisting only of "#" yields an
        // empty array; treat that as an empty path instead of indexing out of bounds.
        String beforeFragment = fragSplit.length == 0 ? "" : fragSplit[0];

        String[] querySplit = beforeFragment.split(QUERY_DELIM);
        if (querySplit.length != 1 && querySplit.length != 2) {
            throw new AnalysisException("Invalid path: " + this.location);
        }
        path = slashConsumed ? PATH_DELIM + querySplit[0] : querySplit[0];
        if (querySplit.length == 2) {
            query = querySplit[1];
            parseQuery();
        }
    }

    /** Fills {@link #queryMap} from {@link #query}; leaves it null for a blank query. */
    private void parseQuery() {
        if (StringUtils.isBlank(query)) {
            return;
        }
        queryMap = new TreeMap<>();
        for (String item : query.split(QUERY_ITEM_DELIM)) {
            // Split on the first "=" only so that values which themselves contain "="
            // (or are empty) are kept. Items without any "=" are skipped.
            String[] keyValue = item.split(QUERY_KV_DELIM, 2);
            if (keyValue.length == 2) {
                queryMap.put(keyValue[0], keyValue[1]);
            }
        }
    }

    /**
     * Splits {@code [user[:password]@]host[:port]} into its parts. A blank authority is ignored.
     *
     * @throws AnalysisException if the authority has more than one {@code "@"}, the user info or
     *                           host part has more than one {@code ":"}, or the port is not an
     *                           integer
     */
    private void parseAuthority(String str) throws AnalysisException {
        if (StringUtils.isBlank(str)) {
            return;
        }
        String[] authSplit = str.split(AUTH_DELIM);
        String hostPort;
        if (authSplit.length == 1) {
            hostPort = authSplit[0];
        } else if (authSplit.length == 2) {
            userInfo = authSplit[0];
            hostPort = authSplit[1];
            String[] userSplit = userInfo.split(PASS_DELIM);
            if (userSplit.length == 1) {
                userName = userInfo;
            } else if (userSplit.length == 2) {
                userName = userSplit[0];
                passWord = userSplit[1];
            } else {
                throw new AnalysisException("Invalid userinfo: " + userInfo);
            }
        } else {
            throw new AnalysisException("Invalid authority: " + str);
        }

        String[] hostSplit = hostPort.split(PORT_DELIM);
        if (hostSplit.length == 1) {
            host = hostSplit[0];
        } else if (hostSplit.length == 2) {
            host = hostSplit[0];
            try {
                port = Integer.parseInt(hostSplit[1]);
            } catch (NumberFormatException e) {
                // Report a malformed port through the checked exception that callers of
                // create() already handle, rather than leaking an unchecked one.
                throw new AnalysisException("Invalid host port: " + hostPort);
            }
        } else {
            throw new AnalysisException("Invalid host port: " + hostPort);
        }
    }
}

View File

@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.common.UserException;
import org.junit.Assert;
import org.junit.Test;
import java.net.URISyntaxException;
/**
 * Unit tests for {@link URI}. Well-formed locations are parsed both by {@code java.net.URI} and
 * by {@link URI} and the components are compared; a glob location that {@code java.net.URI}
 * is not used for is checked against literal expectations.
 */
public class URITest {

    /** Asserts that every component exposed by both classes is equal. */
    private void check(java.net.URI javaURI, URI myURI) {
        Assert.assertEquals(javaURI.getAuthority(), myURI.getAuthority());
        Assert.assertEquals(javaURI.getPath(), myURI.getPath());
        Assert.assertEquals(javaURI.getHost(), myURI.getHost());
        Assert.assertEquals(javaURI.getPort(), myURI.getPort());
        Assert.assertEquals(javaURI.getScheme(), myURI.getScheme());
        Assert.assertEquals(javaURI.getQuery(), myURI.getQuery());
        Assert.assertEquals(javaURI.getFragment(), myURI.getFragment());
        Assert.assertEquals(javaURI.getUserInfo(), myURI.getUserInfo());
    }

    /**
     * Parses {@code str} with both implementations, compares them via
     * {@link #check(java.net.URI, URI)} and returns the {@link URI} for further assertions.
     */
    private URI parseAndCheck(String str) throws UserException, URISyntaxException {
        java.net.URI javaURI = new java.net.URI(str);
        URI myURI = URI.create(str);
        check(javaURI, myURI);
        return myURI;
    }

    @Test
    public void testNormal() throws UserException, URISyntaxException {
        // Every component present.
        URI full = parseAndCheck(
                "foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose");
        Assert.assertEquals("username", full.getUserName());
        Assert.assertEquals("password", full.getPassWord());
        Assert.assertEquals("animal", full.getQueryMap().get("type"));

        // Fragment without query.
        URI withFragment = parseAndCheck("foo://example.com/over/there/index.dtb#nose");
        Assert.assertEquals("nose", withFragment.getFragment());

        // Query without fragment.
        URI withQuery = parseAndCheck("foo://example.com/over/there/index.dtb?type=animal");
        Assert.assertEquals("animal", withQuery.getQueryMap().get("type"));

        // User info with an empty user name.
        URI emptyUser = parseAndCheck("foo://:password@example.com/over/there/index.dtb?type=animal");
        Assert.assertEquals("animal", emptyUser.getQueryMap().get("type"));

        // User info without a password separator.
        URI userOnly = parseAndCheck("foo://password@example.com/over/there/index.dtb?type=animal");
        Assert.assertEquals("animal", userOnly.getQueryMap().get("type"));

        // Authority only, and a bare relative path without scheme.
        parseAndCheck("foo://example.com");
        parseAndCheck("example.com");

        // Glob expression in the path: checked against literal expectations only.
        URI glob = URI.create("hdfs://ip:12/test/test/data/{20220131,20220201}/*");
        Assert.assertEquals("hdfs", glob.getScheme());
        Assert.assertEquals("/test/test/data/{20220131,20220201}/*", glob.getPath());
        Assert.assertEquals("ip", glob.getHost());
        Assert.assertEquals(12, glob.getPort());
        Assert.assertEquals("ip:12", glob.getAuthority());

        // A single word is a path, not a scheme.
        URI bareWord = URI.create("hdfs");
        Assert.assertEquals("hdfs", bareWord.getPath());
    }
}