[opt](split) get file splits in batch mode (#34032) (#35107)

bp  #34032
This commit is contained in:
Ashin Gau
2024-05-21 22:27:07 +08:00
committed by GitHub
parent 037de3dedd
commit 98f8eb5c43
28 changed files with 1045 additions and 243 deletions

View File

@ -132,6 +132,7 @@ import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.SplitSourceManager;
import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.datasource.hive.HiveTransactionMgr;
@ -535,6 +536,8 @@ public class Env {
private final NereidsSqlCacheManager sqlCacheManager;
private final SplitSourceManager splitSourceManager;
public List<TFrontendInfo> getFrontendInfos() {
List<TFrontendInfo> res = new ArrayList<>();
@ -772,6 +775,7 @@ public class Env {
this.insertOverwriteManager = new InsertOverwriteManager();
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager();
this.splitSourceManager = new SplitSourceManager();
}
public static void destroyCheckpoint() {
@ -1712,7 +1716,7 @@ public class Env {
workloadGroupMgr.start();
workloadSchedPolicyMgr.start();
workloadRuntimeStatusMgr.start();
splitSourceManager.start();
}
private void transferToNonMaster(FrontendNodeType newType) {
@ -6058,6 +6062,10 @@ public class Env {
return sqlCacheManager;
}
/**
 * Returns the manager that tracks registered {@code SplitSource}s so that
 * backends can fetch file splits batch by batch via RPC.
 */
public SplitSourceManager getSplitSourceManager() {
    return splitSourceManager;
}
public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) {
AlterMTMV alter = new AlterMTMV(info.getMvName(), info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO);
this.alter.processAlterMTMV(alter, false);

View File

@ -60,12 +60,12 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSplitSource;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.doris.thrift.TTextSerdeType;
import org.apache.doris.thrift.TTransactionalHiveDeleteDeltaDesc;
import org.apache.doris.thrift.TTransactionalHiveDesc;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -124,6 +124,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
*/
@Override
public void init() throws UserException {
super.init();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setInitScanNodeStartTime();
}
@ -268,14 +269,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsStartTime();
}
List<Split> inputSplits = getSplits();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
this.inputSplitsNum = inputSplits.size();
if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
TFileFormatType fileFormatType = getFileFormatType();
if (fileFormatType == TFileFormatType.FORMAT_ORC) {
genSlotToSchemaIdMapForOrc();
@ -317,72 +310,66 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
List<String> pathPartitionKeys = getPathPartitionKeys();
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
// Only provide the unique ID of split source to backend.
SplitAssignment splitAssignment = new SplitAssignment(backendPolicy, this);
splitAssignment.init();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
if (splitAssignment.getCurrentAssignment().isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
inputSplitsNum = splitAssignment.numApproximateSplits();
Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
for (Backend backend : assignment.keySet()) {
Collection<Split> splits = assignment.get(backend);
for (Split split : splits) {
FileSplit fileSplit = (FileSplit) split;
TFileType locationType;
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}
TFileType locationType;
FileSplit fileSplit = (FileSplit) splitAssignment.getCurrentAssignment().values().iterator().next();
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}
totalFileSize = fileSplit.getLength() * inputSplitsNum;
// Not accurate, only used to estimate concurrency.
int numSplitsPerBE = splitAssignment.numApproximateSplits() / backendPolicy.numBackends();
for (Backend backend : backendPolicy.getBackends()) {
SplitSource splitSource = new SplitSource(
this::splitToScanRange, backend, locationProperties, splitAssignment, pathPartitionKeys);
splitSources.add(splitSource);
Env.getCurrentEnv().getSplitSourceManager().registerSplitSource(splitSource);
TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
// Otherwise, use the values in file path.
boolean isACID = false;
if (fileSplit instanceof HiveSplit) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
isACID = hiveSplit.isACID();
}
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
false, isACID) : fileSplit.getPartitionValues();
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
locationType);
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
rangeDesc.setCompressType(fileCompressType);
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
deleteDeltaDescs.add(deleteDeltaDesc);
}
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
setScanParams(rangeDesc, fileSplit);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TSplitSource tSource = new TSplitSource();
tSource.setSplitSourceId(splitSource.getUniqueId());
tSource.setNumSplits(numSplitsPerBE);
curLocations.getScanRange().getExtScanRange().getFileScanRange().setSplitSource(tSource);
TScanRangeLocation location = new TScanRangeLocation();
setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
location.setBackendId(backend.getId());
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
curLocations.addToLocations(location);
if (LOG.isDebugEnabled()) {
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(),
fileSplit.getStart(), fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
}
// So there's only one scan range for each backend.
// Each backend only starts up one ScanNode instance.
// However, even one ScanNode instance can provide maximum scanning concurrency.
scanRangeLocations.add(curLocations);
this.totalFileSize += fileSplit.getLength();
setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
}
} else {
List<Split> inputSplits = getSplits();
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
inputSplitsNum = inputSplits.size();
if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
return;
}
Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
for (Backend backend : assignment.keySet()) {
Collection<Split> splits = assignment.get(backend);
for (Split split : splits) {
scanRangeLocations.add(splitToScanRange(backend, locationProperties, split, pathPartitionKeys));
totalFileSize += split.getLength();
}
}
}
@ -395,6 +382,66 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
}
/**
 * Converts a single file split assigned to a backend into a thrift scan range.
 * Used both for the eager (non-batch) path and as the split-to-scan-range
 * callback handed to a {@code SplitSource} in batch mode.
 *
 * @param backend the backend this split is assigned to
 * @param locationProperties connection properties for the file location (may be populated here)
 * @param split the file split to convert; expected to be a {@code FileSplit}
 * @param pathPartitionKeys partition column names that may be encoded in the file path
 * @return the scan range locations for this single split
 * @throws UserException if location properties cannot be resolved
 */
