[opt](split) generate and get split batch concurrently (#36044)

bp #36045, and turn on batch split, which was turned off in #36109.
Generate and fetch split batches concurrently:
`SplitSource.getNextBatch` drops its synchronization so each backend fetches its own splits concurrently, and `SplitAssignment` generates splits asynchronously.
Ashin Gau
2024-06-19 16:16:02 +08:00
committed by GitHub
parent 9c896efe0b
commit f59dc4fb37
19 changed files with 410 additions and 232 deletions
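The shape of the change, as a minimal self-contained sketch (illustrative names such as SplitHandoffSketch, not the actual Doris classes): the generator offers split batches into per-backend blocking queues, and each backend's split source drains only its own queue, so no shared lock is needed.

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SplitHandoffSketch {
    // one queue per backend, created on first use
    private final Map<String, BlockingQueue<List<String>>> assignment = new ConcurrentHashMap<>();

    // producer side: the async generator offers a batch for one backend
    public void addToQueue(String backend, List<String> splits) {
        assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()).offer(splits);
    }

    // consumer side: each backend polls only its own queue, concurrently
    public List<String> getNextBatch(String backend) throws InterruptedException {
        return assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>())
                .poll(100, TimeUnit.MILLISECONDS); // null means "nothing yet"
    }
}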

View File

@@ -41,7 +41,7 @@ import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
@@ -74,10 +74,14 @@ public class LocationPath {
}
private LocationPath(String location) {
this(location, new HashMap<>());
this(location, Collections.emptyMap(), true);
}
public LocationPath(String location, Map<String, String> props) {
this(location, props, true);
}
public LocationPath(String location, Map<String, String> props, boolean convertPath) {
String scheme = parseScheme(location).toLowerCase();
if (scheme.isEmpty()) {
locationType = LocationType.NOSCHEME;
@@ -88,7 +92,7 @@ public class LocationPath {
locationType = LocationType.HDFS;
// Need to add the hdfs host to the location
String host = props.get(HdfsResource.DSF_NAMESERVICES);
this.location = normalizedHdfsPath(location, host);
this.location = convertPath ? normalizedHdfsPath(location, host) : location;
break;
case FeConstants.FS_PREFIX_S3:
locationType = LocationType.S3;
@@ -96,22 +100,22 @@ public class LocationPath {
break;
case FeConstants.FS_PREFIX_S3A:
locationType = LocationType.S3A;
this.location = convertToS3(location);
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_S3N:
// Includes the check for multiple locations in one table, e.g. both s3 and hdfs locations in the same table.
locationType = LocationType.S3N;
this.location = convertToS3(location);
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_BOS:
locationType = LocationType.BOS;
// use s3 client to access
this.location = convertToS3(location);
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_GCS:
locationType = LocationType.GCS;
// use s3 client to access
this.location = convertToS3(location);
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_OSS:
if (isHdfsOnOssEndpoint(location)) {
@@ -119,7 +123,7 @@ public class LocationPath {
this.location = location;
} else {
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -128,7 +132,7 @@ public class LocationPath {
break;
case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -136,7 +140,7 @@ public class LocationPath {
break;
case FeConstants.FS_PREFIX_OBS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
@@ -331,7 +335,7 @@ public class LocationPath {
if (location == null || location.isEmpty()) {
return null;
}
LocationPath locationPath = new LocationPath(location);
LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
return locationPath.getTFileTypeForBE();
}
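A hedged usage sketch of the new three-argument constructor (assumes the patched LocationPath class above is on the classpath; paths and properties are made up): with convertPath == false the raw location is kept verbatim, which is enough when only the file type is needed, as in the static lookup above.

import java.util.Collections;
import java.util.Map;

public class LocationPathUsage {
    public static void main(String[] args) {
        Map<String, String> props = Collections.emptyMap();
        // old behavior: scheme-specific rewriting, e.g. s3a:// becomes s3://
        LocationPath normalized = new LocationPath("s3a://bucket/data/file.orc", props);
        // new: skip rewriting when only the location type matters
        LocationPath verbatim = new LocationPath("s3a://bucket/data/file.orc", props, false);
    }
}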

View File

@@ -786,11 +786,7 @@ public abstract class ExternalCatalog
}
public String bindBrokerName() {
Map<String, String> properties = catalogProperty.getProperties();
if (properties.containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
return properties.get(HMSExternalCatalog.BIND_BROKER_NAME);
}
return null;
return catalogProperty.getProperties().get(HMSExternalCatalog.BIND_BROKER_NAME);
}
// ATTN: this method only return all cached databases.

View File

@@ -77,6 +77,7 @@ public class ExternalMetaCacheMgr {
private ExecutorService rowCountRefreshExecutor;
private ExecutorService commonRefreshExecutor;
private ExecutorService fileListingExecutor;
private ExecutorService scheduleExecutor;
// catalog id -> HiveMetaStoreCache
private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
@@ -109,6 +110,11 @@ public class ExternalMetaCacheMgr {
Config.max_external_cache_loader_thread_pool_size * 1000,
"FileListingExecutor", 10, true);
scheduleExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
Config.max_external_cache_loader_thread_pool_size,
Config.max_external_cache_loader_thread_pool_size * 1000,
"scheduleExecutor", 10, true);
fsCache = new FileSystemCache();
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
@@ -121,6 +127,10 @@ public class ExternalMetaCacheMgr {
return fileListingExecutor;
}
public ExecutorService getScheduleExecutor() {
return scheduleExecutor;
}
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
if (cache == null) {

View File

@@ -317,18 +317,19 @@ public abstract class FileQueryScanNode extends FileScanNode {
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
// Only provide the unique ID of split source to backend.
SplitAssignment splitAssignment = new SplitAssignment(backendPolicy, this);
splitAssignment = new SplitAssignment(
backendPolicy, this, this::splitToScanRange, locationProperties, pathPartitionKeys);
splitAssignment.init();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
if (splitAssignment.getCurrentAssignment().isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
inputSplitsNum = splitAssignment.numApproximateSplits();
inputSplitsNum = numApproximateSplits();
TFileType locationType;
FileSplit fileSplit = (FileSplit) splitAssignment.getCurrentAssignment().values().iterator().next();
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
@@ -337,10 +338,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
totalFileSize = fileSplit.getLength() * inputSplitsNum;
// Not accurate, only used to estimate concurrency.
int numSplitsPerBE = splitAssignment.numApproximateSplits() / backendPolicy.numBackends();
int numSplitsPerBE = numApproximateSplits() / backendPolicy.numBackends();
for (Backend backend : backendPolicy.getBackends()) {
SplitSource splitSource = new SplitSource(
this::splitToScanRange, backend, locationProperties, splitAssignment, pathPartitionKeys);
SplitSource splitSource = new SplitSource(backend, splitAssignment);
splitSources.add(splitSource);
Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
TScanRangeLocations curLocations = newLocations();
@@ -583,4 +583,15 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected abstract TableIf getTargetTable() throws UserException;
protected abstract Map<String, String> getLocationProperties() throws UserException;
@Override
public void stop() {
if (splitAssignment != null) {
splitAssignment.stop();
SplitSourceManager manager = Env.getCurrentEnv().getSplitSourceManager();
for (Long sourceId : splitAssignment.getSources()) {
manager.removeSplitSource(sourceId);
}
}
}
}

View File

@@ -107,7 +107,11 @@ public abstract class FileScanNode extends ExternalScanNode {
output.append(getRuntimeFilterExplainString(false));
}
output.append(prefix).append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
output.append(prefix);
if (isBatchMode()) {
output.append("(approximate)");
}
output.append("inputSplitNum=").append(inputSplitsNum).append(", totalFileSize=")
.append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
output.append(prefix).append("partition=").append(readPartitionNum).append("/").append(totalPartitionNum)
.append("\n");

View File

@@ -18,68 +18,160 @@
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* When file splits are supplied in batch mode, splits are generated lazily and assigned in each call of `getNextBatch`.
* `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends.
*/
public class SplitAssignment {
// magic number to estimate how many splits are allocated to each BE in each batch
private static final int NUM_SPLITS_PER_BE = 1024;
// magic number to estimate how many splits are generated for each partition in each batch.
private static final int NUM_SPLITS_PER_PARTITION = 10;
private final Set<Long> sources = new HashSet<>();
private final FederationBackendPolicy backendPolicy;
private final SplitGenerator splitGenerator;
// Store the current assignment of file splits
private final Multimap<Backend, Split> assignment;
private final int maxBatchSize;
private final ConcurrentHashMap<Backend, BlockingQueue<Collection<TScanRangeLocations>>> assignment
= new ConcurrentHashMap<>();
private final SplitToScanRange splitToScanRange;
private final Map<String, String> locationProperties;
private final List<String> pathPartitionKeys;
private final Object assignLock = new Object();
private Split sampleSplit = null;
private final AtomicBoolean isStop = new AtomicBoolean(false);
private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
public SplitAssignment(FederationBackendPolicy backendPolicy, SplitGenerator splitGenerator) {
private UserException exception = null;
public SplitAssignment(
FederationBackendPolicy backendPolicy,
SplitGenerator splitGenerator,
SplitToScanRange splitToScanRange,
Map<String, String> locationProperties,
List<String> pathPartitionKeys) {
this.backendPolicy = backendPolicy;
this.splitGenerator = splitGenerator;
this.assignment = ArrayListMultimap.create();
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
maxBatchSize = Math.min(NUM_SPLITS_PER_PARTITION * numPartitions,
NUM_SPLITS_PER_BE * backendPolicy.numBackends());
this.splitToScanRange = splitToScanRange;
this.locationProperties = locationProperties;
this.pathPartitionKeys = pathPartitionKeys;
}
public void init() throws UserException {
if (assignment.isEmpty() && splitGenerator.hasNext()) {
assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
splitGenerator.startSplit();
synchronized (assignLock) {
while (sampleSplit == null && waitFirstSplit()) {
try {
assignLock.wait(100);
} catch (InterruptedException e) {
throw new UserException(e.getMessage(), e);
}
}
}
if (exception != null) {
throw exception;
}
}
public Multimap<Backend, Split> getCurrentAssignment() {
return assignment;
private boolean waitFirstSplit() {
return !scheduleFinished.get() && !isStop.get() && exception == null;
}
public int numApproximateSplits() {
return splitGenerator.numApproximateSplits();
}
public synchronized Collection<Split> getNextBatch(Backend backend) throws UserException {
// Each call should consume all splits
Collection<Split> splits = assignment.removeAll(backend);
while (splits.isEmpty()) {
// Get the next batch of splits, and assign to backends
// If there is data skew, it may cause splits to accumulate on some BEs
if (!splitGenerator.hasNext()) {
return splits;
private void appendBatch(Multimap<Backend, Split> batch) throws UserException {
for (Backend backend : batch.keySet()) {
Collection<Split> splits = batch.get(backend);
List<TScanRangeLocations> locations = new ArrayList<>(splits.size());
for (Split split : splits) {
locations.add(splitToScanRange.getScanRange(backend, locationProperties, split, pathPartitionKeys));
}
// todo: In each batch, the optimal assignment is computed only for that batch's splits;
// how to solve the global data skew?
assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
splits = assignment.removeAll(backend);
if (!assignment.computeIfAbsent(backend, be -> new LinkedBlockingQueue<>()).offer(locations)) {
throw new UserException("Failed to offer batch split");
}
}
}
public void registerSource(long uniqueId) {
sources.add(uniqueId);
}
public Set<Long> getSources() {
return sources;
}
public Split getSampleSplit() {
return sampleSplit;
}
public void addToQueue(List<Split> splits) {
if (splits.isEmpty()) {
return;
}
Multimap<Backend, Split> batch = null;
synchronized (assignLock) {
if (sampleSplit == null) {
sampleSplit = splits.get(0);
assignLock.notify();
}
try {
batch = backendPolicy.computeScanRangeAssignment(splits);
} catch (UserException e) {
exception = e;
}
}
if (batch != null) {
try {
appendBatch(batch);
} catch (UserException e) {
exception = e;
}
}
}
private void notifyAssignment() {
synchronized (assignLock) {
assignLock.notify();
}
}
public BlockingQueue<Collection<TScanRangeLocations>> getAssignedSplits(Backend backend) throws UserException {
if (exception != null) {
throw exception;
}
BlockingQueue<Collection<TScanRangeLocations>> splits = assignment.computeIfAbsent(backend,
be -> new LinkedBlockingQueue<>());
if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
return null;
}
return splits;
}
public void setException(UserException e) {
exception = e;
notifyAssignment();
}
public void finishSchedule() {
scheduleFinished.set(true);
notifyAssignment();
}
public void stop() {
isStop.set(true);
notifyAssignment();
}
public boolean isStop() {
return isStop.get();
}
}
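The init()/addToQueue() handshake above is a classic guarded wait. A self-contained sketch of just that idiom (simplified types, illustrative class name), using the same bounded wait(100) so the waiter keeps re-checking the stop/finish/exception flags:

import java.util.concurrent.atomic.AtomicBoolean;

public class FirstSplitWait {
    private final Object lock = new Object();
    private final AtomicBoolean finished = new AtomicBoolean(false);
    private String sampleSplit = null;

    // producer: publish the first split and wake the waiter
    public void publish(String split) {
        synchronized (lock) {
            if (sampleSplit == null) {
                sampleSplit = split;
                lock.notify();
            }
        }
    }

    // consumer: block until the first split arrives or scheduling ends
    public String awaitFirst() throws InterruptedException {
        synchronized (lock) {
            while (sampleSplit == null && !finished.get()) {
                lock.wait(100); // bounded, so the flags above get re-checked
            }
            return sampleSplit;
        }
    }
}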

View File

@@ -28,27 +28,12 @@ import java.util.List;
* The consumer should call `getNextBatch` to fetch the next batch of splits.
*/
public interface SplitGenerator {
/**
* Get the next batch of splits. If the producer (e.g. ScanNode) doesn't support batch mode,
* should throw user exceptions.
*/
default List<Split> getNextBatch(int maxBatchSize) throws UserException {
throw new NotImplementedException("Should implement getNextBatch if in batch mode.");
}
/**
* Get all file splits if the producer doesn't support batch mode.
*/
default List<Split> getSplits() throws UserException {
// todo: remove this interface if batch mode is stable
throw new NotImplementedException("Scan node sub class need to implement getSplits interface.");
}
/**
* `getNextBatch` should return an empty list even if `hasNext` returns false.
*/
default boolean hasNext() {
return false;
throw new NotImplementedException("Not implement");
}
/**
@@ -65,4 +50,13 @@ public interface SplitGenerator {
default int numApproximateSplits() {
return -1;
}
default void startSplit() {
}
/**
* Close the split generator and stop the split executor.
*/
default void stop() {
}
}
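A hedged sketch of how a scan node might implement the revised interface (MyScanNode and its fields are illustrative, and it assumes the Doris types above are on the classpath): eager producers keep overriding getSplits(), while batch producers override startSplit() and feed a SplitAssignment asynchronously.

import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;

public class MyScanNode implements SplitGenerator {
    private final SplitAssignment splitAssignment; // handed over by the scan framework

    public MyScanNode(SplitAssignment splitAssignment) {
        this.splitAssignment = splitAssignment;
    }

    @Override
    public void startSplit() {
        // batch path: generate splits on another thread, then call
        // splitAssignment.addToQueue(...) per batch and
        // splitAssignment.finishSchedule() after the last one
    }

    @Override
    public int numApproximateSplits() {
        return 1; // rough estimate, only used for planning
    }
}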

View File

@@ -18,16 +18,17 @@
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRangeLocations;
import java.util.ArrayList;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -42,28 +43,20 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class SplitSource {
private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
private static final long WAIT_TIME_OUT = 100; // 100ms
private static final long MAX_WAIT_TIME_OUT = 500; // 500ms
private final long uniqueId;
private final SplitToScanRange splitToScanRange;
private final Backend backend;
private final Map<String, String> locationProperties;
private final List<String> pathPartitionKeys;
private final SplitAssignment splitAssignment;
private Iterator<Split> splitIterator = null;
private boolean isLastBatch = false;
private final AtomicBoolean isLastBatch;
public SplitSource(
SplitToScanRange splitToScanRange,
Backend backend,
Map<String, String> locationProperties,
SplitAssignment splitAssignment,
List<String> pathPartitionKeys) {
public SplitSource(Backend backend, SplitAssignment splitAssignment) {
this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
this.splitToScanRange = splitToScanRange;
this.backend = backend;
this.locationProperties = locationProperties;
this.pathPartitionKeys = pathPartitionKeys;
this.splitAssignment = splitAssignment;
this.isLastBatch = new AtomicBoolean(false);
splitAssignment.registerSource(uniqueId);
}
public long getUniqueId() {
@@ -73,22 +66,33 @@ public class SplitSource {
/**
* Get the next batch of file splits. If there are no more splits, return an empty list.
*/
public synchronized List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
if (isLastBatch) {
public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
if (isLastBatch.get()) {
return Collections.emptyList();
}
List<TScanRangeLocations> scanRanges = new ArrayList<>(maxBatchSize);
for (int i = 0; i < maxBatchSize; i++) {
if (splitIterator == null || !splitIterator.hasNext()) {
Collection<Split> splits = splitAssignment.getNextBatch(backend);
if (splits.isEmpty()) {
isLastBatch = true;
return scanRanges;
}
splitIterator = splits.iterator();
List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
long maxTimeOut = 0;
while (scanRanges.size() < maxBatchSize) {
BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
if (splits == null) {
isLastBatch.set(true);
break;
}
while (scanRanges.size() < maxBatchSize) {
try {
Collection<TScanRangeLocations> splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
if (splitCollection == null) {
maxTimeOut += WAIT_TIME_OUT;
break;
}
scanRanges.addAll(splitCollection);
} catch (InterruptedException e) {
throw new UserException("Failed to get next batch of splits", e);
}
}
if (maxTimeOut >= MAX_WAIT_TIME_OUT && !scanRanges.isEmpty()) {
break;
}
scanRanges.add(splitToScanRange.getScanRange(
backend, locationProperties, splitIterator.next(), pathPartitionKeys));
}
return scanRanges;
}
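The polling loop above trades latency for batch size. A standalone sketch of the same bounded-patience idea (simplified types; end-of-stream handling, where the assignment hands back a null queue, is omitted): poll in 100 ms slices, and once roughly 500 ms have passed with at least one scan range collected, return the partial batch instead of waiting on a slow producer.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public final class BoundedPatiencePoll {
    private static final long WAIT_TIME_OUT_MS = 100;
    private static final long MAX_WAIT_TIME_OUT_MS = 500;

    static List<String> nextBatch(BlockingQueue<List<String>> queue, int maxBatchSize)
            throws InterruptedException {
        List<String> batch = new ArrayList<>(maxBatchSize);
        long waited = 0;
        while (batch.size() < maxBatchSize) {
            List<String> chunk = queue.poll(WAIT_TIME_OUT_MS, TimeUnit.MILLISECONDS);
            if (chunk == null) { // timed out this round
                waited += WAIT_TIME_OUT_MS;
                // a partial batch is good enough once we've been patient enough
                if (waited >= MAX_WAIT_TIME_OUT_MS && !batch.isEmpty()) {
                    break;
                }
                continue;
            }
            batch.addAll(chunk);
        }
        return batch;
    }
}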

View File

@ -30,7 +30,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.util.CacheBulkLoader;
@@ -468,37 +467,39 @@ public class HiveMetaStoreCache {
public List<FileCacheValue> getFilesByPartitionsWithCache(List<HivePartition> partitions,
String bindBrokerName) {
return getFilesByPartitions(partitions, true, bindBrokerName);
return getFilesByPartitions(partitions, true, true, bindBrokerName);
}
public List<FileCacheValue> getFilesByPartitionsWithoutCache(List<HivePartition> partitions,
String bindBrokerName) {
return getFilesByPartitions(partitions, false, bindBrokerName);
return getFilesByPartitions(partitions, false, true, bindBrokerName);
}
private List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
boolean withCache, String bindBrokerName) {
public List<FileCacheValue> getFilesByPartitions(List<HivePartition> partitions,
boolean withCache,
boolean concurrent,
String bindBrokerName) {
long start = System.currentTimeMillis();
List<FileCacheKey> keys = partitions.stream().map(p -> {
FileCacheKey fileCacheKey = p.isDummyPartition()
? FileCacheKey.createDummyCacheKey(p.getDbName(), p.getTblName(), p.getPath(),
p.getInputFormat(), bindBrokerName)
: new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName);
return fileCacheKey;
}).collect(Collectors.toList());
List<FileCacheKey> keys = partitions.stream().map(p -> p.isDummyPartition()
? FileCacheKey.createDummyCacheKey(
p.getDbName(), p.getTblName(), p.getPath(), p.getInputFormat(), bindBrokerName)
: new FileCacheKey(p.getPath(), p.getInputFormat(), p.getPartitionValues(), bindBrokerName))
.collect(Collectors.toList());
List<FileCacheValue> fileLists;
try {
if (withCache) {
fileLists = fileCacheRef.get().getAll(keys).values().stream().collect(Collectors.toList());
fileLists = new ArrayList<>(fileCacheRef.get().getAll(keys).values());
} else {
List<Pair<FileCacheKey, Future<FileCacheValue>>> pList = keys.stream()
.map(key -> Pair.of(key, fileListingExecutor.submit(() -> loadFiles(key))))
.collect(Collectors.toList());
fileLists = Lists.newArrayListWithExpectedSize(keys.size());
for (Pair<FileCacheKey, Future<FileCacheValue>> p : pList) {
fileLists.add(p.second.get());
if (concurrent) {
List<Future<FileCacheValue>> pList = keys.stream().map(
key -> fileListingExecutor.submit(() -> loadFiles(key))).collect(Collectors.toList());
fileLists = Lists.newArrayListWithExpectedSize(keys.size());
for (Future<FileCacheValue> p : pList) {
fileLists.add(p.get());
}
} else {
fileLists = keys.stream().map(this::loadFiles).collect(Collectors.toList());
}
}
} catch (ExecutionException e) {
@@ -810,7 +811,7 @@ public class HiveMetaStoreCache {
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
properties, bindBrokerName));
properties, bindBrokerName));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
@@ -837,7 +838,7 @@ public class HiveMetaStoreCache {
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
properties, bindBrokerName));
properties, bindBrokerName));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
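The uncached-but-concurrent branch above is a plain submit-then-collect fan-out. A self-contained sketch (placeholder key/value types, illustrative names): submit one listing task per key, then drain the futures in submission order so results stay aligned with their keys.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class SubmitThenCollect {
    public static void main(String[] args) throws Exception {
        ExecutorService fileListingExecutor = Executors.newFixedThreadPool(4);
        List<String> keys = List.of("k1", "k2", "k3");
        // fan out: one listing task per cache key
        List<Future<String>> futures = keys.stream()
                .map(key -> fileListingExecutor.submit(() -> "files of " + key))
                .collect(Collectors.toList());
        // collect in submission order; get() rethrows task failures
        List<String> fileLists = new ArrayList<>(keys.size());
        for (Future<String> future : futures) {
            fileLists.add(future.get());
        }
        System.out.println(fileLists);
        fileListingExecutor.shutdown();
    }
}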

View File

@@ -26,6 +26,7 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
@@ -63,14 +64,17 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class HiveScanNode extends FileQueryScanNode {
@@ -98,9 +102,10 @@ public class HiveScanNode extends FileQueryScanNode {
private SelectedPartitions selectedPartitions = null;
private boolean partitionInit = false;
private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
private List<HivePartition> prunedPartitions;
private Iterator<HivePartition> prunedPartitionsIter;
private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
/**
* External file scan node for querying Hive tables.
@@ -140,7 +145,7 @@ public class HiveScanNode extends FileQueryScanNode {
List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
if (!partitionColumnTypes.isEmpty()) {
// partitioned table
boolean isPartitionPruned = selectedPartitions == null ? false : selectedPartitions.isPruned;
boolean isPartitionPruned = selectedPartitions != null && selectedPartitions.isPruned;
Collection<PartitionItem> partitionItems;
if (!isPartitionPruned) {
// partitionItems is null means that the partition is not pruned by Nereids,
@@ -232,36 +237,52 @@ public class HiveScanNode extends FileQueryScanNode {
}
@Override
public List<Split> getNextBatch(int maxBatchSize) throws UserException {
try {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
List<Split> allFiles = Lists.newArrayList();
int numPartitions = 0;
while (allFiles.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
partitions.add(prunedPartitionsIter.next());
numPartitions++;
}
getFileSplitByPartitions(cache, partitions, allFiles, bindBrokerName);
}
if (allFiles.size() / numPartitions > numSplitsPerPartition) {
numSplitsPerPartition = allFiles.size() / numPartitions;
}
return allFiles;
} catch (Throwable t) {
LOG.warn("get file split failed for table: {}", hmsTable.getName(), t);
throw new UserException(
"get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t),
t);
public void startSplit() {
if (prunedPartitions.isEmpty()) {
splitAssignment.finishSchedule();
return;
}
}
@Override
public boolean hasNext() {
return prunedPartitionsIter.hasNext();
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
AtomicInteger numFinishedPartitions = new AtomicInteger(0);
CompletableFuture.runAsync(() -> {
for (HivePartition partition : prunedPartitions) {
if (batchException.get() != null || splitAssignment.isStop()) {
break;
}
try {
splittersOnFlight.acquire();
} catch (InterruptedException e) {
batchException.set(new UserException(e.getMessage(), e));
break;
}
CompletableFuture.runAsync(() -> {
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(cache, Collections.singletonList(partition), allFiles, bindBrokerName);
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
} catch (IOException e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
splitAssignment.finishSchedule();
}
}
}, scheduleExecutor);
}
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
});
}
@Override
@@ -272,7 +293,6 @@ public class HiveScanNode extends FileQueryScanNode {
} catch (Exception e) {
return false;
}
prunedPartitionsIter = prunedPartitions.iterator();
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -281,7 +301,7 @@
@Override
public int numApproximateSplits() {
return numSplitsPerPartition * prunedPartitions.size();
return numSplitsPerPartition.get() * prunedPartitions.size();
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
@@ -290,7 +310,8 @@
if (hiveTransaction != null) {
fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
} else {
fileCaches = cache.getFilesByPartitionsWithCache(partitions, bindBrokerName);
boolean withCache = Config.max_external_file_cache_num > 0;
fileCaches = cache.getFilesByPartitions(partitions, withCache, withCache, bindBrokerName);
}
if (tableSample != null) {
List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
@@ -463,10 +484,7 @@
public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
String aggFunctionName = aggExpr.getFnName().getFunction();
if (aggFunctionName.equalsIgnoreCase("COUNT")) {
return true;
}
return false;
return aggFunctionName.equalsIgnoreCase("COUNT");
}
@Override
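startSplit() above (and its twin in HudiScanNode below) throttles the per-partition splitters with a semaphore and lets the last finisher signal completion. A runnable, self-contained sketch of that shape (illustrative names; println stands in for the real file-listing work):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledSplitterSketch {
    public static void main(String[] args) throws InterruptedException {
        List<String> partitions = List.of("p1", "p2", "p3");
        Semaphore inFlight = new Semaphore(2);   // like splittersOnFlight
        AtomicInteger finished = new AtomicInteger(0);
        CountDownLatch done = new CountDownLatch(1);
        CompletableFuture.runAsync(() -> {
            for (String partition : partitions) {
                try {
                    inFlight.acquire();          // back-pressure on the producer loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                CompletableFuture.runAsync(() -> {
                    try {
                        System.out.println("list files of " + partition);
                    } finally {
                        inFlight.release();
                        // the last splitter to finish signals "schedule finished"
                        if (finished.incrementAndGet() == partitions.size()) {
                            done.countDown();
                        }
                    }
                });
            }
        });
        done.await();
        System.out.println("finishSchedule()");
    }
}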

View File

@@ -69,13 +69,15 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -101,9 +103,11 @@ public class HudiScanNode extends HiveScanNode {
private HoodieTimeline timeline;
private Option<String> snapshotTimestamp;
private String queryInstant;
private final AtomicReference<UserException> batchException = new AtomicReference<>(null);
private List<HivePartition> prunedPartitions;
private Iterator<HivePartition> prunedPartitionsIter;
private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
private boolean incrementalRead = false;
private TableScanParams scanParams;
@@ -206,7 +210,6 @@
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
prunedPartitions = Collections.emptyList();
prunedPartitionsIter = prunedPartitions.iterator();
partitionInit = true;
return;
}
@@ -320,47 +323,47 @@
incrementalRelation.getEndTs())).collect(Collectors.toList());
}
private void getPartitionSplits(HivePartition partition, List<Split> splits) throws IOException {
String globPath;
String partitionName;
if (partition.isDummyPartition()) {
partitionName = "";
globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
new Path(partition.getPath()));
globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
}
List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(
hudiClient.getRawFs(), new Path(globPath));
HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));
if (isCowOrRoTable) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
// Need to add the hdfs host to the location
LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
Path splitFilePath = locationPath.toStorageLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
.forEach(fileSlice -> splits.add(
generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
}
}
private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
AtomicReference<Throwable> throwable = new AtomicReference<>();
partitions.forEach(partition -> executor.execute(() -> {
try {
String globPath;
String partitionName = "";
if (partition.isDummyPartition()) {
globPath = hudiClient.getBasePathV2().toString() + "/*";
} else {
partitionName = FSUtils.getRelativePartitionPath(hudiClient.getBasePathV2(),
new Path(partition.getPath()));
globPath = String.format("%s/%s/*", hudiClient.getBasePathV2().toString(), partitionName);
}
List<FileStatus> statuses;
try {
statuses = FSUtils.getGlobStatusExcludingMetaFolder(hudiClient.getRawFs(),
new Path(globPath));
} catch (IOException e) {
throw new RuntimeException("Failed to get hudi file statuses on path: " + globPath, e);
}
HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(hudiClient,
timeline, statuses.toArray(new FileStatus[0]));
if (isCowOrRoTable) {
fileSystemView.getLatestBaseFilesBeforeOrOn(partitionName, queryInstant).forEach(baseFile -> {
noLogsSplitNum.incrementAndGet();
String filePath = baseFile.getPath();
long fileSize = baseFile.getFileSize();
// Need to add the hdfs host to the location
LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
Path splitFilePath = locationPath.toStorageLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
new String[0], partition.getPartitionValues()));
});
} else {
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
.forEach(fileSlice -> splits.add(
generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
}
getPartitionSplits(partition, splits);
} catch (Throwable t) {
throwable.set(t);
} finally {
@@ -394,26 +397,48 @@
}
@Override
public List<Split> getNextBatch(int maxBatchSize) throws UserException {
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
int numPartitions = 0;
while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
partitions.add(prunedPartitionsIter.next());
numPartitions++;
public void startSplit() {
if (prunedPartitions.isEmpty()) {
splitAssignment.finishSchedule();
return;
}
AtomicInteger numFinishedPartitions = new AtomicInteger(0);
CompletableFuture.runAsync(() -> {
for (HivePartition partition : prunedPartitions) {
if (batchException.get() != null || splitAssignment.isStop()) {
break;
}
try {
splittersOnFlight.acquire();
} catch (InterruptedException e) {
batchException.set(new UserException(e.getMessage(), e));
break;
}
CompletableFuture.runAsync(() -> {
try {
List<Split> allFiles = Lists.newArrayList();
getPartitionSplits(partition, allFiles);
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
} catch (IOException e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
if (numFinishedPartitions.incrementAndGet() == prunedPartitions.size()) {
splitAssignment.finishSchedule();
}
}
});
}
getPartitionSplits(partitions, splits);
}
if (splits.size() / numPartitions > numSplitsPerPartition) {
numSplitsPerPartition = splits.size() / numPartitions;
}
return splits;
}
@Override
public boolean hasNext() {
return prunedPartitionsIter.hasNext();
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
});
}
@Override
@@ -426,7 +451,6 @@
prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
prunedPartitionsIter = prunedPartitions.iterator();
partitionInit = true;
}
int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
@@ -435,7 +459,7 @@
@Override
public int numApproximateSplits() {
return numSplitsPerPartition * prunedPartitions.size();
return numSplitsPerPartition.get() * prunedPartitions.size();
}
private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) {

View File

@@ -39,9 +39,11 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.SplitAssignment;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
@@ -78,7 +80,7 @@ import java.util.stream.Collectors;
public abstract class ScanNode extends PlanNode implements SplitGenerator {
private static final Logger LOG = LogManager.getLogger(ScanNode.class);
protected static final int NUM_SPLITS_PER_PARTITION = 10;
protected static final int NUM_PARTITIONS_PER_LOOP = 100;
protected static final int NUM_SPLITTERS_ON_FLIGHT = Config.max_external_cache_loader_thread_pool_size;
protected final TupleDescriptor desc;
// for distribution prunner
protected Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
@@ -89,6 +91,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
protected List<SplitSource> splitSources = Lists.newArrayList();
protected PartitionInfo partitionsInfo = null;
protected SplitAssignment splitAssignment = null;
// create a mapping between output slot's id and project expr
Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();

View File

@@ -662,6 +662,9 @@
@Override
public void close() {
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
if (queryQueue != null && queueToken != null) {
try {
queryQueue.returnToken(queueToken);
@@ -1471,6 +1474,9 @@
@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
lock();
try {
if (!queryStatus.ok()) {

View File

@@ -1459,7 +1459,7 @@
description = {"如果分区数量超过阈值,BE将通过batch方式获取scan ranges",
"If the number of partitions exceeds the threshold, scan ranges will be got through batch mode."},
needForward = true)
public int numPartitionsInBatchMode = -1;
public int numPartitionsInBatchMode = 1024;
@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,