From 0251cb89416a4d933d77fd721a270288fe124ea7 Mon Sep 17 00:00:00 2001 From: plat1ko Date: Sun, 26 Feb 2023 12:36:55 +0800 Subject: [PATCH] [fix](cooldown) Handle re-add replica with cooldowned data #17047 Modify the rule for choosing the cooldown replica: only an alive replica can be the cooldown replica. Handle re-adding a replica with cooldowned data. --- .../doris/catalog/TabletInvertedIndex.java | 10 +- .../doris/cooldown/CooldownConfHandler.java | 5 +- .../apache/doris/master/ReportHandler.java | 91 ++++++++++++------- .../doris/service/FrontendServiceImpl.java | 13 +++ 4 files changed, 82 insertions(+), 37 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 7fe4556dec..726e86544a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -372,19 +372,21 @@ public class TabletInvertedIndex { return; } - // validate replica is active + // check cooldown replica is alive Map replicaMap = replicaMetaTable.row(beTabletInfo.getTabletId()); if (replicaMap.isEmpty()) { return; } - boolean replicaInvalid = true; + boolean replicaAlive = false; for (Replica replica : replicaMap.values()) { if (replica.getId() == cooldownConf.first) { - replicaInvalid = false; + if (replica.isAlive()) { + replicaAlive = true; + } break; } } - if (replicaInvalid) { + if (!replicaAlive) { CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(), tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second); synchronized (cooldownConfToUpdate) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java index f840a304fc..813521c6c8 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java @@ -72,8 +72,9 @@ public class CooldownConfHandler extends MasterDaemon { TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (CooldownConf conf : confToUpdate) { - // choose cooldown replica - List replicas = invertedIndex.getReplicas(conf.getTabletId()); + // choose cooldown replica from alive replicas + List replicas = invertedIndex.getReplicas(conf.getTabletId()).stream().filter(r -> r.isAlive()) + .collect(Collectors.toList()); if (replicas.isEmpty()) { continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 61c8d4e6cf..1f4a8715e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -849,18 +849,12 @@ public class ReportHandler extends Daemon { if (isBackendReplicaHealthy(backendTabletInfo)) { // if this tablet meta is still in invertedIndex. try to add it. // if add failed. delete this tablet from backend. - try { - tabletMeta = invertedIndex.getTabletMeta(tabletId); - if (tabletMeta != null) { - addReplica(tabletId, tabletMeta, backendTabletInfo, backendId); - // update counter - ++addToMetaCounter; - } else { - needDelete = true; - } - } catch (MetaNotFoundException e) { - LOG.debug("failed add to meta. tablet[{}], backend[{}]. {}", - tabletId, backendId, e.getMessage()); + tabletMeta = invertedIndex.getTabletMeta(tabletId); + if (tabletMeta != null && addReplica(tabletId, tabletMeta, backendTabletInfo, backendId)) { + // update counter + ++addToMetaCounter; + } else { + LOG.debug("failed add to meta. 
tablet[{}], backend[{}]", tabletId, backendId); needDelete = true; } } else { @@ -1053,8 +1047,9 @@ public class ReportHandler extends Daemon { AgentTaskExecutor.submit(batchTask); } - private static void addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo backendTabletInfo, long backendId) - throws MetaNotFoundException { + // return false if add replica failed + private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo backendTabletInfo, + long backendId) { long dbId = tabletMeta.getDbId(); long tableId = tabletMeta.getTableId(); long partitionId = tabletMeta.getPartitionId(); @@ -1066,50 +1061,62 @@ public class ReportHandler extends Daemon { long remoteDataSize = backendTabletInfo.getRemoteDataSize(); long rowCount = backendTabletInfo.getRowCount(); - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); - OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP); - olapTable.writeLockOrMetaException(); + Database db; + OlapTable olapTable; + try { + db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId); + olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP); + olapTable.writeLockOrMetaException(); + } catch (MetaNotFoundException e) { + LOG.warn(e); + return false; + } + try { Partition partition = olapTable.getPartition(partitionId); if (partition == null) { - throw new MetaNotFoundException("partition[" + partitionId + "] does not exist"); + LOG.warn("partition[{}] does not exist", partitionId); + return false; } ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()); MaterializedIndex materializedIndex = partition.getIndex(indexId); if (materializedIndex == null) { - throw new MetaNotFoundException("index[" + indexId + "] does not exist"); + LOG.warn("index[{}] does not exist", indexId); + return false; } Tablet tablet = materializedIndex.getTablet(tabletId); if (tablet == null) { - 
throw new MetaNotFoundException("tablet[" + tabletId + "] does not exist"); + LOG.warn("tablet[{}] does not exist", tabletId); + return false; } // check replica id long replicaId = backendTabletInfo.getReplicaId(); if (replicaId <= 0) { - throw new MetaNotFoundException("replica id is invalid"); + LOG.warn("replica id is invalid"); + return false; } long visibleVersion = partition.getVisibleVersion(); // check replica version if (version < visibleVersion) { - throw new MetaNotFoundException("version is invalid. tablet[" + version + "]" - + ", visible[" + visibleVersion + "]"); + LOG.warn("version is invalid. tablet[{}], visible[{}]", version, visibleVersion); + return false; } // check schema hash if (schemaHash != olapTable.getSchemaHashByIndexId(indexId)) { - throw new MetaNotFoundException("schema hash is diff[" + schemaHash + "-" - + olapTable.getSchemaHashByIndexId(indexId) + "]"); + LOG.warn("schema hash is diff[{}-{}]", schemaHash, olapTable.getSchemaHashByIndexId(indexId)); + return false; } // colocate table will delete Replica in meta when balance // but we need to rely on MetaNotFoundException to decide whether delete the tablet in backend if (Env.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { - return; + return true; } SystemInfoService infoService = Env.getCurrentSystemInfo(); @@ -1130,11 +1137,32 @@ public class ReportHandler extends Daemon { // just throw exception in this case if (version > partition.getNextVersion() - 1) { // this is a fatal error - throw new MetaNotFoundException("version is invalid. tablet[" + version + "]" - + ", partition's max version [" + (partition.getNextVersion() - 1) + "]"); + LOG.warn("version is invalid. 
tablet[{}], partition's max version [{}]", version, + partition.getNextVersion() - 1); + return false; } else if (version < partition.getCommittedVersion()) { lastFailedVersion = partition.getCommittedVersion(); } + + if (backendTabletInfo.isSetCooldownMetaId()) { + // replica has cooldowned data + do { + if (backendTabletInfo.getReplicaId() == tablet.getCooldownConf().first) { + // this replica is true cooldown replica, so replica's cooldowned data must not be deleted + break; + } + if (backendTabletInfo.getReplicaId() != backendTabletInfo.getCooldownReplicaId() + && Env.getCurrentInvertedIndex().getReplicas(tabletId).stream() + .anyMatch(r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) { + // this replica can not cooldown data, and shares same cooldowned data with others replica, + // so replica's cooldowned data must not be deleted + break; + } + LOG.warn("replica's cooldowned data may have been deleted"); + return false; + } while (false); + } + // use replicaId reported by BE to maintain replica meta consistent between FE and BE Replica replica = new Replica(replicaId, backendId, version, schemaHash, dataSize, remoteDataSize, rowCount, ReplicaState.NORMAL, @@ -1151,17 +1179,18 @@ public class ReportHandler extends Daemon { Env.getCurrentEnv().getEditLog().logAddReplica(info); LOG.info("add replica[{}-{}] to catalog. backend[{}]", tabletId, replicaId, backendId); + return true; } else { // replica is enough. check if this tablet is already in meta // (status changed between 'tabletReport()' and 'addReplica()') for (Replica replica : tablet.getReplicas()) { if (replica.getBackendId() == backendId) { // tablet is already in meta. 
return true - return; + return true; } } - throw new MetaNotFoundException( - "replica is enough[" + tablet.getReplicas().size() + "-" + replicaAlloc.toCreateStmt() + "]"); + LOG.warn("replica is enough[{}-{}]", tablet.getReplicas().size(), replicaAlloc.toCreateStmt()); + return false; } } finally { olapTable.writeUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index c1bce3100a..52341ebf45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -201,11 +201,14 @@ public class FrontendServiceImpl implements FrontendService.Iface { return; } Tablet tablet; + int replicaNum; try { OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId()) .getTable(tabletMeta.getTableId()) .get(); table.readLock(); + replicaNum = table.getPartitionInfo().getReplicaAllocation(tabletMeta.getPartitionId()) + .getTotalReplicaNum(); try { tablet = table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId()) .getTablet(info.tablet_id); @@ -225,7 +228,17 @@ public class FrontendServiceImpl implements FrontendService.Iface { } // check cooldownMetaId of all replicas are the same List replicas = Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id); + // FIXME(plat1ko): We only delete remote files when tablet is under a stable state: enough replicas and + // all replicas are alive. Are these conditions really sufficient or necessary? 
+ if (replicas.size() < replicaNum) { + LOG.info("num replicas are not enough, tablet={}", info.tablet_id); + return; + } for (Replica replica : replicas) { + if (!replica.isAlive()) { + LOG.info("replica is not alive, tablet={}, replica={}", info.tablet_id, replica.getId()); + return; + } if (!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) { LOG.info("cooldown meta id are not same, tablet={}", info.tablet_id); return;