[fix] Fix the bug that data balance causes tablet loss (#9971)

1. Provide an FE conf to test reliability in the single-replica case when tablet scheduling is frequent.
2. Following #6063, apply most of that fix to the current code.
Author: plat1ko
Date: 2022-06-15 09:52:56 +08:00
Committed by: GitHub
Parent: 7ab64f9155
Commit: f4e2f78a1a
40 changed files with 216 additions and 155 deletions


@@ -219,13 +219,14 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
List<Replica> rollupReplicas = rollupTablet.getReplicas();
for (Replica rollupReplica : rollupReplicas) {
long backendId = rollupReplica.getBackendId();
long rollupReplicaId = rollupReplica.getId();
Preconditions.checkNotNull(tabletIdMap.get(rollupTabletId)); // baseTabletId
countDownLatch.addMark(backendId, rollupTabletId);
// create replica with version 1.
// version will be updated by following load process, or when rollup task finished.
CreateReplicaTask createReplicaTask = new CreateReplicaTask(
backendId, dbId, tableId, partitionId, rollupIndexId, rollupTabletId,
rollupShortKeyColumnCount, rollupSchemaHash,
rollupReplicaId, rollupShortKeyColumnCount, rollupSchemaHash,
Partition.PARTITION_INIT_VERSION,
rollupKeysType, TStorageType.COLUMN, storageMedium,
rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch,


@@ -245,10 +245,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
List<Replica> shadowReplicas = shadowTablet.getReplicas();
for (Replica shadowReplica : shadowReplicas) {
long backendId = shadowReplica.getBackendId();
long shadowReplicaId = shadowReplica.getId();
countDownLatch.addMark(backendId, shadowTabletId);
CreateReplicaTask createReplicaTask = new CreateReplicaTask(
backendId, dbId, tableId, partitionId, shadowIdxId, shadowTabletId,
shadowShortKeyColumnCount, shadowSchemaHash,
shadowReplicaId, shadowShortKeyColumnCount, shadowSchemaHash,
Partition.PARTITION_INIT_VERSION,
originKeysType, TStorageType.COLUMN, storageMedium,
shadowSchema, bfColumns, bfFpp, countDownLatch, indexes,


@@ -957,7 +957,7 @@ public class RestoreJob extends AbstractJob {
Catalog.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
CreateReplicaTask task = new CreateReplicaTask(restoreReplica.getBackendId(), dbId,
localTbl.getId(), restorePart.getId(), restoredIdx.getId(),
restoreTablet.getId(), indexMeta.getShortKeyColumnCount(),
restoreTablet.getId(), restoreReplica.getId(), indexMeta.getShortKeyColumnCount(),
indexMeta.getSchemaHash(), restoreReplica.getVersion(),
indexMeta.getKeysType(), TStorageType.COLUMN,
TStorageMedium.HDD /* all restored replicas will be saved to HDD */,


@@ -4807,7 +4807,8 @@ public class Catalog {
List<Replica> replicas = tablet.getReplicas();
for (Replica replica : replicas) {
long backendId = replica.getBackendId();
DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, schemaHash);
long replicaId = replica.getId();
DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, replicaId, schemaHash);
batchTask.addTask(dropTask);
} // end for replicas
} // end for tablets


@@ -125,7 +125,7 @@ public class PartitionInfo implements Writable {
}
public PartitionItem handleNewSinglePartitionDesc(SinglePartitionDesc desc,
long partitionId, boolean isTemp) throws DdlException {
long partitionId, boolean isTemp) throws DdlException {
Preconditions.checkArgument(desc.isAnalyzed());
PartitionItem partitionItem = createAndCheckPartitionItem(desc, isTemp);
setItemInternal(partitionId, isTemp, partitionItem);


@@ -25,6 +25,7 @@ import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
@@ -282,7 +283,7 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
if (!clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(),
if (!Config.be_rebalancer_fuzzy_test && !clusterStat.isMoreBalanced(tabletCtx.getSrcBackendId(), beStat.getBeId(),
tabletCtx.getTabletId(), tabletCtx.getTabletSize(), tabletCtx.getStorageMedium())) {
continue;
}


@@ -168,7 +168,7 @@ public class ClusterLoadStatistic {
continue;
}
if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / avgLoadScore > Config.balance_load_score_threshold) {
if (Config.be_rebalancer_fuzzy_test) {
if (beStat.getLoadScore(medium) > avgLoadScore) {
beStat.setClazz(medium, Classification.HIGH);
highCounter++;
@@ -177,8 +177,19 @@
lowCounter++;
}
} else {
beStat.setClazz(medium, Classification.MID);
midCounter++;
if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / avgLoadScore
> Config.balance_load_score_threshold) {
if (beStat.getLoadScore(medium) > avgLoadScore) {
beStat.setClazz(medium, Classification.HIGH);
highCounter++;
} else if (beStat.getLoadScore(medium) < avgLoadScore) {
beStat.setClazz(medium, Classification.LOW);
lowCounter++;
}
} else {
beStat.setClazz(medium, Classification.MID);
midCounter++;
}
}
}
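
Net effect of the fuzzy branch above: any backend whose load score differs from the average at all is classified HIGH or LOW, never MID, so the rebalancer always has candidates. A worked example, assuming balance_load_score_threshold keeps its usual default of 0.1 (numbers are illustrative):

    // avgLoadScore = 0.50, backend load score = 0.54
    // normal mode: |0.54 - 0.50| / 0.50 = 0.08 <= 0.1 -> MID, no balance task
    // fuzzy mode:  0.54 > 0.50                        -> HIGH, balance task generated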


@@ -813,7 +813,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
Backend destBe = infoService.getBackend(destBackendId);
if (destBe == null) {
throw new SchedException(Status.SCHEDULE_FAILED,
"dest backend " + srcReplica.getBackendId() + " does not exist");
"dest backend " + destBackendId + " does not exist");
}
taskTimeoutMs = getApproximateTimeoutMs();
@@ -828,11 +828,6 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// another clone task.
// That is, we may need to use 2 clone tasks to create a new replica. It is inefficient,
// but there is no other way now.
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId,
tabletId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
visibleVersion, (int) (taskTimeoutMs / 1000));
cloneTask.setPathHash(srcPathHash, destPathHash);
// if this is a balance task, or this is a repair task with REPLICA_MISSING/REPLICA_RELOCATING or REPLICA_MISSING_IN_CLUSTER,
// we create a new replica with state CLONE
@@ -847,6 +842,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
committedVersion, /* use committed version as last failed version */
-1 /* last success version */);
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId,
tabletId, cloneReplica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
visibleVersion, (int) (taskTimeoutMs / 1000));
cloneTask.setPathHash(srcPathHash, destPathHash);
// addReplica() method will add this replica to tablet inverted index too.
tablet.addReplica(cloneReplica);
} else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
@@ -861,6 +862,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. "
+ "current: " + replica.getPathHash() + ", scheduled: " + destPathHash);
}
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId, indexId,
tabletId, replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
visibleVersion, (int) (taskTimeoutMs / 1000));
cloneTask.setPathHash(srcPathHash, destPathHash);
}
this.state = State.RUNNING;
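
All three CloneTask construction sites above now pass the id of the concrete destination replica: the freshly created CLONE replica for repair/balance, or the existing replica in the VERSION_INCOMPLETE case. For reference, a sketch of the new call shape with made-up ids, mirroring the constructor this commit adds to CloneTask:

    TBackend srcBe = new TBackend("10.0.0.1", 9060, 8040); // host, BE port, HTTP port
    CloneTask task = new CloneTask(/* destBackendId */ 10003L, /* dbId */ 1L,
            /* tblId */ 2L, /* partitionId */ 3L, /* indexId */ 4L, /* tabletId */ 5L,
            /* replicaId */ 10002L, /* schemaHash */ 12345,
            Lists.newArrayList(srcBe), TStorageMedium.HDD,
            /* visibleVersion */ 7L, /* timeoutS */ 3600);
    task.setPathHash(/* srcPathHash */ -1L, /* destPathHash */ -1L);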


@@ -1135,7 +1135,8 @@ public class TabletScheduler extends MasterDaemon {
// NOTICE: only delete the replica from meta may not work. sometimes we can depend on tablet report
// deleting these replicas, but in FORCE_REDUNDANT case, replica may be added to meta again in report
// process.
sendDeleteReplicaTask(replica.getBackendId(), tabletCtx.getTabletId(), tabletCtx.getSchemaHash());
sendDeleteReplicaTask(replica.getBackendId(), tabletCtx.getTabletId(), replica.getId(),
tabletCtx.getSchemaHash());
}
// write edit log
@@ -1152,8 +1153,8 @@
tabletCtx.getTabletId(), replica.getBackendId(), reason, force);
}
private void sendDeleteReplicaTask(long backendId, long tabletId, int schemaHash) {
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, schemaHash);
private void sendDeleteReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) {
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, replicaId, schemaHash);
AgentBatchTask batchTask = new AgentBatchTask();
batchTask.addTask(task);
AgentTaskExecutor.submit(batchTask);
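
The NOTICE above is the motivation: deleting a replica only from FE meta is not enough, because a later tablet report can add it back, and a drop task that reaches the BE late can collide with a replica the balancer has since re-created. Carrying the replica id lets the stale drop be told apart; a sketch with made-up ids (the comparison itself happens on the BE, outside this FE-only diff):

    DropReplicaTask stale = new DropReplicaTask(/* backendId */ 10003L,
            /* tabletId */ 5L, /* replicaId */ 10001L, /* schemaHash */ 12345);
    long liveReplicaId = 10002L; // replica re-created by a balance clone
    if (stale.getReplicaId() != liveReplicaId) {
        // the drop names an older replica; keep the live one and avoid tablet loss
    }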


@@ -1648,4 +1648,14 @@ public class Config extends ConfigBase {
@ConfField(mutable = false, masterOnly = true)
public static boolean enable_multi_catalog = false; // 1 min
/**
* If set to TRUE, FE will:
* 1. divide BEs into high load and low load (no mid load) to force tablet scheduling;
* 2. ignore whether the cluster can be more balanced during tablet scheduling;
*
* It's used to test reliability in the single-replica case when tablet scheduling is frequent.
* Default is false.
*/
@ConfField(mutable = false, masterOnly = true)
public static boolean be_rebalancer_fuzzy_test = false;
}
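
Since the field is declared with mutable = false, it cannot be flipped at runtime; to try it, set it in fe.conf before starting FE, and only on a test cluster (a sketch of the expected fe.conf line):

    # test-only: forces aggressive rebalancing to stress tablet scheduling
    be_rebalancer_fuzzy_test = true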


@@ -1638,9 +1638,10 @@ public class InternalDataSource implements DataSourceIf {
long tabletId = tablet.getId();
for (Replica replica : tablet.getReplicas()) {
long backendId = replica.getBackendId();
long replicaId = replica.getId();
countDownLatch.addMark(backendId, tabletId);
CreateReplicaTask task =
new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId,
new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId, tabletId, replicaId,
shortKeyColumnCount, schemaHash, version, keysType, storageType, storageMedium,
schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType,
dataSortInfo, compressionType);


@@ -594,7 +594,8 @@ public class ReportHandler extends Daemon {
Set<String> bfColumns = olapTable.getCopiedBfColumns();
double bfFpp = olapTable.getBfFpp();
CreateReplicaTask createReplicaTask = new CreateReplicaTask(backendId, dbId,
tableId, partitionId, indexId, tabletId, indexMeta.getShortKeyColumnCount(),
tableId, partitionId, indexId, tabletId, replica.getId(),
indexMeta.getShortKeyColumnCount(),
indexMeta.getSchemaHash(), partition.getVisibleVersion(),
indexMeta.getKeysType(),
TStorageType.COLUMN,
@@ -692,7 +693,9 @@
if (needDelete) {
// drop replica
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, backendTabletInfo.getSchemaHash());
long replicaId = backendTabletInfo.getReplicaId();
DropReplicaTask task = new DropReplicaTask(backendId, tabletId, replicaId,
backendTabletInfo.getSchemaHash());
batchTask.addTask(task);
LOG.warn("delete tablet[" + tabletId + "] from backend[" + backendId + "] because not found in meta");
++deleteFromBackendCounter;


@@ -30,6 +30,7 @@ public class CloneTask extends AgentTask {
public static final int VERSION_2 = 2;
private int schemaHash;
private long replicaId;
private List<TBackend> srcBackends;
private TStorageMedium storageMedium;
@@ -42,10 +43,11 @@
private int taskVersion = VERSION_1;
public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId,
long tabletId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium,
long visibleVersion, int timeoutS) {
public CloneTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, int schemaHash, List<TBackend> srcBackends, TStorageMedium storageMedium,
long visibleVersion, int timeoutS) {
super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId, indexId, tabletId);
this.replicaId = replicaId;
this.schemaHash = schemaHash;
this.srcBackends = srcBackends;
this.storageMedium = storageMedium;
@@ -77,6 +79,7 @@
public TCloneReq toThrift() {
TCloneReq request = new TCloneReq(tabletId, schemaHash, srcBackends);
request.setReplicaId(replicaId);
request.setStorageMedium(storageMedium);
request.setCommittedVersion(visibleVersion);
request.setTaskVersion(taskVersion);
@@ -92,10 +95,12 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("tablet id: ").append(tabletId).append(", schema hash: ").append(schemaHash);
sb.append("tablet id: ").append(tabletId).append(", replica id: ").append(replicaId).append(", schema hash: ")
.append(schemaHash);
sb.append(", storageMedium: ").append(storageMedium.name());
sb.append(", visible version(hash): ").append(visibleVersion);
sb.append(", src backend: ").append(srcBackends.get(0).getHost()).append(", src path hash: ").append(srcPathHash);
sb.append(", src backend: ").append(srcBackends.get(0).getHost()).append(", src path hash: ")
.append(srcPathHash);
sb.append(", dest backend: ").append(backendId).append(", dest path hash: ").append(destPathHash);
return sb.toString();
}


@@ -47,6 +47,7 @@ import java.util.Set;
public class CreateReplicaTask extends AgentTask {
private static final Logger LOG = LogManager.getLogger(CreateReplicaTask.class);
private long replicaId;
private short shortKeyColumnCount;
private int schemaHash;
@@ -89,7 +90,7 @@
private DataSortInfo dataSortInfo;
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
short shortKeyColumnCount, int schemaHash, long version,
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
KeysType keysType, TStorageType storageType,
TStorageMedium storageMedium, List<Column> columns,
Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
@@ -98,6 +99,7 @@
TTabletType tabletType, TCompressionType compressionType) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
this.replicaId = replicaId;
this.shortKeyColumnCount = shortKeyColumnCount;
this.schemaHash = schemaHash;
@@ -121,7 +123,7 @@
}
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
short shortKeyColumnCount, int schemaHash, long version,
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
KeysType keysType, TStorageType storageType,
TStorageMedium storageMedium, List<Column> columns,
Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
@@ -132,6 +134,7 @@
TCompressionType compressionType) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
this.replicaId = replicaId;
this.shortKeyColumnCount = shortKeyColumnCount;
this.schemaHash = schemaHash;
@@ -261,6 +264,7 @@
}
createTabletReq.setTableId(tableId);
createTabletReq.setPartitionId(partitionId);
createTabletReq.setReplicaId(replicaId);
if (baseTabletId != -1) {
createTabletReq.setBaseTabletId(baseTabletId);


@@ -22,10 +22,12 @@ import org.apache.doris.thrift.TTaskType;
public class DropReplicaTask extends AgentTask {
private int schemaHash; // set -1L as unknown
private long replicaId;
public DropReplicaTask(long backendId, long tabletId, int schemaHash) {
public DropReplicaTask(long backendId, long tabletId, long replicaId, int schemaHash) {
super(null, backendId, TTaskType.DROP, -1L, -1L, -1L, -1L, tabletId);
this.schemaHash = schemaHash;
this.replicaId = replicaId;
}
public TDropTabletReq toThrift() {
@@ -33,10 +35,15 @@ public class DropReplicaTask extends AgentTask {
if (this.schemaHash != -1) {
request.setSchemaHash(schemaHash);
}
request.setReplicaId(replicaId);
return request;
}
public int getSchemaHash() {
return schemaHash;
}
public long getReplicaId() {
return replicaId;
}
}


@@ -107,14 +107,14 @@ public class AgentTaskTest {
// create
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, shortKeyNum, schemaHash1,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1,
version, KeysType.AGG_KEYS,
storageType, TStorageMedium.SSD,
columns, null, 0, latch, null,
false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1);
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1);
// push
pushTask =
@@ -124,7 +124,7 @@
// clone
cloneTask =
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1,
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, schemaHash1,
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, 3600);
// storageMediaMigrationTask
@@ -240,7 +240,7 @@
Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true));
dropTask.failed();
DropReplicaTask dropTask2 = new DropReplicaTask(backendId2, tabletId1, schemaHash1);
DropReplicaTask dropTask2 = new DropReplicaTask(backendId2, tabletId1, replicaId1, schemaHash1);
AgentTaskQueue.addTask(dropTask2);
dropTask2.failed();
Assert.assertEquals(1, AgentTaskQueue.getTaskNum(backendId1, TTaskType.DROP, true));