[enhancement](test) support tablet repair and balance process in ut (#13940)
This commit is contained in:
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.clone.TabletSchedCtx;
|
||||
import org.apache.doris.clone.TabletSchedCtx.Priority;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.resource.Tag;
|
||||
@ -429,8 +430,8 @@ public class Tablet extends MetaObject implements Writable {
|
||||
ArrayList<Long> versions = new ArrayList<>();
|
||||
for (Replica replica : replicas) {
|
||||
Backend backend = systemInfoService.getBackend(replica.getBackendId());
|
||||
if (backend == null || !backend.isAlive() || !replica.isAlive() || !hosts.add(backend.getHost())
|
||||
|| replica.tooSlow() || !backend.isMixNode()) {
|
||||
if (backend == null || !backend.isAlive() || !replica.isAlive()
|
||||
|| checkHost(hosts, backend) || replica.tooSlow() || !backend.isMixNode()) {
|
||||
// this replica is not alive,
|
||||
// or if this replica is on same host with another replica, we also treat it as 'dead',
|
||||
// so that Tablet Scheduler will create a new replica on different host.
|
||||
@ -563,6 +564,10 @@ public class Tablet extends MetaObject implements Writable {
|
||||
return Pair.of(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL);
|
||||
}
|
||||
|
||||
private boolean checkHost(Set<String> hosts, Backend backend) {
|
||||
return !Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && !hosts.add(backend.getHost());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check colocate table's tablet health
|
||||
* 1. Mismatch:
|
||||
|
||||
@ -480,10 +480,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
}
|
||||
|
||||
/*
|
||||
* check if existing replicas are on same BE.
|
||||
* check if existing replicas are on same BE or Host.
|
||||
* database lock should be held.
|
||||
*/
|
||||
public boolean containsBE(long beId) {
|
||||
public boolean filterDestBE(long beId) {
|
||||
Backend backend = infoService.getBackend(beId);
|
||||
if (backend == null) {
|
||||
// containsBE() is currently only used for choosing dest backend to do clone task.
|
||||
@ -497,11 +497,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
// BE has been dropped, skip it
|
||||
continue;
|
||||
}
|
||||
if (host.equals(be.getHost())) {
|
||||
if (!Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && host.equals(be.getHost())) {
|
||||
return true;
|
||||
}
|
||||
// actually there is no need to check BE id anymore, because if hosts are not same, BE ids are
|
||||
// not same either. But for psychological comfort, leave this check here.
|
||||
|
||||
if (replica.getBackendId() == beId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -1314,8 +1314,8 @@ public class TabletScheduler extends MasterDaemon {
|
||||
continue;
|
||||
}
|
||||
|
||||
// exclude host which already has replica of this tablet
|
||||
if (tabletCtx.containsBE(bes.getBeId())) {
|
||||
// exclude BE which already has replica of this tablet or another BE at same host has this replica
|
||||
if (tabletCtx.filterDestBE(bes.getBeId())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@ -1578,7 +1578,7 @@ public class Config extends ConfigBase {
|
||||
|
||||
/*
|
||||
* If set to true, when creating table, Doris will allow to locate replicas of a tablet
|
||||
* on same host. And also the tablet repair and balance will be disabled.
|
||||
* on same host.
|
||||
* This is only for local test, so that we can deploy multi BE on same host and create table
|
||||
* with multi replicas.
|
||||
* DO NOT use it for production env.
|
||||
|
||||
@ -0,0 +1,91 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.cluster;
|
||||
|
||||
import org.apache.doris.analysis.AlterSystemStmt;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class DecommissionBackendTest extends TestWithFeService {
|
||||
|
||||
@Override
|
||||
protected int backendNum() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeCreatingConnectContext() throws Exception {
|
||||
FeConstants.default_scheduler_interval_millisecond = 1000;
|
||||
FeConstants.tablet_checker_interval_ms = 1000;
|
||||
Config.tablet_repair_delay_factor_second = 1;
|
||||
Config.allow_replica_on_same_host = true;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecommissionBackend() throws Exception {
|
||||
// 1. create connect context
|
||||
connectContext = createDefaultCtx();
|
||||
|
||||
ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend();
|
||||
Assertions.assertEquals(backendNum(), idToBackendRef.size());
|
||||
|
||||
// 2. create database db1
|
||||
createDatabase("db1");
|
||||
System.out.println(Env.getCurrentInternalCatalog().getDbNames());
|
||||
|
||||
// 3. create table tbl1
|
||||
createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
|
||||
|
||||
// 4. query tablet num
|
||||
int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size();
|
||||
|
||||
// 5. execute decommission
|
||||
Backend srcBackend = null;
|
||||
for (Backend backend : idToBackendRef.values()) {
|
||||
if (Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size() > 0) {
|
||||
srcBackend = backend;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assertions.assertTrue(srcBackend != null);
|
||||
String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
|
||||
|
||||
Assertions.assertEquals(true, srcBackend.isDecommissioned());
|
||||
long startTimestamp = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTimestamp < 90000
|
||||
&& Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
Assertions.assertEquals(tabletNum, Env.getCurrentInvertedIndex().getTabletMetaMap().size());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -41,7 +41,9 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DiskInfo;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -149,6 +151,11 @@ public abstract class TestWithFeService {
|
||||
protected void runBeforeEach() throws Exception {
|
||||
}
|
||||
|
||||
// Override this method if you want to start multi BE
|
||||
protected int backendNum() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Help to create a mocked ConnectContext.
|
||||
protected ConnectContext createDefaultCtx() throws IOException {
|
||||
return createCtx(UserIdentity.ROOT, "127.0.0.1");
|
||||
@ -248,6 +255,18 @@ public abstract class TestWithFeService {
|
||||
protected int startFEServer(String runningDir)
|
||||
throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
|
||||
InterruptedException {
|
||||
IOException exception = null;
|
||||
try {
|
||||
return startFEServerWithoutRetry(runningDir);
|
||||
} catch (IOException ignore) {
|
||||
exception = ignore;
|
||||
}
|
||||
throw exception;
|
||||
}
|
||||
|
||||
protected int startFEServerWithoutRetry(String runningDir)
|
||||
throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
|
||||
InterruptedException {
|
||||
// get DORIS_HOME
|
||||
dorisHome = System.getenv("DORIS_HOME");
|
||||
if (Strings.isNullOrEmpty(dorisHome)) {
|
||||
@ -284,7 +303,7 @@ public abstract class TestWithFeService {
|
||||
protected void createDorisCluster()
|
||||
throws InterruptedException, NotInitException, IOException, DdlException, EnvVarNotSetException,
|
||||
FeStartException {
|
||||
createDorisCluster(runningDir, 1);
|
||||
createDorisCluster(runningDir, backendNum());
|
||||
}
|
||||
|
||||
protected void createDorisCluster(String runningDir, int backendNum)
|
||||
@ -335,6 +354,18 @@ public abstract class TestWithFeService {
|
||||
}
|
||||
|
||||
protected Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
|
||||
IOException exception = null;
|
||||
for (int i = 0; i <= 3; i++) {
|
||||
try {
|
||||
return createBackendWithoutRetry(beHost, feRpcPort);
|
||||
} catch (IOException ignore) {
|
||||
exception = ignore;
|
||||
}
|
||||
}
|
||||
throw exception;
|
||||
}
|
||||
|
||||
private Backend createBackendWithoutRetry(String beHost, int feRpcPort) throws IOException, InterruptedException {
|
||||
int beHeartbeatPort = findValidPort();
|
||||
int beThriftPort = findValidPort();
|
||||
int beBrpcPort = findValidPort();
|
||||
@ -350,6 +381,7 @@ public abstract class TestWithFeService {
|
||||
// add be
|
||||
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
|
||||
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
|
||||
diskInfo1.setPathHash(be.getId());
|
||||
diskInfo1.setTotalCapacityB(1000000);
|
||||
diskInfo1.setAvailableCapacityB(500000);
|
||||
diskInfo1.setDataUsedCapacityB(480000);
|
||||
@ -476,6 +508,7 @@ public abstract class TestWithFeService {
|
||||
CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql);
|
||||
Env.getCurrentEnv().createTable(stmt);
|
||||
}
|
||||
updateReplicaPathHash();
|
||||
}
|
||||
|
||||
public void createView(String sql) throws Exception {
|
||||
@ -545,6 +578,26 @@ public abstract class TestWithFeService {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
private void updateReplicaPathHash() {
|
||||
com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex().getReplicaMetaTable();
|
||||
for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
|
||||
long beId = cell.getColumnKey();
|
||||
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
|
||||
if (be == null) {
|
||||
continue;
|
||||
}
|
||||
Replica replica = cell.getValue();
|
||||
TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
|
||||
ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
|
||||
for (DiskInfo diskInfo : diskMap.values()) {
|
||||
if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) {
|
||||
replica.setPathHash(diskInfo.getPathHash());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkAlterJob() throws InterruptedException {
|
||||
// check alter job
|
||||
Map<Long, AlterJobV2> alterJobs = Env.getCurrentEnv().getMaterializedViewHandler().getAlterJobsV2();
|
||||
|
||||
Reference in New Issue
Block a user