Fix some tablet scheduler bug (#644)
1. Replica in CLONE state should be deleted when schedule finished 2. Set replica as bad if the only remained replica is missing
This commit is contained in:
@ -684,7 +684,7 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name());
|
||||
|
||||
// process properties first
|
||||
// for now. properties has 2 options
|
||||
// for now. properties has 3 options
|
||||
// property 1. to specify short key column count.
|
||||
// eg.
|
||||
// "indexname1#short_key" = "3"
|
||||
@ -773,7 +773,7 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
bfFpp = 0;
|
||||
}
|
||||
|
||||
// property 3 storage type
|
||||
// property 3: storage type
|
||||
// from now on, we only support COLUMN storage type
|
||||
TStorageType newStorageType = TStorageType.COLUMN;
|
||||
|
||||
@ -1245,7 +1245,7 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema().entrySet()) {
|
||||
indexSchemaMap.put(entry.getKey(), new LinkedList<Column>(entry.getValue()));
|
||||
}
|
||||
// index name -> properties
|
||||
|
||||
Map<String, String> propertyMap = new HashMap<String, String>();
|
||||
for (AlterClause alterClause : alterClauses) {
|
||||
// get properties
|
||||
|
||||
@ -90,7 +90,7 @@ public class MetadataViewer {
|
||||
|
||||
ReplicaStatus status = ReplicaStatus.OK;
|
||||
Backend be = infoService.getBackend(replica.getBackendId());
|
||||
if (be == null || !be.isAvailable()) {
|
||||
if (be == null || !be.isAvailable() || replica.isBad()) {
|
||||
status = ReplicaStatus.DEAD;
|
||||
} else if (replica.getVersion() < visibleVersion
|
||||
|| replica.getLastFailedVersion() > 0) {
|
||||
|
||||
@ -556,8 +556,27 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
if (cloneTask != null) {
|
||||
AgentTaskQueue.removeTask(cloneTask.getBackendId(), TTaskType.CLONE, cloneTask.getSignature());
|
||||
|
||||
// clear all CLONE replicas
|
||||
Database db = Catalog.getInstance().getDb(dbId);
|
||||
if (db != null) {
|
||||
db.writeLock();
|
||||
try {
|
||||
List<Replica> cloneReplicas = Lists.newArrayList();
|
||||
tablet.getReplicas().stream().filter(r -> r.getState() == ReplicaState.CLONE).forEach(r -> {
|
||||
cloneReplicas.add(r);
|
||||
});
|
||||
|
||||
for (Replica cloneReplica : cloneReplicas) {
|
||||
tablet.deleteReplica(cloneReplica);
|
||||
}
|
||||
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
reset();
|
||||
}
|
||||
|
||||
@ -763,11 +782,28 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
*/
|
||||
if (replica.getLastFailedVersion() == reportedTablet.getVersion()
|
||||
&& replica.getLastFailedVersionHash() != reportedTablet.getVersion_hash()) {
|
||||
// do not throw exception, cause we want this clone task retry again.
|
||||
throw new SchedException(Status.RUNNING_FAILED,
|
||||
"replica's last failed version equals to report version: "
|
||||
+ replica.getLastFailedTimestamp() + " but hash is different: "
|
||||
+ replica.getLastFailedVersionHash() + " vs. " + reportedTablet.getVersion_hash());
|
||||
|
||||
if (replica.getLastFailedVersion() == 2 && replica.getLastFailedVersionHash() == 0
|
||||
&& visibleVersion == 1 && visibleVersionHash == 0) {
|
||||
// this is a very tricky case.
|
||||
// the partitions's visible version is (1-0), and once there is a load job success in BE
|
||||
// but failed in FE. so in BE, the replica's version is (2-xx), and the clone task will
|
||||
// report (2-xx), which is not equal to what we set (2-0)
|
||||
// the version (2-xx) is delta version which need to be reverted. but because no more load
|
||||
// job being submitted, this delta version become a residual version.
|
||||
// we just let this pass
|
||||
LOG.warn("replica's last failed version equals to report version: "
|
||||
+ replica.getLastFailedTimestamp() + " but hash is different: "
|
||||
+ replica.getLastFailedVersionHash() + " vs. "
|
||||
+ reportedTablet.getVersion_hash() + ", but we let it pass.");
|
||||
} else {
|
||||
// do not throw exception, cause we want this clone task retry again.
|
||||
throw new SchedException(Status.RUNNING_FAILED,
|
||||
"replica's last failed version equals to report version: "
|
||||
+ replica.getLastFailedTimestamp() + " but hash is different: "
|
||||
+ replica.getLastFailedVersionHash() + " vs. "
|
||||
+ reportedTablet.getVersion_hash());
|
||||
}
|
||||
}
|
||||
|
||||
replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getVersion_hash(),
|
||||
|
||||
@ -353,7 +353,7 @@ public class TabletScheduler extends Daemon {
|
||||
if (e.getStatus() == Status.SCHEDULE_FAILED) {
|
||||
// if balance is disabled, remove this tablet
|
||||
if (tabletCtx.getType() == Type.BALANCE && Config.disable_balance) {
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED,
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED,
|
||||
"disable balance and " + e.getMessage());
|
||||
} else {
|
||||
// we must release resource it current hold, and be scheduled again
|
||||
@ -365,19 +365,19 @@ public class TabletScheduler extends Daemon {
|
||||
} else if (e.getStatus() == Status.FINISHED) {
|
||||
// schedule redundant tablet will throw this exception
|
||||
stat.counterTabletScheduledSucceeded.incrementAndGet();
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getMessage());
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, e.getMessage());
|
||||
} else {
|
||||
Preconditions.checkState(e.getStatus() == Status.UNRECOVERABLE, e.getStatus());
|
||||
// discard
|
||||
stat.counterTabletScheduledDiscard.incrementAndGet();
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
|
||||
}
|
||||
continue;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("got unexpected exception, discard this schedule. tablet: {}",
|
||||
tabletCtx.getTabletId(), e);
|
||||
stat.counterTabletScheduledFailed.incrementAndGet();
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage());
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage());
|
||||
}
|
||||
|
||||
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING);
|
||||
@ -848,16 +848,28 @@ public class TabletScheduler extends Daemon {
|
||||
addTablet(tabletCtx, true /* force */);
|
||||
}
|
||||
|
||||
private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, String reason) {
|
||||
private void finalizeTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state, String reason) {
|
||||
// use 2 steps to avoid nested database lock and synchronized.(releaseTabletCtx() may hold db lock)
|
||||
// remove the tablet ctx, so that no other process can see it
|
||||
removeTabletCtx(tabletCtx, reason);
|
||||
// release resources taken by tablet ctx
|
||||
releaseTabletCtx(tabletCtx, state);
|
||||
}
|
||||
|
||||
private void releaseTabletCtx(TabletSchedCtx tabletCtx, TabletSchedCtx.State state) {
|
||||
tabletCtx.setState(state);
|
||||
tabletCtx.releaseResource(this);
|
||||
tabletCtx.setFinishedTime(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String reason) {
|
||||
runningTablets.remove(tabletCtx.getTabletId());
|
||||
allTabletIds.remove(tabletCtx.getTabletId());
|
||||
schedHistory.add(tabletCtx);
|
||||
LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(), reason);
|
||||
}
|
||||
|
||||
|
||||
// get next batch of tablets from queue.
|
||||
private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
|
||||
List<TabletSchedCtx> list = Lists.newArrayList();
|
||||
@ -907,25 +919,25 @@ public class TabletScheduler extends Daemon {
|
||||
} else if (e.getStatus() == Status.UNRECOVERABLE) {
|
||||
// unrecoverable
|
||||
stat.counterTabletScheduledDiscard.incrementAndGet();
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
|
||||
return true;
|
||||
} else if (e.getStatus() == Status.FINISHED) {
|
||||
// tablet is already healthy, just remove
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, e.getMessage());
|
||||
return true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("got unexpected exception when finish clone task. tablet: {}",
|
||||
tabletCtx.getTabletId(), e);
|
||||
stat.counterTabletScheduledDiscard.incrementAndGet();
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage());
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage());
|
||||
return true;
|
||||
}
|
||||
|
||||
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.FINISHED);
|
||||
stat.counterCloneTaskSucceeded.incrementAndGet();
|
||||
gatherStatistics(tabletCtx);
|
||||
removeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished");
|
||||
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, "finished");
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -976,14 +988,22 @@ public class TabletScheduler extends Daemon {
|
||||
*
|
||||
* If task is timeout, remove the tablet.
|
||||
*/
|
||||
public synchronized void handleRunningTablets() {
|
||||
public void handleRunningTablets() {
|
||||
// 1. remove the tablet ctx if timeout
|
||||
List<TabletSchedCtx> timeoutTablets = Lists.newArrayList();
|
||||
runningTablets.values().stream().filter(t -> t.isTimeout()).forEach(t -> {
|
||||
timeoutTablets.add(t);
|
||||
});
|
||||
synchronized (this) {
|
||||
runningTablets.values().stream().filter(t -> t.isTimeout()).forEach(t -> {
|
||||
timeoutTablets.add(t);
|
||||
});
|
||||
|
||||
for (TabletSchedCtx tabletSchedCtx : timeoutTablets) {
|
||||
removeTabletCtx(tabletSchedCtx, "timeout");
|
||||
}
|
||||
}
|
||||
|
||||
// 2. release ctx
|
||||
timeoutTablets.stream().forEach(t -> {
|
||||
removeTabletCtx(t, TabletSchedCtx.State.TIMEOUT, "timeout");
|
||||
releaseTabletCtx(t, TabletSchedCtx.State.CANCELLED);
|
||||
stat.counterCloneTaskTimeout.incrementAndGet();
|
||||
});
|
||||
}
|
||||
|
||||
@ -29,16 +29,16 @@ import java.util.List;
|
||||
|
||||
public class IncompleteTabletsProcNode implements ProcNodeInterface {
|
||||
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
|
||||
.add("IncompleteTablets").add("InconsistentTablets")
|
||||
.add("UnhealthyTablets").add("InconsistentTablets")
|
||||
.build();
|
||||
|
||||
private static final Joiner JOINER = Joiner.on(",");
|
||||
|
||||
Collection<Long> incompleteTabletIds;
|
||||
Collection<Long> unhealthyTabletIds;
|
||||
Collection<Long> inconsistentTabletIds;
|
||||
|
||||
public IncompleteTabletsProcNode(Collection<Long> incompleteTabletIds, Collection<Long> inconsistentTabletIds) {
|
||||
this.incompleteTabletIds = incompleteTabletIds;
|
||||
public IncompleteTabletsProcNode(Collection<Long> unhealthyTabletIds, Collection<Long> inconsistentTabletIds) {
|
||||
this.unhealthyTabletIds = unhealthyTabletIds;
|
||||
this.inconsistentTabletIds = inconsistentTabletIds;
|
||||
}
|
||||
|
||||
@ -50,7 +50,7 @@ public class IncompleteTabletsProcNode implements ProcNodeInterface {
|
||||
|
||||
List<String> row = new ArrayList<String>(1);
|
||||
|
||||
String incompleteTablets = JOINER.join(Arrays.asList(incompleteTabletIds));
|
||||
String incompleteTablets = JOINER.join(Arrays.asList(unhealthyTabletIds));
|
||||
String inconsistentTablets = JOINER.join(Arrays.asList(inconsistentTabletIds));
|
||||
row.add(incompleteTablets);
|
||||
row.add(inconsistentTablets);
|
||||
|
||||
@ -231,8 +231,6 @@ public class ReportHandler extends Daemon {
|
||||
ListMultimap<Long, TPartitionVersionInfo> transactionsToPublish = LinkedListMultimap.create();
|
||||
ListMultimap<Long, Long> transactionsToClear = LinkedListMultimap.create();
|
||||
|
||||
List<CreateReplicaTask> createReplicaTasks = Lists.newArrayList();
|
||||
|
||||
// db id -> tablet id
|
||||
ListMultimap<Long, Long> tabletRecoveryMap = LinkedListMultimap.create();
|
||||
|
||||
@ -252,7 +250,7 @@ public class ReportHandler extends Daemon {
|
||||
|
||||
// 3. delete (meta - be)
|
||||
// BE will automatically drop defective tablets. these tablets should also be dropped in catalog
|
||||
deleteFromMeta(tabletDeleteFromMeta, backendId, backendReportVersion, createReplicaTasks);
|
||||
deleteFromMeta(tabletDeleteFromMeta, backendId, backendReportVersion, forceRecovery);
|
||||
|
||||
// 4. handle (be - meta)
|
||||
deleteFromBackend(backendTablets, foundTabletsWithValidSchema, foundTabletsWithInvalidSchema, backendId);
|
||||
@ -452,8 +450,8 @@ public class ReportHandler extends Daemon {
|
||||
}
|
||||
|
||||
private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta, long backendId,
|
||||
long backendReportVersion,
|
||||
List<CreateReplicaTask> createReplicaTasks) {
|
||||
long backendReportVersion, boolean forceRecovery) {
|
||||
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
|
||||
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
|
||||
for (Long dbId : tabletDeleteFromMeta.keySet()) {
|
||||
Database db = Catalog.getInstance().getDb(dbId);
|
||||
@ -511,23 +509,42 @@ public class ReportHandler extends Daemon {
|
||||
LOG.error("backend [{}] invalid situation. tablet[{}] has few replica[{}], "
|
||||
+ "replica num setting is [{}]",
|
||||
backendId, tabletId, replicas.size(), replicationNum);
|
||||
// there is a replica in fe, but not in be and there is only one replica in this tablet
|
||||
// in this case, it means data is lost
|
||||
// should generate a create replica request to be to create a replica forcibly
|
||||
// there is a replica in FE, but not in BE and there is only one replica in this tablet
|
||||
// in this case, it means data is lost.
|
||||
// should generate a create replica request to BE to create a replica forcibly.
|
||||
if (replicas.size() == 1) {
|
||||
short shortKeyColumnCount = olapTable.getShortKeyColumnCountByIndexId(indexId);
|
||||
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
|
||||
KeysType keysType = olapTable.getKeysType();
|
||||
List<Column> columns = olapTable.getSchemaByIndexId(indexId);
|
||||
Set<String> bfColumns = olapTable.getCopiedBfColumns();
|
||||
double bfFpp = olapTable.getBfFpp();
|
||||
CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId,
|
||||
tableId, partitionId, indexId, tabletId, shortKeyColumnCount,
|
||||
schemaHash, partition.getVisibleVersion(),
|
||||
partition.getVisibleVersionHash(), keysType,
|
||||
TStorageType.COLUMN,
|
||||
TStorageMedium.HDD, columns, bfColumns, bfFpp, null);
|
||||
createReplicaTasks.add(createReplicaTask);
|
||||
if (forceRecovery) {
|
||||
// only create this task if force recovery is true
|
||||
LOG.warn("tablet {} has only one replica {} on backend {}"
|
||||
+ "and it is lost. create an empty replica to recover it",
|
||||
tabletId, replica.getId(), backendId);
|
||||
short shortKeyColumnCount = olapTable.getShortKeyColumnCountByIndexId(indexId);
|
||||
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
|
||||
KeysType keysType = olapTable.getKeysType();
|
||||
List<Column> columns = olapTable.getSchemaByIndexId(indexId);
|
||||
Set<String> bfColumns = olapTable.getCopiedBfColumns();
|
||||
double bfFpp = olapTable.getBfFpp();
|
||||
CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId,
|
||||
tableId, partitionId, indexId, tabletId, shortKeyColumnCount,
|
||||
schemaHash, partition.getVisibleVersion(),
|
||||
partition.getVisibleVersionHash(), keysType,
|
||||
TStorageType.COLUMN,
|
||||
TStorageMedium.HDD, columns, bfColumns, bfFpp, null);
|
||||
createReplicaBatchTask.addTask(createReplicaTask);
|
||||
} else {
|
||||
// just set this replica as bad
|
||||
if (replica.setBad(true)) {
|
||||
LOG.warn("tablet {} has only one replica {} on backend {}"
|
||||
+ "and it is lost, set it as bad",
|
||||
tabletId, replica.getId(), backendId);
|
||||
BackendTabletsInfo tabletsInfo = new BackendTabletsInfo(backendId);
|
||||
tabletsInfo.setBad(true);
|
||||
tabletsInfo.addTabletWithSchemaHash(tabletId,
|
||||
olapTable.getSchemaHashByIndexId(indexId));
|
||||
Catalog.getInstance().getEditLog().logBackendTabletsInfo(tabletsInfo);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@ -567,6 +584,10 @@ public class ReportHandler extends Daemon {
|
||||
db.writeUnlock();
|
||||
}
|
||||
} // end for dbs
|
||||
|
||||
if (forceRecovery && createReplicaBatchTask.getTaskNum() > 0) {
|
||||
AgentTaskExecutor.submit(createReplicaBatchTask);
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteFromBackend(Map<Long, TTablet> backendTablets,
|
||||
|
||||
Reference in New Issue
Block a user