[feature](table) implement the round robin selection be when create tablet (#19167)
This commit is contained in:
@ -176,6 +176,7 @@ import org.apache.doris.thrift.TStorageType;
|
||||
import org.apache.doris.thrift.TTabletType;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
@ -204,6 +205,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
/**
|
||||
* The Internal catalog will manage all self-managed meta object in a Doris cluster.
|
||||
* Such as Database, tables, etc.
|
||||
@ -2516,7 +2518,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
LOG.info("successfully create table[{}-{}]", tableName, tableId);
|
||||
}
|
||||
|
||||
private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
|
||||
@VisibleForTesting
|
||||
public void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
|
||||
DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
|
||||
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws DdlException {
|
||||
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
|
||||
@ -2538,6 +2541,20 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
if (chooseBackendsArbitrary) {
|
||||
backendsPerBucketSeq = Maps.newHashMap();
|
||||
}
|
||||
|
||||
Map<Tag, Integer> nextIndexs = new HashMap<>();
|
||||
|
||||
if (Config.enable_round_robin_create_tablet) {
|
||||
for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
|
||||
int startPos = Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(), clusterName,
|
||||
tabletMeta.getStorageMedium());
|
||||
if (startPos == -1) {
|
||||
throw new DdlException("The number of BEs that match the policy is insufficient");
|
||||
}
|
||||
nextIndexs.put(entry.getKey(), startPos);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
|
||||
// create a new tablet with random chosen backends
|
||||
Tablet tablet = new Tablet(idGeneratorBuffer.getNextId());
|
||||
@ -2550,14 +2567,26 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
Map<Tag, List<Long>> chosenBackendIds;
|
||||
if (chooseBackendsArbitrary) {
|
||||
// This is the first colocate table in the group, or just a normal table,
|
||||
// randomly choose backends
|
||||
if (!Config.disable_storage_medium_check) {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
|
||||
tabletMeta.getStorageMedium());
|
||||
// choose backends
|
||||
if (Config.enable_round_robin_create_tablet) {
|
||||
if (!Config.disable_storage_medium_check) {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName,
|
||||
tabletMeta.getStorageMedium(), nextIndexs);
|
||||
} else {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName, null,
|
||||
nextIndexs);
|
||||
}
|
||||
} else {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
|
||||
if (!Config.disable_storage_medium_check) {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName,
|
||||
tabletMeta.getStorageMedium());
|
||||
} else {
|
||||
chosenBackendIds = Env.getCurrentSystemInfo()
|
||||
.selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null);
|
||||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
|
||||
|
||||
@ -861,6 +861,122 @@ public class SystemInfoService {
|
||||
return classMap;
|
||||
}
|
||||
|
||||
class BeComparator implements Comparator<Backend> {
|
||||
public int compare(Backend a, Backend b) {
|
||||
return (int) (a.getId() - b.getId());
|
||||
}
|
||||
}
|
||||
|
||||
public List<Long> selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number,
|
||||
int nextIndex) {
|
||||
Preconditions.checkArgument(number >= -1);
|
||||
List<Backend> candidates = getCandidates(policy);
|
||||
if (number != -1 && candidates.size() < number) {
|
||||
LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number);
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
int realIndex = nextIndex % candidates.size();
|
||||
List<Long> partialOrderList = new ArrayList<Long>();
|
||||
partialOrderList.addAll(candidates.subList(realIndex, candidates.size())
|
||||
.stream().map(b -> b.getId()).collect(Collectors.toList()));
|
||||
partialOrderList.addAll(candidates.subList(0, realIndex)
|
||||
.stream().map(b -> b.getId()).collect(Collectors.toList()));
|
||||
|
||||
if (number == -1) {
|
||||
return partialOrderList;
|
||||
} else {
|
||||
return partialOrderList.subList(0, number);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Backend> getCandidates(BeSelectionPolicy policy) {
|
||||
List<Backend> candidates = policy.getCandidateBackends(idToBackendRef.values());
|
||||
if (candidates.isEmpty()) {
|
||||
LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size());
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
if (!policy.allowOnSameHost) {
|
||||
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
|
||||
for (Backend backend : candidates) {
|
||||
if (backendMaps.containsKey(backend.getIp())) {
|
||||
backendMaps.get(backend.getIp()).add(backend);
|
||||
} else {
|
||||
List<Backend> list = Lists.newArrayList();
|
||||
list.add(backend);
|
||||
backendMaps.put(backend.getIp(), list);
|
||||
}
|
||||
}
|
||||
candidates.clear();
|
||||
for (List<Backend> list : backendMaps.values()) {
|
||||
candidates.add(list.get(0));
|
||||
}
|
||||
}
|
||||
|
||||
if (candidates.isEmpty()) {
|
||||
LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size());
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
Collections.sort(candidates, new BeComparator());
|
||||
return candidates;
|
||||
}
|
||||
|
||||
// Select the smallest number of tablets as the starting position of
|
||||
// round robin in the BE that match the policy
|
||||
public int getStartPosOfRoundRobin(Tag tag, String clusterName, TStorageMedium storageMedium) {
|
||||
BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
|
||||
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag))
|
||||
.setStorageMedium(storageMedium);
|
||||
if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
|
||||
builder.allowOnSameHost();
|
||||
}
|
||||
|
||||
BeSelectionPolicy policy = builder.build();
|
||||
List<Backend> candidates = getCandidates(policy);
|
||||
|
||||
long minBeTabletsNum = Long.MAX_VALUE;
|
||||
int minIndex = -1;
|
||||
for (int i = 0; i < candidates.size(); ++i) {
|
||||
long tabletsNum = Env.getCurrentInvertedIndex()
|
||||
.getTabletIdsByBackendId(candidates.get(i).getId()).size();
|
||||
if (tabletsNum < minBeTabletsNum) {
|
||||
minBeTabletsNum = tabletsNum;
|
||||
minIndex = i;
|
||||
}
|
||||
}
|
||||
return minIndex;
|
||||
}
|
||||
|
||||
public Map<Tag, List<Long>> getBeIdRoundRobinForReplicaCreation(
|
||||
ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium,
|
||||
Map<Tag, Integer> nextIndexs) throws DdlException {
|
||||
Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
|
||||
Map<Tag, Short> allocMap = replicaAlloc.getAllocMap();
|
||||
short totalReplicaNum = 0;
|
||||
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
|
||||
BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName)
|
||||
.needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey()))
|
||||
.setStorageMedium(storageMedium);
|
||||
if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) {
|
||||
builder.allowOnSameHost();
|
||||
}
|
||||
|
||||
BeSelectionPolicy policy = builder.build();
|
||||
int nextIndex = nextIndexs.get(entry.getKey());
|
||||
List<Long> beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex);
|
||||
nextIndexs.put(entry.getKey(), nextIndex + beIds.size());
|
||||
|
||||
if (beIds.isEmpty()) {
|
||||
throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy);
|
||||
}
|
||||
chosenBackendIds.put(entry.getKey(), beIds);
|
||||
totalReplicaNum += beIds.size();
|
||||
}
|
||||
Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum());
|
||||
return chosenBackendIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Select a set of backends for replica creation.
|
||||
|
||||
Reference in New Issue
Block a user