@ -73,6 +73,7 @@ import java.util.StringJoiner;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
@ -315,6 +316,7 @@ public class HMSTransaction implements Transaction {
|
||||
throw t;
|
||||
} finally {
|
||||
hmsCommitter.runClearPathsForFinish();
|
||||
hmsCommitter.shutdownExecutorService();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1110,7 +1112,7 @@ public class HMSTransaction implements Transaction {
|
||||
|
||||
// update statistics for unPartitioned table or existed partition
|
||||
private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>();
|
||||
Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);
|
||||
ExecutorService updateStatisticsExecutor = Executors.newFixedThreadPool(16);
|
||||
|
||||
// add new partition
|
||||
private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask();
|
||||
@ -1529,6 +1531,10 @@ public class HMSTransaction implements Transaction {
|
||||
MoreFutures.getFutureValue(future, RuntimeException.class);
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdownExecutorService() {
|
||||
updateStatisticsExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
public Status wrapperRenameDirWithProfileSummary(String origFilePath,
|
||||
|
||||
@ -682,7 +682,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
newTable.setParameters(newParams);
|
||||
client.client.alter_table(dbName, tableName, newTable);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName);
|
||||
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -710,7 +710,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
modifiedPartition.setParameters(newParams);
|
||||
client.client.alter_partition(dbName, tableName, modifiedPartition);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName);
|
||||
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -731,7 +731,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
|
||||
try (ThriftHMSClient client = getClient()) {
|
||||
client.client.dropPartition(dbName, tableName, partitionValues, deleteData);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName);
|
||||
throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user