A workload group's tag property may be three cases as below: 1 empty string, null or '', it could be published to all BE. 2 a value match some BE' location, then the workload group could only be published to the BE with same tag. 3 not an empty string, but some invalid string which can not math any BE's location, then it could not be published any BE. ## Proposed changes Issue Number: close #xxx <!--Describe your changes.-->
This commit is contained in:
@ -27,6 +27,7 @@ import org.apache.doris.thrift.BackendService;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TPublishTopicRequest;
|
||||
import org.apache.doris.thrift.TTopicInfoType;
|
||||
import org.apache.doris.thrift.TWorkloadGroupInfo;
|
||||
import org.apache.doris.thrift.TopicInfo;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -35,8 +36,10 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
public class TopicPublisherThread extends MasterDaemon {
|
||||
@ -120,7 +123,30 @@ public class TopicPublisherThread extends MasterDaemon {
|
||||
try {
|
||||
address = new TNetworkAddress(be.getHost(), be.getBePort());
|
||||
client = ClientPool.backendPool.borrowObject(address);
|
||||
client.publishTopicInfo(request);
|
||||
// check whether workload group tag math current be
|
||||
TPublishTopicRequest copiedRequest = request.deepCopy();
|
||||
if (copiedRequest.isSetTopicMap()) {
|
||||
Map<TTopicInfoType, List<TopicInfo>> topicMap = copiedRequest.getTopicMap();
|
||||
List<TopicInfo> topicInfoList = topicMap.get(TTopicInfoType.WORKLOAD_GROUP);
|
||||
if (topicInfoList != null) {
|
||||
Set<String> beTagSet = be.getBeWorkloadGroupTagSet();
|
||||
Iterator<TopicInfo> topicIter = topicInfoList.iterator();
|
||||
while (topicIter.hasNext()) {
|
||||
TopicInfo topicInfo = topicIter.next();
|
||||
if (topicInfo.isSetWorkloadGroupInfo()) {
|
||||
TWorkloadGroupInfo tWgInfo = topicInfo.getWorkloadGroupInfo();
|
||||
if (tWgInfo.isSetTag() && !Backend.isMatchWorkloadGroupTag(
|
||||
tWgInfo.getTag(), beTagSet)) {
|
||||
// currently TopicInfo could not contain both policy and workload group,
|
||||
// so we can remove TopicInfo directly.
|
||||
topicIter.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client.publishTopicInfo(copiedRequest);
|
||||
ok = true;
|
||||
LOG.info("[topic_publish]publish topic info to be {} success, time cost={} ms, details:{}",
|
||||
be.getHost(), (System.currentTimeMillis() - beginTime), logStr);
|
||||
|
||||
@ -66,6 +66,8 @@ public class Tag implements Writable {
|
||||
public static final String VALUE_DEFAULT_TAG = "default";
|
||||
public static final String VALUE_INVALID_TAG = "invalid";
|
||||
|
||||
public static final String WORKLOAD_GROUP = "workload_group";
|
||||
|
||||
public static final ImmutableSet<String> RESERVED_TAG_TYPE = ImmutableSet.of(
|
||||
TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION);
|
||||
public static final ImmutableSet<String> RESERVED_TAG_VALUES = ImmutableSet.of(
|
||||
|
||||
@ -18,8 +18,10 @@
|
||||
package org.apache.doris.resource.workloadgroup;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
@ -29,7 +31,6 @@ import org.apache.doris.thrift.TPipelineWorkloadGroup;
|
||||
import org.apache.doris.thrift.TWorkloadGroupInfo;
|
||||
import org.apache.doris.thrift.TopicInfo;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@ -184,9 +185,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
throws DdlException {
|
||||
Map<String, String> newProperties = new HashMap<>(currentWorkloadGroup.getProperties());
|
||||
for (Map.Entry<String, String> kv : updateProperties.entrySet()) {
|
||||
if (!Strings.isNullOrEmpty(kv.getValue())) {
|
||||
newProperties.put(kv.getKey(), kv.getValue());
|
||||
}
|
||||
newProperties.put(kv.getKey(), kv.getValue());
|
||||
}
|
||||
|
||||
checkProperties(newProperties);
|
||||
@ -382,6 +381,18 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
+ SPILL_THRESHOLD_LOW_WATERMARK + "(" + lowWaterMark + ")");
|
||||
}
|
||||
|
||||
String tagStr = properties.get(TAG);
|
||||
if (!StringUtils.isEmpty(tagStr)) {
|
||||
String[] tagArr = tagStr.split(",");
|
||||
for (String tag : tagArr) {
|
||||
try {
|
||||
FeNameFormat.checkCommonName("workload group tag name", tag);
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException("workload group tag name format is illegal, " + tagStr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
@ -553,6 +564,11 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
|
||||
tWorkloadGroupInfo.setSpillThresholdHighWatermark(Integer.parseInt(spillHighWatermarkStr));
|
||||
}
|
||||
|
||||
String tagStr = properties.get(TAG);
|
||||
if (!StringUtils.isEmpty(tagStr)) {
|
||||
tWorkloadGroupInfo.setTag(tagStr);
|
||||
}
|
||||
|
||||
TopicInfo topicInfo = new TopicInfo();
|
||||
topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
|
||||
return topicInfo;
|
||||
|
||||
@ -385,6 +385,9 @@ public class WorkloadGroupMgr extends MasterDaemon implements Writable, GsonPost
|
||||
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
|
||||
String workloadGroupName = stmt.getWorkloadGroupName();
|
||||
Map<String, String> properties = stmt.getProperties();
|
||||
if (properties.size() == 0) {
|
||||
throw new DdlException("alter workload group should contain at least one property");
|
||||
}
|
||||
WorkloadGroup newWorkloadGroup;
|
||||
writeLock();
|
||||
try {
|
||||
|
||||
@ -37,7 +37,9 @@ import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -45,9 +47,12 @@ import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
@ -834,4 +839,36 @@ public class Backend implements Writable {
|
||||
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}";
|
||||
}
|
||||
|
||||
public Set<String> getBeWorkloadGroupTagSet() {
|
||||
Set<String> beTagSet = Sets.newHashSet();
|
||||
String beTagStr = this.tagMap.get(Tag.WORKLOAD_GROUP);
|
||||
if (StringUtils.isEmpty(beTagStr)) {
|
||||
return beTagSet;
|
||||
}
|
||||
|
||||
String[] beTagArr = beTagStr.split(",");
|
||||
for (String beTag : beTagArr) {
|
||||
beTagSet.add(beTag.trim());
|
||||
}
|
||||
|
||||
return beTagSet;
|
||||
}
|
||||
|
||||
public static boolean isMatchWorkloadGroupTag(String wgTagStr, Set<String> beTagSet) {
|
||||
if (StringUtils.isEmpty(wgTagStr)) {
|
||||
return true;
|
||||
}
|
||||
if (beTagSet.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String[] wgTagArr = wgTagStr.split(",");
|
||||
Set<String> wgTagSet = new HashSet<>();
|
||||
for (String wgTag : wgTagArr) {
|
||||
wgTagSet.add(wgTag.trim());
|
||||
}
|
||||
|
||||
return !Collections.disjoint(wgTagSet, beTagSet);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user