pick #37559
This commit is contained in:
@ -242,6 +242,7 @@ import org.apache.doris.qe.JournalObservable;
|
||||
import org.apache.doris.qe.QueryCancelWorker;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.qe.VariableMgr;
|
||||
import org.apache.doris.resource.AdmissionControl;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
|
||||
@ -515,6 +516,8 @@ public class Env {
|
||||
|
||||
private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
|
||||
|
||||
private AdmissionControl admissionControl;
|
||||
|
||||
private QueryStats queryStats;
|
||||
|
||||
private StatisticsCleaner statisticsCleaner;
|
||||
@ -772,6 +775,7 @@ public class Env {
|
||||
this.workloadGroupMgr = new WorkloadGroupMgr();
|
||||
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
|
||||
this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
|
||||
this.admissionControl = new AdmissionControl(systemInfo);
|
||||
this.queryStats = new QueryStats();
|
||||
this.loadManagerAdapter = new LoadManagerAdapter();
|
||||
this.hiveTransactionMgr = new HiveTransactionMgr();
|
||||
@ -883,6 +887,10 @@ public class Env {
|
||||
return workloadRuntimeStatusMgr;
|
||||
}
|
||||
|
||||
public AdmissionControl getAdmissionControl() {
|
||||
return admissionControl;
|
||||
}
|
||||
|
||||
public ExternalMetaIdMgr getExternalMetaIdMgr() {
|
||||
return externalMetaIdMgr;
|
||||
}
|
||||
@ -1747,6 +1755,7 @@ public class Env {
|
||||
workloadGroupMgr.start();
|
||||
workloadSchedPolicyMgr.start();
|
||||
workloadRuntimeStatusMgr.start();
|
||||
admissionControl.start();
|
||||
splitSourceManager.start();
|
||||
}
|
||||
|
||||
|
||||
@ -62,6 +62,7 @@ import org.apache.doris.planner.ResultSink;
|
||||
import org.apache.doris.planner.RuntimeFilter;
|
||||
import org.apache.doris.planner.RuntimeFilterId;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.planner.SchemaScanNode;
|
||||
import org.apache.doris.planner.SetOperationNode;
|
||||
import org.apache.doris.planner.SortNode;
|
||||
import org.apache.doris.planner.UnionNode;
|
||||
@ -645,6 +646,22 @@ public class Coordinator implements CoordInterface {
|
||||
return fragmentParams;
|
||||
}
|
||||
|
||||
private boolean shouldQueue() {
|
||||
boolean ret = Config.enable_query_queue && !context.getSessionVariable()
|
||||
.getBypassWorkloadGroup() && !isQueryCancelled();
|
||||
if (!ret) {
|
||||
return false;
|
||||
}
|
||||
// a query with ScanNode need not queue only when all its scan node is SchemaScanNode
|
||||
for (ScanNode scanNode : this.scanNodes) {
|
||||
boolean isSchemaScanNode = scanNode instanceof SchemaScanNode;
|
||||
if (!isSchemaScanNode) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Initiate asynchronous execution of query. Returns as soon as all plan fragments
|
||||
// have started executing at their respective backends.
|
||||
// 'Request' must contain at least a coordinator plan fragment (ie, can't
|
||||
@ -656,8 +673,7 @@ public class Coordinator implements CoordInterface {
|
||||
if (context != null) {
|
||||
if (Config.enable_workload_group) {
|
||||
this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
|
||||
boolean shouldQueue = Config.enable_query_queue && !context.getSessionVariable()
|
||||
.getBypassWorkloadGroup() && !isQueryCancelled();
|
||||
boolean shouldQueue = this.shouldQueue();
|
||||
if (shouldQueue) {
|
||||
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
|
||||
if (queryQueue == null) {
|
||||
@ -666,11 +682,8 @@ public class Coordinator implements CoordInterface {
|
||||
throw new UserException("could not find query queue");
|
||||
}
|
||||
queueToken = queryQueue.getToken();
|
||||
if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
|
||||
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
|
||||
queryQueue.returnToken(queueToken);
|
||||
throw new UserException(queueToken.getOfferResultDetail());
|
||||
}
|
||||
queueToken.get(DebugUtil.printId(queryId),
|
||||
this.queryOptions.getExecutionTimeout() * 1000);
|
||||
}
|
||||
} else {
|
||||
context.setWorkloadGroupName("");
|
||||
@ -681,16 +694,22 @@ public class Coordinator implements CoordInterface {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
for (ScanNode scanNode : scanNodes) {
|
||||
scanNode.stop();
|
||||
}
|
||||
// NOTE: all close method should be no exception
|
||||
if (queryQueue != null && queueToken != null) {
|
||||
try {
|
||||
queryQueue.returnToken(queueToken);
|
||||
queryQueue.releaseAndNotify(queueToken);
|
||||
} catch (Throwable t) {
|
||||
LOG.error("error happens when coordinator close ", t);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
for (ScanNode scanNode : scanNodes) {
|
||||
scanNode.stop();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error("error happens when scannode stop ", t);
|
||||
}
|
||||
}
|
||||
|
||||
private void execInternal() throws Exception {
|
||||
@ -1516,7 +1535,7 @@ public class Coordinator implements CoordInterface {
|
||||
public void cancel() {
|
||||
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
|
||||
if (queueToken != null) {
|
||||
queueToken.signalForCancel();
|
||||
queueToken.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -25,7 +25,6 @@ import org.apache.doris.common.profile.ExecutionProfile;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.ProfileManager;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TQueryType;
|
||||
import org.apache.doris.thrift.TReportExecStatusParams;
|
||||
@ -333,11 +332,11 @@ public final class QeProcessorImpl implements QeProcessor {
|
||||
return -1;
|
||||
}
|
||||
|
||||
public TokenState getQueueStatus() {
|
||||
public String getQueueStatus() {
|
||||
if (coord.getQueueToken() != null) {
|
||||
return coord.getQueueToken().getTokenState();
|
||||
return coord.getQueueToken().getQueueMsg();
|
||||
}
|
||||
return null;
|
||||
return "";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,156 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.resource;
|
||||
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.resource.workloadgroup.QueueToken;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.BackendService;
|
||||
import org.apache.doris.thrift.TGetBeResourceRequest;
|
||||
import org.apache.doris.thrift.TGetBeResourceResult;
|
||||
import org.apache.doris.thrift.TGlobalResourceUsage;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
public class AdmissionControl extends MasterDaemon {
|
||||
|
||||
public static final Logger LOG = LogManager.getLogger(AdmissionControl.class);
|
||||
|
||||
private volatile boolean isAllBeMemoryEnough = true;
|
||||
|
||||
private double currentMemoryLimit = 0;
|
||||
|
||||
private SystemInfoService clusterInfoService;
|
||||
|
||||
public AdmissionControl(SystemInfoService clusterInfoService) {
|
||||
super("get-be-resource-usage-thread", Config.get_be_resource_usage_interval_ms);
|
||||
this.clusterInfoService = clusterInfoService;
|
||||
}
|
||||
|
||||
private ConcurrentLinkedQueue<QueueToken> queryWaitQueue = new ConcurrentLinkedQueue();
|
||||
|
||||
public void addQueueToken(QueueToken queryQueue) {
|
||||
queryWaitQueue.offer(queryQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
getBeMemoryUsage();
|
||||
notifyWaitQuery();
|
||||
}
|
||||
|
||||
public void getBeMemoryUsage() {
|
||||
if (Config.query_queue_by_be_used_memory < 0) {
|
||||
this.isAllBeMemoryEnough = true;
|
||||
return;
|
||||
}
|
||||
Collection<Backend> backends = clusterInfoService.getIdToBackend().values();
|
||||
this.currentMemoryLimit = Config.query_queue_by_be_used_memory;
|
||||
boolean tmpIsAllBeMemoryEnough = true;
|
||||
for (Backend be : backends) {
|
||||
if (!be.isAlive()) {
|
||||
continue;
|
||||
}
|
||||
TNetworkAddress address = null;
|
||||
BackendService.Client client = null;
|
||||
TGetBeResourceResult result = null;
|
||||
boolean rpcOk = true;
|
||||
try {
|
||||
address = new TNetworkAddress(be.getHost(), be.getBePort());
|
||||
client = ClientPool.backendPool.borrowObject(address, 5000);
|
||||
result = client.getBeResource(new TGetBeResourceRequest());
|
||||
} catch (Throwable t) {
|
||||
rpcOk = false;
|
||||
LOG.warn("get be {} resource failed, ", be.getHost(), t);
|
||||
} finally {
|
||||
try {
|
||||
if (rpcOk) {
|
||||
ClientPool.backendPool.returnObject(address, client);
|
||||
} else {
|
||||
ClientPool.backendPool.invalidateObject(address, client);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("return rpc client failed. related backend[{}]", be.getHost(),
|
||||
e);
|
||||
}
|
||||
}
|
||||
if (result != null && result.isSetGlobalResourceUsage()) {
|
||||
TGlobalResourceUsage globalResourceUsage = result.getGlobalResourceUsage();
|
||||
if (globalResourceUsage != null && globalResourceUsage.isSetMemLimit()
|
||||
&& globalResourceUsage.isSetMemUsage()) {
|
||||
long memUsageL = globalResourceUsage.getMemUsage();
|
||||
long memLimitL = globalResourceUsage.getMemLimit();
|
||||
double memUsage = Double.valueOf(String.valueOf(memUsageL));
|
||||
double memLimit = Double.valueOf(String.valueOf(memLimitL));
|
||||
double memUsagePercent = memUsage / memLimit;
|
||||
|
||||
if (memUsagePercent > this.currentMemoryLimit) {
|
||||
tmpIsAllBeMemoryEnough = false;
|
||||
}
|
||||
LOG.debug(
|
||||
"be ip:{}, mem limit:{}, mem usage:{}, mem usage percent:{}, "
|
||||
+ "query queue mem:{}, query wait size:{}",
|
||||
be.getHost(), memLimitL, memUsageL, memUsagePercent, this.currentMemoryLimit,
|
||||
this.queryWaitQueue.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
this.isAllBeMemoryEnough = tmpIsAllBeMemoryEnough;
|
||||
}
|
||||
|
||||
public void notifyWaitQuery() {
|
||||
if (!isAllBeMemoryEnough()) {
|
||||
return;
|
||||
}
|
||||
int waitQueryCountSnapshot = queryWaitQueue.size();
|
||||
Iterator<QueueToken> queueTokenIterator = queryWaitQueue.iterator();
|
||||
while (waitQueryCountSnapshot > 0 && queueTokenIterator.hasNext()) {
|
||||
QueueToken queueToken = queueTokenIterator.next();
|
||||
queueToken.notifyWaitQuery();
|
||||
waitQueryCountSnapshot--;
|
||||
}
|
||||
}
|
||||
|
||||
public void removeQueueToken(QueueToken queueToken) {
|
||||
queryWaitQueue.remove(queueToken);
|
||||
}
|
||||
|
||||
public boolean isAllBeMemoryEnough() {
|
||||
return isAllBeMemoryEnough;
|
||||
}
|
||||
|
||||
//TODO(wb): add more resource type
|
||||
public boolean checkResourceAvailable(QueueToken queueToken) {
|
||||
if (isAllBeMemoryEnough()) {
|
||||
return true;
|
||||
} else {
|
||||
queueToken.setQueueMsg("WAIT_BE_MEMORY");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -17,14 +17,18 @@
|
||||
|
||||
package org.apache.doris.resource.workloadgroup;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.resource.AdmissionControl;
|
||||
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.PriorityQueue;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
// note(wb) refer java BlockingQueue, but support altering capacity
|
||||
@ -38,8 +42,6 @@ public class QueryQueue {
|
||||
private int maxConcurrency;
|
||||
private int maxQueueSize;
|
||||
private int queueTimeout; // ms
|
||||
// running property
|
||||
private volatile int currentRunningQueryNum;
|
||||
|
||||
public static final String RUNNING_QUERY_NUM = "running_query_num";
|
||||
public static final String WAITING_QUERY_NUM = "waiting_query_num";
|
||||
@ -48,16 +50,13 @@ public class QueryQueue {
|
||||
|
||||
private long propVersion;
|
||||
|
||||
private PriorityQueue<QueueToken> priorityTokenQueue;
|
||||
private PriorityQueue<QueueToken> waitingQueryQueue;
|
||||
private Queue<QueueToken> runningQueryQueue;
|
||||
|
||||
int getCurrentRunningQueryNum() {
|
||||
return currentRunningQueryNum;
|
||||
}
|
||||
|
||||
int getCurrentWaitingQueryNum() {
|
||||
Pair<Integer, Integer> getQueryQueueDetail() {
|
||||
try {
|
||||
queueLock.lock();
|
||||
return priorityTokenQueue.size();
|
||||
return Pair.of(runningQueryQueue.size(), waitingQueryQueue.size());
|
||||
} finally {
|
||||
queueLock.unlock();
|
||||
}
|
||||
@ -89,36 +88,47 @@ public class QueryQueue {
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
this.queueTimeout = queueTimeout;
|
||||
this.propVersion = propVersion;
|
||||
this.priorityTokenQueue = new PriorityQueue<QueueToken>();
|
||||
this.waitingQueryQueue = new PriorityQueue<QueueToken>();
|
||||
this.runningQueryQueue = new LinkedList<QueueToken>();
|
||||
}
|
||||
|
||||
public String debugString() {
|
||||
return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency
|
||||
+ ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout
|
||||
+ ", currentRunningQueryNum=" + currentRunningQueryNum
|
||||
+ ", currentWaitingQueryNum=" + priorityTokenQueue.size();
|
||||
+ ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout + ", currentRunningQueryNum="
|
||||
+ runningQueryQueue.size() + ", currentWaitingQueryNum=" + waitingQueryQueue.size();
|
||||
}
|
||||
|
||||
public QueueToken getToken() throws UserException {
|
||||
AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl();
|
||||
queueLock.lock();
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info(this.debugString());
|
||||
}
|
||||
if (currentRunningQueryNum < maxConcurrency) {
|
||||
currentRunningQueryNum++;
|
||||
QueueToken retToken = new QueueToken(TokenState.READY_TO_RUN, queueTimeout, "offer success");
|
||||
retToken.setQueueTimeWhenOfferSuccess();
|
||||
return retToken;
|
||||
}
|
||||
if (priorityTokenQueue.size() >= maxQueueSize) {
|
||||
QueueToken queueToken = new QueueToken(queueTimeout, this);
|
||||
|
||||
boolean isReachMaxCon = runningQueryQueue.size() >= maxConcurrency;
|
||||
boolean isResourceAvailable = admissionControl.checkResourceAvailable(queueToken);
|
||||
if (!isReachMaxCon && isResourceAvailable) {
|
||||
runningQueryQueue.offer(queueToken);
|
||||
queueToken.complete();
|
||||
return queueToken;
|
||||
} else if (waitingQueryQueue.size() >= maxQueueSize) {
|
||||
throw new UserException("query waiting queue is full, queue length=" + maxQueueSize);
|
||||
} else {
|
||||
if (isReachMaxCon) {
|
||||
queueToken.setQueueMsg("WAIT_IN_QUEUE");
|
||||
}
|
||||
queueToken.setTokenState(TokenState.ENQUEUE_SUCCESS);
|
||||
this.waitingQueryQueue.offer(queueToken);
|
||||
// if a query is added to wg's queue but not in AdmissionControl's
|
||||
// queue may be blocked by be memory later,
|
||||
// then we should put query to AdmissionControl in releaseAndNotify, it's too complicated.
|
||||
// To simplify the code logic, put all waiting query to AdmissionControl,
|
||||
// waiting query can be notified when query finish or memory is enough.
|
||||
admissionControl.addQueueToken(queueToken);
|
||||
}
|
||||
QueueToken newQueryToken = new QueueToken(TokenState.ENQUEUE_SUCCESS, queueTimeout,
|
||||
"query wait timeout " + queueTimeout + " ms");
|
||||
newQueryToken.setQueueTimeWhenQueueSuccess();
|
||||
this.priorityTokenQueue.offer(newQueryToken);
|
||||
return newQueryToken;
|
||||
return queueToken;
|
||||
} finally {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info(this.debugString());
|
||||
@ -127,35 +137,36 @@ public class QueryQueue {
|
||||
}
|
||||
}
|
||||
|
||||
// If the token is acquired and do work success, then call this method to release it.
|
||||
public void returnToken(QueueToken token) {
|
||||
public void notifyWaitQuery() {
|
||||
releaseAndNotify(null);
|
||||
}
|
||||
|
||||
public void releaseAndNotify(QueueToken releaseToken) {
|
||||
AdmissionControl admissionControl = Env.getCurrentEnv().getAdmissionControl();
|
||||
queueLock.lock();
|
||||
try {
|
||||
// If current token is not in ready to run state, then it is still in the queue
|
||||
// it is not running, just remove it.
|
||||
if (!token.isReadyToRun()) {
|
||||
this.priorityTokenQueue.remove(token);
|
||||
return;
|
||||
}
|
||||
currentRunningQueryNum--;
|
||||
Preconditions.checkArgument(currentRunningQueryNum >= 0);
|
||||
// If return token and find user changed concurrency num, then maybe need signal
|
||||
// more tokens.
|
||||
while (currentRunningQueryNum < maxConcurrency) {
|
||||
QueueToken nextToken = this.priorityTokenQueue.poll();
|
||||
if (nextToken != null) {
|
||||
if (nextToken.signal()) {
|
||||
++currentRunningQueryNum;
|
||||
}
|
||||
runningQueryQueue.remove(releaseToken);
|
||||
waitingQueryQueue.remove(releaseToken);
|
||||
admissionControl.removeQueueToken(releaseToken);
|
||||
while (runningQueryQueue.size() < maxConcurrency) {
|
||||
QueueToken queueToken = waitingQueryQueue.peek();
|
||||
if (queueToken == null) {
|
||||
break;
|
||||
}
|
||||
if (admissionControl.checkResourceAvailable(queueToken)) {
|
||||
queueToken.complete();
|
||||
runningQueryQueue.offer(queueToken);
|
||||
waitingQueryQueue.remove();
|
||||
admissionControl.removeQueueToken(queueToken);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
queueLock.unlock();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info(this.debugString());
|
||||
}
|
||||
queueLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,13 +17,16 @@
|
||||
|
||||
package org.apache.doris.resource.workloadgroup;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
// used to mark QueryQueue offer result
|
||||
// if offer failed, then need to cancel query
|
||||
@ -38,134 +41,79 @@ public class QueueToken implements Comparable<QueueToken> {
|
||||
|
||||
public enum TokenState {
|
||||
ENQUEUE_SUCCESS,
|
||||
READY_TO_RUN,
|
||||
CANCELLED
|
||||
READY_TO_RUN
|
||||
}
|
||||
|
||||
static AtomicLong tokenIdGenerator = new AtomicLong(0);
|
||||
|
||||
private long tokenId = 0;
|
||||
|
||||
private TokenState tokenState;
|
||||
private volatile TokenState tokenState;
|
||||
|
||||
private long queueWaitTimeout = 0;
|
||||
|
||||
private String offerResultDetail;
|
||||
|
||||
private boolean isTimeout = false;
|
||||
|
||||
private final ReentrantLock tokenLock = new ReentrantLock();
|
||||
private final Condition tokenCond = tokenLock.newCondition();
|
||||
|
||||
private long queueStartTime = -1;
|
||||
private long queueEndTime = -1;
|
||||
|
||||
public QueueToken(TokenState tokenState, long queueWaitTimeout,
|
||||
String offerResultDetail) {
|
||||
private volatile String queueMsg = "";
|
||||
|
||||
QueryQueue queryQueue = null;
|
||||
|
||||
// Object is just a placeholder, it's meaningless now
|
||||
private CompletableFuture<Object> future;
|
||||
|
||||
public QueueToken(long queueWaitTimeout, QueryQueue queryQueue) {
|
||||
this.tokenId = tokenIdGenerator.addAndGet(1);
|
||||
this.tokenState = tokenState;
|
||||
this.queueWaitTimeout = queueWaitTimeout;
|
||||
this.offerResultDetail = offerResultDetail;
|
||||
this.queueStartTime = System.currentTimeMillis();
|
||||
this.queryQueue = queryQueue;
|
||||
this.future = new CompletableFuture<>();
|
||||
}
|
||||
|
||||
public boolean waitSignal(long queryTimeoutMillis) throws InterruptedException {
|
||||
this.tokenLock.lock();
|
||||
public void setQueueMsg(String msg) {
|
||||
this.queueMsg = msg;
|
||||
}
|
||||
|
||||
public void setTokenState(TokenState tokenState) {
|
||||
this.tokenState = tokenState;
|
||||
}
|
||||
|
||||
public String getQueueMsg() {
|
||||
return queueMsg;
|
||||
}
|
||||
|
||||
public void get(String queryId, int queryTimeout) throws UserException {
|
||||
if (isReadyToRun()) {
|
||||
return;
|
||||
}
|
||||
long waitTimeout = queueWaitTimeout > 0 ? Math.min(queueWaitTimeout, queryTimeout) : queryTimeout;
|
||||
waitTimeout = waitTimeout <= 0 ? 4096 : waitTimeout;
|
||||
try {
|
||||
if (isTimeout) {
|
||||
return false;
|
||||
}
|
||||
if (tokenState == TokenState.READY_TO_RUN) {
|
||||
return true;
|
||||
}
|
||||
// If query timeout is less than queue wait timeout, then should use
|
||||
// query timeout as wait timeout
|
||||
long waitTimeout = queryTimeoutMillis > queueWaitTimeout ? queueWaitTimeout : queryTimeoutMillis;
|
||||
tokenCond.await(waitTimeout, TimeUnit.MILLISECONDS);
|
||||
if (tokenState == TokenState.CANCELLED) {
|
||||
this.offerResultDetail = "query is cancelled in queue";
|
||||
return false;
|
||||
}
|
||||
// If wait timeout and is steal not ready to run, then return false
|
||||
if (tokenState != TokenState.READY_TO_RUN) {
|
||||
LOG.warn("wait in queue timeout, timeout = {}", waitTimeout);
|
||||
isTimeout = true;
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
future.get(waitTimeout, TimeUnit.MILLISECONDS);
|
||||
} catch (TimeoutException e) {
|
||||
throw new UserException("query queue timeout, timeout: " + waitTimeout + " ms ");
|
||||
} catch (CancellationException e) {
|
||||
throw new UserException("query is cancelled");
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("meet execption when wait for signal", t);
|
||||
// If any exception happens, set isTimeout to true and return false
|
||||
// Then the caller will call returnToken to queue normally.
|
||||
offerResultDetail = "meet exeption when wait for signal";
|
||||
isTimeout = true;
|
||||
return false;
|
||||
} finally {
|
||||
this.tokenLock.unlock();
|
||||
this.setQueueTimeWhenQueueEnd();
|
||||
String errMsg = String.format("error happens when query {} queue", queryId);
|
||||
LOG.error(errMsg, t);
|
||||
throw new RuntimeException(errMsg, t);
|
||||
}
|
||||
}
|
||||
|
||||
public void signalForCancel() {
|
||||
this.tokenLock.lock();
|
||||
try {
|
||||
if (this.tokenState == TokenState.ENQUEUE_SUCCESS) {
|
||||
tokenCond.signal();
|
||||
this.tokenState = TokenState.CANCELLED;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("error happens when signal for cancel", t);
|
||||
} finally {
|
||||
this.tokenLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean signal() {
|
||||
this.tokenLock.lock();
|
||||
try {
|
||||
// If current token is not ENQUEUE_SUCCESS, then it maybe has error
|
||||
// not run it any more.
|
||||
if (this.tokenState != TokenState.ENQUEUE_SUCCESS || isTimeout) {
|
||||
return false;
|
||||
}
|
||||
this.tokenState = TokenState.READY_TO_RUN;
|
||||
tokenCond.signal();
|
||||
return true;
|
||||
} catch (Throwable t) {
|
||||
isTimeout = true;
|
||||
offerResultDetail = "meet exception when signal";
|
||||
LOG.warn("failed to signal token", t);
|
||||
return false;
|
||||
} finally {
|
||||
this.tokenLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public String getOfferResultDetail() {
|
||||
return offerResultDetail;
|
||||
}
|
||||
|
||||
public boolean isReadyToRun() {
|
||||
return this.tokenState == TokenState.READY_TO_RUN;
|
||||
}
|
||||
|
||||
public boolean isCancelled() {
|
||||
return this.tokenState == TokenState.CANCELLED;
|
||||
}
|
||||
|
||||
public void setQueueTimeWhenOfferSuccess() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
this.queueStartTime = currentTime;
|
||||
this.queueEndTime = currentTime;
|
||||
}
|
||||
|
||||
public void setQueueTimeWhenQueueSuccess() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
this.queueStartTime = currentTime;
|
||||
}
|
||||
|
||||
public void setQueueTimeWhenQueueEnd() {
|
||||
public void complete() {
|
||||
this.queueEndTime = System.currentTimeMillis();
|
||||
this.tokenState = TokenState.READY_TO_RUN;
|
||||
this.setQueueMsg("RUNNING");
|
||||
future.complete(null);
|
||||
}
|
||||
|
||||
public void notifyWaitQuery() {
|
||||
this.queryQueue.notifyWaitQuery();
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
future.cancel(true);
|
||||
}
|
||||
|
||||
public long getQueueStartTime() {
|
||||
@ -176,8 +124,8 @@ public class QueueToken implements Comparable<QueueToken> {
|
||||
return queueEndTime;
|
||||
}
|
||||
|
||||
public TokenState getTokenState() {
|
||||
return tokenState;
|
||||
public boolean isReadyToRun() {
|
||||
return tokenState == TokenState.READY_TO_RUN;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
@ -431,6 +432,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
List<String> row = new ArrayList<>();
|
||||
row.add(String.valueOf(id));
|
||||
row.add(name);
|
||||
Pair<Integer, Integer> queryQueueDetail = qq != null ? qq.getQueryQueueDetail() : null;
|
||||
// skip id,name,running query,waiting query
|
||||
for (int i = 2; i < WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.size(); i++) {
|
||||
String key = WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i);
|
||||
@ -472,9 +474,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
row.add(val + "%");
|
||||
}
|
||||
} else if (QueryQueue.RUNNING_QUERY_NUM.equals(key)) {
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum()));
|
||||
row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.first));
|
||||
} else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum()));
|
||||
row.add(queryQueueDetail == null ? "0" : String.valueOf(queryQueueDetail.second));
|
||||
} else if (TAG.equals(key)) {
|
||||
String val = properties.get(key);
|
||||
if (StringUtils.isEmpty(val)) {
|
||||
|
||||
@ -51,7 +51,6 @@ import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.QeProcessorImpl;
|
||||
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
|
||||
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
|
||||
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
@ -612,14 +611,8 @@ public class MetadataGenerator {
|
||||
trow.addToColumnValue(new TCell());
|
||||
}
|
||||
|
||||
TokenState tokenState = queryInfo.getQueueStatus();
|
||||
if (tokenState == null) {
|
||||
trow.addToColumnValue(new TCell());
|
||||
} else if (tokenState == TokenState.READY_TO_RUN) {
|
||||
trow.addToColumnValue(new TCell().setStringVal("RUNNING"));
|
||||
} else {
|
||||
trow.addToColumnValue(new TCell().setStringVal("QUEUED"));
|
||||
}
|
||||
String queueMsg = queryInfo.getQueueStatus();
|
||||
trow.addToColumnValue(new TCell().setStringVal(queueMsg));
|
||||
|
||||
trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
|
||||
dataBatch.add(trow);
|
||||
|
||||
Reference in New Issue
Block a user