[feature-wip](CN Node)Support compute node (#13231)
Introduce the node role to doris, and the table creation and tablet scheduler will control the storage only assign to the BE nodes.
This commit is contained in:
@ -1434,7 +1434,7 @@ public class OlapTable extends Table {
|
||||
Map<Tag, List<Long>> tag2beIds = Maps.newHashMap();
|
||||
for (long beId : replicaBackendIds) {
|
||||
Backend be = infoService.getBackend(beId);
|
||||
if (be == null) {
|
||||
if (be == null || !be.isMixNode()) {
|
||||
continue;
|
||||
}
|
||||
short num = currentReplicaAlloc.getOrDefault(be.getLocationTag(), (short) 0);
|
||||
@ -1896,7 +1896,7 @@ public class OlapTable extends Table {
|
||||
Map<Tag, Short> curMap = Maps.newHashMap();
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
Backend be = infoService.getBackend(replica.getBackendId());
|
||||
if (be == null) {
|
||||
if (be == null || !be.isMixNode()) {
|
||||
continue;
|
||||
}
|
||||
short num = curMap.getOrDefault(be.getLocationTag(), (short) 0);
|
||||
|
||||
@ -430,7 +430,7 @@ public class Tablet extends MetaObject implements Writable {
|
||||
for (Replica replica : replicas) {
|
||||
Backend backend = systemInfoService.getBackend(replica.getBackendId());
|
||||
if (backend == null || !backend.isAlive() || !replica.isAlive() || !hosts.add(backend.getHost())
|
||||
|| replica.tooSlow()) {
|
||||
|| replica.tooSlow() || !backend.isMixNode()) {
|
||||
// this replica is not alive,
|
||||
// or if this replica is on same host with another replica, we also treat it as 'dead',
|
||||
// so that Tablet Scheduler will create a new replica on different host.
|
||||
|
||||
@ -90,6 +90,10 @@ public class ClusterLoadStatistic {
|
||||
// So balance will be blocked.
|
||||
continue;
|
||||
}
|
||||
// only mix node have tablet statistic
|
||||
if (!backend.isMixNode()) {
|
||||
continue;
|
||||
}
|
||||
BackendLoadStatistic beStatistic = new BackendLoadStatistic(backend.getId(), backend.getOwnerClusterName(),
|
||||
backend.getLocationTag(), infoService, invertedIndex);
|
||||
try {
|
||||
|
||||
@ -588,6 +588,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
Backend be = infoService.getBackend(backendId);
|
||||
if (be == null) {
|
||||
return false;
|
||||
} else if (!be.isMixNode()) {
|
||||
return false;
|
||||
} else if (!be.getLocationTag().equals(tag) || excludedBeIds.contains(be.getId())) {
|
||||
return false;
|
||||
} else if (!be.isScheduleAvailable()) {
|
||||
|
||||
@ -713,7 +713,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
Map<Tag, Short> currentAllocMap = Maps.newHashMap();
|
||||
for (Replica replica : replicas) {
|
||||
Backend be = infoService.getBackend(replica.getBackendId());
|
||||
if (be != null && be.isScheduleAvailable() && replica.isAlive() && !replica.tooSlow()) {
|
||||
if (be != null && be.isScheduleAvailable() && replica.isAlive() && !replica.tooSlow()
|
||||
&& be.isMixNode()) {
|
||||
Short num = currentAllocMap.getOrDefault(be.getLocationTag(), (short) 0);
|
||||
currentAllocMap.put(be.getLocationTag(), (short) (num + 1));
|
||||
}
|
||||
@ -979,7 +980,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
Map<Tag, Short> allocMap = tabletCtx.getReplicaAlloc().getAllocMap();
|
||||
for (Replica replica : replicas) {
|
||||
Backend be = infoService.getBackend(replica.getBackendId());
|
||||
if (!allocMap.containsKey(be.getLocationTag())) {
|
||||
if (be.isMixNode() && !allocMap.containsKey(be.getLocationTag())) {
|
||||
deleteReplicaInternal(tabletCtx, replica, "not in valid tag", force);
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1788,6 +1788,14 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = false)
|
||||
public static int statistic_task_scheduler_execution_interval_ms = 60 * 1000;
|
||||
|
||||
/**
|
||||
* The candidate of the backend node for federation query such as hive table and es table query.
|
||||
* If the backend of computation role is less than this value, it will acquire some mix backend.
|
||||
* If the computation backend is enough, federation query will only assign to computation backend.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = false)
|
||||
public static int backend_num_for_federation = 3;
|
||||
|
||||
/**
|
||||
* Max query profile num.
|
||||
*/
|
||||
|
||||
@ -52,7 +52,7 @@ public class BackendsProcDir implements ProcDirInterface {
|
||||
.add("SystemDecommissioned").add("ClusterDecommissioned").add("TabletNum")
|
||||
.add("DataUsedCapacity").add("AvailCapacity").add("TotalCapacity").add("UsedPct")
|
||||
.add("MaxDiskUsedPct").add("RemoteUsedCapacity").add("Tag").add("ErrMsg").add("Version").add("Status")
|
||||
.add("HeartbeatFailureCounter")
|
||||
.add("HeartbeatFailureCounter").add("NodeRole")
|
||||
.build();
|
||||
|
||||
public static final int HOSTNAME_INDEX = 3;
|
||||
@ -182,6 +182,9 @@ public class BackendsProcDir implements ProcDirInterface {
|
||||
// heartbeat failure counter
|
||||
backendInfo.add(backend.getHeartbeatFailureCounter());
|
||||
|
||||
// node role, show the value only when backend is alive.
|
||||
backendInfo.add(backend.isAlive() ? backend.getNodeRoleTag().value : "");
|
||||
|
||||
comparableBackendInfos.add(backendInfo);
|
||||
}
|
||||
|
||||
|
||||
@ -71,7 +71,7 @@ public class ClusterLoadStatByTag implements ProcDirInterface {
|
||||
List<Long> beIds = Env.getCurrentSystemInfo().getBackendIds(false);
|
||||
for (long beId : beIds) {
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
|
||||
if (be != null) {
|
||||
if (be != null && be.isMixNode()) {
|
||||
tags.add(be.getLocationTag());
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,7 +77,6 @@ import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -107,8 +106,6 @@ public class BrokerScanNode extends LoadScanNode {
|
||||
}
|
||||
}
|
||||
|
||||
private final Random random = new Random(System.currentTimeMillis());
|
||||
|
||||
// File groups need to
|
||||
private List<TScanRangeLocations> locationsList;
|
||||
|
||||
@ -430,15 +427,10 @@ public class BrokerScanNode extends LoadScanNode {
|
||||
// broker scan node is used for query or load
|
||||
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needQueryAvailable().needLoadAvailable()
|
||||
.addTags(tags).build();
|
||||
for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
|
||||
if (policy.isMatch(be)) {
|
||||
backends.add(be);
|
||||
}
|
||||
}
|
||||
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
|
||||
if (backends.isEmpty()) {
|
||||
throw new UserException("No available backends");
|
||||
}
|
||||
Collections.shuffle(backends, random);
|
||||
}
|
||||
|
||||
private TFileFormatType formatType(String fileFormat, String path) throws UserException {
|
||||
|
||||
@ -661,6 +661,9 @@ public class OlapScanNode extends ScanNode {
|
||||
errs.add(replica.getId() + "'s backend " + replica.getBackendId() + " does not exist or not alive");
|
||||
continue;
|
||||
}
|
||||
if (!backend.isMixNode()) {
|
||||
continue;
|
||||
}
|
||||
if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) {
|
||||
String err = String.format(
|
||||
"Replica on backend %d with tag %s," + " which is not in user's resource tags: %s",
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.planner.external;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.mysql.privilege.UserProperty;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
@ -30,9 +31,7 @@ import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
public class BackendPolicy {
|
||||
@ -58,17 +57,13 @@ public class BackendPolicy {
|
||||
.needQueryAvailable()
|
||||
.needLoadAvailable()
|
||||
.addTags(tags)
|
||||
.preferComputeNode()
|
||||
.assignCandidateNum(Config.backend_num_for_federation)
|
||||
.build();
|
||||
for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
|
||||
if (policy.isMatch(be)) {
|
||||
backends.add(be);
|
||||
}
|
||||
}
|
||||
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
|
||||
if (backends.isEmpty()) {
|
||||
throw new UserException("No available backends");
|
||||
}
|
||||
Random random = new Random(System.currentTimeMillis());
|
||||
Collections.shuffle(backends, random);
|
||||
}
|
||||
|
||||
public Backend getNextBe() {
|
||||
|
||||
@ -55,13 +55,13 @@ public class Tag implements Writable {
|
||||
public static final String TYPE_ROLE = "role";
|
||||
public static final String TYPE_FUNCTION = "function";
|
||||
public static final String TYPE_LOCATION = "location";
|
||||
|
||||
public static final String VALUE_FRONTEND = "frontend";
|
||||
public static final String VALUE_BACKEND = "backend";
|
||||
public static final String VALUE_BROKER = "broker";
|
||||
public static final String VALUE_REMOTE_STORAGE = "remote_storage";
|
||||
public static final String VALUE_STORE = "store";
|
||||
public static final String VALUE_COMPUTATION = "computation";
|
||||
public static final String VALUE_MIX = "mix";
|
||||
public static final String VALUE_DEFAULT_CLUSTER = "default_cluster";
|
||||
public static final String VALUE_DEFAULT_TAG = "default";
|
||||
public static final String VALUE_INVALID_TAG = "invalid";
|
||||
@ -70,16 +70,18 @@ public class Tag implements Writable {
|
||||
TYPE_ROLE, TYPE_FUNCTION, TYPE_LOCATION);
|
||||
public static final ImmutableSet<String> RESERVED_TAG_VALUES = ImmutableSet.of(
|
||||
VALUE_FRONTEND, VALUE_BACKEND, VALUE_BROKER, VALUE_REMOTE_STORAGE, VALUE_STORE, VALUE_COMPUTATION,
|
||||
VALUE_DEFAULT_CLUSTER);
|
||||
VALUE_MIX, VALUE_DEFAULT_CLUSTER);
|
||||
private static final String TAG_TYPE_REGEX = "^[a-z][a-z0-9_]{0,32}$";
|
||||
private static final String TAG_VALUE_REGEX = "^[a-zA-Z][a-zA-Z0-9_]{0,32}$";
|
||||
|
||||
|
||||
public static final Tag DEFAULT_BACKEND_TAG;
|
||||
public static final Tag DEFAULT_NODE_ROLE_TAG;
|
||||
public static final Tag INVALID_TAG;
|
||||
|
||||
static {
|
||||
DEFAULT_BACKEND_TAG = new Tag(TYPE_LOCATION, VALUE_DEFAULT_TAG);
|
||||
DEFAULT_NODE_ROLE_TAG = new Tag(TYPE_ROLE, VALUE_MIX);
|
||||
INVALID_TAG = new Tag(TYPE_LOCATION, VALUE_INVALID_TAG);
|
||||
}
|
||||
|
||||
@ -107,6 +109,11 @@ public class Tag implements Writable {
|
||||
return new Tag(type, value);
|
||||
}
|
||||
|
||||
// only support be and cn node role tag for be.
|
||||
public static boolean validNodeRoleTag(String value) {
|
||||
return value != null && (value.equals(VALUE_MIX) || value.equals(VALUE_COMPUTATION));
|
||||
}
|
||||
|
||||
public String toKey() {
|
||||
return type + "_" + value;
|
||||
}
|
||||
|
||||
@ -124,6 +124,10 @@ public class Backend implements Writable {
|
||||
// creating this everytime we get it.
|
||||
@SerializedName(value = "locationTag", alternate = {"tag"})
|
||||
private Tag locationTag = Tag.DEFAULT_BACKEND_TAG;
|
||||
|
||||
@SerializedName("nodeRole")
|
||||
private Tag nodeRoleTag = Tag.DEFAULT_NODE_ROLE_TAG;
|
||||
|
||||
// tag type -> tag value.
|
||||
// A backend can only be assigned to one tag type, and each type can only have one value.
|
||||
@SerializedName("tagMap")
|
||||
@ -692,6 +696,12 @@ public class Backend implements Writable {
|
||||
this.brpcPort = hbResponse.getBrpcPort();
|
||||
}
|
||||
|
||||
if (!this.getNodeRoleTag().value.equals(hbResponse.getNodeRole()) && Tag.validNodeRoleTag(
|
||||
hbResponse.getNodeRole())) {
|
||||
isChanged = true;
|
||||
this.nodeRoleTag = Tag.createNotCheck(Tag.TYPE_ROLE, hbResponse.getNodeRole());
|
||||
}
|
||||
|
||||
this.lastUpdateMs = hbResponse.getHbTime();
|
||||
if (!isAlive.get()) {
|
||||
isChanged = true;
|
||||
@ -762,10 +772,25 @@ public class Backend implements Writable {
|
||||
return locationTag;
|
||||
}
|
||||
|
||||
public Tag getNodeRoleTag() {
|
||||
return nodeRoleTag;
|
||||
}
|
||||
|
||||
public boolean isMixNode() {
|
||||
return nodeRoleTag.value.equals(Tag.VALUE_MIX);
|
||||
}
|
||||
|
||||
public boolean isComputeNode() {
|
||||
return nodeRoleTag.value.equals(Tag.VALUE_COMPUTATION);
|
||||
}
|
||||
|
||||
public void setTagMap(Map<String, String> tagMap) {
|
||||
Preconditions.checkState(tagMap.containsKey(Tag.TYPE_LOCATION));
|
||||
this.tagMap = tagMap;
|
||||
this.locationTag = Tag.createNotCheck(Tag.TYPE_LOCATION, tagMap.get(Tag.TYPE_LOCATION));
|
||||
if (tagMap.containsKey(Tag.TYPE_ROLE) && Tag.validNodeRoleTag(tagMap.get(Tag.TYPE_ROLE))) {
|
||||
this.nodeRoleTag = Tag.createNotCheck(Tag.TYPE_ROLE, tagMap.get(Tag.TYPE_ROLE));
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, String> getTagMap() {
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.system;
|
||||
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.resource.Tag;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
@ -34,6 +35,7 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
|
||||
private long beStartTime;
|
||||
private String host;
|
||||
private String version = "";
|
||||
private String nodeRole = Tag.VALUE_MIX;
|
||||
|
||||
public BackendHbResponse() {
|
||||
super(HeartbeatResponse.Type.BACKEND);
|
||||
@ -52,6 +54,20 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public BackendHbResponse(long beId, int bePort, int httpPort, int brpcPort,
|
||||
long hbTime, long beStartTime, String version, String nodeRole) {
|
||||
super(HeartbeatResponse.Type.BACKEND);
|
||||
this.beId = beId;
|
||||
this.status = HbStatus.OK;
|
||||
this.bePort = bePort;
|
||||
this.httpPort = httpPort;
|
||||
this.brpcPort = brpcPort;
|
||||
this.hbTime = hbTime;
|
||||
this.beStartTime = beStartTime;
|
||||
this.version = version;
|
||||
this.nodeRole = nodeRole;
|
||||
}
|
||||
|
||||
public BackendHbResponse(long beId, String errMsg) {
|
||||
super(HeartbeatResponse.Type.BACKEND);
|
||||
this.status = HbStatus.BAD;
|
||||
@ -91,6 +107,10 @@ public class BackendHbResponse extends HeartbeatResponse implements Writable {
|
||||
return version;
|
||||
}
|
||||
|
||||
public String getNodeRole() {
|
||||
return nodeRole;
|
||||
}
|
||||
|
||||
public static BackendHbResponse read(DataInput in) throws IOException {
|
||||
BackendHbResponse result = new BackendHbResponse();
|
||||
result.readFields(in);
|
||||
|
||||
@ -20,9 +20,14 @@ package org.apache.doris.system;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.collect.ImmutableCollection;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Selection policy for building BE nodes
|
||||
@ -41,6 +46,9 @@ public class BeSelectionPolicy {
|
||||
// If set to false, do not select backends on same host.
|
||||
public boolean allowOnSameHost = false;
|
||||
|
||||
public boolean preferComputeNode = false;
|
||||
public int candidateNum = Integer.MAX_VALUE;
|
||||
|
||||
private BeSelectionPolicy() {
|
||||
|
||||
}
|
||||
@ -92,12 +100,27 @@ public class BeSelectionPolicy {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder preferComputeNode() {
|
||||
policy.preferComputeNode = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder assignCandidateNum(int candidateNum) {
|
||||
policy.candidateNum = candidateNum;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BeSelectionPolicy build() {
|
||||
return policy;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isMatch(Backend backend) {
|
||||
private boolean isMatch(Backend backend) {
|
||||
// Compute node is only used when preferComputeNode is set.
|
||||
if (!preferComputeNode && backend.isComputeNode()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (needScheduleAvailable && !backend.isScheduleAvailable() || needQueryAvailable && !backend.isQueryAvailable()
|
||||
|| needLoadAvailable && !backend.isLoadAvailable() || !resourceTags.isEmpty() && !resourceTags.contains(
|
||||
backend.getLocationTag()) || storageMedium != null && !backend.hasSpecifiedStorageMedium(
|
||||
@ -116,6 +139,41 @@ public class BeSelectionPolicy {
|
||||
return true;
|
||||
}
|
||||
|
||||
public List<Backend> getCandidateBackends(ImmutableCollection<Backend> backends) {
|
||||
List<Backend> filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList());
|
||||
Collections.shuffle(filterBackends);
|
||||
List<Backend> candidates = new ArrayList<>();
|
||||
if (preferComputeNode) {
|
||||
int num = 0;
|
||||
// pick compute node first
|
||||
for (Backend backend : filterBackends) {
|
||||
if (backend.isComputeNode()) {
|
||||
if (num >= candidateNum) {
|
||||
break;
|
||||
}
|
||||
candidates.add(backend);
|
||||
num++;
|
||||
}
|
||||
}
|
||||
// fill with some mix node.
|
||||
if (num < candidateNum) {
|
||||
for (Backend backend : filterBackends) {
|
||||
if (backend.isMixNode()) {
|
||||
if (num >= candidateNum) {
|
||||
break;
|
||||
}
|
||||
candidates.add(backend);
|
||||
num++;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
candidates.addAll(filterBackends);
|
||||
}
|
||||
|
||||
return candidates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.common.Version;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.persist.HbPackage;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.system.HeartbeatResponse.HbStatus;
|
||||
import org.apache.doris.thrift.FrontendService;
|
||||
@ -253,8 +254,12 @@ public class HeartbeatMgr extends MasterDaemon {
|
||||
long beStartTime = tBackendInfo.isSetBeStartTime()
|
||||
? tBackendInfo.getBeStartTime() : System.currentTimeMillis();
|
||||
// backend.updateOnce(bePort, httpPort, beRpcPort, brpcPort);
|
||||
String nodeRole = Tag.VALUE_MIX;
|
||||
if (tBackendInfo.isSetBeNodeRole()) {
|
||||
nodeRole = tBackendInfo.getBeNodeRole();
|
||||
}
|
||||
return new BackendHbResponse(backendId, bePort, httpPort, brpcPort,
|
||||
System.currentTimeMillis(), beStartTime, version);
|
||||
System.currentTimeMillis(), beStartTime, version, nodeRole);
|
||||
} else {
|
||||
return new BackendHbResponse(backendId, backend.getHost(),
|
||||
result.getStatus().getErrorMsgs().isEmpty()
|
||||
|
||||
@ -781,8 +781,7 @@ public class SystemInfoService {
|
||||
*/
|
||||
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
|
||||
Preconditions.checkArgument(number >= -1);
|
||||
List<Backend> candidates =
|
||||
idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
|
||||
List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values());
|
||||
if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) {
|
||||
LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
|
||||
return Lists.newArrayList();
|
||||
@ -1162,7 +1161,8 @@ public class SystemInfoService {
|
||||
public void checkReplicaAllocation(String cluster, ReplicaAllocation replicaAlloc) throws DdlException {
|
||||
List<Backend> backends = getClusterBackends(cluster);
|
||||
for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
|
||||
if (backends.stream().filter(b -> b.getLocationTag().equals(entry.getKey())).count() < entry.getValue()) {
|
||||
if (backends.stream().filter(Backend::isMixNode).filter(b -> b.getLocationTag().equals(entry.getKey()))
|
||||
.count() < entry.getValue()) {
|
||||
throw new DdlException(
|
||||
"Failed to find enough host with tag(" + entry.getKey() + ") in all backends. need: "
|
||||
+ entry.getValue());
|
||||
@ -1174,6 +1174,9 @@ public class SystemInfoService {
|
||||
List<Backend> bes = getClusterBackends(clusterName);
|
||||
Set<Tag> tags = Sets.newHashSet();
|
||||
for (Backend be : bes) {
|
||||
if (be == null || !be.isMixNode()) {
|
||||
continue;
|
||||
}
|
||||
tags.add(be.getLocationTag());
|
||||
}
|
||||
return tags;
|
||||
@ -1181,6 +1184,7 @@ public class SystemInfoService {
|
||||
|
||||
public List<Backend> getBackendsByTagInCluster(String clusterName, Tag tag) {
|
||||
List<Backend> bes = getClusterBackends(clusterName);
|
||||
return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList());
|
||||
return bes.stream().filter(Backend::isMixNode).filter(b -> b.getLocationTag().equals(tag))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@ -42,6 +42,7 @@ public class ClusterLoadStatisticsTest {
|
||||
private Backend be1;
|
||||
private Backend be2;
|
||||
private Backend be3;
|
||||
private Backend be4;
|
||||
|
||||
private Env env;
|
||||
private SystemInfoService systemInfoService;
|
||||
@ -119,10 +120,27 @@ public class ClusterLoadStatisticsTest {
|
||||
be3.setAlive(true);
|
||||
be3.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
|
||||
|
||||
// compute role node
|
||||
be4 = new Backend(10004, "192.168.0.4", 9053);
|
||||
disks = Maps.newHashMap();
|
||||
diskInfo1 = new DiskInfo("/path1");
|
||||
diskInfo1.setTotalCapacityB(4000000);
|
||||
diskInfo1.setAvailableCapacityB(100000);
|
||||
diskInfo1.setDataUsedCapacityB(80000);
|
||||
disks.put(diskInfo1.getRootPath(), diskInfo1);
|
||||
|
||||
be4.setDisks(ImmutableMap.copyOf(disks));
|
||||
be4.setAlive(true);
|
||||
be4.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
|
||||
Map<String, String> tagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
|
||||
tagMap.put(Tag.TYPE_ROLE, Tag.VALUE_COMPUTATION);
|
||||
be4.setTagMap(tagMap);
|
||||
|
||||
systemInfoService = new SystemInfoService();
|
||||
systemInfoService.addBackend(be1);
|
||||
systemInfoService.addBackend(be2);
|
||||
systemInfoService.addBackend(be3);
|
||||
systemInfoService.addBackend(be4);
|
||||
|
||||
// tablet
|
||||
invertedIndex = new TabletInvertedIndex();
|
||||
|
||||
@ -349,6 +349,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend2.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend2.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend3 not available, and dead for a long time
|
||||
infoService.getBackend(3L);
|
||||
@ -366,6 +369,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend3.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend3.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend4 not available, and dead for a short time
|
||||
infoService.getBackend(4L);
|
||||
@ -383,6 +389,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend4.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend4.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend5 not available, and in decommission
|
||||
infoService.getBackend(5L);
|
||||
@ -400,6 +409,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend5.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend5.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
colocateTableIndex.getBackendsByGroup(groupId, tag);
|
||||
result = allBackendsInGroup;
|
||||
@ -420,7 +432,8 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
@Mocked Backend myBackend4,
|
||||
@Mocked Backend myBackend5,
|
||||
@Mocked Backend myBackend6,
|
||||
@Mocked Backend myBackend7) throws AnalysisException {
|
||||
@Mocked Backend myBackend7,
|
||||
@Mocked Backend myBackend8) throws AnalysisException {
|
||||
List<Long> clusterBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L);
|
||||
new Expectations() {
|
||||
{
|
||||
@ -442,6 +455,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend2.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend2.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend3 not available, and dead for a long time
|
||||
infoService.getBackend(3L);
|
||||
@ -459,6 +475,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend3.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend3.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend4 available, not alive but dead for a short time
|
||||
infoService.getBackend(4L);
|
||||
@ -476,6 +495,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend4.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend4.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend5 not available, and in decommission
|
||||
infoService.getBackend(5L);
|
||||
@ -493,6 +515,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend5.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend5.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend6 is available, but with different tag
|
||||
infoService.getBackend(5L);
|
||||
@ -510,6 +535,9 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend6.getLocationTag();
|
||||
result = Tag.create(Tag.TYPE_LOCATION, "new_loc");
|
||||
minTimes = 0;
|
||||
myBackend6.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
|
||||
// backend7 is available, but in exclude sets
|
||||
infoService.getBackend(5L);
|
||||
@ -527,9 +555,32 @@ public class ColocateTableCheckerAndBalancerTest {
|
||||
myBackend7.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend7.isMixNode();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend7.getId();
|
||||
result = 999L;
|
||||
minTimes = 0;
|
||||
|
||||
// backend8 is available, it's a compute node.
|
||||
infoService.getBackend(5L);
|
||||
result = myBackend8;
|
||||
minTimes = 0;
|
||||
myBackend8.isScheduleAvailable();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend8.isAlive();
|
||||
result = true;
|
||||
minTimes = 0;
|
||||
myBackend8.isDecommissioned();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
myBackend8.getLocationTag();
|
||||
result = Tag.DEFAULT_BACKEND_TAG;
|
||||
minTimes = 0;
|
||||
myBackend8.isMixNode();
|
||||
result = false;
|
||||
minTimes = 0;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -221,6 +221,57 @@ public class SystemInfoServiceTest {
|
||||
Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy15, 2).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComputeNodeBackendSelect() throws Exception {
|
||||
// only one compute node
|
||||
Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
|
||||
addBackend(20001, "192.168.2.1", 9051);
|
||||
Backend be1 = infoService.getBackend(20001);
|
||||
setComputeNode(be1, taga);
|
||||
be1.setAlive(true);
|
||||
addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
|
||||
BeSelectionPolicy policy01 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
|
||||
.setStorageMedium(TStorageMedium.HDD).build();
|
||||
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy01, 1).size());
|
||||
|
||||
BeSelectionPolicy policy02 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
|
||||
.setStorageMedium(TStorageMedium.HDD).preferComputeNode().build();
|
||||
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy02, 1).size());
|
||||
|
||||
BeSelectionPolicy policy03 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
|
||||
.setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignCandidateNum(0).build();
|
||||
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy03, 1).size());
|
||||
|
||||
BeSelectionPolicy policy04 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
|
||||
.setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignCandidateNum(1).build();
|
||||
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy04, 1).size());
|
||||
|
||||
// one compute node and two mix node
|
||||
addBackend(20002, "192.168.2.2", 9051);
|
||||
Backend be2 = infoService.getBackend(20002);
|
||||
be2.setTagMap(taga.toMap());
|
||||
be2.setAlive(true);
|
||||
addDisk(be2, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
|
||||
|
||||
addBackend(20003, "192.168.2.3", 9051);
|
||||
Backend be3 = infoService.getBackend(20003);
|
||||
be3.setTagMap(taga.toMap());
|
||||
be3.setAlive(true);
|
||||
addDisk(be3, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
|
||||
|
||||
BeSelectionPolicy policy05 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
|
||||
.setStorageMedium(TStorageMedium.HDD).build();
|
||||
Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy05, 3).size());
|
||||
|
||||
BeSelectionPolicy policy06 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
|
||||
.setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignCandidateNum(2).build();
|
||||
Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy06, 2).size());
|
||||
|
||||
BeSelectionPolicy policy07 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
|
||||
.setStorageMedium(TStorageMedium.HDD).preferComputeNode().assignCandidateNum(3).build();
|
||||
Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07, 3).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectBackendIdsForReplicaCreation() throws Exception {
|
||||
addBackend(10001, "192.168.1.1", 9050);
|
||||
@ -243,6 +294,12 @@ public class SystemInfoServiceTest {
|
||||
Backend be5 = infoService.getBackend(10005);
|
||||
addDisk(be5, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
|
||||
be5.setAlive(true);
|
||||
// no effect with compute node
|
||||
addBackend(10006, "192.168.1.6", 9050);
|
||||
Backend be6 = infoService.getBackend(10006);
|
||||
addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
|
||||
be6.setAlive(true);
|
||||
setComputeNode(be6, Tag.DEFAULT_BACKEND_TAG);
|
||||
|
||||
ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
|
||||
// also check if the random selection logic can evenly distribute the replica.
|
||||
@ -271,4 +328,10 @@ public class SystemInfoServiceTest {
|
||||
map.put(diskInfo1.getRootPath(), diskInfo1);
|
||||
be.setDisks(ImmutableMap.copyOf(map));
|
||||
}
|
||||
|
||||
private void setComputeNode(Backend be, Tag tag) {
|
||||
Map<String, String> tagMap = tag.toMap();
|
||||
tagMap.put(Tag.TYPE_ROLE, Tag.VALUE_COMPUTATION);
|
||||
be.setTagMap(tagMap);
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,6 +38,7 @@ import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.Planner;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TDisk;
|
||||
@ -199,9 +200,10 @@ public class DemoMultiBackendsTest {
|
||||
ProcResult result = dir.fetchResult();
|
||||
Assert.assertEquals(BackendsProcDir.TITLE_NAMES.size(), result.getColumnNames().size());
|
||||
Assert.assertEquals("{\"location\" : \"default\"}", result.getRows().get(0).get(20));
|
||||
Assert.assertEquals(
|
||||
"{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
|
||||
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 2));
|
||||
Assert.assertEquals("{\"lastSuccessReportTabletsTime\":\"N/A\",\"lastStreamLoadTime\":-1,\"isQueryDisabled\":false,\"isLoadDisabled\":false}",
|
||||
result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 3));
|
||||
Assert.assertEquals("0", result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 2));
|
||||
Assert.assertEquals(Tag.VALUE_MIX, result.getRows().get(0).get(BackendsProcDir.TITLE_NAMES.size() - 1));
|
||||
}
|
||||
|
||||
private static void updateReplicaPathHash() {
|
||||
|
||||
Reference in New Issue
Block a user