From 66cfcc67cb6ea6ddd316f97e05a8bf17eea04e66 Mon Sep 17 00:00:00 2001 From: wangbo Date: Sat, 2 Dec 2023 20:19:50 +0800 Subject: [PATCH] [Fix](executor)Fix Follower Fe query queue may not work when exec alter #27831 --- .../java/org/apache/doris/common/Config.java | 3 + .../java/org/apache/doris/catalog/Env.java | 2 + .../resource/workloadgroup/QueryQueue.java | 38 ++++++-- .../resource/workloadgroup/WorkloadGroup.java | 54 +++++------- .../workloadgroup/WorkloadGroupMgr.java | 86 +++++++++++++++++-- .../workloadgroup/WorkloadGroupTest.java | 2 +- .../workload_manager_p0/test_curd_wlg.groovy | 5 ++ 7 files changed, 143 insertions(+), 47 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 5c0ea31c9e..d1f47bfe6f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1618,6 +1618,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean enable_query_queue = true; + @ConfField(mutable = true) + public static long query_queue_update_interval_ms = 5000; + @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_cpu_hard_limit = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1f38b07d9f..473e80ae57 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -984,6 +984,8 @@ public class Env { TopicPublisher wgPublisher = new WorkloadGroupPublisher(this); topicPublisherThread.addToTopicPublisherList(wgPublisher); topicPublisherThread.start(); + + workloadGroupMgr.startUpdateThread(); } // wait until FE is ready. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java index fd9ce458e2..5a5d4acdc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/QueryQueue.java @@ -44,6 +44,10 @@ public class QueryQueue { public static final String RUNNING_QUERY_NUM = "running_query_num"; public static final String WAITING_QUERY_NUM = "waiting_query_num"; + private long wgId; + + private long propVersion; + int getCurrentRunningQueryNum() { return currentRunningQueryNum; } @@ -52,16 +56,39 @@ public class QueryQueue { return currentWaitingQueryNum; } - public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) { + long getPropVersion() { + return propVersion; + } + + long getWgId() { + return wgId; + } + + int getMaxConcurrency() { + return maxConcurrency; + } + + int getMaxQueueSize() { + return maxQueueSize; + } + + int getQueueTimeout() { + return queueTimeout; + } + + public QueryQueue(long wgId, int maxConcurrency, int maxQueueSize, int queueTimeout, long propVersion) { + this.wgId = wgId; this.maxConcurrency = maxConcurrency; this.maxQueueSize = maxQueueSize; this.queueTimeout = queueTimeout; + this.propVersion = propVersion; } public String debugString() { - return "maxConcurrency=" + maxConcurrency + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout - + ", currentRunningQueryNum=" + currentRunningQueryNum + ", currentWaitingQueryNum=" - + currentWaitingQueryNum; + return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency + + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout + + ", currentRunningQueryNum=" + currentRunningQueryNum + + ", currentWaitingQueryNum=" + currentWaitingQueryNum; } public QueueOfferToken offer() throws InterruptedException { @@ -122,13 
+149,14 @@ public class QueryQueue { } } - public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout) { + public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout, long version) { try { queueLock.tryLock(5, TimeUnit.SECONDS); try { this.maxConcurrency = maxConcurrency; this.maxQueueSize = maxQueueSize; this.queueTimeout = queryWaitTimeout; + this.propVersion = version; } finally { if (LOG.isDebugEnabled()) { LOG.debug(this.debugString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index a2e5c6005c..91a516b9c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -82,8 +82,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { private long version; private double memoryLimitPercent = 0; - - private QueryQueue queryQueue; private int maxConcurrency = Integer.MAX_VALUE; private int maxQueueSize = 0; private int queueTimeout = 0; @@ -115,20 +113,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr); this.properties.put(CPU_HARD_LIMIT, cpuHardLimitStr); } - } - - // called when first create a resource group, load from image or user new create a group - public void initQueryQueue() { resetQueueProperty(properties); - // if query queue property is not set, when use default value - this.queryQueue = new QueryQueue(maxConcurrency, maxQueueSize, queueTimeout); - } - - void resetQueryQueue(QueryQueue queryQueue) { - resetQueueProperty(properties); - this.queryQueue = queryQueue; - this.queryQueue.resetQueueProperty(this.maxConcurrency, this.maxQueueSize, this.queueTimeout); - } private void resetQueueProperty(Map properties) { @@ -152,22 +137,17 @@ 
public class WorkloadGroup implements Writable, GsonPostProcessable { } } - public QueryQueue getQueryQueue() { - return this.queryQueue; - } - // new resource group public static WorkloadGroup create(String name, Map properties) throws DdlException { checkProperties(properties); WorkloadGroup newWorkloadGroup = new WorkloadGroup(Env.getCurrentEnv().getNextId(), name, properties); - newWorkloadGroup.initQueryQueue(); return newWorkloadGroup; } // alter resource group - public static WorkloadGroup copyAndUpdate(WorkloadGroup workloadGroup, Map updateProperties) + public static WorkloadGroup copyAndUpdate(WorkloadGroup currentWorkloadGroup, Map updateProperties) throws DdlException { - Map newProperties = new HashMap<>(workloadGroup.getProperties()); + Map newProperties = new HashMap<>(currentWorkloadGroup.getProperties()); for (Map.Entry kv : updateProperties.entrySet()) { if (!Strings.isNullOrEmpty(kv.getValue())) { newProperties.put(kv.getKey(), kv.getValue()); @@ -176,11 +156,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { checkProperties(newProperties); WorkloadGroup newWorkloadGroup = new WorkloadGroup( - workloadGroup.getId(), workloadGroup.getName(), newProperties, workloadGroup.getVersion() + 1); - - // note(wb) query queue should be unique and can not be copy - newWorkloadGroup.resetQueryQueue(workloadGroup.getQueryQueue()); - + currentWorkloadGroup.getId(), currentWorkloadGroup.getName(), + newProperties, currentWorkloadGroup.getVersion() + 1); return newWorkloadGroup; } @@ -273,7 +250,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { return properties; } - private long getVersion() { + public long getVersion() { return version; } @@ -281,7 +258,19 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { return memoryLimitPercent; } - public void getProcNodeData(BaseProcResult result) { + public int getMaxConcurrency() { + return maxConcurrency; + } + + public int getMaxQueueSize() { + 
return maxQueueSize; + } + + public int getQueueTimeout() { + return queueTimeout; + } + + public void getProcNodeData(BaseProcResult result, QueryQueue qq) { List row = new ArrayList<>(); row.add(String.valueOf(id)); row.add(name); @@ -305,8 +294,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { row.add(properties.get(key)); } } - row.add(String.valueOf(queryQueue.getCurrentRunningQueryNum())); - row.add(String.valueOf(queryQueue.getCurrentWaitingQueryNum())); + row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum())); + row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum())); result.addRow(row); } @@ -383,6 +372,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { if (properties.containsKey(CPU_HARD_LIMIT)) { this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT)); } - this.initQueryQueue(); + + this.resetQueueProperty(this.properties); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 290bfd548f..4bb30b118d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -54,6 +54,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -75,9 +76,60 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { @SerializedName(value = "idToWorkloadGroup") private final Map idToWorkloadGroup = Maps.newHashMap(); private final Map nameToWorkloadGroup = Maps.newHashMap(); + private final Map idToQueryQueue = Maps.newHashMap(); private final ResourceProcNode procNode = new ResourceProcNode(); private final 
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private Thread updatePropThread; + + public void startUpdateThread() { + WorkloadGroupMgr wgMgr = this; + updatePropThread = new Thread(new Runnable() { + public void run() { + while (true) { + try { + wgMgr.resetQueryQueueProp(); + Thread.sleep(Config.query_queue_update_interval_ms); + } catch (Throwable e) { + LOG.warn("reset query queue failed ", e); + } + } + } + }); + updatePropThread.start(); + } + + public void resetQueryQueueProp() { + List newPropList = new ArrayList<>(); + Map currentQueueCopyMap = new HashMap<>(); + readLock(); + try { + for (Map.Entry entry : idToWorkloadGroup.entrySet()) { + WorkloadGroup wg = entry.getValue(); + QueryQueue tmpQ = new QueryQueue(wg.getId(), wg.getMaxConcurrency(), + wg.getMaxQueueSize(), wg.getQueueTimeout(), wg.getVersion()); + newPropList.add(tmpQ); + } + for (Map.Entry entry : idToQueryQueue.entrySet()) { + currentQueueCopyMap.put(entry.getKey(), entry.getValue()); + } + } finally { + readUnlock(); + } + + for (QueryQueue newPropQq : newPropList) { + QueryQueue currentQueryQueue = currentQueueCopyMap.get(newPropQq.getWgId()); + if (currentQueryQueue == null) { + continue; + } + if (newPropQq.getPropVersion() > currentQueryQueue.getPropVersion()) { + currentQueryQueue.resetQueueProperty(newPropQq.getMaxConcurrency(), newPropQq.getMaxQueueSize(), + newPropQq.getQueueTimeout(), newPropQq.getPropVersion()); + } + LOG.debug(currentQueryQueue.debugString()); // for test debug + } + } + public WorkloadGroupMgr() { } @@ -132,6 +184,15 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { return workloadGroups; } + public WorkloadGroup getWorkloadGroupById(long wgId) { + readLock(); + try { + return idToWorkloadGroup.get(wgId); + } finally { + readUnlock(); + } + } + public List getPublishTopicInfo() { List workloadGroups = new ArrayList(); readLock(); @@ -147,15 +208,21 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable { public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context) throws UserException { String groupName = getWorkloadGroupNameAndCheckPriv(context); - readLock(); + writeLock(); try { - WorkloadGroup workloadGroup = nameToWorkloadGroup.get(groupName); - if (workloadGroup == null) { + WorkloadGroup wg = nameToWorkloadGroup.get(groupName); + if (wg == null) { throw new UserException("Workload group " + groupName + " does not exist"); } - return workloadGroup.getQueryQueue(); + QueryQueue queryQueue = idToQueryQueue.get(wg.getId()); + if (queryQueue == null) { + queryQueue = new QueryQueue(wg.getId(), wg.getMaxConcurrency(), wg.getMaxQueueSize(), + wg.getQueueTimeout(), wg.getVersion()); + idToQueryQueue.put(wg.getId(), queryQueue); + } + return queryQueue; } finally { - readUnlock(); + writeUnlock(); } } @@ -267,9 +334,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { if (!nameToWorkloadGroup.containsKey(workloadGroupName)) { throw new DdlException("workload group(" + workloadGroupName + ") does not exist."); } - WorkloadGroup workloadGroup = nameToWorkloadGroup.get(workloadGroupName); - newWorkloadGroup = WorkloadGroup.copyAndUpdate(workloadGroup, properties); - checkGlobalUnlock(newWorkloadGroup, workloadGroup); + WorkloadGroup currentWorkloadGroup = nameToWorkloadGroup.get(workloadGroupName); + newWorkloadGroup = WorkloadGroup.copyAndUpdate(currentWorkloadGroup, properties); + checkGlobalUnlock(newWorkloadGroup, currentWorkloadGroup); nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup); idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup); Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup); @@ -306,6 +373,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { long groupId = workloadGroup.getId(); idToWorkloadGroup.remove(groupId); nameToWorkloadGroup.remove(workloadGroupName); + idToQueryQueue.remove(groupId); 
Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new DropWorkloadGroupOperationLog(groupId)); } finally { writeUnlock(); @@ -398,7 +466,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { workloadGroup.getName(), PrivPredicate.SHOW_WORKLOAD_GROUP)) { continue; } - workloadGroup.getProcNodeData(result); + workloadGroup.getProcNodeData(result, idToQueryQueue.get(workloadGroup.getId())); } } finally { readUnlock(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java index 1acb8d1afa..f99fd4f352 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupTest.java @@ -83,7 +83,7 @@ public class WorkloadGroupTest { WorkloadGroup group1 = WorkloadGroup.create(name1, properties1); BaseProcResult result = new BaseProcResult(); - group1.getProcNodeData(result); + group1.getProcNodeData(result, null); List> rows = result.getRows(); Assert.assertEquals(1, rows.size()); } diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index debc444276..9e50473b21 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -259,9 +259,11 @@ suite("test_crud_wlg") { } // test query queue limit + sql "ADMIN SET FRONTEND CONFIG ('query_queue_update_interval_ms' = '1000');" sql "set workload_group=test_group;" sql "alter workload group test_group properties ( 'max_concurrency'='0' );" sql "alter workload group test_group properties ( 'max_queue_size'='0' );" + Thread.sleep(3000); try { sql "select 1;" } catch (Exception e) { @@ -270,6 +272,7 @@ suite("test_crud_wlg") { sql "alter workload group test_group properties ( 
'max_queue_size'='1' );" sql "alter workload group test_group properties ( 'queue_timeout'='500' );" + Thread.sleep(3000); try { sql "select 1;" } catch (Exception e) { @@ -277,7 +280,9 @@ suite("test_crud_wlg") { } sql "alter workload group test_group properties ( 'max_concurrency'='10' );" + Thread.sleep(3000); sql "select 1;" sql "set workload_group=normal;" sql "drop workload group test_group;" + sql "ADMIN SET FRONTEND CONFIG ('query_queue_update_interval_ms' = '5000');" }