[feature](iceberg) Support read iceberg data on gcs (#19815)

This commit is contained in:
Nick Young
2023-05-20 12:40:03 +08:00
committed by GitHub
parent 1b119704f8
commit 499f443779
13 changed files with 164 additions and 33 deletions

View File

@ -626,6 +626,20 @@ under the License.
</exclusions>
</dependency>
<!-- for fe recognize files stored on gcs -->
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>hadoop2-2.2.8</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.ranger/ranger-plugins-common -->
<dependency>
<groupId>org.apache.ranger</groupId>

View File

@ -77,6 +77,7 @@ public class FeConstants {
public static String FS_PREFIX_S3A = "s3a";
public static String FS_PREFIX_S3N = "s3n";
public static String FS_PREFIX_OSS = "oss";
public static String FS_PREFIX_GCS = "gs";
public static String FS_PREFIX_BOS = "bos";
public static String FS_PREFIX_COS = "cos";
public static String FS_PREFIX_OBS = "obs";
@ -87,14 +88,4 @@ public class FeConstants {
public static String FS_PREFIX_FILE = "file";
public static final String INTERNAL_DB_NAME = "__internal_schema";
public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";
public static boolean isObjStorage(String location) {
return location.startsWith(FeConstants.FS_PREFIX_S3)
|| location.startsWith(FeConstants.FS_PREFIX_S3A)
|| location.startsWith(FeConstants.FS_PREFIX_S3N)
|| location.startsWith(FeConstants.FS_PREFIX_BOS)
|| location.startsWith(FeConstants.FS_PREFIX_COS)
|| location.startsWith(FeConstants.FS_PREFIX_OSS)
|| location.startsWith(FeConstants.FS_PREFIX_OBS);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.common.util;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.DLFProperties;
import org.apache.doris.datasource.property.constants.GCSProperties;
import org.apache.doris.datasource.property.constants.GlueProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
@ -53,8 +54,9 @@ public class PrintableMap<K, V> {
SENSITIVE_KEY.add("bos_secret_accesskey");
SENSITIVE_KEY.add("jdbc.password");
SENSITIVE_KEY.add("elasticsearch.password");
SENSITIVE_KEY.addAll(Arrays.asList(S3Properties.SECRET_KEY, ObsProperties.SECRET_KEY, OssProperties.SECRET_KEY,
CosProperties.SECRET_KEY, GlueProperties.SECRET_KEY, DLFProperties.SECRET_KEY));
SENSITIVE_KEY.addAll(Arrays.asList(S3Properties.SECRET_KEY, ObsProperties.SECRET_KEY, OssProperties.SECRET_KEY,
GCSProperties.SECRET_KEY, CosProperties.SECRET_KEY, GlueProperties.SECRET_KEY,
DLFProperties.SECRET_KEY));
HIDDEN_KEY = Sets.newHashSet();
HIDDEN_KEY.addAll(S3Properties.Env.FS_KEYS);
HIDDEN_KEY.addAll(GlueProperties.META_KEYS);

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.common.FeConstants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S3Util {
private static final Logger LOG = LogManager.getLogger(S3Util.class);
public static boolean isObjStorage(String location) {
return location.startsWith(FeConstants.FS_PREFIX_S3)
|| location.startsWith(FeConstants.FS_PREFIX_S3A)
|| location.startsWith(FeConstants.FS_PREFIX_S3N)
|| location.startsWith(FeConstants.FS_PREFIX_GCS)
|| location.startsWith(FeConstants.FS_PREFIX_BOS)
|| location.startsWith(FeConstants.FS_PREFIX_COS)
|| location.startsWith(FeConstants.FS_PREFIX_OSS)
|| location.startsWith(FeConstants.FS_PREFIX_OBS);
}
public static String convertToS3IfNecessary(String location) {
LOG.debug("try convert location to s3 prefix: " + location);
if (isObjStorage(location)) {
int pos = location.indexOf("://");
if (pos == -1) {
throw new RuntimeException("No '://' found in location: " + location);
}
return "s3" + location.substring(pos);
}
return location;
}
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.external.hive.util.HiveUtil;
@ -296,7 +297,7 @@ public class HiveMetaStoreCache {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
String finalLocation = convertToS3IfNecessary(key.location);
String finalLocation = S3Util.convertToS3IfNecessary(key.location);
JobConf jobConf = getJobConf();
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
@ -345,23 +346,6 @@ public class HiveMetaStoreCache {
}
}
// convert oss:// to s3://
private String convertToS3IfNecessary(String location) {
LOG.debug("try convert location to s3 prefix: " + location);
if (location.startsWith(FeConstants.FS_PREFIX_COS)
|| location.startsWith(FeConstants.FS_PREFIX_BOS)
|| location.startsWith(FeConstants.FS_PREFIX_OSS)
|| location.startsWith(FeConstants.FS_PREFIX_S3A)
|| location.startsWith(FeConstants.FS_PREFIX_S3N)) {
int pos = location.indexOf("://");
if (pos == -1) {
throw new RuntimeException("No '://' found in location: " + location);
}
return "s3" + location.substring(pos);
}
return location;
}
private JobConf getJobConf() {
Configuration configuration = new HdfsConfiguration();
for (Map.Entry<String, String> entry : catalog.getCatalogProperty().getHadoopProperties().entrySet()) {

View File

@ -23,6 +23,7 @@ import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.DLFProperties;
import org.apache.doris.datasource.property.constants.GCSProperties;
import org.apache.doris.datasource.property.constants.GlueProperties;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
@ -96,6 +97,8 @@ public class PropertyConverter {
return convertToS3Properties(props, S3Properties.getCredential(props));
} else if (props.containsKey(ObsProperties.ENDPOINT)) {
return convertToOBSProperties(props, ObsProperties.getCredential(props));
} else if (props.containsKey(GCSProperties.ENDPOINT)) {
return convertToGCSProperties(props, GCSProperties.getCredential(props));
} else if (props.containsKey(OssProperties.ENDPOINT)) {
return convertToOSSProperties(props, OssProperties.getCredential(props));
} else if (props.containsKey(CosProperties.ENDPOINT)) {
@ -215,6 +218,11 @@ public class PropertyConverter {
}
}
private static Map<String, String> convertToGCSProperties(Map<String, String> props, CloudCredential credential) {
// Now we use s3 client to access
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
}
private static Map<String, String> convertToOSSProperties(Map<String, String> props, CloudCredential credential) {
// Now we use s3 client to access
return convertToS3Properties(S3Properties.prefixToS3(props), credential);

View File

@ -18,6 +18,7 @@
package org.apache.doris.datasource.property;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.GCSProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
@ -37,6 +38,7 @@ public class S3ClientBEProperties {
return getBeAWSPropertiesFromS3(properties);
} else if (properties.containsKey(ObsProperties.ENDPOINT)
|| properties.containsKey(OssProperties.ENDPOINT)
|| properties.containsKey(GCSProperties.ENDPOINT)
|| properties.containsKey(CosProperties.ENDPOINT)) {
return getBeAWSPropertiesFromS3(S3Properties.prefixToS3(properties));
}

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class GCSProperties extends BaseProperties {
public static final String GCS_PREFIX = "gs.";
public static final String ENDPOINT = "gs.endpoint";
public static final String REGION = "gs.region";
public static final String ACCESS_KEY = "gs.access_key";
public static final String SECRET_KEY = "gs.secret_key";
public static final String SESSION_TOKEN = "gs.session_token";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}

View File

@ -131,7 +131,10 @@ public class S3Properties extends BaseProperties {
if (entry.getKey().startsWith(OssProperties.OSS_PREFIX)) {
String s3Key = entry.getKey().replace(OssProperties.OSS_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
} else if (entry.getKey().startsWith(CosProperties.COS_PREFIX)) {
} else if (entry.getKey().startsWith(GCSProperties.GCS_PREFIX)) {
String s3Key = entry.getKey().replace(GCSProperties.GCS_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
} else if (entry.getKey().startsWith(CosProperties.COS_PREFIX)) {
String s3Key = entry.getKey().replace(CosProperties.COS_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
} else if (entry.getKey().startsWith(ObsProperties.OBS_PREFIX)) {

View File

@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
@ -352,7 +353,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected static Optional<TFileType> getTFileType(String location) {
if (location != null && !location.isEmpty()) {
if (FeConstants.isObjStorage(location)) {
if (S3Util.isObjStorage(location)) {
return Optional.of(TFileType.FILE_S3);
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
return Optional.of(TFileType.FILE_HDFS);

View File

@ -27,6 +27,7 @@ import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.external.iceberg.util.IcebergUtils;
@ -185,7 +186,8 @@ public class IcebergScanNode extends FileQueryScanNode {
long fileSize = task.file().fileSizeInBytes();
for (FileScanTask splitTask : task.split(splitSize)) {
String dataFilePath = splitTask.file().path().toString();
IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(),
String finalDataFilePath = S3Util.convertToS3IfNecessary(dataFilePath);
IcebergSplit split = new IcebergSplit(new Path(finalDataFilePath), splitTask.start(),
splitTask.length(), fileSize, new String[0]);
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {