Allow replicas of one tablet to share a host when explicitly configured

Summary of the changes in this patch:

* Tablet.checkHost(): a replica counts as unhealthy for sharing a host with
  another replica only when Config.allow_replica_on_same_host is false and
  FeConstants.runningUnitTest is false.
* TabletSchedCtx.containsBE() is renamed to filterDestBE(). Its same-host
  rejection is gated by the same two flags; its same-backend-id rejection is
  always applied.
* TabletScheduler calls filterDestBE() instead of containsBE().
* Config: the comment about locating replicas on the same host no longer
  claims that tablet repair and balance are disabled.
* TestWithFeService: adds a backendNum() hook (default 1) used by
  createDorisCluster(), makes createBackend() retry on IOException (up to
  4 attempts), gives each test disk a path hash, and after creating tables
  copies the path hash of the first disk with a matching storage medium onto
  each replica.
* DecommissionBackendTest: new test that starts 3 backends, decommissions one
  that holds tablets, waits up to 90 seconds for it to be dropped, and checks
  that the tablet count is unchanged.

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 1af5cb5ad9..fbe60b74c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.clone.TabletSchedCtx;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.resource.Tag;
@@ -429,8 +430,8 @@ public class Tablet extends MetaObject implements Writable {
         ArrayList<Long> versions = new ArrayList<>();
         for (Replica replica : replicas) {
             Backend backend = systemInfoService.getBackend(replica.getBackendId());
-            if (backend == null || !backend.isAlive() || !replica.isAlive() || !hosts.add(backend.getHost())
-                    || replica.tooSlow() || !backend.isMixNode()) {
+            if (backend == null || !backend.isAlive() || !replica.isAlive()
+                    || checkHost(hosts, backend) || replica.tooSlow() || !backend.isMixNode()) {
                 // this replica is not alive,
                 // or if this replica is on same host with another replica, we also treat it as 'dead',
                 // so that Tablet Scheduler will create a new replica on different host.
@@ -563,6 +564,10 @@ public class Tablet extends MetaObject implements Writable {
         return Pair.of(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL);
     }
 
+    private boolean checkHost(Set<String> hosts, Backend backend) {
+        return !Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && !hosts.add(backend.getHost());
+    }
+
     /**
      * Check colocate table's tablet health
      * 1. Mismatch:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 108eef9c1d..990153a00a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -480,10 +480,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
     }
 
     /*
-     * check if existing replicas are on same BE.
+     * check if existing replicas are on same BE or Host.
      * database lock should be held.
      */
-    public boolean containsBE(long beId) {
+    public boolean filterDestBE(long beId) {
         Backend backend = infoService.getBackend(beId);
         if (backend == null) {
-            // containsBE() is currently only used for choosing dest backend to do clone task.
+            // filterDestBE() is currently only used for choosing dest backend to do clone task.
@@ -497,11 +497,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
                 // BE has been dropped, skip it
                 continue;
             }
-            if (host.equals(be.getHost())) {
+            if (!Config.allow_replica_on_same_host && !FeConstants.runningUnitTest && host.equals(be.getHost())) {
                 return true;
             }
-            // actually there is no need to check BE id anymore, because if hosts are not same, BE ids are
-            // not same either. But for psychological comfort, leave this check here.
+
             if (replica.getBackendId() == beId) {
                 return true;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 5a73d0230c..294dc3fcea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1314,8 +1314,8 @@ public class TabletScheduler extends MasterDaemon {
                 continue;
             }
 
-            // exclude host which already has replica of this tablet
-            if (tabletCtx.containsBE(bes.getBeId())) {
+            // exclude BE which already has replica of this tablet or another BE at same host has this replica
+            if (tabletCtx.filterDestBE(bes.getBeId())) {
                 continue;
             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index f4141bf36c..de5197ace4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1578,7 +1578,7 @@ public class Config extends ConfigBase {
 
     /*
      * If set to true, when creating table, Doris will allow to locate replicas of a tablet
-     * on same host. And also the tablet repair and balance will be disabled.
+     * on same host.
      * This is only for local test, so that we can deploy multi BE on same host and create table
      * with multi replicas.
      * DO NOT use it for production env.
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
new file mode 100644
index 0000000000..07faf89a0e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cluster;
+
+import org.apache.doris.analysis.AlterSystemStmt;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class DecommissionBackendTest extends TestWithFeService {
+
+    @Override
+    protected int backendNum() {
+        return 3;
+    }
+
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        FeConstants.default_scheduler_interval_millisecond = 1000;
+        FeConstants.tablet_checker_interval_ms = 1000;
+        Config.tablet_repair_delay_factor_second = 1;
+        Config.allow_replica_on_same_host = true;
+    }
+
+    @Test
+    public void testDecommissionBackend() throws Exception {
+        // 1. create connect context
+        connectContext = createDefaultCtx();
+
+        ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend();
+        Assertions.assertEquals(backendNum(), idToBackendRef.size());
+
+        // 2. create database db1
+        createDatabase("db1");
+        System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+        // 3. create table tbl1
+        createTable("create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
+
+        // 4. query tablet num
+        int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size();
+
+        // 5. execute decommission
+        Backend srcBackend = null;
+        for (Backend backend : idToBackendRef.values()) {
+            if (Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size() > 0) {
+                srcBackend = backend;
+                break;
+            }
+        }
+
+        Assertions.assertNotNull(srcBackend);
+        String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+        AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr);
+        Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+        Assertions.assertTrue(srcBackend.isDecommissioned());
+        long startTimestamp = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTimestamp < 90000
+                && Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+            Thread.sleep(1000);
+        }
+
+        Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size());
+        Assertions.assertEquals(tabletNum, Env.getCurrentInvertedIndex().getTabletMetaMap().size());
+
+    }
+
+}
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index c8f8f867bf..d89e029b7e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -41,7 +41,9 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -149,6 +151,11 @@ public abstract class TestWithFeService {
     protected void runBeforeEach() throws Exception {
     }
 
+    // Override this method if you want to start multi BE
+    protected int backendNum() {
+        return 1;
+    }
+
     // Help to create a mocked ConnectContext.
     protected ConnectContext createDefaultCtx() throws IOException {
         return createCtx(UserIdentity.ROOT, "127.0.0.1");
@@ -248,6 +255,18 @@ public abstract class TestWithFeService {
     protected int startFEServer(String runningDir)
             throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
             InterruptedException {
+        IOException exception = null;
+        try {
+            return startFEServerWithoutRetry(runningDir);
+        } catch (IOException e) {
+            exception = e;
+        }
+        throw exception;
+    }
+
+    protected int startFEServerWithoutRetry(String runningDir)
+            throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException,
+            InterruptedException {
         // get DORIS_HOME
         dorisHome = System.getenv("DORIS_HOME");
         if (Strings.isNullOrEmpty(dorisHome)) {
@@ -284,7 +303,7 @@ public abstract class TestWithFeService {
     protected void createDorisCluster()
             throws InterruptedException, NotInitException, IOException, DdlException, EnvVarNotSetException,
             FeStartException {
-        createDorisCluster(runningDir, 1);
+        createDorisCluster(runningDir, backendNum());
     }
 
     protected void createDorisCluster(String runningDir, int backendNum)
@@ -335,6 +354,18 @@ public abstract class TestWithFeService {
     }
 
     protected Backend createBackend(String beHost, int feRpcPort) throws IOException, InterruptedException {
+        IOException exception = null;
+        for (int i = 0; i <= 3; i++) {
+            try {
+                return createBackendWithoutRetry(beHost, feRpcPort);
+            } catch (IOException e) {
+                exception = e;
+            }
+        }
+        throw exception;
+    }
+
+    private Backend createBackendWithoutRetry(String beHost, int feRpcPort) throws IOException, InterruptedException {
         int beHeartbeatPort = findValidPort();
         int beThriftPort = findValidPort();
         int beBrpcPort = findValidPort();
@@ -350,6 +381,7 @@ public abstract class TestWithFeService {
         // add be
         Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
         DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
+        diskInfo1.setPathHash(be.getId());
         diskInfo1.setTotalCapacityB(1000000);
         diskInfo1.setAvailableCapacityB(500000);
         diskInfo1.setDataUsedCapacityB(480000);
@@ -476,6 +508,7 @@ public abstract class TestWithFeService {
             CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql);
             Env.getCurrentEnv().createTable(stmt);
         }
+        updateReplicaPathHash();
     }
 
     public void createView(String sql) throws Exception {
@@ -545,6 +578,26 @@ public abstract class TestWithFeService {
         Thread.sleep(100);
     }
 
+    private void updateReplicaPathHash() {
+        com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex().getReplicaMetaTable();
+        for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
+            long beId = cell.getColumnKey();
+            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+            if (be == null) {
+                continue;
+            }
+            Replica replica = cell.getValue();
+            TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
+            ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
+            for (DiskInfo diskInfo : diskMap.values()) {
+                if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) {
+                    replica.setPathHash(diskInfo.getPathHash());
+                    break;
+                }
+            }
+        }
+    }
+
     private void checkAlterJob() throws InterruptedException {
         // check alter job
         Map<Long, AlterJobV2> alterJobs = Env.getCurrentEnv().getMaterializedViewHandler().getAlterJobsV2();