[improvement](fe and broker) support specify broker to getSplits, check isSplitable, file scan for HMS Multi-catalog (#24830)

I want to use Doris Multi-catalog to accelerate HMS queries. My organization has a custom distributed file system, and we think wrapping the fs access differences into the broker (listLocatedFiles, openReader, ...) would be an elegant approach.

This PR introduces the HMS catalog property `broker.name`. When this property is set, file split and query scan operations are sent to the broker.

Usage:
Create an HMS catalog that binds a broker:
```
CREATE CATALOG hive_catalog_broker PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://xxx',
    'broker.name' = 'hdfs_broker'
);
```
When we query this catalog, file split and query scan requests are sent to the broker `hdfs_broker`.
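For example (database and table names below are illustrative):
```sql
SWITCH hive_catalog_broker;
-- splits are listed and files are read through hdfs_broker
SELECT count(*) FROM hive_db.hive_table;
```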

More details about this PR:
1. Introduce the HMS catalog property `broker.name` (constant `BIND_BROKER_NAME`) to specify the broker that performs remote path work. When `broker.name` is set, `enable.self.splitter` must be `true` so that the file splitting process is executed in FE.
2. Introduce two new interfaces in the broker service:
- `TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest request)`, which invokes the input format's `isSplitable` method.
- `TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request)`, which performs `listFiles` or `listLocatedStatus` on the remote file system.
3. Three parts of the whole processing are executed in the broker (see the sketch after this list):
- checking whether the path with the specified input format name is splittable (`isSplittable`);
- `listLocatedFiles` on table / partition locations;
- `openReader` for the specified file splits.
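Putting these pieces together, the FE-side routing condenses to roughly the following sketch (assembled from this PR's diff; error handling and caching details omitted):
```java
// With a broker bound, getFSIdentity() reports FileSystemType.BROKER and the
// factory hands back a BrokerFileSystem, which forwards isSplittable /
// listLocatedFiles / openReader to the broker over Thrift.
String bindBrokerName = hmsTable.getCatalog().bindBrokerName(); // e.g. "hdfs_broker", or null
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
        new FileSystemCache.FileSystemCacheKey(
                FileSystemFactory.getFSIdentity(location, bindBrokerName), jobConf, bindBrokerName));
boolean splittable = HiveUtil.isSplittable(fs, inputFormat, location, jobConf);
RemoteFiles locatedFiles = fs.listLocatedFiles(location, /* onlyFiles */ true, /* recursive */ false);
```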

Co-authored-by: chenlinzhong <490103404@qq.com>
Author: DuRipeng
Committed by GitHub on 2023-10-13 11:04:38 +08:00
Parent ed67d5a2c2 · commit aa0b74d63a
17 changed files with 447 additions and 46 deletions

View File

@ -401,6 +401,14 @@ If the variable `truncate_char_or_varchar_columns` is enabled, when the maximum
The variable default is false.
## Access HMS with broker
Add the following setting when creating an HMS catalog; file splitting and scanning for Hive external tables will then be completed by the broker named `test_broker`:
```sql
"broker.name" = "test_broker"
```
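A complete catalog definition would then look like this (the metastore URI is a placeholder):
```sql
CREATE CATALOG hive_catalog_broker PROPERTIES (
    'type' = 'hms',
    'hive.metastore.uris' = 'thrift://xxx',
    'broker.name' = 'test_broker'
);
```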
## Integrate with Apache Ranger
Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data security access management on the Hadoop platform.

View File

@ -380,6 +380,14 @@ CREATE CATALOG hive PROPERTIES (
This variable defaults to false.
## Access HMS with broker
Add the following configuration when creating an HMS catalog; Hive external table file splitting and file scanning will be completed by the broker named `test_broker`:
```sql
"broker.name" = "test_broker"
```
## Use Ranger for authorization
Apache Ranger is a security framework for monitoring, enabling services, and comprehensive data security access management on the Hadoop platform.

View File

@ -593,6 +593,14 @@ public abstract class ExternalCatalog
return ret;
}
public String bindBrokerName() {
Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
}
return null;
}
@Override
public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
makeSureInitialized();

View File

@ -61,6 +61,8 @@ public class HMSExternalCatalog extends ExternalCatalog {
private long lastSyncedEventId = -1L;
public static final String ENABLE_SELF_SPLITTER = "enable.self.splitter";
public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second";
// broker name for file split and query scan.
public static final String BIND_BROKER_NAME = "broker.name";
private static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed";
// -1 means file cache no ttl set

View File

@ -365,11 +365,13 @@ public class HiveMetaStoreCache {
// Get File Status by using FileSystem API.
private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
JobConf jobConf,
List<String> partitionValues) throws UserException {
List<String> partitionValues,
String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
result.setSplittable(HiveUtil.isSplittable(inputFormat, new Path(location), jobConf));
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
location, bindBrokerName), jobConf, bindBrokerName));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
try {
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
@ -429,7 +431,8 @@ public class HiveMetaStoreCache {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
if (key.useSelfSplitter) {
result = getFileCache(finalLocation, inputFormat, jobConf, key.getPartitionValues());
result = getFileCache(finalLocation, inputFormat, jobConf,
key.getPartitionValues(), key.bindBrokerName);
} else {
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@ -508,23 +511,23 @@ public class HiveMetaStoreCache {
}
public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
boolean useSelfSplitter) {
return getFilesByPartitions(partitions, useSelfSplitter, true);
boolean useSelfSplitter, String bindBrokerName) {
return getFilesByPartitions(partitions, useSelfSplitter, true, bindBrokerName);
}
public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
boolean useSelfSplitter) {
return getFilesByPartitions(partitions, useSelfSplitter, false);
boolean useSelfSplitter, String bindBrokerName) {
return getFilesByPartitions(partitions, useSelfSplitter, false, bindBrokerName);
}
private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
boolean useSelfSplitter, boolean withCache) {
boolean useSelfSplitter, boolean withCache, String bindBrokerName) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = partitions.stream().map(p -> {
FileCacheKey fileCacheKey = p.isDummyPartition()
? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
p.getInputFormat(), useSelfSplitter)
: new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues());
p.getInputFormat(), useSelfSplitter, bindBrokerName)
: new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
fileCacheKey.setUseSelfSplitter(useSelfSplitter);
return fileCacheKey;
}).collect(Collectors.toList());
@ -602,7 +605,7 @@ public class HiveMetaStoreCache {
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
null, partition.getPartitionValues()));
null, partition.getPartitionValues(), null));
partitionCache.invalidate(partKey);
}
}
@ -620,7 +623,7 @@ public class HiveMetaStoreCache {
* and FE will exit if some network problems occur.
* */
FileCacheKey fileCacheKey = FileCacheKey.createDummyCacheKey(
dbName, tblName, null, null, false);
dbName, tblName, null, null, false, null);
fileCacheRef.get().invalidate(fileCacheKey);
}
}
@ -635,7 +638,7 @@ public class HiveMetaStoreCache {
HivePartition partition = partitionCache.getIfPresent(partKey);
if (partition != null) {
fileCacheRef.get().invalidate(new FileCacheKey(partition.getPath(),
null, partition.getPartitionValues()));
null, partition.getPartitionValues(), null));
partitionCache.invalidate(partKey);
}
}
@ -781,7 +784,7 @@ public class HiveMetaStoreCache {
}
public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
boolean isFullAcid, long tableId) {
boolean isFullAcid, long tableId, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
try {
@ -812,7 +815,8 @@ public class HiveMetaStoreCache {
String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString()), jobConf));
FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
bindBrokerName), jobConf, bindBrokerName));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
if (status.getErrCode() == ErrCode.NOT_FOUND) {
@ -833,7 +837,9 @@ public class HiveMetaStoreCache {
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
String location = delta.getPath().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
new FileSystemCache.FileSystemCacheKey(
FileSystemFactory.getFSIdentity(location, bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
@ -851,7 +857,9 @@ public class HiveMetaStoreCache {
if (directory.getBaseDirectory() != null) {
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(location), jobConf));
new FileSystemCache.FileSystemCacheKey(
FileSystemFactory.getFSIdentity(location, bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
locatedFiles.files().stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
@ -949,6 +957,8 @@ public class HiveMetaStoreCache {
private String location;
// not in key
private String inputFormat;
// Broker name for file split and file scan.
private String bindBrokerName;
// Temp variable, use self file splitter or use InputFormat.getSplits.
// Will remove after self splitter is stable.
private boolean useSelfSplitter;
@ -957,16 +967,18 @@ public class HiveMetaStoreCache {
// partitionValues would be ["part1", "part2"]
protected List<String> partitionValues;
public FileCacheKey(String location, String inputFormat, List<String> partitionValues) {
public FileCacheKey(String location, String inputFormat, List<String> partitionValues, String bindBrokerName) {
this.location = location;
this.inputFormat = inputFormat;
this.partitionValues = partitionValues == null ? Lists.newArrayList() : partitionValues;
this.useSelfSplitter = true;
this.bindBrokerName = bindBrokerName;
}
public static FileCacheKey createDummyCacheKey(String dbName, String tblName, String location,
String inputFormat, boolean useSelfSplitter) {
FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null);
String inputFormat, boolean useSelfSplitter,
String bindBrokerName) {
FileCacheKey fileCacheKey = new FileCacheKey(location, inputFormat, null, bindBrokerName);
fileCacheKey.dummyKey = dbName + "." + tblName;
fileCacheKey.useSelfSplitter = useSelfSplitter;
return fileCacheKey;

View File

@ -24,6 +24,8 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
@ -184,12 +186,17 @@ public final class HiveUtil {
}
}
public static boolean isSplittable(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
String location, JobConf jobConf) throws UserException {
if (remoteFileSystem instanceof BrokerFileSystem) {
return ((BrokerFileSystem) remoteFileSystem)
.isSplittable(location, inputFormat.getClass().getCanonicalName());
}
// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
return true;
}
// use reflection to get the isSplitable method on FileInputFormat
// ATTN: the method is actually named "isSplitable"; the correct spelling would be "isSplittable"
Method method = null;
@ -205,6 +212,7 @@ public final class HiveUtil {
if (method == null) {
return false;
}
Path path = new Path(location);
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);

View File

@ -53,7 +53,7 @@ public class FileSystemCache {
}
private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
return FileSystemFactory.getByType(key.type, key.conf);
return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, key.bindBrokerName);
}
public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@ -69,11 +69,13 @@ public class FileSystemCache {
// eg: hdfs://nameservices1
private final String fsIdent;
private final JobConf conf;
private final String bindBrokerName;
public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf) {
public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf, String bindBrokerName) {
this.type = fs.first;
this.fsIdent = fs.second;
this.conf = conf;
this.bindBrokerName = bindBrokerName;
}
@Override
@ -84,14 +86,21 @@ public class FileSystemCache {
if (!(obj instanceof FileSystemCacheKey)) {
return false;
}
return type.equals(((FileSystemCacheKey) obj).type)
boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
&& fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
&& conf == ((FileSystemCacheKey) obj).conf;
if (bindBrokerName == null) {
return equalsWithoutBroker;
}
return equalsWithoutBroker && bindBrokerName.equals(((FileSystemCacheKey) obj).bindBrokerName);
}
@Override
public int hashCode() {
return Objects.hash(conf, fsIdent, type);
if (bindBrokerName == null) {
return Objects.hash(conf, fsIdent, type);
}
return Objects.hash(conf, fsIdent, type, bindBrokerName);
}
}
}
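The null check in `equals`/`hashCode` keeps keys created without a broker behaving exactly as before, while broker-bound keys also compare the broker name. Illustratively (a sketch; `jobConf` is assumed in scope):
```java
// Two keys with the same fs identity and JobConf, but bound to different brokers:
FileSystemCacheKey a = new FileSystemCacheKey(
        Pair.of(FileSystemType.BROKER, "hdfs://ns1"), jobConf, "broker1");
FileSystemCacheKey b = new FileSystemCacheKey(
        Pair.of(FileSystemType.BROKER, "hdfs://ns1"), jobConf, "broker2");
// a.equals(b) == false, so each broker keeps its own cached BrokerFileSystem.
```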

View File

@ -56,9 +56,11 @@ public class FileSystemFactory {
}
}
public static Pair<FileSystemType, String> getFSIdentity(String location) {
public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
FileSystemType fsType;
if (S3Util.isObjStorage(location)) {
if (bindBrokerName != null) {
fsType = FileSystemType.BROKER;
} else if (S3Util.isObjStorage(location)) {
if (S3Util.isHdfsOnOssEndpoint(location)) {
// if hdfs service is enabled on oss, use hdfs lib to access oss.
fsType = FileSystemType.DFS;
@ -83,7 +85,8 @@ public class FileSystemFactory {
return Pair.of(fsType, fsIdent);
}
public static RemoteFileSystem getByType(FileSystemType type, Configuration conf) {
public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
String bindBrokerName) {
Map<String, String> properties = new HashMap<>();
conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
switch (type) {
@ -95,6 +98,8 @@ public class FileSystemFactory {
return new OFSFileSystem(properties);
case JFS:
return new JFSFileSystem(properties);
case BROKER:
return new BrokerFileSystem(bindBrokerName, properties);
default:
throw new IllegalStateException("Not supported file system type: " + type);
}
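A consequence of the `bindBrokerName != null` branch: with a broker bound, every location maps to `FileSystemType.BROKER` regardless of URI scheme, so HDFS, object storage, and custom file system paths all take the same route. A sketch (paths illustrative; `conf` assumed in scope):
```java
Pair<FileSystemType, String> hdfsId = FileSystemFactory.getFSIdentity("hdfs://ns1/warehouse/t", "hdfs_broker");
Pair<FileSystemType, String> ossId = FileSystemFactory.getFSIdentity("oss://bucket/warehouse/t", "hdfs_broker");
// hdfsId.first == ossId.first == FileSystemType.BROKER
RemoteFileSystem fs = FileSystemFactory.getRemoteFileSystem(FileSystemType.BROKER, conf, "hdfs_broker");
// fs is a BrokerFileSystem named "hdfs_broker" carrying the properties from conf
```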

View File

@ -22,5 +22,6 @@ public enum FileSystemType {
DFS,
OFS,
JFS,
BROKER,
FILE
}

View File

@ -24,8 +24,10 @@ import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.operations.BrokerFileOperations;
import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.service.FrontendOptions;
@ -34,6 +36,8 @@ import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerDeletePathRequest;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TBrokerIsSplittableRequest;
import org.apache.doris.thrift.TBrokerIsSplittableResponse;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
@ -65,6 +69,7 @@ import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -556,6 +561,88 @@ public class BrokerFileSystem extends RemoteFileSystem {
return Status.OK;
}
@Override
public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
throw new UserException("failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// invoke broker 'listLocatedFiles' interface
boolean needReturn = true;
try {
TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
recursive, properties);
req.setOnlyFiles(onlyFiles);
TBrokerListResponse response = client.listLocatedFiles(req);
TBrokerOperationStatus operationStatus = response.getOpStatus();
if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("failed to listLocatedFiles, remote path: " + remotePath + ". msg: "
+ operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
}
List<RemoteFile> result = new ArrayList<>();
List<TBrokerFileStatus> fileStatus = response.getFiles();
for (TBrokerFileStatus tFile : fileStatus) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path);
RemoteFile file = new RemoteFile(path.getName(), path, !tFile.isDir, tFile.isDir, tFile.size,
tFile.getBlockSize(), tFile.getModificationTime(), null /* blockLocations is null*/);
result.add(file);
}
LOG.info("finished to listLocatedFiles, remote path {}. get files: {}", remotePath, result);
return new RemoteFiles(result);
} catch (TException e) {
needReturn = false;
throw new UserException("failed to listLocatedFiles, remote path: "
+ remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
}
public boolean isSplittable(String remotePath, String inputFormat) throws UserException {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
throw new UserException("failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
// invoke 'isSplittable' interface
boolean needReturn = true;
try {
TBrokerIsSplittableRequest req = new TBrokerIsSplittableRequest().setVersion(TBrokerVersion.VERSION_ONE)
.setPath(remotePath).setInputFormat(inputFormat).setProperties(properties);
TBrokerIsSplittableResponse response = client.isSplittable(req);
TBrokerOperationStatus operationStatus = response.getOpStatus();
if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("failed to get path isSplittable, remote path: " + remotePath + ". msg: "
+ operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
}
boolean result = response.isSplittable();
LOG.info("finished to get path isSplittable, remote path {} with format {}, isSplittable: {}",
remotePath, inputFormat, result);
return result;
} catch (TException e) {
needReturn = false;
throw new UserException("failed to get path isSplittable, remote path: "
+ remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
ClientPool.brokerPool.returnObject(address, client);
} else {
ClientPool.brokerPool.invalidateObject(address, client);
}
}
}
// List files in remotePath
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {

View File

@ -196,8 +196,14 @@ public class HiveScanNode extends FileQueryScanNode {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
if (bindBrokerName != null && useSelfSplitter == false) {
// useSelfSplitter must be true if bindBrokerName is set.
throw new UserException(HMSExternalCatalog.ENABLE_SELF_SPLITTER + " should be true if "
+ HMSExternalCatalog.BIND_BROKER_NAME + " is set");
}
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter);
getFileSplitByPartitions(cache, getPartitions(), allFiles, useSelfSplitter, bindBrokerName);
LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), (System.currentTimeMillis() - start));
return allFiles;
@ -210,12 +216,13 @@ public class HiveScanNode extends FileQueryScanNode {
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
List<Split> allFiles, boolean useSelfSplitter) throws IOException {
List<Split> allFiles, boolean useSelfSplitter,
String bindBrokerName) throws IOException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
fileCaches = getFileSplitByTransaction(cache, partitions);
fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
} else {
fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter);
fileCaches = cache.getFilesByPartitionsWithCache(partitions, useSelfSplitter, bindBrokerName);
}
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
@ -287,7 +294,8 @@ public class HiveScanNode extends FileQueryScanNode {
return fileList.subList(0, index);
}
private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions,
String bindBrokerName) {
for (HivePartition partition : partitions) {
if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
// this is unpartitioned table.
@ -297,7 +305,8 @@ public class HiveScanNode extends FileQueryScanNode {
}
ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
return cache.getFilesByTransaction(partitions, validWriteIds, hiveTransaction.isFullAcid(), hmsTable.getId());
return cache.getFilesByTransaction(partitions, validWriteIds,
hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName);
}
@Override
@ -319,6 +328,10 @@ public class HiveScanNode extends FileQueryScanNode {
@Override
protected TFileType getLocationType(String location) throws UserException {
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
if (bindBrokerName != null) {
return TFileType.FILE_BROKER;
}
return getTFileType(location).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
}

View File

@ -669,7 +669,8 @@ public class StatisticsUtil {
table.getRemoteTable().getSd().getLocation(), null));
}
// Get files for all partitions.
return cache.getFilesByPartitionsWithoutCache(hivePartitions, true);
String bindBrokerName = table.getCatalog().bindBrokerName();
return cache.getFilesByPartitionsWithoutCache(hivePartitions, true, bindBrokerName);
}
/**

View File

@ -69,9 +69,10 @@ under the License.
<maven.compiler.target>1.8</maven.compiler.target>
<log4j2.version>2.18.0</log4j2.version>
<project.scm.id>github</project.scm.id>
<hadoop.version>2.10.2</hadoop.version>
<hadoop.version>3.3.6</hadoop.version>
<netty.version>4.1.65.Final</netty.version>
<gcs.version>hadoop2-2.2.15</gcs.version>
<doris.hive.catalog.shade.version>1.0.1</doris.hive.catalog.shade.version>
</properties>
<profiles>
<!-- for custom internal repository -->
@ -224,6 +225,29 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.doris/hive-catalog-shade -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>hive-catalog-shade</artifactId>
<version>${doris.hive.catalog.shade.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>

View File

@ -25,17 +25,19 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileNotFoundException;
@ -1062,6 +1064,49 @@ public class FileSystemManager {
}
}
public List<TBrokerFileStatus> listLocatedFiles(String path, boolean onlyFiles,
boolean recursive, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
BrokerFileSystem fileSystem = getFileSystem(path, properties);
Path locatedPath = new Path(path);
try {
FileSystem innerFileSystem = fileSystem.getDFSFileSystem();
RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ? innerFileSystem.listFiles(locatedPath, recursive)
: innerFileSystem.listLocatedStatus(locatedPath);
return getFileLocations(locatedFiles);
} catch (FileNotFoundException e) {
logger.info("file not found: " + e.getMessage());
throw new BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
e, "file not found");
} catch (Exception e) {
logger.error("errors while get file status ", e);
fileSystem.closeFileSystem();
throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "unknown error when listLocatedFiles");
}
}
private List<TBrokerFileStatus> getFileLocations(RemoteIterator<LocatedFileStatus> locatedFiles) throws IOException {
List<TBrokerFileStatus> locations = new ArrayList<>();
while (locatedFiles.hasNext()) {
LocatedFileStatus fileStatus = locatedFiles.next();
TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
brokerFileStatus.setPath(fileStatus.getPath().toString());
brokerFileStatus.setIsDir(fileStatus.isDirectory());
if (fileStatus.isDirectory()) {
brokerFileStatus.setIsSplitable(false);
brokerFileStatus.setSize(-1);
} else {
brokerFileStatus.setSize(fileStatus.getLen());
brokerFileStatus.setIsSplitable(true);
}
brokerFileStatus.setModificationTime(fileStatus.getModificationTime());
brokerFileStatus.setBlockSize(fileStatus.getBlockSize());
locations.add(brokerFileStatus);
}
return locations;
}
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
@ -1283,13 +1328,7 @@ public class FileSystemManager {
FSDataOutputStream fsDataOutputStream = clientContextManager.getFsDataOutputStream(fd);
synchronized (fsDataOutputStream) {
long currentStreamOffset;
try {
currentStreamOffset = fsDataOutputStream.getPos();
} catch (IOException e) {
logger.error("errors while get file pos from output stream", e);
throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
"errors while get file pos from output stream");
}
currentStreamOffset = fsDataOutputStream.getPos();
if (currentStreamOffset != offset) {
// it's ok, it means that last pwrite succeed finally
if (currentStreamOffset == offset + data.length) {

View File

@ -19,6 +19,7 @@ package org.apache.doris.broker.hdfs;
import com.google.common.base.Stopwatch;
import org.apache.doris.common.BrokerPerfMonitor;
import org.apache.doris.common.HiveUtils;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerCloseReaderRequest;
@ -28,6 +29,8 @@ import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerFileSizeRequest;
import org.apache.doris.thrift.TBrokerFileSizeResponse;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TBrokerIsSplittableResponse;
import org.apache.doris.thrift.TBrokerIsSplittableRequest;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOpenReaderRequest;
@ -86,6 +89,47 @@ public class HDFSBrokerServiceImpl implements TPaloBrokerService.Iface {
}
}
@Override
public TBrokerListResponse listLocatedFiles(TBrokerListPathRequest request)
throws TException {
logger.info("received a listLocatedFiles request, request detail: " + request);
TBrokerListResponse response = new TBrokerListResponse();
try {
boolean recursive = request.isIsRecursive();
boolean onlyFiles = false;
if (request.isSetOnlyFiles()) {
onlyFiles = request.isOnlyFiles();
}
List<TBrokerFileStatus> fileStatuses = fileSystemManager.listLocatedFiles(request.path,
onlyFiles, recursive, request.properties);
response.setOpStatus(generateOKStatus());
response.setFiles(fileStatuses);
return response;
} catch (BrokerException e) {
logger.warn("failed to list path: " + request.path, e);
TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
response.setOpStatus(errorStatus);
return response;
}
}
@Override
public TBrokerIsSplittableResponse isSplittable(TBrokerIsSplittableRequest request) throws TException {
logger.info("received a isSplittable request, request detail: " + request);
TBrokerIsSplittableResponse response = new TBrokerIsSplittableResponse();
try {
boolean isSplittable = HiveUtils.isSplittable(request.path, request.inputFormat, request.properties);
response.setOpStatus(generateOKStatus());
response.setSplittable(isSplittable);
return response;
} catch (BrokerException e) {
logger.warn("failed to get isSplitable with path: " + request.path, e);
TBrokerOperationStatus errorStatus = e.generateFailedOperationStatus();
response.setOpStatus(errorStatus);
return response;
}
}
@Override
public TBrokerOperationStatus deletePath(TBrokerDeletePathRequest request)
throws TException {

View File

@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common;
import org.apache.doris.broker.hdfs.BrokerException;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
public class HiveUtils {
private static final Logger logger = Logger.getLogger(HiveUtils.class.getName());
public static boolean isSplittable(String path, String inputFormatName,
Map<String, String> properties) throws BrokerException {
JobConf jobConf = getJobConf(properties);
InputFormat inputFormat = getInputFormat(jobConf, inputFormatName);
return isSplittableInternal(inputFormat, new Path(path), jobConf);
}
private static JobConf getJobConf(Map<String, String> properties) {
Configuration configuration = new Configuration();
for (Map.Entry<String, String> entry : properties.entrySet()) {
configuration.set(entry.getKey(), entry.getValue());
}
return new JobConf(configuration);
}
private static InputFormat<?, ?> getInputFormat(JobConf jobConf, String inputFormatName) throws BrokerException {
try {
Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
if (inputFormatClass == SymlinkTextInputFormat.class) {
// symlink targets are always TextInputFormat
inputFormatClass = TextInputFormat.class;
}
return ReflectionUtils.newInstance(inputFormatClass, jobConf);
} catch (ClassNotFoundException | RuntimeException e) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
"Unable to create input format " + inputFormatName, e);
}
}
@SuppressWarnings({"unchecked", "RedundantCast"})
private static Class<? extends InputFormat<?, ?>> getInputFormatClass(JobConf conf, String inputFormatName)
throws ClassNotFoundException {
// CDH uses different names for Parquet
if ("parquet.hive.DeprecatedParquetInputFormat".equals(inputFormatName)
|| "parquet.hive.MapredParquetInputFormat".equals(inputFormatName)) {
return MapredParquetInputFormat.class;
}
Class<?> clazz = conf.getClassByName(inputFormatName);
return (Class<? extends InputFormat<?, ?>>) clazz.asSubclass(InputFormat.class);
}
private static boolean isSplittableInternal(InputFormat<?, ?> inputFormat, Path path, JobConf jobConf) {
// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
return true;
}
// use reflection to get the isSplitable method on FileInputFormat
Method method = null;
for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
break;
} catch (NoSuchMethodException ignored) {
logger.warn(LoggerMessageFormat.format("Class {} doesn't contain isSplitable method", clazz));
}
}
if (method == null) {
return false;
}
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat, path.getFileSystem(jobConf), path);
} catch (InvocationTargetException | IllegalAccessException | IOException e) {
throw new RuntimeException(e);
}
}
}
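As called from `HDFSBrokerServiceImpl.isSplittable` above, a direct invocation looks like this (path and properties are illustrative):
```java
Map<String, String> props = new HashMap<>(); // e.g. authentication properties
boolean splittable = HiveUtils.isSplittable(
        "hdfs://ns1/warehouse/t/part-00000",
        "org.apache.hadoop.mapred.TextInputFormat",
        props);
```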

View File

@ -91,12 +91,25 @@ struct TBrokerCheckPathExistResponse {
2: required bool isPathExist;
}
struct TBrokerIsSplittableResponse {
1: optional TBrokerOperationStatus opStatus;
2: optional bool splittable;
}
struct TBrokerListPathRequest {
1: required TBrokerVersion version;
2: required string path;
3: required bool isRecursive;
4: required map<string,string> properties;
5: optional bool fileNameOnly;
6: optional bool onlyFiles;
}
struct TBrokerIsSplittableRequest {
1: optional TBrokerVersion version;
2: optional string path;
3: optional string inputFormat;
4: optional map<string,string> properties;
}
struct TBrokerDeletePathRequest {
@ -184,6 +197,13 @@ service TPaloBrokerService {
// return a list of files under a path
TBrokerListResponse listPath(1: TBrokerListPathRequest request);
// return the located files of a given path. A broker implementation can refer to
// 'org.apache.doris.fs.remote.RemoteFileSystem#listLocatedFiles' in fe-core.
TBrokerListResponse listLocatedFiles(1: TBrokerListPathRequest request);
// return whether the path with specified input format is splittable.
TBrokerIsSplittableResponse isSplittable(1: TBrokerIsSplittableRequest request);
// delete a file, if the deletion of the file fails, the status code will return an error message
// input: