[bugfix](hive)Handle exceptions when submitting tasks fails to prevent dead loop for 2.1 (#40708) (#41052)

bp: #40708
This commit is contained in:
wuwenchi
2024-09-21 08:47:31 +08:00
committed by GitHub
parent c6a6adb3a4
commit e568ba3b4c

View File

@ -257,30 +257,32 @@ public class HiveScanNode extends FileQueryScanNode {
}
try {
splittersOnFlight.acquire();
} catch (InterruptedException e) {
CompletableFuture.runAsync(() -> {
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(
cache, Collections.singletonList(partition), allFiles, bindBrokerName);
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
} catch (IOException e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
splitAssignment.finishSchedule();
}
}
}, scheduleExecutor);
} catch (Exception e) {
// When submitting a task, an exception will be thrown if the task pool(scheduleExecutor) is full
batchException.set(new UserException(e.getMessage(), e));
break;
}
CompletableFuture.runAsync(() -> {
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(cache, Collections.singletonList(partition), allFiles, bindBrokerName);
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
} catch (IOException e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
splitAssignment.finishSchedule();
}
}
}, scheduleExecutor);
}
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());