bp #35027

@@ -355,7 +355,7 @@ public class HiveMetaStoreCache {
                 RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                         new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
                                 location, bindBrokerName), properties, bindBrokerName));
-                result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
+                result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
                 // For Tez engine, it may generate subdirectories for "union" query.
                 // So there may be files and directories in the table directory at the same time. eg:
                 // /user/hive/warehouse/region_tmp_union_all2/000000_0

@@ -110,7 +110,7 @@ public final class HiveUtil {
     }
 
     public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
-            String location, JobConf jobConf) throws UserException {
+            String location) throws UserException {
         if (remoteFileSystem instanceof BrokerFileSystem) {
             return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat);
         }
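
The two hunks above drop the JobConf parameter from HiveUtil.isSplittable, so the call site only passes the file system, the input format name, and the file location. As a rough, self-contained illustration of what a splittable flag like this typically controls in scan planning (not Doris code; SplitSketch, Range, and toRanges are made-up names), a splittable file can be cut into block-sized scan ranges while a non-splittable one must be read as a single range:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Illustrative only: a scan range of a file, not a Doris class.
    record Range(long start, long length) {}

    // If the format is splittable, cut the file into block-sized ranges;
    // otherwise a single reader has to scan the whole file.
    static List<Range> toRanges(long fileSize, long blockSize, boolean splittable) {
        List<Range> ranges = new ArrayList<>();
        if (!splittable) {
            ranges.add(new Range(0, fileSize));
            return ranges;
        }
        for (long offset = 0; offset < fileSize; offset += blockSize) {
            ranges.add(new Range(offset, Math.min(blockSize, fileSize - offset)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        System.out.println(toRanges(300, 128, true));   // three ranges
        System.out.println(toRanges(300, 128, false));  // one range covering the whole file
    }
}
```

Whether a given file actually is splittable presumably still depends on the input format and location (for example, some compressed text files cannot be split), which is what HiveUtil.isSplittable decides from the three remaining arguments.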

@@ -77,6 +77,7 @@ import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 public class HudiScanNode extends HiveScanNode {

@@ -329,49 +330,58 @@ public class HudiScanNode extends HiveScanNode {
     private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
         Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
         CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
+        AtomicReference<Throwable> throwable = new AtomicReference<>();
         partitions.forEach(partition -> executor.execute(() -> {
-            String globPath;
-            String partitionName = "";
-            if (partition.isDummyPartition()) {
-                globPath = hudiClient.getBasePathV2().toString() + "/*";
-            } else {
-                partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
-                        new Path(partition.getPath()));
-                globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
-            }
-            List<FileStatus> statuses;
-            try {
-                statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
-                        new Path(globPath));
-            } catch (IOException e) {
-                throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
-            }
-            HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
-                    timeline, statuses.toArray(new FileStatus[0]));
+            try {
+                String globPath;
+                String partitionName = "";
+                if (partition.isDummyPartition()) {
+                    globPath = hudiClient.getBasePathV2().toString() + "/*";
+                } else {
+                    partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
+                            new Path(partition.getPath()));
+                    globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
+                }
+                List<FileStatus> statuses;
+                try {
+                    statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
+                            new Path(globPath));
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
+                }
+                HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
+                        timeline, statuses.toArray(new FileStatus[0]));
 
-            if (isCowOrRoTable) {
-                fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
-                    noLogsSplitNum.incrementAndGet();
-                    String filePath = baseFile.getPath();
-                    long fileSize = baseFile.getFileSize();
-                    // Need add hdfs host to location
-                    LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
-                    Path splitFilePath = locationPath.toStorageLocation();
-                    splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
-                            new String[0], partition.getPartitionValues()));
-                });
-            } else {
-                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
-                        .forEach(fileSlice -> splits.add(
-                                generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
-            }
-            countDownLatch.countDown();
+                if (isCowOrRoTable) {
+                    fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
+                        noLogsSplitNum.incrementAndGet();
+                        String filePath = baseFile.getPath();
+                        long fileSize = baseFile.getFileSize();
+                        // Need add hdfs host to location
+                        LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
+                        Path splitFilePath = locationPath.toStorageLocation();
+                        splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
+                                new String[0], partition.getPartitionValues()));
+                    });
+                } else {
+                    fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                            .forEach(fileSlice -> splits.add(
+                                    generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
+                }
+            } catch (Throwable t) {
+                throwable.set(t);
+            } finally {
+                countDownLatch.countDown();
+            }
         }));
         try {
             countDownLatch.await();
         } catch (InterruptedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
+        if (throwable.get() != null) {
+            throw new RuntimeException(throwable.get().getMessage(), throwable.get());
+        }
     }
 
     @Override
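
The getPartitionSplits change above wraps each partition's file listing in try/catch/finally: the latch is counted down even when a task fails, any recorded Throwable is rethrown on the calling thread after await(), and the previous version's risk of an exception skipping countDown() and leaving await() blocked goes away. A minimal, self-contained sketch of that pattern (ListingDemo and listPartition are illustrative names, not Doris APIs):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ListingDemo {
    public static void main(String[] args) throws InterruptedException {
        List<String> partitions = List.of("p=1", "p=2", "p=3");
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
        AtomicReference<Throwable> throwable = new AtomicReference<>();

        partitions.forEach(partition -> executor.execute(() -> {
            try {
                listPartition(partition);       // may throw on a worker thread
            } catch (Throwable t) {
                throwable.set(t);               // remember the failure for the caller
            } finally {
                countDownLatch.countDown();     // always count down, or await() would hang
            }
        }));

        countDownLatch.await();
        executor.shutdown();
        if (throwable.get() != null) {
            // Surface the worker failure on the calling thread instead of losing it.
            throw new RuntimeException(throwable.get().getMessage(), throwable.get());
        }
        System.out.println("all partitions listed");
    }

    private static void listPartition(String partition) {
        // Placeholder for the real glob/listing work; fails for one partition
        // to show how the error propagates.
        if (partition.endsWith("2")) {
            throw new IllegalStateException("failed to list " + partition);
        }
    }
}
```

A plain Executor.execute() does not report a Runnable's exception back to the submitter, so without the AtomicReference the failure would at best show up in logs while the query silently waited.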