[Fix](planner) Fix query queue can not limit maxConcurrency #23418
2 Fix concurrent can not limit
This commit is contained in:
@ -207,9 +207,6 @@ public class StmtExecutor {
|
||||
private OriginStatement originStmt;
|
||||
private StatementBase parsedStmt;
|
||||
private Analyzer analyzer;
|
||||
private QueryQueue queryQueue = null;
|
||||
// by default, false means no query queued, then no need to poll when query finish
|
||||
private QueueOfferToken offerRet = new QueueOfferToken(false);
|
||||
private ProfileType profileType = ProfileType.QUERY;
|
||||
private volatile Coordinator coord = null;
|
||||
private MasterOpExecutor masterOpExecutor = null;
|
||||
@ -578,17 +575,19 @@ public class StmtExecutor {
|
||||
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
|
||||
// queue query here
|
||||
syncJournalIfNeeded();
|
||||
QueueOfferToken offerRet = null;
|
||||
QueryQueue queryQueue = null;
|
||||
if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue
|
||||
&& context.getSessionVariable().getEnablePipelineEngine()) {
|
||||
this.queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
|
||||
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
|
||||
try {
|
||||
this.offerRet = queryQueue.offer();
|
||||
offerRet = queryQueue.offer();
|
||||
} catch (InterruptedException e) {
|
||||
// this Exception means try lock/await failed, so no need to handle offer result
|
||||
LOG.error("error happens when offer queue, query id=" + DebugUtil.printId(queryId) + " ", e);
|
||||
throw new RuntimeException("interrupted Exception happens when queue query");
|
||||
}
|
||||
if (!offerRet.isOfferSuccess()) {
|
||||
if (offerRet != null && !offerRet.isOfferSuccess()) {
|
||||
String retMsg = "queue failed, reason=" + offerRet.getOfferResultDetail();
|
||||
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + retMsg);
|
||||
throw new UserException(retMsg);
|
||||
@ -628,7 +627,7 @@ public class StmtExecutor {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (offerRet.isOfferSuccess()) {
|
||||
if (offerRet != null && offerRet.isOfferSuccess()) {
|
||||
queryQueue.poll();
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,11 +59,17 @@ public class QueryQueue {
|
||||
// we should catch the case when it happens
|
||||
queueLock.tryLock(5, TimeUnit.SECONDS);
|
||||
try {
|
||||
// currentRunningQueryNum may bigger than maxRunningQueryNum
|
||||
// because maxRunningQueryNum can be altered
|
||||
if (currentRunningQueryNum >= maxConcurrency) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info(this.debugString());
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (currentRunningQueryNum < maxConcurrency) {
|
||||
break;
|
||||
}
|
||||
// currentRunningQueryNum may bigger than maxRunningQueryNum
|
||||
// because maxRunningQueryNum can be altered
|
||||
if (currentWaitingQueryNum >= maxQueueSize) {
|
||||
LOG.debug(this.debugString());
|
||||
return new QueueOfferToken(false, "query waiting queue is full, queue length=" + maxQueueSize);
|
||||
}
|
||||
|
||||
@ -75,13 +81,15 @@ public class QueryQueue {
|
||||
currentWaitingQueryNum--;
|
||||
}
|
||||
if (!ret) {
|
||||
LOG.debug(this.debugString());
|
||||
return new QueueOfferToken(false, "query wait timeout " + queueTimeout + " ms");
|
||||
}
|
||||
}
|
||||
currentRunningQueryNum++;
|
||||
return new QueueOfferToken(true, "offer success");
|
||||
} finally {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info(this.debugString());
|
||||
}
|
||||
queueLock.unlock();
|
||||
}
|
||||
}
|
||||
@ -92,8 +100,13 @@ public class QueryQueue {
|
||||
currentRunningQueryNum--;
|
||||
Preconditions.checkArgument(currentRunningQueryNum >= 0);
|
||||
// maybe only when currentWaitingQueryNum != 0 need to signal
|
||||
queueLockCond.signal();
|
||||
if (currentRunningQueryNum < maxConcurrency) {
|
||||
queueLockCond.signal();
|
||||
}
|
||||
} finally {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.info(this.debugString());
|
||||
}
|
||||
queueLock.unlock();
|
||||
}
|
||||
}
|
||||
@ -106,6 +119,9 @@ public class QueryQueue {
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
this.queueTimeout = queryWaitTimeout;
|
||||
} finally {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(this.debugString());
|
||||
}
|
||||
queueLock.unlock();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
||||
Reference in New Issue
Block a user