[fix](connections) fix connection hang after too many connections (#31594)
Issue Number: close #31569 Fix fe connection hang after too high qps After fix, the third SQL will return error instead of hang: ERROR 1203 (HY000): #42000Too many connections
This commit is contained in:
@ -32,6 +32,7 @@ import org.xnio.StreamConnection;
|
||||
import org.xnio.channels.AcceptingChannel;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
/**
|
||||
* listener for accept mysql connections.
|
||||
@ -64,57 +65,65 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
|
||||
context.setEnv(Env.getCurrentEnv());
|
||||
connectScheduler.submit(context);
|
||||
|
||||
channel.getWorker().execute(() -> {
|
||||
try {
|
||||
// Set thread local info
|
||||
context.setThreadLocalInfo();
|
||||
context.setConnectScheduler(connectScheduler);
|
||||
// authenticate check failed.
|
||||
if (!MysqlProto.negotiate(context)) {
|
||||
throw new AfterConnectedException("mysql negotiate failed");
|
||||
}
|
||||
if (connectScheduler.registerConnection(context)) {
|
||||
MysqlProto.sendResponsePacket(context);
|
||||
connection.setCloseListener(streamConnection -> connectScheduler.unregisterConnection(context));
|
||||
} else {
|
||||
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
|
||||
"Reach limit of connections");
|
||||
MysqlProto.sendResponsePacket(context);
|
||||
throw new AfterConnectedException("Reach limit of connections");
|
||||
}
|
||||
context.setStartTime();
|
||||
int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
|
||||
if (userQueryTimeout <= 0) {
|
||||
LOG.warn("Connection set query timeout to {}",
|
||||
context.getSessionVariable().getQueryTimeoutS());
|
||||
}
|
||||
context.setUserQueryTimeout(userQueryTimeout);
|
||||
context.setUserInsertTimeout(
|
||||
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
|
||||
ConnectProcessor processor = new MysqlConnectProcessor(context);
|
||||
context.startAcceptQuery(processor);
|
||||
} catch (AfterConnectedException e) {
|
||||
// do not need to print log for this kind of exception.
|
||||
// just clean up the context;
|
||||
context.cleanup();
|
||||
} catch (Throwable e) {
|
||||
// should be unexpected exception, so print warn log
|
||||
if (context.getCurrentUserIdentity() != null) {
|
||||
LOG.warn("connect processor exception because ", e);
|
||||
} else if (e instanceof Error) {
|
||||
LOG.error("connect processor exception because ", e);
|
||||
} else {
|
||||
// for unauthrorized access such lvs probe request,
|
||||
// may cause exception, just log it in debug level
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("connect processor exception because ", e);
|
||||
try {
|
||||
channel.getWorker().execute(() -> {
|
||||
try {
|
||||
// Set thread local info
|
||||
context.setThreadLocalInfo();
|
||||
context.setConnectScheduler(connectScheduler);
|
||||
// authenticate check failed.
|
||||
if (!MysqlProto.negotiate(context)) {
|
||||
throw new AfterConnectedException("mysql negotiate failed");
|
||||
}
|
||||
if (connectScheduler.registerConnection(context)) {
|
||||
MysqlProto.sendResponsePacket(context);
|
||||
connection.setCloseListener(
|
||||
streamConnection -> connectScheduler.unregisterConnection(context));
|
||||
} else {
|
||||
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
|
||||
"Reach limit of connections");
|
||||
MysqlProto.sendResponsePacket(context);
|
||||
throw new AfterConnectedException("Reach limit of connections");
|
||||
}
|
||||
context.setStartTime();
|
||||
int userQueryTimeout = context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
|
||||
if (userQueryTimeout <= 0) {
|
||||
LOG.warn("Connection set query timeout to {}",
|
||||
context.getSessionVariable().getQueryTimeoutS());
|
||||
}
|
||||
context.setUserQueryTimeout(userQueryTimeout);
|
||||
context.setUserInsertTimeout(
|
||||
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
|
||||
ConnectProcessor processor = new MysqlConnectProcessor(context);
|
||||
context.startAcceptQuery(processor);
|
||||
} catch (AfterConnectedException e) {
|
||||
// do not need to print log for this kind of exception.
|
||||
// just clean up the context;
|
||||
context.cleanup();
|
||||
} catch (Throwable e) {
|
||||
// should be unexpected exception, so print warn log
|
||||
if (context.getCurrentUserIdentity() != null) {
|
||||
LOG.warn("connect processor exception because ", e);
|
||||
} else if (e instanceof Error) {
|
||||
LOG.error("connect processor exception because ", e);
|
||||
} else {
|
||||
// for unauthrorized access such lvs probe request,
|
||||
// may cause exception, just log it in debug level
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("connect processor exception because ", e);
|
||||
}
|
||||
}
|
||||
context.cleanup();
|
||||
} finally {
|
||||
ConnectContext.remove();
|
||||
}
|
||||
context.cleanup();
|
||||
} finally {
|
||||
ConnectContext.remove();
|
||||
}
|
||||
});
|
||||
});
|
||||
} catch (RejectedExecutionException e) {
|
||||
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
|
||||
"Too many connections");
|
||||
MysqlProto.sendResponsePacket(context);
|
||||
context.cleanup();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Connection accept failed.", e);
|
||||
}
|
||||
|
||||
@ -51,7 +51,7 @@ public class MysqlServer {
|
||||
private AcceptingChannel<StreamConnection> server;
|
||||
|
||||
// default task service.
|
||||
private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPool(
|
||||
private ExecutorService taskService = ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
|
||||
Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true);
|
||||
|
||||
public MysqlServer(int port, ConnectScheduler connectScheduler) {
|
||||
|
||||
Reference in New Issue
Block a user