private TScanRangeLocations splitToScanRange(
        Backend backend,
        Map<String, String> locationProperties,
        Split split,
        List<String> pathPartitionKeys) throws UserException {
    FileSplit fileSplit = (FileSplit) split;
    TFileType locationType;
    // Iceberg splits bound to a broker are always read through the broker.
    if (fileSplit instanceof IcebergSplit
            && ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
        locationType = TFileType.FILE_BROKER;
    } else {
        locationType = getLocationType(fileSplit.getPath().toString());
    }
    TScanRangeLocations curLocations = newLocations();
    // If fileSplit has partition values, use the values collected from hive partitions.
    // Otherwise, use the values in file path.
    boolean isACID = false;
    if (fileSplit instanceof HiveSplit) {
        HiveSplit hiveSplit = (HiveSplit) fileSplit;
        isACID = hiveSplit.isACID();
    }
    List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
            ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
            false, isACID) : fileSplit.getPartitionValues();
    TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
            locationType);
    TFileCompressType fileCompressType = getFileCompressType(fileSplit);
    rangeDesc.setCompressType(fileCompressType);
    // Transactional Hive (ACID) splits additionally carry the delete-delta
    // directories so the backend can apply row-level deletes while scanning.
    if (isACID) {
        HiveSplit hiveSplit = (HiveSplit) fileSplit;
        hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
        tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
        AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
        TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
        transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
        List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
        for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
            TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
            deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
            deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
            deleteDeltaDescs.add(deleteDeltaDesc);
        }
        transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
        tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
        rangeDesc.setTableFormatParams(tableFormatFileDesc);
    }
    setScanParams(rangeDesc, fileSplit);
    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
    TScanRangeLocation location = new TScanRangeLocation();
    // May populate HDFS/broker properties in locationProperties depending on locationType.
    setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
    location.setBackendId(backend.getId());
    location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
    curLocations.addToLocations(location);
    return curLocations;
}
private void setLocationPropertiesIfNecessary(Backend selectedBackend, TFileType locationType,
Map<String, String> locationProperties) throws UserException {
if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {

View File

@ -69,6 +69,7 @@ public abstract class FileScanNode extends ExternalScanNode {
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
protected long readPartitionNum = 0;
protected long fileSplitSize;
public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
@ -76,6 +77,11 @@ public abstract class FileScanNode extends ExternalScanNode {
this.needCheckColumnPriv = needCheckColumnPriv;
}
@Override
public void init() throws UserException {
    // Snapshot the session-level split size once at init time so that all
    // subsequent split generation for this scan node uses a consistent value.
    // NOTE(review): assumes ConnectContext.get() is non-null here — confirm
    // init() is only invoked on a thread with an attached connect context.
    this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
}
@Override
protected void toThrift(TPlanNode planNode) {
planNode.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
@ -233,18 +239,17 @@ public abstract class FileScanNode extends ExternalScanNode {
result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
return result;
}
long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
if (splitSize <= 0) {
splitSize = blockSize;
if (fileSplitSize <= 0) {
fileSplitSize = blockSize;
}
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE);
fileSplitSize = Math.max(fileSplitSize, DEFAULT_SPLIT_SIZE);
long bytesRemaining;
for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D;
bytesRemaining -= splitSize) {
for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D;
bytesRemaining -= fileSplitSize) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
result.add(splitCreator.create(path, length - bytesRemaining, splitSize,
result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize,
length, modificationTime, hosts, partitionValues));
}
if (bytesRemaining != 0L) {

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
/**
* When file splits are supplied in batch mode, splits are generated lazily and assigned in each call of `getNextBatch`.
* `SplitGenerator` provides the file splits, and `FederationBackendPolicy` assigns these splits to backends.
*/
public class SplitAssignment {
    // magic number to estimate how many splits are allocated to BE in each batch
    private static final int NUM_SPLITS_PER_BE = 1024;
    // magic number to estimate how many splits are generated of each partition in each batch.
    private static final int NUM_SPLITS_PER_PARTITION = 10;

    // Assigns generated splits across backends.
    private final FederationBackendPolicy backendPolicy;
    // Lazily produces the file splits (typically the scan node itself).
    private final SplitGenerator splitGenerator;
    // Store the current assignment of file splits
    private final Multimap<Backend, Split> assignment;
    // Upper bound of splits requested from the generator per refill.
    private final int maxBatchSize;

    public SplitAssignment(FederationBackendPolicy backendPolicy, SplitGenerator splitGenerator) {
        this.backendPolicy = backendPolicy;
        this.splitGenerator = splitGenerator;
        this.assignment = ArrayListMultimap.create();
        // Batch size scales with the session's batch-mode partition threshold,
        // capped by an estimate of what the backends can absorb per batch.
        int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
        maxBatchSize = Math.min(NUM_SPLITS_PER_PARTITION * numPartitions,
                NUM_SPLITS_PER_BE * backendPolicy.numBackends());
    }

    /**
     * Eagerly generates and assigns the first batch so callers can inspect
     * the initial assignment (e.g. to detect an empty source) before scanning.
     * NOTE(review): not synchronized, unlike getNextBatch — confirm init() is
     * always called before the split source is shared with fetcher threads.
     */
    public void init() throws UserException {
        if (assignment.isEmpty() && splitGenerator.hasNext()) {
            assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
        }
    }

    // NOTE(review): exposes the internal mutable multimap; callers are expected
    // to treat it as read-only.
    public Multimap<Backend, Split> getCurrentAssignment() {
        return assignment;
    }

    /** Estimated total split count, delegated to the generator (for explain output). */
    public int numApproximateSplits() {
        return splitGenerator.numApproximateSplits();
    }

    /**
     * Drains all splits currently assigned to the given backend, refilling
     * from the generator as needed. Returns an empty collection only when the
     * generator is exhausted.
     */
    public synchronized Collection<Split> getNextBatch(Backend backend) throws UserException {
        // Each call should consume all splits
        Collection<Split> splits = assignment.removeAll(backend);
        while (splits.isEmpty()) {
            // Get the next batch of splits, and assign to backends
            // If there is data skew, splits may accumulate on some backends.
            if (!splitGenerator.hasNext()) {
                return splits;
            }
            // todo: In each batch, it's to find the optimal assignment for partial splits,
            // how to solve the global data skew?
            assignment.putAll(backendPolicy.computeScanRangeAssignment(splitGenerator.getNextBatch(maxBatchSize)));
            splits = assignment.removeAll(backend);
        }
        return splits;
    }
}

View File

@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.spi.Split;
import java.util.List;
/**
* The Producer(e.g. ScanNode) that provides the file splits in lazy and batch mode.
* The consumer should call `getNextBatch` to fetch the next batch of splits.
*/
public interface SplitGenerator {
    /**
     * Get the next batch of splits. If the producer(e.g. ScanNode) doesn't support batch mode,
     * should throw user exceptions.
     * Contract: returns an empty list (never null) once splits are exhausted,
     * even if {@link #hasNext()} already returned false.
     */
    default List<Split> getNextBatch(int maxBatchSize) throws UserException {
        throw new NotImplementedException("Should implement getNextBatch if in batch mode.");
    }

    /**
     * Get all file splits if the producer doesn't support batch mode.
     */
    default List<Split> getSplits() throws UserException {
        // todo: remove this interface if batch mode is stable
        throw new NotImplementedException("Scan node sub class need to implement getSplits interface.");
    }

    /**
     * Whether more splits can still be generated. Defaults to false for
     * producers that do not support batch mode.
     */
    default boolean hasNext() {
        return false;
    }

    /**
     * Whether the producer(e.g. ScanNode) support batch mode.
     */
    default boolean isBatchMode() {
        return false;
    }

    /**
     * Because file splits are generated lazily, the exact number of splits may not be known,
     * provide an estimated value to show in describe statement. -1 means unknown.
     */
    default int numApproximateSplits() {
        return -1;
    }
}

View File

@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRangeLocations;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
* If there are many files, splitting these files into scan ranges will consume a lot of time.
* Even the simplest queries(e.g. select * from large_table limit 1) can get stuck or crash due to the split process.
* Furthermore, during the splitting process, the backend did not do anything.
 * It is entirely possible to split files while simultaneously scanning data on the splits that are already ready.
* `SplitSource` introduce a lazy and batch mode to provide the file splits. Each `SplitSource` has a unique ID,
* which is used by backends to call `FrontendServiceImpl#fetchSplitBatch` to fetch splits batch by batch.
* `SplitSource`s are managed by `SplitSourceManager`, which stores `SplitSource` as a weak reference, and clean
 * the split sources when their related scan nodes are garbage collected.
*/
public class SplitSource {
    private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);

    // Identifies this source in SplitSourceManager; backends pass it back in
    // fetchSplitBatch RPCs.
    private final long uniqueId;
    private final SplitToScanRange splitToScanRange;
    private final Backend backend;
    private final Map<String, String> locationProperties;
    private final List<String> pathPartitionKeys;
    private final SplitAssignment splitAssignment;

    // Iterator over the splits most recently pulled from the assignment.
    private Iterator<Split> splitIterator = null;
    // Once true, every further getNextBatch call returns an empty list.
    private boolean isLastBatch = false;

    public SplitSource(
            SplitToScanRange splitToScanRange,
            Backend backend,
            Map<String, String> locationProperties,
            SplitAssignment splitAssignment,
            List<String> pathPartitionKeys) {
        this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
        this.splitToScanRange = splitToScanRange;
        this.backend = backend;
        this.locationProperties = locationProperties;
        this.pathPartitionKeys = pathPartitionKeys;
        this.splitAssignment = splitAssignment;
    }

    public long getUniqueId() {
        return uniqueId;
    }

    /**
     * Produces up to {@code maxBatchSize} scan ranges for this source's backend,
     * pulling more splits from the shared assignment whenever the local iterator
     * runs dry. An empty list signals that all splits have been consumed.
     */
    public synchronized List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
        if (isLastBatch) {
            return Collections.emptyList();
        }
        List<TScanRangeLocations> batch = new ArrayList<>(maxBatchSize);
        while (batch.size() < maxBatchSize) {
            boolean needRefill = splitIterator == null || !splitIterator.hasNext();
            if (needRefill) {
                Collection<Split> pending = splitAssignment.getNextBatch(backend);
                if (pending.isEmpty()) {
                    // Assignment exhausted: remember it and hand back what we have.
                    isLastBatch = true;
                    return batch;
                }
                splitIterator = pending.iterator();
            }
            batch.add(splitToScanRange.getScanRange(
                    backend, locationProperties, splitIterator.next(), pathPartitionKeys));
        }
        return batch;
    }
}

View File

@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* `SplitSource` is obtained by RPC call of `FrontendServiceImpl#fetchSplitBatch`.
 * Each `SplitSource` is referenced by its unique ID. `SplitSourceManager` provides the register, get, and remove
* function to manage the split sources. In order to clean the split source when the query finished,
* `SplitSource` is stored as a weak reference, and use `ReferenceQueue` to remove split source when GC.
*/
/**
 * Registry of live {@code SplitSource}s, keyed by unique ID, so that
 * FrontendServiceImpl#fetchSplitBatch can look them up from backend RPCs.
 * Sources are held as weak references; a ReferenceQueue-draining daemon loop
 * removes map entries once the owning scan node has been garbage collected.
 */
public class SplitSourceManager extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(SplitSourceManager.class);

    /** Weak reference that remembers the source's ID so the map entry can be cleaned after GC. */
    public static class SplitSourceReference extends WeakReference<SplitSource> {
        private final long uniqueId;

        public SplitSourceReference(SplitSource splitSource, ReferenceQueue<? super SplitSource> queue) {
            super(splitSource, queue);
            uniqueId = splitSource.getUniqueId();
        }

        public long getUniqueId() {
            return uniqueId;
        }
    }

    private final ReferenceQueue<SplitSource> splitsRefQueue = new ReferenceQueue<>();
    private final Map<Long, WeakReference<SplitSource>> splits = new ConcurrentHashMap<>();

    public void registerSplitSource(SplitSource splitSource) {
        splits.put(splitSource.getUniqueId(), new SplitSourceReference(splitSource, splitsRefQueue));
    }

    public void removeSplitSource(long uniqueId) {
        splits.remove(uniqueId);
    }

    /**
     * Looks up a split source by ID. Returns null when the ID is unknown or the
     * source has been garbage collected.
     */
    public SplitSource getSplitSource(long uniqueId) {
        // Fix: splits.get(uniqueId).get() threw NPE for an unknown/already-cleaned
        // ID (e.g. a stale RPC arriving after cleanup). Null-check the entry.
        WeakReference<SplitSource> reference = splits.get(uniqueId);
        return reference == null ? null : reference.get();
    }

    @Override
    protected void runAfterCatalogReady() {
        while (true) {
            try {
                // Blocks until a SplitSource is garbage collected.
                SplitSourceReference reference = (SplitSourceReference) splitsRefQueue.remove();
                removeSplitSource(reference.getUniqueId());
            } catch (InterruptedException e) {
                // Fix: previously swallowed, which cleared the interrupt and spun
                // forever. Restore the interrupt flag and let the daemon exit.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                LOG.warn("Failed to clean split source", e);
            }
        }
    }
}

