From aa0b74d63a8d4812b86fd1496cb7792261348194 Mon Sep 17 00:00:00 2001
From: DuRipeng <453243496@qq.com>
Date: Fri, 13 Oct 2023 11:04:38 +0800
Subject: [PATCH] [improvement](fe and broker) support specifying a broker for
 getSplits, isSplitable checks, and file scans for HMS Multi-catalog (#24830)

We want to use Doris Multi-catalog to accelerate HMS queries. Our organization
has a custom distributed file system, and we think wrapping the file system
access differences into the broker (listLocatedFiles, openReader, ...) is an
elegant approach.

This PR introduces the HMS catalog property `broker.name`. When this property
is set, file-split and query-scan operations are sent to the broker.

Usage: create an HMS catalog bound to a broker:

```
CREATE CATALOG hive_catalog_broker PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://xxx',
    'broker.name' = 'hdfs_broker'
);
```

When querying this catalog, file-split and query-scan requests are sent to the
broker `hdfs_broker`.

More details about this PR:
1. Introduce the HMS catalog property `broker.name` (constant `BIND_BROKER_NAME`)
   to specify the broker that does the remote path work. When `broker.name` is
   set, `enable.self.splitter` must be `true` so that the file-splitting process
   is executed in FE.
2. Introduce 2 more interfaces in the broker service:
   - `TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest request)`,
     which invokes the input format's `isSplitable` method.
   - `TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request)`,
     which performs `listFiles` or `listLocatedStatus` on the remote file system.
3. 3 parts of the whole process are executed in the broker:
   - checking whether a path with the specified input format name is splittable (`isSplittable`);
   - `listLocatedFiles` on table / partition locations;
   - `openReader` for the specified file splits.

Co-authored-by: chenlinzhong <490103404@qq.com>
---
 docs/en/docs/lakehouse/multi-catalog/hive.md  |   8 ++
 .../docs/lakehouse/multi-catalog/hive.md      |   8 ++
 .../doris/datasource/ExternalCatalog.java     |   8 ++
 .../doris/datasource/HMSExternalCatalog.java  |   2 +
 .../datasource/hive/HiveMetaStoreCache.java   |  54 +++++----
 .../doris/external/hive/util/HiveUtil.java    |  12 +-
 .../org/apache/doris/fs/FileSystemCache.java  |  17 ++-
 .../apache/doris/fs/FileSystemFactory.java    |  11 +-
 .../org/apache/doris/fs/FileSystemType.java   |   1 +
 .../doris/fs/remote/BrokerFileSystem.java     |  87 ++++++++++++++
 .../doris/planner/external/HiveScanNode.java  |  25 +++-
 .../doris/statistics/util/StatisticsUtil.java |   3 +-
 fs_brokers/apache_hdfs_broker/pom.xml         |  26 +++-
 .../doris/broker/hdfs/FileSystemManager.java  |  55 +++++++--
 .../broker/hdfs/HDFSBrokerServiceImpl.java    |  44 +++++++
 .../org/apache/doris/common/HiveUtils.java    | 112 ++++++++++++++++++
 gensrc/thrift/PaloBrokerService.thrift        |  20 ++++
 17 files changed, 447 insertions(+), 46 deletions(-)
 create mode 100644 fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java

diff --git a/docs/en/docs/lakehouse/multi-catalog/hive.md b/docs/en/docs/lakehouse/multi-catalog/hive.md
index d4e2708906..398468a611 100644
--- a/docs/en/docs/lakehouse/multi-catalog/hive.md
+++ b/docs/en/docs/lakehouse/multi-catalog/hive.md
@@ -401,6 +401,14 @@ If the variable `truncate_char_or_varchar_columns` is enabled, when the maximum
 
 The variable default is false.
 
+## Access HMS with broker
+
+Add the following setting when creating an HMS catalog; file splitting and scanning for Hive external tables will then be completed by the broker named `test_broker`:
+
+```sql
+"broker.name" = "test_broker"
+```
+
 ## Integrate with Apache Ranger
 
 Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data security access management on the Hadoop platform.
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
index 98c722d52d..4ed341b7a1 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md
@@ -380,6 +380,14 @@ CREATE CATALOG hive PROPERTIES (
 
 This variable defaults to false.
 
+## Access HMS with broker
+
+Add the following setting when creating an HMS catalog; file splitting and scanning for Hive external tables will then be completed by the broker named `test_broker`:
+
+```sql
+"broker.name" = "test_broker"
+```
+
 ## Use Ranger for authorization
 
 Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data security access management on the Hadoop platform.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 987ef2614c..e764fdd356 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -593,6 +593,14 @@ public abstract class ExternalCatalog
         return ret;
     }
 
+    public String bindBrokerName() {
+        Map<String, String> properties = catalogProperty.getProperties();
+        if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
+            return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
+        }
+        return null;
+    }
+
     @Override
     public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 8bdea35e45..7ca0ac0cff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -61,6 +61,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
     private long lastSyncedEventId = -1L;
     public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
     public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
+    // broker name for file split and query scan.
+    public static final String BIND_BROKER_NAME = "broker.name";
     private static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
 
     // -1 means file cache no ttl set
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index df6f48b97d..e5a43f3bb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -365,11 +365,13 @@ public class HiveMetaStoreCache {
 
     // Get File Status by using FileSystem API.
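The routing rule this patch applies throughout the FE side is that a non-null `bindBrokerName` overrides any scheme-based file system detection. Before the `getFileCache` changes below, a minimal standalone sketch of that decision order; the enum and helper here are simplified stand-ins for illustration, not the Doris classes:

```java
import java.util.Arrays;

public class FsResolutionSketch {
    // Simplified stand-in for org.apache.doris.fs.FileSystemType.
    enum FsType { DFS, S3, BROKER }

    // Mirrors the decision order of FileSystemFactory.getFSIdentity in this
    // patch: a non-null broker binding wins over scheme-based detection.
    static FsType resolve(String location, String bindBrokerName) {
        if (bindBrokerName != null) {
            return FsType.BROKER;
        }
        // crude stand-in for S3Util.isObjStorage
        boolean isObjStorage = Arrays.asList("s3://", "oss://", "cos://")
                .stream().anyMatch(location::startsWith);
        return isObjStorage ? FsType.S3 : FsType.DFS;
    }

    public static void main(String[] args) {
        System.out.println(resolve("hdfs://ns1/warehouse/t1", null));          // DFS
        System.out.println(resolve("hdfs://ns1/warehouse/t1", "test_broker")); // BROKER
        System.out.println(resolve("oss://bucket/t1", "test_broker"));         // BROKER: broker still wins
    }
}
```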
     private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat, JobConf jobConf,
-            List<String> partitionValues) throws UserException {
+            List<String> partitionValues,
+            String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
-        result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
+                        location, bindBrokerName), jobConf, bindBrokerName));
+        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
         try {
             // For Tez engine, it may generate subdirectories for "union" query.
             // So there may be files and directories in the table directory at the same time. eg:
@@ -429,7 +431,8 @@ public class HiveMetaStoreCache {
             InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
             // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
             if (key.useSelfSplitter) {
-                result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues());
+                result = getFileCache(finalLocation, inputFormat, jobConf,
+                        key.getPartitionValues(), key.bindBrokerName);
             } else {
                 InputSplit[] splits;
                 String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -508,23 +511,23 @@ public class HiveMetaStoreCache {
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
-            boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, true);
+            boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, true, bindBrokerName);
     }
 
     public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
-            boolean useSelfSplitter) {
-        return getFilesByPartitions(partitions, useSelfSplitter, false);
+            boolean useSelfSplitter, String bindBrokerName) {
+        return getFilesByPartitions(partitions, useSelfSplitter, false, bindBrokerName);
     }
 
     private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
-            boolean useSelfSplitter, boolean withCache) {
+            boolean useSelfSplitter, boolean withCache, String bindBrokerName) {
         long start = System.currentTimeMillis();
         List<FileCacheKey> keys = partitions.stream().map(p -> {
             FileCacheKey fileCacheKey = p.isDummyPartition()
                     ? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
-                    p.getInputFormat(), useSelfSplitter)
-                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
+                    p.getInputFormat(), useSelfSplitter, bindBrokerName)
+                    : new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
             fileCacheKey.setUseSelfSplitter(useSelfSplitter);
             return fileCacheKey;
         }).collect(Collectors.toList());
@@ -602,7 +605,7 @@ public class HiveMetaStoreCache {
             HivePartition partition = partitionCache.getIfPresent(partKey);
             if (partition != null) {
                 fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                        null, partition.getPartitionValues()));
+                        null, partition.getPartitionValues(), null));
                 partitionCache.invalidate(partKey);
             }
         }
@@ -620,7 +623,7 @@ public class HiveMetaStoreCache {
              * and FE will exit if some network problems occur.
              * */
            FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
-                    dbName, tblName, null, null, false);
+                    dbName, tblName, null, null, false, null);
            fileCacheRef.get().invalidate(fileCacheKey);
        }
    }
@@ -635,7 +638,7 @@ public class HiveMetaStoreCache {
            HivePartition partition = partitionCache.getIfPresent(partKey);
            if (partition != null) {
                fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
-                        null, partition.getPartitionValues()));
+                        null, partition.getPartitionValues(), null));
                partitionCache.invalidate(partKey);
            }
        }
@@ -781,7 +784,7 @@ public class HiveMetaStoreCache {
    }

    public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, long tableId) {
+            boolean isFullAcid, long tableId, String bindBrokerName) {
        List<FileCacheValue> fileCacheValues = Lists.newArrayList();
        String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
        try {
@@ -812,7 +815,8 @@ public class HiveMetaStoreCache {
                    String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
                    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                            new FileSystemCache.FileSystemCacheKey(
-                                    FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
+                                    FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+                                            bindBrokerName), jobConf, bindBrokerName));
                    Status status = fs.exists(acidVersionPath);
                    if (status != Status.OK) {
                        if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -833,7 +837,9 @@ public class HiveMetaStoreCache {
                for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
                    String location = delta.getPath().toString();
                    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                    jobConf, bindBrokerName));
                    RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                    if (delta.isDeleteDelta()) {
                        List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
@@ -851,7 +857,9 @@ public class HiveMetaStoreCache {
                if (directory.getBaseDirectory() != null) {
                    String location = directory.getBaseDirectory().toString();
                    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                            new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
+                            new FileSystemCache.FileSystemCacheKey(
+                                    FileSystemFactory.getFSIdentity(location, bindBrokerName),
+                                    jobConf, bindBrokerName));
                    RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                    locatedFiles.files().stream().filter(
                            f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
@@ -949,6 +957,8 @@ public class HiveMetaStoreCache {
        private String location;
        // not in key
        private String inputFormat;
+        // Broker name for file split and file scan.
+        private String bindBrokerName;

        // Temp variable, use self file splitter or use InputFormat.getSplits.
        // Will remove after self splitter is stable.
        private boolean useSelfSplitter;
@@ -957,16 +967,18 @@
        // partitionValues would be ["part1", "part2"]
        protected List<String> partitionValues;

-        public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
+        public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
            this.location = location;
            this.inputFormat = inputFormat;
            this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
            this.useSelfSplitter = true;
+            this.bindBrokerName = bindBrokerName;
        }

        public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
-                                                       String inputFormat, boolean useSelfSplitter) {
-            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null);
+                                                       String inputFormat, boolean useSelfSplitter,
+                                                       String bindBrokerName) {
+            FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
            fileCacheKey.dummyKey = dbName + "." + tblName;
            fileCacheKey.useSelfSplitter = useSelfSplitter;
            return fileCacheKey;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index e1baea3652..85b8034fe7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -24,6 +24,8 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.remote.BrokerFileSystem;
+import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
@@ -184,12 +186,17 @@ public final class HiveUtil {
         }
     }
 
-    public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
+                                       String location, JobConf jobConf) throws UserException {
+        if (remoteFileSystem instanceof BrokerFileSystem) {
+            return ((BrokerFileSystem) remoteFileSystem)
+                    .isSplittable(location, inputFormat.getClass().getCanonicalName());
+        }
+
         // ORC uses a custom InputFormat but is always splittable
         if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
             return true;
         }
-
         // use reflection to get isSplitable method on FileInputFormat
         // ATTN: the method name is actually "isSplitable", but the right spelling is "isSplittable"
         Method method = null;
@@ -205,6 +212,7 @@ public final class HiveUtil {
         if (method == null) {
             return false;
         }
+        Path path = new Path(location);
         try {
             method.setAccessible(true);
             return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index edc746ebe2..7946dd5e8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -53,7 +53,7 @@ public class FileSystemCache {
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getByType(key.type, key.conf);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, key.bindBrokerName);
     }
 
     public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -69,11 +69,13 @@ public class FileSystemCache {
         // eg: hdfs://nameservices1
         private final String fsIdent;
         private final JobConf conf;
+        private final String bindBrokerName;
 
-        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) {
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf, String bindBrokerName) {
             this.type = fs.first;
             this.fsIdent = fs.second;
             this.conf = conf;
+            this.bindBrokerName = bindBrokerName;
         }
 
         @Override
@@ -84,14 +86,21 @@ public class FileSystemCache {
             if (!(obj instanceof FileSystemCacheKey)) {
                 return false;
             }
-            return type.equals(((FileSystemCacheKey) obj).type)
+            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
                     && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
                     && conf == ((FileSystemCacheKey) obj).conf;
+            if (bindBrokerName == null) {
+                return equalsWithoutBroker;
+            }
+            return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(conf, fsIdent, type);
+            if (bindBrokerName == null) {
+                return Objects.hash(conf, fsIdent, type);
+            }
+            return Objects.hash(conf, fsIdent, type, bindBrokerName);
         }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index 3837a7eb95..e54a73bbff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -56,9 +56,11 @@ public class FileSystemFactory {
         }
     }
 
-    public static Pair<FileSystemType, String> getFSIdentity(String location) {
+    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
         FileSystemType fsType;
-        if (S3Util.isObjStorage(location)) {
+        if (bindBrokerName != null) {
+            fsType = FileSystemType.BROKER;
+        } else if (S3Util.isObjStorage(location)) {
             if (S3Util.isHdfsOnOssEndpoint(location)) {
                 // if hdfs service is enabled on oss, use hdfs lib to access oss.
                 fsType = FileSystemType.DFS;
@@ -83,7 +85,8 @@ public class FileSystemFactory {
         return Pair.of(fsType, fsIdent);
     }
 
-    public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
+    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
+            String bindBrokerName) {
         Map<String, String> properties = new HashMap<>();
         conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
         switch (type) {
@@ -95,6 +98,8 @@ public class FileSystemFactory {
                 return new OFSFileSystem(properties);
             case JFS:
                 return new JFSFileSystem(properties);
+            case BROKER:
+                return new BrokerFileSystem(bindBrokerName, properties);
             default:
                 throw new IllegalStateException("Not supported file system type: " + type);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
index 5ddea01174..018130f0c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemType.java
@@ -22,5 +22,6 @@ public enum FileSystemType {
     DFS,
     OFS,
     JFS,
+    BROKER,
     FILE
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
index cb87150928..ef8d484bda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java
@@ -24,8 +24,10 @@ import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.datasource.property.PropertyConverter;
+import org.apache.doris.fs.RemoteFiles;
 import org.apache.doris.fs.operations.BrokerFileOperations;
 import org.apache.doris.fs.operations.OpParams;
 import org.apache.doris.service.FrontendOptions;
@@ -34,6 +36,8 @@ import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
 import org.apache.doris.thrift.TBrokerDeletePathRequest;
 import org.apache.doris.thrift.TBrokerFD;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
 import org.apache.doris.thrift.TBrokerOperationStatus;
@@ -65,6 +69,7 @@ import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -556,6 +561,88 @@ public class BrokerFileSystem extends RemoteFileSystem {
         return Status.OK;
     }
 
+    @Override
+    public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
+        // get a proper broker
+        Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
+        if (pair == null) {
+            throw new UserException("failed to get broker client");
+        }
+        TPaloBrokerService.Client client = pair.first;
+        TNetworkAddress address = pair.second;
+
+        // invoke broker 'listLocatedFiles' interface
+        boolean needReturn = true;
+        try {
+            TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
+                    recursive, properties);
+            req.setOnlyFiles(onlyFiles);
+            TBrokerListResponse response = client.listLocatedFiles(req);
+            TBrokerOperationStatus operationStatus = response.getOpStatus();
+            if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("failed to listLocatedFiles, remote path: " + remotePath + ". msg: "
+                        + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+            }
+            List<RemoteFile> result = new ArrayList<>();
+            List<TBrokerFileStatus> fileStatus = response.getFiles();
+            for (TBrokerFileStatus tFile : fileStatus) {
+                org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path);
+                RemoteFile file = new RemoteFile(path.getName(), path, !tFile.isDir, tFile.isDir, tFile.size,
+                        tFile.getBlockSize(), tFile.getModificationTime(), null /* blockLocations is null */);
+                result.add(file);
+            }
+            LOG.info("finished to listLocatedFiles, remote path {}. get files: {}", remotePath, result);
+            return new RemoteFiles(result);
+        } catch (TException e) {
+            needReturn = false;
+            throw new UserException("failed to listLocatedFiles, remote path: "
+                    + remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+        } finally {
+            if (needReturn) {
+                ClientPool.brokerPool.returnObject(address, client);
+            } else {
+                ClientPool.brokerPool.invalidateObject(address, client);
+            }
+        }
+    }
+
+    public boolean isSplittable(String remotePath, String inputFormat) throws UserException {
+        // get a proper broker
+        Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
+        if (pair == null) {
+            throw new UserException("failed to get broker client");
+        }
+        TPaloBrokerService.Client client = pair.first;
+        TNetworkAddress address = pair.second;
+
+        // invoke 'isSplittable' interface
+        boolean needReturn = true;
+        try {
+            TBrokerIsSplittableRequest req = new TBrokerIsSplittableRequest().setVersion(TBrokerVersion.VERSION_ONE)
+                    .setPath(remotePath).setInputFormat(inputFormat).setProperties(properties);
+            TBrokerIsSplittableResponse response = client.isSplittable(req);
+            TBrokerOperationStatus operationStatus = response.getOpStatus();
+            if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("failed to get path isSplittable, remote path: " + remotePath + ". msg: "
+                        + operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+            }
+            boolean result = response.isSplittable();
+            LOG.info("finished to get path isSplittable, remote path {} with format {}, isSplittable: {}",
+                    remotePath, inputFormat, result);
+            return result;
+        } catch (TException e) {
+            needReturn = false;
+            throw new UserException("failed to get path isSplittable, remote path: "
+                    + remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
+        } finally {
+            if (needReturn) {
+                ClientPool.brokerPool.returnObject(address, client);
+            } else {
+                ClientPool.brokerPool.invalidateObject(address, client);
+            }
+        }
+    }
+
     // List files in remotePath
     @Override
     public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 1ba77fa5f9..d414181048 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -196,8 +196,14 @@ public class HiveScanNode extends FileQueryScanNode {
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
         boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
+        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+        if (bindBrokerName != null && useSelfSplitter == false) {
+            // useSelfSplitter must be true if bindBrokerName is set.
+            throw new UserException(HMSExternalCatalog.ENABLE_SELF_SPLITTER + " should be true if "
+                    + HMSExternalCatalog.BIND_BROKER_NAME + " is set");
+        }
         List<Split> allFiles = Lists.newArrayList();
-        getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter);
+        getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter, bindBrokerName);
         LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
                 allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
         return allFiles;
@@ -210,12 +216,13 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
-            List<Split> allFiles, boolean useSelfSplitter) throws IOException {
+            List<Split> allFiles, boolean useSelfSplitter,
+            String bindBrokerName) throws IOException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
-            fileCaches = getFileSplitByTransaction(cache, partitions);
+            fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
         } else {
-            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter);
+            fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter, bindBrokerName);
         }
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@@ -287,7 +294,8 @@ public class HiveScanNode extends FileQueryScanNode {
         return fileList.subList(0, index);
     }
 
-    private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
+    private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions,
+            String bindBrokerName) {
         for (HivePartition partition : partitions) {
             if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
                 // this is an unpartitioned table.
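An aside on the `FileSystemCache.FileSystemCacheKey` change earlier in this patch: `hashCode()` branches on a null `bindBrokerName` instead of always passing it to `Objects.hash`, because a null varargs element still perturbs the result. A small standalone demo of why the branch keeps broker-less keys hashing exactly as before (the values here are illustrative):

```java
import java.util.Objects;

public class CacheKeyHashDemo {
    // Mirrors FileSystemCacheKey.hashCode() in this patch.
    static int keyHash(String fsIdent, String type, String bindBrokerName) {
        if (bindBrokerName == null) {
            return Objects.hash(fsIdent, type);
        }
        return Objects.hash(fsIdent, type, bindBrokerName);
    }

    public static void main(String[] args) {
        int prePatch = Objects.hash("hdfs://ns1", "DFS");
        // The guarded version is identical to the pre-patch hash for old keys...
        System.out.println(prePatch == keyHash("hdfs://ns1", "DFS", null));          // true
        // ...while naively appending the null field would have changed every hash.
        System.out.println(prePatch == Objects.hash("hdfs://ns1", "DFS", null));     // false
        // Broker-bound keys hash (and compare) differently, as intended.
        System.out.println(prePatch == keyHash("hdfs://ns1", "DFS", "test_broker")); // false
    }
}
```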
@@ -297,7 +305,8 @@ public class HiveScanNode extends FileQueryScanNode {
             }
         }
         ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
                 ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
-        return cache.getFilesByTransaction(partitions, validWriteIds, hiveTransaction.isFullAcid(), hmsTable.getId());
+        return cache.getFilesByTransaction(partitions, validWriteIds,
+                hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName);
     }
 
     @Override
@@ -319,6 +328,10 @@ public class HiveScanNode extends FileQueryScanNode {
 
     @Override
     protected TFileType getLocationType(String location) throws UserException {
+        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
+        if (bindBrokerName != null) {
+            return TFileType.FILE_BROKER;
+        }
         return getTFileType(location).orElseThrow(() ->
                 new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index f482c81287..a5c7eb0765 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -669,7 +669,8 @@ public class StatisticsUtil {
                     table.getRemoteTable().getSd().getLocation(), null));
         }
         // Get files for all partitions.
-        return cache.getFilesByPartitionsWithoutCache(hivePartitions, true);
+        String bindBrokerName = table.getCatalog().bindBrokerName();
+        return cache.getFilesByPartitionsWithoutCache(hivePartitions, true, bindBrokerName);
     }
 
     /**
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index bbd58e5d5d..2cb8d892de 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -69,9 +69,10 @@ under the License.
         <java.version>1.8</java.version>
         <log4j2.version>2.18.0</log4j2.version>
         <project.scm.id>github</project.scm.id>
-        <hadoop.version>2.10.2</hadoop.version>
+        <hadoop.version>3.3.6</hadoop.version>
         <netty.version>4.1.65.Final</netty.version>
         <gcs.version>hadoop2-2.2.15</gcs.version>
+        <doris.hive.catalog.shade.version>1.0.1</doris.hive.catalog.shade.version>
@@ -224,6 +225,29 @@ under the License.
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
+            <version>${doris.hive.catalog.shade.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 3548d08dcd..58a09c7e09 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -25,17 +25,19 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
 
 import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -1062,6 +1064,49 @@ public class FileSystemManager {
         }
     }
 
+    public List<TBrokerFileStatus> listLocatedFiles(String path, boolean onlyFiles,
+                                                    boolean recursive, Map<String, String> properties) {
+        List<TBrokerFileStatus> resultFileStatus = null;
+        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        Path locatedPath = new Path(path);
+        try {
+            FileSystem innerFileSystem = fileSystem.getDFSFileSystem();
+            RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles
+                    ? innerFileSystem.listFiles(locatedPath, recursive)
+                    : innerFileSystem.listLocatedStatus(locatedPath);
+            return getFileLocations(locatedFiles);
+        } catch (FileNotFoundException e) {
+            logger.info("file not found: " + e.getMessage());
+            throw new BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
+                    e, "file not found");
+        } catch (Exception e) {
+            logger.error("errors while get file status ", e);
+            fileSystem.closeFileSystem();
+            throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+                    e, "unknown error when listLocatedFiles");
+        }
+    }
+
+    private List<TBrokerFileStatus> getFileLocations(RemoteIterator<LocatedFileStatus> locatedFiles)
+            throws IOException {
+        List<TBrokerFileStatus> locations = new ArrayList<>();
+        while (locatedFiles.hasNext()) {
+            LocatedFileStatus fileStatus = locatedFiles.next();
+            TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
+            brokerFileStatus.setPath(fileStatus.getPath().toString());
+            brokerFileStatus.setIsDir(fileStatus.isDirectory());
+            if (fileStatus.isDirectory()) {
+                brokerFileStatus.setIsSplitable(false);
+                brokerFileStatus.setSize(-1);
+            } else {
+                brokerFileStatus.setSize(fileStatus.getLen());
+                brokerFileStatus.setIsSplitable(true);
+            }
+            brokerFileStatus.setModificationTime(fileStatus.getModificationTime());
+            brokerFileStatus.setBlockSize(fileStatus.getBlockSize());
+            locations.add(brokerFileStatus);
+        }
+        return locations;
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
@@ -1283,13 +1328,7 @@ public class FileSystemManager {
         FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd);
         synchronized (fsDataOutputStream) {
             long currentStreamOffset;
-            try {
-                currentStreamOffset = fsDataOutputStream.getPos();
-            } catch (IOException e) {
-                logger.error("errors while get file pos from output stream", e);
-                throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
-                        "errors while get file pos from output stream");
-            }
+            currentStreamOffset = fsDataOutputStream.getPos();
             if (currentStreamOffset != offset) {
                 // it's ok, it means that last pwrite succeed finally
                 if (currentStreamOffset == offset + data.length) {
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
index 14ff74dd41..816462ecb3 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/HDFSBrokerServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.doris.broker.hdfs;
 
 import com.google.common.base.Stopwatch;
 import org.apache.doris.common.BrokerPerfMonitor;
+import org.apache.doris.common.HiveUtils;
 import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
 import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
 import org.apache.doris.thrift.TBrokerCloseReaderRequest;
@@ -28,6 +29,8 @@ import org.apache.doris.thrift.TBrokerFD;
 import org.apache.doris.thrift.TBrokerFileSizeRequest;
 import org.apache.doris.thrift.TBrokerFileSizeResponse;
 import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerIsSplittableRequest;
+import org.apache.doris.thrift.TBrokerIsSplittableResponse;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
 import org.apache.doris.thrift.TBrokerOpenReaderRequest;
@@ -86,6 +89,47 @@ public class HDFSBrokerServiceImpl implements TPaloBrokerService.Iface {
         }
     }
 
+    @Override
+    public TBrokerListResponse listLocatedFiles(TBrokerListPathRequest request)
+            throws TException {
+        logger.info("received a listLocatedFiles request, request detail: " + request);
+        TBrokerListResponse response = new TBrokerListResponse();
+        try {
+            boolean recursive = request.isIsRecursive();
+            boolean onlyFiles = false;
+            if (request.isSetOnlyFiles()) {
+                onlyFiles = request.isOnlyFiles();
+            }
+            List<TBrokerFileStatus> fileStatuses = fileSystemManager.listLocatedFiles(request.path,
+                    onlyFiles, recursive, request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setFiles(fileStatuses);
+            return response;
+        } catch (BrokerException e) {
+            logger.warn("failed to list path: " + request.path, e);
+            TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
+            response.setOpStatus(errorStatus);
+            return response;
+        }
+    }
+
+    @Override
+    public TBrokerIsSplittableResponse isSplittable(TBrokerIsSplittableRequest request) throws TException {
+        logger.info("received an isSplittable request, request detail: " + request);
+        TBrokerIsSplittableResponse response = new TBrokerIsSplittableResponse();
+        try {
+            boolean isSplittable = HiveUtils.isSplittable(request.path, request.inputFormat, request.properties);
+            response.setOpStatus(generateOKStatus());
+            response.setSplittable(isSplittable);
+            return response;
+        } catch (BrokerException e) {
+            logger.warn("failed to get isSplittable with path: " + request.path, e);
+            TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
+            response.setOpStatus(errorStatus);
+            return response;
+        }
+    }
+
     @Override
     public TBrokerOperationStatus deletePath(TBrokerDeletePathRequest request)
             throws TException {
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
new file mode 100644
index 0000000000..f2211eb202
--- /dev/null
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/HiveUtils.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import org.apache.doris.broker.hdfs.BrokerException;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+public class HiveUtils {
+    private static final Logger logger = Logger.getLogger(HiveUtils.class.getName());
+
+    public static boolean isSplittable(String path, String inputFormatName,
+                                       Map<String, String> properties) throws BrokerException {
+        JobConf jobConf = getJobConf(properties);
+        InputFormat<?, ?> inputFormat = getInputFormat(jobConf, inputFormatName);
+        return isSplittableInternal(inputFormat, new Path(path), jobConf);
+    }
+
+    private static JobConf getJobConf(Map<String, String> properties) {
+        Configuration configuration = new Configuration();
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            configuration.set(entry.getKey(), entry.getValue());
+        }
+        return new JobConf(configuration);
+    }
+
+    private static InputFormat<?, ?> getInputFormat(JobConf jobConf, String inputFormatName) throws BrokerException {
+        try {
+            Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
+            if (inputFormatClass == SymlinkTextInputFormat.class) {
+                // symlink targets are always TextInputFormat
+                inputFormatClass = TextInputFormat.class;
+            }
+
+            return ReflectionUtils.newInstance(inputFormatClass, jobConf);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "Unable to create input format " + inputFormatName, e);
+        }
+    }
+
+    @SuppressWarnings({"unchecked", "RedundantCast"})
+    private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
+            throws ClassNotFoundException {
+        // CDH uses different names for Parquet
+        if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
+                || "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
+            return MapredParquetInputFormat.class;
+        }
+
+        Class<?> clazz = conf.getClassByName(inputFormatName);
+        return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
+    }
+
+    private static boolean isSplittableInternal(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
+        // ORC uses a custom InputFormat but is always splittable
+        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
+            return true;
+        }
+
+        // use reflection to get the protected isSplitable method on FileInputFormat
+        Method method = null;
+        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
+            try {
+                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
+                break;
+            } catch (NoSuchMethodException ignored) {
+                logger.warn(LoggerMessageFormat.format("Class {} doesn't contain isSplitable method", clazz));
+            }
+        }
+
+        if (method == null) {
+            return false;
+        }
+        try {
+            method.setAccessible(true);
+            return (boolean) method.invoke(inputFormat, path.getFileSystem(jobConf), path);
+        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
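The superclass walk in `isSplittableInternal` above is needed because `Class.getDeclaredMethod` only sees methods declared on that exact class, while `FileInputFormat.isSplitable` is protected and usually inherited rather than overridden. A self-contained sketch of the same pattern, using toy classes rather than the Hadoop ones:

```java
import java.lang.reflect.Method;

public class ReflectionWalkDemo {
    static class Base {
        protected boolean isSplitable(String path) {
            return !path.endsWith(".gz"); // e.g. gzip'd text is not splittable
        }
    }

    static class Child extends Base { }

    public static void main(String[] args) throws Exception {
        Method method = null;
        // getDeclaredMethod on Child throws NoSuchMethodException because the
        // method is declared on Base, so walk up the hierarchy just like
        // isSplittableInternal does.
        for (Class<?> clazz = Child.class; clazz != null; clazz = clazz.getSuperclass()) {
            try {
                method = clazz.getDeclaredMethod("isSplitable", String.class);
                break;
            } catch (NoSuchMethodException ignored) {
                // not declared here, keep walking
            }
        }
        method.setAccessible(true); // the real method is protected, as in HiveUtils
        System.out.println(method.invoke(new Child(), "/data/part-00000.gz"));  // false
        System.out.println(method.invoke(new Child(), "/data/part-00000.txt")); // true
    }
}
```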
diff --git a/gensrc/thrift/PaloBrokerService.thrift b/gensrc/thrift/PaloBrokerService.thrift
index 308c606544..e4bc60a201 100644
--- a/gensrc/thrift/PaloBrokerService.thrift
+++ b/gensrc/thrift/PaloBrokerService.thrift
@@ -91,12 +91,25 @@ struct TBrokerCheckPathExistResponse {
     2: required bool isPathExist;
 }
 
+struct TBrokerIsSplittableResponse {
+    1: optional TBrokerOperationStatus opStatus;
+    2: optional bool splittable;
+}
+
 struct TBrokerListPathRequest {
     1: required TBrokerVersion version;
     2: required string path;
     3: required bool isRecursive;
     4: required map<string, string> properties;
     5: optional bool fileNameOnly;
+    6: optional bool onlyFiles;
+}
+
+struct TBrokerIsSplittableRequest {
+    1: optional TBrokerVersion version;
+    2: optional string path;
+    3: optional string inputFormat;
+    4: optional map<string, string> properties;
 }
 
 struct TBrokerDeletePathRequest {
@@ -184,6 +197,13 @@ service TPaloBrokerService {
 
     // return a list of files under a path
     TBrokerListResponse listPath(1: TBrokerListPathRequest request);
+
+    // return located files of a given path. A broker implementation refers to
+    // 'org.apache.doris.fs.remote.RemoteFileSystem#listLocatedFiles' in fe-core.
+    TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request);
+
+    // return whether the path with the specified input format is splittable.
+    TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest request);
 
     // delete a file, if the deletion of the file fails, the status code will return an error message
     // input:
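To close, a hedged sketch of exercising the two new broker RPCs with a bare Thrift client rather than FE's `ClientPool`. The endpoint, path, input format, and properties below are illustrative, and this assumes the broker is serving the unframed binary protocol that FE itself uses:

```java
import org.apache.doris.thrift.TBrokerIsSplittableRequest;
import org.apache.doris.thrift.TBrokerIsSplittableResponse;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TPaloBrokerService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;

import java.util.HashMap;
import java.util.Map;

public class BrokerRpcSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative broker endpoint; brokers listen on their configured thrift port.
        TSocket socket = new TSocket("127.0.0.1", 8000);
        socket.open();
        TPaloBrokerService.Client client =
                new TPaloBrokerService.Client(new TBinaryProtocol(socket));
        Map<String, String> props = new HashMap<>(); // e.g. hadoop/kerberos properties

        // isSplittable: ask the broker to run the FileInputFormat.isSplitable check.
        TBrokerIsSplittableRequest splitReq = new TBrokerIsSplittableRequest()
                .setVersion(TBrokerVersion.VERSION_ONE)
                .setPath("hdfs://ns1/warehouse/t1/p1=1")
                .setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
                .setProperties(props);
        TBrokerIsSplittableResponse splitResp = client.isSplittable(splitReq);
        System.out.println("splittable: " + splitResp.isSplittable());

        // listLocatedFiles: recursive, files only, mirroring what FE's
        // BrokerFileSystem.listLocatedFiles sends.
        TBrokerListPathRequest listReq = new TBrokerListPathRequest(
                TBrokerVersion.VERSION_ONE, "hdfs://ns1/warehouse/t1/p1=1", true, props);
        listReq.setOnlyFiles(true);
        TBrokerListResponse listResp = client.listLocatedFiles(listReq);
        System.out.println("files: " + listResp.getFiles());
        socket.close();
    }
}
```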