From 03994b8d9397100f4526b4fc6b03eac828ba5699 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 11 Jul 2025 06:31:40 +0800 Subject: [PATCH] branch-2.1: [fix](iceberg)Fix the thread pool issue used for commit. #51508 (#51528) Cherry-picked from #51508 --------- Co-authored-by: wuwenchi Co-authored-by: Mingyu Chen (Rayner) --- .../doris/common/ThreadPoolManager.java | 68 +++++++++++++++++++ .../doris/datasource/ExternalCatalog.java | 12 ++++ .../datasource/hive/HMSExternalCatalog.java | 6 ++ .../iceberg/IcebergExternalCatalog.java | 7 ++ .../iceberg/IcebergMetadataOps.java | 5 ++ .../iceberg/IcebergTransaction.java | 4 +- 6 files changed, 100 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index 3be5af8ac5..0061e3702a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -18,6 +18,7 @@ package org.apache.doris.common; +import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.metric.Metric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricLabel; @@ -33,6 +34,7 @@ import java.util.Comparator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -68,6 +70,7 @@ import java.util.function.Supplier; */ public class ThreadPoolManager { + private static final Logger LOG = LogManager.getLogger(ThreadPoolManager.class); private static Map nameToThreadPoolMap = Maps.newConcurrentMap(); @@ -126,6 +129,50 @@ public class ThreadPoolManager { new LogDiscardPolicyThrowException(poolName), poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth( + int numThread, + int queueSize, + String poolName, + boolean needRegisterMetric, + PreExecutionAuthenticator preAuth) { + return newDaemonThreadPoolWithPreAuth(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, 60), + poolName, needRegisterMetric, preAuth); + } + + public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth( + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler, + String poolName, + boolean needRegisterMetric, + PreExecutionAuthenticator preAuth) { + ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, preAuth); + ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, + keepAliveTime, unit, workQueue, threadFactory, handler); + if (needRegisterMetric) { + nameToThreadPoolMap.put(poolName, threadPool); + } + return threadPool; + } + + private static ThreadFactory namedThreadFactoryWithPreAuth(String poolName, PreExecutionAuthenticator preAuth) { + return new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(poolName + "-%d") + .setThreadFactory(runnable -> new Thread(() -> { + try { + preAuth.execute(runnable); + } catch (Exception e) { + throw new RuntimeException(e); + } + })) + .build(); + } + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, @@ -377,4 +424,25 @@ public class ThreadPoolManager { } } } + + public static void shutdownExecutorService(ExecutorService executorService) { + // Disable new tasks from being submitted + executorService.shutdown(); + try { + // Wait a while for existing tasks to terminate + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + // Cancel currently executing tasks + executorService.shutdownNow(); + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + LOG.warn("ExecutorService did not terminate"); + } + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index bd3d966ac3..6052be35f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -35,6 +35,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.io.Text; @@ -91,6 +92,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; /** @@ -117,6 +119,8 @@ public abstract class ExternalCatalog CREATE_TIME, USE_META_CACHE); + protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors(); + // Unique id of this catalog, will be assigned after catalog is loaded. @SerializedName(value = "id") protected long id; @@ -157,6 +161,7 @@ public abstract class ExternalCatalog protected Optional useMetaCache = Optional.empty(); protected MetaCache> metaCache; protected PreExecutionAuthenticator preExecutionAuthenticator; + protected ThreadPoolExecutor threadPoolWithPreAuth; private volatile Configuration cachedConf = null; private byte[] confLock = new byte[0]; @@ -759,6 +764,9 @@ public abstract class ExternalCatalog if (null != transactionManager) { transactionManager = null; } + if (threadPoolWithPreAuth != null) { + ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth); + } CatalogIf.super.onClose(); } @@ -1170,4 +1178,8 @@ public abstract class ExternalCatalog tableAutoAnalyzePolicy.put(key, policy); } } + + public ThreadPoolExecutor getThreadPoolExecutor() { + return threadPoolWithPreAuth; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index defa2a8ebe..73598adfca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -197,6 +197,12 @@ public class HMSExternalCatalog extends ExternalCatalog { String.valueOf(Config.hive_metastore_client_timeout_second)); } HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this); + threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth( + ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, + Integer.MAX_VALUE, + String.format("hms_iceberg_catalog_%s_executor_pool", name), + true, + preExecutionAuthenticator); FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(), this.bindBrokerName(), this.catalogProperty.getHadoopProperties()); this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 30590f5af2..e25199adfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; @@ -65,6 +66,12 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { initCatalog(); IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops); + threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth( + ICEBERG_CATALOG_EXECUTOR_THREAD_NUM, + Integer.MAX_VALUE, + String.format("iceberg_catalog_%s_executor_pool", name), + true, + preExecutionAuthenticator); metadataOps = ops; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 4c06068aa3..78c4ef8fec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -49,6 +49,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; public class IcebergMetadataOps implements ExternalMetadataOps { @@ -291,4 +292,8 @@ public class IcebergMetadataOps implements ExternalMetadataOps { private Namespace getNamespace() { return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty()); } + + public ThreadPoolExecutor getThreadPoolWithPreAuth() { + return dorisCatalog.getThreadPoolExecutor(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index e36db86022..797caea0de 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -155,7 +155,7 @@ public class IcebergTransaction implements Transaction { private void commitAppendTxn(Table table, List pendingResults) { // commit append files. - AppendFiles appendFiles = table.newAppend(); + AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); @@ -171,7 +171,7 @@ public class IcebergTransaction implements Transaction { // 1. if dst_tb is a partitioned table, it will return directly. // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied. if (!table.spec().isPartitioned()) { - OverwriteFiles overwriteFiles = table.newOverwrite(); + OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth()); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file())); } catch (IOException e) {