View File

@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
import org.apache.doris.spi.Split;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRangeLocations;
import java.util.List;
import java.util.Map;
/**
 * Converts one file {@link Split} assigned to a backend into a thrift scan
 * range the backend can execute. Implemented via method reference by scan
 * nodes (e.g. FileQueryScanNode::splitToScanRange).
 */
@FunctionalInterface
public interface SplitToScanRange {
    /**
     * Build the scan range for a single split.
     *
     * @param backend the backend the split is assigned to
     * @param locationProperties connection properties for the file location
     * @param split the file split to convert
     * @param pathPartitionKeys partition column names encoded in the file path
     * @throws UserException if the scan range cannot be built
     */
    TScanRangeLocations getScanRange(
            Backend backend,
            Map<String, String> locationProperties,
            Split split,
            List<String> pathPartitionKeys) throws UserException;
}

View File

@ -63,8 +63,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -95,6 +97,11 @@ public class HiveScanNode extends FileQueryScanNode {
@Setter
private SelectedPartitions selectedPartitions = null;
private boolean partitionInit = false;
private List<HivePartition> prunedPartitions;
private Iterator<HivePartition> prunedPartitionsIter;
private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
/**
* * External file scan node for Query Hive table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@ -195,14 +202,21 @@ public class HiveScanNode extends FileQueryScanNode {
}
@Override
protected List<Split> getSplits() throws UserException {
public List<Split> getSplits() throws UserException {
long start = System.currentTimeMillis();
try {
if (!partitionInit) {
prunedPartitions = getPartitions();
partitionInit = true;
}
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(cache, getPartitions(), allFiles, bindBrokerName);
getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
}
if (LOG.isDebugEnabled()) {
LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
allFiles.size(), hmsTable.getDbName(), hmsTable.getName(),
@ -217,6 +231,59 @@ public class HiveScanNode extends FileQueryScanNode {
}
}
@Override
public List<Split> getNextBatch(int maxBatchSize) throws UserException {
    try {
        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
        String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
        List<Split> allFiles = Lists.newArrayList();
        int numPartitions = 0;
        // Consume partitions in small groups until enough splits are collected
        // or the pruned-partition iterator is exhausted.
        while (allFiles.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
            List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
            for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
                partitions.add(prunedPartitionsIter.next());
                numPartitions++;
            }
            getFileSplitByPartitions(cache, partitions, allFiles, bindBrokerName);
        }
        // Track the max observed splits-per-partition to refine the estimate
        // returned by numApproximateSplits(). Fix: guard numPartitions == 0
        // (iterator already exhausted on entry), which previously raised
        // ArithmeticException: / by zero.
        if (numPartitions > 0 && allFiles.size() / numPartitions > numSplitsPerPartition) {
            numSplitsPerPartition = allFiles.size() / numPartitions;
        }
        return allFiles;
    } catch (Throwable t) {
        LOG.warn("get file split failed for table: {}", hmsTable.getName(), t);
        throw new UserException(
                "get file split failed for table: " + hmsTable.getName() + ", err: " + Util.getRootCauseMessage(t),
                t);
    }
}
@Override
public boolean hasNext() {
    // Fix: the iterator is only created once batch mode is initialized (see
    // isBatchMode()); report "no more partitions" instead of NPE before that.
    return prunedPartitionsIter != null && prunedPartitionsIter.hasNext();
}
@Override
public boolean isBatchMode() {
    // Lazily prune partitions on the first call; the result decides whether
    // splits are generated in batch (lazy) mode or all at once.
    if (!partitionInit) {
        try {
            prunedPartitions = getPartitions();
        } catch (Exception e) {
            // Best-effort: if pruning fails here, fall back to non-batch mode
            // and let getSplits() surface the real error later.
            return false;
        }
        prunedPartitionsIter = prunedPartitions.iterator();
        partitionInit = true;
    }
    int numPartitions = ConnectContext.get().getSessionVariable().getNumPartitionsInBatchMode();
    // Batch mode kicks in once the partition count reaches the session
    // threshold. NOTE(review): with a threshold of 0 this is always true —
    // confirm whether a negative value is the intended "off" switch.
    return numPartitions >= 0 && prunedPartitions.size() >= numPartitions;
}
@Override
public int numApproximateSplits() {
    // Rough estimate only (shown in describe/explain): average splits per
    // partition observed so far times the number of pruned partitions.
    // NOTE(review): assumes isBatchMode() ran first so prunedPartitions is set.
    return numSplitsPerPartition * prunedPartitions.size();
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
List<Split> allFiles, String bindBrokerName) throws IOException {
List<FileCacheValue> fileCaches;
@ -225,9 +292,6 @@ public class HiveScanNode extends FileQueryScanNode {
} else {
fileCaches = cache.getFilesByPartitionsWithCache(partitions, bindBrokerName);
}
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
}
if (tableSample != null) {
List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
splitAllFiles(allFiles, hiveFileStatuses);

View File

@ -35,6 +35,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@ -68,6 +69,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -97,6 +99,14 @@ public class HudiScanNode extends HiveScanNode {
private boolean incrementalRead = false;
private IncrementalRelation incrementalRelation;
private boolean partitionInit = false;
private HoodieTimeline timeline;
private Option<String> snapshotTimestamp;
private String queryInstant;
private List<HivePartition> prunedPartitions;
private Iterator<HivePartition> prunedPartitionsIter;
private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
/**
* External file scan node for Query Hudi table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@ -193,6 +203,22 @@ public class HudiScanNode extends HiveScanNode {
} else {
incrementalRelation = null;
}
timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
if (desc.getRef().getTableSnapshot() != null) {
queryInstant = desc.getRef().getTableSnapshot().getTime();
snapshotTimestamp = Option.of(queryInstant);
} else {
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
prunedPartitions = Collections.emptyList();
prunedPartitionsIter = prunedPartitions.iterator();
partitionInit = true;
return;
}
queryInstant = snapshotInstant.get().getTimestamp();
snapshotTimestamp = Option.empty();
}
}
@Override
@ -300,32 +326,8 @@ public class HudiScanNode extends HiveScanNode {
incrementalRelation.getEndTs())).collect(Collectors.toList());
}
@Override
public List<Split> getSplits() throws UserException {
if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
return getIncrementalSplits();
}
HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
String queryInstant;
Option<String> snapshotTimestamp;
if (desc.getRef().getTableSnapshot() != null) {
queryInstant = desc.getRef().getTableSnapshot().getTime();
snapshotTimestamp = Option.of(queryInstant);
} else {
Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
if (!snapshotInstant.isPresent()) {
return Collections.emptyList();
}
queryInstant = snapshotInstant.get().getTimestamp();
snapshotTimestamp = Option.empty();
}
// Non partition table will get one dummy partition
List<HivePartition> partitions = HiveMetaStoreClientHelper.ugiDoAs(
HiveMetaStoreClientHelper.getConfiguration(hmsTable),
() -> getPrunedPartitions(hudiClient, snapshotTimestamp));
private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
partitions.forEach(partition -> executor.execute(() -> {
String globPath;
@ -370,9 +372,69 @@ public class HudiScanNode extends HiveScanNode {
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public List<Split> getSplits() throws UserException {
    // Incremental queries are served by a dedicated split path.
    boolean useIncremental = incrementalRead && !incrementalRelation.fallbackFullTableScan();
    if (useIncremental) {
        return getIncrementalSplits();
    }
    // Prune partitions lazily on the first call; isBatchMode() may already
    // have done this, so guard with partitionInit.
    if (!partitionInit) {
        prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
                HiveMetaStoreClientHelper.getConfiguration(hmsTable),
                () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
        partitionInit = true;
    }
    // Synchronized list: getPartitionSplits() appends from multiple
    // file-listing executor threads.
    List<Split> result = Collections.synchronizedList(new ArrayList<>());
    getPartitionSplits(prunedPartitions, result);
    return result;
}
@Override
public List<Split> getNextBatch(int maxBatchSize) throws UserException {
    // Synchronized list: getPartitionSplits() appends from multiple
    // file-listing executor threads.
    List<Split> splits = Collections.synchronizedList(new ArrayList<>());
    int numPartitions = 0;
    // Consume partitions in chunks of NUM_PARTITIONS_PER_LOOP until we have
    // at least maxBatchSize splits or the partitions run out. The batch may
    // overshoot maxBatchSize by up to one chunk's worth of splits.
    while (splits.size() < maxBatchSize && prunedPartitionsIter.hasNext()) {
        List<HivePartition> partitions = new ArrayList<>(NUM_PARTITIONS_PER_LOOP);
        for (int i = 0; i < NUM_PARTITIONS_PER_LOOP && prunedPartitionsIter.hasNext(); ++i) {
            partitions.add(prunedPartitionsIter.next());
            numPartitions++;
        }
        getPartitionSplits(partitions, splits);
    }
    // Guard against division by zero: if the iterator was already exhausted
    // on entry, no partition was consumed in this call.
    if (numPartitions > 0 && splits.size() / numPartitions > numSplitsPerPartition) {
        // Track the max observed splits-per-partition for numApproximateSplits().
        numSplitsPerPartition = splits.size() / numPartitions;
    }
    return splits;
}
@Override
public boolean hasNext() {
    // True while there are still pruned partitions left to convert into
    // splits in batch mode.
    // NOTE(review): assumes isBatchMode() ran first and initialized
    // prunedPartitionsIter; calling this earlier throws NPE — confirm callers.
    return prunedPartitionsIter.hasNext();
}
@Override
public boolean isBatchMode() {
    // Incremental reads always use the dedicated (non-batch) split path.
    if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
        return false;
    }
    if (!partitionInit) {
        // Non partition table will get one dummy partition
        prunedPartitions = HiveMetaStoreClientHelper.ugiDoAs(
                HiveMetaStoreClientHelper.getConfiguration(hmsTable),
                () -> getPrunedPartitions(hudiClient, snapshotTimestamp));
        prunedPartitionsIter = prunedPartitions.iterator();
        partitionInit = true;
    }
    ConnectContext ctx = ConnectContext.get();
    if (ctx == null) {
        // Batch mode is driven by a session variable; without a connect
        // context (internal invocation) default to the non-batch path.
        return false;
    }
    // A negative threshold disables batch mode; otherwise enable it once the
    // partition count reaches the threshold.
    int numPartitions = ctx.getSessionVariable().getNumPartitionsInBatchMode();
    return numPartitions >= 0 && prunedPartitions.size() >= numPartitions;
}
@Override
public int numApproximateSplits() {
    // Rough estimate for scheduling: observed max splits-per-partition times
    // the total partition count. Only meaningful after isBatchMode()
    // initialized prunedPartitions; the estimate tightens as getNextBatch()
    // updates numSplitsPerPartition.
    return numSplitsPerPartition * prunedPartitions.size();
}
private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) {
Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
String filePath = baseFile.map(BaseFile::getPath).orElse("");
@ -404,7 +466,11 @@ public class HudiScanNode extends HiveScanNode {
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
    // In batch mode, splits are fetched lazily by the BEs, so the native-read
    // split counters are not final at explain time and the ratio is omitted.
    if (isBatchMode()) {
        return super.getNodeExplainString(prefix, detailLevel);
    } else {
        return super.getNodeExplainString(prefix, detailLevel)
                + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum.get(), inputSplitsNum);
    }
}
}

View File

@ -116,7 +116,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
}
@Override
protected List<Split> getSplits() throws UserException {
public List<Split> getSplits() throws UserException {
List<Split> result = new ArrayList<>();
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {

View File

@ -39,13 +39,13 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.system.Backend;
@ -75,8 +75,10 @@ import java.util.stream.Collectors;
/**
* Representation of the common elements of all scan nodes.
*/
public abstract class ScanNode extends PlanNode {
public abstract class ScanNode extends PlanNode implements SplitGenerator {
private static final Logger LOG = LogManager.getLogger(ScanNode.class);
protected static final int NUM_SPLITS_PER_PARTITION = 10;
protected static final int NUM_PARTITIONS_PER_LOOP = 100;
protected final TupleDescriptor desc;
// for distribution prunner
protected Map<String, PartitionColumnFilter> columnFilters = Maps.newHashMap();
@ -85,6 +87,7 @@ public abstract class ScanNode extends PlanNode {
protected String sortColumn = null;
protected Analyzer analyzer;
protected List<TScanRangeLocations> scanRangeLocations = Lists.newArrayList();
protected List<SplitSource> splitSources = Lists.newArrayList();
protected PartitionInfo partitionsInfo = null;
// create a mapping between output slot's id and project expr
@ -123,10 +126,6 @@ public abstract class ScanNode extends PlanNode {
sortColumn = column;
}
protected List<Split> getSplits() throws UserException {
throw new NotImplementedException("Scan node sub class need to implement getSplits interface.");
}
/**
* cast expr to SlotDescriptor type
*/

View File

@ -1472,7 +1472,8 @@ public class Coordinator implements CoordInterface {
// Print an error stack here to know why send cancel again.
LOG.warn("Query {} already in abnormal status {}, but received cancel again,"
+ "so that send cancel to BE again",
DebugUtil.printId(queryId), queryStatus.toString(), new Exception());
DebugUtil.printId(queryId), queryStatus.toString(),
new Exception("cancel failed"));
} else {
queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled");
}
@ -2462,12 +2463,16 @@ public class Coordinator implements CoordInterface {
if (externalScanRange != null) {
TFileScanRange fileScanRange = externalScanRange.getFileScanRange();
if (fileScanRange != null) {
scanRangeNum += fileScanRange.getRanges().size();
if (fileScanRange.isSetRanges()) {
scanRangeNum += fileScanRange.getRanges().size();
} else if (fileScanRange.isSetSplitSource()) {
scanRangeNum += fileScanRange.getSplitSource().getNumSplits();
}
}
}
TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
if (paloScanRange != null) {
scanRangeNum = scanRangeNum + 1;
scanRangeNum += 1;
}
// TODO: more ranges?
}

View File

@ -413,6 +413,8 @@ public class SessionVariable implements Serializable, Writable {
// Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
public static final String FILE_SPLIT_SIZE = "file_split_size";
public static final String NUM_PARTITIONS_IN_BATCH_MODE = "num_partitions_in_batch_mode";
/**
* use insert stmt as the unified backend for all loads
*/
@ -1429,6 +1431,13 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
public long fileSplitSize = 0;
@VariableMgr.VarAttr(
name = NUM_PARTITIONS_IN_BATCH_MODE,
description = {"如果分区数量超过阈值,BE将通过batch方式获取scan ranges",
"If the number of partitions exceeds the threshold, scan ranges will be got through batch mode."},
needForward = true)
public int numPartitionsInBatchMode = 1024;
@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,
description = {"控制 parquet reader 是否启用延迟物化技术。默认为 true。",
@ -2633,6 +2642,14 @@ public class SessionVariable implements Serializable, Writable {
this.fileSplitSize = fileSplitSize;
}
// Threshold of partition count above which scan ranges are fetched by the
// BEs in batch mode; a negative value disables batch mode.
public int getNumPartitionsInBatchMode() {
    return numPartitionsInBatchMode;
}
// NOTE(review): method name says "NumSplits" but it sets the
// numPartitionsInBatchMode field (partition-count threshold). Renaming to
// setNumPartitionsInBatchMode would break existing callers; consider adding
// a correctly-named alias and deprecating this one.
public void setNumSplitsInBatchMode(int numPartitionsInBatchMode) {
    this.numPartitionsInBatchMode = numPartitionsInBatchMode;
}
public boolean isEnableParquetLazyMat() {
return enableParquetLazyMat;
}

View File

@ -68,6 +68,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.load.routineload.ErrorReason;
@ -140,6 +141,8 @@ import org.apache.doris.thrift.TFeResult;
import org.apache.doris.thrift.TFetchResourceResult;
import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.TFetchSplitBatchRequest;
import org.apache.doris.thrift.TFetchSplitBatchResult;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
@ -209,6 +212,7 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
import org.apache.doris.thrift.TRestoreSnapshotResult;
import org.apache.doris.thrift.TRollbackTxnRequest;
import org.apache.doris.thrift.TRollbackTxnResult;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSchemaTableName;
import org.apache.doris.thrift.TShowProcessListRequest;
import org.apache.doris.thrift.TShowProcessListResult;
@ -956,6 +960,23 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr());
}
@Override
public TFetchSplitBatchResult fetchSplitBatch(TFetchSplitBatchRequest request) throws TException {
    // Look up the registered split source; it may already have been released
    // by the manager, in which case the BE must be told explicitly.
    SplitSource source =
            Env.getCurrentEnv().getSplitSourceManager().getSplitSource(request.getSplitSourceId());
    if (source == null) {
        throw new TException("Split source " + request.getSplitSourceId() + " is released");
    }
    try {
        // Pull up to maxNumSplits scan ranges and ship them back to the BE.
        List<TScanRangeLocations> batch = source.getNextBatch(request.getMaxNumSplits());
        TFetchSplitBatchResult result = new TFetchSplitBatchResult();
        result.setSplits(batch);
        return result;
    } catch (Exception e) {
        throw new TException("Failed to get split source " + request.getSplitSourceId(), e);
    }
}
@Override
public TMasterResult finishTask(TFinishTaskRequest request) throws TException {
return masterImpl.finishTask(request);