From 0a22d969e18899f6877a458adb1c9ba91e1347be Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Wed, 6 Dec 2023 13:49:11 +0800 Subject: [PATCH] [refactor](queryqueue) using a priority queue in query queue in order to implement priority management in the future (#27969) --- .../org/apache/doris/qe/StmtExecutor.java | 24 ++-- .../resource/workloadgroup/QueryQueue.java | 109 +++++++------- .../workloadgroup/QueueOfferToken.java | 46 ------ .../resource/workloadgroup/QueueToken.java | 136 ++++++++++++++++++ 4 files changed, 201 insertions(+), 114 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueOfferToken.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 29e7aaa3b9..c7658b94e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -144,7 +144,7 @@ import org.apache.doris.qe.cache.Cache; import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode; import org.apache.doris.resource.workloadgroup.QueryQueue; -import org.apache.doris.resource.workloadgroup.QueueOfferToken; +import org.apache.doris.resource.workloadgroup.QueueToken; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException; import org.apache.doris.rpc.RpcException; @@ -618,22 +618,16 @@ public class StmtExecutor { private void handleQueryWithRetry(TUniqueId queryId) throws Exception { // queue query here syncJournalIfNeeded(); - QueueOfferToken offerRet = null; + QueueToken queueToken = null; QueryQueue queryQueue = null; if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue && context.getSessionVariable().getEnablePipelineEngine()) { queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); - try { - offerRet = queryQueue.offer(); - } catch (InterruptedException e) { - // this Exception means try lock/await failed, so no need to handle offer result - LOG.error("error happens when offer queue, query id=" + DebugUtil.printId(queryId) + " ", e); - throw new RuntimeException("interrupted Exception happens when queue query"); - } - if (offerRet != null && !offerRet.isOfferSuccess()) { - String retMsg = "queue failed, reason=" + offerRet.getOfferResultDetail(); - LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + retMsg); - throw new UserException(retMsg); + queueToken = queryQueue.getToken(); + if (!queueToken.waitSignal()) { + LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail()); + queryQueue.returnToken(queueToken); + throw new UserException(queueToken.getOfferResultDetail()); } } @@ -671,8 +665,8 @@ public class StmtExecutor { } } } finally { - if (offerRet != null && offerRet.isOfferSuccess()) { - queryQueue.poll(); + if (queueToken != null) { + queryQueue.returnToken(queueToken); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index 5a5d4acdc4..5d9a61f4a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -17,12 +17,14 @@ package org.apache.doris.resource.workloadgroup; +import org.apache.doris.common.UserException; +import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; + import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; +import java.util.PriorityQueue; import java.util.concurrent.locks.ReentrantLock; // note(wb) refer java BlockingQueue, but support altering capacity @@ -32,14 +34,12 @@ public class QueryQueue { private static final Logger LOG = LogManager.getLogger(QueryQueue.class); // note(wb) used unfair by default, need more test later private final ReentrantLock queueLock = new ReentrantLock(); - private final Condition queueLockCond = queueLock.newCondition(); // resource group property private int maxConcurrency; private int maxQueueSize; private int queueTimeout; // ms // running property private volatile int currentRunningQueryNum; - private volatile int currentWaitingQueryNum; public static final String RUNNING_QUERY_NUM = "running_query_num"; public static final String WAITING_QUERY_NUM = "waiting_query_num"; @@ -48,12 +48,19 @@ public class QueryQueue { private long propVersion; + private PriorityQueue priorityTokenQueue; + int getCurrentRunningQueryNum() { return currentRunningQueryNum; } int getCurrentWaitingQueryNum() { - return currentWaitingQueryNum; + try { + queueLock.lock(); + return priorityTokenQueue.size(); + } finally { + queueLock.unlock(); + } } long getPropVersion() { @@ -82,48 +89,34 @@ public class QueryQueue { this.maxQueueSize = maxQueueSize; this.queueTimeout = queueTimeout; this.propVersion = propVersion; + this.priorityTokenQueue = new PriorityQueue(); } public String debugString() { return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout + ", currentRunningQueryNum=" + currentRunningQueryNum - + ", currentWaitingQueryNum=" + currentWaitingQueryNum; + + ", currentWaitingQueryNum=" + priorityTokenQueue.size(); } - public QueueOfferToken offer() throws InterruptedException { - // to prevent hang - // the lock shouldn't be hold for too long - // we should catch the case when it happens - queueLock.tryLock(5, TimeUnit.SECONDS); + public QueueToken getToken() throws UserException { + + queueLock.lock(); try { if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); } - - while (true) { - if (currentRunningQueryNum < maxConcurrency) { - break; - } - // currentRunningQueryNum may bigger than maxRunningQueryNum - // because maxRunningQueryNum can be altered - if (currentWaitingQueryNum >= maxQueueSize) { - return new QueueOfferToken(false, "query waiting queue is full, queue length=" + maxQueueSize); - } - - currentWaitingQueryNum++; - boolean ret; - try { - ret = queueLockCond.await(queueTimeout, TimeUnit.MILLISECONDS); - } finally { - currentWaitingQueryNum--; - } - if (!ret) { - return new QueueOfferToken(false, "query wait timeout " + queueTimeout + " ms"); - } + if (currentRunningQueryNum < maxConcurrency) { + currentRunningQueryNum++; + return new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success"); } - currentRunningQueryNum++; - return new QueueOfferToken(true, "offer success"); + if (priorityTokenQueue.size() >= maxQueueSize) { + throw new UserException("query waiting queue is full, queue length=" + maxQueueSize); + } + QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout, + "query wait timeout " + queueTimeout + " ms"); + this.priorityTokenQueue.offer(newQueryToken); + return newQueryToken; } finally { if (LOG.isDebugEnabled()) { LOG.info(this.debugString()); @@ -132,14 +125,29 @@ public class QueryQueue { } } - public void poll() throws InterruptedException { - queueLock.tryLock(5, TimeUnit.SECONDS); + // If the token is acquired and do work success, then call this method to release it. + public void returnToken(QueueToken token) throws InterruptedException { + queueLock.lock(); try { + // If current token is not in ready to run state, then it is still in the queue + // it is not running, just remove it. + if (!token.isReadyToRun()) { + this.priorityTokenQueue.remove(token); + return; + } currentRunningQueryNum--; Preconditions.checkArgument(currentRunningQueryNum >= 0); - // maybe only when currentWaitingQueryNum != 0 need to signal - if (currentRunningQueryNum < maxConcurrency) { - queueLockCond.signal(); + // If return token and find user changed concurrency num, then maybe need signal + // more tokens. + while (currentRunningQueryNum < maxConcurrency) { + QueueToken nextToken = this.priorityTokenQueue.poll(); + if (nextToken != null) { + if (nextToken.signal()) { + ++currentRunningQueryNum; + } + } else { + break; + } } } finally { if (LOG.isDebugEnabled()) { @@ -150,22 +158,17 @@ public class QueryQueue { } public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout, long version) { + queueLock.lock(); try { - queueLock.tryLock(5, TimeUnit.SECONDS); - try { - this.maxConcurrency = maxConcurrency; - this.maxQueueSize = maxQueueSize; - this.queueTimeout = queryWaitTimeout; - this.propVersion = version; - } finally { - if (LOG.isDebugEnabled()) { - LOG.debug(this.debugString()); - } - queueLock.unlock(); + this.maxConcurrency = maxConcurrency; + this.maxQueueSize = maxQueueSize; + this.queueTimeout = queryWaitTimeout; + this.propVersion = version; + } finally { + if (LOG.isDebugEnabled()) { + LOG.debug(this.debugString()); } - } catch (InterruptedException e) { - LOG.error("reset queue property failed, ", e); - throw new RuntimeException("reset queue property failed, reason=" + e.getMessage()); + queueLock.unlock(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueOfferToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueOfferToken.java deleted file mode 100644 index adf16e21b8..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueOfferToken.java +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.resource.workloadgroup; - -// used to mark QueryQueue offer result -// if offer failed, then need to cancel query -// and return failed reason to user client -public class QueueOfferToken { - - private Boolean offerResult; - - private String offerResultDetail; - - public QueueOfferToken(Boolean offerResult) { - this.offerResult = offerResult; - } - - public QueueOfferToken(Boolean offerResult, String offerResultDetail) { - this.offerResult = offerResult; - this.offerResultDetail = offerResultDetail; - } - - public Boolean isOfferSuccess() { - return offerResult; - } - - public String getOfferResultDetail() { - return offerResultDetail; - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java new file mode 100644 index 0000000000..9126535dc0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueueToken.java @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.resource.workloadgroup; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +// used to mark QueryQueue offer result +// if offer failed, then need to cancel query +// and return failed reason to user client +public class QueueToken { + private static final Logger LOG = LogManager.getLogger(QueueToken.class); + + enum TokenState { + ENQUEUE_SUCCESS, + READY_TO_RUN + } + + static AtomicLong tokenIdGenerator = new AtomicLong(0); + + private long tokenId = 0; + + private TokenState tokenState; + + private long waitTimeout = 0; + + private String offerResultDetail; + + private boolean isTimeout = false; + + private final ReentrantLock tokenLock = new ReentrantLock(); + private final Condition tokenCond = tokenLock.newCondition(); + + public QueueToken(TokenState tokenState, long waitTimeout, + String offerResultDetail) { + this.tokenId = tokenIdGenerator.addAndGet(1); + this.tokenState = tokenState; + this.waitTimeout = waitTimeout; + this.offerResultDetail = offerResultDetail; + } + + public boolean waitSignal() throws InterruptedException { + this.tokenLock.lock(); + try { + if (isTimeout) { + return false; + } + if (tokenState == TokenState.READY_TO_RUN) { + return true; + } + tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS); + // If wait timeout and is steal not ready to run, then return false + if (tokenState != TokenState.READY_TO_RUN) { + LOG.warn("wait in queue timeout, timeout = {}", waitTimeout); + isTimeout = true; + return false; + } else { + return true; + } + } catch (Throwable t) { + LOG.warn("meet execption when wait for signal", t); + // If any exception happens, set isTimeout to true and return false + // Then the caller will call returnToken to queue normally. + offerResultDetail = "meet exeption when wait for signal"; + isTimeout = true; + return false; + } finally { + this.tokenLock.unlock(); + } + } + + public boolean signal() { + this.tokenLock.lock(); + try { + // If current token is not ENQUEUE_SUCCESS, then it maybe has error + // not run it any more. + if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) { + return false; + } + this.tokenState = TokenState.READY_TO_RUN; + tokenCond.signal(); + return true; + } catch (Throwable t) { + isTimeout = true; + offerResultDetail = "meet exception when signal"; + LOG.warn("failed to signal token", t); + return false; + } finally { + this.tokenLock.unlock(); + } + } + + public String getOfferResultDetail() { + return offerResultDetail; + } + + public boolean isReadyToRun() { + return this.tokenState == TokenState.READY_TO_RUN; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + QueueToken other = (QueueToken) obj; + return tokenId == other.tokenId; + } + +}