branch-2.1: [fix](iceberg)Fix the thread pool issue used for commit. #51508 (#51528)

Cherry-picked from #51508

---------

Co-authored-by: wuwenchi <wuwenchi@selectdb.com>
Co-authored-by: Mingyu Chen (Rayner) <yunyou@selectdb.com>
This commit is contained in:
github-actions[bot]
2025-07-11 06:31:40 +08:00
committed by GitHub
parent 6e24d2c66e
commit 03994b8d93
6 changed files with 100 additions and 2 deletions

View File

@ -18,6 +18,7 @@
package org.apache.doris.common;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
@ -33,6 +34,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@ -68,6 +70,7 @@ import java.util.function.Supplier;
*/
public class ThreadPoolManager {
private static final Logger LOG = LogManager.getLogger(ThreadPoolManager.class);
private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap = Maps.newConcurrentMap();
@ -126,6 +129,50 @@ public class ThreadPoolManager {
new LogDiscardPolicyThrowException(poolName), poolName, needRegisterMetric);
}
/**
 * Builds a fixed-size thread pool whose worker threads are started through {@code preAuth}.
 *
 * <p>Core and maximum pool size are both {@code numThread}. Pending tasks wait in a
 * {@link LinkedBlockingQueue} bounded to {@code queueSize}, and rejected tasks are handed to a
 * {@code BlockedPolicy} created with {@code poolName} and the constant 60.
 *
 * @param numThread number of threads, used for both core and maximum pool size
 * @param queueSize capacity of the pending-task queue
 * @param poolName name passed to the rejection policy and to the underlying pool builder
 * @param needRegisterMetric forwarded unchanged to {@code newDaemonThreadPoolWithPreAuth}
 * @param preAuth authenticator forwarded unchanged to {@code newDaemonThreadPoolWithPreAuth}
 * @return the newly created executor
 */
public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth(
        int numThread,
        int queueSize,
        String poolName,
        boolean needRegisterMetric,
        PreExecutionAuthenticator preAuth) {
    BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>(queueSize);
    // NOTE(review): 60 is presumably the blocking timeout of BlockedPolicy in seconds - confirm.
    RejectedExecutionHandler rejectionPolicy = new BlockedPolicy(poolName, 60);
    return newDaemonThreadPoolWithPreAuth(
            numThread,
            numThread,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            pendingTasks,
            rejectionPolicy,
            poolName,
            needRegisterMetric,
            preAuth);
}
/**
 * Creates a {@link ThreadPoolExecutor} whose threads come from
 * {@code namedThreadFactoryWithPreAuth(poolName, preAuth)}.
 *
 * <p>When {@code needRegisterMetric} is true the new pool is also stored in
 * {@code nameToThreadPoolMap} under {@code poolName}, replacing any earlier entry with that name.
 *
 * @param corePoolSize core pool size handed to the executor
 * @param maximumPoolSize maximum pool size handed to the executor
 * @param keepAliveTime idle keep-alive time, expressed in {@code unit}
 * @param unit time unit of {@code keepAliveTime}
 * @param workQueue queue holding tasks that are waiting for a thread
 * @param handler policy invoked when a task cannot be accepted
 * @param poolName name used for the thread factory and as the map key
 * @param needRegisterMetric whether to record the pool in {@code nameToThreadPoolMap}
 * @param preAuth authenticator handed to the thread factory
 * @return the newly created executor
 */
public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler,
        String poolName,
        boolean needRegisterMetric,
        PreExecutionAuthenticator preAuth) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            workQueue,
            namedThreadFactoryWithPreAuth(poolName, preAuth),
            handler);
    if (!needRegisterMetric) {
        return executor;
    }
    nameToThreadPoolMap.put(poolName, executor);
    return executor;
}
/**
 * Builds a factory for daemon threads named {@code <poolName>-<n>} whose entire body is
 * executed via {@code preAuth.execute(...)}.
 *
 * <p>Any checked exception thrown by {@code preAuth.execute} is rethrown from the thread body
 * wrapped in a {@link RuntimeException}.
 *
 * @param poolName prefix of the generated thread names
 * @param preAuth authenticator that runs each thread's body
 * @return the configured thread factory
 */
private static ThreadFactory namedThreadFactoryWithPreAuth(String poolName, PreExecutionAuthenticator preAuth) {
    // Backing factory: wraps the runnable it is given so that it executes inside preAuth.
    ThreadFactory authenticatingFactory = task -> {
        Runnable authenticatedBody = () -> {
            try {
                preAuth.execute(task);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        return new Thread(authenticatedBody);
    };
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setDaemon(true);
    builder.setNameFormat(poolName + "-%d");
    builder.setThreadFactory(authenticatingFactory);
    return builder.build();
}
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
int queueSize, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
@ -377,4 +424,25 @@ public class ThreadPoolManager {
}
}
}
/**
 * Shuts the given executor down in two phases.
 *
 * <p>First an orderly {@code shutdown()} is requested and the caller waits up to 60 seconds.
 * If the executor has not terminated by then, {@code shutdownNow()} is called and the caller
 * waits up to another 60 seconds, logging a warning if termination still has not happened.
 * If the calling thread is interrupted while waiting, {@code shutdownNow()} is called and the
 * thread's interrupt flag is set again before returning.
 *
 * @param executorService the executor to shut down
 */
