[improvement](query) Improve fe high concurrent query performance (#7936)
This commit is contained in:
@ -46,10 +46,10 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
public class ConnectScheduler {
|
||||
private static final Logger LOG = LogManager.getLogger(ConnectScheduler.class);
|
||||
private int maxConnections;
|
||||
private int numberConnection;
|
||||
private AtomicInteger numberConnection;
|
||||
private AtomicInteger nextConnectionId;
|
||||
private Map<Long, ConnectContext> connectionMap = Maps.newConcurrentMap();
|
||||
private Map<String, AtomicInteger> connByUser = Maps.newHashMap();
|
||||
private Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap();
|
||||
private Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap();
|
||||
private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true);
|
||||
|
||||
// Use a thread to check whether connection is timeout. Because
|
||||
@ -61,7 +61,7 @@ public class ConnectScheduler {
|
||||
|
||||
public ConnectScheduler(int maxConnections) {
|
||||
this.maxConnections = maxConnections;
|
||||
numberConnection = 0;
|
||||
numberConnection = new AtomicInteger(0);
|
||||
nextConnectionId = new AtomicInteger(0);
|
||||
checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
@ -70,10 +70,8 @@ public class ConnectScheduler {
|
||||
@Override
|
||||
public void run() {
|
||||
long now = System.currentTimeMillis();
|
||||
synchronized (ConnectScheduler.this) {
|
||||
for (ConnectContext connectContext : connectionMap.values()) {
|
||||
connectContext.checkTimeout(now);
|
||||
}
|
||||
for (ConnectContext connectContext : connectionMap.values()) {
|
||||
connectContext.checkTimeout(now);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -96,50 +94,50 @@ public class ConnectScheduler {
|
||||
}
|
||||
|
||||
// Register one connection with its connection id.
|
||||
public synchronized boolean registerConnection(ConnectContext ctx) {
|
||||
if (numberConnection >= maxConnections) {
|
||||
public boolean registerConnection(ConnectContext ctx) {
|
||||
if (numberConnection.incrementAndGet() > maxConnections) {
|
||||
numberConnection.decrementAndGet();
|
||||
return false;
|
||||
}
|
||||
// Check user
|
||||
if (connByUser.get(ctx.getQualifiedUser()) == null) {
|
||||
connByUser.put(ctx.getQualifiedUser(), new AtomicInteger(0));
|
||||
}
|
||||
int conns = connByUser.get(ctx.getQualifiedUser()).get();
|
||||
connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
|
||||
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
|
||||
if (ctx.getIsTempUser()) {
|
||||
if (conns >= LdapAuthenticate.getMaxConn()) {
|
||||
if (conns.incrementAndGet() > LdapAuthenticate.getMaxConn()) {
|
||||
conns.decrementAndGet();
|
||||
numberConnection.decrementAndGet();
|
||||
return false;
|
||||
}
|
||||
} else if (conns >= ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
|
||||
} else if (conns.incrementAndGet() > ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
|
||||
conns.decrementAndGet();
|
||||
numberConnection.decrementAndGet();
|
||||
return false;
|
||||
}
|
||||
numberConnection++;
|
||||
connByUser.get(ctx.getQualifiedUser()).incrementAndGet();
|
||||
connectionMap.put((long) ctx.getConnectionId(), ctx);
|
||||
connectionMap.put(ctx.getConnectionId(), ctx);
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized void unregisterConnection(ConnectContext ctx) {
|
||||
public void unregisterConnection(ConnectContext ctx) {
|
||||
ctx.closeTxn();
|
||||
if (connectionMap.remove((long) ctx.getConnectionId()) != null) {
|
||||
numberConnection--;
|
||||
if (connectionMap.remove(ctx.getConnectionId()) != null) {
|
||||
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
|
||||
if (conns != null) {
|
||||
conns.decrementAndGet();
|
||||
}
|
||||
numberConnection.decrementAndGet();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized ConnectContext getContext(long connectionId) {
|
||||
public ConnectContext getContext(long connectionId) {
|
||||
return connectionMap.get(connectionId);
|
||||
}
|
||||
|
||||
public synchronized int getConnectionNum() {
|
||||
return numberConnection;
|
||||
public int getConnectionNum() {
|
||||
return numberConnection.get();
|
||||
}
|
||||
|
||||
public synchronized List<ConnectContext.ThreadInfo> listConnection(String user) {
|
||||
public List<ConnectContext.ThreadInfo> listConnection(String user) {
|
||||
List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
|
||||
|
||||
for (ConnectContext ctx : connectionMap.values()) {
|
||||
// Check auth
|
||||
if (!ctx.getQualifiedUser().equals(user) &&
|
||||
|
||||
@ -471,8 +471,9 @@ public class Coordinator {
|
||||
addressToBackendID.get(execBeAddr),
|
||||
toBrpcHost(execBeAddr),
|
||||
queryOptions.query_timeout * 1000);
|
||||
|
||||
LOG.info("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
|
||||
}
|
||||
|
||||
if (topDataSink instanceof ResultFileSink
|
||||
&& ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
|
||||
|
||||
@ -81,7 +81,9 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
|
||||
@Override
|
||||
public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException {
|
||||
LOG.info("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
|
||||
}
|
||||
final QueryInfo result = coordinatorMap.putIfAbsent(queryId, info);
|
||||
if (result != null) {
|
||||
throw new UserException("queryId " + queryId + " already exists");
|
||||
@ -121,7 +123,9 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
public void unregisterQuery(TUniqueId queryId) {
|
||||
QueryInfo queryInfo = coordinatorMap.remove(queryId);
|
||||
if (queryInfo != null) {
|
||||
LOG.info("deregister query id {}", DebugUtil.printId(queryId));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("deregister query id {}", DebugUtil.printId(queryId));
|
||||
}
|
||||
if (queryInfo.getConnectContext() != null &&
|
||||
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
|
||||
) {
|
||||
|
||||
@ -526,7 +526,9 @@ public class StmtExecutor implements ProfileWriter {
|
||||
|
||||
// Analyze one statement to structure in memory.
|
||||
public void analyze(TQueryOptions tQueryOptions) throws UserException {
|
||||
LOG.info("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId());
|
||||
}
|
||||
|
||||
parse();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user