[improvement](tablet clone) fix balanced new replica will be removed when load txn continuously and none-stop (#24845)
This commit is contained in:
@ -929,6 +929,18 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static long tablet_repair_delay_factor_second = 60;
|
||||
|
||||
/**
|
||||
* clone a tablet, further repair timeout.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static long tablet_further_repair_timeout_second = 20 * 60;
|
||||
|
||||
/**
|
||||
* clone a tablet, further repair max times.
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int tablet_further_repair_max_times = 5;
|
||||
|
||||
/**
|
||||
* the default slot number per path for hdd in tablet scheduler
|
||||
* TODO(cmy): remove this config and dynamically adjust it by clone task statistic
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
@ -114,21 +115,25 @@ public class Replica implements Writable {
|
||||
private long cooldownTerm = -1;
|
||||
|
||||
/*
|
||||
* If set to true, with means this replica need to be repaired. explicitly.
|
||||
* This can happen when this replica is created by a balance clone task, and
|
||||
* when task finished, the version of this replica is behind the partition's visible version.
|
||||
* So this replica need a further repair.
|
||||
* If we do not do this, this replica will be treated as version stale, and will be removed,
|
||||
* so that the balance task is failed, which is unexpected.
|
||||
*
|
||||
* furtherRepairSetTime set alone with needFurtherRepair.
|
||||
* furtherRepairSetTime and leftFurtherRepairCount are set alone with needFurtherRepair.
|
||||
* This is an insurance, in case that further repair task always fail. If 20 min passed
|
||||
* since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false.
|
||||
*/
|
||||
private boolean needFurtherRepair = false;
|
||||
private long furtherRepairSetTime = -1;
|
||||
private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; // 20min
|
||||
private int leftFurtherRepairCount = 0;
|
||||
|
||||
// During full clone, the replica's state is CLONE, it will not load the data.
|
||||
// After full clone finished, even if the replica's version = partition's visible version,
|
||||
//
|
||||
// notice: furtherRepairWatermarkTxnTd is used to clone a replica, protected it from be removed.
|
||||
//
|
||||
private long furtherRepairWatermarkTxnTd = -1;
|
||||
|
||||
/* Decommission a backend B, steps are as follow:
|
||||
* 1. wait peer backends catchup with B;
|
||||
@ -136,6 +141,8 @@ public class Replica implements Writable {
|
||||
* 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now.
|
||||
* 4. wait txn before postWatermarkTxnId finished, delete B.
|
||||
*
|
||||
* notice: preWatermarkTxnId and postWatermarkTxnId are used to delete this replica.
|
||||
*
|
||||
*/
|
||||
private long preWatermarkTxnId = -1;
|
||||
private long postWatermarkTxnId = -1;
|
||||
@ -263,15 +270,35 @@ public class Replica implements Writable {
|
||||
}
|
||||
|
||||
public boolean needFurtherRepair() {
|
||||
if (needFurtherRepair && System.currentTimeMillis() - this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return leftFurtherRepairCount > 0
|
||||
&& System.currentTimeMillis() < furtherRepairSetTime
|
||||
+ Config.tablet_further_repair_timeout_second * 1000;
|
||||
}
|
||||
|
||||
public void setNeedFurtherRepair(boolean needFurtherRepair) {
|
||||
this.needFurtherRepair = needFurtherRepair;
|
||||
this.furtherRepairSetTime = System.currentTimeMillis();
|
||||
if (needFurtherRepair) {
|
||||
furtherRepairSetTime = System.currentTimeMillis();
|
||||
leftFurtherRepairCount = Config.tablet_further_repair_max_times;
|
||||
} else {
|
||||
leftFurtherRepairCount = 0;
|
||||
furtherRepairSetTime = -1;
|
||||
}
|
||||
}
|
||||
|
||||
public void incrFurtherRepairCount() {
|
||||
leftFurtherRepairCount--;
|
||||
}
|
||||
|
||||
public int getLeftFurtherRepairCount() {
|
||||
return leftFurtherRepairCount;
|
||||
}
|
||||
|
||||
public long getFurtherRepairWatermarkTxnTd() {
|
||||
return furtherRepairWatermarkTxnTd;
|
||||
}
|
||||
|
||||
public void setFurtherRepairWatermarkTxnTd(long furtherRepairWatermarkTxnTd) {
|
||||
this.furtherRepairWatermarkTxnTd = furtherRepairWatermarkTxnTd;
|
||||
}
|
||||
|
||||
// for compatibility
|
||||
@ -300,6 +327,7 @@ public class Replica implements Writable {
|
||||
|
||||
public synchronized void adminUpdateVersionInfo(Long version, Long lastFailedVersion, Long lastSuccessVersion,
|
||||
long updateTime) {
|
||||
long oldLastFailedVersion = this.lastFailedVersion;
|
||||
if (version != null) {
|
||||
this.version = version;
|
||||
}
|
||||
@ -324,6 +352,13 @@ public class Replica implements Writable {
|
||||
if (this.lastSuccessVersion < this.version) {
|
||||
this.lastSuccessVersion = this.version;
|
||||
}
|
||||
if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
|
||||
LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}",
|
||||
this, oldLastFailedVersion);
|
||||
} else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
|
||||
LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}",
|
||||
this, oldLastFailedVersion);
|
||||
}
|
||||
}
|
||||
|
||||
/* last failed version: LFV
|
||||
@ -374,6 +409,8 @@ public class Replica implements Writable {
|
||||
return;
|
||||
}
|
||||
|
||||
long oldLastFailedVersion = this.lastFailedVersion;
|
||||
|
||||
this.version = newVersion;
|
||||
this.dataSize = newDataSize;
|
||||
this.remoteDataSize = newRemoteDataSize;
|
||||
@ -427,6 +464,14 @@ public class Replica implements Writable {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("after update {}", this.toString());
|
||||
}
|
||||
|
||||
if (oldLastFailedVersion < 0 && this.lastFailedVersion > 0) {
|
||||
LOG.info("change replica last failed version from '< 0' to '> 0', replica {}, old last failed version {}",
|
||||
this, oldLastFailedVersion);
|
||||
} else if (oldLastFailedVersion > 0 && this.lastFailedVersion < 0) {
|
||||
LOG.info("change replica last failed version from '> 0' to '< 0', replica {}, old last failed version {}",
|
||||
this, oldLastFailedVersion);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void updateLastFailedVersion(long lastFailedVersion) {
|
||||
|
||||
@ -188,6 +188,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
private Replica tempSrcReplica = null;
|
||||
private long destBackendId = -1;
|
||||
private long destPathHash = -1;
|
||||
private long destOldVersion = -1;
|
||||
// for disk balance to set migration task's datadir
|
||||
private String destPath = null;
|
||||
private String errMsg = null;
|
||||
@ -912,12 +913,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
// if this is a balance task, or this is a repair task with
|
||||
// REPLICA_MISSING/REPLICA_RELOCATING,
|
||||
// we create a new replica with state CLONE
|
||||
long replicaId = 0;
|
||||
Replica replica = null;
|
||||
if (tabletStatus == TabletStatus.REPLICA_MISSING
|
||||
|| tabletStatus == TabletStatus.REPLICA_RELOCATING || type == Type.BALANCE
|
||||
|| tabletStatus == TabletStatus.COLOCATE_MISMATCH
|
||||
|| tabletStatus == TabletStatus.REPLICA_MISSING_FOR_TAG) {
|
||||
Replica cloneReplica = new Replica(
|
||||
replica = new Replica(
|
||||
Env.getCurrentEnv().getNextId(), destBackendId,
|
||||
-1 /* version */, schemaHash,
|
||||
-1 /* data size */, -1, -1 /* row count */,
|
||||
@ -925,15 +926,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
committedVersion, /* use committed version as last failed version */
|
||||
-1 /* last success version */);
|
||||
|
||||
LOG.info("create clone task to make new replica, tabletId={}, replicaId={}", tabletId,
|
||||
cloneReplica.getId());
|
||||
// addReplica() method will add this replica to tablet inverted index too.
|
||||
tablet.addReplica(cloneReplica);
|
||||
replicaId = cloneReplica.getId();
|
||||
} else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
|
||||
tablet.addReplica(replica);
|
||||
} else {
|
||||
// tabletStatus is VERSION_INCOMPLETE || NEED_FURTHER_REPAIR
|
||||
Preconditions.checkState(type == Type.REPAIR, type);
|
||||
// double check
|
||||
Replica replica = tablet.getReplicaByBackendId(destBackendId);
|
||||
replica = tablet.getReplicaByBackendId(destBackendId);
|
||||
if (replica == null) {
|
||||
throw new SchedException(Status.SCHEDULE_FAILED, "dest replica does not exist on BE " + destBackendId);
|
||||
}
|
||||
@ -942,17 +941,18 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
throw new SchedException(Status.SCHEDULE_FAILED, "dest replica's path hash is changed. "
|
||||
+ "current: " + replica.getPathHash() + ", scheduled: " + destPathHash);
|
||||
}
|
||||
replicaId = replica.getId();
|
||||
}
|
||||
|
||||
TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(), srcBe.getHttpPort());
|
||||
TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(), destBe.getHttpPort());
|
||||
|
||||
cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId, partitionId, indexId, tabletId,
|
||||
replicaId, schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
|
||||
replica.getId(), schemaHash, Lists.newArrayList(tSrcBe), storageMedium,
|
||||
visibleVersion, (int) (taskTimeoutMs / 1000));
|
||||
destOldVersion = replica.getVersion();
|
||||
cloneTask.setPathHash(srcPathHash, destPathHash);
|
||||
LOG.info("create clone task to repair replica, tabletId={}, replicaId={}", tabletId, replicaId);
|
||||
LOG.info("create clone task to repair replica, tabletId={}, replica={}, visible version {}, tablet status {}",
|
||||
tabletId, replica, visibleVersion, tabletStatus);
|
||||
|
||||
this.state = State.RUNNING;
|
||||
return cloneTask;
|
||||
@ -1078,16 +1078,51 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
replica.setPathHash(reportedTablet.getPathHash());
|
||||
}
|
||||
|
||||
if (this.type == Type.BALANCE) {
|
||||
long partitionVisibleVersion = partition.getVisibleVersion();
|
||||
if (replica.getVersion() < partitionVisibleVersion) {
|
||||
// see comment 'needFurtherRepair' of Replica for explanation.
|
||||
// no need to persist this info. If FE restart, just do it again.
|
||||
replica.setNeedFurtherRepair(true);
|
||||
if (type == Type.BALANCE) {
|
||||
replica.setNeedFurtherRepair(true);
|
||||
try {
|
||||
long furtherRepairWatermarkTxnTd = Env.getCurrentGlobalTransactionMgr()
|
||||
.getTransactionIDGenerator().getNextTransactionId();
|
||||
replica.setFurtherRepairWatermarkTxnTd(furtherRepairWatermarkTxnTd);
|
||||
LOG.info("new replica {} of tablet {} set further repair watermark id {}",
|
||||
replica, tabletId, furtherRepairWatermarkTxnTd);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("new replica {} set further repair watermark id failed", replica, e);
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
// isCatchup should check the txns during ReplicaState CLONE finished.
|
||||
// Because when replica's state = CLONE, it will not load txns.
|
||||
// Even if this replica version = partition visible version, but later if the txns during CLONE
|
||||
// change from prepare to committed or visible, this replica will be fall behind and be removed
|
||||
// in REDUNDANT detection.
|
||||
//
|
||||
boolean isCatchup = false;
|
||||
if (replica.getVersion() >= partition.getVisibleVersion() && replica.getLastFailedVersion() < 0) {
|
||||
long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd();
|
||||
if (furtherRepairWatermarkTxnTd < 0) {
|
||||
isCatchup = true;
|
||||
} else {
|
||||
try {
|
||||
if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
|
||||
furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) {
|
||||
isCatchup = true;
|
||||
LOG.info("new replica {} of tablet {} has catchup with further repair watermark id {}",
|
||||
replica, tabletId, furtherRepairWatermarkTxnTd);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
isCatchup = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
replica.incrFurtherRepairCount();
|
||||
if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) {
|
||||
replica.setNeedFurtherRepair(false);
|
||||
}
|
||||
if (!replica.needFurtherRepair()) {
|
||||
replica.setFurtherRepairWatermarkTxnTd(-1);
|
||||
}
|
||||
|
||||
ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tblId, partitionId, indexId,
|
||||
tabletId, destBackendId, replica.getId(),
|
||||
@ -1109,7 +1144,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
}
|
||||
|
||||
state = State.FINISHED;
|
||||
LOG.info("clone finished: {}", this);
|
||||
LOG.info("clone finished: {}, replica {}, replica old version {}, need further repair {}, is catchup {}",
|
||||
this, replica, destOldVersion, replica.needFurtherRepair(), isCatchup);
|
||||
} finally {
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user