From f3c6ee5961dbabb65c8e2e47c159fde9ac87e59c Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Tue, 14 Mar 2023 00:13:24 +0800 Subject: [PATCH] [Enhance](ComputeNode) ES Scan node support to be scheduled to compute node (#16533) ES Scan node support to be scheduled to compute node. --- .../org/apache/doris/planner/EsScanNode.java | 50 +++---------------- .../external/ExternalFileScanNode.java | 2 +- ...licy.java => FederationBackendPolicy.java} | 10 +++- .../doris/planner/external/FileGroupInfo.java | 6 +-- .../planner/external/FileScanProviderIf.java | 2 +- .../planner/external/LoadScanProvider.java | 2 +- .../planner/external/MetadataScanNode.java | 2 +- .../planner/external/QueryScanProvider.java | 4 +- .../doris/system/BeSelectionPolicy.java | 14 ++++++ .../doris/system/SystemInfoServiceTest.java | 43 ++++++++++++++++ 10 files changed, 81 insertions(+), 54 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/planner/external/{BackendPolicy.java => FederationBackendPolicy.java} (90%) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index 63a60f5024..a2349e7e57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.EsResource; import org.apache.doris.catalog.EsTable; import org.apache.doris.catalog.PartitionInfo; @@ -37,6 +36,7 @@ import org.apache.doris.external.elasticsearch.QueryBuilders; import org.apache.doris.external.elasticsearch.QueryBuilders.BoolQueryBuilder; import org.apache.doris.external.elasticsearch.QueryBuilders.BuilderOptions; import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder; +import org.apache.doris.planner.external.FederationBackendPolicy; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TEsScanNode; @@ -49,18 +49,15 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; -import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import lombok.SneakyThrows; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -76,7 +73,6 @@ public class EsScanNode extends ScanNode { private final Random random = new Random(System.currentTimeMillis()); private Multimap backendMap; - private List backendList; private EsTablePartitions esTablePartitions; private List shardScanRanges = Lists.newArrayList(); private EsTable table; @@ -105,14 +101,12 @@ public class EsScanNode extends ScanNode { public void init(Analyzer analyzer) throws UserException { super.init(analyzer); computeColumnFilter(); - assignBackends(); computeStats(analyzer); buildQuery(); } public void init() throws UserException { computeColumnFilter(); - assignBackends(); buildQuery(); } @@ -208,20 +202,6 @@ public class EsScanNode extends ScanNode { msg.es_scan_node = esScanNode; } - private void assignBackends() throws UserException { - backendMap = HashMultimap.create(); - backendList = Lists.newArrayList(); - for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { - if (be.isAlive()) { - backendMap.put(be.getIp(), be); - backendList.add(be); - } - } - if (backendMap.isEmpty()) { - throw new UserException("No Alive backends"); - } - } - // only do partition(es index level) prune private List getShardLocations() throws UserException { // has to get partition info from es state not from table because the partition @@ -252,39 +232,23 @@ public class EsScanNode extends ScanNode { LOG.debug("partition prune finished, unpartitioned index [{}], " + "partitioned index [{}]", String.join(",", unPartitionedIndices), String.join(",", partitionedIndices)); } - int size = backendList.size(); - int beIndex = random.nextInt(size); List result = Lists.newArrayList(); for (EsShardPartitions indexState : selectedIndex) { for (List shardRouting : indexState.getShardRoutings().values()) { // get backends - Set colocatedBes = Sets.newHashSet(); - int numBe = Math.min(3, size); List shardAllocations = new ArrayList<>(); + List preLocations = new ArrayList<>(); for (EsShardRouting item : shardRouting) { shardAllocations.add(item.getHttpAddress()); + preLocations.add(item.getHttpAddress().getHostname()); } - Collections.shuffle(shardAllocations, random); - for (TNetworkAddress address : shardAllocations) { - colocatedBes.addAll(backendMap.get(address.getHostname())); - } - boolean usingRandomBackend = colocatedBes.size() == 0; - List candidateBeList = Lists.newArrayList(); - if (usingRandomBackend) { - for (int i = 0; i < numBe; ++i) { - candidateBeList.add(backendList.get(beIndex++ % size)); - } - } else { - candidateBeList.addAll(colocatedBes); - Collections.shuffle(candidateBeList); - } - - // Locations + FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); + backendPolicy.init(preLocations); TScanRangeLocations locations = new TScanRangeLocations(); - for (int i = 0; i < numBe && i < candidateBeList.size(); ++i) { + for (int i = 0; i < backendPolicy.numBackends(); ++i) { TScanRangeLocation location = new TScanRangeLocation(); - Backend be = candidateBeList.get(i); + Backend be = backendPolicy.getNextBe(); location.setBackendId(be.getId()); location.setServer(new TNetworkAddress(be.getIp(), be.getBePort())); locations.addToLocations(location); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 6251045b2e..f61615a497 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -114,7 +114,7 @@ public class ExternalFileScanNode extends ExternalScanNode { } private Type type = Type.QUERY; - private final BackendPolicy backendPolicy = new BackendPolicy(); + private final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); // Only for load job. // Save all info about load attributes and files. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java similarity index 90% rename from fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java rename to fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java index 3291b31741..c67f7a50bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/BackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java @@ -31,16 +31,21 @@ import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; import java.util.List; import java.util.Set; -public class BackendPolicy { - private static final Logger LOG = LogManager.getLogger(BackendPolicy.class); +public class FederationBackendPolicy { + private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class); private final List backends = Lists.newArrayList(); private int nextBe = 0; public void init() throws UserException { + init(Collections.emptyList()); + } + + public void init(List preLocations) throws UserException { Set tags = Sets.newHashSet(); if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) { String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); @@ -59,6 +64,7 @@ public class BackendPolicy { .addTags(tags) .preferComputeNode(Config.prefer_compute_node_for_external_table) .assignExpectBeNum(Config.min_backend_num_for_external_table) + .addPreLocations(preLocations) .build(); backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values())); if (backends.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java index d7d90dbec6..770ebd9a7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java @@ -159,7 +159,7 @@ public class FileGroupInfo { return hiddenColumns; } - public void getFileStatusAndCalcInstance(BackendPolicy backendPolicy) throws UserException { + public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) throws UserException { if (filesAdded == 0) { throw new UserException("No source file in this table(" + targetTable.getName() + ")."); } @@ -188,7 +188,7 @@ public class FileGroupInfo { LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance); } - public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, List scanRangeLocations) throws UserException { TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy); long curInstanceBytes = 0; @@ -242,7 +242,7 @@ public class FileGroupInfo { } protected TScanRangeLocations newLocations(TFileScanRangeParams params, BrokerDesc brokerDesc, - BackendPolicy backendPolicy) throws UserException { + FederationBackendPolicy backendPolicy) throws UserException { Backend selectedBackend = backendPolicy.getNextBe(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java index f962f4d827..d85b0eb073 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java @@ -44,7 +44,7 @@ public interface FileScanProviderIf { ParamCreateContext createContext(Analyzer analyzer) throws UserException; - void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, List scanRangeLocations) throws UserException; int getInputSplitNum(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java index 086191e94d..17051061a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java @@ -138,7 +138,7 @@ public class LoadScanProvider implements FileScanProviderIf { } @Override - public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, List scanRangeLocations) throws UserException { Preconditions.checkNotNull(fileGroupInfo); fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java index de4f0cdb56..aa9c840197 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java @@ -47,7 +47,7 @@ public class MetadataScanNode extends ScanNode { private List scanRangeLocations = Lists.newArrayList(); - private final BackendPolicy backendPolicy = new BackendPolicy(); + private final FederationBackendPolicy backendPolicy = new FederationBackendPolicy(); public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, MetadataTableValuedFunction tvf) { super(id, desc, "METADATA_SCAN_NODE", StatisticalType.METADATA_SCAN_NODE); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java index 45e48c4ff5..6636642ed5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java @@ -59,7 +59,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf { public abstract TFileAttributes getFileAttributes() throws UserException; @Override - public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy, + public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy, List scanRangeLocations) throws UserException { long start = System.currentTimeMillis(); List inputSplits = splitter.getSplits(context.conjuncts); @@ -151,7 +151,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf { return this.inputFileSize; } - private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) { + private TScanRangeLocations newLocations(TFileScanRangeParams params, FederationBackendPolicy backendPolicy) { // Generate on file scan range TFileScanRange fileScanRange = new TFileScanRange(); fileScanRange.setParams(params); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 3f3db6d78c..9fbd3c6104 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -49,6 +49,8 @@ public class BeSelectionPolicy { public boolean preferComputeNode = false; public int expectBeNum = 0; + public List preferredLocations = new ArrayList<>(); + private BeSelectionPolicy() { } @@ -110,6 +112,11 @@ public class BeSelectionPolicy { return this; } + public Builder addPreLocations(List preferredLocations) { + policy.preferredLocations.addAll(preferredLocations); + return this; + } + public BeSelectionPolicy build() { return policy; } @@ -141,6 +148,13 @@ public class BeSelectionPolicy { public List getCandidateBackends(ImmutableCollection backends) { List filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList()); + List preLocationFilterBackends = filterBackends.stream() + .filter(iterm -> preferredLocations.contains(iterm.getHostName())).collect(Collectors.toList()); + // If preLocations were chosen, use the preLocation backends. Otherwise we just ignore this filter. + if (!preLocationFilterBackends.isEmpty()) { + filterBackends = preLocationFilterBackends; + } + Collections.shuffle(filterBackends); List candidates = new ArrayList<>(); if (preferComputeNode) { int num = 0; diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index ec9ce6301d..1ac62abfa9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -34,6 +34,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -272,6 +273,48 @@ public class SystemInfoServiceTest { Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07, 3).size()); } + @Test + public void testPreferLocationsSelect() throws Exception { + Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga"); + + // add more backends + addBackend(10002, "192.168.1.2", 9050); + Backend be2 = infoService.getBackend(10002); + be2.setAlive(true); + addBackend(10003, "192.168.1.3", 9050); + Backend be3 = infoService.getBackend(10003); + be3.setAlive(true); + addBackend(10004, "192.168.1.4", 9050); + Backend be4 = infoService.getBackend(10004); + be4.setAlive(true); + addBackend(10005, "192.168.1.5", 9050); + Backend be5 = infoService.getBackend(10005); + be5.setAlive(true); + + setComputeNode(be5, taga); + + List preferLocations = new ArrayList<>(); + preferLocations.add("192.168.1.2"); + BeSelectionPolicy policy1 = new BeSelectionPolicy.Builder().addPreLocations(preferLocations).build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy1, 1).size()); + preferLocations.add("192.168.1.3"); + BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().addPreLocations(preferLocations).build(); + + Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy2, 2).size()); + + // only one preferLocations + preferLocations.clear(); + preferLocations.add("192.168.1.4"); + BeSelectionPolicy policy3 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) + .addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(3).build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 1).size()); + + preferLocations.add("192.168.1.5"); + BeSelectionPolicy policy4 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)) + .addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(1).build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 1).size()); + } + @Test public void testSelectBackendIdsForReplicaCreation() throws Exception { addBackend(10001, "192.168.1.1", 9050);