[Enhancement] Optimize the algorithm for selecting a host for a bucket scan task when a backend is not alive (#5133)
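Before this change, the bucket count for a bucket-shuffle scan task was charged to the preferred backend before checking whether that backend was alive, so when SimpleScheduler.getHost fell back to a different backend the bucket was still attributed to the dead one. The count is now charged to whichever backend getHost actually selects, and a unit test covers the fallback path.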
@@ -1560,14 +1560,22 @@ public class Coordinator {
                     break;
                 }
             }
 
-            buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
             Reference<Long> backendIdRef = new Reference<Long>();
             TNetworkAddress execHostPort = SimpleScheduler.getHost(buckendId, seqLocation.locations, idToBackend, backendIdRef);
             if (execHostPort == null) {
                 throw new UserException("there is no scanNode Backend");
             }
 
+            // the backend with buckendId is not alive, so choose another backend
+            if (backendIdRef.getRef() != buckendId) {
+                // buckendIdToBucketCountMap does not contain the new backend yet, insert it
+                if (!buckendIdToBucketCountMap.containsKey(backendIdRef.getRef())) {
+                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), 1);
+                } else { // buckendIdToBucketCountMap already contains the new backend, update it
+                    buckendIdToBucketCountMap.put(backendIdRef.getRef(), buckendIdToBucketCountMap.get(backendIdRef.getRef()) + 1);
+                }
+            } else { // the backend with buckendId is alive, update buckendIdToBucketCountMap directly
+                buckendIdToBucketCountMap.put(buckendId, buckendIdToBucketCountMap.get(buckendId) + 1);
+            }
             addressToBackendID.put(execHostPort, backendIdRef.getRef());
             this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
         }
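Read in isolation, the new bookkeeping amounts to: ask the scheduler for a host for the preferred backend, then charge the bucket to whichever backend was actually chosen. Below is a minimal standalone sketch of that logic; the class name, pickAliveBackend, assignBucket, and aliveById are hypothetical stand-ins for SimpleScheduler.getHost and the surrounding Coordinator state, and IllegalStateException stands in for UserException.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketAssignmentSketch {
    // bucket count per backend id; plays the role of buckendIdToBucketCountMap
    private final Map<Long, Integer> bucketCountPerBackend = new HashMap<>();

    // Hypothetical stand-in for SimpleScheduler.getHost: return the preferred
    // backend if it is alive, otherwise the first alive candidate, else null.
    private Long pickAliveBackend(long preferredId, List<Long> candidateIds, Map<Long, Boolean> aliveById) {
        if (aliveById.getOrDefault(preferredId, false)) {
            return preferredId;
        }
        for (long id : candidateIds) {
            if (aliveById.getOrDefault(id, false)) {
                return id;
            }
        }
        return null;
    }

    public void assignBucket(long preferredId, List<Long> candidateIds, Map<Long, Boolean> aliveById) {
        Long chosenId = pickAliveBackend(preferredId, candidateIds, aliveById);
        if (chosenId == null) {
            throw new IllegalStateException("there is no scanNode Backend");
        }
        // Charge the bucket to the backend that will actually run the scan,
        // not to the (possibly dead) preferred backend; the core of the fix.
        bucketCountPerBackend.merge(chosenId, 1, Integer::sum);
    }
}

The sketch collapses the patch's containsKey/put branches into a single merge call; the resulting map state is the same.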
@@ -49,6 +49,7 @@ import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -530,5 +531,69 @@ public class CoordinatorTest extends Coordinator {
         }
         Assert.assertTrue(hosts.size() == 3);
     }
 
+    @Test
+    public void testBucketShuffleWithUnaliveBackend() {
+        Coordinator coordinator = new Coordinator(context, analyzer, planner);
+        PlanFragmentId planFragmentId = new PlanFragmentId(1);
+        // each OlapTable bucket has the same TScanRangeLocations; backend ids are {0, 1, 2}
+        TScanRangeLocations tScanRangeLocations = new TScanRangeLocations();
+        TScanRangeLocation tScanRangeLocation0 = new TScanRangeLocation();
+        tScanRangeLocation0.backend_id = 0;
+        tScanRangeLocation0.server = new TNetworkAddress("0.0.0.0", 9050);
+        TScanRangeLocation tScanRangeLocation1 = new TScanRangeLocation();
+        tScanRangeLocation1.backend_id = 1;
+        tScanRangeLocation1.server = new TNetworkAddress("0.0.0.1", 9050);
+        TScanRangeLocation tScanRangeLocation2 = new TScanRangeLocation();
+        tScanRangeLocation2.backend_id = 2;
+        tScanRangeLocation2.server = new TNetworkAddress("0.0.0.2", 9050);
+        tScanRangeLocations.locations = new ArrayList<>();
+        tScanRangeLocations.locations.add(tScanRangeLocation0);
+        tScanRangeLocations.locations.add(tScanRangeLocation1);
+        tScanRangeLocations.locations.add(tScanRangeLocation2);
+
+        // init all backends; backend 0 is not alive
+        Backend backend0 = new Backend(0, "0.0.0.0", 9060);
+        backend0.setAlive(false);
+        backend0.setBePort(9050);
+        Backend backend1 = new Backend(1, "0.0.0.1", 9060);
+        backend1.setAlive(true);
+        backend1.setBePort(9050);
+        Backend backend2 = new Backend(2, "0.0.0.2", 9060);
+        backend2.setAlive(true);
+        backend2.setBePort(9050);
+        Map<TNetworkAddress, Long> addressToBackendID = Maps.newHashMap();
+        addressToBackendID.put(tScanRangeLocation0.server, tScanRangeLocation0.backend_id);
+        addressToBackendID.put(tScanRangeLocation1.server, tScanRangeLocation1.backend_id);
+        addressToBackendID.put(tScanRangeLocation2.server, tScanRangeLocation2.backend_id);
+
+        ImmutableMap<Long, Backend> idToBackend =
+                new ImmutableMap.Builder<Long, Backend>()
+                        .put(0L, backend0)
+                        .put(1L, backend1)
+                        .put(2L, backend2).build();
+        Map<PlanFragmentId, Map<Long, Integer>> fragmentIdToBuckendIdBucketCountMap = Maps.newHashMap();
+        Map<Long, Integer> backendIdBucketCountMap = new HashMap<Long, Integer>();
+        fragmentIdToBuckendIdBucketCountMap.put(planFragmentId, backendIdBucketCountMap);
+        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds = new HashMap<>();
+        BucketShuffleJoinController controller = new BucketShuffleJoinController(fragmentIdToScanNodeIds);
+        Map<PlanFragmentId, Map<Integer, TNetworkAddress>> fragmentIdToSeqToAddressMap = Maps.newHashMap();
+        fragmentIdToSeqToAddressMap.put(planFragmentId, new HashMap<Integer, TNetworkAddress>());
+        Deencapsulation.setField(controller, "fragmentIdToBuckendIdBucketCountMap", fragmentIdToBuckendIdBucketCountMap);
+        Deencapsulation.setField(controller, "fragmentIdToSeqToAddressMap", fragmentIdToSeqToAddressMap);
+        Deencapsulation.invoke(controller, "getExecHostPortForFragmentIDAndBucketSeq",
+                tScanRangeLocations, planFragmentId, 1, idToBackend, addressToBackendID);
+        Assert.assertTrue(backendIdBucketCountMap.size() == 2);
+        List<Long> backendIds = new ArrayList<Long>();
+        List<Integer> counts = new ArrayList<Integer>();
+        for (Map.Entry<Long, Integer> item : backendIdBucketCountMap.entrySet()) {
+            backendIds.add(item.getKey());
+            counts.add(item.getValue());
+        }
+        Assert.assertTrue(backendIds.get(0) == 0);
+        Assert.assertTrue(counts.get(0) == 0);
+        Assert.assertTrue(backendIds.get(1) == 1);
+        Assert.assertTrue(counts.get(1) == 1);
+    }
 }
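The test relies on a contract of SimpleScheduler.getHost that this diff only implies: the method returns an address for an alive backend and writes the id of the backend it actually selected into backendIdRef. Below is a hedged sketch of that assumed contract, using simplified stand-in types (Address, BackendInfo, Ref) rather than the real Doris classes; the real implementation lives in SimpleScheduler and is not part of this change.

import java.util.List;
import java.util.Map;

public class SchedulerContractSketch {
    // Simplified stand-ins for TNetworkAddress and Backend.
    public record Address(String host, int port) {}
    public record BackendInfo(long id, Address address, boolean alive) {}

    // Mutable holder mirroring the Reference<Long> used in the patch.
    public static class Ref<T> {
        private T value;
        public T getRef() { return value; }
        public void setRef(T v) { value = v; }
    }

    // Assumed contract (not the real implementation): prefer preferredId when
    // that backend is alive; otherwise fall back to another alive candidate.
    // In both cases, record the chosen backend id in backendIdRef.
    public static Address getHost(long preferredId, List<Long> candidateIds,
                                  Map<Long, BackendInfo> idToBackend, Ref<Long> backendIdRef) {
        BackendInfo preferred = idToBackend.get(preferredId);
        if (preferred != null && preferred.alive()) {
            backendIdRef.setRef(preferredId);
            return preferred.address();
        }
        for (long id : candidateIds) {
            BackendInfo candidate = idToBackend.get(id);
            if (candidate != null && candidate.alive()) {
                backendIdRef.setRef(id);
                return candidate.address();
            }
        }
        return null; // caller throws "there is no scanNode Backend"
    }
}

Under that contract, with backend 0 down, the invocation in the test falls back to backend 1, which is why backendIdBucketCountMap ends up with backend 0 at its initial count of 0 and backend 1 charged with the bucket.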