[opt](task-assignment) use consistent hash as default task assigner and cache the consistent hash ring (#28522)
1. Use the consistent hash algorithm as the default assigner for the file query scan node.
Consistent assignment makes better use of the page cache on BE nodes.
2. Cache the consistent hash ring
Initializing a consistent hash ring is time-consuming because thousands of virtual nodes need to be added,
so cache it for better performance.
This commit is contained in:
@ -30,6 +30,9 @@ import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
@ -46,7 +49,9 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class FederationBackendPolicy {
|
||||
@ -59,6 +64,53 @@ public class FederationBackendPolicy {
|
||||
private int nextBe = 0;
|
||||
private boolean initialized = false;
|
||||
|
||||
// Create a ConsistentHash ring may be a time-consuming operation, so we cache it.
|
||||
private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
|
||||
|
||||
static {
|
||||
consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
|
||||
.build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
|
||||
@Override
|
||||
public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) {
|
||||
return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
|
||||
new BackendHash(), key.bes, Config.virtual_node_number);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static class HashCacheKey {
|
||||
// sorted backend ids as key
|
||||
private List<Long> beIds;
|
||||
// backends is not part of key, just an attachment
|
||||
private List<Backend> bes;
|
||||
|
||||
HashCacheKey(List<Backend> backends) {
|
||||
this.bes = backends;
|
||||
this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (!(obj instanceof HashCacheKey)) {
|
||||
return false;
|
||||
}
|
||||
return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(beIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HashCache{" + "beIds=" + beIds + '}';
|
||||
}
|
||||
}
|
||||
|
||||
public void init() throws UserException {
|
||||
if (!initialized) {
|
||||
init(Collections.emptyList());
|
||||
@ -96,8 +148,11 @@ public class FederationBackendPolicy {
|
||||
throw new UserException("No available backends");
|
||||
}
|
||||
backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
|
||||
consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
|
||||
new BackendHash(), backends, Config.virtual_node_number);
|
||||
try {
|
||||
consistentHash = consistentHashCache.get(new HashCacheKey(backends));
|
||||
} catch (ExecutionException e) {
|
||||
throw new UserException("failed to get consistent hash", e);
|
||||
}
|
||||
}
|
||||
|
||||
public Backend getNextBe() {
|
||||
@ -111,7 +166,7 @@ public class FederationBackendPolicy {
|
||||
}
|
||||
|
||||
// Try to find a local BE, if not exists, use `getNextBe` instead
|
||||
public Backend getNextLocalBe(List<String> hosts) {
|
||||
public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) {
|
||||
List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
|
||||
for (String host : hosts) {
|
||||
List<Backend> backends = backendMap.get(host);
|
||||
@ -121,7 +176,7 @@ public class FederationBackendPolicy {
|
||||
}
|
||||
|
||||
return CollectionUtils.isEmpty(candidateBackends)
|
||||
? getNextBe()
|
||||
? getNextConsistentBe(scanRangeLocations)
|
||||
: candidateBackends.get(random.nextInt(candidateBackends.size()));
|
||||
}
|
||||
|
||||
|
||||
@ -311,7 +311,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
params.setProperties(locationProperties);
|
||||
}
|
||||
|
||||
boolean enableSqlCache = ConnectContext.get().getSessionVariable().enableFileCache;
|
||||
boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
|
||||
List<String> pathPartitionKeys = getPathPartitionKeys();
|
||||
for (Split split : inputSplits) {
|
||||
@ -369,14 +368,12 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
|
||||
TScanRangeLocation location = new TScanRangeLocation();
|
||||
Backend selectedBackend;
|
||||
if (enableSqlCache) {
|
||||
if (enableShortCircuitRead) {
|
||||
// Try to find a local BE if enable hdfs short circuit read
|
||||
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
|
||||
} else {
|
||||
// Use consistent hash to assign the same scan range into the same backend among different queries
|
||||
selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
|
||||
} else if (enableShortCircuitRead) {
|
||||
// Try to find a local BE if enable hdfs short circuit read
|
||||
selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
|
||||
} else {
|
||||
selectedBackend = backendPolicy.getNextBe();
|
||||
}
|
||||
setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties);
|
||||
location.setBackendId(selectedBackend.getId());
|
||||
|
||||
@ -22,6 +22,11 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.planner.external.FederationBackendPolicy;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TExternalScanRange;
|
||||
import org.apache.doris.thrift.TFileRangeDesc;
|
||||
import org.apache.doris.thrift.TFileScanRange;
|
||||
import org.apache.doris.thrift.TScanRange;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import mockit.Mock;
|
||||
@ -93,12 +98,50 @@ public class FederationBackendPolicyTest {
|
||||
int invokeTimes = 1000000;
|
||||
Assertions.assertEquals(policy.numBackends(), backendNum);
|
||||
List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
|
||||
TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
|
||||
Stopwatch sw = Stopwatch.createStarted();
|
||||
for (int i = 0; i < invokeTimes; i++) {
|
||||
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
|
||||
Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost()));
|
||||
}
|
||||
sw.stop();
|
||||
System.out.println("Invoke getNextLocalBe() " + invokeTimes
|
||||
+ " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConsistentHash() throws UserException {
|
||||
FederationBackendPolicy policy = new FederationBackendPolicy();
|
||||
policy.init();
|
||||
int backendNum = 200;
|
||||
Assertions.assertEquals(policy.numBackends(), backendNum);
|
||||
|
||||
TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
|
||||
Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId());
|
||||
|
||||
scanRangeLocations = getScanRangeLocations("path2", 0, 100);
|
||||
Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId());
|
||||
}
|
||||
|
||||
private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) {
|
||||
// Generate on file scan range
|
||||
TFileScanRange fileScanRange = new TFileScanRange();
|
||||
// Scan range
|
||||
TExternalScanRange externalScanRange = new TExternalScanRange();
|
||||
externalScanRange.setFileScanRange(fileScanRange);
|
||||
TScanRange scanRange = new TScanRange();
|
||||
scanRange.setExtScanRange(externalScanRange);
|
||||
scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size));
|
||||
// Locations
|
||||
TScanRangeLocations locations = new TScanRangeLocations();
|
||||
locations.setScanRange(scanRange);
|
||||
return locations;
|
||||
}
|
||||
|
||||
private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) {
|
||||
TFileRangeDesc rangeDesc = new TFileRangeDesc();
|
||||
rangeDesc.setPath(path);
|
||||
rangeDesc.setStartOffset(startOffset);
|
||||
rangeDesc.setSize(size);
|
||||
return rangeDesc;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user