From 3287f350de5a1946d4150957997c3ad8c2e9e587 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Sat, 6 May 2023 14:46:48 +0800 Subject: [PATCH] [feature](table) implement the round robin selection be when create tablet (#19167) --- .../java/org/apache/doris/common/Config.java | 3 + .../doris/datasource/InternalCatalog.java | 45 ++++-- .../doris/system/SystemInfoService.java | 116 ++++++++++++++++ .../RoundRobinCreateTabletTest.java | 129 ++++++++++++++++++ 4 files changed, 285 insertions(+), 8 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 0cb93bef9b..bd6f538c82 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1895,6 +1895,9 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static boolean skip_localhost_auth_check = false; + @ConfField(mutable = true) + public static boolean enable_round_robin_create_tablet = false; + /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 1947d17a66..c317ade75f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -176,6 +176,7 @@ import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTabletType; import org.apache.doris.thrift.TTaskType; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -204,6 +205,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; + /** * The Internal catalog will manage all self-managed meta object in a Doris cluster. * Such as Database, tables, etc. @@ -2516,7 +2518,8 @@ public class InternalCatalog implements CatalogIf { LOG.info("successfully create table[{}-{}]", tableName, tableId); } - private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState, + @VisibleForTesting + public void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, Set tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws DdlException { ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); @@ -2538,6 +2541,20 @@ public class InternalCatalog implements CatalogIf { if (chooseBackendsArbitrary) { backendsPerBucketSeq = Maps.newHashMap(); } + + Map nextIndexs = new HashMap<>(); + + if (Config.enable_round_robin_create_tablet) { + for (Map.Entry entry : replicaAlloc.getAllocMap().entrySet()) { + int startPos = Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(), clusterName, + tabletMeta.getStorageMedium()); + if (startPos == -1) { + throw new DdlException("The number of BEs that match the policy is insufficient"); + } + nextIndexs.put(entry.getKey(), startPos); + } + } + for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends Tablet tablet = new Tablet(idGeneratorBuffer.getNextId()); @@ -2550,14 +2567,26 @@ public class InternalCatalog implements CatalogIf { Map> chosenBackendIds; if (chooseBackendsArbitrary) { // This is the first colocate table in the group, or just a normal table, - // randomly choose backends - if (!Config.disable_storage_medium_check) { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, - tabletMeta.getStorageMedium()); + // choose backends + if (Config.enable_round_robin_create_tablet) { + if (!Config.disable_storage_medium_check) { + chosenBackendIds = Env.getCurrentSystemInfo() + .getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName, + tabletMeta.getStorageMedium(), nextIndexs); + } else { + chosenBackendIds = Env.getCurrentSystemInfo() + .getBeIdRoundRobinForReplicaCreation(replicaAlloc, clusterName, null, + nextIndexs); + } } else { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null); + if (!Config.disable_storage_medium_check) { + chosenBackendIds = Env.getCurrentSystemInfo() + .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, + tabletMeta.getStorageMedium()); + } else { + chosenBackendIds = Env.getCurrentSystemInfo() + .selectBackendIdsForReplicaCreation(replicaAlloc, clusterName, null); + } } for (Map.Entry> entry : chosenBackendIds.entrySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 74824a395c..e710665640 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -861,6 +861,122 @@ public class SystemInfoService { return classMap; } + class BeComparator implements Comparator { + public int compare(Backend a, Backend b) { + return (int) (a.getId() - b.getId()); + } + } + + public List selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number, + int nextIndex) { + Preconditions.checkArgument(number >= -1); + List candidates = getCandidates(policy); + if (number != -1 && candidates.size() < number) { + LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); + return Lists.newArrayList(); + } + + int realIndex = nextIndex % candidates.size(); + List partialOrderList = new ArrayList(); + partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) + .stream().map(b -> b.getId()).collect(Collectors.toList())); + partialOrderList.addAll(candidates.subList(0, realIndex) + .stream().map(b -> b.getId()).collect(Collectors.toList())); + + if (number == -1) { + return partialOrderList; + } else { + return partialOrderList.subList(0, number); + } + } + + public List getCandidates(BeSelectionPolicy policy) { + List candidates = policy.getCandidateBackends(idToBackendRef.values()); + if (candidates.isEmpty()) { + LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); + return Lists.newArrayList(); + } + + if (!policy.allowOnSameHost) { + Map> backendMaps = Maps.newHashMap(); + for (Backend backend : candidates) { + if (backendMaps.containsKey(backend.getIp())) { + backendMaps.get(backend.getIp()).add(backend); + } else { + List list = Lists.newArrayList(); + list.add(backend); + backendMaps.put(backend.getIp(), list); + } + } + candidates.clear(); + for (List list : backendMaps.values()) { + candidates.add(list.get(0)); + } + } + + if (candidates.isEmpty()) { + LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); + return Lists.newArrayList(); + } + + Collections.sort(candidates, new BeComparator()); + return candidates; + } + + // Select the smallest number of tablets as the starting position of + // round robin in the BE that match the policy + public int getStartPosOfRoundRobin(Tag tag, String clusterName, TStorageMedium storageMedium) { + BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName) + .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag)) + .setStorageMedium(storageMedium); + if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { + builder.allowOnSameHost(); + } + + BeSelectionPolicy policy = builder.build(); + List candidates = getCandidates(policy); + + long minBeTabletsNum = Long.MAX_VALUE; + int minIndex = -1; + for (int i = 0; i < candidates.size(); ++i) { + long tabletsNum = Env.getCurrentInvertedIndex() + .getTabletIdsByBackendId(candidates.get(i).getId()).size(); + if (tabletsNum < minBeTabletsNum) { + minBeTabletsNum = tabletsNum; + minIndex = i; + } + } + return minIndex; + } + + public Map> getBeIdRoundRobinForReplicaCreation( + ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium storageMedium, + Map nextIndexs) throws DdlException { + Map> chosenBackendIds = Maps.newHashMap(); + Map allocMap = replicaAlloc.getAllocMap(); + short totalReplicaNum = 0; + for (Map.Entry entry : allocMap.entrySet()) { + BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder().setCluster(clusterName) + .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) + .setStorageMedium(storageMedium); + if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { + builder.allowOnSameHost(); + } + + BeSelectionPolicy policy = builder.build(); + int nextIndex = nextIndexs.get(entry.getKey()); + List beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex); + nextIndexs.put(entry.getKey(), nextIndex + beIds.size()); + + if (beIds.isEmpty()) { + throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy); + } + chosenBackendIds.put(entry.getKey(), beIds); + totalReplicaNum += beIds.size(); + } + Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum()); + return chosenBackendIds; + } /** * Select a set of backends for replica creation. diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java new file mode 100644 index 0000000000..029ce462dd --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RoundRobinCreateTabletTest.java @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Replica.ReplicaState; +import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.common.Config; +import org.apache.doris.resource.Tag; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStorageMedium; + +import com.google.common.collect.ImmutableMap; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class RoundRobinCreateTabletTest { + private Backend backend1; + private Backend backend2; + private Backend backend3; + private Backend backend4; + + @Before + public void setUp() { + backend1 = new Backend(1L, "192.168.1.1", 9050); + backend2 = new Backend(2L, "192.168.1.2", 9050); + backend3 = new Backend(3L, "192.168.1.3", 9050); + backend4 = new Backend(4L, "192.168.1.4", 9050); + + DiskInfo diskInfo1 = new DiskInfo("/disk1"); + ImmutableMap diskRefs = ImmutableMap.of("disk1", diskInfo1); + backend1.setDisks(diskRefs); + backend2.setDisks(diskRefs); + backend3.setDisks(diskRefs); + backend4.setDisks(diskRefs); + + backend1.setAlive(true); + backend2.setAlive(true); + backend3.setAlive(true); + backend4.setAlive(true); + + Map tagMap = new HashMap<>(); + tagMap.put(Tag.TYPE_LOCATION, Tag.VALUE_DEFAULT_TAG); + + backend1.setTagMap(tagMap); + backend2.setTagMap(tagMap); + backend3.setTagMap(tagMap); + backend4.setTagMap(tagMap); + + Env.getCurrentSystemInfo().addBackend(backend1); + Env.getCurrentSystemInfo().addBackend(backend2); + Env.getCurrentSystemInfo().addBackend(backend3); + Env.getCurrentSystemInfo().addBackend(backend4); + } + + @After + public void tearDown() { + Config.enable_round_robin_create_tablet = true; + Config.disable_storage_medium_check = true; + + try { + Env.getCurrentSystemInfo().dropBackend(1L); + Env.getCurrentSystemInfo().dropBackend(2L); + Env.getCurrentSystemInfo().dropBackend(3L); + Env.getCurrentSystemInfo().dropBackend(4L); + } catch (Exception e) { + System.out.println("failed to drop backend " + e.getMessage()); + } + } + + @Test + public void testCreateTablets() { + MaterializedIndex index = new MaterializedIndex(); + String clusterName = "default_cluster"; + HashDistributionInfo distributionInfo = new HashDistributionInfo(48, null); + ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 3); + TabletMeta tabletMeta = new TabletMeta(1L, 2L, 3L, 4L, 5, TStorageMedium.HDD); + IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(1000); + Set tabletIdSet = new HashSet<>(); + + Config.enable_round_robin_create_tablet = true; + Config.disable_storage_medium_check = true; + + try { + Env.getCurrentEnv().getInternalCatalog().createTablets(clusterName, index, ReplicaState.NORMAL, + distributionInfo, 0, replicaAlloc, tabletMeta, + tabletIdSet, idGeneratorBuffer); + } catch (Exception e) { + System.out.println("failed to create tablets " + e.getMessage()); + } + + int i = 0; + int beNum = 4; + for (Tablet tablet : index.getTablets()) { + for (Replica replica : tablet.getReplicas()) { + Assert.assertEquals((i++ % beNum) + 1, replica.getBackendId()); + } + } + } +}