From 75efdd6e1fa721e1bb32450150ad437b6d1be1a1 Mon Sep 17 00:00:00 2001 From: xueweizhang Date: Sat, 6 Jan 2024 16:17:07 +0800 Subject: [PATCH] [fix](http) throw RejectedExecutionException to prevent http hanging by Future (#29607) --- docs/en/docs/admin-manual/config/fe-config.md | 12 ++++++++ .../docs/admin-manual/config/fe-config.md | 12 ++++++++ .../java/org/apache/doris/common/Config.java | 12 ++++++++ .../doris/common/ThreadPoolManager.java | 28 +++++++++++++++++-- .../doris/httpv2/util/LoadSubmitter.java | 4 ++- .../doris/httpv2/util/StatementSubmitter.java | 3 +- 6 files changed, 67 insertions(+), 4 deletions(-) diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index ae3c0645be..2c35a11d2d 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -688,6 +688,18 @@ Default:1048576 (1M) http header size configuration parameter, the default value is 1M. +#### `http_sql_submitter_max_worker_threads` + +Default:2 + +The max number work threads of http sql submitter + +#### `http_load_submitter_max_worker_threads` + +Default:2 + +The max number work threads of http upload submitter + ### Query Engine #### `default_max_query_instances` diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index 580cfe7fee..2d50285ded 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -688,6 +688,18 @@ workers 线程池默认不做设置,根据自己需要进行设置 http header size 配置参数 +#### `http_sql_submitter_max_worker_threads` + +默认值:2 + +http请求处理/api/query中sql任务的最大线程池 + +#### `http_load_submitter_max_worker_threads` + +默认值:2 + +http请求处理/api/upload任务的最大线程池 + ### 查询引擎 #### `default_max_query_instances` diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 7b9859ae45..d32d85d843 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2395,4 +2395,16 @@ public class Config extends ConfigBase { public static boolean enable_profile_when_analyze = false; @ConfField(mutable = true) public static boolean enable_collect_internal_query_profile = false; + + @ConfField(mutable = false, masterOnly = false, description = { + "http请求处理/api/query中sql任务的最大线程池。", + "The max number work threads of http sql submitter." + }) + public static int http_sql_submitter_max_worker_threads = 2; + + @ConfField(mutable = false, masterOnly = false, description = { + "http请求处理/api/upload任务的最大线程池。", + "The max number work threads of http upload submitter." + }) + public static int http_load_submitter_max_worker_threads = 2; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index c4fa964659..ed3c32dba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -119,6 +119,13 @@ public class ThreadPoolManager { new LogDiscardPolicy(poolName), poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonCacheThreadPoolThrowException(int maxNumThread, + String poolName, boolean needRegisterMetric) { + return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME, + TimeUnit.SECONDS, new SynchronousQueue(), + new LogDiscardPolicyThrowException(poolName), poolName, needRegisterMetric); + } + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, String poolName, boolean needRegisterMetric) { return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, @@ -285,8 +292,8 @@ public class ThreadPoolManager { private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class); - private String threadPoolName; - private AtomicLong rejectedNum; + public String threadPoolName; + public AtomicLong rejectedNum; public LogDiscardPolicy(String threadPoolName) { this.threadPoolName = threadPoolName; @@ -300,6 +307,23 @@ public class ThreadPoolManager { } } + static class LogDiscardPolicyThrowException extends LogDiscardPolicy { + + private static final Logger LOG = LogManager.getLogger(LogDiscardPolicyThrowException.class); + + public LogDiscardPolicyThrowException(String threadPoolName) { + super(threadPoolName); + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString()); + this.rejectedNum.incrementAndGet(); + throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + + threadPoolName + " " + executor.toString()); + } + } + /** * A handler for rejected task that try to be blocked until the pool enqueue task succeed or timeout, * used for fixed thread pool diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java index bf23138e0d..fc5f85743b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/LoadSubmitter.java @@ -19,6 +19,7 @@ package org.apache.doris.httpv2.util; import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.NetUtils; @@ -53,7 +54,8 @@ import java.util.concurrent.ThreadPoolExecutor; public class LoadSubmitter { private static final Logger LOG = LogManager.getLogger(LoadSubmitter.class); - private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(2, "load-submitter", true); + private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( + Config.http_load_submitter_max_worker_threads, "load-submitter", true); private static final String STREAM_LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java index 83c5e075f2..76f6a63b71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java @@ -73,7 +73,8 @@ public class StatementSubmitter { private static final String JDBC_DRIVER = "org.mariadb.jdbc.Driver"; private static final String DB_URL_PATTERN = "jdbc:mariadb://127.0.0.1:%d/%s"; - private final ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(2, "SQL submitter", true); + private final ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException( + Config.http_sql_submitter_max_worker_threads, "SQL submitter", true); public Future submit(StmtContext queryCtx) { Worker worker = new Worker(ConnectContext.get(), queryCtx);