support query queue (#20048)

support query queue (#20048)
This commit is contained in:
wangbo
2023-05-30 19:52:27 +08:00
committed by GitHub
parent 1919355c04
commit 6f68ec9de0
7 changed files with 311 additions and 11 deletions

View File

@ -1492,6 +1492,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true, expType = ExperimentalType.EXPERIMENTAL)
public static boolean enable_resource_group = false;
@ConfField(mutable = true)
public static boolean enable_query_queue = true;
@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min

View File

@ -135,6 +135,8 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.CacheAnalyzer.CacheMode;
import org.apache.doris.resource.resourcegroup.QueryQueue;
import org.apache.doris.resource.resourcegroup.QueueOfferToken;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.RpcException;
@ -204,6 +206,9 @@ public class StmtExecutor {
private OriginStatement originStmt;
private StatementBase parsedStmt;
private Analyzer analyzer;
private QueryQueue queryQueue = null;
// by default, false means no query queued, then no need to poll when query finish
private QueueOfferToken offerRet = new QueueOfferToken(false);
private ProfileType profileType = ProfileType.QUERY;
private volatile Coordinator coord = null;
private MasterOpExecutor masterOpExecutor = null;
@ -552,6 +557,24 @@ public class StmtExecutor {
}
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
// queue query here
if (!parsedStmt.isExplain() && Config.enable_resource_group && Config.enable_query_queue) {
this.queryQueue = analyzer.getEnv().getResourceGroupMgr()
.getResourceGroupQueryQueue(context.sessionVariable.resourceGroup);
try {
this.offerRet = queryQueue.offer();
} catch (InterruptedException e) {
// this Exception means try lock/await failed, so no need to handle offer result
LOG.error("error happens when offer queue, query id=" + DebugUtil.printId(queryId) + " ", e);
throw new RuntimeException("interrupted Exception happens when queue query");
}
if (!offerRet.isOfferSuccess()) {
String retMsg = "queue failed, reason=" + offerRet.getOfferResultDetail();
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + retMsg);
throw new UserException(retMsg);
}
}
int retryTime = Config.max_query_retry_time;
for (int i = 0; i < retryTime; i++) {
try {
@ -621,6 +644,9 @@ public class StmtExecutor {
throw e;
} finally {
queryAnalysisSpan.end();
if (offerRet.isOfferSuccess()) {
queryQueue.poll();
}
}
if (isForwardToMaster()) {
if (isProxy) {
@ -801,7 +827,7 @@ public class StmtExecutor {
}
// Analyze one statement to structure in memory.
public void analyze(TQueryOptions tQueryOptions) throws UserException {
public void analyze(TQueryOptions tQueryOptions) throws UserException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(),
context.getForwardedStmtId());

View File

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.resourcegroup;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
// note(wb) refer java BlockingQueue, but support altering capacity
// todo(wb) add wait time to profile
public class QueryQueue {
private static final Logger LOG = LogManager.getLogger(QueryQueue.class);
// note(wb) used unfair by default, need more test later
private final ReentrantLock queueLock = new ReentrantLock();
private final Condition queueLockCond = queueLock.newCondition();
// resource group property
private int maxConcurrency;
private int maxQueueSize;
private int queueTimeout; // ms
// running property
private int currentRunningQueryNum;
private int currentWaitingQueryNum;
public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) {
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queueTimeout;
}
public String debugString() {
return "maxConcurrency=" + maxConcurrency + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout
+ ", currentRunningQueryNum=" + currentRunningQueryNum + ", currentWaitingQueryNum="
+ currentWaitingQueryNum;
}
public QueueOfferToken offer() throws InterruptedException {
// to prevent hang
// the lock shouldn't be hold for too long
// we should catch the case when it happens
queueLock.tryLock(5, TimeUnit.SECONDS);
try {
// currentRunningQueryNum may bigger than maxRunningQueryNum
// because maxRunningQueryNum can be altered
if (currentRunningQueryNum >= maxConcurrency) {
if (currentWaitingQueryNum >= maxQueueSize) {
LOG.debug(this.debugString());
return new QueueOfferToken(false, "query waiting queue is full, queue length=" + maxQueueSize);
}
currentWaitingQueryNum++;
boolean ret;
try {
ret = queueLockCond.await(queueTimeout, TimeUnit.MILLISECONDS);
} finally {
currentWaitingQueryNum--;
}
if (!ret) {
LOG.debug(this.debugString());
return new QueueOfferToken(false, "query wait timeout " + queueTimeout + " ms");
}
}
currentRunningQueryNum++;
return new QueueOfferToken(true, "offer success");
} finally {
queueLock.unlock();
}
}
public void poll() throws InterruptedException {
queueLock.tryLock(5, TimeUnit.SECONDS);
try {
currentRunningQueryNum--;
Preconditions.checkArgument(currentRunningQueryNum >= 0);
// maybe only when currentWaitingQueryNum != 0 need to signal
queueLockCond.signal();
} finally {
queueLock.unlock();
}
}
public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout) {
try {
queueLock.tryLock(5, TimeUnit.SECONDS);
try {
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
this.queueTimeout = queryWaitTimeout;
} finally {
queueLock.unlock();
}
} catch (InterruptedException e) {
LOG.error("reset queue property failed, ", e);
throw new RuntimeException("reset queue property failed, reason=" + e.getMessage());
}
}
}

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.resource.resourcegroup;
// used to mark QueryQueue offer result
// if offer failed, then need to cancel query
// and return failed reason to user client
public class QueueOfferToken {
private Boolean offerResult;
private String offerResultDetail;
public QueueOfferToken(Boolean offerResult) {
this.offerResult = offerResult;
}
public QueueOfferToken(Boolean offerResult, String offerResultDetail) {
this.offerResult = offerResult;
this.offerResultDetail = offerResultDetail;
}
public Boolean isOfferSuccess() {
return offerResult;
}
public String getOfferResultDetail() {
return offerResultDetail;
}
}

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TPipelineResourceGroup;
@ -39,7 +40,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class ResourceGroup implements Writable {
public class ResourceGroup implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ResourceGroup.class);
public static final String CPU_SHARE = "cpu_share";
@ -48,11 +49,18 @@ public class ResourceGroup implements Writable {
public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
public static final String MAX_CONCURRENCY = "max_concurrency";
public static final String MAX_QUEUE_SIZE = "max_queue_size";
public static final String QUEUE_TIMEOUT = "queue_timeout";
private static final ImmutableSet<String> REQUIRED_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add(
CPU_SHARE).add(MEMORY_LIMIT).build();
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>().add(
CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).build();
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).build();
@SerializedName(value = "id")
private long id;
@ -69,6 +77,11 @@ public class ResourceGroup implements Writable {
private double memoryLimitPercent;
private QueryQueue queryQueue;
private int maxConcurrency = Integer.MAX_VALUE;
private int maxQueueSize = 0;
private int queueTimeout = 0;
private ResourceGroup(long id, String name, Map<String, String> properties) {
this(id, name, properties, 0);
}
@ -85,11 +98,54 @@ public class ResourceGroup implements Writable {
}
}
public static ResourceGroup create(String name, Map<String, String> properties) throws DdlException {
checkProperties(properties);
return new ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties);
// called when first create a resource group, load from image or user new create a group
public void initQueryQueue() {
resetQueueProperty(properties);
// if query queue property is not set, when use default value
this.queryQueue = new QueryQueue(maxConcurrency, maxQueueSize, queueTimeout);
}
void resetQueryQueue(QueryQueue queryQueue) {
resetQueueProperty(properties);
this.queryQueue = queryQueue;
this.queryQueue.resetQueueProperty(this.maxConcurrency, this.maxQueueSize, this.queueTimeout);
}
private void resetQueueProperty(Map<String, String> properties) {
if (properties.containsKey(MAX_CONCURRENCY)) {
this.maxConcurrency = Integer.parseInt(properties.get(MAX_CONCURRENCY));
} else {
this.maxConcurrency = Integer.MAX_VALUE;
properties.put(MAX_CONCURRENCY, String.valueOf(this.maxConcurrency));
}
if (properties.containsKey(MAX_QUEUE_SIZE)) {
this.maxQueueSize = Integer.parseInt(properties.get(MAX_QUEUE_SIZE));
} else {
this.maxQueueSize = 0;
properties.put(MAX_QUEUE_SIZE, String.valueOf(maxQueueSize));
}
if (properties.containsKey(QUEUE_TIMEOUT)) {
this.queueTimeout = Integer.parseInt(properties.get(QUEUE_TIMEOUT));
} else {
this.queueTimeout = 0;
properties.put(QUEUE_TIMEOUT, String.valueOf(queueTimeout));
}
}
public QueryQueue getQueryQueue() {
return this.queryQueue;
}
// new resource group
public static ResourceGroup create(String name, Map<String, String> properties) throws DdlException {
checkProperties(properties);
ResourceGroup newResourceGroup = new ResourceGroup(Env.getCurrentEnv().getNextId(), name, properties);
newResourceGroup.initQueryQueue();
return newResourceGroup;
}
// alter resource group
public static ResourceGroup copyAndUpdate(ResourceGroup resourceGroup, Map<String, String> updateProperties)
throws DdlException {
Map<String, String> newProperties = new HashMap<>(resourceGroup.getProperties());
@ -100,8 +156,13 @@ public class ResourceGroup implements Writable {
}
checkProperties(newProperties);
return new ResourceGroup(
resourceGroup.getId(), resourceGroup.getName(), newProperties, resourceGroup.getVersion() + 1);
ResourceGroup newResourceGroup = new ResourceGroup(
resourceGroup.getId(), resourceGroup.getName(), newProperties, resourceGroup.getVersion() + 1);
// note(wb) query queue should be unique and can not be copy
newResourceGroup.resetQueryQueue(resourceGroup.getQueryQueue());
return newResourceGroup;
}
private static void checkProperties(Map<String, String> properties) throws DdlException {
@ -141,6 +202,35 @@ public class ResourceGroup implements Writable {
throw new DdlException("The value of '" + ENABLE_MEMORY_OVERCOMMIT + "' must be true or false.");
}
}
// check queue property
if (properties.containsKey(MAX_CONCURRENCY)) {
try {
if (Integer.parseInt(properties.get(MAX_CONCURRENCY)) < 0) {
throw new DdlException(MAX_CONCURRENCY + " requires a positive integer");
}
} catch (NumberFormatException e) {
throw new DdlException(MAX_CONCURRENCY + " requires a positive integer");
}
}
if (properties.containsKey(MAX_QUEUE_SIZE)) {
try {
if (Integer.parseInt(properties.get(MAX_QUEUE_SIZE)) < 0) {
throw new DdlException(MAX_QUEUE_SIZE + " requires a positive integer");
}
} catch (NumberFormatException e) {
throw new DdlException(MAX_QUEUE_SIZE + " requires a positive integer");
}
}
if (properties.containsKey(QUEUE_TIMEOUT)) {
try {
if (Integer.parseInt(properties.get(QUEUE_TIMEOUT)) < 0) {
throw new DdlException(QUEUE_TIMEOUT + " requires a positive integer");
}
} catch (NumberFormatException e) {
throw new DdlException(QUEUE_TIMEOUT + " requires a positive integer");
}
}
}
public long getId() {
@ -188,4 +278,9 @@ public class ResourceGroup implements Writable {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ResourceGroup.class);
}
@Override
public void gsonPostProcess() throws IOException {
this.initQueryQueue();
}
}

View File

@ -114,6 +114,19 @@ public class ResourceGroupMgr implements Writable, GsonPostProcessable {
return resourceGroups;
}
public QueryQueue getResourceGroupQueryQueue(String groupName) throws UserException {
readLock();
try {
ResourceGroup resourceGroup = nameToResourceGroup.get(groupName);
if (resourceGroup == null) {
throw new UserException("Resource group " + groupName + " does not exist");
}
return resourceGroup.getQueryQueue();
} finally {
readUnlock();
}
}
private void checkAndCreateDefaultGroup() {
ResourceGroup defaultResourceGroup = null;
writeLock();

View File

@ -37,7 +37,7 @@ public class ResourceGroupTest {
String name1 = "g1";
ResourceGroup group1 = ResourceGroup.create(name1, properties1);
Assert.assertEquals(name1, group1.getName());
Assert.assertEquals(2, group1.getProperties().size());
Assert.assertEquals(5, group1.getProperties().size());
Assert.assertTrue(group1.getProperties().containsKey(ResourceGroup.CPU_SHARE));
Assert.assertTrue(Math.abs(group1.getMemoryLimitPercent() - 30) < 1e-6);
}
@ -92,6 +92,6 @@ public class ResourceGroupTest {
BaseProcResult result = new BaseProcResult();
group1.getProcNodeData(result);
List<List<String>> rows = result.getRows();
Assert.assertEquals(2, rows.size());
Assert.assertEquals(5, rows.size());
}
}