diff --git a/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md b/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md index ad35f9ac01..7006ac761a 100644 --- a/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md +++ b/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md @@ -39,10 +39,6 @@ grammar: ```sql -- Add nodes (add this method if you do not use the multi-tenancy function) ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...]; --- Add idle nodes (that is, add BACKEND that does not belong to any cluster) - ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...]; --- Add nodes to a cluster - ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...]; ```` illustrate: @@ -59,12 +55,6 @@ grammar: ALTER SYSTEM ADD BACKEND "host:port"; ```` - 1. Add an idle node - - ```sql - ALTER SYSTEM ADD FREE BACKEND "host:port"; - ```` - ### Keywords ALTER, SYSTEM, ADD, BACKEND, ALTER SYSTEM diff --git a/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md b/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md index 0923f73046..f19fa51d0d 100644 --- a/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md +++ b/docs/en/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md @@ -47,16 +47,20 @@ ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]); 2. heartbeat_port is the heartbeat port of the node 3. Modify BE node properties The following properties are currently supported: -- tag.location: resource tag +- tag.xxxx: resource tag - disable_query: query disable attribute - disable_load: import disable attribute +Note: +1. A backend can be set multi resource tags. But must contain "tag.location" type. + ### Example 1. Modify the resource tag of BE ```sql ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a"); + ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a", "tag.compute" = "c1"); ```` 2. Modify the query disable property of BE diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md b/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md index fbdf13b7b2..ca03ebaba1 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-ADD-BACKEND.md @@ -37,18 +37,14 @@ ALTER SYSTEM ADD BACKEND 语法: ```sql -1) 增加节点(不使用多租户功能则按照此方法添加) +1) 增加节点 ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...]; -2) 增加空闲节点(即添加不属于任何cluster的BACKEND) - ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...]; -3) 增加节点到某个cluster - ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...]; ``` 说明: 1. host 可以是主机名或者ip地址 -2. heartbeat_port 为该节点的心跳端口 +2. heartbeat_port 为该节点的心跳端口 3. 增加和删除节点为同步操作。这两种操作不考虑节点上已有的数据,节点直接从元数据中删除,请谨慎使用。 ### Example @@ -59,12 +55,6 @@ ALTER SYSTEM ADD BACKEND ALTER SYSTEM ADD BACKEND "host:port"; ``` - 1. 增加一个空闲节点 - - ```sql - ALTER SYSTEM ADD FREE BACKEND "host:port"; - ``` - ### Keywords ALTER, SYSTEM, ADD, BACKEND, ALTER SYSTEM diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md b/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md index 61ca1ead02..d6b4b8b2cb 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Cluster-Management-Statements/ALTER-SYSTEM-MODIFY-BACKEND.md @@ -47,16 +47,20 @@ ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]); 2. heartbeat_port 为该节点的心跳端口 3. 修改 BE 节点属性目前支持以下属性: -- tag.location:资源标签 +- tag.xxx:资源标签 - disable_query: 查询禁用属性 - disable_load: 导入禁用属性 +注: +1. 可以给一个 Backend 设置多种资源标签。但必须包含 "tag.location"。 + ### Example 1. 修改 BE 的资源标签 ```sql ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a"); + ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a", "tag.compute" = "c1"); ``` 2. 修改 BE 的查询禁用属性 diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index b0aa25a6fe..9e056349f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -126,8 +126,8 @@ public class SystemHandler extends AlterHandler { && Catalog.getCurrentCatalog().getCluster(destClusterName) == null) { throw new DdlException("Cluster: " + destClusterName + " does not exist."); } - Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), - addBackendClause.isFree(), addBackendClause.getDestCluster(), addBackendClause.getTag()); + Catalog.getCurrentSystemInfo().addBackends(addBackendClause.getHostPortPairs(), addBackendClause.isFree(), + addBackendClause.getDestCluster(), addBackendClause.getTagMap()); } else if (alterClause instanceof DropBackendClause) { // drop backend DropBackendClause dropBackendClause = (DropBackendClause) alterClause; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java index 1cd7d61250..7ccf574f7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddBackendClause.java @@ -18,6 +18,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.resource.Tag; @@ -33,7 +34,7 @@ public class AddBackendClause extends BackendClause { // cluster that backend will be added to protected String destCluster; protected Map properties = Maps.newHashMap(); - private Tag tag; + private Map tagMap; public AddBackendClause(List hostPorts) { super(hostPorts); @@ -57,14 +58,20 @@ public class AddBackendClause extends BackendClause { this.destCluster = destCluster; } - public Tag getTag() { - return tag; + public Map getTagMap() { + return tagMap; } @Override public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); - tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, Tag.DEFAULT_BACKEND_TAG); + tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, Tag.DEFAULT_BACKEND_TAG); + if (!tagMap.containsKey(Tag.TYPE_LOCATION)) { + throw new AnalysisException(NEED_LOCATION_TAG_MSG); + } + if (!Config.enable_multi_tags && tagMap.size() > 1) { + throw new AnalysisException(MUTLI_TAG_DISABLED_MSG); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java index fb3e325292..86f6380c7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BackendClause.java @@ -33,6 +33,11 @@ public class BackendClause extends AlterClause { protected List hostPorts; protected List> hostPortPairs; + public static final String MUTLI_TAG_DISABLED_MSG = "Not support multi tags for Backend now. " + + "You can set 'enable_multi_tags=true' in fe.conf to enable this feature."; + public static final String NEED_LOCATION_TAG_MSG + = "Backend must have location type tag. Eg: 'tag.location' = 'xxx'."; + protected BackendClause(List hostPorts) { super(AlterOpType.ALTER_OTHER); this.hostPorts = hostPorts; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java index e5e11dcdb4..5b8bd151fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyBackendClause.java @@ -18,6 +18,7 @@ package org.apache.doris.analysis; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.resource.Tag; @@ -30,7 +31,7 @@ import java.util.Map; public class ModifyBackendClause extends BackendClause { protected Map properties = Maps.newHashMap(); protected Map analyzedProperties = Maps.newHashMap(); - private Tag tag = null; + private Map tagMap = null; private Boolean isQueryDisabled = null; private Boolean isLoadDisabled = null; @@ -42,13 +43,24 @@ public class ModifyBackendClause extends BackendClause { @Override public void analyze(Analyzer analyzer) throws AnalysisException { super.analyze(analyzer); - tag = PropertyAnalyzer.analyzeBackendTagProperties(properties, null); + tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, null); isQueryDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties, PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, null); isLoadDisabled = PropertyAnalyzer.analyzeBackendDisableProperties(properties, PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, null); - if (tag != null) { - analyzedProperties.put(tag.type, tag.value); + if (!tagMap.isEmpty()) { + if (!tagMap.containsKey(Tag.TYPE_LOCATION)) { + throw new AnalysisException(NEED_LOCATION_TAG_MSG); + } + if (!Config.enable_multi_tags && tagMap.size() > 1) { + throw new AnalysisException(MUTLI_TAG_DISABLED_MSG); + } + // TODO: + // here we can add some privilege check so that only authorized user can modify specified type of tag. + // For example, only root user can set tag with type 'computation' + for (Map.Entry entry : tagMap.entrySet()) { + analyzedProperties.put("tag." + entry.getKey(), entry.getValue()); + } } if (isQueryDisabled != null) { analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_QUERY, String.valueOf(isQueryDisabled)); @@ -57,13 +69,13 @@ public class ModifyBackendClause extends BackendClause { analyzedProperties.put(PropertyAnalyzer.PROPERTIES_DISABLE_LOAD, String.valueOf(isLoadDisabled)); } if (!properties.isEmpty()) { - throw new AnalysisException("unknown properties setting for key (" - + StringUtils.join(properties.keySet(), ",") + ")"); + throw new AnalysisException( + "unknown properties setting for key (" + StringUtils.join(properties.keySet(), ",") + ")"); } } - public Tag getTag() { - return tag; + public Map getTagMap() { + return tagMap; } public Boolean isQueryDisabled() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 98bc673d4a..e9cd54b290 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1445,11 +1445,11 @@ public class OlapTable extends Table { if (be == null) { continue; } - short num = currentReplicaAlloc.getOrDefault(be.getTag(), (short) 0); - currentReplicaAlloc.put(be.getTag(), (short) (num + 1)); - List beIds = tag2beIds.getOrDefault(be.getTag(), Lists.newArrayList()); + short num = currentReplicaAlloc.getOrDefault(be.getLocationTag(), (short) 0); + currentReplicaAlloc.put(be.getLocationTag(), (short) (num + 1)); + List beIds = tag2beIds.getOrDefault(be.getLocationTag(), Lists.newArrayList()); beIds.add(beId); - tag2beIds.put(be.getTag(), beIds); + tag2beIds.put(be.getLocationTag(), beIds); } if (!currentReplicaAlloc.equals(replicaAlloc.getAllocMap())) { throw new DdlException("The relica allocation is " + currentReplicaAlloc.toString() @@ -1840,8 +1840,8 @@ public class OlapTable extends Table { if (be == null) { continue; } - short num = curMap.getOrDefault(be.getTag(), (short) 0); - curMap.put(be.getTag(), (short) (num + 1)); + short num = curMap.getOrDefault(be.getLocationTag(), (short) 0); + curMap.put(be.getLocationTag(), (short) (num + 1)); } if (!curMap.equals(allocMap)) { throw new UserException("replica allocation of tablet " + tablet.getId() + " is not expected" diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 007c5410f9..8e380f16c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -463,8 +463,8 @@ public class Tablet extends MetaObject implements Writable { versions.add(replica.getVersionCount()); - short curNum = currentAllocMap.getOrDefault(backend.getTag(), (short) 0); - currentAllocMap.put(backend.getTag(), (short) (curNum + 1)); + short curNum = currentAllocMap.getOrDefault(backend.getLocationTag(), (short) 0); + currentAllocMap.put(backend.getLocationTag(), (short) (curNum + 1)); } // 1. alive replicas are not enough diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java index aee2665afc..d2d04f1061 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java @@ -90,8 +90,8 @@ public class ClusterLoadStatistic { // So balance will be blocked. continue; } - BackendLoadStatistic beStatistic = new BackendLoadStatistic(backend.getId(), - backend.getOwnerClusterName(), backend.getTag(), infoService, invertedIndex); + BackendLoadStatistic beStatistic = new BackendLoadStatistic(backend.getId(), backend.getOwnerClusterName(), + backend.getLocationTag(), infoService, invertedIndex); try { beStatistic.init(); } catch (LoadBalanceException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java index 2904a9efc2..89df54bdbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java @@ -587,13 +587,12 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon { Backend be = infoService.getBackend(backendId); if (be == null) { return false; - } else if (!be.getTag().equals(tag) || excludedBeIds.contains(be.getId())) { + } else if (!be.getLocationTag().equals(tag) || excludedBeIds.contains(be.getId())) { return false; } else if (!be.isScheduleAvailable()) { // 1. BE is dead longer than "delaySecond" // 2. BE is under decommission - if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > delaySecond * 1000L) - || be.isDecommissioned()) { + if ((!be.isAlive() && (currTime - be.getLastUpdateMs()) > delaySecond * 1000L) || be.isDecommissioned()) { return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 7fb1b75481..a8677e1cad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -714,8 +714,8 @@ public class TabletScheduler extends MasterDaemon { for (Replica replica : replicas) { Backend be = infoService.getBackend(replica.getBackendId()); if (be != null && be.isScheduleAvailable() && replica.isAlive() && !replica.tooSlow()) { - Short num = currentAllocMap.getOrDefault(be.getTag(), (short) 0); - currentAllocMap.put(be.getTag(), (short) (num + 1)); + Short num = currentAllocMap.getOrDefault(be.getLocationTag(), (short) 0); + currentAllocMap.put(be.getLocationTag(), (short) (num + 1)); } } @@ -979,7 +979,7 @@ public class TabletScheduler extends MasterDaemon { Map allocMap = tabletCtx.getReplicaAlloc().getAllocMap(); for (Replica replica : replicas) { Backend be = infoService.getBackend(replica.getBackendId()); - if (!allocMap.containsKey(be.getTag())) { + if (!allocMap.containsKey(be.getLocationTag())) { deleteReplicaInternal(tabletCtx, replica, "not in valid tag", force); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 749322e48a..307d0be79a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1693,4 +1693,7 @@ public class Config extends ConfigBase { @ConfField public static String default_storage_policy = "default_storage_policy"; + + @ConfField(mutable = false, masterOnly = true) + public static boolean enable_multi_tags = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java index 4cc330afd2..850febcd3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java @@ -163,8 +163,8 @@ public class BackendsProcDir implements ProcDirInterface { } backendInfo.add(String.format("%.2f", used) + " %"); backendInfo.add(String.format("%.2f", backend.getMaxDiskUsedPct() * 100) + " %"); - // tag - backendInfo.add(backend.getTag().toString()); + // tags + backendInfo.add(backend.getTagMapString()); // err msg backendInfo.add(backend.getHeartbeatErrMsg()); // version diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java index 0f2fb8bf3e..85d7fa4f8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ClusterLoadStatByTag.java @@ -72,7 +72,7 @@ public class ClusterLoadStatByTag implements ProcDirInterface { for (long beId : beIds) { Backend be = Catalog.getCurrentSystemInfo().getBackend(beId); if (be != null) { - tags.add(be.getTag()); + tags.add(be.getLocationTag()); } } return tags; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 49868aab0d..84f8170316 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -43,11 +43,13 @@ import org.apache.doris.thrift.TTabletType; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -568,8 +570,8 @@ public class PropertyAnalyzer { return ScalarType.createType(type); } - public static Boolean analyzeBackendDisableProperties(Map properties, - String key, Boolean defaultValue) { + public static Boolean analyzeBackendDisableProperties(Map properties, String key, + Boolean defaultValue) { if (properties.containsKey(key)) { String value = properties.remove(key); return Boolean.valueOf(value); @@ -577,13 +579,39 @@ public class PropertyAnalyzer { return defaultValue; } - public static Tag analyzeBackendTagProperties(Map properties, Tag defaultValue) + /** + * Found property with "tag." prefix and return a tag map, which key is tag type and value is tag value + * Eg. + * "tag.location" = "group_a", "tag.compute" = "x1" + * Returns: + * [location->group_a] [compute->x1] + * + * @param properties + * @param defaultValue + * @return + * @throws AnalysisException + */ + public static Map analyzeBackendTagsProperties(Map properties, Tag defaultValue) throws AnalysisException { - if (properties.containsKey(TAG_LOCATION)) { - String tagVal = properties.remove(TAG_LOCATION); - return Tag.create(Tag.TYPE_LOCATION, tagVal); + Map tagMap = Maps.newHashMap(); + Iterator> iter = properties.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + if (!entry.getKey().startsWith("tag.")) { + continue; + } + String[] keyParts = entry.getKey().split("\\."); + if (keyParts.length != 2) { + continue; + } + String val = entry.getValue().replaceAll(" ", ""); + tagMap.put(keyParts[1], val); + iter.remove(); } - return defaultValue; + if (tagMap.isEmpty() && defaultValue != null) { + tagMap.put(defaultValue.type, defaultValue.value); + } + return tagMap; } // There are 2 kinds of replication property: diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 17f1898437..b83ce5641f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -569,15 +569,15 @@ public class OlapScanNode extends ScanNode { for (Replica replica : replicas) { Backend backend = Catalog.getCurrentSystemInfo().getBackend(replica.getBackendId()); if (backend == null || !backend.isAlive()) { - LOG.debug("backend {} not exists or is not alive for replica {}", - replica.getBackendId(), replica.getId()); + LOG.debug("backend {} not exists or is not alive for replica {}", replica.getBackendId(), + replica.getId()); errs.add(replica.getId() + "'s backend " + replica.getBackendId() + " does not exist or not alive"); continue; } - if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getTag())) { - String err = String.format("Replica on backend %d with tag %s," - + " which is not in user's resource tags: %s", - backend.getId(), backend.getTag(), allowedTags); + if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) { + String err = String.format( + "Replica on backend %d with tag %s," + " which is not in user's resource tags: %s", + backend.getId(), backend.getLocationTag(), allowedTags); if (LOG.isDebugEnabled()) { LOG.debug(err); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java index cdbb485ff1..ffee32ea10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/Tag.java @@ -23,11 +23,13 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Map; import java.util.Objects; /* @@ -96,10 +98,20 @@ public class Tag implements Writable { return new Tag(type, value); } + public static Tag createNotCheck(String type, String value) { + return new Tag(type, value); + } + public String toKey() { return type + "_" + value; } + public Map toMap() { + Map map = Maps.newHashMap(); + map.put(type, value); + return map; + } + @Override public int hashCode() { return Objects.hash(type, value); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index d8267c21e3..7fc8c5a1d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -24,12 +24,14 @@ import org.apache.doris.catalog.DiskInfo.DiskState; import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.PrintableMap; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; import org.apache.doris.system.HeartbeatResponse.HbStatus; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TStorageMedium; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -44,6 +46,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -116,8 +119,14 @@ public class Backend implements Writable { // additional backendStatus information for BE, display in JSON format @SerializedName("backendStatus") private BackendStatus backendStatus = new BackendStatus(); - @SerializedName("tag") - private Tag tag = Tag.DEFAULT_BACKEND_TAG; + // the locationTag is also saved in tagMap, use a single field here to avoid + // creating this everytime we get it. + @SerializedName(value = "locationTag", alternate = {"tag"}) + private Tag locationTag = Tag.DEFAULT_BACKEND_TAG; + // tag type -> tag value. + // A backend can only be assigned to one tag type, and each type can only have one value. + @SerializedName("tagMap") + private Map tagMap = Maps.newHashMap(); public Backend() { this.host = ""; @@ -135,6 +144,7 @@ public class Backend implements Writable { this.ownerClusterName = ""; this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); + this.tagMap.put(locationTag.type, locationTag.value); } public Backend(long id, String host, int heartbeatPort) { @@ -155,6 +165,7 @@ public class Backend implements Writable { this.ownerClusterName = ""; this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); + this.tagMap.put(locationTag.type, locationTag.value); } public long getId() { @@ -534,9 +545,30 @@ public class Backend implements Writable { } } + /** + * In old version, there is only one tag for a Backend, and it is a "location" type tag. + * But in new version, a Backend can have multi tag, so we need to put locationTag to + * the new tagMap + */ + private void convertToTagMapAndSetLocationTag() { + if (tagMap == null) { + // When first upgrade from old version, tags may be null + tagMap = Maps.newHashMap(); + } + if (!tagMap.containsKey(Tag.TYPE_LOCATION)) { + // ATTN: here we use Tag.TYPE_LOCATION directly, not locationTag.type, + // because we need to make sure the previous tag must be a location type tag, + // and if not, convert it to location type. + tagMap.put(Tag.TYPE_LOCATION, locationTag.value); + } + locationTag = Tag.createNotCheck(Tag.TYPE_LOCATION, tagMap.get(Tag.TYPE_LOCATION)); + } + public static Backend read(DataInput in) throws IOException { String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, Backend.class); + Backend be = GsonUtils.GSON.fromJson(json, Backend.class); + be.convertToTagMapAndSetLocationTag(); + return be; } @Override @@ -545,32 +577,6 @@ public class Backend implements Writable { Text.writeString(out, json); } - @Deprecated - private void readFields(DataInput in) throws IOException { - id = in.readLong(); - host = Text.readString(in); - heartbeatPort = in.readInt(); - bePort = in.readInt(); - httpPort = in.readInt(); - beRpcPort = in.readInt(); - isAlive.set(in.readBoolean()); - isDecommissioned.set(in.readBoolean()); - lastUpdateMs = in.readLong(); - lastStartTime = in.readLong(); - Map disks = Maps.newHashMap(); - int size = in.readInt(); - for (int i = 0; i < size; i++) { - String rootPath = Text.readString(in); - DiskInfo diskInfo = DiskInfo.read(in); - disks.put(rootPath, diskInfo); - } - disksRef = ImmutableMap.copyOf(disks); - ownerClusterName = Text.readString(in); - backendState = in.readInt(); - decommissionType = in.readInt(); - brpcPort = in.readInt(); - } - @Override public int hashCode() { return Objects.hash(id, host, heartbeatPort, bePort, isAlive); @@ -594,7 +600,7 @@ public class Backend implements Writable { @Override public String toString() { return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() - + ", tag: " + tag + "]"; + + ", tags: " + tagMap + "]"; } public String getOwnerClusterName() { @@ -717,11 +723,34 @@ public class Backend implements Writable { public volatile boolean isLoadDisabled = false; } - public void setTag(Tag tag) { - this.tag = tag; + public Tag getLocationTag() { + return locationTag; } - public Tag getTag() { - return tag; + public void setTagMap(Map tagMap) { + Preconditions.checkState(tagMap.containsKey(Tag.TYPE_LOCATION)); + this.tagMap = tagMap; + this.locationTag = Tag.createNotCheck(Tag.TYPE_LOCATION, tagMap.get(Tag.TYPE_LOCATION)); + } + + public Map getTagMap() { + return tagMap; + } + + public String getTagMapString() { + return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; + } + + /** + * Get Tag by type, return Optional.empty if no such tag with given type + * + * @param type + * @return + */ + public Optional getTagByType(String type) { + if (!tagMap.containsKey(type)) { + return Optional.empty(); + } + return Optional.of(Tag.createNotCheck(type, tagMap.get(type))); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index e65c3feec3..5591cbafa9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -98,11 +98,10 @@ public class BeSelectionPolicy { } public boolean isMatch(Backend backend) { - if (needScheduleAvailable && !backend.isScheduleAvailable() - || needQueryAvailable && !backend.isQueryAvailable() - || needLoadAvailable && !backend.isLoadAvailable() - || !resourceTags.isEmpty() && !resourceTags.contains(backend.getTag()) - || storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) { + if (needScheduleAvailable && !backend.isScheduleAvailable() || needQueryAvailable && !backend.isQueryAvailable() + || needLoadAvailable && !backend.isLoadAvailable() || !resourceTags.isEmpty() && !resourceTags.contains( + backend.getLocationTag()) || storageMedium != null && !backend.hasSpecifiedStorageMedium( + storageMedium)) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 5c51163719..ae0c96edcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -96,7 +96,7 @@ public class SystemInfoService { // for deploy manager public void addBackends(List> hostPortPairs, boolean isFree) throws UserException { - addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG); + addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG.toMap()); } /** @@ -105,8 +105,8 @@ public class SystemInfoService { * @param destCluster : if not null or empty backend will be added to destCluster * @throws DdlException */ - public void addBackends(List> hostPortPairs, - boolean isFree, String destCluster, Tag tag) throws UserException { + public void addBackends(List> hostPortPairs, boolean isFree, String destCluster, + Map tagMap) throws UserException { for (Pair pair : hostPortPairs) { // check is already exist if (getBackendWithHeartbeatPort(pair.first, pair.second) != null) { @@ -115,7 +115,7 @@ public class SystemInfoService { } for (Pair pair : hostPortPairs) { - addBackend(pair.first, pair.second, isFree, destCluster, tag); + addBackend(pair.first, pair.second, isFree, destCluster, tagMap); } } @@ -137,7 +137,7 @@ public class SystemInfoService { // Final entry of adding backend private void addBackend(String host, int heartbeatPort, boolean isFree, String destCluster, - Tag tag) throws UserException { + Map tagMap) { Backend newBackend = new Backend(Catalog.getCurrentCatalog().getNextId(), host, heartbeatPort); // update idToBackend Map copiedBackends = Maps.newHashMap(idToBackendRef); @@ -162,7 +162,7 @@ public class SystemInfoService { } // set tags - newBackend.setTag(tag); + newBackend.setTagMap(tagMap); // log Catalog.getCurrentCatalog().getEditLog().logAddBackend(newBackend); @@ -1123,12 +1123,10 @@ public class SystemInfoService { for (Backend be : backends) { boolean shouldModify = false; - if (alterClause.getTag() != null) { - Tag tag = alterClause.getTag(); - if (!be.getTag().equals(tag)) { - be.setTag(tag); - shouldModify = true; - } + Map tagMap = alterClause.getTagMap(); + if (!tagMap.isEmpty()) { + be.setTagMap(tagMap); + shouldModify = true; } if (alterClause.isQueryDisabled() != null) { @@ -1154,7 +1152,7 @@ public class SystemInfoService { public void replayModifyBackend(Backend backend) { Backend memBe = getBackend(backend.getId()); - memBe.setTag(backend.getTag()); + memBe.setTagMap(backend.getTagMap()); memBe.setQueryDisabled(backend.isQueryDisabled()); memBe.setLoadDisabled(backend.isLoadDisabled()); LOG.debug("replay modify backend: {}", backend); @@ -1164,10 +1162,10 @@ public class SystemInfoService { public void checkReplicaAllocation(String cluster, ReplicaAllocation replicaAlloc) throws DdlException { List backends = getClusterBackends(cluster); for (Map.Entry entry : replicaAlloc.getAllocMap().entrySet()) { - if (backends.stream().filter(b -> b.getTag().equals(entry.getKey())).count() - < entry.getValue()) { - throw new DdlException("Failed to find enough host with tag(" + entry.getKey() - + ") in all backends. need: " + entry.getValue()); + if (backends.stream().filter(b -> b.getLocationTag().equals(entry.getKey())).count() < entry.getValue()) { + throw new DdlException( + "Failed to find enough host with tag(" + entry.getKey() + ") in all backends. need: " + + entry.getValue()); } } } @@ -1176,13 +1174,13 @@ public class SystemInfoService { List bes = getClusterBackends(clusterName); Set tags = Sets.newHashSet(); for (Backend be : bes) { - tags.add(be.getTag()); + tags.add(be.getLocationTag()); } return tags; } public List getBackendsByTagInCluster(String clusterName, Tag tag) { List bes = getClusterBackends(clusterName); - return bes.stream().filter(b -> b.getTag().equals(tag)).collect(Collectors.toList()); + return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java index 9187ebe09b..6a81d37e15 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/BackendTest.java @@ -19,11 +19,13 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.common.FeConstants; +import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TDisk; import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -192,9 +194,15 @@ public class BackendTest { back1.updateOnce(1, 1, 1); back2 = new Backend(1, "a", 2); back2.updateOnce(1, 1, 1); + Map tagMap = Maps.newHashMap(); + tagMap.put(Tag.TYPE_LOCATION, "l1"); + tagMap.put("compute", "c1"); + back2.setTagMap(tagMap); Assert.assertFalse(back1.equals(back2)); - Assert.assertEquals("Backend [id=1, host=a, heartbeatPort=1, alive=true, tag: {\"location\" : \"default\"}]", back1.toString()); + Assert.assertEquals("Backend [id=1, host=a, heartbeatPort=1, alive=true, tags: {location=default}]", + back1.toString()); + Assert.assertEquals("{\"compute\" : \"c1\", \"location\" : \"l1\"}", back2.getTagMapString()); // 3. delete files dis.close(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java index c1d4599cd2..3805e61dd4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java @@ -346,7 +346,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend2.isScheduleAvailable(); result = true; minTimes = 0; - myBackend2.getTag(); + myBackend2.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -363,7 +363,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend3.getLastUpdateMs(); result = System.currentTimeMillis() - (Config.colocate_group_relocate_delay_second + 20) * 1000; minTimes = 0; - myBackend3.getTag(); + myBackend3.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -380,7 +380,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend4.getLastUpdateMs(); result = System.currentTimeMillis(); minTimes = 0; - myBackend4.getTag(); + myBackend4.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -397,7 +397,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend5.isDecommissioned(); result = true; minTimes = 0; - myBackend5.getTag(); + myBackend5.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -439,7 +439,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend2.isScheduleAvailable(); result = true; minTimes = 0; - myBackend2.getTag(); + myBackend2.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -456,7 +456,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend3.getLastUpdateMs(); result = System.currentTimeMillis() - (Config.colocate_group_relocate_delay_second + 20) * 1000; minTimes = 0; - myBackend3.getTag(); + myBackend3.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -473,7 +473,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend4.getLastUpdateMs(); result = System.currentTimeMillis(); minTimes = 0; - myBackend4.getTag(); + myBackend4.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -490,7 +490,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend5.isDecommissioned(); result = true; minTimes = 0; - myBackend5.getTag(); + myBackend5.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; @@ -507,7 +507,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend6.isDecommissioned(); result = false; minTimes = 0; - myBackend6.getTag(); + myBackend6.getLocationTag(); result = Tag.create(Tag.TYPE_LOCATION, "new_loc"); minTimes = 0; @@ -524,7 +524,7 @@ public class ColocateTableCheckerAndBalancerTest { myBackend7.isDecommissioned(); result = false; minTimes = 0; - myBackend7.getTag(); + myBackend7.getLocationTag(); result = Tag.DEFAULT_BACKEND_TAG; minTimes = 0; myBackend7.getId(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index 718857a0aa..e542eb7c6c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -19,6 +19,7 @@ package org.apache.doris.clone; import org.apache.doris.analysis.AlterSystemStmt; import org.apache.doris.analysis.AlterTableStmt; +import org.apache.doris.analysis.BackendClause; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.analysis.DropTableStmt; @@ -208,25 +209,47 @@ public class TabletRepairAndBalanceTest { AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt); } + + // Test set tag without location type, expect throw exception + Backend be1 = backends.get(0); + String alterString = "alter system modify backend \"" + be1.getHost() + ":" + be1.getHeartbeatPort() + + "\" set ('tag.compute' = 'abc')"; + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, BackendClause.NEED_LOCATION_TAG_MSG, + () -> UtFrameUtils.parseAndAnalyzeStmt(alterString, connectContext)); + + // Test set multi tag for a Backend when Config.enable_multi_tags is false + Config.enable_multi_tags = false; + String alterString2 = "alter system modify backend \"" + be1.getHost() + ":" + be1.getHeartbeatPort() + + "\" set ('tag.location' = 'zone3', 'tag.compution' = 'abc')"; + ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, BackendClause.MUTLI_TAG_DISABLED_MSG, + () -> UtFrameUtils.parseAndAnalyzeStmt(alterString2, connectContext)); + + // Test set multi tag for a Backend when Config.enable_multi_tags is true + Config.enable_multi_tags = true; + String stmtStr3 = "alter system modify backend \"" + be1.getHost() + ":" + be1.getHeartbeatPort() + + "\" set ('tag.location' = 'zone1', 'tag.compute' = 'c1')"; + AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr3, connectContext); + DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt); + Map tagMap = be1.getTagMap(); + Assert.assertEquals(2, tagMap.size()); + Assert.assertEquals("zone1", tagMap.get(Tag.TYPE_LOCATION)); + Assert.assertEquals("c1", tagMap.get("compute")); + Assert.assertEquals(Tag.createNotCheck(Tag.TYPE_LOCATION, "zone1"), be1.getLocationTag()); + Tag zone1 = Tag.create(Tag.TYPE_LOCATION, "zone1"); Tag zone2 = Tag.create(Tag.TYPE_LOCATION, "zone2"); - Assert.assertEquals(zone1, backends.get(0).getTag()); - Assert.assertEquals(zone1, backends.get(1).getTag()); - Assert.assertEquals(zone1, backends.get(2).getTag()); - Assert.assertEquals(zone2, backends.get(3).getTag()); - Assert.assertEquals(zone2, backends.get(4).getTag()); + Assert.assertEquals(zone1, backends.get(0).getLocationTag()); + Assert.assertEquals(zone1, backends.get(1).getLocationTag()); + Assert.assertEquals(zone1, backends.get(2).getLocationTag()); + Assert.assertEquals(zone2, backends.get(3).getLocationTag()); + Assert.assertEquals(zone2, backends.get(4).getLocationTag()); // create table // 1. no default tag, create will fail - String createStr = "create table test.tbl1\n" - + "(k1 date, k2 int)\n" - + "partition by range(k1)\n" - + "(\n" + String createStr = "create table test.tbl1\n" + "(k1 date, k2 int)\n" + "partition by range(k1)\n" + "(\n" + " partition p1 values less than(\"2021-06-01\"),\n" + " partition p2 values less than(\"2021-07-01\"),\n" - + " partition p3 values less than(\"2021-08-01\")\n" - + ")\n" - + "distributed by hash(k2) buckets 10;"; + + " partition p3 values less than(\"2021-08-01\")\n" + ")\n" + "distributed by hash(k2) buckets 10;"; ExceptionChecker.expectThrows(DdlException.class, () -> createTable(createStr)); // nodes of zone2 not enough, create will fail @@ -312,9 +335,9 @@ public class TabletRepairAndBalanceTest { Backend be = backends.get(2); String stmtStr = "alter system modify backend \"" + be.getHost() + ":" + be.getHeartbeatPort() + "\" set ('tag.location' = 'zone2')"; - AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); + stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt); - Assert.assertEquals(tag2, be.getTag()); + Assert.assertEquals(tag2, be.getLocationTag()); ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation()); checkTableReplicaAllocation(tbl); Assert.assertEquals(90, replicaMetaTable.cellSet().size()); @@ -376,7 +399,7 @@ public class TabletRepairAndBalanceTest { + "\" set ('tag.location' = 'zone1')"; stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt); - Assert.assertEquals(tag1, be.getTag()); + Assert.assertEquals(tag1, be.getLocationTag()); ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation()); checkTableReplicaAllocation(colTbl1); @@ -419,26 +442,21 @@ public class TabletRepairAndBalanceTest { Backend backend = backends.get(i); String backendStmt = "alter system modify backend \"" + backend.getHost() + ":" + backend.getHeartbeatPort() + "\" set ('tag.location' = 'default')"; - AlterSystemStmt systemStmt - = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(backendStmt, connectContext); + AlterSystemStmt systemStmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(backendStmt, + connectContext); DdlExecutor.execute(Catalog.getCurrentCatalog(), systemStmt); } - Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(0).getTag()); - Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(1).getTag()); - Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(2).getTag()); - Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(3).getTag()); - Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(4).getTag()); + Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(0).getLocationTag()); + Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(1).getLocationTag()); + Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(2).getLocationTag()); + Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(3).getLocationTag()); + Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG, backends.get(4).getLocationTag()); // create table tbl2 with "replication_num" property - String createStmt = "create table test.tbl2\n" - + "(k1 date, k2 int)\n" - + "partition by range(k1)\n" - + "(\n" + String createStmt = "create table test.tbl2\n" + "(k1 date, k2 int)\n" + "partition by range(k1)\n" + "(\n" + " partition p1 values less than(\"2021-06-01\"),\n" + " partition p2 values less than(\"2021-07-01\"),\n" - + " partition p3 values less than(\"2021-08-01\")\n" - + ")\n" - + "distributed by hash(k2) buckets 10;"; + + " partition p3 values less than(\"2021-08-01\")\n" + ")\n" + "distributed by hash(k2) buckets 10;"; ExceptionChecker.expectThrowsNoException(() -> createTable(createStmt)); OlapTable tbl2 = (OlapTable) db.getTableNullable("tbl2"); ReplicaAllocation defaultAlloc = new ReplicaAllocation((short) 3); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java index 73f329fd28..78bcc1fabb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/PropertyAnalyzerTest.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; @@ -155,8 +156,8 @@ public class PropertyAnalyzerTest { @Test public void testStorageFormat() throws AnalysisException { HashMap propertiesV1 = Maps.newHashMap(); - HashMap propertiesV2 = Maps.newHashMap(); - HashMap propertiesDefault = Maps.newHashMap(); + HashMap propertiesV2 = Maps.newHashMap(); + HashMap propertiesDefault = Maps.newHashMap(); propertiesV1.put(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, "v1"); propertiesV2.put(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, "v2"); propertiesDefault.put(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, "default"); @@ -164,8 +165,24 @@ public class PropertyAnalyzerTest { Assert.assertEquals(TStorageFormat.V2, PropertyAnalyzer.analyzeStorageFormat(propertiesV2)); Assert.assertEquals(TStorageFormat.V2, PropertyAnalyzer.analyzeStorageFormat(propertiesDefault)); expectedEx.expect(AnalysisException.class); - expectedEx.expectMessage("Storage format V1 has been deprecated since version 0.14," - + " please use V2 instead"); + expectedEx.expectMessage( + "Storage format V1 has been deprecated since version 0.14," + " please use V2 instead"); PropertyAnalyzer.analyzeStorageFormat(propertiesV1); } + + @Test + public void testTag() throws AnalysisException { + HashMap properties = Maps.newHashMap(); + properties.put("tag.location", "l1"); + properties.put("other", "prop"); + Map tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, null); + Assert.assertEquals("l1", tagMap.get("location")); + Assert.assertEquals(1, tagMap.size()); + Assert.assertEquals(1, properties.size()); + + properties.clear(); + tagMap = PropertyAnalyzer.analyzeBackendTagsProperties(properties, Tag.DEFAULT_BACKEND_TAG); + Assert.assertEquals(1, tagMap.size()); + Assert.assertEquals(Tag.DEFAULT_BACKEND_TAG.value, tagMap.get(Tag.TYPE_LOCATION)); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index 76c7386f35..37a909d601 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -239,9 +239,9 @@ public class ResourceTagQueryTest { AlterSystemStmt stmt = (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(stmtStr, connectContext); DdlExecutor.execute(Catalog.getCurrentCatalog(), stmt); } - Assert.assertEquals(tag1, backends.get(0).getTag()); - Assert.assertEquals(tag1, backends.get(1).getTag()); - Assert.assertEquals(tag1, backends.get(2).getTag()); + Assert.assertEquals(tag1, backends.get(0).getLocationTag()); + Assert.assertEquals(tag1, backends.get(1).getLocationTag()); + Assert.assertEquals(tag1, backends.get(2).getLocationTag()); queryStr = "explain select * from test.tbl1"; explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); @@ -251,7 +251,8 @@ public class ResourceTagQueryTest { // for now, 3 backends with tag zone1, 2 with tag default, so table is not stable. ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation()); // alter table's replication allocation to zone1:2 and default:1 - String alterStr = "alter table test.tbl1 modify partition (p1, p2, p3) set ('replication_allocation' = 'tag.location.zone1:2, tag.location.default:1')"; + String alterStr + = "alter table test.tbl1 modify partition (p1, p2, p3) set ('replication_allocation' = 'tag.location.zone1:2, tag.location.default:1')"; ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStr)); Map expectedAllocMap = Maps.newHashMap(); expectedAllocMap.put(Tag.DEFAULT_BACKEND_TAG, (short) 1); diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index 2f9831b5e3..753d4c3294 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -152,22 +152,22 @@ public class SystemInfoServiceTest { Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga"); Tag tagb = Tag.create(Tag.TYPE_LOCATION, "tagb"); - be1.setTag(taga); - be2.setTag(taga); - be3.setTag(tagb); - be4.setTag(tagb); - be5.setTag(tagb); + be1.setTagMap(taga.toMap()); + be2.setTagMap(taga.toMap()); + be3.setTagMap(tagb.toMap()); + be4.setTagMap(tagb.toMap()); + be5.setTagMap(tagb.toMap()); - BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable() - .addTags(Sets.newHashSet(taga)).build(); + BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(taga)) + .build(); Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy7, 1).size()); Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy7, 2).size()); Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10001L)); Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10002L)); Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy7, 3).size()); - BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder() - .needQueryAvailable().addTags(Sets.newHashSet(tagb)).build(); + BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder().needQueryAvailable().addTags(Sets.newHashSet(tagb)) + .build(); Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy8, 3).size()); Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10003L)); Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10004L)); @@ -209,7 +209,7 @@ public class SystemInfoServiceTest { // 8. check same host addBackend(10006, "192.168.1.1", 9051); Backend be6 = infoService.getBackend(10006); - be6.setTag(taga); + be6.setTagMap(taga.toMap()); be6.setAlive(true); addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L); addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);