[Enhancement] Make colocate table join more load-balanced (#5104)

When two colocate tables are joined, tablets that belong to the same bucket sequence are placed on the same host so that the join can be executed locally.
The host for a bucket sequence used to be chosen at random, and a random choice cannot balance the tasks of a single query across hosts.

This patch therefore switches to a round-robin strategy, which distributes the bucket sequences evenly. For example, with 6 bucket sequences and 3 hosts, each host should be assigned 2 bucket sequences.
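To make the selection rule concrete, here is a minimal standalone sketch (hypothetical host names, not Doris code): for every bucket sequence, pick the host with the fewest assignments so far and bump its counter, which yields the even 2/2/2 split described above.

// A minimal sketch of the fewest-assigned (round-robin) rule; not Doris code.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {
    public static void main(String[] args) {
        List<String> hosts = List.of("host-a", "host-b", "host-c"); // hypothetical hosts
        Map<String, Long> assigned = new HashMap<>();
        Map<Integer, String> bucketToHost = new HashMap<>();
        for (int bucketSeq = 0; bucketSeq < 6; bucketSeq++) {
            String minHost = null;
            long min = Long.MAX_VALUE;
            for (String host : hosts) {               // scan all candidate hosts
                long count = assigned.getOrDefault(host, 0L);
                if (count < min) {                    // keep the least-loaded host
                    min = count;
                    minHost = host;
                }
            }
            assigned.merge(minHost, 1L, Long::sum);   // bump its counter
            bucketToHost.put(bucketSeq, minHost);
        }
        System.out.println(assigned);     // every host ends with exactly 2 bucket sequences
        System.out.println(bucketToHost); // buckets 0..5 alternate over the three hosts
    }
}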
xinghuayu007 authored 2020-12-31 09:47:06 +08:00, committed by GitHub
parent d7a584ac59
commit 2e95b1c389
3 changed files with 127 additions and 33 deletions

Coordinator.java

@@ -1251,12 +1251,12 @@ public class Coordinator {
            fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
        }
        Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
+        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
        for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) {
            // fill scanRangeParamsList
            List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
            if (!bucketSeqToAddress.containsKey(bucketSeq)) {
-                getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq);
+                getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost);
            }
            for (TScanRangeLocations location : locations) {
@@ -1274,50 +1274,51 @@ public class Coordinator {
        }
    }

-    // randomly choose a backend from the TScanRangeLocations for a certain bucket sequence.
-    private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq) throws Exception {
-        int randomLocation = new Random().nextInt(seqLocation.locations.size());
+    // ensure bucket sequences are distributed evenly across hosts
+    private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq,
+                                                          HashMap<TNetworkAddress, Long> assignedBytesPerHost) throws Exception {
        Reference<Long> backendIdRef = new Reference<Long>();
-        TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef);
+        selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, backendIdRef);
+        Backend backend = this.idToBackend.get(backendIdRef.getRef());
+        TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
        this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
        this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
    }

+    // pick the replica whose host currently has the fewest assigned bucket sequences
+    public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
+                                                         HashMap<TNetworkAddress, Long> assignedBytesPerHost,
+                                                         Reference<Long> backendIdRef) throws UserException {
+        Long minAssignedBytes = Long.MAX_VALUE;
+        TScanRangeLocation minLocation = null;
+        Long step = 1L;
+        for (final TScanRangeLocation location : seqLocation.getLocations()) {
+            Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
+            if (assignedBytes < minAssignedBytes) {
+                minAssignedBytes = assignedBytes;
+                minLocation = location;
+            }
+        }
+        // getLocation falls back to another alive backend if the chosen one is down
+        TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, seqLocation.locations, this.idToBackend, backendIdRef);
+        if (assignedBytesPerHost.containsKey(location.server)) {
+            assignedBytesPerHost.put(location.server,
+                    assignedBytesPerHost.get(location.server) + step);
+        } else {
+            assignedBytesPerHost.put(location.server, step);
+        }
+        return location;
+    }

    private void computeScanRangeAssignmentByScheduler(
            final ScanNode scanNode,
            final List<TScanRangeLocations> locations,
            FragmentScanRangeAssignment assignment) throws Exception {
        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
-        Long step = 1L;
        for (TScanRangeLocations scanRangeLocations : locations) {
-            // assign this scan range to the host w/ the fewest assigned bytes
-            Long minAssignedBytes = Long.MAX_VALUE;
-            TScanRangeLocation minLocation = null;
-            for (final TScanRangeLocation location : scanRangeLocations.getLocations()) {
-                Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
-                if (assignedBytes < minAssignedBytes) {
-                    minAssignedBytes = assignedBytes;
-                    minLocation = location;
-                }
-            }
-            assignedBytesPerHost.put(minLocation.server,
-                    assignedBytesPerHost.get(minLocation.server) + step);
            Reference<Long> backendIdRef = new Reference<Long>();
-            TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id,
-                    scanRangeLocations.getLocations(), this.idToBackend, backendIdRef);
-            if (!execHostPort.hostname.equals(minLocation.server.hostname) ||
-                    execHostPort.port != minLocation.server.port) {
-                assignedBytesPerHost.put(minLocation.server,
-                        assignedBytesPerHost.get(minLocation.server) - step);
-                Long id = assignedBytesPerHost.get(execHostPort);
-                if (id == null) {
-                    assignedBytesPerHost.put(execHostPort, 0L);
-                } else {
-                    assignedBytesPerHost.put(execHostPort, id + step);
-                }
-            }
+            // reuse the round-robin helper for non-colocate scan ranges as well
+            TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, assignedBytesPerHost, backendIdRef);
+            Backend backend = this.idToBackend.get(backendIdRef.getRef());
+            TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
            this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
            Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort,
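A side note on the bookkeeping, as I read the patch (this interpretation is not stated in it): assignedBytesPerHost is incremented by a fixed step of 1 per bucket sequence, so despite its name it counts assignments rather than bytes, and with equal steps the "fewest assigned bytes" rule behaves exactly like round-robin. The containsKey/put update at the end of selectBackendsByRoundRobin could also be collapsed into a single Map.merge call; a sketch:

// Sketch only: the containsKey/put update is equivalent to Map.merge (Java 8+).
import java.util.HashMap;

public class MergeSketch {
    public static void main(String[] args) {
        HashMap<String, Long> assignedPerHost = new HashMap<>();
        Long step = 1L;
        assignedPerHost.merge("host-a", step, Long::sum); // key absent -> 1
        assignedPerHost.merge("host-a", step, Long::sum); // key present -> 1 + 1 = 2
        System.out.println(assignedPerHost); // {host-a=2}
    }
}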

SimpleScheduler.java

@@ -92,6 +92,38 @@ public class SimpleScheduler {
                backends, locations.size()));
    }

+    public static TScanRangeLocation getLocation(TScanRangeLocation minLocation,
+                                                 List<TScanRangeLocation> locations,
+                                                 ImmutableMap<Long, Backend> backends,
+                                                 Reference<Long> backendIdRef)
+            throws UserException {
+        if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) {
+            throw new UserException("scan range location or candidate backends is empty");
+        }
+        Backend backend = backends.get(minLocation.backend_id);
+        if (isAvailable(backend)) {
+            backendIdRef.setRef(minLocation.backend_id);
+            return minLocation;
+        } else {
+            for (TScanRangeLocation location : locations) {
+                if (location.backend_id == minLocation.backend_id) {
+                    continue;
+                }
+                // choose the first alive backend (in the analysis stage, the locations are in random order)
+                Backend candidateBackend = backends.get(location.backend_id);
+                if (isAvailable(candidateBackend)) {
+                    backendIdRef.setRef(location.backend_id);
+                    return location;
+                }
+            }
+        }
+        // no alive backend found
+        throw new UserException("there is no scanNode Backend. " +
+                getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()),
+                        backends, locations.size()));
+    }

    public static TNetworkAddress getHost(ImmutableMap<Long, Backend> backends,
                                          Reference<Long> backendIdRef)
            throws UserException {
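The new getLocation differs from a plain lookup in its failure handling: it keeps the load-balanced pick when that backend is alive, otherwise falls back to the first alive replica, and fails only when none is alive. A minimal standalone sketch of that policy (hypothetical Node type, not the Doris API):

// Sketch of the prefer-then-fall-back selection; not Doris code.
import java.util.List;

public class FallbackSketch {
    record Node(long id, boolean alive) {}

    static Node pickWithFallback(Node preferred, List<Node> candidates) {
        if (preferred.alive()) {
            return preferred; // fast path: the balanced choice is healthy
        }
        for (Node candidate : candidates) {
            if (candidate.id() != preferred.id() && candidate.alive()) {
                return candidate; // first alive replica wins
            }
        }
        throw new IllegalStateException("no alive backend for this scan range");
    }

    public static void main(String[] args) {
        List<Node> replicas = List.of(new Node(0, false), new Node(1, true), new Node(2, true));
        System.out.println(pickWithFallback(replicas.get(0), replicas)); // Node[id=1, alive=true]
    }
}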

CoordinatorTest.java

@@ -59,6 +59,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.HashedMap;

public class CoordinatorTest extends Coordinator {
    static Planner planner = new Planner();
@@ -469,5 +471,64 @@ public class CoordinatorTest extends Coordinator {
            }
        }
    }

+    @Test
+    public void testGetExecHostPortForFragmentIDAndBucketSeq() {
+        Coordinator coordinator = new Coordinator(context, analyzer, planner);
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        // every OlapTable bucket shares the same TScanRangeLocations; backend ids are {0, 1, 2}
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+        // init all backends as alive
+        Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+        backend0.setAlive(true);
+        backend0.setBePort(9050);
+        Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+        backend1.setAlive(true);
+        backend1.setBePort(9050);
+        Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+        backend2.setAlive(true);
+        backend2.setBePort(9050);
+        ImmutableMap<Long, Backend> idToBackend = new ImmutableMap.Builder<Long, Backend>()
+                .put(0L, backend0)
+                .put(1L, backend1)
+                .put(2L, backend2)
+                .build();
+        Deencapsulation.setField(coordinator, "idToBackend", idToBackend);
+        Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+        fragmentIdToSeqToAddressMap.put(planFragmentId, new HashedMap());
+        Deencapsulation.setField(coordinator, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap);
+        List<TScanRangeLocations> locations = new ArrayList<>();
+        locations.add(tScanRangeLocations);
+        HashMap<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
+        // three bucket sequences over three hosts: each host should receive exactly one
+        Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
+                planFragmentId, 1, assignedBytesPerHost);
+        Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
+                planFragmentId, 2, assignedBytesPerHost);
+        Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
+                planFragmentId, 3, assignedBytesPerHost);
+        List<String> hosts = new ArrayList<>();
+        for (Map.Entry<TNetworkAddress, Long> item : assignedBytesPerHost.entrySet()) {
+            Assert.assertEquals(1L, (long) item.getValue());
+            hosts.add(item.getKey().hostname);
+        }
+        Assert.assertEquals(3, hosts.size());
+    }
}