[fix](http) throw RejectedExecutionException to prevent http hanging by Future (#29607)

This commit is contained in:
xueweizhang
2024-01-06 16:17:07 +08:00
committed by GitHub
parent 2c888667ed
commit 75efdd6e1f
6 changed files with 67 additions and 4 deletions

View File

@ -119,6 +119,13 @@ public class ThreadPoolManager {
new LogDiscardPolicy(poolName), poolName, needRegisterMetric);
}
public static ThreadPoolExecutor newDaemonCacheThreadPoolThrowException(int maxNumThread,
String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME,
TimeUnit.SECONDS, new SynchronousQueue(),
new LogDiscardPolicyThrowException(poolName), poolName, needRegisterMetric);
}
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
int queueSize, String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
@ -285,8 +292,8 @@ public class ThreadPoolManager {
private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
private String threadPoolName;
private AtomicLong rejectedNum;
public String threadPoolName;
public AtomicLong rejectedNum;
public LogDiscardPolicy(String threadPoolName) {
this.threadPoolName = threadPoolName;
@ -300,6 +307,23 @@ public class ThreadPoolManager {
}
}
static class LogDiscardPolicyThrowException extends LogDiscardPolicy {
private static final Logger LOG = LogManager.getLogger(LogDiscardPolicyThrowException.class);
public LogDiscardPolicyThrowException(String threadPoolName) {
super(threadPoolName);
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
LOG.warn("Task " + r.toString() + " rejected from " + threadPoolName + " " + executor.toString());
this.rejectedNum.incrementAndGet();
throw new RejectedExecutionException("Task " + r.toString() + " rejected from "
+ threadPoolName + " " + executor.toString());
}
}
/**
* A handler for rejected task that try to be blocked until the pool enqueue task succeed or timeout,
* used for fixed thread pool

View File

@ -19,6 +19,7 @@ package org.apache.doris.httpv2.util;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.NetUtils;
@ -53,7 +54,8 @@ import java.util.concurrent.ThreadPoolExecutor;
public class LoadSubmitter {
private static final Logger LOG = LogManager.getLogger(LoadSubmitter.class);
private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(2, "load-submitter", true);
private ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
Config.http_load_submitter_max_worker_threads, "load-submitter", true);
private static final String STREAM_LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";

View File

@ -73,7 +73,8 @@ public class StatementSubmitter {
private static final String JDBC_DRIVER = "org.mariadb.jdbc.Driver";
private static final String DB_URL_PATTERN = "jdbc:mariadb://127.0.0.1:%d/%s";
private final ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(2, "SQL submitter", true);
private final ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
Config.http_sql_submitter_max_worker_threads, "SQL submitter", true);
public Future<ExecutionResultSet> submit(StmtContext queryCtx) {
Worker worker = new Worker(ConnectContext.get(), queryCtx);