[branch-2.1](arrow-flight-sql) Fix exceed user property max connection cause Reach limit of connections (#39836)
pick #39127 pick #39802
This commit is contained in:
@ -57,6 +57,10 @@ public class DorisFlightSqlService {
|
||||
DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location, flightSessionsManager);
|
||||
flightServer = FlightServer.builder(allocator, location, producer)
|
||||
.headerAuthenticator(new FlightBearerTokenAuthenticator(flightTokenManager)).build();
|
||||
LOG.info("Arrow Flight SQL service is created, port: {}, token_cache_size: {}"
|
||||
+ ", qe_max_connection: {}, token_alive_time: {}",
|
||||
port, Config.arrow_flight_token_cache_size, Config.qe_max_connection,
|
||||
Config.arrow_flight_token_alive_time);
|
||||
}
|
||||
|
||||
// start Arrow Flight SQL service, return true if success, otherwise false
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
|
||||
package org.apache.doris.service.arrowflight.tokens;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.service.ExecuteEnv;
|
||||
import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
|
||||
@ -31,9 +32,12 @@ import com.google.common.cache.RemovalListener;
|
||||
import com.google.common.cache.RemovalNotification;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.math.BigInteger;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
@ -46,7 +50,9 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
|
||||
private final int cacheSize;
|
||||
private final int cacheExpiration;
|
||||
|
||||
private LoadingCache<String, FlightTokenDetails> tokenCache;
|
||||
private final LoadingCache<String, FlightTokenDetails> tokenCache;
|
||||
// <username, <token, 1>>
|
||||
private final ConcurrentHashMap<String, LoadingCache<String, Integer>> usersTokenLRU = new ConcurrentHashMap<>();
|
||||
|
||||
public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
|
||||
this.cacheSize = cacheSize;
|
||||
@ -56,17 +62,19 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
|
||||
.expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
|
||||
.removalListener(new RemovalListener<String, FlightTokenDetails>() {
|
||||
@Override
|
||||
public void onRemoval(RemovalNotification<String, FlightTokenDetails> notification) {
|
||||
public void onRemoval(@NotNull RemovalNotification<String, FlightTokenDetails> notification) {
|
||||
// TODO: broadcast this message to other FE
|
||||
LOG.info("evict bearer token: " + notification.getKey() + ", reason: "
|
||||
String token = notification.getKey();
|
||||
FlightTokenDetails tokenDetails = notification.getValue();
|
||||
LOG.info("evict bearer token: " + token + ", reason: token number exceeded, "
|
||||
+ notification.getCause());
|
||||
ConnectContext context = ExecuteEnv.getInstance().getScheduler()
|
||||
.getContext(notification.getKey());
|
||||
.getContext(token);
|
||||
if (context != null) {
|
||||
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
|
||||
LOG.info("unregister flight connect context after evict bearer token: "
|
||||
+ notification.getKey());
|
||||
LOG.info("unregister flight connect context after evict bearer token: " + token);
|
||||
}
|
||||
usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token);
|
||||
}
|
||||
}).build(new CacheLoader<String, FlightTokenDetails>() {
|
||||
@Override
|
||||
@ -96,6 +104,29 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
|
||||
flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp());
|
||||
|
||||
tokenCache.put(token, flightTokenDetails);
|
||||
if (!usersTokenLRU.containsKey(username)) {
|
||||
// TODO Modify usersTokenLRU size when user property maxConn changes. but LoadingCache currently not
|
||||
// support modify.
|
||||
usersTokenLRU.put(username,
|
||||
CacheBuilder.newBuilder().maximumSize(Env.getCurrentEnv().getAuth().getMaxConn(username) / 2)
|
||||
.removalListener(new RemovalListener<String, Integer>() {
|
||||
@Override
|
||||
public void onRemoval(@NotNull RemovalNotification<String, Integer> notification) {
|
||||
// TODO: broadcast this message to other FE
|
||||
assert notification.getKey() != null;
|
||||
tokenCache.invalidate(notification.getKey());
|
||||
LOG.info("evict bearer token: " + notification.getKey()
|
||||
+ ", reason: user connection exceeded, " + notification.getCause());
|
||||
}
|
||||
}).build(new CacheLoader<String, Integer>() {
|
||||
@NotNull
|
||||
@Override
|
||||
public Integer load(@NotNull String key) {
|
||||
return 1;
|
||||
}
|
||||
}));
|
||||
}
|
||||
usersTokenLRU.get(username).put(token, 1);
|
||||
LOG.info("Created flight token for user: {}, token: {}", username, token);
|
||||
return flightTokenDetails;
|
||||
}
|
||||
@ -114,6 +145,16 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
|
||||
throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, "
|
||||
+ "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration);
|
||||
}
|
||||
if (usersTokenLRU.containsKey(value.getUsername())) {
|
||||
try {
|
||||
usersTokenLRU.get(value.getUsername()).get(token);
|
||||
} catch (ExecutionException ignored) {
|
||||
throw new IllegalArgumentException("usersTokenLRU not exist bearer token: " + token);
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"bearer token not created: " + token + ", username: " + value.getUsername());
|
||||
}
|
||||
LOG.info("Validated bearer token for user: {}", value.getUsername());
|
||||
return value;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user