[fix](tablet clone) clone add replica prefer choose the same medium (#25640)
This commit is contained in:
@ -414,7 +414,7 @@ public class BackendLoadStatistic {
|
||||
for (int i = 0; i < pathStatistics.size(); i++) {
|
||||
RootPathLoadStatistic pathStatistic = pathStatistics.get(i);
|
||||
// if this is a supplement task, ignore the storage medium
|
||||
if (!isSupplement && pathStatistic.getStorageMedium() != medium) {
|
||||
if (!isSupplement && medium != null && pathStatistic.getStorageMedium() != medium) {
|
||||
LOG.debug("backend {} path {}'s storage medium {} is not {} storage medium, actual: {}",
|
||||
beId, pathStatistic.getPath(), pathStatistic.getStorageMedium(), medium);
|
||||
continue;
|
||||
|
||||
@ -866,7 +866,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
|
||||
globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex);
|
||||
|
||||
resultPaths.clear();
|
||||
BalanceStatus st = beStat.isFit(bucketDataSize, null, resultPaths, true);
|
||||
BalanceStatus st = beStat.isFit(bucketDataSize, null, resultPaths, false);
|
||||
if (!st.ok()) {
|
||||
LOG.debug("backend {} is unable to fit in group {}, tablet order idx {}, data size {}",
|
||||
destBeId, groupId, bucketIndex, bucketDataSize);
|
||||
|
||||
@ -98,9 +98,11 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
|
||||
toString() + " does not fit tablet with size: " + tabletSize + ", offline");
|
||||
}
|
||||
|
||||
double newUsagePerc = (usedCapacityB + tabletSize) / (double) capacityB;
|
||||
long newLeftCapacity = capacityB - usedCapacityB - tabletSize;
|
||||
if (isSupplement) {
|
||||
if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_flood_stage_usage_percent / 100.0)
|
||||
&& capacityB - usedCapacityB - tabletSize < Config.storage_flood_stage_left_capacity_bytes) {
|
||||
if (newUsagePerc > (Config.storage_flood_stage_usage_percent / 100.0)
|
||||
|| newLeftCapacity < Config.storage_flood_stage_left_capacity_bytes) {
|
||||
return new BalanceStatus(ErrCode.COMMON_ERROR,
|
||||
toString() + " does not fit tablet with size: " + tabletSize + ", limitation reached");
|
||||
} else {
|
||||
@ -108,8 +110,8 @@ public class RootPathLoadStatistic implements Comparable<RootPathLoadStatistic>
|
||||
}
|
||||
}
|
||||
|
||||
if ((usedCapacityB + tabletSize) / (double) capacityB > (Config.storage_high_watermark_usage_percent / 100.0)
|
||||
|| capacityB - usedCapacityB - tabletSize < Config.storage_min_left_capacity_bytes) {
|
||||
if (newUsagePerc > (Config.storage_high_watermark_usage_percent / 100.0)
|
||||
|| newLeftCapacity < Config.storage_min_left_capacity_bytes) {
|
||||
return new BalanceStatus(ErrCode.COMMON_ERROR,
|
||||
toString() + " does not fit tablet with size: " + tabletSize);
|
||||
}
|
||||
|
||||
@ -1318,7 +1318,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
// get all available paths which this tablet can fit in.
|
||||
// beStatistics is sorted by mix load score in ascend order, so select from first to last.
|
||||
List<BePathLoadStatPair> allFitPaths = Lists.newArrayList();
|
||||
List<BePathLoadStatPair> allFitPathsSameMedium = Lists.newArrayList();
|
||||
List<BePathLoadStatPair> allFitPathsDiffMedium = Lists.newArrayList();
|
||||
for (BackendLoadStatistic bes : beStatistics) {
|
||||
if (!bes.isAvailable()) {
|
||||
LOG.debug("backend {} is not available, skip. tablet: {}", bes.getBeId(), tabletCtx.getTabletId());
|
||||
@ -1349,27 +1350,27 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
|
||||
BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(),
|
||||
resultPaths, tabletCtx.getTabletStatus() != TabletStatus.REPLICA_RELOCATING
|
||||
/* if REPLICA_RELOCATING, then it is not a supplement task */);
|
||||
if (!st.ok()) {
|
||||
LOG.debug("unable to find path for tablet: {}. {}", tabletCtx, st);
|
||||
// This is to solve, when we decommission some BEs with SSD disks,
|
||||
// if there are no SSD disks on the remaining BEs, it will be impossible to select a
|
||||
// suitable destination path.
|
||||
// In this case, we need to ignore the storage medium property
|
||||
// and try to select the destination path again.
|
||||
// Set `isSupplement` to true will ignore the storage medium property.
|
||||
st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(),
|
||||
resultPaths, true);
|
||||
if (!st.ok()) {
|
||||
LOG.debug("unable to find path for supplementing tablet: {}. {}", tabletCtx, st);
|
||||
continue;
|
||||
resultPaths, false);
|
||||
if (st.ok()) {
|
||||
resultPaths.stream().forEach(path -> allFitPathsSameMedium.add(new BePathLoadStatPair(bes, path)));
|
||||
} else {
|
||||
LOG.debug("backend {} unable to find path for tablet: {}. {}", bes.getBeId(), tabletCtx, st);
|
||||
resultPaths.clear();
|
||||
st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), resultPaths, true);
|
||||
if (st.ok()) {
|
||||
resultPaths.stream().forEach(path -> allFitPathsDiffMedium.add(new BePathLoadStatPair(bes, path)));
|
||||
} else {
|
||||
LOG.debug("backend {} unable to find path for supplementing tablet: {}. {}",
|
||||
bes.getBeId(), tabletCtx, st);
|
||||
}
|
||||
}
|
||||
|
||||
resultPaths.stream().forEach(path -> allFitPaths.add(new BePathLoadStatPair(bes, path)));
|
||||
}
|
||||
|
||||
// all fit paths has already been sorted by load score in 'allFitPaths' in ascend order.
|
||||
// just get first available path.
|
||||
// we try to find a path with specified media type, if not find, arbitrarily use one.
|
||||
List<BePathLoadStatPair> allFitPaths =
|
||||
!allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium : allFitPathsDiffMedium;
|
||||
if (allFitPaths.isEmpty()) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to find dest path for new replica");
|
||||
}
|
||||
@ -1377,14 +1378,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
BePathLoadStatPairComparator comparator = new BePathLoadStatPairComparator(allFitPaths);
|
||||
Collections.sort(allFitPaths, comparator);
|
||||
|
||||
// all fit paths has already been sorted by load score in 'allFitPaths' in ascend order.
|
||||
// just get first available path.
|
||||
// we try to find a path with specified media type, if not find, arbitrarily use one.
|
||||
for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
|
||||
RootPathLoadStatistic rootPathLoadStatistic = bePathLoadStat.getPathLoadStatistic();
|
||||
if (rootPathLoadStatistic.getStorageMedium() != tabletCtx.getStorageMedium()) {
|
||||
LOG.debug("backend {}'s path {}'s storage medium {} "
|
||||
+ "is not equal to tablet's storage medium {}, skip. tablet: {}",
|
||||
+ "is not equal to tablet's storage medium {}, skip. tablet: {}",
|
||||
rootPathLoadStatistic.getBeId(), rootPathLoadStatistic.getPathHash(),
|
||||
rootPathLoadStatistic.getStorageMedium(), tabletCtx.getStorageMedium(),
|
||||
tabletCtx.getTabletId());
|
||||
|
||||
@ -0,0 +1,97 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class AddReplicaChoseMediumTest extends TestWithFeService {
|
||||
|
||||
@Override
|
||||
protected void beforeCreatingConnectContext() throws Exception {
|
||||
Config.enable_round_robin_create_tablet = true;
|
||||
Config.allow_replica_on_same_host = true;
|
||||
Config.tablet_checker_interval_ms = 100;
|
||||
Config.tablet_schedule_interval_ms = 100;
|
||||
Config.schedule_slot_num_per_hdd_path = 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int backendNum() {
|
||||
return 4;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddReplicaChoseMedium() throws Exception {
|
||||
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
|
||||
List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
|
||||
Assertions.assertEquals(backendNum(), backends.size());
|
||||
for (Backend be : backends) {
|
||||
Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId()));
|
||||
}
|
||||
|
||||
Backend beWithSsd = backends.get(3);
|
||||
beWithSsd.getDisks().values().forEach(it -> {
|
||||
it.setStorageMedium(TStorageMedium.SSD);
|
||||
it.setTotalCapacityB(it.getTotalCapacityB() * 2);
|
||||
});
|
||||
|
||||
createDatabase("test");
|
||||
createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) "
|
||||
+ " BUCKETS 12 PROPERTIES ( \"replication_num\" = \"2\","
|
||||
+ " \"storage_medium\" = \"HDD\")");
|
||||
RebalancerTestUtil.updateReplicaPathHash();
|
||||
|
||||
Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(beWithSsd.getId()));
|
||||
for (Backend be : backends) {
|
||||
if (be.getId() != beWithSsd.getId()) {
|
||||
Assertions.assertEquals(8, invertedIndex.getTabletNumByBackendId(be.getId()));
|
||||
}
|
||||
}
|
||||
|
||||
Backend decommissionBe = backends.get(0);
|
||||
String decommissionStmtStr = "alter system decommission backend \"" + decommissionBe.getHost()
|
||||
+ ":" + decommissionBe.getHeartbeatPort() + "\"";
|
||||
Assertions.assertNotNull(getSqlStmtExecutor(decommissionStmtStr));
|
||||
Assertions.assertTrue(decommissionBe.isDecommissioned());
|
||||
|
||||
List<Integer> gotTabletNums = null;
|
||||
List<Integer> expectTabletNums = Lists.newArrayList(0, 12, 12, 0);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
gotTabletNums = backends.stream().map(it -> invertedIndex.getTabletNumByBackendId(it.getId()))
|
||||
.collect(Collectors.toList());
|
||||
if (expectTabletNums.equals(gotTabletNums)) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
Assertions.assertEquals(expectTabletNums, gotTabletNums);
|
||||
}
|
||||
}
|
||||
@ -85,7 +85,7 @@ public class DecommissionTest {
|
||||
Map<String, TDisk> backendDisks = Maps.newHashMap();
|
||||
TDisk tDisk1 = new TDisk();
|
||||
tDisk1.setRootPath("/home/doris1.HDD");
|
||||
tDisk1.setDiskTotalCapacity(20000000);
|
||||
tDisk1.setDiskTotalCapacity(10L << 30);
|
||||
tDisk1.setDataUsedCapacity(1);
|
||||
tDisk1.setUsed(true);
|
||||
tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity);
|
||||
@ -95,7 +95,7 @@ public class DecommissionTest {
|
||||
|
||||
TDisk tDisk2 = new TDisk();
|
||||
tDisk2.setRootPath("/home/doris2.HHD");
|
||||
tDisk2.setDiskTotalCapacity(20000000);
|
||||
tDisk2.setDiskTotalCapacity(10L << 30);
|
||||
tDisk2.setDataUsedCapacity(1);
|
||||
tDisk2.setUsed(true);
|
||||
tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity);
|
||||
|
||||
@ -128,7 +128,7 @@ public class TabletRepairAndBalanceTest {
|
||||
Map<String, TDisk> backendDisks = Maps.newHashMap();
|
||||
TDisk tDisk1 = new TDisk();
|
||||
tDisk1.setRootPath("/home/doris.HDD");
|
||||
tDisk1.setDiskTotalCapacity(2000000000);
|
||||
tDisk1.setDiskTotalCapacity(10L << 30);
|
||||
tDisk1.setDataUsedCapacity(1);
|
||||
tDisk1.setUsed(true);
|
||||
tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity - tDisk1.data_used_capacity);
|
||||
@ -138,7 +138,7 @@ public class TabletRepairAndBalanceTest {
|
||||
|
||||
TDisk tDisk2 = new TDisk();
|
||||
tDisk2.setRootPath("/home/doris.SSD");
|
||||
tDisk2.setDiskTotalCapacity(2000000000);
|
||||
tDisk2.setDiskTotalCapacity(10L << 30);
|
||||
tDisk2.setDataUsedCapacity(1);
|
||||
tDisk2.setUsed(true);
|
||||
tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity - tDisk2.data_used_capacity);
|
||||
|
||||
@ -466,8 +466,8 @@ public abstract class TestWithFeService {
|
||||
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
|
||||
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
|
||||
diskInfo1.setPathHash(be.getId());
|
||||
diskInfo1.setTotalCapacityB(1000000);
|
||||
diskInfo1.setAvailableCapacityB(500000);
|
||||
diskInfo1.setTotalCapacityB(10L << 30);
|
||||
diskInfo1.setAvailableCapacityB(5L << 30);
|
||||
diskInfo1.setDataUsedCapacityB(480000);
|
||||
diskInfo1.setPathHash(be.getId());
|
||||
Map<String, DiskInfo> disks = Maps.newHashMap();
|
||||
|
||||
@ -298,8 +298,8 @@ public class UtFrameUtils {
|
||||
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
|
||||
Map<String, DiskInfo> disks = Maps.newHashMap();
|
||||
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
|
||||
diskInfo1.setTotalCapacityB(1000000);
|
||||
diskInfo1.setAvailableCapacityB(500000);
|
||||
diskInfo1.setTotalCapacityB(10L << 30);
|
||||
diskInfo1.setAvailableCapacityB(5L << 30);
|
||||
diskInfo1.setDataUsedCapacityB(480000);
|
||||
diskInfo1.setPathHash(be.getId());
|
||||
disks.put(diskInfo1.getRootPath(), diskInfo1);
|
||||
|
||||
Reference in New Issue
Block a user