cherry pick from #36826
This commit is contained in:
@ -41,7 +41,6 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableSet;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -280,9 +279,9 @@ public class PartitionRebalancer extends Rebalancer {
|
||||
Preconditions.checkNotNull(slot, "unable to get slot of toBe " + move.toBe);
|
||||
|
||||
List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
|
||||
Set<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
|
||||
List<Long> availPath = paths.stream().filter(path -> path.getStorageMedium() == tabletCtx.getStorageMedium()
|
||||
&& path.isFit(tabletCtx.getTabletSize(), false) == BalanceStatus.OK)
|
||||
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
|
||||
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
|
||||
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
|
||||
if (pathHash == -1) {
|
||||
throw new SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
|
||||
|
||||
@ -2006,9 +2006,12 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// path hash -> slot num
|
||||
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
|
||||
private long beId;
|
||||
// only use in takeAnAvailBalanceSlotFrom, make pick RR
|
||||
private long lastPickPathHash;
|
||||
|
||||
public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
|
||||
this.beId = beId;
|
||||
this.lastPickPathHash = -1;
|
||||
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
|
||||
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
|
||||
}
|
||||
@ -2123,19 +2126,6 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return num;
|
||||
}
|
||||
|
||||
/**
|
||||
* get path whose balance slot num is larger than 0
|
||||
*/
|
||||
public synchronized Set<Long> getAvailPathsForBalance() {
|
||||
Set<Long> pathHashs = Sets.newHashSet();
|
||||
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
|
||||
if (entry.getValue().getAvailableBalance() > 0) {
|
||||
pathHashs.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
return pathHashs;
|
||||
}
|
||||
|
||||
public synchronized List<List<String>> getSlotInfo(long beId) {
|
||||
List<List<String>> results = Lists.newArrayList();
|
||||
pathSlots.forEach((key, value) -> {
|
||||
@ -2168,15 +2158,31 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return -1;
|
||||
}
|
||||
|
||||
public synchronized long takeAnAvailBalanceSlotFrom(Set<Long> pathHashs) {
|
||||
for (Long pathHash : pathHashs) {
|
||||
Slot slot = pathSlots.get(pathHash);
|
||||
if (slot == null) {
|
||||
continue;
|
||||
public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
|
||||
if (pathHashs.isEmpty()) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
Collections.sort(pathHashs);
|
||||
synchronized (this) {
|
||||
int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
|
||||
if (preferSlotIndex < 0 || preferSlotIndex >= pathHashs.size()) {
|
||||
preferSlotIndex = 0;
|
||||
}
|
||||
if (slot.balanceUsed < slot.getBalanceTotal()) {
|
||||
slot.balanceUsed++;
|
||||
return pathHash;
|
||||
|
||||
for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
|
||||
long pathHash = pathHashs.get(i);
|
||||
if (takeBalanceSlot(pathHash) != -1) {
|
||||
lastPickPathHash = pathHash;
|
||||
return pathHash;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < preferSlotIndex; i++) {
|
||||
long pathHash = pathHashs.get(i);
|
||||
if (takeBalanceSlot(pathHash) != -1) {
|
||||
lastPickPathHash = pathHash;
|
||||
return pathHash;
|
||||
}
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
|
||||
@ -0,0 +1,64 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
class PathSlotTest {
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
Config.balance_slot_num_per_path = 2;
|
||||
Map<Long, TStorageMedium> paths = Maps.newHashMap();
|
||||
List<Long> availPathHashs = Lists.newArrayList();
|
||||
List<Long> expectPathHashs = Lists.newArrayList();
|
||||
List<Long> gotPathHashs = Lists.newArrayList();
|
||||
long startPath = 10001L;
|
||||
long endPath = 10006L;
|
||||
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
|
||||
paths.put(pathHash, TStorageMedium.HDD);
|
||||
availPathHashs.add(pathHash);
|
||||
expectPathHashs.add(pathHash);
|
||||
}
|
||||
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
|
||||
expectPathHashs.add(pathHash);
|
||||
}
|
||||
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
|
||||
expectPathHashs.add(-1L);
|
||||
}
|
||||
|
||||
PathSlot ps = new PathSlot(paths, 1L);
|
||||
for (int i = 0; i < expectPathHashs.size(); i++) {
|
||||
Collections.shuffle(availPathHashs);
|
||||
gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs));
|
||||
}
|
||||
Assert.assertEquals(expectPathHashs, gotPathHashs);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user