[fix](decommission) fix can't decommission mtmv (#33823)

This commit is contained in:
yujun
2024-04-24 22:22:03 +08:00
committed by yiguolei
parent a15a8e119f
commit 450f443413
13 changed files with 291 additions and 43 deletions

View File

@ -314,7 +314,7 @@ public class SystemHandler extends AlterHandler {
for (Table table : db.getTables()) {
table.readLock();
try {
if (!table.needSchedule()) {
if (!table.isManagedTable()) {
continue;
}

View File

@ -564,31 +564,6 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
return table;
}
/*
 * Tells whether this table should be picked up for tablet scheduling.
 * 1. Only an OLAP table qualifies; any other table type is rejected immediately.
 * 2. An OLAP table that the colocate index reports as a colocate table is rejected as well
 *    (a debug line is logged when debug logging is enabled).
 * 3. (deprecated note kept from the previous comment) a table in ROLLUP or SCHEMA_CHANGE state whose alter
 *    job is FINISHING should still be scheduled for repair (VERSION_INCOMPLETE only, checked in
 *    TabletScheduler).
 * 4. The table state is never inspected here, so a table in ROLLUP or SCHEMA_CHANGE state is still
 *    reported as schedulable; the tablets of its base index can be repaired.
 */
public boolean needSchedule() {
    boolean olapType = (type == TableType.OLAP);
    if (!olapType) {
        return false;
    }

    // Safe to narrow: the type check above already established this is an OLAP table.
    long olapTableId = ((OlapTable) this).getId();
    boolean colocated = Env.getCurrentColocateIndex().isColocateTable(olapTableId);
    if (colocated && LOG.isDebugEnabled()) {
        LOG.debug("table {} is a colocate table, skip tablet checker.", name);
    }

    // Colocate tables are skipped; every other OLAP table is schedulable.
    return !colocated;
}
/**
 * Accessor for the compound-key flag.
 *
 * @return the current value of {@code hasCompoundKey}
 */
public boolean isHasCompoundKey() {
    return this.hasCompoundKey;
}

View File

@ -852,6 +852,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
int targetSeqIndex = -1;
long minDataSizeDiff = Long.MAX_VALUE;
boolean destBeContainsAllBuckets = true;
boolean theSameHostContainsAllBuckets = true;
for (int seqIndex : seqIndexes) {
// the bucket index.
// eg: 0 / 3 = 0, so that the bucket index of the 4th backend id in flatBackendsPerBucketSeq is 0.
@ -859,9 +861,15 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
List<Long> backendsSet = backendsPerBucketSeq.get(bucketIndex);
List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
// the replicas of a tablet can not locate in same Backend or same host
if (backendsSet.contains(destBeId) || hostsSet.contains(destBe.getHost())) {
if (backendsSet.contains(destBeId)) {
continue;
}
destBeContainsAllBuckets = false;
if (!Config.allow_replica_on_same_host && hostsSet.contains(destBe.getHost())) {
continue;
}
theSameHostContainsAllBuckets = false;
Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId);
long bucketDataSize =
@ -890,8 +898,19 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
if (targetSeqIndex < 0) {
// we use next node as dst node
LOG.info("unable to replace backend {} with backend {} in colocate group {}",
srcBeId, destBeId, groupId);
String failedReason;
if (destBeContainsAllBuckets) {
failedReason = "dest be contains all the same buckets";
} else if (theSameHostContainsAllBuckets) {
failedReason = "dest be's host contains all the same buckets "
+ "and Config.allow_replica_on_same_host=false";
} else {
failedReason = "dest be has no fit path, maybe disk usage is exceeds "
+ "Config.storage_high_watermark_usage_percent";
}
LOG.info("unable to replace backend {} with dest backend {} in colocate group {}, "
+ "failed reason: {}",
srcBeId, destBeId, groupId, failedReason);
continue;
}

View File

@ -19,6 +19,7 @@ package org.apache.doris.clone;
import org.apache.doris.analysis.AdminCancelRepairTableStmt;
import org.apache.doris.analysis.AdminRepairTableStmt;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
@ -241,6 +242,8 @@ public class TabletChecker extends MasterDaemon {
copiedPrios = HashBasedTable.create(prios);
}
ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
OUT:
for (long dbId : copiedPrios.rowKeySet()) {
Database db = env.getInternalCatalog().getDbNullable(dbId);
@ -250,17 +253,21 @@ public class TabletChecker extends MasterDaemon {
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
Map<Long, Set<PrioPart>> tblPartMap = copiedPrios.row(dbId);
for (long tblId : tblPartMap.keySet()) {
OlapTable tbl = (OlapTable) db.getTableNullable(tblId);
if (tbl == null) {
Table tbl = db.getTableNullable(tblId);
if (tbl == null || !tbl.isManagedTable()) {
continue;
}
tbl.readLock();
OlapTable olapTable = (OlapTable) tbl;
olapTable.readLock();
try {
if (!tbl.needSchedule()) {
if (colocateTableIndex.isColocateTable(olapTable.getId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("table {} is a colocate table, skip tablet checker.", olapTable.getName());
}
continue;
}
for (Partition partition : tbl.getAllPartitions()) {
LoopControlStatus st = handlePartitionTablet(db, tbl, partition, true, aliveBeIds, start,
for (Partition partition : olapTable.getAllPartitions()) {
LoopControlStatus st = handlePartitionTablet(db, olapTable, partition, true, aliveBeIds, start,
counter);
if (st == LoopControlStatus.BREAK_OUT) {
break OUT;
@ -269,7 +276,7 @@ public class TabletChecker extends MasterDaemon {
}
}
} finally {
tbl.readUnlock();
olapTable.readUnlock();
}
}
}
@ -291,9 +298,16 @@ public class TabletChecker extends MasterDaemon {
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
for (Table table : tableList) {
if (!table.isManagedTable()) {
continue;
}
table.readLock();
try {
if (!table.needSchedule()) {
if (colocateTableIndex.isColocateTable(table.getId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("table {} is a colocate table, skip tablet checker.", table.getName());
}
continue;
}