[fix](tablet clone) fix single replica load failed during migration (#22077)

This commit is contained in:
yujun
2023-07-27 20:38:03 +08:00
committed by GitHub
parent e39d234db9
commit 461c4dfaae
8 changed files with 120 additions and 47 deletions

View File

@ -129,9 +129,16 @@ public class Replica implements Writable {
private long furtherRepairSetTime = -1;
private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; // 20min
// if this watermarkTxnId is set, which means before deleting a replica,
// we should ensure that all txns on this replicas are finished.
private long watermarkTxnId = -1;
/* Decommission a backend B, steps are as follows:
* 1. wait for peer backends to catch up with B;
* 2. B changes state to DECOMMISSION and preWatermarkTxnId is set. B can still load data now.
* 3. wait for txns before preWatermarkTxnId to finish, then set postWatermarkTxnId. B can't load data now.
* 4. wait for txns before postWatermarkTxnId to finish, then delete B.
*
*/
private long preWatermarkTxnId = -1;
private long postWatermarkTxnId = -1;
public Replica() {
}
@ -568,12 +575,20 @@ public class Replica implements Writable {
}
}
public void setWatermarkTxnId(long watermarkTxnId) {
this.watermarkTxnId = watermarkTxnId;
/**
 * Sets the pre-watermark transaction id of this replica.
 *
 * @param preWatermarkTxnId the new value; the field is initialized to -1, which callers
 *                          treat as "not set"
 */
public void setPreWatermarkTxnId(long preWatermarkTxnId) {
this.preWatermarkTxnId = preWatermarkTxnId;
}
public long getWatermarkTxnId() {
return watermarkTxnId;
/**
 * Returns the pre-watermark transaction id of this replica.
 *
 * @return the stored value; -1 is the field's initial value and is treated as "not set"
 */
public long getPreWatermarkTxnId() {
return preWatermarkTxnId;
}
/**
 * Sets the post-watermark transaction id of this replica.
 *
 * @param postWatermarkTxnId the new value; the field is initialized to -1, which callers
 *                           treat as "not set"
 */
public void setPostWatermarkTxnId(long postWatermarkTxnId) {
this.postWatermarkTxnId = postWatermarkTxnId;
}
/**
 * Returns the post-watermark transaction id of this replica.
 *
 * @return the stored value; -1 is the field's initial value and is treated as "not set"
 */
public long getPostWatermarkTxnId() {
return postWatermarkTxnId;
}
public boolean isAlive() {

View File

@ -211,19 +211,7 @@ public class Tablet extends MetaObject implements Writable {
}
public List<Long> getNormalReplicaBackendIds() {
List<Long> beIds = Lists.newArrayList();
SystemInfoService infoService = Env.getCurrentSystemInfo();
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
}
ReplicaState state = replica.getState();
if (infoService.checkBackendAlive(replica.getBackendId()) && state.canLoad()) {
beIds.add(replica.getBackendId());
}
}
return beIds;
return Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
}
// return map of (BE id -> path hash) of normal replicas
@ -232,12 +220,17 @@ public class Tablet extends MetaObject implements Writable {
Multimap<Long, Long> map = HashMultimap.create();
SystemInfoService infoService = Env.getCurrentSystemInfo();
for (Replica replica : replicas) {
if (!infoService.checkBackendAlive(replica.getBackendId())) {
continue;
}
if (replica.isBad()) {
continue;
}
ReplicaState state = replica.getState();
if (infoService.checkBackendLoadAvailable(replica.getBackendId()) && state.canLoad()) {
if (state.canLoad()
|| (state == ReplicaState.DECOMMISSION && replica.getPostWatermarkTxnId() < 0)) {
map.put(replica.getBackendId(), replica.getPathHash());
}
}

View File

@ -748,7 +748,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
//
// If we do not reset this replica state to NORMAL, the tablet's health status will be in VERSION_INCOMPLETE
// forever, because the replica in the DECOMMISSION state will not receive the load task.
chosenReplica.setWatermarkTxnId(-1);
chosenReplica.setPreWatermarkTxnId(-1);
chosenReplica.setPostWatermarkTxnId(-1);
chosenReplica.setState(ReplicaState.NORMAL);
setDecommissionTime(-1);
LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete,"
@ -1142,6 +1143,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60 * 1000L;
value += 5000L * (failedSchedCounter / 10);
if (schedFailedCode == SubCode.WAITING_DECOMMISSION) {
value += 5 * 1000L;
}
if (type == Type.BALANCE) {
value += 30 * 60 * 1000L;
}
@ -1200,7 +1205,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// any intermediate state it set during the scheduling process.
if (replica.getState() == ReplicaState.DECOMMISSION) {
replica.setState(ReplicaState.NORMAL);
replica.setWatermarkTxnId(-1);
replica.setPreWatermarkTxnId(-1);
replica.setPostWatermarkTxnId(-1);
LOG.debug("reset replica {} on backend {} of tablet {} state from DECOMMISSION to NORMAL",
replica.getId(), replica.getBackendId(), tabletId);
}

View File

@ -1065,7 +1065,7 @@ public class TabletScheduler extends MasterDaemon {
if (matchupReplicaCount <= 1) {
LOG.info("can not delete only one replica, tabletId = {} replicaId = {}", tabletCtx.getTabletId(),
replica.getId());
throw new SchedException(Status.FINISHED, "the only one latest replia can not be dropped, tabletId = "
throw new SchedException(Status.UNRECOVERABLE, "the only one latest replia can not be dropped, tabletId = "
+ tabletCtx.getTabletId() + ", replicaId = " + replica.getId());
}
@ -1080,25 +1080,46 @@ public class TabletScheduler extends MasterDaemon {
* If all are finished, which means this replica is
* safe to be deleted.
*/
if (!force && !Config.enable_force_drop_redundant_replica && replica.getState().canLoad()
&& replica.getWatermarkTxnId() == -1 && !FeConstants.runningUnitTest) {
long nextTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
replica.setWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
// set priority to normal because it may wait for a long time. Remain it as VERY_HIGH may block other task.
tabletCtx.setPriority(Priority.NORMAL);
LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason);
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"set watermark txn " + nextTxnId);
} else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) {
long watermarkTxnId = replica.getWatermarkTxnId();
if (!force && !Config.enable_force_drop_redundant_replica
&& !FeConstants.runningUnitTest
&& (replica.getState().canLoad() || replica.getState() == ReplicaState.DECOMMISSION)) {
if (replica.getState() != ReplicaState.DECOMMISSION) {
replica.setState(ReplicaState.DECOMMISSION);
// set priority to normal because it may wait for a long time.
// Remain it as VERY_HIGH may block other task.
tabletCtx.setPriority(Priority.NORMAL);
LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}",
replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason);
}
long preWatermarkTxnId = replica.getPreWatermarkTxnId();
if (preWatermarkTxnId == -1) {
preWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
replica.setPreWatermarkTxnId(preWatermarkTxnId);
}
long postWatermarkTxnId = replica.getPostWatermarkTxnId();
if (postWatermarkTxnId == -1) {
try {
if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(preWatermarkTxnId,
tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) {
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"wait txn before pre watermark txn " + preWatermarkTxnId + " to be finished");
}
} catch (AnalysisException e) {
throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
}
postWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
replica.setPostWatermarkTxnId(postWatermarkTxnId);
}
try {
if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) {
if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId,
tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) {
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION,
"wait txn before " + watermarkTxnId + " to be finished");
"wait txn before post watermark txn " + postWatermarkTxnId + " to be finished");
}
} catch (AnalysisException e) {
throw new SchedException(Status.UNRECOVERABLE, e.getMessage());

View File

@ -1709,6 +1709,35 @@ public class DatabaseTransactionMgr {
return true;
}
/**
 * Checks whether every running transaction up to {@code endTransactionId} that may touch the
 * given partition has finished.
 *
 * <p>A running transaction is ignored when its id is greater than {@code endTransactionId},
 * its status is final, it belongs to another database, or its table list does not contain
 * {@code tableId}. A COMMITTED transaction is also ignored when it has commit info for the
 * table but none for {@code partitionId}. Any other running transaction makes the result
 * {@code false}. The scan runs under this manager's read lock.
 *
 * @param endTransactionId upper bound (inclusive) of the transaction ids to examine
 * @param tableId id of the table a transaction must involve to be considered
 * @param partitionId id of the partition a COMMITTED transaction must have commit info for
 * @return {@code true} if no relevant unfinished transaction was found
 */
public boolean isPreviousTransactionsFinished(long endTransactionId, long tableId, long partitionId) {
    readLock();
    try {
        for (Map.Entry<Long, TransactionState> running : idToRunningTransactionState.entrySet()) {
            if (running.getKey() > endTransactionId) {
                // started after the watermark, not our concern
                continue;
            }
            TransactionState txn = running.getValue();
            boolean irrelevant = txn.getTransactionStatus().isFinalStatus()
                    || txn.getDbId() != dbId
                    || !txn.getTableIdList().contains(tableId);
            if (irrelevant) {
                continue;
            }
            if (txn.getTransactionStatus() != TransactionStatus.COMMITTED) {
                // still in flight on this table
                return false;
            }
            TableCommitInfo commitInfo = txn.getTableCommitInfo(tableId);
            // a committed txn only blocks us if it may have written this partition
            boolean partitionUntouched = commitInfo != null
                    && commitInfo.getIdToPartitionCommitInfo().get(partitionId) == null;
            if (!partitionUntouched) {
                return false;
            }
        }
        return true;
    } finally {
        readUnlock();
    }
}
/**
* check if there exists a intersection between the source tableId list and target tableId list
* if one of them is null or empty, that means that we don't know related tables in tableList,

View File

@ -425,6 +425,19 @@ public class GlobalTransactionMgr implements Writable {
}
}
/**
 * Asks the transaction manager of database {@code dbId} whether the transactions up to
 * {@code endTransactionId} that involve the given table and partition have finished.
 *
 * @param endTransactionId transaction id forwarded to the database-level check
 * @param dbId id of the database whose transaction manager is consulted
 * @param tableId id of the table to check
 * @param partitionId id of the partition to check
 * @return the result of the database-level check
 * @throws AnalysisException if the database's transaction manager cannot be obtained
 *                           (presumably because the database does not exist anymore)
 */
public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, long tableId,
        long partitionId) throws AnalysisException {
    return getDatabaseTransactionMgr(dbId)
            .isPreviousTransactionsFinished(endTransactionId, tableId, partitionId);
}
/**
* The txn cleaner will run at a fixed interval and try to delete expired and timeout txns:
* expired: txn is in VISIBLE or ABORTED, and is expired.

View File

@ -523,10 +523,6 @@ public class TransactionState implements Writable {
return this.idToTableCommitInfos.get(tableId);
}
public void removeTable(long tableId) {
this.idToTableCommitInfos.remove(tableId);
}
/**
 * Stores the given commit attachment on this transaction state.
 *
 * @param txnCommitAttachment the attachment to keep; replaces any previously stored one
 */
public void setTxnCommitAttachment(TxnCommitAttachment txnCommitAttachment) {
this.txnCommitAttachment = txnCommitAttachment;
}

View File

@ -307,8 +307,8 @@ public class RebalanceTest {
Replica decommissionedReplica = replicas.stream()
.filter(r -> r.getState() == Replica.ReplicaState.DECOMMISSION)
.collect(MoreCollectors.onlyElement());
// expected watermarkTxnId is 111
Assert.assertEquals(111, decommissionedReplica.getWatermarkTxnId());
Assert.assertEquals(111, decommissionedReplica.getPreWatermarkTxnId());
Assert.assertEquals(112, decommissionedReplica.getPostWatermarkTxnId());
});
// Delete replica should change invertedIndex too