[Enhance](ComputeNode) ES Scan node support to be scheduled to compute node (#16533)

ES Scan node support to be scheduled to compute node.
This commit is contained in:
huangzhaowei
2023-03-14 00:13:24 +08:00
committed by GitHub
parent 9b7596f1c6
commit f3c6ee5961
10 changed files with 81 additions and 54 deletions

View File

@ -21,7 +21,6 @@ import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PartitionInfo;
@ -37,6 +36,7 @@ import org.apache.doris.external.elasticsearch.QueryBuilders;
import org.apache.doris.external.elasticsearch.QueryBuilders.BoolQueryBuilder;
import org.apache.doris.external.elasticsearch.QueryBuilders.BuilderOptions;
import org.apache.doris.external.elasticsearch.QueryBuilders.QueryBuilder;
import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TEsScanNode;
@ -49,18 +49,15 @@ import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -76,7 +73,6 @@ public class EsScanNode extends ScanNode {
private final Random random = new Random(System.currentTimeMillis());
private Multimap<String, Backend> backendMap;
private List<Backend> backendList;
private EsTablePartitions esTablePartitions;
private List<TScanRangeLocations> shardScanRanges = Lists.newArrayList();
private EsTable table;
@ -105,14 +101,12 @@ public class EsScanNode extends ScanNode {
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
computeColumnFilter();
assignBackends();
computeStats(analyzer);
buildQuery();
}
public void init() throws UserException {
computeColumnFilter();
assignBackends();
buildQuery();
}
@ -208,20 +202,6 @@ public class EsScanNode extends ScanNode {
msg.es_scan_node = esScanNode;
}
private void assignBackends() throws UserException {
backendMap = HashMultimap.create();
backendList = Lists.newArrayList();
for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
if (be.isAlive()) {
backendMap.put(be.getIp(), be);
backendList.add(be);
}
}
if (backendMap.isEmpty()) {
throw new UserException("No Alive backends");
}
}
// only do partition(es index level) prune
private List<TScanRangeLocations> getShardLocations() throws UserException {
// has to get partition info from es state not from table because the partition
@ -252,39 +232,23 @@ public class EsScanNode extends ScanNode {
LOG.debug("partition prune finished, unpartitioned index [{}], " + "partitioned index [{}]",
String.join(",", unPartitionedIndices), String.join(",", partitionedIndices));
}
int size = backendList.size();
int beIndex = random.nextInt(size);
List<TScanRangeLocations> result = Lists.newArrayList();
for (EsShardPartitions indexState : selectedIndex) {
for (List<EsShardRouting> shardRouting : indexState.getShardRoutings().values()) {
// get backends
Set<Backend> colocatedBes = Sets.newHashSet();
int numBe = Math.min(3, size);
List<TNetworkAddress> shardAllocations = new ArrayList<>();
List<String> preLocations = new ArrayList<>();
for (EsShardRouting item : shardRouting) {
shardAllocations.add(item.getHttpAddress());
preLocations.add(item.getHttpAddress().getHostname());
}
Collections.shuffle(shardAllocations, random);
for (TNetworkAddress address : shardAllocations) {
colocatedBes.addAll(backendMap.get(address.getHostname()));
}
boolean usingRandomBackend = colocatedBes.size() == 0;
List<Backend> candidateBeList = Lists.newArrayList();
if (usingRandomBackend) {
for (int i = 0; i < numBe; ++i) {
candidateBeList.add(backendList.get(beIndex++ % size));
}
} else {
candidateBeList.addAll(colocatedBes);
Collections.shuffle(candidateBeList);
}
// Locations
FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
backendPolicy.init(preLocations);
TScanRangeLocations locations = new TScanRangeLocations();
for (int i = 0; i < numBe && i < candidateBeList.size(); ++i) {
for (int i = 0; i < backendPolicy.numBackends(); ++i) {
TScanRangeLocation location = new TScanRangeLocation();
Backend be = candidateBeList.get(i);
Backend be = backendPolicy.getNextBe();
location.setBackendId(be.getId());
location.setServer(new TNetworkAddress(be.getIp(), be.getBePort()));
locations.addToLocations(location);

View File

@ -114,7 +114,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
}
private Type type = Type.QUERY;
private final BackendPolicy backendPolicy = new BackendPolicy();
private final FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
// Only for load job.
// Save all info about load attributes and files.

View File

@ -31,16 +31,21 @@ import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class BackendPolicy {
private static final Logger LOG = LogManager.getLogger(BackendPolicy.class);
public class FederationBackendPolicy {
private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
private final List<Backend> backends = Lists.newArrayList();
private int nextBe = 0;
public void init() throws UserException {
init(Collections.emptyList());
}
public void init(List<String> preLocations) throws UserException {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
@ -59,6 +64,7 @@ public class BackendPolicy {
.addTags(tags)
.preferComputeNode(Config.prefer_compute_node_for_external_table)
.assignExpectBeNum(Config.min_backend_num_for_external_table)
.addPreLocations(preLocations)
.build();
backends.addAll(policy.getCandidateBackends(Env.getCurrentSystemInfo().getIdToBackend().values()));
if (backends.isEmpty()) {

View File

@ -159,7 +159,7 @@ public class FileGroupInfo {
return hiddenColumns;
}
public void getFileStatusAndCalcInstance(BackendPolicy backendPolicy) throws UserException {
public void getFileStatusAndCalcInstance(FederationBackendPolicy backendPolicy) throws UserException {
if (filesAdded == 0) {
throw new UserException("No source file in this table(" + targetTable.getName() + ").");
}
@ -188,7 +188,7 @@ public class FileGroupInfo {
LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
}
public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy);
long curInstanceBytes = 0;
@ -242,7 +242,7 @@ public class FileGroupInfo {
}
protected TScanRangeLocations newLocations(TFileScanRangeParams params, BrokerDesc brokerDesc,
BackendPolicy backendPolicy) throws UserException {
FederationBackendPolicy backendPolicy) throws UserException {
Backend selectedBackend = backendPolicy.getNextBe();

View File

@ -44,7 +44,7 @@ public interface FileScanProviderIf {
ParamCreateContext createContext(Analyzer analyzer) throws UserException;
void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException;
int getInputSplitNum();

View File

@ -138,7 +138,7 @@ public class LoadScanProvider implements FileScanProviderIf {
}
@Override
public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
Preconditions.checkNotNull(fileGroupInfo);
fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy);

View File

@ -47,7 +47,7 @@ public class MetadataScanNode extends ScanNode {
private List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
private final BackendPolicy backendPolicy = new BackendPolicy();
private final FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, MetadataTableValuedFunction tvf) {
super(id, desc, "METADATA_SCAN_NODE", StatisticalType.METADATA_SCAN_NODE);

View File

@ -59,7 +59,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
public abstract TFileAttributes getFileAttributes() throws UserException;
@Override
public void createScanRangeLocations(ParamCreateContext context, BackendPolicy backendPolicy,
public void createScanRangeLocations(ParamCreateContext context, FederationBackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException {
long start = System.currentTimeMillis();
List<Split> inputSplits = splitter.getSplits(context.conjuncts);
@ -151,7 +151,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
return this.inputFileSize;
}
private TScanRangeLocations newLocations(TFileScanRangeParams params, BackendPolicy backendPolicy) {
private TScanRangeLocations newLocations(TFileScanRangeParams params, FederationBackendPolicy backendPolicy) {
// Generate on file scan range
TFileScanRange fileScanRange = new TFileScanRange();
fileScanRange.setParams(params);

View File

@ -49,6 +49,8 @@ public class BeSelectionPolicy {
public boolean preferComputeNode = false;
public int expectBeNum = 0;
public List<String> preferredLocations = new ArrayList<>();
private BeSelectionPolicy() {
}
@ -110,6 +112,11 @@ public class BeSelectionPolicy {
return this;
}
public Builder addPreLocations(List<String> preferredLocations) {
policy.preferredLocations.addAll(preferredLocations);
return this;
}
public BeSelectionPolicy build() {
return policy;
}
@ -141,6 +148,13 @@ public class BeSelectionPolicy {
public List<Backend> getCandidateBackends(ImmutableCollection<Backend> backends) {
List<Backend> filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList());
List<Backend> preLocationFilterBackends = filterBackends.stream()
.filter(iterm -> preferredLocations.contains(iterm.getHostName())).collect(Collectors.toList());
// If preLocations were chosen, use the preLocation backends. Otherwise we just ignore this filter.
if (!preLocationFilterBackends.isEmpty()) {
filterBackends = preLocationFilterBackends;
}
Collections.shuffle(filterBackends);
List<Backend> candidates = new ArrayList<>();
if (preferComputeNode) {
int num = 0;

View File

@ -34,6 +34,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -272,6 +273,48 @@ public class SystemInfoServiceTest {
Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07, 3).size());
}
@Test
public void testPreferLocationsSelect() throws Exception {
Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga");
// add more backends
addBackend(10002, "192.168.1.2", 9050);
Backend be2 = infoService.getBackend(10002);
be2.setAlive(true);
addBackend(10003, "192.168.1.3", 9050);
Backend be3 = infoService.getBackend(10003);
be3.setAlive(true);
addBackend(10004, "192.168.1.4", 9050);
Backend be4 = infoService.getBackend(10004);
be4.setAlive(true);
addBackend(10005, "192.168.1.5", 9050);
Backend be5 = infoService.getBackend(10005);
be5.setAlive(true);
setComputeNode(be5, taga);
List<String> preferLocations = new ArrayList<>();
preferLocations.add("192.168.1.2");
BeSelectionPolicy policy1 = new BeSelectionPolicy.Builder().addPreLocations(preferLocations).build();
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy1, 1).size());
preferLocations.add("192.168.1.3");
BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().addPreLocations(preferLocations).build();
Assert.assertEquals(2, infoService.selectBackendIdsByPolicy(policy2, 2).size());
// only one preferLocations
preferLocations.clear();
preferLocations.add("192.168.1.4");
BeSelectionPolicy policy3 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
.addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(3).build();
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy3, 1).size());
preferLocations.add("192.168.1.5");
BeSelectionPolicy policy4 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga))
.addPreLocations(preferLocations).preferComputeNode(true).assignExpectBeNum(1).build();
Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy4, 1).size());
}
@Test
public void testSelectBackendIdsForReplicaCreation() throws Exception {
addBackend(10001, "192.168.1.1", 9050);