[enhancement](recover) support skipping missing version in select by session variable (#25654)

This commit is contained in:
xy720
2023-11-02 20:01:51 +08:00
committed by GitHub
parent 89cf828f03
commit a5ef90dacc
20 changed files with 147 additions and 78 deletions

View File

@ -38,6 +38,8 @@ import java.util.Comparator;
public class Replica implements Writable {
private static final Logger LOG = LogManager.getLogger(Replica.class);
public static final VersionComparator<Replica> VERSION_DESC_COMPARATOR = new VersionComparator<Replica>();
public static final LastSuccessVersionComparator<Replica> LAST_SUCCESS_VERSION_COMPARATOR =
new LastSuccessVersionComparator<Replica>();
public static final IdComparator<Replica> ID_COMPARATOR = new IdComparator<Replica>();
public enum ReplicaState {
@ -682,6 +684,22 @@ public class Replica implements Writable {
}
}
/**
 * Orders replicas by their last success version in descending order, so the replica
 * with the highest last success version sorts first. Replicas with equal last success
 * versions compare as equal.
 */
private static class LastSuccessVersionComparator<T extends Replica> implements Comparator<T> {
    public LastSuccessVersionComparator() {
    }

    @Override
    public int compare(T replica1, T replica2) {
        // Operands are deliberately swapped: a larger last success version must sort earlier.
        return Long.compare(replica2.getLastSuccessVersion(), replica1.getLastSuccessVersion());
    }
}
private static class IdComparator<T extends Replica> implements Comparator<T> {
public IdComparator() {
}

View File

@ -238,7 +238,7 @@ public class Tablet extends MetaObject implements Writable {
}
// for query
public List<Replica> getQueryableReplicas(long visibleVersion) {
public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) {
List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size());
for (Replica replica : replicas) {
@ -247,7 +247,7 @@ public class Tablet extends MetaObject implements Writable {
}
// Skip the missing version replica
if (replica.getLastFailedVersion() > 0) {
if (replica.getLastFailedVersion() > 0 && !allowFailedVersion) {
continue;
}

View File

@ -713,14 +713,19 @@ public class OlapScanNode extends ScanNode {
String visibleVersionStr = String.valueOf(visibleVersion);
Set<Tag> allowedTags = Sets.newHashSet();
int useFixReplica = -1;
boolean needCheckTags = false;
boolean skipMissingVersion = false;
if (ConnectContext.get() != null) {
allowedTags = ConnectContext.get().getResourceTags();
needCheckTags = ConnectContext.get().isResourceTagsSet();
useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica;
// skip_missing_version only takes effect when use_fix_replica is unset (-1); otherwise it is treated as false
skipMissingVersion = useFixReplica == -1 && ConnectContext.get().getSessionVariable().skipMissingVersion;
}
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
if (!Config.recover_with_skip_missing_version.equalsIgnoreCase("disable")) {
if (skipMissingVersion) {
long tabletVersion = -1L;
for (Replica replica : tablet.getReplicas()) {
if (replica.getVersion() > tabletVersion) {
@ -743,7 +748,7 @@ public class OlapScanNode extends ScanNode {
paloRange.setTabletId(tabletId);
// random shuffle List && only collect one copy
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion);
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
if (replicas.isEmpty()) {
LOG.warn("no queryable replica found in tablet {}. visible version {}", tabletId, visibleVersion);
StringBuilder sb = new StringBuilder(
@ -757,12 +762,13 @@ public class OlapScanNode extends ScanNode {
throw new UserException(sb.toString());
}
int useFixReplica = -1;
if (ConnectContext.get() != null) {
useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica;
}
if (useFixReplica == -1) {
Collections.shuffle(replicas);
if (skipMissingVersion) {
// sort by replica's last success version, higher success version in the front.
replicas.sort(Replica.LAST_SUCCESS_VERSION_COMPARATOR);
} else {
Collections.shuffle(replicas);
}
} else {
LOG.debug("use fix replica, value: {}, replica num: {}", useFixReplica, replicas.size());
// sort by replica id
@ -771,6 +777,7 @@ public class OlapScanNode extends ScanNode {
replicas.clear();
replicas.add(replica);
}
final long coolDownReplicaId = tablet.getCooldownReplicaId();
// we prefer to query using cooldown replica to make sure the cache is fully utilized
// for example: consider there are 3BEs(A,B,C) and each has one replica for tablet X. and X
@ -832,14 +839,15 @@ public class OlapScanNode extends ScanNode {
collectedStat = true;
}
scanBackendIds.add(backend.getId());
// For skipping missing version of tablet, we only select the backend with the highest last
// success version replica to save as much data as possible.
if (skipMissingVersion) {
break;
}
}
if (tabletIsNull) {
if (Config.recover_with_skip_missing_version.equalsIgnoreCase("ignore_all")) {
continue;
} else {
throw new UserException(tabletId + " have no queryable replicas. err: "
+ Joiner.on(", ").join(errs));
}
throw new UserException(tabletId + " have no queryable replicas. err: "
+ Joiner.on(", ").join(errs));
}
TScanRange scanRange = new TScanRange();
scanRange.setPaloScanRange(paloRange);

View File

@ -293,6 +293,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String SKIP_DELETE_BITMAP = "skip_delete_bitmap";
public static final String SKIP_MISSING_VERSION = "skip_missing_version";
public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG = "enable_push_down_no_group_agg";
public static final String ENABLE_CBO_STATISTICS = "enable_cbo_statistics";
@ -976,6 +978,19 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = SKIP_DELETE_BITMAP)
public boolean skipDeleteBitmap = false;
// This variable replaces the original FE config `recover_with_skip_missing_version`.
// In some scenarios, all replicas of a tablet have missing versions, and the tablet is unable to recover.
// This variable controls the behavior of queries in that case. When it is set to `true`, the query will ignore
// the visible version recorded in the FE partition and use the replica version instead. If the replica on BE
// has missing versions, the query will directly skip those missing versions and only return the data of the
// existing versions.
// Besides, the query will always try to select the replica with the highest lastSuccessVersion among all
// surviving BE replicas, so as to recover as much data as possible.
// You should only enable it in the emergency scenarios mentioned above, and only for temporary recovery queries.
// This variable conflicts with the use_fix_replica variable: when use_fix_replica is not -1,
// this variable has no effect.
@VariableMgr.VarAttr(name = SKIP_MISSING_VERSION)
public boolean skipMissingVersion = false;
// This variable is used to avoid FE fallback to the original parser. When we execute SQL in regression tests
// for nereids, fallback will cause the Doris return the correct result although the syntax is unsupported
// in nereids for some mistaken modification. You should set it on the
@ -2436,6 +2451,8 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableDecimal256(enableNereidsPlanner && enableDecimal256);
tResult.setSkipMissingVersion(skipMissingVersion);
return tResult;
}