[feature](resource-tag) support multi tag for a single Backend (#10901)
This commit is contained in:
@ -39,10 +39,6 @@ grammar:
|
||||
```sql
|
||||
-- Add nodes (add this method if you do not use the multi-tenancy function)
|
||||
ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
-- Add idle nodes (that is, add BACKEND that does not belong to any cluster)
|
||||
ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
-- Add nodes to a cluster
|
||||
ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
````
|
||||
|
||||
illustrate:
|
||||
@ -59,12 +55,6 @@ grammar:
|
||||
ALTER SYSTEM ADD BACKEND "host:port";
|
||||
````
|
||||
|
||||
1. Add an idle node
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM ADD FREE BACKEND "host:port";
|
||||
````
|
||||
|
||||
### Keywords
|
||||
|
||||
ALTER, SYSTEM, ADD, BACKEND, ALTER SYSTEM
|
||||
|
||||
@ -47,16 +47,20 @@ ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
|
||||
2. heartbeat_port is the heartbeat port of the node
|
||||
3. Modify BE node properties The following properties are currently supported:
|
||||
|
||||
- tag.location: resource tag
|
||||
- tag.xxxx: resource tag
|
||||
- disable_query: query disable attribute
|
||||
- disable_load: import disable attribute
|
||||
|
||||
Note:
|
||||
1. A backend can be set multi resource tags. But must contain "tag.location" type.
|
||||
|
||||
### Example
|
||||
|
||||
1. Modify the resource tag of BE
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a", "tag.compute" = "c1");
|
||||
````
|
||||
|
||||
2. Modify the query disable property of BE
|
||||
|
||||
@ -37,18 +37,14 @@ ALTER SYSTEM ADD BACKEND
|
||||
语法:
|
||||
|
||||
```sql
|
||||
1) 增加节点(不使用多租户功能则按照此方法添加)
|
||||
1) 增加节点
|
||||
ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
2) 增加空闲节点(即添加不属于任何cluster的BACKEND)
|
||||
ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
3) 增加节点到某个cluster
|
||||
ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...];
|
||||
```
|
||||
|
||||
说明:
|
||||
|
||||
1. host 可以是主机名或者ip地址
|
||||
2. heartbeat_port 为该节点的心跳端口
|
||||
2. heartbeat_port 为该节点的心跳端口
|
||||
3. 增加和删除节点为同步操作。这两种操作不考虑节点上已有的数据,节点直接从元数据中删除,请谨慎使用。
|
||||
|
||||
### Example
|
||||
@ -59,12 +55,6 @@ ALTER SYSTEM ADD BACKEND
|
||||
ALTER SYSTEM ADD BACKEND "host:port";
|
||||
```
|
||||
|
||||
1. 增加一个空闲节点
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM ADD FREE BACKEND "host:port";
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
ALTER, SYSTEM, ADD, BACKEND, ALTER SYSTEM
|
||||
|
||||
@ -47,16 +47,20 @@ ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
|
||||
2. heartbeat_port 为该节点的心跳端口
|
||||
3. 修改 BE 节点属性目前支持以下属性:
|
||||
|
||||
- tag.location:资源标签
|
||||
- tag.xxx:资源标签
|
||||
- disable_query: 查询禁用属性
|
||||
- disable_load: 导入禁用属性
|
||||
|
||||
注:
|
||||
1. 可以给一个 Backend 设置多种资源标签。但必须包含 "tag.location"。
|
||||
|
||||
### Example
|
||||
|
||||
1. 修改 BE 的资源标签
|
||||
|
||||
```sql
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
|
||||
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a", "tag.compute" = "c1");
|
||||
```
|
||||
|
||||
2. 修改 BE 的查询禁用属性
|
||||
|
||||
@ -126,8 +126,8 @@ public class SystemHandler extends AlterHandler {
|
||||
&& Catalog.getCurrentCatalog().getCluster(destClusterName) == null) {
|
||||
throw new DdlException("Cluster: " + destClusterName + " does not exist.");
|
||||
}
|
||||
Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(),
|
||||
addBackendClause.isFree(), addBackendClause.getDestCluster(), addBackendClause.getTag());
|
||||
Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree(),
|
||||
addBackendClause.getDestCluster(), addBackendClause.getTagMap());
|
||||
} else if (alterClause instanceof DropBackendClause) {
|
||||
// drop backend
|
||||
DropBackendClause dropBackendClause = (DropBackendClause) alterClause;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.resource.Tag;
|
||||
|
||||
@ -33,7 +34,7 @@ public class AddBackendClause extends BackendClause {
|
||||
// cluster that backend will be added to
|
||||
protected String destCluster;
|
||||
protected Map<String, String> properties = Maps.newHashMap();
|
||||
private Tag tag;
|
||||
private Map<String, String> tagMap;
|
||||
|
||||
public AddBackendClause(List<String> hostPorts) {
|
||||
super(hostPorts);
|
||||
@ -57,14 +58,20 @@ public class AddBackendClause extends BackendClause {
|
||||
this.destCluster = destCluster;
|
||||
}
|
||||
|
||||
public Tag getTag() {
|
||||
return tag;
|
||||
public Map<String, String> getTagMap() {
|
||||
return tagMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
super.analyze(analyzer);
|
||||
tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, Tag.DEFAULT_BACKEND_TAG);
|
||||
tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, Tag.DEFAULT_BACKEND_TAG);
|
||||
if (!tagMap.containsKey(Tag.TYPE_LOCATION)) {
|
||||
throw new AnalysisException(NEED_LOCATION_TAG_MSG);
|
||||
}
|
||||
if (!Config.enable_multi_tags && tagMap.size() > 1) {
|
||||
throw new AnalysisException(MUTLI_TAG_DISABLED_MSG);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -33,6 +33,11 @@ public class BackendClause extends AlterClause {
|
||||
protected List<String> hostPorts;
|
||||
protected List<Pair<String, Integer>> hostPortPairs;
|
||||
|
||||
public static final String MUTLI_TAG_DISABLED_MSG = "Not support multi tags for Backend now. "
|
||||
+ "You can set 'enable_multi_tags=true' in fe.conf to enable this feature.";
|
||||
public static final String NEED_LOCATION_TAG_MSG
|
||||
= "Backend must have location type tag. Eg: 'tag.location' = 'xxx'.";
|
||||
|
||||
protected BackendClause(List<String> hostPorts) {
|
||||
super(AlterOpType.ALTER_OTHER);
|
||||
this.hostPorts = hostPorts;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.resource.Tag;
|
||||
|
||||
@ -30,7 +31,7 @@ import java.util.Map;
|
||||
public class ModifyBackendClause extends BackendClause {
|
||||
protected Map<String, String> properties = Maps.newHashMap();
|
||||
protected Map<String, String> analyzedProperties = Maps.newHashMap();
|
||||
private Tag tag = null;
|
||||
private Map<String, String> tagMap = null;
|
||||
private Boolean isQueryDisabled = null;
|
||||
private Boolean isLoadDisabled = null;
|
||||
|
||||
@ -42,13 +43,24 @@ public class ModifyBackendClause extends BackendClause {
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
super.analyze(analyzer);
|
||||
tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, null);
|
||||
tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, null);
|
||||
isQueryDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties,
|
||||
PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, null);
|
||||
isLoadDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties,
|
||||
PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, null);
|
||||
if (tag != null) {
|
||||
analyzedProperties.put(tag.type, tag.value);
|
||||
if (!tagMap.isEmpty()) {
|
||||
if (!tagMap.containsKey(Tag.TYPE_LOCATION)) {
|
||||
throw new AnalysisException(NEED_LOCATION_TAG_MSG);
|
||||
}
|
||||
if (!Config.enable_multi_tags && tagMap.size() > 1) {
|
||||
throw new AnalysisException(MUTLI_TAG_DISABLED_MSG);
|
||||
}
|
||||
// TODO:
|
||||
// here we can add some privilege check so that only authorized user can modify specified type of tag.
|
||||
// For example, only root user can set tag with type 'computation'
|
||||
for (Map.Entry<String, String> entry : tagMap.entrySet()) {
|
||||
analyzedProperties.put("tag." + entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
if (isQueryDisabled != null) {
|
||||
analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, String.valueOf(isQueryDisabled));
|
||||
@ -57,13 +69,13 @@ public class ModifyBackendClause extends BackendClause {
|
||||
analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, String.valueOf(isLoadDisabled));
|
||||
}
|
||||
if (!properties.isEmpty()) {
|
||||
throw new AnalysisException("unknown properties setting for key ("
|
||||
+ StringUtils.join(properties.keySet(), ",") + ")");
|
||||
throw new AnalysisException(
|
||||
"unknown properties setting for key (" + StringUtils.join(properties.keySet(), ",") + ")");
|
||||
}
|
||||
}
|
||||
|
||||
public Tag getTag() {
|
||||
return tag;
|
||||
public Map<String, String> getTagMap() {
|
||||
return tagMap;
|
||||
}
|
||||
|
||||
public Boolean isQueryDisabled() {
|
||||
|
||||
@ -1445,11 +1445,11 @@ public class OlapTable extends Table {
|
||||
if (be == null) {
|
||||
continue;
|
||||
}
|
||||
short num = currentReplicaAlloc.getOrDefault(be.getTag(), (short) 0);
|
||||
currentReplicaAlloc.put(be.getTag(), (short) (num + 1));
|
||||
List<Long> beIds = tag2beIds.getOrDefault(be.getTag(), Lists.newArrayList());
|
||||
short num = currentReplicaAlloc.getOrDefault(be.getLocationTag(), (short) 0);
|
||||
currentReplicaAlloc.put(be.getLocationTag(), (short) (num + 1));
|
||||
List<Long> beIds = tag2beIds.getOrDefault(be.getLocationTag(), Lists.newArrayList());
|
||||
beIds.add(beId);
|
||||
tag2beIds.put(be.getTag(), beIds);
|
||||
tag2beIds.put(be.getLocationTag(), beIds);
|
||||
}
|
||||
if (!currentReplicaAlloc.equals(replicaAlloc.getAllocMap())) {
|
||||
throw new DdlException("The relica allocation is " + currentReplicaAlloc.toString()
|
||||
@ -1840,8 +1840,8 @@ public class OlapTable extends Table {
|
||||
if (be == null) {
|
||||
continue;
|
||||
}
|
||||
short num = curMap.getOrDefault(be.getTag(), (short) 0);
|
||||
curMap.put(be.getTag(), (short) (num + 1));
|
||||
short num = curMap.getOrDefault(be.getLocationTag(), (short) 0);
|
||||
curMap.put(be.getLocationTag(), (short) (num + 1));
|
||||
}
|
||||
if (!curMap.equals(allocMap)) {
|
||||
throw new UserException("replica allocation of tablet " + tablet.getId() + " is not expected"
|
||||
|
||||
@ -463,8 +463,8 @@ public class Tablet extends MetaObject implements Writable {
|
||||
|
||||
versions.add(replica.getVersionCount());
|
||||
|
||||
short curNum = currentAllocMap.getOrDefault(backend.getTag(), (short) 0);
|
||||
currentAllocMap.put(backend.getTag(), (short) (curNum + 1));
|
||||
short curNum = currentAllocMap.getOrDefault(backend.getLocationTag(), (short) 0);
|
||||
currentAllocMap.put(backend.getLocationTag(), (short) (curNum + 1));
|
||||
}
|
||||
|
||||
// 1. alive replicas are not enough
|
||||
|
||||
@ -90,8 +90,8 @@ public class ClusterLoadStatistic {
|
||||
// So balance will be blocked.
|
||||
continue;
|
||||
}
|
||||
BackendLoadStatistic beStatistic = new BackendLoadStatistic(backend.getId(),
|
||||
backend.getOwnerClusterName(), backend.getTag(), infoService, invertedIndex);
|
||||
BackendLoadStatistic beStatistic = new BackendLoadStatistic(backend.getId(), backend.getOwnerClusterName(),
|
||||
backend.getLocationTag(), infoService, invertedIndex);
|
||||
try {
|
||||
beStatistic.init();
|
||||
} catch (LoadBalanceException e) {
|
||||
|
||||
@ -587,13 +587,12 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
Backend be = infoService.getBackend(backendId);
|
||||
if (be == null) {
|
||||
return false;
|
||||
} else if (!be.getTag().equals(tag) || excludedBeIds.contains(be.getId())) {
|
||||
} else if (!be.getLocationTag().equals(tag) || excludedBeIds.contains(be.getId())) {
|
||||
return false;
|
||||
} else if (!be.isScheduleAvailable()) {
|
||||
// 1. BE is dead longer than "delaySecond"
|
||||
// 2. BE is under decommission
|
||||
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > delaySecond * 1000L)
|
||||
|| be.isDecommissioned()) {
|
||||
if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > delaySecond * 1000L) || be.isDecommissioned()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -714,8 +714,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
for (Replica replica : replicas) {
|
||||
Backend be = infoService.getBackend(replica.getBackendId());
|
||||
if (be != null && be.isScheduleAvailable() && replica.isAlive() && !replica.tooSlow()) {
|
||||
Short num = currentAllocMap.getOrDefault(be.getTag(), (short) 0);
|
||||
currentAllocMap.put(be.getTag(), (short) (num + 1));
|
||||
Short num = currentAllocMap.getOrDefault(be.getLocationTag(), (short) 0);
|
||||
currentAllocMap.put(be.getLocationTag(), (short) (num + 1));
|
||||
}
|
||||
}
|
||||
|
||||
@ -979,7 +979,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
Map<Tag, Short> allocMap = tabletCtx.getReplicaAlloc().getAllocMap();
|
||||
for (Replica replica : replicas) {
|
||||
Backend be = infoService.getBackend(replica.getBackendId());
|
||||
if (!allocMap.containsKey(be.getTag())) {
|
||||
if (!allocMap.containsKey(be.getLocationTag())) {
|
||||
deleteReplicaInternal(tabletCtx, replica, "not in valid tag", force);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1693,4 +1693,7 @@ public class Config extends ConfigBase {
|
||||
|
||||
@ConfField
|
||||
public static String default_storage_policy = "default_storage_policy";
|
||||
|
||||
@ConfField(mutable = false, masterOnly = true)
|
||||
public static boolean enable_multi_tags = false;
|
||||
}
|
||||
|
||||
@ -163,8 +163,8 @@ public class BackendsProcDir implements ProcDirInterface {
|
||||
}
|
||||
backendInfo.add(String.format("%.2f", used) + " %");
|
||||
backendInfo.add(String.format("%.2f", backend.getMaxDiskUsedPct() * 100) + " %");
|
||||
// tag
|
||||
backendInfo.add(backend.getTag().toString());
|
||||
// tags
|
||||
backendInfo.add(backend.getTagMapString());
|
||||
// err msg
|
||||
backendInfo.add(backend.getHeartbeatErrMsg());
|
||||
// version
|
||||
|
||||
@ -72,7 +72,7 @@ public class ClusterLoadStatByTag implements ProcDirInterface {
|
||||
for (long beId : beIds) {
|
||||
Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
|
||||
if (be != null) {
|
||||
tags.add(be.getTag());
|
||||
tags.add(be.getLocationTag());
|
||||
}
|
||||
}
|
||||
return tags;
|
||||
|
||||
@ -43,11 +43,13 @@ import org.apache.doris.thrift.TTabletType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -568,8 +570,8 @@ public class PropertyAnalyzer {
|
||||
return ScalarType.createType(type);
|
||||
}
|
||||
|
||||
public static Boolean analyzeBackendDisableProperties(Map<String, String> properties,
|
||||
String key, Boolean defaultValue) {
|
||||
public static Boolean analyzeBackendDisableProperties(Map<String, String> properties, String key,
|
||||
Boolean defaultValue) {
|
||||
if (properties.containsKey(key)) {
|
||||
String value = properties.remove(key);
|
||||
return Boolean.valueOf(value);
|
||||
@ -577,13 +579,39 @@ public class PropertyAnalyzer {
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
public static Tag analyzeBackendTagProperties(Map<String, String> properties, Tag defaultValue)
|
||||
/**
|
||||
* Found property with "tag." prefix and return a tag map, which key is tag type and value is tag value
|
||||
* Eg.
|
||||
* "tag.location" = "group_a", "tag.compute" = "x1"
|
||||
* Returns:
|
||||
* [location->group_a] [compute->x1]
|
||||
*
|
||||
* @param properties
|
||||
* @param defaultValue
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static Map<String, String> analyzeBackendTagsProperties(Map<String, String> properties, Tag defaultValue)
|
||||
throws AnalysisException {
|
||||
if (properties.containsKey(TAG_LOCATION)) {
|
||||
String tagVal = properties.remove(TAG_LOCATION);
|
||||
return Tag.create(Tag.TYPE_LOCATION, tagVal);
|
||||
Map<String, String> tagMap = Maps.newHashMap();
|
||||
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Map.Entry<String, String> entry = iter.next();
|
||||
if (!entry.getKey().startsWith("tag.")) {
|
||||
continue;
|
||||
}
|
||||
String[] keyParts = entry.getKey().split("\\.");
|
||||
if (keyParts.length != 2) {
|
||||
continue;
|
||||
}
|
||||
String val = entry.getValue().replaceAll(" ", "");
|
||||
tagMap.put(keyParts[1], val);
|
||||
iter.remove();
|
||||
}
|
||||
return defaultValue;
|
||||
if (tagMap.isEmpty() && defaultValue != null) {
|
||||
tagMap.put(defaultValue.type, defaultValue.value);
|
||||
}
|
||||
return tagMap;
|
||||
}
|
||||
|
||||
// There are 2 kinds of replication property:
|
||||
|
||||
@ -569,15 +569,15 @@ public class OlapScanNode extends ScanNode {
|
||||
for (Replica replica : replicas) {
|
||||
Backend backend = Catalog.getCurrentSystemInfo().getBackend(replica.getBackendId());
|
||||
if (backend == null || !backend.isAlive()) {
|
||||
LOG.debug("backend {} not exists or is not alive for replica {}",
|
||||
replica.getBackendId(), replica.getId());
|
||||
LOG.debug("backend {} not exists or is not alive for replica {}", replica.getBackendId(),
|
||||
replica.getId());
|
||||
errs.add(replica.getId() + "'s backend " + replica.getBackendId() + " does not exist or not alive");
|
||||
continue;
|
||||
}
|
||||
if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getTag())) {
|
||||
String err = String.format("Replica on backend %d with tag %s,"
|
||||
+ " which is not in user's resource tags: %s",
|
||||
backend.getId(), backend.getTag(), allowedTags);
|
||||
if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) {
|
||||
String err = String.format(
|
||||
"Replica on backend %d with tag %s," + " which is not in user's resource tags: %s",
|
||||
backend.getId(), backend.getLocationTag(), allowedTags);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(err);
|
||||
}
|
||||
|
||||
@ -23,11 +23,13 @@ import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/*
|
||||
@ -96,10 +98,20 @@ public class Tag implements Writable {
|
||||
return new Tag(type, value);
|
||||
}
|
||||
|
||||
public static Tag createNotCheck(String type, String value) {
|
||||
return new Tag(type, value);
|
||||
}
|
||||
|
||||
public String toKey() {
|
||||
return type + "_" + value;
|
||||
}
|
||||
|
||||
public Map<String, String> toMap() {
|
||||
Map<String, String> map = Maps.newHashMap();
|
||||
map.put(type, value);
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(type, value);
|
||||
|
||||
@ -24,12 +24,14 @@ import org.apache.doris.catalog.DiskInfo.DiskState;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.HeartbeatResponse.HbStatus;
|
||||
import org.apache.doris.thrift.TDisk;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
@ -44,6 +46,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
@ -116,8 +119,14 @@ public class Backend implements Writable {
|
||||
// additional backendStatus information for BE, display in JSON format
|
||||
@SerializedName("backendStatus")
|
||||
private BackendStatus backendStatus = new BackendStatus();
|
||||
@SerializedName("tag")
|
||||
private Tag tag = Tag.DEFAULT_BACKEND_TAG;
|
||||
// the locationTag is also saved in tagMap, use a single field here to avoid
|
||||
// creating this everytime we get it.
|
||||
@SerializedName(value = "locationTag", alternate = {"tag"})
|
||||
private Tag locationTag = Tag.DEFAULT_BACKEND_TAG;
|
||||
// tag type -> tag value.
|
||||
// A backend can only be assigned to one tag type, and each type can only have one value.
|
||||
@SerializedName("tagMap")
|
||||
private Map<String, String> tagMap = Maps.newHashMap();
|
||||
|
||||
public Backend() {
|
||||
this.host = "";
|
||||
@ -135,6 +144,7 @@ public class Backend implements Writable {
|
||||
this.ownerClusterName = "";
|
||||
this.backendState = BackendState.free.ordinal();
|
||||
this.decommissionType = DecommissionType.SystemDecommission.ordinal();
|
||||
this.tagMap.put(locationTag.type, locationTag.value);
|
||||
}
|
||||
|
||||
public Backend(long id, String host, int heartbeatPort) {
|
||||
@ -155,6 +165,7 @@ public class Backend implements Writable {
|
||||
this.ownerClusterName = "";
|
||||
this.backendState = BackendState.free.ordinal();
|
||||
this.decommissionType = DecommissionType.SystemDecommission.ordinal();
|
||||
this.tagMap.put(locationTag.type, locationTag.value);
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
@ -534,9 +545,30 @@ public class Backend implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In old version, there is only one tag for a Backend, and it is a "location" type tag.
|
||||
* But in new version, a Backend can have multi tag, so we need to put locationTag to
|
||||
* the new tagMap
|
||||
*/
|
||||
private void convertToTagMapAndSetLocationTag() {
|
||||
if (tagMap == null) {
|
||||
// When first upgrade from old version, tags may be null
|
||||
tagMap = Maps.newHashMap();
|
||||
}
|
||||
if (!tagMap.containsKey(Tag.TYPE_LOCATION)) {
|
||||
// ATTN: here we use Tag.TYPE_LOCATION directly, not locationTag.type,
|
||||
// because we need to make sure the previous tag must be a location type tag,
|
||||
// and if not, convert it to location type.
|
||||
tagMap.put(Tag.TYPE_LOCATION, locationTag.value);
|
||||
}
|
||||
locationTag = Tag.createNotCheck(Tag.TYPE_LOCATION, tagMap.get(Tag.TYPE_LOCATION));
|
||||
}
|
||||
|
||||
public static Backend read(DataInput in) throws IOException {
|
||||
String json = Text.readString(in);
|
||||
return GsonUtils.GSON.fromJson(json, Backend.class);
|
||||
Backend be = GsonUtils.GSON.fromJson(json, Backend.class);
|
||||
be.convertToTagMapAndSetLocationTag();
|
||||
return be;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -545,32 +577,6 @@ public class Backend implements Writable {
|
||||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
private void readFields(DataInput in) throws IOException {
|
||||
id = in.readLong();
|
||||
host = Text.readString(in);
|
||||
heartbeatPort = in.readInt();
|
||||
bePort = in.readInt();
|
||||
httpPort = in.readInt();
|
||||
beRpcPort = in.readInt();
|
||||
isAlive.set(in.readBoolean());
|
||||
isDecommissioned.set(in.readBoolean());
|
||||
lastUpdateMs = in.readLong();
|
||||
lastStartTime = in.readLong();
|
||||
Map<String, DiskInfo> disks = Maps.newHashMap();
|
||||
int size = in.readInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
String rootPath = Text.readString(in);
|
||||
DiskInfo diskInfo = DiskInfo.read(in);
|
||||
disks.put(rootPath, diskInfo);
|
||||
}
|
||||
disksRef = ImmutableMap.copyOf(disks);
|
||||
ownerClusterName = Text.readString(in);
|
||||
backendState = in.readInt();
|
||||
decommissionType = in.readInt();
|
||||
brpcPort = in.readInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, host, heartbeatPort, bePort, isAlive);
|
||||
@ -594,7 +600,7 @@ public class Backend implements Writable {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get()
|
||||
+ ", tag: " + tag + "]";
|
||||
+ ", tags: " + tagMap + "]";
|
||||
}
|
||||
|
||||
public String getOwnerClusterName() {
|
||||
@ -717,11 +723,34 @@ public class Backend implements Writable {
|
||||
public volatile boolean isLoadDisabled = false;
|
||||
}
|
||||
|
||||
public void setTag(Tag tag) {
|
||||
this.tag = tag;
|
||||
public Tag getLocationTag() {
|
||||
return locationTag;
|
||||
}
|
||||
|
||||
public Tag getTag() {
|
||||
return tag;
|
||||
public void setTagMap(Map<String, String> tagMap) {
|
||||
Preconditions.checkState(tagMap.containsKey(Tag.TYPE_LOCATION));
|
||||
this.tagMap = tagMap;
|
||||
this.locationTag = Tag.createNotCheck(Tag.TYPE_LOCATION, tagMap.get(Tag.TYPE_LOCATION));
|
||||
}
|
||||
|
||||
public Map<String, String> getTagMap() {
|
||||
return tagMap;
|
||||
}
|
||||
|
||||
public String getTagMapString() {
|
||||
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}";
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Tag by type, return Optional.empty if no such tag with given type
|
||||
*
|
||||
* @param type
|
||||
* @return
|
||||
*/
|
||||
public Optional<Tag> getTagByType(String type) {
|
||||
if (!tagMap.containsKey(type)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(Tag.createNotCheck(type, tagMap.get(type)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -98,11 +98,10 @@ public class BeSelectionPolicy {
|
||||
}
|
||||
|
||||
public boolean isMatch(Backend backend) {
|
||||
if (needScheduleAvailable && !backend.isScheduleAvailable()
|
||||
|| needQueryAvailable && !backend.isQueryAvailable()
|
||||
|| needLoadAvailable && !backend.isLoadAvailable()
|
||||
|| !resourceTags.isEmpty() && !resourceTags.contains(backend.getTag())
|
||||
|| storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) {
|
||||
if (needScheduleAvailable && !backend.isScheduleAvailable() || needQueryAvailable && !backend.isQueryAvailable()
|
||||
|| needLoadAvailable && !backend.isLoadAvailable() || !resourceTags.isEmpty() && !resourceTags.contains(
|
||||
backend.getLocationTag()) || storageMedium != null && !backend.hasSpecifiedStorageMedium(
|
||||
storageMedium)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@ -96,7 +96,7 @@ public class SystemInfoService {
|
||||
|
||||
// for deploy manager
|
||||
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws UserException {
|
||||
addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG);
|
||||
addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -105,8 +105,8 @@ public class SystemInfoService {
|
||||
* @param destCluster : if not null or empty backend will be added to destCluster
|
||||
* @throws DdlException
|
||||
*/
|
||||
public void addBackends(List<Pair<String, Integer>> hostPortPairs,
|
||||
boolean isFree, String destCluster, Tag tag) throws UserException {
|
||||
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree, String destCluster,
|
||||
Map<String, String> tagMap) throws UserException {
|
||||
for (Pair<String, Integer> pair : hostPortPairs) {
|
||||
// check is already exist
|
||||
if (getBackendWithHeartbeatPort(pair.first, pair.second) != null) {
|
||||
@ -115,7 +115,7 @@ public class SystemInfoService {
|
||||
}
|
||||
|
||||
for (Pair<String, Integer> pair : hostPortPairs) {
|
||||
addBackend(pair.first, pair.second, isFree, destCluster, tag);
|
||||
addBackend(pair.first, pair.second, isFree, destCluster, tagMap);
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,7 +137,7 @@ public class SystemInfoService {
|
||||
|
||||
// Final entry of adding backend
|
||||
private void addBackend(String host, int heartbeatPort, boolean isFree, String destCluster,
|
||||
Tag tag) throws UserException {
|
||||
Map<String, String> tagMap) {
|
||||
Backend newBackend = new Backend(Catalog.getCurrentCatalog().getNextId(), host, heartbeatPort);
|
||||
// update idToBackend
|
||||
Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
|
||||
@ -162,7 +162,7 @@ public class SystemInfoService {
|
||||
}
|
||||
|
||||
// set tags
|
||||
newBackend.setTag(tag);
|
||||
newBackend.setTagMap(tagMap);
|
||||
|
||||
// log
|
||||
Catalog.getCurrentCatalog().getEditLog().logAddBackend(newBackend);
|
||||
@ -1123,12 +1123,10 @@ public class SystemInfoService {
|
||||
|
||||
for (Backend be : backends) {
|
||||
boolean shouldModify = false;
|
||||
if (alterClause.getTag() != null) {
|
||||
Tag tag = alterClause.getTag();
|
||||
if (!be.getTag().equals(tag)) {
|
||||
be.setTag(tag);
|
||||
shouldModify = true;
|
||||
}
|
||||
Map<String, String> tagMap = alterClause.getTagMap();
|
||||
if (!tagMap.isEmpty()) {
|
||||
be.setTagMap(tagMap);
|
||||
shouldModify = true;
|
||||
}
|
||||
|
||||
if (alterClause.isQueryDisabled() != null) {
|
||||
@ -1154,7 +1152,7 @@ public class SystemInfoService {
|
||||
|
||||
public void replayModifyBackend(Backend backend) {
|
||||
Backend memBe = getBackend(backend.getId());
|
||||
memBe.setTag(backend.getTag());
|
||||
memBe.setTagMap(backend.getTagMap());
|
||||
memBe.setQueryDisabled(backend.isQueryDisabled());
|
||||
memBe.setLoadDisabled(backend.isLoadDisabled());
|
||||
LOG.debug("replay modify backend: {}", backend);
|
||||
@ -1164,10 +1162,10 @@ public class SystemInfoService {
|
||||
public void checkReplicaAllocation(String cluster, ReplicaAllocation replicaAlloc) throws DdlException {
|
||||
List<Backend> backends = getClusterBackends(cluster);
|
||||
for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
|
||||
if (backends.stream().filter(b -> b.getTag().equals(entry.getKey())).count()
|
||||
< entry.getValue()) {
|
||||
throw new DdlException("Failed to find enough host with tag(" + entry.getKey()
|
||||
+ ") in all backends. need: " + entry.getValue());
|
||||
if (backends.stream().filter(b -> b.getLocationTag().equals(entry.getKey())).count() < entry.getValue()) {
|
||||
throw new DdlException(
|
||||
"Failed to find enough host with tag(" + entry.getKey() + ") in all backends. need: "
|
||||
+ entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1176,13 +1174,13 @@ public class SystemInfoService {
|
||||
List<Backend> bes = getClusterBackends(clusterName);
|
||||
Set<Tag> tags = Sets.newHashSet();
|
||||
for (Backend be : bes) {
|
||||
tags.add(be.getTag());
|
||||
tags.add(be.getLocationTag());
|
||||
}
|
||||
return tags;
|
||||
}
|
||||
|
||||
public List<Backend> getBackendsByTagInCluster(String clusterName, Tag tag) {
|
||||
List<Backend> bes = getClusterBackends(clusterName);
|
||||
return bes.stream().filter(b -> b.getTag().equals(tag)).collect(Collectors.toList());
|
||||
return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,11 +19,13 @@ package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.AccessTestUtil;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TDisk;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -192,9 +194,15 @@ public class BackendTest {
|
||||
back1.updateOnce(1, 1, 1);
|
||||
back2 = new Backend(1, "a", 2);
|
||||
back2.updateOnce(1, 1, 1);
|
||||
Map<String, String> tagMap = Maps.newHashMap();
|
||||
tagMap.put(Tag.TYPE_LOCATION, "l1");
|
||||
tagMap.put("compute", "c1");
|
||||
back2.setTagMap(tagMap);
|
||||
Assert.assertFalse(back1.equals(back2));
|
||||
|
||||
Assert.assertEquals("Backend [id=1, host=a, heartbeatPort=1, alive=true, tag: {\"location\" : \"default\"}]", back1.toString());
|
||||
Assert.assertEquals("Backend [id=1, host=a, heartbeatPort=1, alive=true, tags: {location=default}]",
|
||||
back1.toString());
|
||||
Assert.assertEquals("{\"compute\" : \"c1\", \"location\" : \"l1\"}", back2.getTagMapString());
|
||||
|
||||
// 3. delete files
|
||||
dis.close();
|
||||
|
||||
@ -346,7 +346,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend2.isScheduleAvailable();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend2.getTag();
|
||||
myBackend2.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -363,7 +363,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend3.getLastUpdateMs();
|
||||
result = System.currentTimeMillis() - (Config.colocate_group_relocate_delay_second + 20) * 1000;
|
||||
minTimes = 0;
|
||||
myBackend3.getTag();
|
||||
myBackend3.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -380,7 +380,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend4.getLastUpdateMs();
|
||||
result = System.currentTimeMillis();
|
||||
minTimes = 0;
|
||||
myBackend4.getTag();
|
||||
myBackend4.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -397,7 +397,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend5.isDecommissioned();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend5.getTag();
|
||||
myBackend5.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -439,7 +439,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend2.isScheduleAvailable();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend2.getTag();
|
||||
myBackend2.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -456,7 +456,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend3.getLastUpdateMs();
|
||||
result = System.currentTimeMillis() - (Config.colocate_group_relocate_delay_second + 20) * 1000;
|
||||
minTimes = 0;
|
||||
myBackend3.getTag();
|
||||
myBackend3.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -473,7 +473,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend4.getLastUpdateMs();
|
||||
result = System.currentTimeMillis();
|
||||
minTimes = 0;
|
||||
myBackend4.getTag();
|
||||
myBackend4.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -490,7 +490,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend5.isDecommissioned();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend5.getTag();
|
||||
myBackend5.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
|
||||
@ -507,7 +507,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend6.isDecommissioned();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend6.getTag();
|
||||
myBackend6.getLocationTag();
|
||||
result = Tag.create(Tag.TYPE_LOCATION, "new_loc");
|
||||
minTimes = 0;
|
||||
|
||||
@ -524,7 +524,7 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend7.isDecommissioned();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend7.getTag();
|
||||
myBackend7.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend7.getId();
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.analysis.AlterSystemStmt;
|
||||
import org.apache.doris.analysis.AlterTableStmt;
|
||||
import org.apache.doris.analysis.BackendClause;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
@ -208,25 +209,47 @@ public class TabletRepairAndBalanceTest {
|
||||
AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
|
||||
DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt);
|
||||
}
|
||||
|
||||
// Test set tag without location type, expect throw exception
|
||||
Backend be1 = backends.get(0);
|
||||
String alterString = "alter system modify backend \"" + be1.getHost() + ":" + be1.getHeartbeatPort()
|
||||
+ "\" set ('tag.compute' = 'abc')";
|
||||
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, BackendClause.NEED_LOCATION_TAG_MSG,
|
||||
() -> UtFrameUtils.parseAndAnalyzeStmt(alterString, connectContext));
|
||||
|
||||
// Test set multi tag for a Backend when Config.enable_multi_tags is false
|
||||
Config.enable_multi_tags = false;
|
||||
String alterString2 = "alter system modify backend \"" + be1.getHost() + ":" + be1.getHeartbeatPort()
|
||||
+ "\" set ('tag.location' = 'zone3', 'tag.compution' = 'abc')";
|
||||
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, BackendClause.MUTLI_TAG_DISABLED_MSG,
|
||||
() -> UtFrameUtils.parseAndAnalyzeStmt(alterString2, connectContext));
|
||||
|
||||
// Test set multi tag for a Backend when Config.enable_multi_tags is true
|
||||
Config.enable_multi_tags = true;
|
||||
String stmtStr3 = "alter system modify backend \"" + be1.getHost() + ":" + be1.getHeartbeatPort()
|
||||
+ "\" set ('tag.location' = 'zone1', 'tag.compute' = 'c1')";
|
||||
AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr3, connectContext);
|
||||
DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt);
|
||||
Map<String, String> tagMap = be1.getTagMap();
|
||||
Assert.assertEquals(2, tagMap.size());
|
||||
Assert.assertEquals("zone1", tagMap.get(Tag.TYPE_LOCATION));
|
||||
Assert.assertEquals("c1", tagMap.get("compute"));
|
||||
Assert.assertEquals(Tag.createNotCheck(Tag.TYPE_LOCATION, "zone1"), be1.getLocationTag());
|
||||
|
||||
Tag zone1 = Tag.create(Tag.TYPE_LOCATION, "zone1");
|
||||
Tag zone2 = Tag.create(Tag.TYPE_LOCATION, "zone2");
|
||||
Assert.assertEquals(zone1, backends.get(0).getTag());
|
||||
Assert.assertEquals(zone1, backends.get(1).getTag());
|
||||
Assert.assertEquals(zone1, backends.get(2).getTag());
|
||||
Assert.assertEquals(zone2, backends.get(3).getTag());
|
||||
Assert.assertEquals(zone2, backends.get(4).getTag());
|
||||
Assert.assertEquals(zone1, backends.get(0).getLocationTag());
|
||||
Assert.assertEquals(zone1, backends.get(1).getLocationTag());
|
||||
Assert.assertEquals(zone1, backends.get(2).getLocationTag());
|
||||
Assert.assertEquals(zone2, backends.get(3).getLocationTag());
|
||||
Assert.assertEquals(zone2, backends.get(4).getLocationTag());
|
||||
|
||||
// create table
|
||||
// 1. no default tag, create will fail
|
||||
String createStr = "create table test.tbl1\n"
|
||||
+ "(k1 date, k2 int)\n"
|
||||
+ "partition by range(k1)\n"
|
||||
+ "(\n"
|
||||
String createStr = "create table test.tbl1\n" + "(k1 date, k2 int)\n" + "partition by range(k1)\n" + "(\n"
|
||||
+ " partition p1 values less than(\"2021-06-01\"),\n"
|
||||
+ " partition p2 values less than(\"2021-07-01\"),\n"
|
||||
+ " partition p3 values less than(\"2021-08-01\")\n"
|
||||
+ ")\n"
|
||||
+ "distributed by hash(k2) buckets 10;";
|
||||
+ " partition p3 values less than(\"2021-08-01\")\n" + ")\n" + "distributed by hash(k2) buckets 10;";
|
||||
ExceptionChecker.expectThrows(DdlException.class, () -> createTable(createStr));
|
||||
|
||||
// nodes of zone2 not enough, create will fail
|
||||
@ -312,9 +335,9 @@ public class TabletRepairAndBalanceTest {
|
||||
Backend be = backends.get(2);
|
||||
String stmtStr = "alter system modify backend \"" + be.getHost() + ":" + be.getHeartbeatPort()
|
||||
+ "\" set ('tag.location' = 'zone2')";
|
||||
AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
|
||||
stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
|
||||
DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt);
|
||||
Assert.assertEquals(tag2, be.getTag());
|
||||
Assert.assertEquals(tag2, be.getLocationTag());
|
||||
ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation());
|
||||
checkTableReplicaAllocation(tbl);
|
||||
Assert.assertEquals(90, replicaMetaTable.cellSet().size());
|
||||
@ -376,7 +399,7 @@ public class TabletRepairAndBalanceTest {
|
||||
+ "\" set ('tag.location' = 'zone1')";
|
||||
stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
|
||||
DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt);
|
||||
Assert.assertEquals(tag1, be.getTag());
|
||||
Assert.assertEquals(tag1, be.getLocationTag());
|
||||
ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation());
|
||||
|
||||
checkTableReplicaAllocation(colTbl1);
|
||||
@ -419,26 +442,21 @@ public class TabletRepairAndBalanceTest {
|
||||
Backend backend = backends.get(i);
|
||||
String backendStmt = "alter system modify backend \"" + backend.getHost() + ":" + backend.getHeartbeatPort()
|
||||
+ "\" set ('tag.location' = 'default')";
|
||||
AlterSystemStmt systemStmt
|
||||
= (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(backendStmt, connectContext);
|
||||
AlterSystemStmt systemStmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(backendStmt,
|
||||
connectContext);
|
||||
DdlExecutor.execute(Catalog.getCurrentCatalog(), systemStmt);
|
||||
}
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(0).getTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(1).getTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(2).getTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(3).getTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(4).getTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(0).getLocationTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(1).getLocationTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(2).getLocationTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(3).getLocationTag());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(4).getLocationTag());
|
||||
|
||||
// create table tbl2 with "replication_num" property
|
||||
String createStmt = "create table test.tbl2\n"
|
||||
+ "(k1 date, k2 int)\n"
|
||||
+ "partition by range(k1)\n"
|
||||
+ "(\n"
|
||||
String createStmt = "create table test.tbl2\n" + "(k1 date, k2 int)\n" + "partition by range(k1)\n" + "(\n"
|
||||
+ " partition p1 values less than(\"2021-06-01\"),\n"
|
||||
+ " partition p2 values less than(\"2021-07-01\"),\n"
|
||||
+ " partition p3 values less than(\"2021-08-01\")\n"
|
||||
+ ")\n"
|
||||
+ "distributed by hash(k2) buckets 10;";
|
||||
+ " partition p3 values less than(\"2021-08-01\")\n" + ")\n" + "distributed by hash(k2) buckets 10;";
|
||||
ExceptionChecker.expectThrowsNoException(() -> createTable(createStmt));
|
||||
OlapTable tbl2 = (OlapTable) db.getTableNullable("tbl2");
|
||||
ReplicaAllocation defaultAlloc = new ReplicaAllocation((short) 3);
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.thrift.TStorageFormat;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
@ -155,8 +156,8 @@ public class PropertyAnalyzerTest {
|
||||
@Test
|
||||
public void testStorageFormat() throws AnalysisException {
|
||||
HashMap<String, String> propertiesV1 = Maps.newHashMap();
|
||||
HashMap<String, String> propertiesV2 = Maps.newHashMap();
|
||||
HashMap<String, String> propertiesDefault = Maps.newHashMap();
|
||||
HashMap<String, String> propertiesV2 = Maps.newHashMap();
|
||||
HashMap<String, String> propertiesDefault = Maps.newHashMap();
|
||||
propertiesV1.put(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, "v1");
|
||||
propertiesV2.put(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, "v2");
|
||||
propertiesDefault.put(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, "default");
|
||||
@ -164,8 +165,24 @@ public class PropertyAnalyzerTest {
|
||||
Assert.assertEquals(TStorageFormat.V2, PropertyAnalyzer.analyzeStorageFormat(propertiesV2));
|
||||
Assert.assertEquals(TStorageFormat.V2, PropertyAnalyzer.analyzeStorageFormat(propertiesDefault));
|
||||
expectedEx.expect(AnalysisException.class);
|
||||
expectedEx.expectMessage("Storage format V1 has been deprecated since version 0.14,"
|
||||
+ " please use V2 instead");
|
||||
expectedEx.expectMessage(
|
||||
"Storage format V1 has been deprecated since version 0.14," + " please use V2 instead");
|
||||
PropertyAnalyzer.analyzeStorageFormat(propertiesV1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTag() throws AnalysisException {
|
||||
HashMap<String, String> properties = Maps.newHashMap();
|
||||
properties.put("tag.location", "l1");
|
||||
properties.put("other", "prop");
|
||||
Map<String, String> tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, null);
|
||||
Assert.assertEquals("l1", tagMap.get("location"));
|
||||
Assert.assertEquals(1, tagMap.size());
|
||||
Assert.assertEquals(1, properties.size());
|
||||
|
||||
properties.clear();
|
||||
tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, Tag.DEFAULT_BACKEND_TAG);
|
||||
Assert.assertEquals(1, tagMap.size());
|
||||
Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG.value, tagMap.get(Tag.TYPE_LOCATION));
|
||||
}
|
||||
}
|
||||
|
||||
@ -239,9 +239,9 @@ public class ResourceTagQueryTest {
|
||||
AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext);
|
||||
DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt);
|
||||
}
|
||||
Assert.assertEquals(tag1, backends.get(0).getTag());
|
||||
Assert.assertEquals(tag1, backends.get(1).getTag());
|
||||
Assert.assertEquals(tag1, backends.get(2).getTag());
|
||||
Assert.assertEquals(tag1, backends.get(0).getLocationTag());
|
||||
Assert.assertEquals(tag1, backends.get(1).getLocationTag());
|
||||
Assert.assertEquals(tag1, backends.get(2).getLocationTag());
|
||||
|
||||
queryStr = "explain select * from test.tbl1";
|
||||
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
|
||||
@ -251,7 +251,8 @@ public class ResourceTagQueryTest {
|
||||
// for now, 3 backends with tag zone1, 2 with tag default, so table is not stable.
|
||||
ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation());
|
||||
// alter table's replication allocation to zone1:2 and default:1
|
||||
String alterStr = "alter table test.tbl1 modify partition (p1, p2, p3) set ('replication_allocation' = 'tag.location.zone1:2, tag.location.default:1')";
|
||||
String alterStr
|
||||
= "alter table test.tbl1 modify partition (p1, p2, p3) set ('replication_allocation' = 'tag.location.zone1:2, tag.location.default:1')";
|
||||
ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStr));
|
||||
Map<Tag, Short> expectedAllocMap = Maps.newHashMap();
|
||||
expectedAllocMap.put(Tag.DEFAULT_BACKEND_TAG, (short) 1);
|
||||
|
||||
@ -152,22 +152,22 @@ public class SystemInfoServiceTest {
|
||||
|
||||
Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
|
||||
Tag tagb = Tag.create(Tag.TYPE_LOCATION, "tagb");
|
||||
be1.setTag(taga);
|
||||
be2.setTag(taga);
|
||||
be3.setTag(tagb);
|
||||
be4.setTag(tagb);
|
||||
be5.setTag(tagb);
|
||||
be1.setTagMap(taga.toMap());
|
||||
be2.setTagMap(taga.toMap());
|
||||
be3.setTagMap(tagb.toMap());
|
||||
be4.setTagMap(tagb.toMap());
|
||||
be5.setTagMap(tagb.toMap());
|
||||
|
||||
BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable()
|
||||
.addTags(Sets.newHashSet(taga)).build();
|
||||
BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga))
|
||||
.build();
|
||||
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy7, 1).size());
|
||||
Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy7, 2).size());
|
||||
Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10001L));
|
||||
Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10002L));
|
||||
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy7, 3).size());
|
||||
|
||||
BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder()
|
||||
.needQueryAvailable().addTags(Sets.newHashSet(tagb)).build();
|
||||
BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(tagb))
|
||||
.build();
|
||||
Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy8, 3).size());
|
||||
Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10003L));
|
||||
Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10004L));
|
||||
@ -209,7 +209,7 @@ public class SystemInfoServiceTest {
|
||||
// 8. check same host
|
||||
addBackend(10006, "192.168.1.1", 9051);
|
||||
Backend be6 = infoService.getBackend(10006);
|
||||
be6.setTag(taga);
|
||||
be6.setTagMap(taga.toMap());
|
||||
be6.setAlive(true);
|
||||
addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
|
||||
addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
|
||||
|
||||
Reference in New Issue
Block a user