public static void shutdownExecutorService(ExecutorService executorService) {
    // Phase 1: refuse new submissions and give running work a chance to finish.
    executorService.shutdown();
    try {
        if (executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            return;
        }
        // Phase 2: the grace period elapsed, so try to cancel whatever is still running.
        executorService.shutdownNow();
        if (executorService.awaitTermination(60, TimeUnit.SECONDS)) {
            return;
        }
        LOG.warn("ExecutorService did not terminate");
    } catch (InterruptedException e) {
        // The waiting thread itself was interrupted: cancel (again) and keep the flag set.
        executorService.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
}

View File

@ -35,6 +35,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.io.Text;
@ -91,6 +92,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
/**
@ -117,6 +119,8 @@ public abstract class ExternalCatalog
CREATE_TIME,
USE_META_CACHE);
protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();
// Unique id of this catalog, will be assigned after catalog is loaded.
@SerializedName(value = "id")
protected long id;
@ -157,6 +161,7 @@ public abstract class ExternalCatalog
protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
protected PreExecutionAuthenticator preExecutionAuthenticator;
protected ThreadPoolExecutor threadPoolWithPreAuth;
private volatile Configuration cachedConf = null;
private byte[] confLock = new byte[0];
@ -759,6 +764,9 @@ public abstract class ExternalCatalog
if (null != transactionManager) {
transactionManager = null;
}
if (threadPoolWithPreAuth != null) {
ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
}
CatalogIf.super.onClose();
}
@ -1170,4 +1178,8 @@ public abstract class ExternalCatalog
tableAutoAnalyzePolicy.put(key, policy);
}
}
/**
 * Returns the pre-authenticated thread pool held by this catalog.
 *
 * <p>The backing field has no initializer in this class, so the result is {@code null} until
 * a subclass assigns a pool to {@code threadPoolWithPreAuth}.
 *
 * @return the current value of {@code threadPoolWithPreAuth}, possibly {@code null}
 */
public ThreadPoolExecutor getThreadPoolExecutor() {
    return this.threadPoolWithPreAuth;
}
}

View File

@ -197,6 +197,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
String.valueOf(Config.hive_metastore_client_timeout_second));
}
HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
Integer.MAX_VALUE,
String.format("hms_iceberg_catalog_%s_executor_pool", name),
true,
preExecutionAuthenticator);
FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
this.bindBrokerName(), this.catalogProperty.getHadoopProperties());
this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,

View File

@ -17,6 +17,7 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
@ -65,6 +66,12 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
initCatalog();
IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
Integer.MAX_VALUE,
String.format("iceberg_catalog_%s_executor_pool", name),
true,
preExecutionAuthenticator);
metadataOps = ops;
}

View File

@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
public class IcebergMetadataOps implements ExternalMetadataOps {
@ -291,4 +292,8 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
/**
 * Resolves the namespace to use for this catalog.
 *
 * @return {@code Namespace.of(name)} when {@code externalCatalogName} holds a value,
 *         otherwise {@code Namespace.empty()}
 */
private Namespace getNamespace() {
    Optional<Namespace> named = externalCatalogName.map(Namespace::of);
    return named.orElseGet(Namespace::empty);
}
/**
 * Returns the thread pool exposed by the owning Doris catalog.
 *
 * @return whatever {@code dorisCatalog.getThreadPoolExecutor()} returns
 */
public ThreadPoolExecutor getThreadPoolWithPreAuth() {
    ThreadPoolExecutor catalogPool = dorisCatalog.getThreadPoolExecutor();
    return catalogPool;
}
}

View File

@ -155,7 +155,7 @@ public class IcebergTransaction implements Transaction {
private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
// commit append files.
AppendFiles appendFiles = table.newAppend();
AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
for (WriteResult result : pendingResults) {
Preconditions.checkState(result.referencedDataFiles().length == 0,
"Should have no referenced data files for append.");
@ -171,7 +171,7 @@ public class IcebergTransaction implements Transaction {
// 1. if dst_tb is a partitioned table, it will return directly.
// 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied.
if (!table.spec().isPartitioned()) {
OverwriteFiles overwriteFiles = table.newOverwrite();
OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file()));
} catch (IOException e) {