[refactor](queryqueue) using a priority queue in query queue in order to implement priority management in the future (#27969)

This commit is contained in:
yiguolei
2023-12-06 13:49:11 +08:00
committed by GitHub
parent a0fee4c96e
commit 0a22d969e1
4 changed files with 201 additions and 114 deletions

View File

@ -144,7 +144,7 @@ import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueOfferToken;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
@ -618,22 +618,16 @@ public class StmtExecutor {
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
// queue query here
syncJournalIfNeeded();
QueueOfferToken offerRet = null;
QueueToken queueToken = null;
QueryQueue queryQueue = null;
if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue
&& context.getSessionVariable().getEnablePipelineEngine()) {
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
try {
offerRet = queryQueue.offer();
} catch (InterruptedException e) {
// this Exception means try lock/await failed, so no need to handle offer result
LOG.error("error happens when offer queue, query id=" + DebugUtil.printId(queryId) + " ", e);
throw new RuntimeException("interrupted Exception happens when queue query");
}
if (offerRet != null && !offerRet.isOfferSuccess()) {
String retMsg = "queue failed, reason=" + offerRet.getOfferResultDetail();
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + retMsg);
throw new UserException(retMsg);
queueToken = queryQueue.getToken();
if (!queueToken.waitSignal()) {
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
queryQueue.returnToken(queueToken);
throw new UserException(queueToken.getOfferResultDetail());
}
}
@ -671,8 +665,8 @@ public class StmtExecutor {
}
}
} finally {
if (offerRet != null && offerRet.isOfferSuccess()) {
queryQueue.poll();
if (queueToken != null) {
queryQueue.returnToken(queueToken);
}
}
}

View File

