From 05cedfca4e5238bcfe610c1cc573c34e20ed57fd Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 22 May 2024 18:44:19 +0800 Subject: [PATCH] [fix](hudi) catch exception when getting hudi partition (#35027) (#35159) bp #35027 --- .../datasource/hive/HiveMetaStoreCache.java | 2 +- .../doris/datasource/hive/HiveUtil.java | 2 +- .../datasource/hudi/source/HudiScanNode.java | 76 +++++++++++-------- 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index b97284eda9..a22e951be4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -355,7 +355,7 @@ public class HiveMetaStoreCache { RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem( new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity( location, bindBrokerName), properties, bindBrokerName)); - result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf)); + result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location)); // For Tez engine, it may generate subdirectoies for "union" query. // So there may be files and directories in the table directory at the same time. eg: // /user/hive/warehouse/region_tmp_union_all2/000000_0 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java index bca04215fc..5ca42dd024 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java @@ -110,7 +110,7 @@ public final class HiveUtil { } public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat, - String location, JobConf jobConf) throws UserException { + String location) throws UserException { if (remoteFileSystem instanceof BrokerFileSystem) { return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 8dd853a48f..61edc333f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -77,6 +77,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class HudiScanNode extends HiveScanNode { @@ -329,49 +330,58 @@ public class HudiScanNode extends HiveScanNode { private void getPartitionSplits(List partitions, List splits) { Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor(); CountDownLatch countDownLatch = new CountDownLatch(partitions.size()); + AtomicReference throwable = new AtomicReference<>(); partitions.forEach(partition -> executor.execute(() -> { - String globPath; - String partitionName = ""; - if (partition.isDummyPartition()) { - globPath = hudiClient.getBasePathV2().toString() + "/*"; - } else { - partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), - new Path(partition.getPath())); - globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); - } - List statuses; try { - statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(), - new Path(globPath)); - } catch (IOException e) { - throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e); - } - HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, - timeline, statuses.toArray(new FileStatus[0])); + String globPath; + String partitionName = ""; + if (partition.isDummyPartition()) { + globPath = hudiClient.getBasePathV2().toString() + "/*"; + } else { + partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(), + new Path(partition.getPath())); + globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName); + } + List statuses; + try { + statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(), + new Path(globPath)); + } catch (IOException e) { + throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e); + } + HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient, + timeline, statuses.toArray(new FileStatus[0])); - if (isCowOrRoTable) { - fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { - noLogsSplitNum.incrementAndGet(); - String filePath = baseFile.getPath(); - long fileSize = baseFile.getFileSize(); - // Need add hdfs host to location - LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); - Path splitFilePath = locationPath.toStorageLocation(); - splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize, - new String[0], partition.getPartitionValues())); - }); - } else { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant) - .forEach(fileSlice -> splits.add( - generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant))); + if (isCowOrRoTable) { + fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> { + noLogsSplitNum.incrementAndGet(); + String filePath = baseFile.getPath(); + long fileSize = baseFile.getFileSize(); + // Need add hdfs host to location + LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties()); + Path splitFilePath = locationPath.toStorageLocation(); + splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize, + new String[0], partition.getPartitionValues())); + }); + } else { + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant) + .forEach(fileSlice -> splits.add( + generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant))); + } + } catch (Throwable t) { + throwable.set(t); + } finally { + countDownLatch.countDown(); } - countDownLatch.countDown(); })); try { countDownLatch.await(); } catch (InterruptedException e) { throw new RuntimeException(e.getMessage(), e); } + if (throwable.get() != null) { + throw new RuntimeException(throwable.get().getMessage(), throwable.get()); + } } @Override