From 943e0144146351bb563cc92cba0788e89d4b6400 Mon Sep 17 00:00:00 2001 From: wxy Date: Wed, 16 Nov 2022 20:43:07 +0800 Subject: [PATCH] [enhancement](decommission) speed up decommission process (#14028) (#14006) --- .../org/apache/doris/alter/SystemHandler.java | 24 +++++- .../doris/catalog/CatalogRecycleBin.java | 50 ++++++++++++ .../java/org/apache/doris/common/Config.java | 9 +++ .../cluster/DecommissionBackendTest.java | 76 ++++++++++++++++++- .../doris/utframe/TestWithFeService.java | 7 ++ 5 files changed, 162 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index a6fe838fa8..8b6dcd70ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -72,7 +72,7 @@ public class SystemHandler extends AlterHandler { runAlterJobV2(); } - // check all decommissioned backends, if there is no tablet on that backend, drop it. + // check all decommissioned backends; drop a backend once it has no available (non-recycled) tablet. 
private void runAlterJobV2() { SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); @@ -84,10 +84,10 @@ public class SystemHandler extends AlterHandler { } List backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId); - if (backendTabletIds.isEmpty() && Config.drop_backend_after_decommission) { + if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds)) { try { systemInfoService.dropBackend(beId); - LOG.info("no tablet on decommission backend {}, drop it", beId); + LOG.info("no available tablet on decommission backend {}, drop it", beId); } catch (DdlException e) { // does not matter, may be backend not exist LOG.info("backend {} is dropped failed after decommission {}", beId, e.getMessage()); @@ -177,6 +177,24 @@ public class SystemHandler extends AlterHandler { } } + /* + * Returns true when the decommissioned backend can be dropped, i.e. either: + * 1. the backend does not hold any tablet, or + * 2. it holds fewer than Config.decommission_tablet_check_threshold tablets, all of them recycled. 
+ */ + private boolean checkTablets(Long beId, List backendTabletIds) { + if (backendTabletIds.isEmpty()) { + return true; + } + if (backendTabletIds.size() < Config.decommission_tablet_check_threshold + && Env.getCurrentRecycleBin().allTabletsInRecycledStatus(backendTabletIds)) { + LOG.info("tablet size is {}, all tablets on decommissioned backend {} have been recycled," + + " so this backend will be dropped immediately", backendTabletIds.size(), beId); + return true; + } + return false; + } + private List checkDecommission(DecommissionBackendClause decommissionBackendClause) throws DdlException { return checkDecommission(decommissionBackendClause.getHostPortPairs()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index e6e609a8ed..ad2a7591bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -70,6 +70,56 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { idToRecycleTime = Maps.newHashMap(); } + public synchronized boolean allTabletsInRecycledStatus(List backendTabletIds) { + Set recycledTabletSet = Sets.newHashSet(); + + Iterator> iterator = idToPartition.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + RecyclePartitionInfo partitionInfo = entry.getValue(); + Partition partition = partitionInfo.getPartition(); + addRecycledTabletsForPartition(recycledTabletSet, partition); + } + + Iterator> tableIter = idToTable.entrySet().iterator(); + while (tableIter.hasNext()) { + Map.Entry entry = tableIter.next(); + RecycleTableInfo tableInfo = entry.getValue(); + Table table = tableInfo.getTable(); + addRecycledTabletsForTable(recycledTabletSet, table); + } + + Iterator> dbIterator = idToDatabase.entrySet().iterator(); + while (dbIterator.hasNext()) { + Map.Entry entry = 
dbIterator.next(); + RecycleDatabaseInfo dbInfo = entry.getValue(); + Database db = dbInfo.getDb(); + for (Table table : db.getTables()) { + addRecycledTabletsForTable(recycledTabletSet, table); + } + } + + return recycledTabletSet.size() >= backendTabletIds.size() && recycledTabletSet.containsAll(backendTabletIds); + } + + private void addRecycledTabletsForTable(Set recycledTabletSet, Table table) { + if (table.getType() == TableType.OLAP) { + OlapTable olapTable = (OlapTable) table; + Collection allPartitions = olapTable.getAllPartitions(); + for (Partition partition : allPartitions) { + addRecycledTabletsForPartition(recycledTabletSet, partition); + } + } + } + + private void addRecycledTabletsForPartition(Set recycledTabletSet, Partition partition) { + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { + for (Tablet tablet : index.getTablets()) { + recycledTabletSet.add(tablet.getId()); + } + } + } + public synchronized boolean recycleDatabase(Database db, Set tableNames, Set tableIds, boolean isReplay, long replayRecycleTime) { long recycleTime = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index c1bfb504e7..49da79a33a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1314,6 +1314,15 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static boolean drop_backend_after_decommission = true; + /** + * When the number of tablets on a decommissioned backend is lower than this threshold, + * SystemHandler starts checking whether all tablets of this backend are in recycled status; + * the backend is dropped immediately if the check result is true. + * For performance reasons, avoid setting this to a very high value. 
+ */ + @ConfField(mutable = true, masterOnly = true) + public static int decommission_tablet_check_threshold = 5000; + /** * Define thrift server's server model, default is TThreadPoolServer model */ diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 87a6f851c5..6607208bea 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -20,6 +20,7 @@ package org.apache.doris.cluster; import org.apache.doris.analysis.AlterSystemStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.statistics.StatisticConstants; @@ -31,6 +32,8 @@ import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.List; + public class DecommissionBackendTest extends TestWithFeService { @Override @@ -100,7 +103,78 @@ public class DecommissionBackendTest extends TestWithFeService { Assertions.assertEquals(tabletNum + StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT * 2, Env.getCurrentInvertedIndex().getTabletMetaMap().size()); + // 6. add backend + String addBackendStmtStr = "alter system add backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; + AlterSystemStmt addBackendStmt = (AlterSystemStmt) parseAndAnalyzeStmt(addBackendStmtStr); + Env.getCurrentEnv().getAlterInstance().processAlterCluster(addBackendStmt); + Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size()); + + } + + @Test + public void testDecommissionBackendWithDropTable() throws Exception { + // 1. 
create connect context + connectContext = createDefaultCtx(); + + ImmutableMap idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend(); + Assertions.assertEquals(backendNum(), idToBackendRef.size()); + + // 2. create database db2 + createDatabase("db2"); + System.out.println(Env.getCurrentInternalCatalog().getDbNames()); + + // 3. create tables tbl1 (2 replicas) and tbl2 (1 replica) + createTable("create table db2.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '2');"); + createTable("create table db2.tbl2(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"); + + // 4. query tablet num + int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size(); + Assertions.assertTrue(tabletNum > 0); + + Backend srcBackend = null; + for (Backend backend : idToBackendRef.values()) { + if (Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size() > 0) { + srcBackend = backend; + break; + } + } + Assertions.assertTrue(srcBackend != null); + + // 5. drop table tbl1 + dropTable("db2.tbl1", false); + + // 6. 
execute decommission + String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; + AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr); + Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt); + Assertions.assertEquals(true, srcBackend.isDecommissioned()); + + long startTimestamp = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTimestamp < 90000 + && Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) { + Thread.sleep(1000); + } + + // the decommissioned BE must have been dropped within the 90s wait above + Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size()); + + // tbl1 has been dropped successfully + final String sql = "show create table db2.tbl1;"; + Assertions.assertThrows(AnalysisException.class, () -> showCreateTable(sql)); + + // TabletInvertedIndex still holds these tablets of srcBackend, but they are all in recycled status + List tabletList = Env.getCurrentInvertedIndex().getTabletIdsByBackendId(srcBackend.getId()); + Assertions.assertTrue(tabletList.size() > 0); + Assertions.assertTrue(Env.getCurrentRecycleBin().allTabletsInRecycledStatus(tabletList)); + + // recover tbl1: it has more than one replica, so it can still be recovered + Assertions.assertDoesNotThrow(() -> recoverTable("db2.tbl1")); + Assertions.assertDoesNotThrow(() -> showCreateTable(sql)); + + String addBackendStmtStr = "alter system add backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; + AlterSystemStmt addBackendStmt = (AlterSystemStmt) parseAndAnalyzeStmt(addBackendStmtStr); + Env.getCurrentEnv().getAlterInstance().processAlterCluster(addBackendStmt); + Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size()); } } - diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index d89e029b7e..500bcce627 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -32,6 +32,7 @@ import org.apache.doris.analysis.DropPolicyStmt; import org.apache.doris.analysis.DropSqlBlockRuleStmt; import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.ExplainOptions; +import org.apache.doris.analysis.RecoverTableStmt; import org.apache.doris.analysis.ShowCreateTableStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; @@ -498,6 +499,12 @@ public abstract class TestWithFeService { Env.getCurrentEnv().dropTable(dropTableStmt); } + public void recoverTable(String table) throws Exception { /* test helper: runs "recover table <table>;" */ + RecoverTableStmt recoverTableStmt = (RecoverTableStmt) parseAndAnalyzeStmt( + "recover table " + table + ";", connectContext); + Env.getCurrentEnv().recoverTable(recoverTableStmt); + } + public void createTableAsSelect(String sql) throws Exception { CreateTableAsSelectStmt createTableAsSelectStmt = (CreateTableAsSelectStmt) parseAndAnalyzeStmt(sql); Env.getCurrentEnv().createTableAsSelect(createTableAsSelectStmt);