@ -17,12 +17,14 @@
package org.apache.doris.resource.workloadgroup;
import org.apache.doris.common.UserException;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;
// note(wb) refer java BlockingQueue, but support altering capacity
@ -32,14 +34,12 @@ public class QueryQueue {
private static final Logger LOG = LogManager.getLogger(QueryQueue.class);
// note(wb) used unfair by default, need more test later
private final ReentrantLock queueLock = new ReentrantLock();
private final Condition queueLockCond = queueLock.newCondition();
// resource group property
private int maxConcurrency;
private int maxQueueSize;
private int queueTimeout; // ms
// running property
private volatile int currentRunningQueryNum;
private volatile int currentWaitingQueryNum;
public static final String RUNNING_QUERY_NUM = "running_query_num";
public static final String WAITING_QUERY_NUM = "waiting_query_num";
@ -48,12 +48,19 @@ public class QueryQueue {
private long propVersion;
private PriorityQueue<QueueToken> priorityTokenQueue;
int getCurrentRunningQueryNum() {
return currentRunningQueryNum;
}
int getCurrentWaitingQueryNum() {
return currentWaitingQueryNum;
try {
queueLock.lock();
return priorityTokenQueue.size();
} finally {
queueLock.unlock();
}
}
long getPropVersion() {
@ -82,48 +89,34 @@ public class QueryQueue {
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queueTimeout;
this.propVersion = propVersion;
this.priorityTokenQueue = new PriorityQueue<QueueToken>();
}
public String debugString() {
return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency
+ ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout
+ ", currentRunningQueryNum=" + currentRunningQueryNum
+ ", currentWaitingQueryNum=" + currentWaitingQueryNum;
+ ", currentWaitingQueryNum=" + priorityTokenQueue.size();
}
public QueueOfferToken offer() throws InterruptedException {
// to prevent hang
// the lock shouldn't be hold for too long
// we should catch the case when it happens
queueLock.tryLock(5, TimeUnit.SECONDS);
public QueueToken getToken() throws UserException {
queueLock.lock();
try {
if (LOG.isDebugEnabled()) {
LOG.info(this.debugString());
}
while (true) {
if (currentRunningQueryNum < maxConcurrency) {
break;
}
// currentRunningQueryNum may bigger than maxRunningQueryNum
// because maxRunningQueryNum can be altered
if (currentWaitingQueryNum >= maxQueueSize) {
return new QueueOfferToken(false, "query waiting queue is full, queue length=" + maxQueueSize);
}
currentWaitingQueryNum++;
boolean ret;
try {
ret = queueLockCond.await(queueTimeout, TimeUnit.MILLISECONDS);
} finally {
currentWaitingQueryNum--;
}
if (!ret) {
return new QueueOfferToken(false, "query wait timeout " + queueTimeout + " ms");
}
if (currentRunningQueryNum < maxConcurrency) {
currentRunningQueryNum++;
return new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success");
}
currentRunningQueryNum++;
return new QueueOfferToken(true, "offer success");
if (priorityTokenQueue.size() >= maxQueueSize) {
throw new UserException("query waiting queue is full, queue length=" + maxQueueSize);
}
QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
"query wait timeout " + queueTimeout + " ms");
this.priorityTokenQueue.offer(newQueryToken);
return newQueryToken;
} finally {
if (LOG.isDebugEnabled()) {
LOG.info(this.debugString());
@ -132,14 +125,29 @@ public class QueryQueue {
}
}
public void poll() throws InterruptedException {
queueLock.tryLock(5, TimeUnit.SECONDS);
// If the token is acquired and do work success, then call this method to release it.
public void returnToken(QueueToken token) throws InterruptedException {
queueLock.lock();
try {
// If current token is not in ready to run state, then it is still in the queue
// it is not running, just remove it.
if (!token.isReadyToRun()) {
this.priorityTokenQueue.remove(token);
return;
}
currentRunningQueryNum--;
Preconditions.checkArgument(currentRunningQueryNum >= 0);
// maybe only when currentWaitingQueryNum != 0 need to signal
if (currentRunningQueryNum < maxConcurrency) {
queueLockCond.signal();
// If return token and find user changed concurrency num, then maybe need signal
// more tokens.
while (currentRunningQueryNum < maxConcurrency) {
QueueToken nextToken = this.priorityTokenQueue.poll();
if (nextToken != null) {
if (nextToken.signal()) {
++currentRunningQueryNum;
}
} else {
break;
}
}
} finally {
if (LOG.isDebugEnabled()) {
@ -150,22 +158,17 @@ public class QueryQueue {
}
public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout, long version) {
queueLock.lock();
try {
queueLock.tryLock(5, TimeUnit.SECONDS);
try {
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queryWaitTimeout;
this.propVersion = version;
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(this.debugString());
}
queueLock.unlock();
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queryWaitTimeout;
this.propVersion = version;
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(this.debugString());
}
} catch (InterruptedException e) {
LOG.error("reset queue property failed, ", e);
throw new RuntimeException("reset queue property failed, reason=" + e.getMessage());
queueLock.unlock();
}
}

View File

@ -1,46 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadgroup;
// used to mark QueryQueue offer result
// if offer failed, then need to cancel query
// and return failed reason to user client
public class QueueOfferToken {
private Boolean offerResult;
private String offerResultDetail;
public QueueOfferToken(Boolean offerResult) {
this.offerResult = offerResult;
}
public QueueOfferToken(Boolean offerResult, String offerResultDetail) {
this.offerResult = offerResult;
this.offerResultDetail = offerResultDetail;
}
public Boolean isOfferSuccess() {
return offerResult;
}
public String getOfferResultDetail() {
return offerResultDetail;
}
}

View File

@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.workloadgroup;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
// used to mark QueryQueue offer result
// if offer failed, then need to cancel query
// and return failed reason to user client
public class QueueToken {
private static final Logger LOG = LogManager.getLogger(QueueToken.class);
enum TokenState {
ENQUEUE_SUCCESS,
READY_TO_RUN
}
static AtomicLong tokenIdGenerator = new AtomicLong(0);
private long tokenId = 0;
private TokenState tokenState;
private long waitTimeout = 0;
private String offerResultDetail;
private boolean isTimeout = false;
private final ReentrantLock tokenLock = new ReentrantLock();
private final Condition tokenCond = tokenLock.newCondition();
public QueueToken(TokenState tokenState, long waitTimeout,
String offerResultDetail) {
this.tokenId = tokenIdGenerator.addAndGet(1);
this.tokenState = tokenState;
this.waitTimeout = waitTimeout;
this.offerResultDetail = offerResultDetail;
}
public boolean waitSignal() throws InterruptedException {
this.tokenLock.lock();
try {
if (isTimeout) {
return false;
}
if (tokenState == TokenState.READY_TO_RUN) {
return true;
}
tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
// If wait timeout and is steal not ready to run, then return false
if (tokenState != TokenState.READY_TO_RUN) {
LOG.warn("wait in queue timeout, timeout = {}", waitTimeout);
isTimeout = true;
return false;
} else {
return true;
}
} catch (Throwable t) {
LOG.warn("meet execption when wait for signal", t);
// If any exception happens, set isTimeout to true and return false
// Then the caller will call returnToken to queue normally.
offerResultDetail = "meet exeption when wait for signal";
isTimeout = true;
return false;
} finally {
this.tokenLock.unlock();
}
}
public boolean signal() {
this.tokenLock.lock();
try {
// If current token is not ENQUEUE_SUCCESS, then it maybe has error
// not run it any more.
if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) {
return false;
}
this.tokenState = TokenState.READY_TO_RUN;
tokenCond.signal();
return true;
} catch (Throwable t) {
isTimeout = true;
offerResultDetail = "meet exception when signal";
LOG.warn("failed to signal token", t);
return false;
} finally {
this.tokenLock.unlock();
}
}
public String getOfferResultDetail() {
return offerResultDetail;
}
public boolean isReadyToRun() {
return this.tokenState == TokenState.READY_TO_RUN;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
QueueToken other = (QueueToken) obj;
return tokenId == other.tokenId;
}
}