[fix](cooldown) Fix bug in concurrent update_cooldown_conf and operations that update cooldowned data (#17086)
This commit is contained in:
@ -111,6 +111,7 @@ public class Replica implements Writable {
|
||||
private boolean bad = false;
|
||||
|
||||
private TUniqueId cooldownMetaId;
|
||||
private long cooldownTerm = -1;
|
||||
|
||||
/*
|
||||
* If set to true, it means this replica needs to be repaired explicitly.
|
||||
@ -246,6 +247,14 @@ public class Replica implements Writable {
|
||||
this.cooldownMetaId = cooldownMetaId;
|
||||
}
|
||||
|
||||
/**
 * Returns the cooldown term currently stored on this replica.
 * The value stays at its initial -1 until {@link #setCooldownTerm(long)} is called.
 *
 * @return the stored cooldown term
 */
public long getCooldownTerm() {
|
||||
return cooldownTerm;
|
||||
}
|
||||
|
||||
/**
 * Stores the given cooldown term on this replica, replacing the previous value.
 *
 * @param cooldownTerm the cooldown term to record for this replica
 */
public void setCooldownTerm(long cooldownTerm) {
|
||||
this.cooldownTerm = cooldownTerm;
|
||||
}
|
||||
|
||||
public boolean needFurtherRepair() {
|
||||
if (needFurtherRepair && System.currentTimeMillis() - this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
|
||||
return true;
|
||||
|
||||
@ -190,10 +190,11 @@ public class TabletInvertedIndex {
|
||||
}
|
||||
}
|
||||
|
||||
if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownReplicaId()) {
|
||||
if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownTerm()) {
|
||||
handleCooldownConf(tabletMeta, backendTabletInfo, cooldownConfToPush,
|
||||
cooldownConfToUpdate);
|
||||
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
|
||||
replica.setCooldownTerm(backendTabletInfo.getCooldownTerm());
|
||||
}
|
||||
|
||||
long partitionId = tabletMeta.getPartitionId();
|
||||
@ -395,7 +396,7 @@ public class TabletInvertedIndex {
|
||||
return;
|
||||
}
|
||||
|
||||
if (cooldownConf.first != beTabletInfo.getCooldownReplicaId()) {
|
||||
if (beTabletInfo.getCooldownTerm() < cooldownConf.second) {
|
||||
CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id, cooldownConf.first, cooldownConf.second);
|
||||
synchronized (cooldownConfToPush) {
|
||||
cooldownConfToPush.add(conf);
|
||||
|
||||
@ -1147,18 +1147,40 @@ public class ReportHandler extends Daemon {
|
||||
if (backendTabletInfo.isSetCooldownMetaId()) {
|
||||
// replica has cooldowned data
|
||||
do {
|
||||
if (backendTabletInfo.getReplicaId() == tablet.getCooldownConf().first) {
|
||||
Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
|
||||
if (backendTabletInfo.getCooldownTerm() > cooldownConf.second) {
|
||||
// should not be here
|
||||
LOG.warn("report cooldownTerm({}) > cooldownTerm in TabletMeta({}), tabletId={}",
|
||||
backendTabletInfo.getCooldownTerm(), cooldownConf.second, tabletId);
|
||||
return false;
|
||||
}
|
||||
if (backendTabletInfo.getReplicaId() == cooldownConf.first) {
|
||||
// this replica is the true cooldown replica, so its cooldowned data must not be deleted
|
||||
break;
|
||||
}
|
||||
if (backendTabletInfo.getReplicaId() != backendTabletInfo.getCooldownReplicaId()
|
||||
&& Env.getCurrentInvertedIndex().getReplicas(tabletId).stream()
|
||||
.anyMatch(r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
|
||||
// this replica cannot cool down data, and shares the same cooldowned data with other replicas,
|
||||
// so replica's cooldowned data must not be deleted
|
||||
break;
|
||||
List<Replica> replicas = Env.getCurrentInvertedIndex().getReplicas(tabletId);
|
||||
if (backendTabletInfo.getCooldownTerm() <= 0) {
|
||||
if (replicas.stream().anyMatch(
|
||||
r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
|
||||
// this backend has just restarted, and shares the same cooldowned data with other replicas,
|
||||
// so replica's cooldowned data must not be deleted
|
||||
break;
|
||||
}
|
||||
}
|
||||
LOG.warn("replica's cooldowned data may have been deleted");
|
||||
long minCooldownTerm = Long.MAX_VALUE;
|
||||
for (Replica r : replicas) {
|
||||
minCooldownTerm = Math.min(r.getCooldownTerm(), minCooldownTerm);
|
||||
}
|
||||
if (backendTabletInfo.getCooldownTerm() >= minCooldownTerm) {
|
||||
if (replicas.stream().anyMatch(
|
||||
r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
|
||||
// this replica shares the same cooldowned data with other replicas, and won't follow data
|
||||
// of lower cooldown term, so replica's cooldowned data must not be deleted
|
||||
break;
|
||||
}
|
||||
}
|
||||
LOG.warn("replica's cooldowned data may have been deleted. tabletId={}, replicaId={}", tabletId,
|
||||
replicaId);
|
||||
return false;
|
||||
} while (false);
|
||||
}
|
||||
|
||||
@ -48,6 +48,7 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DuplicatedRequestException;
|
||||
import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.PatternMatcher;
|
||||
import org.apache.doris.common.PatternMatcherException;
|
||||
import org.apache.doris.common.ThriftServerContext;
|
||||
@ -220,9 +221,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
return;
|
||||
}
|
||||
// check cooldownReplicaId
|
||||
long cooldownReplicaId = tablet.getCooldownConf().first;
|
||||
if (cooldownReplicaId != info.cooldown_replica_id) {
|
||||
LOG.info("cooldown replica id not match({} vs {}), tablet={}", cooldownReplicaId,
|
||||
Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
|
||||
if (cooldownConf.first != info.cooldown_replica_id) {
|
||||
LOG.info("cooldown replica id not match({} vs {}), tablet={}", cooldownConf.first,
|
||||
info.cooldown_replica_id, info.tablet_id);
|
||||
return;
|
||||
}
|
||||
@ -239,6 +240,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
LOG.info("replica is not alive, tablet={}, replica={}", info.tablet_id, replica.getId());
|
||||
return;
|
||||
}
|
||||
if (replica.getCooldownTerm() != cooldownConf.second) {
|
||||
LOG.info("replica's cooldown term not match({} vs {}), tablet={}", cooldownConf.second,
|
||||
replica.getCooldownTerm(), info.tablet_id);
|
||||
return;
|
||||
}
|
||||
if (!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) {
|
||||
LOG.info("cooldown meta id are not same, tablet={}", info.tablet_id);
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user