[Fix](exectuor)Fix Follower Fe query queue may not work when exec alter #27831
This commit is contained in:
@ -1618,6 +1618,9 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true)
|
||||
public static boolean enable_query_queue = true;
|
||||
|
||||
@ConfField(mutable = true)
|
||||
public static long query_queue_update_interval_ms = 5000;
|
||||
|
||||
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
|
||||
public static boolean enable_cpu_hard_limit = false;
|
||||
|
||||
|
||||
@ -984,6 +984,8 @@ public class Env {
|
||||
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
|
||||
topicPublisherThread.addToTopicPublisherList(wgPublisher);
|
||||
topicPublisherThread.start();
|
||||
|
||||
workloadGroupMgr.startUpdateThread();
|
||||
}
|
||||
|
||||
// wait until FE is ready.
|
||||
|
||||
@ -44,6 +44,10 @@ public class QueryQueue {
|
||||
public static final String RUNNING_QUERY_NUM = "running_query_num";
|
||||
public static final String WAITING_QUERY_NUM = "waiting_query_num";
|
||||
|
||||
private long wgId;
|
||||
|
||||
private long propVersion;
|
||||
|
||||
int getCurrentRunningQueryNum() {
|
||||
return currentRunningQueryNum;
|
||||
}
|
||||
@ -52,16 +56,39 @@ public class QueryQueue {
|
||||
return currentWaitingQueryNum;
|
||||
}
|
||||
|
||||
public QueryQueue(int maxConcurrency, int maxQueueSize, int queueTimeout) {
|
||||
long getPropVersion() {
|
||||
return propVersion;
|
||||
}
|
||||
|
||||
long getWgId() {
|
||||
return wgId;
|
||||
}
|
||||
|
||||
int getMaxConcurrency() {
|
||||
return maxConcurrency;
|
||||
}
|
||||
|
||||
int getMaxQueueSize() {
|
||||
return maxQueueSize;
|
||||
}
|
||||
|
||||
int getQueueTimeout() {
|
||||
return queueTimeout;
|
||||
}
|
||||
|
||||
public QueryQueue(long wgId, int maxConcurrency, int maxQueueSize, int queueTimeout, long propVersion) {
|
||||
this.wgId = wgId;
|
||||
this.maxConcurrency = maxConcurrency;
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
this.queueTimeout = queueTimeout;
|
||||
this.propVersion = propVersion;
|
||||
}
|
||||
|
||||
public String debugString() {
|
||||
return "maxConcurrency=" + maxConcurrency + ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout
|
||||
+ ", currentRunningQueryNum=" + currentRunningQueryNum + ", currentWaitingQueryNum="
|
||||
+ currentWaitingQueryNum;
|
||||
return "wgId= " + wgId + ", version=" + this.propVersion + ",maxConcurrency=" + maxConcurrency
|
||||
+ ", maxQueueSize=" + maxQueueSize + ", queueTimeout=" + queueTimeout
|
||||
+ ", currentRunningQueryNum=" + currentRunningQueryNum
|
||||
+ ", currentWaitingQueryNum=" + currentWaitingQueryNum;
|
||||
}
|
||||
|
||||
public QueueOfferToken offer() throws InterruptedException {
|
||||
@ -122,13 +149,14 @@ public class QueryQueue {
|
||||
}
|
||||
}
|
||||
|
||||
public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout) {
|
||||
public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWaitTimeout, long version) {
|
||||
try {
|
||||
queueLock.tryLock(5, TimeUnit.SECONDS);
|
||||
try {
|
||||
this.maxConcurrency = maxConcurrency;
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
this.queueTimeout = queryWaitTimeout;
|
||||
this.propVersion = version;
|
||||
} finally {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(this.debugString());
|
||||
|
||||
@ -82,8 +82,6 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
private long version;
|
||||
|
||||
private double memoryLimitPercent = 0;
|
||||
|
||||
private QueryQueue queryQueue;
|
||||
private int maxConcurrency = Integer.MAX_VALUE;
|
||||
private int maxQueueSize = 0;
|
||||
private int queueTimeout = 0;
|
||||
@ -115,20 +113,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
this.cpuHardLimit = Integer.parseInt(cpuHardLimitStr);
|
||||
this.properties.put(CPU_HARD_LIMIT, cpuHardLimitStr);
|
||||
}
|
||||
}
|
||||
|
||||
// called when first create a resource group, load from image or user new create a group
|
||||
public void initQueryQueue() {
|
||||
resetQueueProperty(properties);
|
||||
// if query queue property is not set, when use default value
|
||||
this.queryQueue = new QueryQueue(maxConcurrency, maxQueueSize, queueTimeout);
|
||||
}
|
||||
|
||||
void resetQueryQueue(QueryQueue queryQueue) {
|
||||
resetQueueProperty(properties);
|
||||
this.queryQueue = queryQueue;
|
||||
this.queryQueue.resetQueueProperty(this.maxConcurrency, this.maxQueueSize, this.queueTimeout);
|
||||
|
||||
}
|
||||
|
||||
private void resetQueueProperty(Map<String, String> properties) {
|
||||
@ -152,22 +137,17 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
}
|
||||
}
|
||||
|
||||
public QueryQueue getQueryQueue() {
|
||||
return this.queryQueue;
|
||||
}
|
||||
|
||||
// new resource group
|
||||
public static WorkloadGroup create(String name, Map<String, String> properties) throws DdlException {
|
||||
checkProperties(properties);
|
||||
WorkloadGroup newWorkloadGroup = new WorkloadGroup(Env.getCurrentEnv().getNextId(), name, properties);
|
||||
newWorkloadGroup.initQueryQueue();
|
||||
return newWorkloadGroup;
|
||||
}
|
||||
|
||||
// alter resource group
|
||||
public static WorkloadGroup copyAndUpdate(WorkloadGroup workloadGroup, Map<String, String> updateProperties)
|
||||
public static WorkloadGroup copyAndUpdate(WorkloadGroup currentWorkloadGroup, Map<String, String> updateProperties)
|
||||
throws DdlException {
|
||||
Map<String, String> newProperties = new HashMap<>(workloadGroup.getProperties());
|
||||
Map<String, String> newProperties = new HashMap<>(currentWorkloadGroup.getProperties());
|
||||
for (Map.Entry<String, String> kv : updateProperties.entrySet()) {
|
||||
if (!Strings.isNullOrEmpty(kv.getValue())) {
|
||||
newProperties.put(kv.getKey(), kv.getValue());
|
||||
@ -176,11 +156,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
|
||||
checkProperties(newProperties);
|
||||
WorkloadGroup newWorkloadGroup = new WorkloadGroup(
|
||||
workloadGroup.getId(), workloadGroup.getName(), newProperties, workloadGroup.getVersion() + 1);
|
||||
|
||||
// note(wb) query queue should be unique and can not be copy
|
||||
newWorkloadGroup.resetQueryQueue(workloadGroup.getQueryQueue());
|
||||
|
||||
currentWorkloadGroup.getId(), currentWorkloadGroup.getName(),
|
||||
newProperties, currentWorkloadGroup.getVersion() + 1);
|
||||
return newWorkloadGroup;
|
||||
}
|
||||
|
||||
@ -273,7 +250,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
return properties;
|
||||
}
|
||||
|
||||
private long getVersion() {
|
||||
public long getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
@ -281,7 +258,19 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
return memoryLimitPercent;
|
||||
}
|
||||
|
||||
public void getProcNodeData(BaseProcResult result) {
|
||||
public int getMaxConcurrency() {
|
||||
return maxConcurrency;
|
||||
}
|
||||
|
||||
public int getMaxQueueSize() {
|
||||
return maxQueueSize;
|
||||
}
|
||||
|
||||
public int getQueueTimeout() {
|
||||
return queueTimeout;
|
||||
}
|
||||
|
||||
public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
|
||||
List<String> row = new ArrayList<>();
|
||||
row.add(String.valueOf(id));
|
||||
row.add(name);
|
||||
@ -305,8 +294,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
row.add(properties.get(key));
|
||||
}
|
||||
}
|
||||
row.add(String.valueOf(queryQueue.getCurrentRunningQueryNum()));
|
||||
row.add(String.valueOf(queryQueue.getCurrentWaitingQueryNum()));
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum()));
|
||||
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum()));
|
||||
result.addRow(row);
|
||||
}
|
||||
|
||||
@ -383,6 +372,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
if (properties.containsKey(CPU_HARD_LIMIT)) {
|
||||
this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT));
|
||||
}
|
||||
this.initQueryQueue();
|
||||
|
||||
this.resetQueueProperty(this.properties);
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,6 +54,7 @@ import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -75,9 +76,60 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
@SerializedName(value = "idToWorkloadGroup")
|
||||
private final Map<Long, WorkloadGroup> idToWorkloadGroup = Maps.newHashMap();
|
||||
private final Map<String, WorkloadGroup> nameToWorkloadGroup = Maps.newHashMap();
|
||||
private final Map<Long, QueryQueue> idToQueryQueue = Maps.newHashMap();
|
||||
private final ResourceProcNode procNode = new ResourceProcNode();
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
|
||||
private Thread updatePropThread;
|
||||
|
||||
public void startUpdateThread() {
|
||||
WorkloadGroupMgr wgMgr = this;
|
||||
updatePropThread = new Thread(new Runnable() {
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
wgMgr.resetQueryQueueProp();
|
||||
Thread.sleep(Config.query_queue_update_interval_ms);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("reset query queue failed ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
updatePropThread.start();
|
||||
}
|
||||
|
||||
public void resetQueryQueueProp() {
|
||||
List<QueryQueue> newPropList = new ArrayList<>();
|
||||
Map<Long, QueryQueue> currentQueueCopyMap = new HashMap<>();
|
||||
readLock();
|
||||
try {
|
||||
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
|
||||
WorkloadGroup wg = entry.getValue();
|
||||
QueryQueue tmpQ = new QueryQueue(wg.getId(), wg.getMaxConcurrency(),
|
||||
wg.getMaxQueueSize(), wg.getQueueTimeout(), wg.getVersion());
|
||||
newPropList.add(tmpQ);
|
||||
}
|
||||
for (Map.Entry<Long, QueryQueue> entry : idToQueryQueue.entrySet()) {
|
||||
currentQueueCopyMap.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
for (QueryQueue newPropQq : newPropList) {
|
||||
QueryQueue currentQueryQueue = currentQueueCopyMap.get(newPropQq.getWgId());
|
||||
if (currentQueryQueue == null) {
|
||||
continue;
|
||||
}
|
||||
if (newPropQq.getPropVersion() > currentQueryQueue.getPropVersion()) {
|
||||
currentQueryQueue.resetQueueProperty(newPropQq.getMaxConcurrency(), newPropQq.getMaxQueueSize(),
|
||||
newPropQq.getQueueTimeout(), newPropQq.getPropVersion());
|
||||
}
|
||||
LOG.debug(currentQueryQueue.debugString()); // for test debug
|
||||
}
|
||||
}
|
||||
|
||||
public WorkloadGroupMgr() {
|
||||
}
|
||||
|
||||
@ -132,6 +184,15 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
return workloadGroups;
|
||||
}
|
||||
|
||||
public WorkloadGroup getWorkloadGroupById(long wgId) {
|
||||
readLock();
|
||||
try {
|
||||
return idToWorkloadGroup.get(wgId);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public List<TopicInfo> getPublishTopicInfo() {
|
||||
List<TopicInfo> workloadGroups = new ArrayList();
|
||||
readLock();
|
||||
@ -147,15 +208,21 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
|
||||
public QueryQueue getWorkloadGroupQueryQueue(ConnectContext context) throws UserException {
|
||||
String groupName = getWorkloadGroupNameAndCheckPriv(context);
|
||||
readLock();
|
||||
writeLock();
|
||||
try {
|
||||
WorkloadGroup workloadGroup = nameToWorkloadGroup.get(groupName);
|
||||
if (workloadGroup == null) {
|
||||
WorkloadGroup wg = nameToWorkloadGroup.get(groupName);
|
||||
if (wg == null) {
|
||||
throw new UserException("Workload group " + groupName + " does not exist");
|
||||
}
|
||||
return workloadGroup.getQueryQueue();
|
||||
QueryQueue queryQueue = idToQueryQueue.get(wg.getId());
|
||||
if (queryQueue == null) {
|
||||
queryQueue = new QueryQueue(wg.getId(), wg.getMaxConcurrency(), wg.getMaxQueueSize(),
|
||||
wg.getQueueTimeout(), wg.getVersion());
|
||||
idToQueryQueue.put(wg.getId(), queryQueue);
|
||||
}
|
||||
return queryQueue;
|
||||
} finally {
|
||||
readUnlock();
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -267,9 +334,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
if (!nameToWorkloadGroup.containsKey(workloadGroupName)) {
|
||||
throw new DdlException("workload group(" + workloadGroupName + ") does not exist.");
|
||||
}
|
||||
WorkloadGroup workloadGroup = nameToWorkloadGroup.get(workloadGroupName);
|
||||
newWorkloadGroup = WorkloadGroup.copyAndUpdate(workloadGroup, properties);
|
||||
checkGlobalUnlock(newWorkloadGroup, workloadGroup);
|
||||
WorkloadGroup currentWorkloadGroup = nameToWorkloadGroup.get(workloadGroupName);
|
||||
newWorkloadGroup = WorkloadGroup.copyAndUpdate(currentWorkloadGroup, properties);
|
||||
checkGlobalUnlock(newWorkloadGroup, currentWorkloadGroup);
|
||||
nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup);
|
||||
idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
|
||||
Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
|
||||
@ -306,6 +373,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
long groupId = workloadGroup.getId();
|
||||
idToWorkloadGroup.remove(groupId);
|
||||
nameToWorkloadGroup.remove(workloadGroupName);
|
||||
idToQueryQueue.remove(groupId);
|
||||
Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new DropWorkloadGroupOperationLog(groupId));
|
||||
} finally {
|
||||
writeUnlock();
|
||||
@ -398,7 +466,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
|
||||
workloadGroup.getName(), PrivPredicate.SHOW_WORKLOAD_GROUP)) {
|
||||
continue;
|
||||
}
|
||||
workloadGroup.getProcNodeData(result);
|
||||
workloadGroup.getProcNodeData(result, idToQueryQueue.get(workloadGroup.getId()));
|
||||
}
|
||||
} finally {
|
||||
readUnlock();
|
||||
|
||||
@ -83,7 +83,7 @@ public class WorkloadGroupTest {
|
||||
WorkloadGroup group1 = WorkloadGroup.create(name1, properties1);
|
||||
|
||||
BaseProcResult result = new BaseProcResult();
|
||||
group1.getProcNodeData(result);
|
||||
group1.getProcNodeData(result, null);
|
||||
List<List<String>> rows = result.getRows();
|
||||
Assert.assertEquals(1, rows.size());
|
||||
}
|
||||
|
||||
@ -259,9 +259,11 @@ suite("test_crud_wlg") {
|
||||
}
|
||||
|
||||
// test query queue limit
|
||||
sql "ADMIN SET FRONTEND CONFIG ('query_queue_update_interval_ms' = '1000');"
|
||||
sql "set workload_group=test_group;"
|
||||
sql "alter workload group test_group properties ( 'max_concurrency'='0' );"
|
||||
sql "alter workload group test_group properties ( 'max_queue_size'='0' );"
|
||||
Thread.sleep(3000);
|
||||
try {
|
||||
sql "select 1;"
|
||||
} catch (Exception e) {
|
||||
@ -270,6 +272,7 @@ suite("test_crud_wlg") {
|
||||
|
||||
sql "alter workload group test_group properties ( 'max_queue_size'='1' );"
|
||||
sql "alter workload group test_group properties ( 'queue_timeout'='500' );"
|
||||
Thread.sleep(3000);
|
||||
try {
|
||||
sql "select 1;"
|
||||
} catch (Exception e) {
|
||||
@ -277,7 +280,9 @@ suite("test_crud_wlg") {
|
||||
}
|
||||
|
||||
sql "alter workload group test_group properties ( 'max_concurrency'='10' );"
|
||||
Thread.sleep(3000);
|
||||
sql "select 1;"
|
||||
sql "set workload_group=normal;"
|
||||
sql "drop workload group test_group;"
|
||||
sql "ADMIN SET FRONTEND CONFIG ('query_queue_update_interval_ms' = '5000');"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user