This commit is contained in:
@ -72,7 +72,7 @@ public class SystemHandler extends AlterHandler {
|
||||
runAlterJobV2();
|
||||
}
|
||||
|
||||
// check all decommissioned backends, if there is no tablet on that backend, drop it.
|
||||
// check all decommissioned backends, if there is no available tablet on that backend, drop it.
|
||||
private void runAlterJobV2() {
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
|
||||
@ -84,10 +84,10 @@ public class SystemHandler extends AlterHandler {
|
||||
}
|
||||
|
||||
List<Long> backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId);
|
||||
if (backendTabletIds.isEmpty() && Config.drop_backend_after_decommission) {
|
||||
if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds)) {
|
||||
try {
|
||||
systemInfoService.dropBackend(beId);
|
||||
LOG.info("no tablet on decommission backend {}, drop it", beId);
|
||||
LOG.info("no available tablet on decommission backend {}, drop it", beId);
|
||||
} catch (DdlException e) {
|
||||
// does not matter, may be backend not exist
|
||||
LOG.info("backend {} is dropped failed after decommission {}", beId, e.getMessage());
|
||||
@ -177,6 +177,24 @@ public class SystemHandler extends AlterHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* check if the specified backends can be dropped
|
||||
* 1. backend does not have any tablet.
|
||||
* 2. all tablets in backend have been recycled.
|
||||
*/
|
||||
private boolean checkTablets(Long beId, List<Long> backendTabletIds) {
|
||||
if (backendTabletIds.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
if (backendTabletIds.size() < Config.decommission_tablet_check_threshold
|
||||
&& Env.getCurrentRecycleBin().allTabletsInRecycledStatus(backendTabletIds)) {
|
||||
LOG.info("tablet size is {}, all tablets on decommissioned backend {} have been recycled,"
|
||||
+ " so this backend will be dropped immediately", backendTabletIds.size(), beId);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private List<Backend> checkDecommission(DecommissionBackendClause decommissionBackendClause)
|
||||
throws DdlException {
|
||||
return checkDecommission(decommissionBackendClause.getHostPortPairs());
|
||||
|
||||
@ -70,6 +70,56 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
|
||||
idToRecycleTime = Maps.newHashMap();
|
||||
}
|
||||
|
||||
public synchronized boolean allTabletsInRecycledStatus(List<Long> backendTabletIds) {
|
||||
Set<Long> recycledTabletSet = Sets.newHashSet();
|
||||
|
||||
Iterator<Map.Entry<Long, RecyclePartitionInfo>> iterator = idToPartition.entrySet().iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<Long, RecyclePartitionInfo> entry = iterator.next();
|
||||
RecyclePartitionInfo partitionInfo = entry.getValue();
|
||||
Partition partition = partitionInfo.getPartition();
|
||||
addRecycledTabletsForPartition(recycledTabletSet, partition);
|
||||
}
|
||||
|
||||
Iterator<Map.Entry<Long, RecycleTableInfo>> tableIter = idToTable.entrySet().iterator();
|
||||
while (tableIter.hasNext()) {
|
||||
Map.Entry<Long, RecycleTableInfo> entry = tableIter.next();
|
||||
RecycleTableInfo tableInfo = entry.getValue();
|
||||
Table table = tableInfo.getTable();
|
||||
addRecycledTabletsForTable(recycledTabletSet, table);
|
||||
}
|
||||
|
||||
Iterator<Map.Entry<Long, RecycleDatabaseInfo>> dbIterator = idToDatabase.entrySet().iterator();
|
||||
while (dbIterator.hasNext()) {
|
||||
Map.Entry<Long, RecycleDatabaseInfo> entry = dbIterator.next();
|
||||
RecycleDatabaseInfo dbInfo = entry.getValue();
|
||||
Database db = dbInfo.getDb();
|
||||
for (Table table : db.getTables()) {
|
||||
addRecycledTabletsForTable(recycledTabletSet, table);
|
||||
}
|
||||
}
|
||||
|
||||
return recycledTabletSet.size() >= backendTabletIds.size() && recycledTabletSet.containsAll(backendTabletIds);
|
||||
}
|
||||
|
||||
private void addRecycledTabletsForTable(Set<Long> recycledTabletSet, Table table) {
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
Collection<Partition> allPartitions = olapTable.getAllPartitions();
|
||||
for (Partition partition : allPartitions) {
|
||||
addRecycledTabletsForPartition(recycledTabletSet, partition);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addRecycledTabletsForPartition(Set<Long> recycledTabletSet, Partition partition) {
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
recycledTabletSet.add(tablet.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized boolean recycleDatabase(Database db, Set<String> tableNames, Set<Long> tableIds,
|
||||
boolean isReplay, long replayRecycleTime) {
|
||||
long recycleTime = 0;
|
||||
|
||||
@ -1314,6 +1314,15 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static boolean drop_backend_after_decommission = true;
|
||||
|
||||
/**
|
||||
* When the number of tablets on a decommissioned backend is lower than this threshold,
|
||||
* SystemHandler will start to check if all tablets of this backend are in recycled status,
|
||||
* this backend will be dropped immediately if the check result is true.
|
||||
* For performance reasons, avoid setting this to a very high value.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int decommission_tablet_check_threshold = 5000;
|
||||
|
||||
/**
|
||||
* Define thrift server's server model, default is TThreadPoolServer model
|
||||
*/
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.cluster;
|
||||
import org.apache.doris.analysis.AlterSystemStmt;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.statistics.StatisticConstants;
|
||||
@ -31,6 +32,8 @@ import com.google.common.collect.ImmutableMap;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class DecommissionBackendTest extends TestWithFeService {
|
||||
|
||||
@Override
|
||||
@ -100,7 +103,78 @@ public class DecommissionBackendTest extends TestWithFeService {
|
||||
Assertions.assertEquals(tabletNum + StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT * 2,
|
||||
Env.getCurrentInvertedIndex().getTabletMetaMap().size());
|
||||
|
||||
// 6. add backend
|
||||
String addBackendStmtStr = "alter system add backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt addBackendStmt = (AlterSystemStmt) parseAndAnalyzeStmt(addBackendStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(addBackendStmt);
|
||||
Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecommissionBackendWithDropTable() throws Exception {
|
||||
// 1. create connect context
|
||||
connectContext = createDefaultCtx();
|
||||
|
||||
ImmutableMap<Long, Backend> idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend();
|
||||
Assertions.assertEquals(backendNum(), idToBackendRef.size());
|
||||
|
||||
// 2. create database db2
|
||||
createDatabase("db2");
|
||||
System.out.println(Env.getCurrentInternalCatalog().getDbNames());
|
||||
|
||||
// 3. create table tbl1 tbl2
|
||||
createTable("create table db2.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '2');");
|
||||
createTable("create table db2.tbl2(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
|
||||
|
||||
// 4. query tablet num
|
||||
int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size();
|
||||
Assertions.assertTrue(tabletNum > 0);
|
||||
|
||||
Backend srcBackend = null;
|
||||
for (Backend backend : idToBackendRef.values()) {
|
||||
if (Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size() > 0) {
|
||||
srcBackend = backend;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assertions.assertTrue(srcBackend != null);
|
||||
|
||||
// 5. drop table tbl1
|
||||
dropTable("db2.tbl1", false);
|
||||
|
||||
// 6. execute decommission
|
||||
String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt decommissionStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
|
||||
Assertions.assertEquals(true, srcBackend.isDecommissioned());
|
||||
|
||||
long startTimestamp = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - startTimestamp < 90000
|
||||
&& Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
// BE has been dropped successfully
|
||||
Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
|
||||
// tbl1 has been dropped successfully
|
||||
final String sql = "show create table db2.tbl1;";
|
||||
Assertions.assertThrows(AnalysisException.class, () -> showCreateTable(sql));
|
||||
|
||||
// TabletInvertedIndex still holds these tablets of srcBackend, but they are all in recycled status
|
||||
List<Long> tabletList = Env.getCurrentInvertedIndex().getTabletIdsByBackendId(srcBackend.getId());
|
||||
Assertions.assertTrue(tabletList.size() > 0);
|
||||
Assertions.assertTrue(Env.getCurrentRecycleBin().allTabletsInRecycledStatus(tabletList));
|
||||
|
||||
// recover tbl1, because tbl1 has more than one replica, so it still can be recovered
|
||||
Assertions.assertDoesNotThrow(() -> recoverTable("db2.tbl1"));
|
||||
Assertions.assertDoesNotThrow(() -> showCreateTable(sql));
|
||||
|
||||
String addBackendStmtStr = "alter system add backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
|
||||
AlterSystemStmt addBackendStmt = (AlterSystemStmt) parseAndAnalyzeStmt(addBackendStmtStr);
|
||||
Env.getCurrentEnv().getAlterInstance().processAlterCluster(addBackendStmt);
|
||||
Assertions.assertEquals(backendNum(), Env.getCurrentSystemInfo().getIdToBackend().size());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -32,6 +32,7 @@ import org.apache.doris.analysis.DropPolicyStmt;
|
||||
import org.apache.doris.analysis.DropSqlBlockRuleStmt;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.analysis.ExplainOptions;
|
||||
import org.apache.doris.analysis.RecoverTableStmt;
|
||||
import org.apache.doris.analysis.ShowCreateTableStmt;
|
||||
import org.apache.doris.analysis.SqlParser;
|
||||
import org.apache.doris.analysis.SqlScanner;
|
||||
@ -498,6 +499,12 @@ public abstract class TestWithFeService {
|
||||
Env.getCurrentEnv().dropTable(dropTableStmt);
|
||||
}
|
||||
|
||||
public void recoverTable(String table) throws Exception {
|
||||
RecoverTableStmt recoverTableStmt = (RecoverTableStmt) parseAndAnalyzeStmt(
|
||||
"recover table " + table + ";", connectContext);
|
||||
Env.getCurrentEnv().recoverTable(recoverTableStmt);
|
||||
}
|
||||
|
||||
public void createTableAsSelect(String sql) throws Exception {
|
||||
CreateTableAsSelectStmt createTableAsSelectStmt = (CreateTableAsSelectStmt) parseAndAnalyzeStmt(sql);
|
||||
Env.getCurrentEnv().createTableAsSelect(createTableAsSelectStmt);
|
||||
|
||||
Reference in New Issue
Block a user