[feature](balance) Support balance between disks on a single BE (#8553)
Currently, a Doris cluster may be balanced while the disks within a single backend are not. For example, backend A has two disks, disk1 and disk2: disk1's usage is 98% while disk2's usage is only 40%. Disk1 cannot take more data, so only one disk of backend A can accept new writes — the available write throughput of backend A is only half of its capacity — and this cannot be resolved by the existing load or partition rebalancers. We therefore introduce a disk rebalancer. Unlike the other rebalancers (load and partition), which handle cluster-wide data balancing, the disk rebalancer handles backend-wide data balancing. [For more details see #8550](https://github.com/apache/incubator-doris/issues/8550)
This commit is contained in:
@ -241,7 +241,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
|
||||
KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_COMPACT,
|
||||
KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CREATION, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER,
|
||||
KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
|
||||
KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
|
||||
KW_DELETE, KW_UPDATE, KW_DISK, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
|
||||
KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE,
|
||||
KW_EXISTS, KW_EXPORT, KW_EXTENDED, KW_EXTERNAL, KW_EXTRACT,
|
||||
KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FIELDS, KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
|
||||
@ -260,7 +260,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALIAS, KW_ALL, KW_A
|
||||
KW_PLUGIN, KW_PLUGINS,
|
||||
KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES, KW_PROPERTY,
|
||||
KW_QUERY, KW_QUOTA,
|
||||
KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME,
|
||||
KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RELEASE, KW_RENAME,
|
||||
KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE, KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE, KW_RETURNS, KW_RESUME, KW_REVOKE,
|
||||
KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW, KW_ROWS,
|
||||
KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI, KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED,
|
||||
@ -5297,6 +5297,22 @@ admin_stmt ::=
|
||||
{:
|
||||
RESULT = new AdminCheckTabletsStmt(tabletIds, properties);
|
||||
:}
|
||||
| KW_ADMIN KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN
|
||||
{:
|
||||
RESULT = new AdminRebalanceDiskStmt(backends);
|
||||
:}
|
||||
| KW_ADMIN KW_REBALANCE KW_DISK
|
||||
{:
|
||||
RESULT = new AdminRebalanceDiskStmt(null);
|
||||
:}
|
||||
| KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN
|
||||
{:
|
||||
RESULT = new AdminCancelRebalanceDiskStmt(backends);
|
||||
:}
|
||||
| KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK
|
||||
{:
|
||||
RESULT = new AdminCancelRebalanceDiskStmt(null);
|
||||
:}
|
||||
| KW_ADMIN KW_CLEAN KW_TRASH KW_ON LPAREN string_list:backends RPAREN
|
||||
{:
|
||||
RESULT = new AdminCleanTrashStmt(backends);
|
||||
|
||||
@ -0,0 +1,73 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class AdminCancelRebalanceDiskStmt extends DdlStmt {
|
||||
private List<Backend> backends = Lists.newArrayList();
|
||||
|
||||
public AdminCancelRebalanceDiskStmt(List<String> backends) {
|
||||
ImmutableMap<Long, Backend> backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend();
|
||||
Map<String, Long> backendsID = new HashMap<String, Long>();
|
||||
for (Backend backend : backendsInfo.values()) {
|
||||
backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId());
|
||||
}
|
||||
if (backends == null) {
|
||||
for (Backend backend : backendsInfo.values()) {
|
||||
this.backends.add(backend);
|
||||
}
|
||||
} else {
|
||||
for (String backend : backends) {
|
||||
if (backendsID.get(backend) != null) {
|
||||
this.backends.add(backendsInfo.get(backendsID.get(backend)));
|
||||
backendsID.remove(backend); // avoid repetition
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public List<Backend> getBackends() {
|
||||
return backends;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return RedirectStatus.NO_FORWARD;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,79 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class AdminRebalanceDiskStmt extends DdlStmt {
|
||||
private List<Backend> backends = Lists.newArrayList();
|
||||
private long timeoutS = 0;
|
||||
|
||||
public AdminRebalanceDiskStmt(List<String> backends) {
|
||||
ImmutableMap<Long, Backend> backendsInfo = Catalog.getCurrentSystemInfo().getIdToBackend();
|
||||
Map<String, Long> backendsID = new HashMap<String, Long>();
|
||||
for (Backend backend : backendsInfo.values()) {
|
||||
backendsID.put(String.valueOf(backend.getHost()) + ":" + String.valueOf(backend.getHeartbeatPort()), backend.getId());
|
||||
}
|
||||
if (backends == null) {
|
||||
for (Backend backend : backendsInfo.values()) {
|
||||
this.backends.add(backend);
|
||||
}
|
||||
} else {
|
||||
for (String backend : backends) {
|
||||
if (backendsID.get(backend) != null) {
|
||||
this.backends.add(backendsInfo.get(backendsID.get(backend)));
|
||||
backendsID.remove(backend); // avoid repetition
|
||||
}
|
||||
}
|
||||
}
|
||||
timeoutS = 24 * 3600; // default 24 hours
|
||||
}
|
||||
|
||||
public List<Backend> getBackends() {
|
||||
return backends;
|
||||
}
|
||||
|
||||
public long getTimeoutS() {
|
||||
return timeoutS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException {
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return RedirectStatus.NO_FORWARD;
|
||||
}
|
||||
}
|
||||
@ -246,8 +246,8 @@ public class BackendLoadStatistic {
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("classify path by load. storage: {} avg used percent: {}. low/mid/high: {}/{}/{}",
|
||||
avgUsedPercent, medium, lowCounter, midCounter, highCounter);
|
||||
LOG.debug("classify path by load. be id: {} storage: {} avg used percent: {}. low/mid/high: {}/{}/{}",
|
||||
beId, medium, avgUsedPercent, lowCounter, midCounter, highCounter);
|
||||
}
|
||||
|
||||
public void calcScore(Map<TStorageMedium, Double> avgClusterUsedCapacityPercentMap,
|
||||
@ -315,6 +315,60 @@ public class BackendLoadStatistic {
|
||||
return status;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check whether the backend can be more balance if we migrate a tablet with size 'tabletSize' from
|
||||
* `srcPath` to 'destPath'
|
||||
* 1. recalculate the load score of src and dest path after migrate the tablet.
|
||||
* 2. if the summary of the diff between the new score and average score becomes smaller, we consider it
|
||||
* as more balance.
|
||||
*/
|
||||
public boolean isMoreBalanced(long srcPath, long destPath, long tabletId, long tabletSize,
|
||||
TStorageMedium medium) {
|
||||
long totalCapacity = 0;
|
||||
long totalUsedCapacity = 0;
|
||||
RootPathLoadStatistic srcPathStat = null;
|
||||
RootPathLoadStatistic destPathStat = null;
|
||||
for (RootPathLoadStatistic pathStat : pathStatistics) {
|
||||
if (pathStat.getStorageMedium() == medium) {
|
||||
totalCapacity += pathStat.getCapacityB();
|
||||
totalUsedCapacity += pathStat.getUsedCapacityB();
|
||||
if (pathStat.getPathHash() == srcPath) {
|
||||
srcPathStat = pathStat;
|
||||
} else if (pathStat.getPathHash() == destPath) {
|
||||
destPathStat = pathStat;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (srcPathStat == null || destPathStat == null) {
|
||||
LOG.info("migrate {}(size: {}) from {} to {} failed, medium: {}, src or dest path stat does not exist.",
|
||||
tabletId, tabletSize, srcPath, destPath, medium);
|
||||
return false;
|
||||
}
|
||||
double avgUsedPercent = totalCapacity == 0 ? 0.0 : totalUsedCapacity / (double) totalCapacity;
|
||||
double currentSrcPathScore = srcPathStat.getCapacityB() == 0
|
||||
? 0.0 : srcPathStat.getUsedCapacityB() / (double) srcPathStat.getCapacityB();
|
||||
double currentDestPathScore = destPathStat.getCapacityB() == 0
|
||||
? 0.0 : destPathStat.getUsedCapacityB() / (double) destPathStat.getCapacityB();
|
||||
|
||||
double newSrcPathScore = srcPathStat.getCapacityB() == 0
|
||||
? 0.0 : (srcPathStat.getUsedCapacityB() - tabletSize) / (double) srcPathStat.getCapacityB();
|
||||
double newDestPathScore = destPathStat.getCapacityB() == 0
|
||||
? 0.0 : (destPathStat.getUsedCapacityB() + tabletSize) / (double) destPathStat.getCapacityB();
|
||||
|
||||
double currentDiff = Math.abs(currentSrcPathScore - avgUsedPercent)
|
||||
+ Math.abs(currentDestPathScore - avgUsedPercent);
|
||||
double newDiff = Math.abs(newSrcPathScore - avgUsedPercent) + Math.abs(newDestPathScore - avgUsedPercent);
|
||||
|
||||
LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the load score changed."
|
||||
+ " src: {} -> {}, dest: {}->{}, average score: {}. current diff: {}, new diff: {},"
|
||||
+ " more balanced: {}",
|
||||
tabletId, tabletSize, srcPath, destPath, medium, currentSrcPathScore, newSrcPathScore,
|
||||
currentDestPathScore, newDestPathScore, avgUsedPercent, currentDiff, newDiff,
|
||||
(newDiff < currentDiff));
|
||||
|
||||
return newDiff < currentDiff;
|
||||
}
|
||||
|
||||
public boolean hasAvailDisk() {
|
||||
for (RootPathLoadStatistic rootPathLoadStatistic : pathStatistics) {
|
||||
if (rootPathLoadStatistic.getDiskState() == DiskState.ONLINE) {
|
||||
|
||||
@ -0,0 +1,334 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.clone.SchedException.Status;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
import org.apache.doris.clone.TabletSchedCtx.BalanceType;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/*
|
||||
|
||||
* This DiskBalancer is different from other Balancers which takes care of cluster-wide data balancing.
|
||||
* This DiskBalancer chooses a backend and moves tablet from one disk to another.
|
||||
* DiskRebalancer strategy:
|
||||
* 1. only works while the cluster is balanced(which means the cluster has no high and mid load backends)
|
||||
* 1.1 if user has given prio backends, then select tablets from prio backends no matter cluster is balanced or not.
|
||||
* 2. selecting alternative tablets from mid load backends, and return them to tablet scheduler.
|
||||
 * 3. given a tablet which has a src path (disk), find a dest path (disk) to migrate to.
|
||||
*/
|
||||
public class DiskRebalancer extends Rebalancer {
|
||||
private static final Logger LOG = LogManager.getLogger(DiskRebalancer.class);
|
||||
|
||||
public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
|
||||
super(infoService, invertedIndex);
|
||||
}
|
||||
|
||||
public List<BackendLoadStatistic> filterByPrioBackends(List<BackendLoadStatistic> bes) {
|
||||
List<BackendLoadStatistic> stats = Lists.newArrayList();
|
||||
for (BackendLoadStatistic backend : bes) {
|
||||
long backendId = backend.getBeId();
|
||||
Long timeoutS = prioBackends.getOrDefault(backendId, 0L);
|
||||
if (timeoutS != 0) {
|
||||
if (timeoutS > System.currentTimeMillis()) {
|
||||
// remove backends from prio if timeout
|
||||
prioBackends.remove(backendId);
|
||||
continue;
|
||||
}
|
||||
stats.add(backend);
|
||||
}
|
||||
}
|
||||
return stats;
|
||||
}
|
||||
|
||||
// true means BE has low and high paths for balance after reclassification
|
||||
private boolean checkAndReclassifyPaths(Set<Long> pathLow, Set<Long> pathMid, Set<Long> pathHigh) {
|
||||
if (pathLow.isEmpty() && pathHigh.isEmpty()) {
|
||||
// balanced
|
||||
return false;
|
||||
}
|
||||
if (pathLow.isEmpty()) {
|
||||
// mid => low
|
||||
pathLow.addAll(pathMid);
|
||||
} else if (pathHigh.isEmpty()) {
|
||||
// mid => high
|
||||
pathHigh.addAll(pathMid);
|
||||
}
|
||||
if (pathLow.isEmpty() || pathHigh.isEmpty()) {
|
||||
// check again
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Try to select alternative tablets to balance the disks.
|
||||
* 1. Classify the backend into low, mid and high class by load score.
|
||||
* 2. Try to select tablets from mid load backends.
|
||||
* 1. Here we only select alternative tablets, without considering selected tablets' status,
|
||||
* and whether it is benefit for balance (All these will be checked in tablet scheduler)
|
||||
* 2. Only select tablets from 'mid' backends.
|
||||
* 3. Only select tablets from 'high' paths.
|
||||
* 3. Try to select tablets from prio backends.
|
||||
*
|
||||
* Here we only select tablets from mid load node, do not set its dest, all this will be set
|
||||
* when this tablet is being scheduled in tablet scheduler.
|
||||
*
|
||||
* NOTICE that we may select any available tablets here, ignore their state.
|
||||
* The state will be checked when being scheduled in tablet scheduler.
|
||||
*/
|
||||
@Override
|
||||
protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
|
||||
ClusterLoadStatistic clusterStat, TStorageMedium medium) {
|
||||
String clusterName = clusterStat.getClusterName();
|
||||
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
|
||||
|
||||
// get classification of backends
|
||||
List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
|
||||
List<BackendLoadStatistic> midBEs = Lists.newArrayList();
|
||||
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
|
||||
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs, medium);
|
||||
|
||||
if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
|
||||
// the cluster is not balanced
|
||||
if (prioBackends.isEmpty()) {
|
||||
LOG.info("cluster is not balanced: {} with medium: {}. skip", clusterName, medium);
|
||||
return alternativeTablets;
|
||||
} else {
|
||||
// prioBEs are not empty, we only schedule prioBEs' disk balance task
|
||||
midBEs.addAll(lowBEs);
|
||||
midBEs.addAll(highBEs);
|
||||
midBEs = filterByPrioBackends(midBEs);
|
||||
}
|
||||
}
|
||||
|
||||
// first we should check if mid backends is available.
|
||||
// if all mid backends is not available, we should not start balance
|
||||
if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
|
||||
LOG.info("all mid load backends is dead: {} with medium: {}. skip",
|
||||
lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
|
||||
LOG.info("all mid load backends {} have no available disk with medium: {}. skip",
|
||||
lowBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
Set<Long> unbalancedBEs = Sets.newHashSet();
|
||||
// choose tablets from backends randomly.
|
||||
Collections.shuffle(midBEs);
|
||||
for (int i = midBEs.size() - 1; i >= 0; i--) {
|
||||
BackendLoadStatistic beStat = midBEs.get(i);
|
||||
|
||||
// classify the paths.
|
||||
Set<Long> pathLow = Sets.newHashSet();
|
||||
Set<Long> pathMid = Sets.newHashSet();
|
||||
Set<Long> pathHigh = Sets.newHashSet();
|
||||
// we only select tablets from available high load path
|
||||
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
|
||||
// check if BE has low and high paths for balance after reclassification
|
||||
if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// get all tablets on this backend, and shuffle them for random selection
|
||||
List<Long> tabletIds = invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium);
|
||||
Collections.shuffle(tabletIds);
|
||||
|
||||
// for each path, we try to select at most BALANCE_SLOT_NUM_FOR_PATH tablets
|
||||
Map<Long, Integer> remainingPaths = Maps.newHashMap();
|
||||
for (Long pathHash : pathHigh) {
|
||||
remainingPaths.put(pathHash, TabletScheduler.BALANCE_SLOT_NUM_FOR_PATH);
|
||||
}
|
||||
|
||||
if (remainingPaths.isEmpty()) {
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
// select tablet from shuffled tablets
|
||||
for (Long tabletId : tabletIds) {
|
||||
Replica replica = invertedIndex.getReplica(tabletId, beStat.getBeId());
|
||||
if (replica == null) {
|
||||
continue;
|
||||
}
|
||||
// ignore empty replicas as they do not make disk more balance. (disk usage)
|
||||
if (replica.getDataSize() == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// check if replica's is on 'high' path.
|
||||
// and only select it if the selected tablets num of this path
|
||||
// does not exceed the limit (BALANCE_SLOT_NUM_FOR_PATH).
|
||||
long replicaPathHash = replica.getPathHash();
|
||||
if (remainingPaths.containsKey(replicaPathHash)) {
|
||||
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
|
||||
if (tabletMeta == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TabletSchedCtx tabletCtx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
|
||||
tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(),
|
||||
tabletMeta.getIndexId(), tabletId, null /* replica alloc is not used for balance*/,
|
||||
System.currentTimeMillis());
|
||||
// we set temp src here to simplify completeSchedCtx method, and avoid take slot here
|
||||
tabletCtx.setTempSrc(replica);
|
||||
tabletCtx.setTag(clusterStat.getTag());
|
||||
if (prioBackends.containsKey(beStat.getBeId())) {
|
||||
// priority of balance task of prio BE is NORMAL
|
||||
tabletCtx.setOrigPriority(Priority.NORMAL);
|
||||
} else {
|
||||
// balance task's default priority is LOW
|
||||
tabletCtx.setOrigPriority(Priority.LOW);
|
||||
}
|
||||
// we must set balanceType to DISK_BALANCE for create migration task
|
||||
tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
|
||||
|
||||
alternativeTablets.add(tabletCtx);
|
||||
unbalancedBEs.add(beStat.getBeId());
|
||||
// update remaining paths
|
||||
int remaining = remainingPaths.get(replicaPathHash) - 1;
|
||||
if (remaining <= 0) {
|
||||
remainingPaths.remove(replicaPathHash);
|
||||
} else {
|
||||
remainingPaths.put(replicaPathHash, remaining);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // end for mid backends
|
||||
|
||||
// remove balanced BEs from prio backends
|
||||
prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id));
|
||||
LOG.info("select alternative tablets for cluster: {}, medium: {}, num: {}, detail: {}",
|
||||
clusterName, medium, alternativeTablets.size(),
|
||||
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
|
||||
return alternativeTablets;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a StorageMediaMigrationTask of this selected tablet for balance.
|
||||
* 1. Check if the cluster is balanced. if not, the balance will be cancelled.
|
||||
* 2. Check if the src replica still on high load path. If not, the balance will be cancelled.
|
||||
* 3. Select a low load path from this backend as destination.
|
||||
*/
|
||||
@Override
|
||||
public void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
|
||||
ClusterLoadStatistic clusterStat = statisticMap.get(tabletCtx.getCluster(), tabletCtx.getTag());
|
||||
if (clusterStat == null) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "cluster does not exist");
|
||||
}
|
||||
if (tabletCtx.getTempSrcBackendId() == -1 || tabletCtx.getTempSrcPathHash() == -1) {
|
||||
throw new SchedException(Status.UNRECOVERABLE,
|
||||
"src does not appear to be set correctly, something goes wrong");
|
||||
}
|
||||
Replica replica = invertedIndex.getReplica(tabletCtx.getTabletId(), tabletCtx.getTempSrcBackendId());
|
||||
// check src replica still there
|
||||
if (replica == null || replica.getPathHash() != tabletCtx.getTempSrcPathHash()) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "src replica may be rebalanced");
|
||||
}
|
||||
// ignore empty replicas as they do not make disk more balance
|
||||
if (replica.getDataSize() == 0) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "size of src replica is zero");
|
||||
}
|
||||
// check src slot
|
||||
PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
|
||||
if (slot == null) {
|
||||
LOG.debug("BE does not have slot: {}", replica.getBackendId());
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot");
|
||||
}
|
||||
long pathHash = slot.takeBalanceSlot(replica.getPathHash());
|
||||
if (pathHash == -1) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to take src slot");
|
||||
}
|
||||
// after take src slot, we can set src replica now
|
||||
tabletCtx.setSrc(replica);
|
||||
|
||||
BackendLoadStatistic beStat = clusterStat.getBackendLoadStatistic(replica.getBackendId());
|
||||
if (!beStat.isAvailable()) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "the backend is not available");
|
||||
}
|
||||
// classify the paths.
|
||||
// If src path is 'high', then we can select path from 'low' and 'mid'
|
||||
// If src path is 'mid', then we can only select path from 'low'
|
||||
// If src path is 'low', then we have nothing to do
|
||||
Set<Long> pathLow = Sets.newHashSet();
|
||||
Set<Long> pathMid = Sets.newHashSet();
|
||||
Set<Long> pathHigh = Sets.newHashSet();
|
||||
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
|
||||
if (pathHigh.contains(replica.getPathHash())) {
|
||||
pathLow.addAll(pathMid);
|
||||
} else if (!pathMid.contains(replica.getPathHash())) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "src path is low load");
|
||||
}
|
||||
// check if this migration task can make the be's disks more balance.
|
||||
List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
|
||||
BalanceStatus bs;
|
||||
if ((bs = beStat.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), availPaths,
|
||||
false /* not supplement */)) != BalanceStatus.OK) {
|
||||
LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(), bs.getErrMsgs());
|
||||
throw new SchedException(Status.UNRECOVERABLE, "tablet not fit in BE");
|
||||
}
|
||||
// Select a low load path as destination.
|
||||
boolean setDest = false;
|
||||
for (RootPathLoadStatistic stat : availPaths) {
|
||||
// check if avail path is src path
|
||||
if (stat.getPathHash() == replica.getPathHash()) {
|
||||
continue;
|
||||
}
|
||||
// check if avail path is low path
|
||||
if (!pathLow.contains(stat.getPathHash())) {
|
||||
LOG.debug("the path :{} is not low load", stat.getPathHash());
|
||||
continue;
|
||||
}
|
||||
if (!beStat.isMoreBalanced(tabletCtx.getSrcPathHash(), stat.getPathHash(),
|
||||
tabletCtx.getTabletId(), tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) {
|
||||
LOG.debug("the path :{} can not make more balance", stat.getPathHash());
|
||||
continue;
|
||||
}
|
||||
long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
|
||||
if (destPathHash == -1) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to take dest slot");
|
||||
}
|
||||
tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
|
||||
setDest = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!setDest) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to find low load path");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -20,11 +20,13 @@ package org.apache.doris.clone;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Table;
|
||||
|
||||
@ -50,6 +52,8 @@ public abstract class Rebalancer {
|
||||
protected Table<String, Tag, ClusterLoadStatistic> statisticMap = HashBasedTable.create();
|
||||
protected TabletInvertedIndex invertedIndex;
|
||||
protected SystemInfoService infoService;
|
||||
// be id -> end time of prio
|
||||
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
|
||||
|
||||
public Rebalancer(SystemInfoService infoService, TabletInvertedIndex invertedIndex) {
|
||||
this.infoService = infoService;
|
||||
@ -71,10 +75,14 @@ public abstract class Rebalancer {
|
||||
protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
|
||||
ClusterLoadStatistic clusterStat, TStorageMedium medium);
|
||||
|
||||
public void createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots,
|
||||
AgentBatchTask batchTask) throws SchedException {
|
||||
public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long, PathSlot> backendsWorkingSlots)
|
||||
throws SchedException {
|
||||
completeSchedCtx(tabletCtx, backendsWorkingSlots);
|
||||
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
|
||||
if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
|
||||
return tabletCtx.createCloneReplicaAndTask();
|
||||
} else {
|
||||
return tabletCtx.createStorageMediaMigrationTask();
|
||||
}
|
||||
}
|
||||
|
||||
// Before createCloneReplicaAndTask, we need to complete the TabletSchedCtx.
|
||||
@ -93,4 +101,21 @@ public abstract class Rebalancer {
|
||||
public void updateLoadStatistic(Table<String, Tag, ClusterLoadStatistic> statisticMap) {
|
||||
this.statisticMap = statisticMap;
|
||||
}
|
||||
|
||||
public void addPrioBackends(List<Backend> backends, long timeoutS) {
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
for (Backend backend : backends) {
|
||||
prioBackends.put(backend.getId(), currentTimeMillis + timeoutS);
|
||||
}
|
||||
}
|
||||
|
||||
public void removePrioBackends(List<Backend> backends) {
|
||||
for (Backend backend : backends) {
|
||||
prioBackends.remove(backend.getId());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasPrioBackends() {
|
||||
return !prioBackends.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,6 +40,7 @@ import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentTaskQueue;
|
||||
import org.apache.doris.task.CloneTask;
|
||||
import org.apache.doris.task.StorageMediaMigrationTask;
|
||||
import org.apache.doris.thrift.TBackend;
|
||||
import org.apache.doris.thrift.TFinishTaskRequest;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
@ -108,6 +109,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
BALANCE, REPAIR
|
||||
}
|
||||
|
||||
public enum BalanceType {
|
||||
BE_BALANCE, DISK_BALANCE
|
||||
}
|
||||
|
||||
public enum Priority {
|
||||
LOW,
|
||||
NORMAL,
|
||||
@ -141,6 +146,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
}
|
||||
|
||||
private Type type;
|
||||
private BalanceType balanceType;
|
||||
|
||||
/*
|
||||
* origPriority is the origin priority being set when this tablet being added to scheduler.
|
||||
@ -193,11 +199,16 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
private Replica srcReplica = null;
|
||||
private long srcPathHash = -1;
|
||||
// for disk balance to keep src path, and avoid take slot on selectAlternativeTabletsForCluster
|
||||
private Replica tempSrcReplica = null;
|
||||
private long destBackendId = -1;
|
||||
private long destPathHash = -1;
|
||||
// for disk balance to set migration task's datadir
|
||||
private String destPath = null;
|
||||
private String errMsg = null;
|
||||
|
||||
private CloneTask cloneTask = null;
|
||||
private StorageMediaMigrationTask storageMediaMigrationTask = null;
|
||||
|
||||
// statistics gathered from clone task report
|
||||
// the total size of clone files and the total cost time in ms.
|
||||
@ -227,6 +238,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
this.infoService = Catalog.getCurrentSystemInfo();
|
||||
this.state = State.PENDING;
|
||||
this.replicaAlloc = replicaAlloc;
|
||||
this.balanceType = BalanceType.BE_BALANCE;
|
||||
}
|
||||
|
||||
public ReplicaAllocation getReplicaAlloc() {
|
||||
@ -249,6 +261,14 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setBalanceType(BalanceType type) {
|
||||
this.balanceType = type;
|
||||
}
|
||||
|
||||
public BalanceType getBalanceType() {
|
||||
return balanceType;
|
||||
}
|
||||
|
||||
public Priority getOrigPriority() {
|
||||
return origPriority;
|
||||
}
|
||||
@ -380,6 +400,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
this.destBackendId = destBeId;
|
||||
this.destPathHash = destPathHash;
|
||||
}
|
||||
|
||||
public void setDest(Long destBeId, long destPathHash, String destPath) {
|
||||
setDest(destBeId, destPathHash);
|
||||
this.destPath = destPath;
|
||||
}
|
||||
|
||||
public void setErrMsg(String errMsg) {
|
||||
this.errMsg = errMsg;
|
||||
@ -414,6 +439,24 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
this.srcPathHash = srcReplica.getPathHash();
|
||||
}
|
||||
|
||||
public void setTempSrc(Replica srcReplica) {
|
||||
this.tempSrcReplica = srcReplica;
|
||||
}
|
||||
|
||||
public long getTempSrcBackendId() {
|
||||
if (tempSrcReplica != null) {
|
||||
return tempSrcReplica.getBackendId();
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
public long getTempSrcPathHash() {
|
||||
if (tempSrcReplica != null) {
|
||||
return tempSrcReplica.getPathHash();
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
public long getDestBackendId() {
|
||||
return destBackendId;
|
||||
}
|
||||
@ -422,6 +465,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
return destPathHash;
|
||||
}
|
||||
|
||||
public String getDestPath() {
|
||||
return destPath;
|
||||
}
|
||||
|
||||
// database lock should be held.
|
||||
public long getTabletSize() {
|
||||
long max = Long.MIN_VALUE;
|
||||
@ -687,6 +734,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
}
|
||||
}
|
||||
|
||||
if (storageMediaMigrationTask != null) {
|
||||
AgentTaskQueue.removeTask(storageMediaMigrationTask.getBackendId(), TTaskType.STORAGE_MEDIUM_MIGRATE, storageMediaMigrationTask.getSignature());
|
||||
}
|
||||
if (cloneTask != null) {
|
||||
AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature());
|
||||
|
||||
@ -729,13 +779,28 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
this.srcPathHash = -1;
|
||||
this.destBackendId = -1;
|
||||
this.destPathHash = -1;
|
||||
this.destPath = null;
|
||||
this.cloneTask = null;
|
||||
this.storageMediaMigrationTask = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteReplica(Replica replica) {
|
||||
tablet.deleteReplicaByBackendId(replica.getBackendId());
|
||||
}
|
||||
|
||||
public StorageMediaMigrationTask createStorageMediaMigrationTask() throws SchedException {
|
||||
storageMediaMigrationTask = new StorageMediaMigrationTask(getSrcBackendId(), getTabletId(),
|
||||
getSchemaHash(), getStorageMedium());
|
||||
if (destPath == null || destPath.isEmpty()) {
|
||||
throw new SchedException(Status.UNRECOVERABLE,
|
||||
"backend " + srcReplica.getBackendId() + ", dest path is empty");
|
||||
}
|
||||
storageMediaMigrationTask.setDataDir(destPath);
|
||||
this.taskTimeoutMs = getApproximateTimeoutMs();
|
||||
this.state = State.RUNNING;
|
||||
return storageMediaMigrationTask;
|
||||
}
|
||||
|
||||
// database lock should be held.
|
||||
public CloneTask createCloneReplicaAndTask() throws SchedException {
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
|
||||
import org.apache.doris.analysis.AdminRebalanceDiskStmt;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
|
||||
@ -51,7 +53,9 @@ import org.apache.doris.task.AgentTaskExecutor;
|
||||
import org.apache.doris.task.AgentTaskQueue;
|
||||
import org.apache.doris.task.CloneTask;
|
||||
import org.apache.doris.task.DropReplicaTask;
|
||||
import org.apache.doris.task.StorageMediaMigrationTask;
|
||||
import org.apache.doris.thrift.TFinishTaskRequest;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.transaction.DatabaseTransactionMgr;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
|
||||
@ -101,13 +105,14 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
|
||||
|
||||
public static final int BALANCE_SLOT_NUM_FOR_PATH = 2;
|
||||
// 1 slot for reduce unnecessary balance task, provided a more accurate estimate of capacity
|
||||
public static final int BALANCE_SLOT_NUM_FOR_PATH = 1;
|
||||
|
||||
/*
|
||||
* Tablet is added to pendingTablets as well it's id in allTabletIds.
|
||||
* TabletScheduler will take tablet from pendingTablets but will not remove it's id from allTabletIds when
|
||||
* handling a tablet.
|
||||
* Tablet' id can only be removed after the clone task is done(timeout, cancelled or finished).
|
||||
* Tablet' id can only be removed after the clone task or migration task is done(timeout, cancelled or finished).
|
||||
* So if a tablet's id is still in allTabletIds, TabletChecker can not add tablet to TabletScheduler.
|
||||
*
|
||||
* pendingTablets + runningTablets = allTabletIds
|
||||
@ -135,6 +140,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
private ColocateTableIndex colocateTableIndex;
|
||||
private TabletSchedulerStat stat;
|
||||
private Rebalancer rebalancer;
|
||||
private Rebalancer diskRebalancer;
|
||||
|
||||
// result of adding a tablet to pendingTablets
|
||||
public enum AddResult {
|
||||
@ -157,6 +163,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
} else {
|
||||
this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
|
||||
}
|
||||
// if rebalancer can not get new task, then use diskRebalancer to get task
|
||||
this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex);
|
||||
}
|
||||
|
||||
public TabletSchedulerStat getStat() {
|
||||
@ -244,6 +252,14 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return allTabletIds.contains(tabletId);
|
||||
}
|
||||
|
||||
public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
|
||||
diskRebalancer.addPrioBackends(stmt.getBackends(), stmt.getTimeoutS());
|
||||
}
|
||||
|
||||
public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt stmt) {
|
||||
diskRebalancer.removePrioBackends(stmt.getBackends());
|
||||
}
|
||||
|
||||
/**
|
||||
* Iterate current tablets, change their priority to VERY_HIGH if necessary.
|
||||
*/
|
||||
@ -300,6 +316,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
|
||||
updateClusterLoadStatistic();
|
||||
rebalancer.updateLoadStatistic(statisticMap);
|
||||
diskRebalancer.updateLoadStatistic(statisticMap);
|
||||
|
||||
adjustPriorities();
|
||||
|
||||
@ -463,7 +480,6 @@ public class TabletScheduler extends MasterDaemon {
|
||||
* Try to schedule a single tablet.
|
||||
*/
|
||||
private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
|
||||
LOG.debug("schedule tablet: {}, type: {}, status: {}", tabletCtx.getTabletId(), tabletCtx.getType(), tabletCtx.getTabletStatus());
|
||||
long currentTime = System.currentTimeMillis();
|
||||
tabletCtx.setLastSchedTime(currentTime);
|
||||
tabletCtx.setLastVisitedTime(currentTime);
|
||||
@ -561,6 +577,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "tablet is unhealthy when doing balance");
|
||||
}
|
||||
|
||||
// for disk balance more accutely, we only schedule tablet when has lastly stat info about disk
|
||||
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE &&
|
||||
tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
|
||||
checkDiskBalanceLastSuccTime(tabletCtx.getTempSrcBackendId(), tabletCtx.getTempSrcPathHash());
|
||||
}
|
||||
// we do not concern priority here.
|
||||
// once we take the tablet out of priority queue, priority is meaningless.
|
||||
tabletCtx.setTablet(tablet);
|
||||
@ -574,6 +595,25 @@ public class TabletScheduler extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws SchedException {
|
||||
PathSlot pathSlot = backendsWorkingSlots.get(beId);
|
||||
if (pathSlot == null) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "path slot does not exist");
|
||||
}
|
||||
long succTime = pathSlot.getDiskBalanceLastSuccTime(pathHash);
|
||||
if (succTime > lastStatUpdateTime) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "stat info is outdated");
|
||||
}
|
||||
}
|
||||
|
||||
public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
|
||||
PathSlot pathSlot = backendsWorkingSlots.get(beId);
|
||||
if (pathSlot == null) {
|
||||
return;
|
||||
}
|
||||
pathSlot.updateDiskBalanceLastSuccTime(pathHash);
|
||||
}
|
||||
|
||||
private void handleTabletByTypeAndStatus(TabletStatus status, TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
|
||||
throws SchedException {
|
||||
if (tabletCtx.getType() == Type.REPAIR) {
|
||||
@ -1189,6 +1229,21 @@ public class TabletScheduler extends MasterDaemon {
|
||||
for (TabletSchedCtx tabletCtx : alternativeTablets) {
|
||||
addTablet(tabletCtx, false);
|
||||
}
|
||||
if (Config.disable_disk_balance) {
|
||||
LOG.info("disk balance is disabled. skip selecting tablets for disk balance");
|
||||
return;
|
||||
}
|
||||
List<TabletSchedCtx> diskBalanceTablets = Lists.newArrayList();
|
||||
// if default rebalancer can not get new task or user given prio BEs, then use disk rebalancer to get task
|
||||
if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) {
|
||||
diskBalanceTablets = diskRebalancer.selectAlternativeTablets();
|
||||
}
|
||||
for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
|
||||
// add if task from prio backend or cluster is balanced
|
||||
if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() == TabletSchedCtx.Priority.NORMAL) {
|
||||
addTablet(tabletCtx, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1196,7 +1251,18 @@ public class TabletScheduler extends MasterDaemon {
|
||||
*/
|
||||
private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask) throws SchedException {
|
||||
stat.counterBalanceSchedule.incrementAndGet();
|
||||
rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots, batchTask);
|
||||
AgentTask task = null;
|
||||
if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
|
||||
task = diskRebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
|
||||
checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
|
||||
checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
|
||||
} else if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.BE_BALANCE) {
|
||||
task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
|
||||
} else {
|
||||
throw new SchedException(Status.UNRECOVERABLE,
|
||||
"unknown balance type: " + tabletCtx.getBalanceType().toString());
|
||||
}
|
||||
batchTask.addTask(task);
|
||||
}
|
||||
|
||||
// choose a path on a backend which is fit for the tablet
|
||||
@ -1347,7 +1413,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// get next batch of tablets from queue.
|
||||
private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
|
||||
List<TabletSchedCtx> list = Lists.newArrayList();
|
||||
int count = Math.max(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
|
||||
int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
|
||||
while (count > 0) {
|
||||
TabletSchedCtx tablet = pendingTablets.poll();
|
||||
if (tablet == null) {
|
||||
@ -1368,6 +1434,29 @@ public class TabletScheduler extends MasterDaemon {
|
||||
return total;
|
||||
}
|
||||
|
||||
public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask migrationTask,
|
||||
TFinishTaskRequest request) {
|
||||
long tabletId = migrationTask.getTabletId();
|
||||
TabletSchedCtx tabletCtx = takeRunningTablets(tabletId);
|
||||
if (tabletCtx == null) {
|
||||
// tablet does not exist, the task may be created by ReportHandler.tabletReport(ssd => hdd)
|
||||
LOG.warn("tablet info does not exist: {}", tabletId);
|
||||
return true;
|
||||
}
|
||||
if (tabletCtx.getBalanceType() != TabletSchedCtx.BalanceType.DISK_BALANCE) {
|
||||
// this should not happen
|
||||
LOG.warn("task type is not as excepted. tablet {}", tabletId);
|
||||
return true;
|
||||
}
|
||||
if (request.getTaskStatus().getStatusCode() == TStatusCode.OK) {
|
||||
// if we have a success task, then stat must be refreshed before schedule a new task
|
||||
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
|
||||
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
|
||||
}
|
||||
// we need this function to free slot for this migration task
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished");
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* return true if we want to remove the clone task from AgentTaskQueue
|
||||
*/
|
||||
@ -1379,6 +1468,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
// tablet does not exist, no need to keep task.
|
||||
return true;
|
||||
}
|
||||
if (tabletCtx.getBalanceType() == TabletSchedCtx.BalanceType.DISK_BALANCE) {
|
||||
// this should not happen
|
||||
LOG.warn("task type is not as excepted. tablet {}", tabletId);
|
||||
return true;
|
||||
}
|
||||
|
||||
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
|
||||
try {
|
||||
@ -1706,6 +1800,22 @@ public class TabletScheduler extends MasterDaemon {
|
||||
slot.balanceSlot++;
|
||||
slot.rectify();
|
||||
}
|
||||
|
||||
public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
|
||||
Slot slot = pathSlots.get(pathHash);
|
||||
if (slot == null) {
|
||||
return;
|
||||
}
|
||||
slot.diskBalanceLastSuccTime = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
public synchronized long getDiskBalanceLastSuccTime(long pathHash) {
|
||||
Slot slot = pathSlots.get(pathHash);
|
||||
if (slot == null) {
|
||||
return 0L;
|
||||
}
|
||||
return slot.diskBalanceLastSuccTime;
|
||||
}
|
||||
}
|
||||
|
||||
public List<List<String>> getSlotsInfo() {
|
||||
@ -1726,6 +1836,9 @@ public class TabletScheduler extends MasterDaemon {
|
||||
public long totalCopySize = 0;
|
||||
public long totalCopyTimeMs = 0;
|
||||
|
||||
// for disk balance
|
||||
public long diskBalanceLastSuccTime = 0;
|
||||
|
||||
public Slot(int total) {
|
||||
this.total = total;
|
||||
this.available = total;
|
||||
|
||||
@ -1133,6 +1133,12 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static boolean disable_balance = false;
|
||||
|
||||
/**
|
||||
* if set to true, TabletScheduler will not do disk balance.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static boolean disable_disk_balance = false;
|
||||
|
||||
// if the number of scheduled tablets in TabletScheduler exceed max_scheduling_tablets
|
||||
// skip checking.
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
|
||||
@ -47,6 +47,7 @@ import org.apache.doris.task.DownloadTask;
|
||||
import org.apache.doris.task.PublishVersionTask;
|
||||
import org.apache.doris.task.PushTask;
|
||||
import org.apache.doris.task.SnapshotTask;
|
||||
import org.apache.doris.task.StorageMediaMigrationTask;
|
||||
import org.apache.doris.task.UpdateTabletMetaInfoTask;
|
||||
import org.apache.doris.task.UploadTask;
|
||||
import org.apache.doris.thrift.TBackend;
|
||||
@ -115,8 +116,8 @@ public class MasterImpl {
|
||||
|
||||
AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature);
|
||||
if (task == null) {
|
||||
if (taskType != TTaskType.DROP && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
|
||||
&& taskType != TTaskType.RELEASE_SNAPSHOT && taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
|
||||
if (taskType != TTaskType.DROP && taskType != TTaskType.RELEASE_SNAPSHOT
|
||||
&& taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
|
||||
String errMsg = "cannot find task. type: " + taskType + ", backendId: " + backendId
|
||||
+ ", signature: " + signature;
|
||||
LOG.warn(errMsg);
|
||||
@ -137,7 +138,8 @@ public class MasterImpl {
|
||||
if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD
|
||||
&& taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
|
||||
&& taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION
|
||||
&& taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO) {
|
||||
&& taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO
|
||||
&& taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -175,6 +177,9 @@ public class MasterImpl {
|
||||
case CLONE:
|
||||
finishClone(task, request);
|
||||
break;
|
||||
case STORAGE_MEDIUM_MIGRATE:
|
||||
finishStorageMediumMigrate(task, request);
|
||||
break;
|
||||
case CHECK_CONSISTENCY:
|
||||
finishConsistencyCheck(task, request);
|
||||
break;
|
||||
@ -699,6 +704,12 @@ public class MasterImpl {
|
||||
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CLONE, task.getSignature());
|
||||
}
|
||||
|
||||
private void finishStorageMediumMigrate(AgentTask task, TFinishTaskRequest request) {
|
||||
StorageMediaMigrationTask migrationTask = (StorageMediaMigrationTask) task;
|
||||
Catalog.getCurrentCatalog().getTabletScheduler().finishStorageMediaMigrationTask(migrationTask, request);
|
||||
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.STORAGE_MEDIUM_MIGRATE, task.getSignature());
|
||||
}
|
||||
|
||||
private void finishConsistencyCheck(AgentTask task, TFinishTaskRequest request) {
|
||||
CheckConsistencyTask checkConsistencyTask = (CheckConsistencyTask) task;
|
||||
|
||||
|
||||
@ -360,10 +360,12 @@ public class ReportHandler extends Daemon {
|
||||
// 1. CREATE
|
||||
// 2. SYNC DELETE
|
||||
// 3. CHECK_CONSISTENCY
|
||||
// 4. STORAGE_MDEIUM_MIGRATE
|
||||
if (task.getTaskType() == TTaskType.CREATE
|
||||
|| (task.getTaskType() == TTaskType.PUSH && ((PushTask) task).getPushType() == TPushType.DELETE
|
||||
&& ((PushTask) task).isSyncDelete())
|
||||
|| task.getTaskType() == TTaskType.CHECK_CONSISTENCY) {
|
||||
|| task.getTaskType() == TTaskType.CHECK_CONSISTENCY
|
||||
|| task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@ -21,6 +21,8 @@ import org.apache.doris.analysis.AdminCancelRepairTableStmt;
|
||||
import org.apache.doris.analysis.AdminCheckTabletsStmt;
|
||||
import org.apache.doris.analysis.AdminCleanTrashStmt;
|
||||
import org.apache.doris.analysis.AdminCompactTableStmt;
|
||||
import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
|
||||
import org.apache.doris.analysis.AdminRebalanceDiskStmt;
|
||||
import org.apache.doris.analysis.AdminRepairTableStmt;
|
||||
import org.apache.doris.analysis.AdminSetConfigStmt;
|
||||
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
|
||||
@ -281,6 +283,10 @@ public class DdlExecutor {
|
||||
catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AdminCleanTrashStmt) {
|
||||
catalog.cleanTrash((AdminCleanTrashStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AdminRebalanceDiskStmt) {
|
||||
catalog.getTabletScheduler().rebalanceDisk((AdminRebalanceDiskStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AdminCancelRebalanceDiskStmt) {
|
||||
catalog.getTabletScheduler().cancelRebalanceDisk((AdminCancelRebalanceDiskStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof CreateSqlBlockRuleStmt) {
|
||||
catalog.getSqlBlockRuleMgr().createSqlBlockRule((CreateSqlBlockRuleStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof AlterSqlBlockRuleStmt) {
|
||||
|
||||
@ -21,10 +21,14 @@ import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.thrift.TStorageMediumMigrateReq;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
public class StorageMediaMigrationTask extends AgentTask {
|
||||
|
||||
private int schemaHash;
|
||||
private TStorageMedium toStorageMedium;
|
||||
// if dataDir is specified, the toStorageMedium is meaning less
|
||||
private String dataDir;
|
||||
|
||||
public StorageMediaMigrationTask(long backendId, long tabletId, int schemaHash,
|
||||
TStorageMedium toStorageMedium) {
|
||||
@ -36,9 +40,20 @@ public class StorageMediaMigrationTask extends AgentTask {
|
||||
|
||||
public TStorageMediumMigrateReq toThrift() {
|
||||
TStorageMediumMigrateReq request = new TStorageMediumMigrateReq(tabletId, schemaHash, toStorageMedium);
|
||||
if (!Strings.isNullOrEmpty(dataDir)) {
|
||||
request.setDataDir(dataDir);
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
public String getDataDir() {
|
||||
return dataDir;
|
||||
}
|
||||
|
||||
public void setDataDir(String dataDir) {
|
||||
this.dataDir = dataDir;
|
||||
}
|
||||
|
||||
public int getSchemaHash() {
|
||||
return schemaHash;
|
||||
}
|
||||
|
||||
@ -171,6 +171,7 @@ import org.apache.doris.qe.SqlModeHelper;
|
||||
keywordMap.put("distinctpcsa", new Integer(SqlParserSymbols.KW_DISTINCTPCSA));
|
||||
keywordMap.put("distributed", new Integer(SqlParserSymbols.KW_DISTRIBUTED));
|
||||
keywordMap.put("distribution", new Integer(SqlParserSymbols.KW_DISTRIBUTION));
|
||||
keywordMap.put("disk", new Integer(SqlParserSymbols.KW_DISK));
|
||||
keywordMap.put("dynamic", new Integer(SqlParserSymbols.KW_DYNAMIC));
|
||||
keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV));
|
||||
keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
|
||||
@ -319,6 +320,7 @@ import org.apache.doris.qe.SqlModeHelper;
|
||||
keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE));
|
||||
keywordMap.put("read", new Integer(SqlParserSymbols.KW_READ));
|
||||
keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE));
|
||||
keywordMap.put("rebalance", new Integer(SqlParserSymbols.KW_REBALANCE));
|
||||
keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER));
|
||||
keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH));
|
||||
keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP));
|
||||
|
||||
@ -0,0 +1,82 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.clone.RebalancerTestUtil;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.mysql.privilege.MockedAuth;
|
||||
import org.apache.doris.mysql.privilege.PaloAuth;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import mockit.Mocked;
|
||||
|
||||
public class AdminCancelRebalanceDiskStmtTest {
|
||||
|
||||
private static Analyzer analyzer;
|
||||
|
||||
@Mocked
|
||||
private PaloAuth auth;
|
||||
@Mocked
|
||||
private ConnectContext ctx;
|
||||
|
||||
@Before()
|
||||
public void setUp() {
|
||||
Config.disable_cluster_feature = false;
|
||||
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
|
||||
MockedAuth.mockedAuth(auth);
|
||||
MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
|
||||
|
||||
List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
|
||||
beIds.forEach(id -> Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParticularBackends() throws AnalysisException {
|
||||
List<String> backends = Lists.newArrayList(
|
||||
"192.168.0.10003:9051", "192.168.0.10004:9051", "192.168.0.10005:9051", "192.168.0.10006:9051");
|
||||
final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends);
|
||||
stmt.analyze(analyzer);
|
||||
Assert.assertEquals(2, stmt.getBackends().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmpty() throws AnalysisException {
|
||||
List<String> backends = Lists.newArrayList();
|
||||
final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(backends);
|
||||
stmt.analyze(analyzer);
|
||||
Assert.assertEquals(0, stmt.getBackends().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNull() throws AnalysisException {
|
||||
final AdminCancelRebalanceDiskStmt stmt = new AdminCancelRebalanceDiskStmt(null);
|
||||
stmt.analyze(analyzer);
|
||||
Assert.assertEquals(4, stmt.getBackends().size());
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,83 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.clone.RebalancerTestUtil;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.mysql.privilege.MockedAuth;
|
||||
import org.apache.doris.mysql.privilege.PaloAuth;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
//import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import mockit.Mocked;
|
||||
|
||||
public class AdminRebalanceDiskStmtTest {
|
||||
|
||||
private static Analyzer analyzer;
|
||||
|
||||
@Mocked
|
||||
private PaloAuth auth;
|
||||
@Mocked
|
||||
private ConnectContext ctx;
|
||||
|
||||
@Before()
|
||||
public void setUp() {
|
||||
Config.disable_cluster_feature = false;
|
||||
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
|
||||
MockedAuth.mockedAuth(auth);
|
||||
MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
|
||||
|
||||
List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
|
||||
beIds.forEach(id -> Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParticularBackends() throws AnalysisException {
|
||||
List<String> backends = Lists.newArrayList(
|
||||
"192.168.0.10003:9051", "192.168.0.10004:9051", "192.168.0.10005:9051", "192.168.0.10006:9051");
|
||||
final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(backends);
|
||||
stmt.analyze(analyzer);
|
||||
Assert.assertEquals(2, stmt.getBackends().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmpty() throws AnalysisException {
|
||||
List<String> backends = Lists.newArrayList();
|
||||
final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(backends);
|
||||
stmt.analyze(analyzer);
|
||||
Assert.assertEquals(0, stmt.getBackends().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNull() throws AnalysisException {
|
||||
final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(null);
|
||||
stmt.analyze(analyzer);
|
||||
Assert.assertEquals(4, stmt.getBackends().size());
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,262 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DataProperty;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.HashDistributionInfo;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.RangePartitionInfo;
|
||||
import org.apache.doris.catalog.ReplicaAllocation;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.clone.TabletScheduler.PathSlot;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
import org.apache.doris.task.StorageMediaMigrationTask;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.thrift.TStorageType;
|
||||
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Table;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.logging.log4j.Level;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.core.config.Configurator;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
//import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
import mockit.Delegate;
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
//import static com.google.common.collect.MoreCollectors.onlyElement;
|
||||
|
||||
/**
 * Unit tests for {@code DiskRebalancer}, the rebalancer that balances data
 * between the disks of a single backend (rather than between backends).
 *
 * The catalog is mocked with JMockit so that its static lookups return the
 * locally constructed {@code SystemInfoService} and {@code TabletInvertedIndex};
 * backends, tables, partitions, tablets and replicas are wired up by hand so
 * each test controls per-disk usage precisely.
 */
public class DiskRebalanceTest {
    private static final Logger LOG = LogManager.getLogger(DiskRebalanceTest.class);

    @Mocked
    private Catalog catalog;

    // Backing counter for the mocked catalog.getNextId() delegate recorded in setUp().
    private long id = 10086;

    private Database db;
    private OlapTable olapTable;

    // Real (non-mocked) services the rebalancer reads cluster state from.
    private final SystemInfoService systemInfoService = new SystemInfoService();
    private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
    // cluster name -> tag -> load statistic, the shape consumed by Rebalancer.updateLoadStatistic().
    private Table<String, Tag, ClusterLoadStatistic> statisticMap;

    /**
     * Records JMockit expectations so that static {@code Catalog} accessors used by
     * the rebalancer return the local fakes above, then sanity-checks that the
     * transaction-manager expectations were actually recorded.
     */
    @Before
    public void setUp() throws Exception {
        db = new Database(1, "test db");
        db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
        new Expectations() {
            {
                catalog.getDbIds();
                minTimes = 0;
                result = db.getId();

                catalog.getDbNullable(anyLong);
                minTimes = 0;
                result = db;

                catalog.getDbOrException(anyLong, (Function<Long, SchedException>) any);
                minTimes = 0;
                result = db;

                Catalog.getCurrentCatalogJournalVersion();
                minTimes = 0;
                result = FeConstants.meta_version;

                // Each call hands out a fresh id, starting from 10086.
                catalog.getNextId();
                minTimes = 0;
                result = new Delegate() {
                    long a() {
                        return id++;
                    }
                };

                Catalog.getCurrentSystemInfo();
                minTimes = 0;
                result = systemInfoService;

                Catalog.getCurrentInvertedIndex();
                minTimes = 0;
                result = invertedIndex;

                Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId();
                result = 111;

                Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(anyLong, anyLong, (List<Long>) any);
                result = true;
            }
        };
        // Sanity-check the mock: the recorded transaction-manager results must come back.
        Assert.assertEquals(111, Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
        Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2, Lists.newArrayList(3L)));
    }

    // Builds statisticMap with a freshly initialized ClusterLoadStatistic for the
    // default cluster/tag, reflecting the backends and replicas registered so far.
    private void generateStatisticMap() {
        ClusterLoadStatistic loadStatistic = new ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
                Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex);
        loadStatistic.init();
        statisticMap = HashBasedTable.create();
        statisticMap.put(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, loadStatistic);
    }

    // Adds partitionCount HDD partitions named p0, p1, ... to olapTable, all sharing
    // the same materialized index object. Partition ids start from 31.
    private void createPartitionsForTable(OlapTable olapTable, MaterializedIndex index, Long partitionCount) {
        // partition id start from 31
        LongStream.range(0, partitionCount).forEach(idx -> {
            long id = 31 + idx;
            Partition partition = new Partition(id, "p" + idx, index, new HashDistributionInfo());
            olapTable.addPartition(partition);
            olapTable.getPartitionInfo().addPartition(id, new DataProperty(TStorageMedium.HDD),
                    ReplicaAllocation.DEFAULT_ALLOCATION, false);
        });
    }

    /**
     * When every disk of a backend has identical usage there is nothing to move:
     * selectAlternativeTablets() must return an empty list.
     */
    @Test
    public void testDiskRebalancerWithSameUsageDisk() {
        // init system: three backends, each with two equally used disks (512/2048).
        List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L);
        beIds.forEach(id -> systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, Lists.newArrayList(512L,512L), 2)));

        olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS,
                new RangePartitionInfo(), new HashDistributionInfo());
        db.createTable(olapTable);

        // 1 table, 3 partitions p0,p1,p2
        MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null);
        createPartitionsForTable(olapTable, materializedIndex, 3L);
        olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()),
                0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);

        // Tablet distribution: we add them to olapTable & build invertedIndex manually.
        // All of the tablets sit in the first path of their backend.
        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
                50000, Lists.newArrayList(10001L, 10002L, 10003L));

        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD,
                60000, Lists.newArrayList(10001L, 10002L, 10003L));

        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD,
                70000, Lists.newArrayList(10001L, 10002L, 10003L));

        // case start
        Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG);

        Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
        generateStatisticMap();
        rebalancer.updateLoadStatistic(statisticMap);
        List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
        // check alternativeTablets: balanced disks -> nothing should be selected.
        Assert.assertTrue(alternativeTablets.isEmpty());
    }

    /**
     * With uneven per-disk usage the rebalancer must select tablets to migrate
     * between disks, and createBalanceTask() must produce a
     * StorageMediaMigrationTask for each movable (non-empty) tablet.
     */
    @Test
    public void testDiskRebalancerWithDiffUsageDisk() {
        // init system: backends with 1, 2 and 3 disks of differing usage.
        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10001L, 2048, Lists.newArrayList(1024L), 1));
        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10002L, 2048, Lists.newArrayList(1024L, 512L), 2));
        systemInfoService.addBackend(RebalancerTestUtil.createBackend(10003L, 2048, Lists.newArrayList(1024L, 512L, 513L), 3));

        olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS,
                new RangePartitionInfo(), new HashDistributionInfo());
        db.createTable(olapTable);

        // 1 table, 3 partitions p0,p1,p2
        MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null);
        createPartitionsForTable(olapTable, materializedIndex, 3L);
        olapTable.setIndexMeta(materializedIndex.getId(), "fake index", Lists.newArrayList(new Column()),
                0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);

        // Tablet distribution: we add them to olapTable & build invertedIndex manually.
        // All tablets sit in the first path of their backend; the extra list gives
        // each replica's data size on the corresponding backend in beIds.
        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
                50000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(0L, 100L, 300L));

        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1", TStorageMedium.HDD,
                60000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(50L, 0L, 200L));

        RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2", TStorageMedium.HDD,
                70000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(100L, 200L, 0L));

        // case start
        Configurator.setLevel("org.apache.doris.clone.DiskRebalancer", Level.DEBUG);

        Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
        generateStatisticMap();
        rebalancer.updateLoadStatistic(statisticMap);
        List<TabletSchedCtx> alternativeTablets = rebalancer.selectAlternativeTablets();
        // check alternativeTablets: two tablets should be picked for disk balance.
        Assert.assertEquals(2, alternativeTablets.size());
        // Build per-backend path slots the same way TabletScheduler would.
        Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
        for (Backend be : Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER)) {
            if (!backendsWorkingSlots.containsKey(be.getId())) {
                List<Long> pathHashes = be.getDisks().values().stream().map(DiskInfo::getPathHash).collect(Collectors.toList());
                PathSlot slot = new PathSlot(pathHashes, Config.schedule_slot_num_per_path);
                backendsWorkingSlots.put(be.getId(), slot);
            }
        }

        for (TabletSchedCtx tabletCtx : alternativeTablets) {
            LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
            try {
                // Fill in the context fields a real scheduler would have set
                // before asking the rebalancer for a task.
                tabletCtx.setStorageMedium(TStorageMedium.HDD);
                tabletCtx.setTablet(olapTable.getPartition(tabletCtx.getPartitionId()).getIndex(tabletCtx.getIndexId()).getTablet(tabletCtx.getTabletId()));
                tabletCtx.setVersionInfo(1, 1);
                tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
                tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first

                AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
                // A zero-size tablet is expected to be rejected with SchedException;
                // reaching this point with size 0 means no exception was thrown.
                if (tabletCtx.getTabletSize() == 0) {
                    Assert.fail("no exception");
                } else {
                    Assert.assertTrue(task instanceof StorageMediaMigrationTask);
                }
            } catch (SchedException e) {
                LOG.info("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
            }
        }
    }

}
|
||||
|
||||
@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
@ -182,6 +183,31 @@ public class RebalanceTest {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioBackends() {
|
||||
Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
|
||||
// add
|
||||
{
|
||||
List<Backend> backends = Lists.newArrayList();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
backends.add(RebalancerTestUtil.createBackend(10086 + i, 2048, 0));
|
||||
}
|
||||
rebalancer.addPrioBackends(backends, 1000);
|
||||
Assert.assertTrue(rebalancer.hasPrioBackends());
|
||||
}
|
||||
|
||||
// remove
|
||||
for (int i = 0; i < 3; i++) {
|
||||
List<Backend> backends = Lists.newArrayList(RebalancerTestUtil.createBackend(10086 + i, 2048, 0));
|
||||
rebalancer.removePrioBackends(backends);
|
||||
if (i == 2) {
|
||||
Assert.assertFalse(rebalancer.hasPrioBackends());
|
||||
} else {
|
||||
Assert.assertTrue(rebalancer.hasPrioBackends());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartitionRebalancer() {
|
||||
Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer", Level.DEBUG);
|
||||
@ -218,7 +244,8 @@ public class RebalanceTest {
|
||||
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
|
||||
|
||||
// createCloneReplicaAndTask, create replica will change invertedIndex too.
|
||||
rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots(), batchTask);
|
||||
AgentTask task = rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots());
|
||||
batchTask.addTask(task);
|
||||
} catch (SchedException e) {
|
||||
LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.clone;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
@ -40,14 +41,20 @@ public class RebalancerTestUtil {
|
||||
|
||||
// Add only one path, PathHash:id
|
||||
public static Backend createBackend(long id, long totalCap, long usedCap) {
|
||||
return createBackend(id, totalCap, Lists.newArrayList(usedCap), 1);
|
||||
}
|
||||
// size of usedCaps should equal to diskNum
|
||||
public static Backend createBackend(long id, long totalCap, List<Long> usedCaps, int diskNum) {
|
||||
// ip:port won't be checked
|
||||
Backend be = new Backend(id, "192.168.0." + id, 9051);
|
||||
Map<String, DiskInfo> disks = Maps.newHashMap();
|
||||
DiskInfo diskInfo = new DiskInfo("/path1");
|
||||
diskInfo.setPathHash(id);
|
||||
diskInfo.setTotalCapacityB(totalCap);
|
||||
diskInfo.setDataUsedCapacityB(usedCap);
|
||||
disks.put(diskInfo.getRootPath(), diskInfo);
|
||||
for (int i = 0; i < diskNum; i++) {
|
||||
DiskInfo diskInfo = new DiskInfo("/path" + (i + 1));
|
||||
diskInfo.setPathHash(id + i);
|
||||
diskInfo.setTotalCapacityB(totalCap);
|
||||
diskInfo.setDataUsedCapacityB(usedCaps.get(i));
|
||||
disks.put(diskInfo.getRootPath(), diskInfo);
|
||||
}
|
||||
be.setDisks(ImmutableMap.copyOf(disks));
|
||||
be.setAlive(true);
|
||||
be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
|
||||
@ -59,28 +66,37 @@ public class RebalancerTestUtil {
|
||||
// Only use the partition's baseIndex for simplicity
|
||||
public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
|
||||
int tabletId, List<Long> beIds) {
|
||||
createTablet(invertedIndex, db, olapTable, partitionName, medium, tabletId, beIds, null);
|
||||
}
|
||||
public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
|
||||
int tabletId, List<Long> beIds, List<Long> replicaSizes) {
|
||||
Partition partition = olapTable.getPartition(partitionName);
|
||||
MaterializedIndex baseIndex = partition.getBaseIndex();
|
||||
int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
|
||||
|
||||
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partition.getId(), baseIndex.getId(),
|
||||
schemaHash, medium);
|
||||
schemaHash, medium);
|
||||
Tablet tablet = new Tablet(tabletId);
|
||||
|
||||
// add tablet to olapTable
|
||||
olapTable.getPartition("p0").getBaseIndex().addTablet(tablet, tabletMeta);
|
||||
createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds);
|
||||
createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds, replicaSizes);
|
||||
}
|
||||
|
||||
// Create replicas on backends which are numbered in beIds.
|
||||
// The tablet & replicas will be added to invertedIndex.
|
||||
public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta, Tablet tablet, List<Long> beIds) {
|
||||
public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta,
|
||||
Tablet tablet, List<Long> beIds, List<Long> replicaSizes) {
|
||||
invertedIndex.addTablet(tablet.getId(), tabletMeta);
|
||||
|
||||
IntStream.range(0, beIds.size()).forEach(i -> {
|
||||
Replica replica = new Replica(tablet.getId() + i, beIds.get(i), Replica.ReplicaState.NORMAL, 1, tabletMeta.getOldSchemaHash());
|
||||
// We've set pathHash to beId for simplicity
|
||||
replica.setPathHash(beIds.get(i));
|
||||
if (replicaSizes != null) {
|
||||
// for disk rebalancer, every beId corresponding to a replicaSize
|
||||
replica.updateStat(replicaSizes.get(i), 0);
|
||||
}
|
||||
// isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex
|
||||
tablet.addReplica(replica, true);
|
||||
invertedIndex.addReplica(tablet.getId(), replica);
|
||||
|
||||
@ -89,6 +89,7 @@ public class AgentTaskTest {
|
||||
private AgentTask rollupTask;
|
||||
private AgentTask schemaChangeTask;
|
||||
private AgentTask cancelDeleteTask;
|
||||
private AgentTask storageMediaMigrationTask;
|
||||
|
||||
@Before
|
||||
public void setUp() throws AnalysisException {
|
||||
@ -140,6 +141,11 @@ public class AgentTaskTest {
|
||||
new SchemaChangeTask(null, backendId1, dbId, tableId, partitionId, indexId1,
|
||||
tabletId1, replicaId1, columns, schemaHash2, schemaHash1,
|
||||
shortKeyNum, storageType, null, 0, TKeysType.AGG_KEYS);
|
||||
|
||||
// storageMediaMigrationTask
|
||||
storageMediaMigrationTask =
|
||||
new StorageMediaMigrationTask(backendId1, tabletId1, schemaHash1, TStorageMedium.HDD);
|
||||
((StorageMediaMigrationTask) storageMediaMigrationTask).setDataDir("/home/a");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -211,6 +217,15 @@ public class AgentTaskTest {
|
||||
Assert.assertEquals(TTaskType.SCHEMA_CHANGE, request6.getTaskType());
|
||||
Assert.assertEquals(schemaChangeTask.getSignature(), request6.getSignature());
|
||||
Assert.assertNotNull(request6.getAlterTabletReq());
|
||||
|
||||
// storageMediaMigrationTask
|
||||
TAgentTaskRequest request7 =
|
||||
(TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, storageMediaMigrationTask);
|
||||
Assert.assertEquals(TTaskType.STORAGE_MEDIUM_MIGRATE, request7.getTaskType());
|
||||
Assert.assertEquals(storageMediaMigrationTask.getSignature(), request7.getSignature());
|
||||
Assert.assertNotNull(request7.getStorageMediumMigrateReq());
|
||||
Assert.assertTrue(request7.getStorageMediumMigrateReq().isSetDataDir());
|
||||
Assert.assertEquals(request7.getStorageMediumMigrateReq().getDataDir(), "/home/a");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user