diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1c272b1dbb..ba7ea54b09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -144,6 +144,7 @@ import java.security.SecureRandom; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -2264,7 +2265,13 @@ public class Coordinator implements CoordInterface { private void computeScanRangeAssignment() throws Exception { Map assignedBytesPerHost = Maps.newHashMap(); Map replicaNumPerHost = getReplicaNumPerHostForOlapTable(); - Collections.shuffle(scanNodes); + boolean isAllOlapTables = scanNodes.stream().allMatch(e -> e instanceof OlapScanNode); + boolean isEnableOrderedLocations = ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().enableOrderedScanRangeLocations + && isAllOlapTables; + if (isEnableOrderedLocations) { + sortScanNodes(); + } // set scan ranges/locations for scan nodes for (ScanNode scanNode : scanNodes) { if (!(scanNode instanceof ExternalScanNode)) { @@ -2277,6 +2284,9 @@ public class Coordinator implements CoordInterface { // only analysis olap scan node continue; } + if (isEnableOrderedLocations) { + sortScanRangeLocations(locations); + } Set scanNodeIds = fragmentIdToScanNodeIds.computeIfAbsent(scanNode.getFragmentId(), k -> Sets.newHashSet()); scanNodeIds.add(scanNode.getId().asInt()); @@ -2296,7 +2306,8 @@ public class Coordinator implements CoordInterface { // A fragment may contain both colocate join and bucket shuffle join // on need both compute scanRange to init basic data for query coordinator if (fragmentContainsColocateJoin) { - computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost); + computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, + replicaNumPerHost, isEnableOrderedLocations); } if (fragmentContainsBucketShuffleJoin) { bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, @@ -2304,7 +2315,7 @@ public class Coordinator implements CoordInterface { } if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost, - replicaNumPerHost); + replicaNumPerHost, isEnableOrderedLocations); } } } @@ -2312,7 +2323,7 @@ public class Coordinator implements CoordInterface { // To ensure the same bucketSeq tablet to the same execHostPort private void computeScanRangeAssignmentByColocate( final OlapScanNode scanNode, Map assignedBytesPerHost, - Map replicaNumPerHost) throws Exception { + Map replicaNumPerHost, boolean isEnableOrderedLocations) throws Exception { if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); @@ -2333,7 +2344,8 @@ public class Coordinator implements CoordInterface { List locations = scanNode.bucketSeq2locations.get(bucketSeq); if (!bucketSeqToAddress.containsKey(bucketSeq)) { getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), - scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost, replicaNumPerHost); + scanNode.getFragmentId(), bucketSeq, assignedBytesPerHost, + replicaNumPerHost, isEnableOrderedLocations); } for (TScanRangeLocations location : locations) { @@ -2355,29 +2367,55 @@ public class Coordinator implements CoordInterface { //ensure bucket sequence distribued to every host evenly private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLocation, PlanFragmentId fragmentId, Integer bucketSeq, Map assignedBytesPerHost, - Map replicaNumPerHost) + Map replicaNumPerHost, boolean isEnableOrderedLocations) throws Exception { Reference backendIdRef = new Reference(); - selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost, backendIdRef); + selectBackendsByRoundRobin(seqLocation, assignedBytesPerHost, replicaNumPerHost, + backendIdRef, isEnableOrderedLocations); Backend backend = this.idToBackend.get(backendIdRef.getRef()); TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } + private void sortScanNodes() { + Collections.sort(scanNodes, new Comparator() { + @Override + public int compare(ScanNode s1, ScanNode s2) { + return java.lang.Integer.compare(s1.getId().asInt(), s2.getId().asInt()); + } + }); + } + + private void sortScanRangeLocations(List locations) { + Collections.sort(locations, new Comparator() { + @Override + public int compare(TScanRangeLocations o1, TScanRangeLocations o2) { + return org.apache.thrift.TBaseHelper.compareTo( + o1.getScanRange().getPaloScanRange().tablet_id, + o2.getScanRange().getPaloScanRange().tablet_id); + } + }); + } + public TScanRangeLocation selectBackendsByRoundRobin(TScanRangeLocations seqLocation, Map assignedBytesPerHost, Map replicaNumPerHost, - Reference backendIdRef) throws UserException { + Reference backendIdRef, + boolean isEnableOrderedLocations) throws UserException { + List locations = seqLocation.getLocations(); + if (isEnableOrderedLocations) { + Collections.sort(locations); + } if (!Config.enable_local_replica_selection) { - return selectBackendsByRoundRobin(seqLocation.getLocations(), assignedBytesPerHost, replicaNumPerHost, + return selectBackendsByRoundRobin(locations, assignedBytesPerHost, replicaNumPerHost, backendIdRef); } List localLocations = new ArrayList<>(); List nonlocalLocations = new ArrayList<>(); long localBeId = Env.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress()); - for (final TScanRangeLocation location : seqLocation.getLocations()) { + for (final TScanRangeLocation location : locations) { if (location.backend_id == localBeId) { localLocations.add(location); } else { @@ -2395,14 +2433,15 @@ public class Coordinator implements CoordInterface { } } - public TScanRangeLocation selectBackendsByRoundRobin(List locations, + public TScanRangeLocation selectBackendsByRoundRobin(List sortedLocations, Map assignedBytesPerHost, Map replicaNumPerHost, Reference backendIdRef) throws UserException { Long minAssignedBytes = Long.MAX_VALUE; Long minReplicaNum = Long.MAX_VALUE; TScanRangeLocation minLocation = null; Long step = 1L; - for (final TScanRangeLocation location : locations) { + + for (final TScanRangeLocation location : sortedLocations) { Long assignedBytes = findOrInsert(assignedBytesPerHost, location.server, 0L); if (assignedBytes < minAssignedBytes || (assignedBytes.equals(minAssignedBytes) && replicaNumPerHost.get(location.server) < minReplicaNum)) { @@ -2411,10 +2450,10 @@ public class Coordinator implements CoordInterface { minLocation = location; } } - for (TScanRangeLocation location : locations) { + for (TScanRangeLocation location : sortedLocations) { replicaNumPerHost.put(location.server, replicaNumPerHost.get(location.server) - 1); } - TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, locations, + TScanRangeLocation location = SimpleScheduler.getLocation(minLocation, sortedLocations, this.idToBackend, backendIdRef); assignedBytesPerHost.put(location.server, assignedBytesPerHost.get(location.server) + step); @@ -2426,7 +2465,8 @@ public class Coordinator implements CoordInterface { final List locations, FragmentScanRangeAssignment assignment, Map assignedBytesPerHost, - Map replicaNumPerHost) throws Exception { + Map replicaNumPerHost, + boolean isEnableOrderedLocations) throws Exception { // Type of locations is List, it could have elements that have same "location" // and we do have this situation for some scan node. // The duplicate "location" will NOT be filtered by FragmentScanRangeAssignment, @@ -2435,7 +2475,7 @@ public class Coordinator implements CoordInterface { for (TScanRangeLocations scanRangeLocations : locations) { Reference backendIdRef = new Reference(); TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations, - assignedBytesPerHost, replicaNumPerHost, backendIdRef); + assignedBytesPerHost, replicaNumPerHost, backendIdRef, isEnableOrderedLocations); Backend backend = this.idToBackend.get(backendIdRef.getRef()); TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort()); this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2af7725e02..ac653b82ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -420,6 +420,8 @@ public class SessionVariable implements Serializable, Writable { */ public static final String ENABLE_UNIFIED_LOAD = "enable_unified_load"; + public static final String ENABLE_ORDERED_SCAN_RANGE_LOCATIONS = "enable_ordered_scan_range_locations"; + public static final String ENABLE_PARQUET_LAZY_MAT = "enable_parquet_lazy_materialization"; public static final String ENABLE_ORC_LAZY_MAT = "enable_orc_lazy_materialization"; @@ -1468,6 +1470,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_CTE_MATERIALIZE) public boolean enableCTEMaterialize = true; + @VariableMgr.VarAttr(name = ENABLE_ORDERED_SCAN_RANGE_LOCATIONS) + public boolean enableOrderedScanRangeLocations = false; + @VariableMgr.VarAttr(name = ENABLE_ANALYZE_COMPLEX_TYPE_COLUMN) public boolean enableAnalyzeComplexTypeColumn = false; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index ca8109e40e..4c38ddd274 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -593,7 +593,7 @@ public class CoordinatorTest extends Coordinator { locations.add(tScanRangeLocations); locations.add(tScanRangeLocations1); Deencapsulation.invoke(coordinator, "computeScanRangeAssignmentByScheduler", - olapScanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost); + olapScanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost, false); for (Map.Entry entry : assignment.entrySet()) { Map> addr = (HashMap>) entry.getValue(); for (Map.Entry item : addr.entrySet()) { @@ -653,11 +653,11 @@ public class CoordinatorTest extends Coordinator { replicaNumPerHost.put(tScanRangeLocation2.server, 1L); Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations, - planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost); + planFragmentId, 1, assignedBytesPerHost, replicaNumPerHost, false); Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations, - planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost); + planFragmentId, 2, assignedBytesPerHost, replicaNumPerHost, false); Deencapsulation.invoke(coordinator, "getExecHostPortForFragmentIDAndBucketSeq", tScanRangeLocations, - planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost); + planFragmentId, 3, assignedBytesPerHost, replicaNumPerHost, false); List hosts = new ArrayList<>(); for (Map.Entry item : assignedBytesPerHost.entrySet()) { Assert.assertTrue((Long) item.getValue() == 1);