[fix](arrow-flight) Fix reach limit of connections error (#32911)

Fix the `Reach limit of connections` error.
In fe.conf, `arrow_flight_token_cache_size` is now capped at `qe_max_connection / 2`. Arrow Flight SQL is a stateless protocol, so connections are usually not actively disconnected; when a bearer token is evicted from the cache, its ConnectContext is unregistered as well.

Fix ConnectContext.command not being reset to COM_SLEEP in time, which caused connections to be killed frequently after the query timeout.

Fix the bearer token eviction log and exception messages.

TODO: use arrow flight session: https://mail.google.com/mail/u/0/#inbox/FMfcgzGxRdxBLQLTcvvtRpqsvmhrHpdH
This commit is contained in:
Xinyi Zou
2024-03-28 10:00:30 +08:00
committed by yiguolei
parent d959291c98
commit 80cdc74908
6 changed files with 77 additions and 46 deletions

View File

@ -2330,10 +2330,16 @@ public class Config extends ConfigBase {
})
public static int autobucket_min_buckets = 1;
@ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为2000",
@ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为512, "
+ "并强制限制小于 qe_max_connection/2, 避免`Reach limit of connections`, "
+ "因为arrow flight sql是无状态的协议,连接通常不会主动断开,"
+ "bearer token 从 cache 淘汰的同时会 unregister Connection.",
"The cache limit of all user tokens in Arrow Flight Server. which will be eliminated by"
+ "LRU rules after exceeding the limit, the default value is 2000."})
public static int arrow_flight_token_cache_size = 2000;
+ "LRU rules after exceeding the limit, the default value is 512, the mandatory limit is "
+ "less than qe_max_connection/2 to avoid `Reach limit of connections`, "
+ "because arrow flight sql is a stateless protocol, the connection is usually not actively "
+ "disconnected, bearer token is evict from the cache will unregister ConnectContext."})
public static int arrow_flight_token_cache_size = 512;
@ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天",
"The alive time of the user token in Arrow Flight Server, expire after write, unit minutes,"

View File

@ -238,16 +238,16 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
connectContext.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
connectContext.setCommand(MysqlCommand.COM_SLEEP);
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
} catch (Exception e) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
} finally {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
}
}
@ -306,6 +306,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
executorService.submit(() -> {
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
try {
connectContext.setCommand(MysqlCommand.COM_QUERY);
final String query = request.getQuery();
String preparedStatementId = UUID.randomUUID().toString();
final ByteString handle = ByteString.copyFromUtf8(context.peerIdentity() + ":" + preparedStatementId);
@ -323,7 +324,6 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData))
.toByteArray()));
} catch (Exception e) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "create prepared statement failed, " + e.getMessage() + ", "
+ Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode()
+ ", error msg: " + connectContext.getState().getErrorMessage();
@ -333,6 +333,8 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
} catch (final Throwable t) {
listener.onError(CallStatus.INTERNAL.withDescription("Unknown error: " + t).toRuntimeException());
return;
} finally {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
}
listener.onCompleted();
});

View File

@ -47,7 +47,11 @@ public class DorisFlightSqlService {
public DorisFlightSqlService(int port) {
BufferAllocator allocator = new RootAllocator();
Location location = Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port);
this.flightTokenManager = new FlightTokenManagerImpl(Config.arrow_flight_token_cache_size,
// arrow_flight_token_cache_size less than qe_max_connection to avoid `Reach limit of connections`.
// arrow flight sql is a stateless protocol, connection is usually not actively disconnected.
// bearer token is evict from the cache will unregister ConnectContext.
this.flightTokenManager = new FlightTokenManagerImpl(
Math.min(Config.arrow_flight_token_cache_size, Config.qe_max_connection / 2),
Config.arrow_flight_token_alive_time);
this.flightSessionsManager = new FlightSessionsWithTokenManager(flightTokenManager);

View File

@ -86,7 +86,7 @@ public class FlightBearerTokenAuthenticator implements CallHeaderAuthenticator {
return createAuthResultWithBearerToken(token);
} catch (IllegalArgumentException e) {
LOG.error("Bearer token validation failed.", e);
throw CallStatus.UNAUTHENTICATED.toRuntimeException();
throw CallStatus.UNAUTHENTICATED.withCause(e).withDescription(e.getMessage()).toRuntimeException();
}
}

View File

@ -47,42 +47,36 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager {
ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity);
if (null == connectContext) {
connectContext = createConnectContext(peerIdentity);
if (null == connectContext) {
flightTokenManager.invalidateToken(peerIdentity);
String err = "UserSession expire after access, need reauthorize.";
LOG.error(err);
throw CallStatus.UNAUTHENTICATED.withDescription(err).toRuntimeException();
}
return connectContext;
}
return connectContext;
} catch (Exception e) {
LOG.warn("getConnectContext failed, " + e.getMessage(), e);
LOG.warn("get ConnectContext failed, " + e.getMessage(), e);
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
}
}
@Override
public ConnectContext createConnectContext(String peerIdentity) {
try {
final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity);
if (flightTokenDetails.getCreatedSession()) {
return null;
}
flightTokenDetails.setCreatedSession(true);
ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity,
flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp());
connectContext.setConnectionId(nextConnectionId.getAndAdd(1));
connectContext.resetLoginTime();
if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
connectContext.getState()
.setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections");
throw CallStatus.UNAUTHENTICATED.withDescription("Reach limit of connections").toRuntimeException();
}
return connectContext;
} catch (IllegalArgumentException e) {
LOG.error("Bearer token validation failed.", e);
throw CallStatus.UNAUTHENTICATED.toRuntimeException();
final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity);
if (flightTokenDetails.getCreatedSession()) {
flightTokenManager.invalidateToken(peerIdentity);
throw new IllegalArgumentException("UserSession expire after access, try reconnect, bearer token: "
+ peerIdentity + ", a peerIdentity(bearer token) can only create a ConnectContext once. "
+ "if ConnectContext is deleted without operation for a long time, it needs to be reconnected "
+ "(at the same time obtain a new bearer token).");
}
flightTokenDetails.setCreatedSession(true);
ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity,
flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp());
connectContext.setConnectionId(nextConnectionId.getAndAdd(1));
connectContext.resetLoginTime();
if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
String err = "Reach limit of connections, increase `qe_max_connection` in fe.conf, or decrease "
+ "`arrow_flight_token_cache_size` to evict unused bearer tokens and it connections faster";
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, err);
throw new IllegalArgumentException(err);
}
return connectContext;
}
}

