From d1e63c520184dce47b89d382ed5be4fb7340f94d Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Mon, 4 Nov 2024 09:40:08 +0800 Subject: [PATCH] [improvement](external)add some improvements for external scan (#38946) (#43156) bp #38946 Co-authored-by: wuwenchi --- .../apache/doris/paimon/PaimonJniScanner.java | 2 +- .../doris/datasource/ExternalScanNode.java | 6 +- .../datasource/FederationBackendPolicy.java | 2 +- .../apache/doris/datasource/FileSplit.java | 19 ++++ .../source/IcebergDeleteFileFilter.java | 20 ++-- .../iceberg/source/IcebergScanNode.java | 7 +- .../iceberg/source/IcebergSplit.java | 6 ++ .../paimon/source/PaimonScanNode.java | 25 ++++- .../datasource/paimon/source/PaimonSplit.java | 22 +++++ .../translator/PhysicalPlanTranslator.java | 3 +- .../doris/planner/SingleNodePlanner.java | 3 +- .../org/apache/doris/qe/SessionVariable.java | 36 +++++++ .../main/java/org/apache/doris/spi/Split.java | 5 + .../planner/FederationBackendPolicyTest.java | 95 +++++++++++++++++++ 14 files changed, 230 insertions(+), 21 deletions(-) diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index f229134e9d..7bd9fa631c 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -125,7 +125,7 @@ public class PaimonJniScanner extends JniScanner { int[] projected = getProjected(); readBuilder.withProjection(projected); readBuilder.withFilter(getPredicates()); - reader = readBuilder.newRead().createReader(getSplit()); + reader = readBuilder.newRead().executeFilter().createReader(getSplit()); paimonDataTypeList = Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java index aa61637218..978d8e94cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java @@ -45,8 +45,10 @@ public abstract class ExternalScanNode extends ScanNode { protected boolean needCheckColumnPriv; protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null - && ConnectContext.get().getSessionVariable().enableFileCache) - ? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy(); + && (ConnectContext.get().getSessionVariable().enableFileCache + || ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan())) + ? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) + : new FederationBackendPolicy(); public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 13756c978f..7a616d224a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -495,7 +495,7 @@ public class FederationBackendPolicy { private static class SplitHash implements Funnel { @Override public void funnel(Split split, PrimitiveSink primitiveSink) { - primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8)); + primitiveSink.putBytes(split.getConsistentHashString().getBytes(StandardCharsets.UTF_8)); primitiveSink.putLong(split.getStart()); primitiveSink.putLong(split.getLength()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java index 7eaa87b74a..1ebb390e90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java @@ -47,6 +47,9 @@ public class FileSplit implements Split { // the location type for BE, eg: HDFS, LOCAL, S3 protected TFileType locationType; + public Long selfSplitWeight; + public Long targetSplitSize; + public FileSplit(LocationPath path, long start, long length, long fileLength, long modificationTime, String[] hosts, List partitionValues) { this.path = path; @@ -89,4 +92,20 @@ public class FileSplit implements Split { return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); } } + + @Override + public void setTargetSplitSize(Long targetSplitSize) { + this.targetSplitSize = targetSplitSize; + } + + @Override + public SplitWeight getSplitWeight() { + if (selfSplitWeight != null && targetSplitSize != null) { + double computedWeight = selfSplitWeight * 1.0 / targetSplitSize; + // Clamp the value be between the minimum weight and 1.0 (standard weight) + return SplitWeight.fromProportion(Math.min(Math.max(computedWeight, 0.01), 1.0)); + } else { + return SplitWeight.standard(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java index 394bc849a5..b876732ff3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java @@ -25,23 +25,25 @@ import java.util.OptionalLong; @Data public class IcebergDeleteFileFilter { private String deleteFilePath; + private long filesize; - public IcebergDeleteFileFilter(String deleteFilePath) { + public IcebergDeleteFileFilter(String deleteFilePath, long filesize) { this.deleteFilePath = deleteFilePath; + this.filesize = filesize; } public static PositionDelete createPositionDelete(String deleteFilePath, Long positionLowerBound, - Long positionUpperBound) { - return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound); + Long positionUpperBound, long filesize) { + return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound, filesize); } - public static EqualityDelete createEqualityDelete(String deleteFilePath, List fieldIds) { + public static EqualityDelete createEqualityDelete(String deleteFilePath, List fieldIds, long fileSize) { // todo: // Schema deleteSchema = TypeUtil.select(scan.schema(), new HashSet<>(fieldIds)); // StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct()); // pass deleteSet to BE // compare two StructLike value, if equals, filtered - return new EqualityDelete(deleteFilePath, fieldIds); + return new EqualityDelete(deleteFilePath, fieldIds, fileSize); } static class PositionDelete extends IcebergDeleteFileFilter { @@ -49,8 +51,8 @@ public class IcebergDeleteFileFilter { private final Long positionUpperBound; public PositionDelete(String deleteFilePath, Long positionLowerBound, - Long positionUpperBound) { - super(deleteFilePath); + Long positionUpperBound, long fileSize) { + super(deleteFilePath, fileSize); this.positionLowerBound = positionLowerBound; this.positionUpperBound = positionUpperBound; } @@ -67,8 +69,8 @@ public class IcebergDeleteFileFilter { static class EqualityDelete extends IcebergDeleteFileFilter { private List fieldIds; - public EqualityDelete(String deleteFilePath, List fieldIds) { - super(deleteFilePath); + public EqualityDelete(String deleteFilePath, List fieldIds, long fileSize) { + super(deleteFilePath, fileSize); this.fieldIds = fieldIds; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index fe6c54cf53..56dda7b4fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -282,7 +282,7 @@ public class IcebergScanNode extends FileQueryScanNode { } selectedPartitionNum = partitionPathSet.size(); - + splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } @@ -315,10 +315,11 @@ public class IcebergScanNode extends FileQueryScanNode { .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId())) .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes)); filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(), - positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L))); + positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L), + delete.fileSizeInBytes())); } else if (delete.content() == FileContent.EQUALITY_DELETES) { filters.add(IcebergDeleteFileFilter.createEqualityDelete( - delete.path().toString(), delete.equalityFieldIds())); + delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes())); } else { throw new IllegalStateException("Unknown delete content: " + delete.content()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 46e8f96ba3..580d3cf1bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -47,6 +47,7 @@ public class IcebergSplit extends FileSplit { this.formatVersion = formatVersion; this.config = config; this.originalPath = originalPath; + this.selfSplitWeight = length; } public long getRowCount() { @@ -56,4 +57,9 @@ public class IcebergSplit extends FileSplit { public void setRowCount(long rowCount) { this.rowCount = rowCount; } + + public void setDeleteFileFilters(List deleteFileFilters) { + this.deleteFileFilters = deleteFileFilters; + this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 02f831ba37..cd477cc9b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -28,7 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -101,9 +101,14 @@ public class PaimonScanNode extends FileQueryScanNode { private int rawFileSplitNum = 0; private int paimonSplitNum = 0; private List splitStats = new ArrayList<>(); + private SessionVariable sessionVariable; - public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + public PaimonScanNode(PlanNodeId id, + TupleDescriptor desc, + boolean needCheckColumnPriv, + SessionVariable sessionVariable) { super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv); + this.sessionVariable = sessionVariable; } @Override @@ -176,7 +181,9 @@ public class PaimonScanNode extends FileQueryScanNode { @Override public List getSplits() throws UserException { - boolean forceJniScanner = ConnectContext.get().getSessionVariable().isForceJniScanner(); + boolean forceJniScanner = sessionVariable.isForceJniScanner(); + SessionVariable.IgnoreSplitType ignoreSplitType = + SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType()); List splits = new ArrayList<>(); int[] projected = desc.getSlots().stream().mapToInt( slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName()))) @@ -196,7 +203,11 @@ public class PaimonScanNode extends FileQueryScanNode { selectedPartitionValues.add(partitionValue); Optional> optRawFiles = dataSplit.convertToRawFiles(); Optional> optDeletionFiles = dataSplit.deletionFiles(); + if (supportNativeReader(optRawFiles)) { + if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_NATIVE) { + continue; + } splitStat.setType(SplitReadType.NATIVE); splitStat.setRawFileConvertable(true); List rawFiles = optRawFiles.get(); @@ -252,10 +263,16 @@ public class PaimonScanNode extends FileQueryScanNode { } } } else { + if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { + continue; + } splits.add(new PaimonSplit(split)); ++paimonSplitNum; } } else { + if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { + continue; + } splits.add(new PaimonSplit(split)); ++paimonSplitNum; } @@ -263,6 +280,8 @@ public class PaimonScanNode extends FileQueryScanNode { } this.selectedPartitionNum = selectedPartitionValues.size(); // TODO: get total partition number + // We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile. + splits.forEach(s -> s.setTargetSplitSize(fileSplitSize)); return splits; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index ffd063d77e..3ab38c7db2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -23,11 +23,14 @@ import org.apache.doris.datasource.SplitCreator; import org.apache.doris.datasource.TableFormatType; import com.google.common.collect.Maps; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.Split; import java.util.List; import java.util.Optional; +import java.util.UUID; public class PaimonSplit extends FileSplit { private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap()); @@ -35,11 +38,20 @@ public class PaimonSplit extends FileSplit { private TableFormatType tableFormatType; private Optional optDeletionFile; + public PaimonSplit(Split split) { super(DUMMY_PATH, 0, 0, 0, 0, null, null); this.split = split; this.tableFormatType = TableFormatType.PAIMON; this.optDeletionFile = Optional.empty(); + + if (split instanceof DataSplit) { + List dataFileMetas = ((DataSplit) split).dataFiles(); + this.path = new LocationPath("/" + dataFileMetas.get(0).fileName()); + this.selfSplitWeight = dataFileMetas.stream().mapToLong(DataFileMeta::fileSize).sum(); + } else { + this.selfSplitWeight = split.rowCount(); + } } private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime, @@ -47,6 +59,15 @@ public class PaimonSplit extends FileSplit { super(file, start, length, fileLength, modificationTime, hosts, partitionList); this.tableFormatType = TableFormatType.PAIMON; this.optDeletionFile = Optional.empty(); + this.selfSplitWeight = length; + } + + @Override + public String getConsistentHashString() { + if (this.path == DUMMY_PATH) { + return UUID.randomUUID().toString(); + } + return getPathString(); } public Split getSplit() { @@ -66,6 +87,7 @@ public class PaimonSplit extends FileSplit { } public void setDeletionFile(DeletionFile deletionFile) { + this.selfSplitWeight += deletionFile.length(); this.optDeletionFile = Optional.of(deletionFile); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 679df9bbb8..284277427b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -606,7 +606,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor alternativeHosts); + default String getConsistentHashString() { + return getPathString(); + } + + void setTargetSplitSize(Long targetSplitSize); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index 57e64f0f22..30ceac6dba 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -739,4 +739,99 @@ public class FederationBackendPolicyTest { return entries1.containsAll(entries2) && entries2.containsAll(entries1); } + + @Test + public void testSplitWeight() { + FileSplit fileSplit = new FileSplit(new LocationPath("s1"), 0, 1000, 1000, 0, null, Collections.emptyList()); + fileSplit.setSelfSplitWeight(1000L); + + fileSplit.setTargetSplitSize(10L); + Assert.assertEquals(100L, fileSplit.getSplitWeight().getRawValue(), 100L); + + fileSplit.setTargetSplitSize(10000000L); + Assert.assertEquals(1L, fileSplit.getSplitWeight().getRawValue()); + + fileSplit.setTargetSplitSize(2000L); + Assert.assertEquals(50, fileSplit.getSplitWeight().getRawValue()); + } + + @Test + public void testBiggerSplit() throws UserException { + SystemInfoService service = new SystemInfoService(); + + Backend backend1 = new Backend(1L, "172.30.0.100", 9050); + backend1.setAlive(true); + service.addBackend(backend1); + Backend backend2 = new Backend(2L, "172.30.0.106", 9050); + backend2.setAlive(true); + service.addBackend(backend2); + Backend backend3 = new Backend(3L, "172.30.0.118", 9050); + backend3.setAlive(true); + service.addBackend(backend3); + + new MockUp() { + @Mock + public SystemInfoService getCurrentSystemInfo() { + return service; + } + }; + + List splits = new ArrayList<>(); + splits.add(genFileSplit("s1", 1000000L, 1000L)); // belong 2 + splits.add(genFileSplit("s2", 100000L, 1000L)); // belong 2 + splits.add(genFileSplit("s3", 200000L, 1000L)); // belong 2 + splits.add(genFileSplit("s4", 300000L, 1000L)); // belong 2 + splits.add(genFileSplit("s5", 800000L, 1000L)); // belong 1 + + FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + // Set these options to ensure that the consistent hash algorithm is consistent. + policy.setEnableSplitsRedistribution(false); + Config.split_assigner_min_consistent_hash_candidate_num = 1; + policy.init(); + Multimap assignment = policy.computeScanRangeAssignment(splits); + Map> backendListMap = mergeAssignment(assignment); + backendListMap.forEach((k, v) -> { + if (k.getId() == 1) { + Assert.assertEquals(800000L, v.stream().mapToLong(Split::getLength).sum()); + } else if (k.getId() == 2) { + Assert.assertEquals(1600000L, v.stream().mapToLong(Split::getLength).sum()); + } + }); + + Config.split_assigner_min_consistent_hash_candidate_num = 1; + FederationBackendPolicy policy2 = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING); + policy2.init(); + Multimap assignment2 = policy2.computeScanRangeAssignment(splits); + Map> backendListMap2 = mergeAssignment(assignment2); + backendListMap2.forEach((k, v) -> { + if (k.getId() == 1) { + Assert.assertEquals(900000L, v.stream().mapToLong(Split::getLength).sum()); + } else if (k.getId() == 2) { + Assert.assertEquals(500000L, v.stream().mapToLong(Split::getLength).sum()); + } else if (k.getId() == 3) { + Assert.assertEquals(1000000L, v.stream().mapToLong(Split::getLength).sum()); + } + }); + } + + private Map> mergeAssignment(Multimap ass) { + HashMap> map = new HashMap<>(); + ass.forEach((k, v) -> { + if (map.containsKey(k)) { + map.get(k).add(v); + } else { + ArrayList splits = new ArrayList<>(); + splits.add(v); + map.put(k, splits); + } + }); + return map; + } + + private FileSplit genFileSplit(String path, long length, long targetSplit) { + FileSplit s = new FileSplit(new LocationPath(path), 0, length, length, 0, null, Collections.emptyList()); + s.setSelfSplitWeight(length); + s.setTargetSplitSize(targetSplit); + return s; + } }