[opt](Nereids) opt locality under multi-replica (#34927)
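Make tablet locality fixed (stable) in multi-replica cases. The behavior is controlled by the session variable `enable_ordered_scan_range_locations`, which defaults to false; enable it with `set enable_ordered_scan_range_locations = true`. With 3 replicas on TPC-DS 100 GB, this gives a 7% improvement.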
This commit is contained in:
@ -144,6 +144,7 @@ import java.security.SecureRandom;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -2264,7 +2265,13 @@ public class Coordinator implements CoordInterface {
|
||||
private void computeScanRangeAssignment() throws Exception {
|
||||
Map<TNetworkAddress, Long> assignedBytesPerHost = Maps.newHashMap();
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost = getReplicaNumPerHostForOlapTable();
|
||||
Collections.shuffle(scanNodes);
|
||||
boolean isAllOlapTables = scanNodes.stream().allMatch(e -> e instanceof OlapScanNode);
|
||||
boolean isEnableOrderedLocations = ConnectContext.get() != null
|
||||
&& ConnectContext.get().getSessionVariable().enableOrderedScanRangeLocations
|
||||
&& isAllOlapTables;
|
||||
if (isEnableOrderedLocations) {
|
||||
sortScanNodes();
|
||||
}
|
||||
// set scan ranges/locations for scan nodes
|
||||
for (ScanNode scanNode : scanNodes) {
|
||||
if (!(scanNode instanceof ExternalScanNode)) {
|
||||
@ -2277,6 +2284,9 @@ public class Coordinator implements CoordInterface {
|
||||
// only analysis olap scan node
|
||||
continue;
|
||||
}
|
||||
if (isEnableOrderedLocations) {
|
||||
sortScanRangeLocations(locations);
|
||||
}
|
||||
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.computeIfAbsent(scanNode.getFragmentId(),
|
||||
k -> Sets.newHashSet());
|
||||
scanNodeIds.add(scanNode.getId().asInt());
|
||||
@ -2296,7 +2306,8 @@ public class Coordinator implements CoordInterface {
|
||||
// A fragment may contain both colocate join and bucket shuffle join
|
||||
// so both scan-range computations are needed to init basic data for the query coordinator
|
||||
if (fragmentContainsColocateJoin) {
|
||||
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost);
|
||||
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost,
|
||||
replicaNumPerHost, isEnableOrderedLocations);
|
||||
}
|
||||
if (fragmentContainsBucketShuffleJoin) {
|
||||
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
|
||||
@ -2304,7 +2315,7 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
|
||||
computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
|
||||
replicaNumPerHost);
|
||||
replicaNumPerHost, isEnableOrderedLocations);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2312,7 +2323,7 @@ public class Coordinator implements CoordInterface {
|
||||
// To ensure the same bucketSeq tablet to the same execHostPort
|
||||
private void computeScanRangeAssignmentByColocate(
|
||||
final OlapScanNode scanNode, Map<TNetworkAddress, Long> assignedBytesPerHost,
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost, boolean isEnableOrderedLocations) throws Exception {
|
||||
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
|
||||
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>());
|
||||
fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
|
||||
@ -2333,7 +2344,8 @@ public class Coordinator implements CoordInterface {
|
||||
List<TScanRangeLocations> locations = scanNode.bucketSeq2locations.get(bucketSeq);
|
||||
if (!bucketSeqToAddress.containsKey(bucketSeq)) {
|
||||
getExecHostPortForFragmentIDAndBucketSeq(locations.get(0),
|
||||
scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost, replicaNumPerHost);
|
||||
scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost,
|
||||
replicaNumPerHost, isEnableOrderedLocations);
|
||||
}
|
||||
|
||||
for (TScanRangeLocations location : locations) {
|
||||
@ -2355,29 +2367,55 @@ public class Coordinator implements CoordInterface {
|
||||
// ensure bucket sequences are distributed evenly across hosts
|
||||
private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation,
|
||||
PlanFragmentId fragmentId, Integer bucketSeq, Map<TNetworkAddress, Long> assignedBytesPerHost,
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost)
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost, boolean isEnableOrderedLocations)
|
||||
throws Exception {
|
||||
Reference<Long> backendIdRef = new Reference<Long>();
|
||||
selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost, backendIdRef);
|
||||
selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost,
|
||||
backendIdRef, isEnableOrderedLocations);
|
||||
Backend backend = this.idToBackend.get(backendIdRef.getRef());
|
||||
TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
|
||||
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
|
||||
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
|
||||
}
|
||||
|
||||
private void sortScanNodes() {
|
||||
Collections.sort(scanNodes, new Comparator<ScanNode>() {
|
||||
@Override
|
||||
public int compare(ScanNode s1, ScanNode s2) {
|
||||
return java.lang.Integer.compare(s1.getId().asInt(), s2.getId().asInt());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void sortScanRangeLocations(List<TScanRangeLocations> locations) {
|
||||
Collections.sort(locations, new Comparator<TScanRangeLocations>() {
|
||||
@Override
|
||||
public int compare(TScanRangeLocations o1, TScanRangeLocations o2) {
|
||||
return org.apache.thrift.TBaseHelper.compareTo(
|
||||
o1.getScanRange().getPaloScanRange().tablet_id,
|
||||
o2.getScanRange().getPaloScanRange().tablet_id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation,
|
||||
Map<TNetworkAddress, Long> assignedBytesPerHost,
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost,
|
||||
Reference<Long> backendIdRef) throws UserException {
|
||||
Reference<Long> backendIdRef,
|
||||
boolean isEnableOrderedLocations) throws UserException {
|
||||
List<TScanRangeLocation> locations = seqLocation.getLocations();
|
||||
if (isEnableOrderedLocations) {
|
||||
Collections.sort(locations);
|
||||
}
|
||||
if (!Config.enable_local_replica_selection) {
|
||||
return selectBackendsByRoundRobin(seqLocation.getLocations(), assignedBytesPerHost, replicaNumPerHost,
|
||||
return selectBackendsByRoundRobin(locations, assignedBytesPerHost, replicaNumPerHost,
|
||||
backendIdRef);
|
||||
}
|
||||
|
||||
List<TScanRangeLocation> localLocations = new ArrayList<>();
|
||||
List<TScanRangeLocation> nonlocalLocations = new ArrayList<>();
|
||||
long localBeId = Env.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
|
||||
for (final TScanRangeLocation location : seqLocation.getLocations()) {
|
||||
for (final TScanRangeLocation location : locations) {
|
||||
if (location.backend_id == localBeId) {
|
||||
localLocations.add(location);
|
||||
} else {
|
||||
@ -2395,14 +2433,15 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
|
||||
public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> locations,
|
||||
public TScanRangeLocation selectBackendsByRoundRobin(List<TScanRangeLocation> sortedLocations,
|
||||
Map<TNetworkAddress, Long> assignedBytesPerHost, Map<TNetworkAddress, Long> replicaNumPerHost,
|
||||
Reference<Long> backendIdRef) throws UserException {
|
||||
Long minAssignedBytes = Long.MAX_VALUE;
|
||||
Long minReplicaNum = Long.MAX_VALUE;
|
||||
TScanRangeLocation minLocation = null;
|
||||
Long step = 1L;
|
||||
for (final TScanRangeLocation location : locations) {
|
||||
|
||||
for (final TScanRangeLocation location : sortedLocations) {
|
||||
Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L);
|
||||
if (assignedBytes < minAssignedBytes || (assignedBytes.equals(minAssignedBytes)
|
||||
&& replicaNumPerHost.get(location.server) < minReplicaNum)) {
|
||||
@ -2411,10 +2450,10 @@ public class Coordinator implements CoordInterface {
|
||||
minLocation = location;
|
||||
}
|
||||
}
|
||||
for (TScanRangeLocation location : locations) {
|
||||
for (TScanRangeLocation location : sortedLocations) {
|
||||
replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1);
|
||||
}
|
||||
TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, locations,
|
||||
TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, sortedLocations,
|
||||
this.idToBackend, backendIdRef);
|
||||
assignedBytesPerHost.put(location.server, assignedBytesPerHost.get(location.server) + step);
|
||||
|
||||
@ -2426,7 +2465,8 @@ public class Coordinator implements CoordInterface {
|
||||
final List<TScanRangeLocations> locations,
|
||||
FragmentScanRangeAssignment assignment,
|
||||
Map<TNetworkAddress, Long> assignedBytesPerHost,
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
|
||||
Map<TNetworkAddress, Long> replicaNumPerHost,
|
||||
boolean isEnableOrderedLocations) throws Exception {
|
||||
// Type of locations is List, it could have elements that have same "location"
|
||||
// and we do have this situation for some scan node.
|
||||
// The duplicate "location" will NOT be filtered by FragmentScanRangeAssignment,
|
||||
@ -2435,7 +2475,7 @@ public class Coordinator implements CoordInterface {
|
||||
for (TScanRangeLocations scanRangeLocations : locations) {
|
||||
Reference<Long> backendIdRef = new Reference<Long>();
|
||||
TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,
|
||||
assignedBytesPerHost, replicaNumPerHost, backendIdRef);
|
||||
assignedBytesPerHost, replicaNumPerHost, backendIdRef, isEnableOrderedLocations);
|
||||
Backend backend = this.idToBackend.get(backendIdRef.getRef());
|
||||
TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
|
||||
this.addressToBackendID.put(execHostPort, backendIdRef.getRef());
|
||||
|
||||
@ -420,6 +420,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
*/
|
||||
public static final String ENABLE_UNIFIED_LOAD = "enable_unified_load";
|
||||
|
||||
public static final String ENABLE_ORDERED_SCAN_RANGE_LOCATIONS = "enable_ordered_scan_range_locations";
|
||||
|
||||
public static final String ENABLE_PARQUET_LAZY_MAT = "enable_parquet_lazy_materialization";
|
||||
|
||||
public static final String ENABLE_ORC_LAZY_MAT = "enable_orc_lazy_materialization";
|
||||
@ -1468,6 +1470,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = ENABLE_CTE_MATERIALIZE)
|
||||
public boolean enableCTEMaterialize = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_ORDERED_SCAN_RANGE_LOCATIONS)
|
||||
public boolean enableOrderedScanRangeLocations = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_ANALYZE_COMPLEX_TYPE_COLUMN)
|
||||
public boolean enableAnalyzeComplexTypeColumn = false;
|
||||
|
||||
|
||||
@ -593,7 +593,7 @@ public class CoordinatorTest extends Coordinator {
|
||||
locations.add(tScanRangeLocations);
|
||||
locations.add(tScanRangeLocations1);
|
||||
Deencapsulation.invoke(coordinator, "computeScanRangeAssignmentByScheduler",
|
||||
olapScanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost);
|
||||
olapScanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost, false);
|
||||
for (Map.Entry entry : assignment.entrySet()) {
|
||||
Map<Integer, List<TScanRangeParams>> addr = (HashMap<Integer, List<TScanRangeParams>>) entry.getValue();
|
||||
for (Map.Entry item : addr.entrySet()) {
|
||||
@ -653,11 +653,11 @@ public class CoordinatorTest extends Coordinator {
|
||||
replicaNumPerHost.put(tScanRangeLocation2.server, 1L);
|
||||
|
||||
Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
|
||||
planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost);
|
||||
planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost, false);
|
||||
Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
|
||||
planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost);
|
||||
planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost, false);
|
||||
Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations,
|
||||
planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost);
|
||||
planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost, false);
|
||||
List<String> hosts = new ArrayList<>();
|
||||
for (Map.Entry item : assignedBytesPerHost.entrySet()) {
|
||||
Assert.assertTrue((Long) item.getValue() == 1);
|
||||
|
||||
Reference in New Issue
Block a user