From 80cdc749084fadfc7dbc78f5f2bf3495fa3975cb Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 28 Mar 2024 10:00:30 +0800 Subject: [PATCH] [fix](arrow-flight) Fix reach limit of connections error (#32911) Fix the `Reach limit of connections` error: the effective value of arrow_flight_token_cache_size (configured in fe.conf) is now capped at qe_max_connection/2. Arrow Flight SQL is a stateless protocol and connections are usually not actively disconnected, so when a bearer token is evicted from the cache, its ConnectContext is now unregistered as well. Fix ConnectContext.command not being reset to COM_SLEEP in time, which caused connections to be killed frequently after the query timeout. Fix the log and exception messages for bearer token eviction. TODO: use arrow flight session: https://mail.google.com/mail/u/0/#inbox/FMfcgzGxRdxBLQLTcvvtRpqsvmhrHpdH --- .../java/org/apache/doris/common/Config.java | 12 +++-- .../arrowflight/DorisFlightSqlProducer.java | 8 +-- .../arrowflight/DorisFlightSqlService.java | 6 ++- .../auth2/FlightBearerTokenAuthenticator.java | 2 +- .../FlightSessionsWithTokenManager.java | 46 ++++++++--------- .../tokens/FlightTokenManagerImpl.java | 49 ++++++++++++++----- 6 files changed, 77 insertions(+), 46 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 228acd5e88..5a6bdf1b6d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2330,10 +2330,16 @@ public class Config extends ConfigBase { }) public static int autobucket_min_buckets = 1; - @ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为2000", + @ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为512, " + + "并强制限制小于 qe_max_connection/2, 避免`Reach limit of connections`, " + + "因为arrow flight sql是无状态的协议,连接通常不会主动断开," + + "bearer token 从 cache 淘汰的同时会 unregister Connection.", "The cache limit of all user tokens in Arrow Flight Server. 
which will be eliminated by" - + "LRU rules after exceeding the limit, the default value is 2000."}) - public static int arrow_flight_token_cache_size = 2000; + + "LRU rules after exceeding the limit, the default value is 512, the mandatory limit is " + + "less than qe_max_connection/2 to avoid `Reach limit of connections`, " + + "because arrow flight sql is a stateless protocol, the connection is usually not actively " + + "disconnected, bearer token is evict from the cache will unregister ConnectContext."}) + public static int arrow_flight_token_cache_size = 512; @ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天", "The alive time of the user token in Arrow Flight Server, expire after write, unit minutes," diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 20ea583648..2c7aaae4f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -238,16 +238,16 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable connectContext.getResultFlightServerAddr().port); List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. 
- connectContext.setCommand(MysqlCommand.COM_SLEEP); return new FlightInfo(schema, descriptor, endpoints, -1, -1); } } catch (Exception e) { - connectContext.setCommand(MysqlCommand.COM_SLEEP); String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); LOG.warn(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + } finally { + connectContext.setCommand(MysqlCommand.COM_SLEEP); } } @@ -306,6 +306,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable executorService.submit(() -> { ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); try { + connectContext.setCommand(MysqlCommand.COM_QUERY); final String query = request.getQuery(); String preparedStatementId = UUID.randomUUID().toString(); final ByteString handle = ByteString.copyFromUtf8(context.peerIdentity() + ":" + preparedStatementId); @@ -323,7 +324,6 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData)) .toByteArray())); } catch (Exception e) { - connectContext.setCommand(MysqlCommand.COM_SLEEP); String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); @@ -333,6 +333,8 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable } catch (final Throwable t) { listener.onError(CallStatus.INTERNAL.withDescription("Unknown error: " + t).toRuntimeException()); return; + } finally { + connectContext.setCommand(MysqlCommand.COM_SLEEP); } listener.onCompleted(); }); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java index fe5a60f0cc..a8ad9a05c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java @@ -47,7 +47,11 @@ public class DorisFlightSqlService { public DorisFlightSqlService(int port) { BufferAllocator allocator = new RootAllocator(); Location location = Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port); - this.flightTokenManager = new FlightTokenManagerImpl(Config.arrow_flight_token_cache_size, + // arrow_flight_token_cache_size less than qe_max_connection to avoid `Reach limit of connections`. + // arrow flight sql is a stateless protocol, connection is usually not actively disconnected. + // bearer token is evict from the cache will unregister ConnectContext. 
+ this.flightTokenManager = new FlightTokenManagerImpl( + Math.min(Config.arrow_flight_token_cache_size, Config.qe_max_connection / 2), Config.arrow_flight_token_alive_time); this.flightSessionsManager = new FlightSessionsWithTokenManager(flightTokenManager); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java index ef6e28b034..9f4479c6bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java @@ -86,7 +86,7 @@ public class FlightBearerTokenAuthenticator implements CallHeaderAuthenticator { return createAuthResultWithBearerToken(token); } catch (IllegalArgumentException e) { LOG.error("Bearer token validation failed.", e); - throw CallStatus.UNAUTHENTICATED.toRuntimeException(); + throw CallStatus.UNAUTHENTICATED.withCause(e).withDescription(e.getMessage()).toRuntimeException(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java index 26a48f0cfd..b7e5ffa464 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java @@ -47,42 +47,36 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager { ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity); if (null == connectContext) { connectContext = createConnectContext(peerIdentity); - if (null == connectContext) { - flightTokenManager.invalidateToken(peerIdentity); - String 
err = "UserSession expire after access, need reauthorize."; - LOG.error(err); - throw CallStatus.UNAUTHENTICATED.withDescription(err).toRuntimeException(); - } return connectContext; } return connectContext; } catch (Exception e) { - LOG.warn("getConnectContext failed, " + e.getMessage(), e); + LOG.warn("get ConnectContext failed, " + e.getMessage(), e); throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException(); } } @Override public ConnectContext createConnectContext(String peerIdentity) { - try { - final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity); - if (flightTokenDetails.getCreatedSession()) { - return null; - } - flightTokenDetails.setCreatedSession(true); - ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity, - flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp()); - connectContext.setConnectionId(nextConnectionId.getAndAdd(1)); - connectContext.resetLoginTime(); - if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) { - connectContext.getState() - .setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections"); - throw CallStatus.UNAUTHENTICATED.withDescription("Reach limit of connections").toRuntimeException(); - } - return connectContext; - } catch (IllegalArgumentException e) { - LOG.error("Bearer token validation failed.", e); - throw CallStatus.UNAUTHENTICATED.toRuntimeException(); + final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity); + if (flightTokenDetails.getCreatedSession()) { + flightTokenManager.invalidateToken(peerIdentity); + throw new IllegalArgumentException("UserSession expire after access, try reconnect, bearer token: " + + peerIdentity + ", a peerIdentity(bearer token) can only create a ConnectContext once. 
" + + "if ConnectContext is deleted without operation for a long time, it needs to be reconnected " + + "(at the same time obtain a new bearer token)."); } + flightTokenDetails.setCreatedSession(true); + ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity, + flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp()); + connectContext.setConnectionId(nextConnectionId.getAndAdd(1)); + connectContext.resetLoginTime(); + if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) { + String err = "Reach limit of connections, increase `qe_max_connection` in fe.conf, or decrease " + + "`arrow_flight_token_cache_size` to evict unused bearer tokens and it connections faster"; + connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, err); + throw new IllegalArgumentException(err); + } + return connectContext; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java index 54e53e931d..cd1b492de0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java @@ -19,12 +19,16 @@ package org.apache.doris.service.arrowflight.tokens; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.arrowflight.auth2.FlightAuthResult; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,17 +43,32 @@ public class 
FlightTokenManagerImpl implements FlightTokenManager { private static final Logger LOG = LogManager.getLogger(FlightTokenManagerImpl.class); private final SecureRandom generator = new SecureRandom(); + private final int cacheSize; private final int cacheExpiration; private LoadingCache tokenCache; public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) { + this.cacheSize = cacheSize; this.cacheExpiration = cacheExpiration; - this.tokenCache = CacheBuilder.newBuilder() - .maximumSize(cacheSize) + this.tokenCache = CacheBuilder.newBuilder().maximumSize(cacheSize) .expireAfterWrite(cacheExpiration, TimeUnit.MINUTES) - .build(new CacheLoader() { + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + // TODO: broadcast this message to other FE + LOG.info("evict bearer token: " + notification.getKey() + ", reason: " + + notification.getCause()); + ConnectContext context = ExecuteEnv.getInstance().getScheduler() + .getContext(notification.getKey()); + if (context != null) { + ExecuteEnv.getInstance().getScheduler().unregisterConnection(context); + LOG.info("unregister flight connect context after evict bearer token: " + + notification.getKey()); + } + } + }).build(new CacheLoader() { @Override public FlightTokenDetails load(String key) { return new FlightTokenDetails(); @@ -77,26 +96,32 @@ public class FlightTokenManagerImpl implements FlightTokenManager { flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp()); tokenCache.put(token, flightTokenDetails); - LOG.trace("Created flight token for user: {}", username); + LOG.info("Created flight token for user: {}, token: {}", username, token); return flightTokenDetails; } @Override public FlightTokenDetails validateToken(final String token) throws IllegalArgumentException { final FlightTokenDetails value = getTokenDetails(token); - if (System.currentTimeMillis() >= value.getExpiresAt()) { - tokenCache.invalidate(token); // removes 
from the store as well - throw new IllegalArgumentException("token expired"); + if (value.getToken().equals("")) { + throw new IllegalArgumentException("invalid bearer token: " + token + + ", try reconnect, bearer token may not be created, or may have been evict, search for this " + + "token in fe.log to see the evict reason. currently in fe.conf, `arrow_flight_token_cache_size`=" + + this.cacheSize + ", `arrow_flight_token_alive_time`=" + this.cacheExpiration); } - - LOG.trace("Validated flight token for user: {}", value.getUsername()); + if (System.currentTimeMillis() >= value.getExpiresAt()) { + tokenCache.invalidate(token); + throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, " + + "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration); + } + LOG.info("Validated bearer token for user: {}", value.getUsername()); return value; } @Override public void invalidateToken(final String token) { - LOG.trace("Invalidate flight token, {}", token); - tokenCache.invalidate(token); // removes from the store as well + LOG.info("Invalidate bearer token, {}", token); + tokenCache.invalidate(token); } private FlightTokenDetails getTokenDetails(final String token) { @@ -105,7 +130,7 @@ public class FlightTokenManagerImpl implements FlightTokenManager { try { value = tokenCache.getUnchecked(token); } catch (CacheLoader.InvalidCacheLoadException ignored) { - throw new IllegalArgumentException("invalid token"); + throw new IllegalArgumentException("InvalidCacheLoadException, invalid bearer token: " + token); } return value;