diff --git a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index d70994da38..3391f8ba77 100644 --- a/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -684,7 +684,7 @@ public class SchemaChangeHandler extends AlterHandler { Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name()); // process properties first - // for now. properties has 2 options + // for now. properties has 3 options // property 1. to specify short key column count. // eg. // "indexname1#short_key" = "3" @@ -773,7 +773,7 @@ public class SchemaChangeHandler extends AlterHandler { bfFpp = 0; } - // property 3 storage type + // property 3: storage type // from now on, we only support COLUMN storage type TStorageType newStorageType = TStorageType.COLUMN; @@ -1245,7 +1245,7 @@ public class SchemaChangeHandler extends AlterHandler { for (Map.Entry> entry : olapTable.getIndexIdToSchema().entrySet()) { indexSchemaMap.put(entry.getKey(), new LinkedList(entry.getValue())); } - // index name -> properties + Map propertyMap = new HashMap(); for (AlterClause alterClause : alterClauses) { // get properties diff --git a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java index 0e91b80189..05e56e8560 100644 --- a/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -90,7 +90,7 @@ public class MetadataViewer { ReplicaStatus status = ReplicaStatus.OK; Backend be = infoService.getBackend(replica.getBackendId()); - if (be == null || !be.isAvailable()) { + if (be == null || !be.isAvailable() || replica.isBad()) { status = ReplicaStatus.DEAD; } else if (replica.getVersion() < visibleVersion || replica.getLastFailedVersion() > 0) { diff --git 
a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index e93c2c5660..73e5279c74 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -556,8 +556,27 @@ public class TabletSchedCtx implements Comparable { if (cloneTask != null) { AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature()); + + // clear all CLONE replicas + Database db = Catalog.getInstance().getDb(dbId); + if (db != null) { + db.writeLock(); + try { + List cloneReplicas = Lists.newArrayList(); + tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> { + cloneReplicas.add(r); + }); + + for (Replica cloneReplica : cloneReplicas) { + tablet.deleteReplica(cloneReplica); + } + + } finally { + db.writeUnlock(); + } + } } - + reset(); } @@ -763,11 +782,28 @@ public class TabletSchedCtx implements Comparable { */ if (replica.getLastFailedVersion() == reportedTablet.getVersion() && replica.getLastFailedVersionHash() != reportedTablet.getVersion_hash()) { - // do not throw exception, cause we want this clone task retry again. - throw new SchedException(Status.RUNNING_FAILED, - "replica's last failed version equals to report version: " - + replica.getLastFailedTimestamp() + " but hash is different: " - + replica.getLastFailedVersionHash() + " vs. " + reportedTablet.getVersion_hash()); + + if (replica.getLastFailedVersion() == 2 && replica.getLastFailedVersionHash() == 0 + && visibleVersion == 1 && visibleVersionHash == 0) { + // this is a very tricky case. + // the partition's visible version is (1-0), and there was once a load job which succeeded in BE + // but failed in FE. so in BE, the replica's version is (2-xx), and the clone task will + // report (2-xx), which is not equal to what we set (2-0). + // the version (2-xx) is a delta version which needs to be reverted, 
but because no more load + // jobs are being submitted, this delta version becomes a residual version. + // so we just let this pass. + LOG.warn("replica's last failed version equals to report version: " + + replica.getLastFailedTimestamp() + " but hash is different: " + + replica.getLastFailedVersionHash() + " vs. " + + reportedTablet.getVersion_hash() + ", but we let it pass."); + } else { + // do not throw exception, cause we want this clone task retry again. + throw new SchedException(Status.RUNNING_FAILED, + "replica's last failed version equals to report version: " + + replica.getLastFailedTimestamp() + " but hash is different: " + + replica.getLastFailedVersionHash() + " vs. " + + reportedTablet.getVersion_hash()); + } } replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getVersion_hash(), diff --git a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java index 91e61242d5..ec2425552b 100644 --- a/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -353,7 +353,7 @@ public class TabletScheduler extends Daemon { if (e.getStatus() == Status.SCHEDULE_FAILED) { // if balance is disabled, remove this tablet if (tabletCtx.getType() == Type.BALANCE && Config.disable_balance) { - removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, "disable balance and " + e.getMessage()); } else { // we must release resource it current hold, and be scheduled again @@ -365,19 +365,19 @@ public class TabletScheduler extends Daemon { } else if (e.getStatus() == Status.FINISHED) { // schedule redundant tablet will throw this exception stat.counterTabletScheduledSucceeded.incrementAndGet(); - removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getMessage()); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getMessage()); } else { 
Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus()); // discard stat.counterTabletScheduledDiscard.incrementAndGet(); - removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); } continue; } catch (Exception e) { LOG.warn("got unexpected exception, discard this schedule. tablet: {}", tabletCtx.getTabletId(), e); stat.counterTabletScheduledFailed.incrementAndGet(); - removeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage()); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage()); } Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING); @@ -848,16 +848,28 @@ public class TabletScheduler extends Daemon { addTablet(tabletCtx, true /* force */); } - private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, String reason) { + private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, String reason) { + // use 2 steps to avoid taking the database lock inside a synchronized method (releaseTabletCtx() may hold db lock). + // step 1: remove the tablet ctx, so that no other process can see it + removeTabletCtx(tabletCtx, reason); + // step 2: release resources taken by tablet ctx + releaseTabletCtx(tabletCtx, state); + } + + private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state) { tabletCtx.setState(state); tabletCtx.releaseResource(this); tabletCtx.setFinishedTime(System.currentTimeMillis()); + } + + private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reason) { runningTablets.remove(tabletCtx.getTabletId()); allTabletIds.remove(tabletCtx.getTabletId()); schedHistory.add(tabletCtx); LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason); } + // get next batch of tablets from queue. 
private synchronized List getNextTabletCtxBatch() { List list = Lists.newArrayList(); @@ -907,25 +919,25 @@ public class TabletScheduler extends Daemon { } else if (e.getStatus() == Status.UNRECOVERABLE) { // unrecoverable stat.counterTabletScheduledDiscard.incrementAndGet(); - removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); return true; } else if (e.getStatus() == Status.FINISHED) { // tablet is already healthy, just remove - removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage()); return true; } } catch (Exception e) { LOG.warn("got unexpected exception when finish clone task. tablet: {}", tabletCtx.getTabletId(), e); stat.counterTabletScheduledDiscard.incrementAndGet(); - removeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage()); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage()); return true; } Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.FINISHED); stat.counterCloneTaskSucceeded.incrementAndGet(); gatherStatistics(tabletCtx); - removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished"); + finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished"); return true; } @@ -976,14 +988,22 @@ public class TabletScheduler extends Daemon { * * If task is timeout, remove the tablet. */ - public synchronized void handleRunningTablets() { + public void handleRunningTablets() { + // 1. 
remove the tablet ctx if timeout List timeoutTablets = Lists.newArrayList(); - runningTablets.values().stream().filter(t -> t.isTimeout()).forEach(t -> { - timeoutTablets.add(t); - }); + synchronized (this) { + runningTablets.values().stream().filter(t -> t.isTimeout()).forEach(t -> { + timeoutTablets.add(t); + }); + + for (TabletSchedCtx tabletSchedCtx : timeoutTablets) { + removeTabletCtx(tabletSchedCtx, "timeout"); + } + } + // 2. release ctx timeoutTablets.stream().forEach(t -> { - removeTabletCtx(t, TabletSchedCtx.State.TIMEOUT, "timeout"); + releaseTabletCtx(t, TabletSchedCtx.State.CANCELLED); stat.counterCloneTaskTimeout.incrementAndGet(); }); } diff --git a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java index c27c00aad4..f0f89a3b71 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/IncompleteTabletsProcNode.java @@ -29,16 +29,16 @@ import java.util.List; public class IncompleteTabletsProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("IncompleteTablets").add("InconsistentTablets") + .add("UnhealthyTablets").add("InconsistentTablets") .build(); private static final Joiner JOINER = Joiner.on(","); - Collection incompleteTabletIds; + Collection unhealthyTabletIds; Collection inconsistentTabletIds; - public IncompleteTabletsProcNode(Collection incompleteTabletIds, Collection inconsistentTabletIds) { - this.incompleteTabletIds = incompleteTabletIds; + public IncompleteTabletsProcNode(Collection unhealthyTabletIds, Collection inconsistentTabletIds) { + this.unhealthyTabletIds = unhealthyTabletIds; this.inconsistentTabletIds = inconsistentTabletIds; } @@ -50,7 +50,7 @@ public class IncompleteTabletsProcNode implements ProcNodeInterface { List row = new ArrayList(1); - String incompleteTablets = 
JOINER.join(Arrays.asList(incompleteTabletIds)); + String incompleteTablets = JOINER.join(Arrays.asList(unhealthyTabletIds)); String inconsistentTablets = JOINER.join(Arrays.asList(inconsistentTabletIds)); row.add(incompleteTablets); row.add(inconsistentTablets); diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index d47cd147f9..548cfabd57 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -231,8 +231,6 @@ public class ReportHandler extends Daemon { ListMultimap transactionsToPublish = LinkedListMultimap.create(); ListMultimap transactionsToClear = LinkedListMultimap.create(); - List createReplicaTasks = Lists.newArrayList(); - // db id -> tablet id ListMultimap tabletRecoveryMap = LinkedListMultimap.create(); @@ -252,7 +250,7 @@ public class ReportHandler extends Daemon { // 3. delete (meta - be) // BE will automatically drop defective tablets. these tablets should also be dropped in catalog - deleteFromMeta(tabletDeleteFromMeta, backendId, backendReportVersion, createReplicaTasks); + deleteFromMeta(tabletDeleteFromMeta, backendId, backendReportVersion, forceRecovery); // 4. 
handle (be - meta) deleteFromBackend(backendTablets, foundTabletsWithValidSchema, foundTabletsWithInvalidSchema, backendId); @@ -452,8 +450,8 @@ public class ReportHandler extends Daemon { } private static void deleteFromMeta(ListMultimap tabletDeleteFromMeta, long backendId, - long backendReportVersion, - List createReplicaTasks) { + long backendReportVersion, boolean forceRecovery) { + AgentBatchTask createReplicaBatchTask = new AgentBatchTask(); TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); for (Long dbId : tabletDeleteFromMeta.keySet()) { Database db = Catalog.getInstance().getDb(dbId); @@ -511,23 +509,42 @@ public class ReportHandler extends Daemon { LOG.error("backend [{}] invalid situation. tablet[{}] has few replica[{}], " + "replica num setting is [{}]", backendId, tabletId, replicas.size(), replicationNum); - // there is a replica in fe, but not in be and there is only one replica in this tablet - // in this case, it means data is lost - // should generate a create replica request to be to create a replica forcibly + // there is a replica in FE, but not in BE and there is only one replica in this tablet + // in this case, it means data is lost. + // should generate a create replica request to BE to create a replica forcibly. 
if (replicas.size() == 1) { - short shortKeyColumnCount = olapTable.getShortKeyColumnCountByIndexId(indexId); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - KeysType keysType = olapTable.getKeysType(); - List columns = olapTable.getSchemaByIndexId(indexId); - Set bfColumns = olapTable.getCopiedBfColumns(); - double bfFpp = olapTable.getBfFpp(); - CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId, - tableId, partitionId, indexId, tabletId, shortKeyColumnCount, - schemaHash, partition.getVisibleVersion(), - partition.getVisibleVersionHash(), keysType, - TStorageType.COLUMN, - TStorageMedium.HDD, columns, bfColumns, bfFpp, null); - createReplicaTasks.add(createReplicaTask); + if (forceRecovery) { + // only create this task if force recovery is true + LOG.warn("tablet {} has only one replica {} on backend {}" + + "and it is lost. create an empty replica to recover it", + tabletId, replica.getId(), backendId); + short shortKeyColumnCount = olapTable.getShortKeyColumnCountByIndexId(indexId); + int schemaHash = olapTable.getSchemaHashByIndexId(indexId); + KeysType keysType = olapTable.getKeysType(); + List columns = olapTable.getSchemaByIndexId(indexId); + Set bfColumns = olapTable.getCopiedBfColumns(); + double bfFpp = olapTable.getBfFpp(); + CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId, + tableId, partitionId, indexId, tabletId, shortKeyColumnCount, + schemaHash, partition.getVisibleVersion(), + partition.getVisibleVersionHash(), keysType, + TStorageType.COLUMN, + TStorageMedium.HDD, columns, bfColumns, bfFpp, null); + createReplicaBatchTask.addTask(createReplicaTask); + } else { + // just set this replica as bad + if (replica.setBad(true)) { + LOG.warn("tablet {} has only one replica {} on backend {}" + + "and it is lost, set it as bad", + tabletId, replica.getId(), backendId); + BackendTabletsInfo tabletsInfo = new BackendTabletsInfo(backendId); + tabletsInfo.setBad(true); + 
tabletsInfo.addTabletWithSchemaHash(tabletId, + olapTable.getSchemaHashByIndexId(indexId)); + Catalog.getInstance().getEditLog().logBackendTabletsInfo(tabletsInfo); + } + + } } continue; } @@ -567,6 +584,10 @@ public class ReportHandler extends Daemon { db.writeUnlock(); } } // end for dbs + + if (forceRecovery && createReplicaBatchTask.getTaskNum() > 0) { + AgentTaskExecutor.submit(createReplicaBatchTask); + } } private static void deleteFromBackend(Map backendTablets,