[refactor](backend) Refactor the logic of selecting Backend in FE. (#9478)

There are many places in FE where a group of BE nodes needs to be selected according to certain requirements. For example:
1. When creating replicas for a tablet.
2. When selecting a BE to execute Insert.
3. When Stream Load forwards http requests to BE nodes.

These operations all have the same logic. So this CL mainly changes:
1. Create a new `BeSelectionPolicy` class to describe the set of conditions for selecting BE.
2. The logic of selecting BE nodes in `SystemInfoService` has been refactored, and the following two methods are used uniformly:
    1. `selectBackendIdsByPolicy`: Select the required number of BE nodes according to the `BeSelectionPolicy`.
    2. `selectBackendIdsForReplicaCreation`: Select the BE node for the replica creation operation.

Note that there are some changes here:
For the replica creation operation, BE nodes were previously selected using a round-robin method,
but this is now changed to `random` selection for the following reasons:
1. Although the previous logic is nominally round-robin, its effect is essentially random.
2. The final distribution difference produced by the random algorithm will not exceed 5%, so the random algorithm
     can be considered to distribute the data evenly.
This commit is contained in:
Mingyu Chen
2022-05-11 09:40:57 +08:00
committed by GitHub
parent a738d385db
commit 8fa0122ed0
15 changed files with 526 additions and 410 deletions

View File

@ -74,9 +74,6 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTaskType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@ -87,6 +84,9 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table.Cell;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@ -1018,7 +1018,8 @@ public class RestoreJob extends AbstractJob {
// replicas
try {
Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
Map<Tag, List<Long>> beIds = Catalog.getCurrentSystemInfo()
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
for (Long beId : entry.getValue()) {
long newReplicaId = catalog.getNextId();

View File

@ -271,6 +271,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@ -296,7 +297,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
public class Catalog {
private static final Logger LOG = LogManager.getLogger(Catalog.class);
@ -4543,10 +4543,12 @@ public class Catalog {
// This is the first colocate table in the group, or just a normal table,
// randomly choose backends
if (!Config.disable_storage_medium_check) {
chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName,
chosenBackendIds =
getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
tabletMeta.getStorageMedium());
} else {
chosenBackendIds = getCurrentSystemInfo().chooseBackendIdByFilters(replicaAlloc, clusterName, null);
chosenBackendIds =
getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
}
for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {

View File

@ -500,7 +500,8 @@ public class OlapTable extends Table {
// replicas
try {
Map<Tag, List<Long>> tag2beIds = Catalog.getCurrentSystemInfo().chooseBackendIdByFilters(
Map<Tag, List<Long>> tag2beIds =
Catalog.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, db.getClusterName(), null);
for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
for (Long beId : entry3.getValue()) {

View File

@ -21,12 +21,14 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
@ -41,12 +43,10 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.util.List;
@RestController
public class LoadAction extends RestBaseController {
@ -145,21 +145,7 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult(e.getMessage());
}
} else {
// Choose a backend sequentially.
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, beAvailablePredicate, false, clusterName, null, null);
if (backendIds == null) {
return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
redirectAddr = selectRedirectBackend(clusterName);
}
LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}",
@ -194,22 +180,7 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected.");
}
// Choose a backend sequentially.
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, beAvailablePredicate, false, clusterName, null, null);
if (backendIds == null) {
return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
return new RestBaseResult(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
}
TNetworkAddress redirectAddr = new TNetworkAddress(backend.getHost(), backend.getHttpPort());
TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}",
redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation);
@ -220,4 +191,18 @@ public class LoadAction extends RestBaseController {
return new RestBaseResult(e.getMessage());
}
}
/**
 * Picks a single load-available BE in the given cluster and returns its HTTP endpoint
 * as the redirect target for a (stream) load request.
 *
 * @param clusterName cluster to select the backend from
 * @return the HTTP address (host + http port) of the chosen backend
 * @throws LoadException if no backend matching the policy can be found
 */
private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException {
    // Only "load available" matters here; query/schedule availability is irrelevant for redirects.
    BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(clusterName).needLoadAvailable().build();
    List<Long> candidateIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
    if (candidateIds.isEmpty()) {
        throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
    }
    // The id list is only a snapshot; the backend may have been dropped in the meantime.
    Backend chosen = Catalog.getCurrentSystemInfo().getBackend(candidateIds.get(0));
    if (chosen == null) {
        throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
    }
    return new TNetworkAddress(chosen.getHost(), chosen.getHttpPort());
}
}

View File

@ -19,10 +19,11 @@ package org.apache.doris.httpv2.util;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.httpv2.rest.UploadAction;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import com.google.common.base.Strings;
@ -136,19 +137,15 @@ public class LoadSubmitter {
return file;
}
private Backend selectOneBackend() throws DdlException {
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, beAvailablePredicate, false,
SystemInfoService.DEFAULT_CLUSTER, null, null);
if (backendIds == null) {
throw new DdlException("No alive backend");
private Backend selectOneBackend() throws LoadException {
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendIds.get(0));
if (backend == null) {
throw new DdlException("No alive backend");
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
return backend;
}

View File

@ -26,7 +26,7 @@ import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@ -63,13 +63,11 @@ public class InsertStreamTxnExecutor {
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
StreamLoadPlanner planner = new StreamLoadPlanner(txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, true, true);
List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(
1, beAvailablePredicate, false,
txnEntry.getDb().getClusterName(), null, null);
if (beIds == null || beIds.isEmpty()) {
throw new UserException("there is no backend load available or scanNode backend available.");
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
.needLoadAvailable().needQueryAvailable().build();
List<Long> beIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (beIds.isEmpty()) {
throw new UserException("No available backend to match the policy: " + policy);
}
tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());

View File

@ -41,6 +41,7 @@ import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
@ -91,14 +92,13 @@ public class MultiLoadMgr {
if (infoMap.containsKey(multiLabel)) {
throw new LabelAlreadyUsedException(label);
}
MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
SystemInfoService.BeAvailablePredicate beAvailablePredicate =
new SystemInfoService.BeAvailablePredicate(false, false, true);
List<Long> backendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(1,
beAvailablePredicate, false, ConnectContext.get().getClusterName(), null, null);
if (backendIds == null) {
throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(ConnectContext.get().getClusterName())
.needLoadAvailable().build();
List<Long> backendIds = Catalog.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (backendIds.isEmpty()) {
throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + " policy: " + policy);
}
MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
multiLoadDesc.setBackendId(backendIds.get(0));
infoMap.put(multiLabel, multiLoadDesc);
} finally {

View File

@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.Sets;
import java.util.Set;
/**
* Selection policy for building BE nodes
*/
/**
 * Describes the set of conditions used to select BE nodes.
 * Instances are immutable after construction; build them via {@link Builder}.
 */
public class BeSelectionPolicy {
    public String cluster = SystemInfoService.DEFAULT_CLUSTER;
    public boolean needScheduleAvailable = false;
    public boolean needQueryAvailable = false;
    public boolean needLoadAvailable = false;
    // Resource tag. Empty means no need to consider resource tag.
    public Set<Tag> resourceTags = Sets.newHashSet();
    // storage medium. null means no need to consider storage medium.
    public TStorageMedium storageMedium = null;
    // Check if disk usage reaches limit. false means no need to check.
    public boolean checkDiskUsage = false;
    // If set to false, do not select backends on same host.
    public boolean allowOnSameHost = false;

    // Not directly instantiable; use the Builder.
    private BeSelectionPolicy() {
    }

    /** Fluent builder for {@link BeSelectionPolicy}. */
    public static class Builder {
        private final BeSelectionPolicy target = new BeSelectionPolicy();

        public Builder() {
        }

        public Builder setCluster(String cluster) {
            target.cluster = cluster;
            return this;
        }

        public Builder needScheduleAvailable() {
            target.needScheduleAvailable = true;
            return this;
        }

        public Builder needQueryAvailable() {
            target.needQueryAvailable = true;
            return this;
        }

        public Builder needLoadAvailable() {
            target.needLoadAvailable = true;
            return this;
        }

        public Builder addTags(Set<Tag> tags) {
            target.resourceTags.addAll(tags);
            return this;
        }

        public Builder setStorageMedium(TStorageMedium medium) {
            target.storageMedium = medium;
            return this;
        }

        public Builder needCheckDiskUsage() {
            target.checkDiskUsage = true;
            return this;
        }

        public Builder allowOnSameHost() {
            target.allowOnSameHost = true;
            return this;
        }

        public BeSelectionPolicy build() {
            return target;
        }
    }

    /**
     * Returns true if the given backend satisfies every enabled condition of this policy.
     */
    public boolean isMatch(Backend backend) {
        // Availability and tag/medium filters, checked as guard clauses in the same order
        // as the conditions are declared.
        if (needScheduleAvailable && !backend.isScheduleAvailable()) {
            return false;
        }
        if (needQueryAvailable && !backend.isQueryAvailable()) {
            return false;
        }
        if (needLoadAvailable && !backend.isLoadAvailable()) {
            return false;
        }
        if (!resourceTags.isEmpty() && !resourceTags.contains(backend.getTag())) {
            return false;
        }
        if (storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) {
            return false;
        }
        if (checkDiskUsage) {
            // With no medium specified, check overall disk usage; otherwise check the medium's usage.
            if (storageMedium == null && backend.diskExceedLimit()) {
                return false;
            }
            if (storageMedium != null && backend.diskExceedLimitByStorageMedium(storageMedium)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public String toString() {
        // NOTE: checkDiskUsage and allowOnSameHost are intentionally not printed here.
        return "cluster|query|load|schedule|tags|medium: "
                + cluster + "|"
                + needQueryAvailable + "|"
                + needLoadAvailable + "|"
                + needScheduleAvailable + "|"
                + resourceTags + "|"
                + storageMedium;
    }
}

View File

@ -40,7 +40,6 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@ -61,10 +60,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SystemInfoService {
private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
@ -75,42 +72,16 @@ public class SystemInfoService {
public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is no scanNode Backend available.";
public static class BeAvailablePredicate {
private boolean scheduleAvailable;
private volatile ImmutableMap<Long, Backend> idToBackendRef = ImmutableMap.of();
private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = ImmutableMap.of();
private boolean queryAvailable;
private boolean loadAvailable;
public BeAvailablePredicate(boolean scheduleAvailable, boolean queryAvailable, boolean loadAvailable) {
this.scheduleAvailable = scheduleAvailable;
this.queryAvailable = queryAvailable;
this.loadAvailable = loadAvailable;
}
public boolean isMatch(Backend backend) {
if (scheduleAvailable && !backend.isScheduleAvailable() || queryAvailable && !backend.isQueryAvailable() ||
loadAvailable && !backend.isLoadAvailable()) {
return false;
}
return true;
}
}
private volatile ImmutableMap<Long, Backend> idToBackendRef;
private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef;
// last backend id used by round robin for sequential choosing backends for
// tablet creation
private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
// last backend id used by round robin for sequential choosing backends in
// other jobs
private ConcurrentHashMap<String, Long> lastBackendIdForOtherMap;
// last backend id used by round robin for sequential selecting backends for replica creation
private Map<Tag, Long> lastBackendIdForReplicaCreation = Maps.newConcurrentMap();
private long lastBackendIdForCreation = -1;
private long lastBackendIdForOther = -1;
private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef;
private volatile ImmutableMap<Long, DiskInfo> pathHashToDishInfoRef = ImmutableMap.of();
// sort host backends list by num of backends, descending
private static final Comparator<List<Backend>> hostBackendsListComparator = new Comparator<List<Backend>>() {
@ -124,15 +95,6 @@ public class SystemInfoService {
}
};
public SystemInfoService() {
idToBackendRef = ImmutableMap.<Long, Backend>of();
idToReportVersionRef = ImmutableMap.<Long, AtomicLong>of();
lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
pathHashToDishInfoRef = ImmutableMap.<Long, DiskInfo>of();
}
// for deploy manager
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean isFree) throws UserException {
addBackends(hostPortPairs, isFree, "", Tag.DEFAULT_BACKEND_TAG);
@ -432,9 +394,6 @@ public class SystemInfoService {
LOG.warn("not enough available backends. require :" + instanceNum + " get:" + chosenBackendIds.size());
return null;
}
lastBackendIdForCreationMap.put(clusterName, (long) -1);
lastBackendIdForOtherMap.put(clusterName, (long) -1);
return chosenBackendIds;
}
@ -462,9 +421,6 @@ public class SystemInfoService {
}
}
}
lastBackendIdForCreationMap.remove(clusterName);
lastBackendIdForOtherMap.remove(clusterName);
}
/**
@ -779,21 +735,35 @@ public class SystemInfoService {
}
// Find enough backend to allocate replica of a tablet.
// filters include: tag, cluster, storage medium
public Map<Tag, List<Long>> chooseBackendIdByFilters(ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
/**
* Select a set of backends for replica creation.
* The following parameters need to be considered when selecting backends.
*
* @param replicaAlloc
* @param clusterName
* @param storageMedium
* @return return the selected backend ids group by tag.
* @throws DdlException
*/
public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium)
throws DdlException {
Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
short totalReplicaNum = 0;
BeAvailablePredicate beAvailablePredicate = new BeAvailablePredicate(true, false, false);
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
List<Long> beIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMediumAndTag(entry.getValue(),
beAvailablePredicate, true, clusterName, storageMedium, entry.getKey());
if (beIds == null) {
throw new DdlException("Failed to find enough host with storage medium and tag("
+ (storageMedium == null ? "NaN" : storageMedium) + "/" + entry.getKey()
+ ") in all backends. need: " + entry.getValue());
BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
.setStorageMedium(storageMedium);
if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
builder.allowOnSameHost();
}
BeSelectionPolicy policy = builder.build();
List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue());
if (beIds.isEmpty()) {
throw new DdlException("Failed to find " + entry.getValue() + " backends for policy: " + policy);
}
chosenBackendIds.put(entry.getKey(), beIds);
totalReplicaNum += beIds.size();
@ -802,61 +772,34 @@ public class SystemInfoService {
return chosenBackendIds;
}
public List<Long> seqChooseBackendIdsByStorageMediumAndTag(int backendNum, BeAvailablePredicate beAvailablePredicate,
boolean isCreate, String clusterName,
TStorageMedium storageMedium, Tag tag) {
Stream<Backend> beStream = getClusterBackends(clusterName).stream();
if (storageMedium == null) {
beStream = beStream.filter(v -> !v.diskExceedLimit());
} else {
beStream = beStream.filter(v -> !v.diskExceedLimitByStorageMedium(storageMedium));
/**
* Select a set of backends by the given policy.
*
* @param policy
* @param number number of backends which need to be selected.
* @return return #number of backend ids,
* or empty set if no backends match the policy, or the number of matched backends is less than "number";
*/
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
List<Backend> candidates =
idToBackendRef.values().stream().filter(policy::isMatch).collect(Collectors.toList());
if (candidates.size() < number) {
return Lists.newArrayList();
}
if (tag != null) {
beStream = beStream.filter(v -> v.getTag().equals(tag));
}
final List<Backend> backends = beStream.collect(Collectors.toList());
return seqChooseBackendIds(backendNum, beAvailablePredicate, isCreate, clusterName, backends);
}
// choose backends by round robin
// return null if not enough backend
// use synchronized to run serially
public synchronized List<Long> seqChooseBackendIds(int backendNum, BeAvailablePredicate beAvailablePredicate,
boolean isCreate, String clusterName,
final List<Backend> srcBackends) {
long lastBackendId;
if (clusterName.equals(DEFAULT_CLUSTER)) {
if (isCreate) {
lastBackendId = lastBackendIdForCreation;
} else {
lastBackendId = lastBackendIdForOther;
}
} else {
if (isCreate) {
if (lastBackendIdForCreationMap.containsKey(clusterName)) {
lastBackendId = lastBackendIdForCreationMap.get(clusterName);
} else {
lastBackendId = -1;
lastBackendIdForCreationMap.put(clusterName, lastBackendId);
}
} else {
if (lastBackendIdForOtherMap.containsKey(clusterName)) {
lastBackendId = lastBackendIdForOtherMap.get(clusterName);
} else {
lastBackendId = -1;
lastBackendIdForOtherMap.put(clusterName, lastBackendId);
}
}
// If only need one Backend, just return a random one.
if (number == 1) {
Collections.shuffle(candidates);
return Lists.newArrayList(candidates.get(0).getId());
}
// host -> BE list
List<Backend> sourceBackend = srcBackends;
if (sourceBackend == null) {
sourceBackend = getClusterBackends(clusterName);
if (policy.allowOnSameHost) {
Collections.shuffle(candidates);
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
}
// for each host, random select one backend.
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
for (Backend backend : sourceBackend) {
for (Backend backend : candidates) {
if (backendMaps.containsKey(backend.getHost())) {
backendMaps.get(backend.getHost()).add(backend);
} else {
@ -865,94 +808,16 @@ public class SystemInfoService {
backendMaps.put(backend.getHost(), list);
}
}
// if more than one backend exists in same host, select a backend at random
List<Backend> backends = Lists.newArrayList();
candidates.clear();
for (List<Backend> list : backendMaps.values()) {
if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
backends.addAll(list);
} else {
list = list.stream().filter(beAvailablePredicate::isMatch).collect(Collectors.toList());
if (list.isEmpty()) {
continue;
}
Collections.shuffle(list);
backends.add(list.get(0));
}
Collections.shuffle(list);
candidates.add(list.get(0));
}
Collections.shuffle(backends);
List<Long> backendIds = Lists.newArrayList();
// get last backend index
int lastBackendIndex = -1;
int index = -1;
for (Backend backend : backends) {
index++;
if (backend.getId() == lastBackendId) {
lastBackendIndex = index;
break;
}
if (candidates.size() < number) {
return Lists.newArrayList();
}
Iterator<Backend> iterator = Iterators.cycle(backends);
index = -1;
boolean failed = false;
// 2 cycle at most
int maxIndex = 2 * backends.size();
while (iterator.hasNext() && backendIds.size() < backendNum) {
Backend backend = iterator.next();
index++;
if (index <= lastBackendIndex) {
continue;
}
if (index > maxIndex) {
failed = true;
break;
}
if (!beAvailablePredicate.isMatch(backend)) {
continue;
}
long backendId = backend.getId();
if (!backendIds.contains(backendId)) {
backendIds.add(backendId);
lastBackendId = backendId;
} else {
failed = true;
break;
}
}
if (clusterName.equals(DEFAULT_CLUSTER)) {
if (isCreate) {
lastBackendIdForCreation = lastBackendId;
} else {
lastBackendIdForOther = lastBackendId;
}
} else {
// update last backendId
if (isCreate) {
lastBackendIdForCreationMap.put(clusterName, lastBackendId);
} else {
lastBackendIdForOtherMap.put(clusterName, lastBackendId);
}
}
if (backendIds.size() != backendNum) {
failed = true;
}
if (!failed) {
return backendIds;
}
// debug
for (Backend backend : backends) {
LOG.debug("random select: {}", backend.toString());
}
return null;
Collections.shuffle(candidates);
return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList());
}
public ImmutableMap<Long, Backend> getIdToBackend() {

View File

@ -22,7 +22,6 @@ import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
import org.apache.doris.backup.RestoreJob.RestoreJobState;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@ -38,35 +37,19 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Injectable;
@ -161,12 +144,12 @@ public class RestoreJobTest {
new Expectations() {
{
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
anyString, (TStorageMedium) any);
minTimes = 0;
result = new Delegate() {
public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive,
boolean isCreate, String clusterName) {
public synchronized List<Long> selectBackendIdsForReplicaCreation(
ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) {
List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);
@ -259,113 +242,6 @@ public class RestoreJobTest {
backupMeta = new BackupMeta(tbls, resources);
}
@Ignore
@Test
public void testRun() {
// pending
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
Assert.assertEquals(12, job.getFileMapping().getMapping().size());
// 2. snapshoting
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.SNAPSHOTING, job.getState());
Assert.assertEquals(12 * 2, AgentTaskQueue.getTaskNum());
// 3. snapshot finished
List<AgentTask> agentTasks = Lists.newArrayList();
Map<TTaskType, Set<Long>> runningTasks = Maps.newHashMap();
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
agentTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
Assert.assertEquals(12 * 2, agentTasks.size());
for (AgentTask agentTask : agentTasks) {
if (agentTask.getTaskType() != TTaskType.MAKE_SNAPSHOT) {
continue;
}
SnapshotTask task = (SnapshotTask) agentTask;
String snapshotPath = "/path/to/snapshot/" + System.currentTimeMillis();
TStatus taskStatus = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
task.getSignature(), taskStatus);
request.setSnapshotPath(snapshotPath);
Assert.assertTrue(job.finishTabletSnapshotTask(task, request));
}
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.DOWNLOAD, job.getState());
// download
AgentTaskQueue.clearAllTasks();
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
Assert.assertEquals(9, AgentTaskQueue.getTaskNum());
// downloading
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.DOWNLOADING, job.getState());
List<AgentTask> downloadTasks = Lists.newArrayList();
runningTasks = Maps.newHashMap();
downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
downloadTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
Assert.assertEquals(9, downloadTasks.size());
List<Long> downloadedTabletIds = Lists.newArrayList();
for (AgentTask agentTask : downloadTasks) {
TStatus taskStatus = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
agentTask.getSignature(), taskStatus);
request.setDownloadedTabletIds(downloadedTabletIds);
Assert.assertTrue(job.finishTabletDownloadTask((DownloadTask) agentTask, request));
}
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.COMMIT, job.getState());
// commit
AgentTaskQueue.clearAllTasks();
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
Assert.assertEquals(12, AgentTaskQueue.getTaskNum());
// committing
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.COMMITTING, job.getState());
List<AgentTask> dirMoveTasks = Lists.newArrayList();
runningTasks = Maps.newHashMap();
dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND1_ID, runningTasks));
dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND2_ID, runningTasks));
dirMoveTasks.addAll(AgentTaskQueue.getDiffTasks(CatalogMocker.BACKEND3_ID, runningTasks));
Assert.assertEquals(12, dirMoveTasks.size());
for (AgentTask agentTask : dirMoveTasks) {
TStatus taskStatus = new TStatus(TStatusCode.OK);
TBackend tBackend = new TBackend("", 0, 1);
TFinishTaskRequest request = new TFinishTaskRequest(tBackend, TTaskType.MAKE_SNAPSHOT,
agentTask.getSignature(), taskStatus);
job.finishDirMoveTask((DirMoveTask) agentTask, request);
}
job.run();
Assert.assertEquals(Status.OK, job.getStatus());
Assert.assertEquals(RestoreJobState.FINISHED, job.getState());
}
@Test
public void testSignature() throws AnalysisException {
Adler32 sig1 = new Adler32();

View File

@ -241,8 +241,7 @@ public class CreateTableTest {
+ "properties('replication_num' = '1', 'short_key' = '4');"));
ExceptionChecker
.expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
"tag(NaN/{\"location\" : \"default\"}) in all backends. need: 3",
.expectThrowsWithMsg(DdlException.class, "Failed to find 3 backends for policy",
() -> createTable("create table test.atbl5\n" + "(k1 int, k2 int, k3 int)\n"
+ "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n"
+ "properties('replication_num' = '3');"));
@ -259,8 +258,7 @@ public class CreateTableTest {
ConfigBase.setMutableConfig("disable_storage_medium_check", "false");
ExceptionChecker
.expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium and " +
"tag(SSD/{\"location\" : \"default\"}) in all backends. need: 1",
.expectThrowsWithMsg(DdlException.class, " Failed to find 1 backends for policy:",
() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"
+ "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));

View File

@ -86,7 +86,7 @@ public class ModifyBackendTest {
");";
CreateTableStmt createStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext);
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
"Failed to find enough host with storage medium and tag(HDD/{\"location\" : \"default\"}) in all backends. need: 1",
"Failed to find 1 backends for policy:",
() -> DdlExecutor.execute(Catalog.getCurrentCatalog(), createStmt));
createStr = "create table test.tbl1(\n" +
@ -119,7 +119,7 @@ public class ModifyBackendTest {
Database db = Catalog.getCurrentCatalog().getDbNullable("default_cluster:test");
Table tbl3 = db.getTableNullable("tbl3");
String err = Catalog.getCurrentCatalog().getDynamicPartitionScheduler().getRuntimeInfo(tbl3.getId(), DynamicPartitionScheduler.CREATE_PARTITION_MSG);
Assert.assertTrue(err.contains("Failed to find enough host with storage medium and tag"));
Assert.assertTrue(err.contains("Failed to find 1 backends for policy:"));
createStr = "create table test.tbl4(\n" +
"k1 date, k2 int\n" +
@ -171,7 +171,7 @@ public class ModifyBackendTest {
+ " set ('replication_allocation' = 'tag.location.zonex:1')";
AlterTableStmt alterStmt2 = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStr, connectContext);
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
"Failed to find enough host with tag({\"location\" : \"zonex\"}) in all backends. need: 1",
"Failed to find enough host with tag",
() -> DdlExecutor.execute(Catalog.getCurrentCatalog(), alterStmt2));
tblProperties = tableProperty.getProperties();
Assert.assertTrue(tblProperties.containsKey("default.replication_allocation"));

View File

@ -20,12 +20,12 @@ package org.apache.doris.load.sync.canal;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.resource.Tag;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@ -59,7 +59,6 @@ import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
@ -150,8 +149,8 @@ public class CanalSyncDataTest {
minTimes = 0;
result = execPlanFragmentParams;
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any, anyBoolean, anyString,
(TStorageMedium) any, (Tag) any);
systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
anyString, (TStorageMedium) any);
minTimes = 0;
result = backendIds;

View File

@ -20,9 +20,8 @@ package org.apache.doris.qe;
import org.apache.doris.backup.CatalogMocker;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.Lists;
@ -31,7 +30,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
@ -45,7 +43,7 @@ public class MultiLoadMgrTest {
@Mocked
private SystemInfoService systemInfoService;
@Before
public void setUp() {
public void setUp() throws Exception {
new Expectations() {
{
ConnectContext.get();
@ -62,13 +60,10 @@ public class MultiLoadMgrTest {
};
new Expectations() {
{
systemInfoService.seqChooseBackendIdsByStorageMediumAndTag(anyInt, (SystemInfoService.BeAvailablePredicate) any,
anyBoolean, anyString, (TStorageMedium) any, (Tag) any);
systemInfoService.selectBackendIdsByPolicy((BeSelectionPolicy) any, anyInt);
minTimes = 0;
result = new Delegate() {
public synchronized List<Long> seqChooseBackendIdsByStorageMediumAndTag(
int backendNum, SystemInfoService.BeAvailablePredicate availablePredicate,
boolean isCreate, String clusterName, TStorageMedium medium, Tag tag) {
public List<Long> selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) {
List<Long> beIds = Lists.newArrayList();
beIds.add(CatalogMocker.BACKEND1_ID);
beIds.add(CatalogMocker.BACKEND2_ID);

View File

@ -0,0 +1,268 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import mockit.Expectations;
import mockit.Mocked;
/**
 * Unit tests for the refactored backend-selection logic in {@code SystemInfoService}:
 * <ul>
 *   <li>{@code selectBackendIdsByPolicy}: select exactly N backends matching a
 *       {@code BeSelectionPolicy} (availability flags, tags, storage medium, disk
 *       usage, same-host restriction); returns an empty list if N cannot be met.</li>
 *   <li>{@code selectBackendIdsForReplicaCreation}: select backends per tag for
 *       replica creation, which is expected to distribute replicas roughly evenly
 *       (random selection, max/min diff below 5%).</li>
 * </ul>
 */
public class SystemInfoServiceTest {

    @Mocked
    private Catalog catalog;
    @Mocked
    private EditLog editLog;

    // System under test; rebuilt fresh for every test case.
    private SystemInfoService infoService;

    @Before
    public void setUp() {
        // Stub out edit-log persistence so addBackend() can run without a real catalog.
        new Expectations() {
            {
                catalog.getEditLog();
                minTimes = 0;
                result = editLog;

                editLog.logAddBackend((Backend) any);
                minTimes = 0;

                Catalog.getCurrentCatalog();
                minTimes = 0;
                result = catalog;
            }
        };
        infoService = new SystemInfoService();
    }

    /** Registers a backend (initially not alive) with the service under test. */
    private void addBackend(long beId, String host, int hbPort) {
        Backend backend = new Backend(beId, host, hbPort);
        infoService.addBackend(backend);
    }

    /** Registers an alive backend with a single 200MB HDD disk (150MB available). */
    private void addAliveBackendWithDisk(long beId, String host) {
        addBackend(beId, host, 9050);
        Backend be = infoService.getBackend(beId);
        addDisk(be, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
        be.setAlive(true);
    }

    /** Attaches a single disk with the given medium and capacity to a backend. */
    private void addDisk(Backend be, String path, TStorageMedium medium, long totalB, long availB) {
        DiskInfo diskInfo1 = new DiskInfo(path);
        diskInfo1.setTotalCapacityB(totalB);
        diskInfo1.setAvailableCapacityB(availB);
        diskInfo1.setStorageMedium(medium);
        Map<String, DiskInfo> map = Maps.newHashMap();
        map.put(diskInfo1.getRootPath(), diskInfo1);
        be.setDisks(ImmutableMap.copyOf(map));
    }

    @Test
    public void testSelectBackendIdsByPolicy() throws Exception {
        // 1. no backend registered: nothing can be selected
        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 4).size());

        // 2. one backend, but not alive: load-available policy matches nothing
        addBackend(10001, "192.168.1.1", 9050);
        Backend be1 = infoService.getBackend(10001);
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 1).size());
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy, 0).size());
        // a policy with no condition still matches the dead backend
        BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().build();
        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy2, 1).size());

        // 3. add more backends; be1 and be5 are dead, be2/be3/be4 are alive
        addBackend(10002, "192.168.1.2", 9050);
        Backend be2 = infoService.getBackend(10002);
        be2.setAlive(true);
        addBackend(10003, "192.168.1.3", 9050);
        Backend be3 = infoService.getBackend(10003);
        be3.setAlive(true);
        addBackend(10004, "192.168.1.4", 9050);
        Backend be4 = infoService.getBackend(10004);
        be4.setAlive(true);
        addBackend(10005, "192.168.1.5", 9050);
        Backend be5 = infoService.getBackend(10005);
        BeSelectionPolicy policy3 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 1).size());
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10001L));
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy3, 1).contains(10005L));
        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy3, 2).size());
        Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy3, 3).size());
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10002L));
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10003L));
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy3, 3).contains(10004L));
        // asking for more backends than qualify must return an empty result, not a subset
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy3, 4).size());

        // 4. flip per-backend status flags:
        // now only be3/be4 are loadable, only be2/be4 queryable, only be2/be3 schedulable
        be2.setLoadDisabled(true);
        be3.setQueryDisabled(true);
        be4.setDecommissioned(true);
        BeSelectionPolicy policy4 = new BeSelectionPolicy.Builder().needScheduleAvailable().build();
        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 1).size());
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10001L));
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10004L));
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy4, 1).contains(10005L));
        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy4, 2).size());
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10002L));
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy4, 2).contains(10003L));
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy4, 3).size());

        BeSelectionPolicy policy5 = new BeSelectionPolicy.Builder().needLoadAvailable().build();
        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy5, 1).size());
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10001L));
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10002L));
        Assert.assertFalse(infoService.selectBackendIdsByPolicy(policy5, 1).contains(10005L));
        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy5, 2).size());
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10003L));
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy5, 2).contains(10004L));

        // 5. tag matching. First reset every backend to alive and enabled.
        be1.setAlive(true);
        be2.setLoadDisabled(false);
        be3.setQueryDisabled(false);
        be5.setAlive(true);
        be3.setAlive(true);
        be4.setAlive(true);
        be4.setDecommissioned(false);
        BeSelectionPolicy policy6 = new BeSelectionPolicy.Builder().needQueryAvailable().build();
        Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy6, 5).size());

        // be1/be2 -> taga, be3/be4/be5 -> tagb
        Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
        Tag tagb = Tag.create(Tag.TYPE_LOCATION, "tagb");
        be1.setTag(taga);
        be2.setTag(taga);
        be3.setTag(tagb);
        be4.setTag(tagb);
        be5.setTag(tagb);
        BeSelectionPolicy policy7 = new BeSelectionPolicy.Builder().needQueryAvailable()
                .addTags(Sets.newHashSet(taga)).build();
        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy7, 1).size());
        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy7, 2).size());
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10001L));
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy7, 2).contains(10002L));
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy7, 3).size());
        BeSelectionPolicy policy8 = new BeSelectionPolicy.Builder().needQueryAvailable()
                .addTags(Sets.newHashSet(tagb)).build();
        Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy8, 3).size());
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10003L));
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10004L));
        Assert.assertTrue(infoService.selectBackendIdsByPolicy(policy8, 3).contains(10005L));
        BeSelectionPolicy policy9 = new BeSelectionPolicy.Builder().needQueryAvailable()
                .addTags(Sets.newHashSet(taga, tagb)).build();
        Assert.assertEquals(5, infoService.selectBackendIdsByPolicy(policy9, 5).size());

        // 6. storage medium: be1 has HDD only, be2..be5 have SSD only
        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 1 * 1024 * 1024L);
        addDisk(be2, "path2", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
        addDisk(be3, "path3", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
        addDisk(be4, "path4", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
        addDisk(be5, "path5", TStorageMedium.SSD, 200 * 1024 * 1024L, 150 * 1024 * 1024L);
        BeSelectionPolicy policy10 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga, tagb))
                .setStorageMedium(TStorageMedium.SSD).build();
        Assert.assertEquals(4, infoService.selectBackendIdsByPolicy(policy10, 4).size());
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy10, 5).size());
        BeSelectionPolicy policy11 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(tagb))
                .setStorageMedium(TStorageMedium.HDD).build();
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy11, 1).size());

        // 7. disk usage: be1's HDD has only 1MB of 200MB free, so the usage check rejects it
        BeSelectionPolicy policy12 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
                .setStorageMedium(TStorageMedium.HDD).build();
        Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy12, 1).size());
        BeSelectionPolicy policy13 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
                .setStorageMedium(TStorageMedium.HDD).needCheckDiskUsage().build();
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy13, 1).size());

        // 8. same host: be6 shares be1's host (different heartbeat port)
        addBackend(10006, "192.168.1.1", 9051);
        Backend be6 = infoService.getBackend(10006);
        be6.setTag(taga);
        be6.setAlive(true);
        // refresh be1's disk so both hosts pass the usage check
        addDisk(be1, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
        addDisk(be6, "path1", TStorageMedium.HDD, 200 * 1024 * 1024L, 100 * 1024 * 1024L);
        BeSelectionPolicy policy14 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
                .setStorageMedium(TStorageMedium.HDD).build();
        Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy14, 2).size());
        BeSelectionPolicy policy15 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
                .setStorageMedium(TStorageMedium.HDD).allowOnSameHost().build();
        Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy15, 2).size());
    }

    @Test
    public void testSelectBackendIdsForReplicaCreation() throws Exception {
        // five identical alive backends, each with one HDD disk
        addAliveBackendWithDisk(10001, "192.168.1.1");
        addAliveBackendWithDisk(10002, "192.168.1.2");
        addAliveBackendWithDisk(10003, "192.168.1.3");
        addAliveBackendWithDisk(10004, "192.168.1.4");
        addAliveBackendWithDisk(10005, "192.168.1.5");

        ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
        // Repeat the selection many times and count how often each backend is picked,
        // to check that the random selection distributes replicas evenly.
        Map<Long, Integer> beCounterMap = Maps.newHashMap();
        for (int i = 0; i < 10000; ++i) {
            Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
                    SystemInfoService.DEFAULT_CLUSTER, TStorageMedium.HDD);
            Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
            for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
                beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);
            }
        }
        System.out.println(beCounterMap);
        List<Integer> list = Lists.newArrayList(beCounterMap.values());
        Collections.sort(list);
        int diff = list.get(list.size() - 1) - list.get(0);
        // The diff between the most- and least-selected backend must stay below 5%.
        Assert.assertTrue((diff * 1.0 / list.get(0)) < 0.05);
    }
}