View File

@ -19,12 +19,16 @@
package org.apache.doris.service.arrowflight.tokens;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -39,17 +43,32 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
private static final Logger LOG = LogManager.getLogger(FlightTokenManagerImpl.class);
private final SecureRandom generator = new SecureRandom();
private final int cacheSize;
private final int cacheExpiration;
private LoadingCache<String, FlightTokenDetails> tokenCache;
public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
this.cacheSize = cacheSize;
this.cacheExpiration = cacheExpiration;
this.tokenCache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
this.tokenCache = CacheBuilder.newBuilder().maximumSize(cacheSize)
.expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
.build(new CacheLoader<String, FlightTokenDetails>() {
.removalListener(new RemovalListener<String, FlightTokenDetails>() {
@Override
public void onRemoval(RemovalNotification<String, FlightTokenDetails> notification) {
// TODO: broadcast this message to other FE
LOG.info("evict bearer token: " + notification.getKey() + ", reason: "
+ notification.getCause());
ConnectContext context = ExecuteEnv.getInstance().getScheduler()
.getContext(notification.getKey());
if (context != null) {
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
LOG.info("unregister flight connect context after evict bearer token: "
+ notification.getKey());
}
}
}).build(new CacheLoader<String, FlightTokenDetails>() {
@Override
public FlightTokenDetails load(String key) {
return new FlightTokenDetails();
@ -77,26 +96,32 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp());
tokenCache.put(token, flightTokenDetails);
LOG.trace("Created flight token for user: {}", username);
LOG.info("Created flight token for user: {}, token: {}", username, token);
return flightTokenDetails;
}
@Override
public FlightTokenDetails validateToken(final String token) throws IllegalArgumentException {
final FlightTokenDetails value = getTokenDetails(token);
if (System.currentTimeMillis() >= value.getExpiresAt()) {
tokenCache.invalidate(token); // removes from the store as well
throw new IllegalArgumentException("token expired");
if (value.getToken().equals("")) {
throw new IllegalArgumentException("invalid bearer token: " + token
+ ", try reconnect, bearer token may not be created, or may have been evict, search for this "
+ "token in fe.log to see the evict reason. currently in fe.conf, `arrow_flight_token_cache_size`="
+ this.cacheSize + ", `arrow_flight_token_alive_time`=" + this.cacheExpiration);
}
LOG.trace("Validated flight token for user: {}", value.getUsername());
if (System.currentTimeMillis() >= value.getExpiresAt()) {
tokenCache.invalidate(token);
throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, "
+ "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration);
}
LOG.info("Validated bearer token for user: {}", value.getUsername());
return value;
}
@Override
public void invalidateToken(final String token) {
LOG.trace("Invalidate flight token, {}", token);
tokenCache.invalidate(token); // removes from the store as well
LOG.info("Invalidate bearer token, {}", token);
tokenCache.invalidate(token);
}
private FlightTokenDetails getTokenDetails(final String token) {
@ -105,7 +130,7 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
try {
value = tokenCache.getUnchecked(token);
} catch (CacheLoader.InvalidCacheLoadException ignored) {
throw new IllegalArgumentException("invalid token");
throw new IllegalArgumentException("InvalidCacheLoadException, invalid bearer token: " + token);
}
return value;