From 30658ebeda5f09b3d9c54fd90ab79deea3b7ca4a Mon Sep 17 00:00:00 2001 From: wangbo Date: Sat, 26 Aug 2023 17:31:44 +0800 Subject: [PATCH] [Fix](planner) Fix query queue can not limit maxConcurrency #23418 2 Fix concurrent can not limit --- .../org/apache/doris/qe/StmtExecutor.java | 13 ++++----- .../resource/workloadgroup/QueryQueue.java | 28 +++++++++++++++---- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index b5c556286b..85235ec159 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -207,9 +207,6 @@ public class StmtExecutor { private OriginStatement originStmt; private StatementBase parsedStmt; private Analyzer analyzer; - private QueryQueue queryQueue = null; - // by default, false means no query queued, then no need to poll when query finish - private QueueOfferToken offerRet = new QueueOfferToken(false); private ProfileType profileType = ProfileType.QUERY; private volatile Coordinator coord = null; private MasterOpExecutor masterOpExecutor = null; @@ -578,17 +575,19 @@ public class StmtExecutor { private void handleQueryWithRetry(TUniqueId queryId) throws Exception { // queue query here syncJournalIfNeeded(); + QueueOfferToken offerRet = null; + QueryQueue queryQueue = null; if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue && context.getSessionVariable().getEnablePipelineEngine()) { - this.queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); + queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); try { - this.offerRet = queryQueue.offer(); + offerRet = queryQueue.offer(); } catch (InterruptedException e) { // this Exception means try lock/await failed, so no need to handle offer result LOG.error("error happens when offer queue, query id=" + DebugUtil.printId(queryId) + " ", e); throw new RuntimeException("interrupted Exception happens when queue query"); } - if (!offerRet.isOfferSuccess()) { + if (offerRet != null && !offerRet.isOfferSuccess()) { String retMsg = "queue failed, reason=" + offerRet.getOfferResultDetail(); LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + retMsg); throw new UserException(retMsg); @@ -628,7 +627,7 @@ public class StmtExecutor { } } } finally { - if (offerRet.isOfferSuccess()) { + if (offerRet != null && offerRet.isOfferSuccess()) { queryQueue.poll(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 8f364fc4a0..dc7e672973 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -59,11 +59,17 @@ public class QueryQueue { // we should catch the case when it happens queueLock.tryLock(5, TimeUnit.SECONDS); try { - // currentRunningQueryNum may bigger than maxRunningQueryNum - // because maxRunningQueryNum can be altered - if (currentRunningQueryNum >= maxConcurrency) { + if (LOG.isDebugEnabled()) { + LOG.info(this.debugString()); + } + + while (true) { + if (currentRunningQueryNum < maxConcurrency) { + break; + } + // currentRunningQueryNum may bigger than maxRunningQueryNum + // because maxRunningQueryNum can be altered if (currentWaitingQueryNum >= maxQueueSize) { - LOG.debug(this.debugString()); return new QueueOfferToken(false, "query waiting queue is full, queue length=" + maxQueueSize); } @@ -75,13 +81,15 @@ public class QueryQueue { currentWaitingQueryNum--; } if (!ret) { - LOG.debug(this.debugString()); return new QueueOfferToken(false, "query wait timeout " + queueTimeout + " ms"); } } currentRunningQueryNum++; return new QueueOfferToken(true, "offer success"); } finally { + if (LOG.isDebugEnabled()) { + LOG.info(this.debugString()); + } queueLock.unlock(); } } @@ -92,8 +100,13 @@ public class QueryQueue { currentRunningQueryNum--; Preconditions.checkArgument(currentRunningQueryNum >= 0); // maybe only when currentWaitingQueryNum != 0 need to signal - queueLockCond.signal(); + if (currentRunningQueryNum < maxConcurrency) { + queueLockCond.signal(); + } } finally { + if (LOG.isDebugEnabled()) { + LOG.info(this.debugString()); + } queueLock.unlock(); } } @@ -106,6 +119,9 @@ public class QueryQueue { this.maxQueueSize = maxQueueSize; this.queueTimeout = queryWaitTimeout; } finally { + if (LOG.isDebugEnabled()) { + LOG.debug(this.debugString()); + } queueLock.unlock(); } } catch (InterruptedException e) {