[improvement](external)add some improvements for external scan (#38946) (#43156)

bp #38946

Co-authored-by: wuwenchi <wuwenchihdu@hotmail.com>
This commit is contained in:
Mingyu Chen (Rayner)
2024-11-04 09:40:08 +08:00
committed by GitHub
parent e3170d6319
commit d1e63c5201
14 changed files with 230 additions and 21 deletions

View File

@ -125,7 +125,7 @@ public class PaimonJniScanner extends JniScanner {
int[] projected = getProjected();
readBuilder.withProjection(projected);
readBuilder.withFilter(getPredicates());
reader = readBuilder.newRead().createReader(getSplit());
reader = readBuilder.newRead().executeFilter().createReader(getSplit());
paimonDataTypeList =
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
}

View File

@ -45,8 +45,10 @@ public abstract class ExternalScanNode extends ScanNode {
protected boolean needCheckColumnPriv;
protected final FederationBackendPolicy backendPolicy = (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableFileCache)
? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING) : new FederationBackendPolicy();
&& (ConnectContext.get().getSessionVariable().enableFileCache
|| ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan()))
? new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
: new FederationBackendPolicy();
public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {

View File

@ -495,7 +495,7 @@ public class FederationBackendPolicy {
private static class SplitHash implements Funnel<Split> {
@Override
public void funnel(Split split, PrimitiveSink primitiveSink) {
primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
primitiveSink.putBytes(split.getConsistentHashString().getBytes(StandardCharsets.UTF_8));
primitiveSink.putLong(split.getStart());
primitiveSink.putLong(split.getLength());
}

View File

@ -47,6 +47,9 @@ public class FileSplit implements Split {
// the location type for BE, eg: HDFS, LOCAL, S3
protected TFileType locationType;
public Long selfSplitWeight;
public Long targetSplitSize;
public FileSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
@ -89,4 +92,20 @@ public class FileSplit implements Split {
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
}
@Override
public void setTargetSplitSize(Long targetSplitSize) {
this.targetSplitSize = targetSplitSize;
}
@Override
public SplitWeight getSplitWeight() {
if (selfSplitWeight != null && targetSplitSize != null) {
double computedWeight = selfSplitWeight * 1.0 / targetSplitSize;
// Clamp the value be between the minimum weight and 1.0 (standard weight)
return SplitWeight.fromProportion(Math.min(Math.max(computedWeight, 0.01), 1.0));
} else {
return SplitWeight.standard();
}
}
}

View File

@ -25,23 +25,25 @@ import java.util.OptionalLong;
@Data
public class IcebergDeleteFileFilter {
private String deleteFilePath;
private long filesize;
public IcebergDeleteFileFilter(String deleteFilePath) {
public IcebergDeleteFileFilter(String deleteFilePath, long filesize) {
this.deleteFilePath = deleteFilePath;
this.filesize = filesize;
}
public static PositionDelete createPositionDelete(String deleteFilePath, Long positionLowerBound,
Long positionUpperBound) {
return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound);
Long positionUpperBound, long filesize) {
return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound, filesize);
}
public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
// todo:
// Schema deleteSchema = TypeUtil.select(scan.schema(), new HashSet<>(fieldIds));
// StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct());
// pass deleteSet to BE
// compare two StructLike value, if equals, filtered
return new EqualityDelete(deleteFilePath, fieldIds);
return new EqualityDelete(deleteFilePath, fieldIds, fileSize);
}
static class PositionDelete extends IcebergDeleteFileFilter {
@ -49,8 +51,8 @@ public class IcebergDeleteFileFilter {
private final Long positionUpperBound;
public PositionDelete(String deleteFilePath, Long positionLowerBound,
Long positionUpperBound) {
super(deleteFilePath);
Long positionUpperBound, long fileSize) {
super(deleteFilePath, fileSize);
this.positionLowerBound = positionLowerBound;
this.positionUpperBound = positionUpperBound;
}
@ -67,8 +69,8 @@ public class IcebergDeleteFileFilter {
static class EqualityDelete extends IcebergDeleteFileFilter {
private List<Integer> fieldIds;
public EqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
super(deleteFilePath);
public EqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
super(deleteFilePath, fileSize);
this.fieldIds = fieldIds;
}

View File

@ -282,7 +282,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
selectedPartitionNum = partitionPathSet.size();
splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}
@ -315,10 +315,11 @@ public class IcebergScanNode extends FileQueryScanNode {
.map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
.map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L)));
positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L),
delete.fileSizeInBytes()));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
filters.add(IcebergDeleteFileFilter.createEqualityDelete(
delete.path().toString(), delete.equalityFieldIds()));
delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes()));
} else {
throw new IllegalStateException("Unknown delete content: " + delete.content());
}

View File

@ -47,6 +47,7 @@ public class IcebergSplit extends FileSplit {
this.formatVersion = formatVersion;
this.config = config;
this.originalPath = originalPath;
this.selfSplitWeight = length;
}
public long getRowCount() {
@ -56,4 +57,9 @@ public class IcebergSplit extends FileSplit {
public void setRowCount(long rowCount) {
this.rowCount = rowCount;
}
public void setDeleteFileFilters(List<IcebergDeleteFileFilter> deleteFileFilters) {
this.deleteFileFilters = deleteFileFilters;
this.selfSplitWeight += deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
}
}

View File

@ -28,7 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@ -101,9 +101,14 @@ public class PaimonScanNode extends FileQueryScanNode {
private int rawFileSplitNum = 0;
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
private SessionVariable sessionVariable;
public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
public PaimonScanNode(PlanNodeId id,
TupleDescriptor desc,
boolean needCheckColumnPriv,
SessionVariable sessionVariable) {
super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, needCheckColumnPriv);
this.sessionVariable = sessionVariable;
}
@Override
@ -176,7 +181,9 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public List<Split> getSplits() throws UserException {
boolean forceJniScanner = ConnectContext.get().getSessionVariable().isForceJniScanner();
boolean forceJniScanner = sessionVariable.isForceJniScanner();
SessionVariable.IgnoreSplitType ignoreSplitType =
SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
List<Split> splits = new ArrayList<>();
int[] projected = desc.getSlots().stream().mapToInt(
slot -> (source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@ -196,7 +203,11 @@ public class PaimonScanNode extends FileQueryScanNode {
selectedPartitionValues.add(partitionValue);
Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles();
Optional<List<DeletionFile>> optDeletionFiles = dataSplit.deletionFiles();
if (supportNativeReader(optRawFiles)) {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
continue;
}
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
@ -252,10 +263,16 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
continue;
}
splits.add(new PaimonSplit(split));
++paimonSplitNum;
}
@ -263,6 +280,8 @@ public class PaimonScanNode extends FileQueryScanNode {
}
this.selectedPartitionNum = selectedPartitionValues.size();
// TODO: get total partition number
// We should set fileSplitSize at the end because fileSplitSize may be modified in splitFile.
splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
return splits;
}

View File

@ -23,11 +23,14 @@ import org.apache.doris.datasource.SplitCreator;
import org.apache.doris.datasource.TableFormatType;
import com.google.common.collect.Maps;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.Split;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
public class PaimonSplit extends FileSplit {
private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap());
@ -35,11 +38,20 @@ public class PaimonSplit extends FileSplit {
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile;
public PaimonSplit(Split split) {
super(DUMMY_PATH, 0, 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
if (split instanceof DataSplit) {
List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
this.path = new LocationPath("/" + dataFileMetas.get(0).fileName());
this.selfSplitWeight = dataFileMetas.stream().mapToLong(DataFileMeta::fileSize).sum();
} else {
this.selfSplitWeight = split.rowCount();
}
}
private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime,
@ -47,6 +59,15 @@ public class PaimonSplit extends FileSplit {
super(file, start, length, fileLength, modificationTime, hosts, partitionList);
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
this.selfSplitWeight = length;
}
@Override
public String getConsistentHashString() {
if (this.path == DUMMY_PATH) {
return UUID.randomUUID().toString();
}
return getPathString();
}
public Split getSplit() {
@ -66,6 +87,7 @@ public class PaimonSplit extends FileSplit {
}
public void setDeletionFile(DeletionFile deletionFile) {
this.selfSplitWeight += deletionFile.length();
this.optDeletionFile = Optional.of(deletionFile);
}

View File

@ -606,7 +606,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
} else if (table instanceof IcebergExternalTable) {
scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
} else if (table instanceof PaimonExternalTable) {
scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
scanNode = new PaimonScanNode(context.nextPlanNodeId(), tupleDescriptor, false,
ConnectContext.get().getSessionVariable());
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
} else {

View File

@ -1987,7 +1987,8 @@ public class SingleNodePlanner {
scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
case PAIMON_EXTERNAL_TABLE:
scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
scanNode = new PaimonScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true,
ConnectContext.get().getSessionVariable());
break;
case MAX_COMPUTE_EXTERNAL_TABLE:
// TODO: support max compute scan node

View File

@ -920,6 +920,26 @@ public class SessionVariable implements Serializable, Writable {
setter = "setPipelineTaskNum")
public int parallelPipelineTaskNum = 0;
public enum IgnoreSplitType {
NONE,
IGNORE_JNI,
IGNORE_NATIVE
}
public static final String IGNORE_SPLIT_TYPE = "ignore_split_type";
@VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE,
checker = "checkIgnoreSplitType",
options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"},
description = {"忽略指定类型的split", "Ignore splits of the specified type"})
public String ignoreSplitType = IgnoreSplitType.NONE.toString();
public static final String USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN = "use_consistent_hash_for_external_scan";
@VariableMgr.VarAttr(name = USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN,
description = {"对外表采用一致性hash的方式做split的分发",
"Use consistent hashing to split the appearance for external scan"})
public boolean useConsistentHashForExternalScan = false;
@VariableMgr.VarAttr(name = PROFILE_LEVEL, fuzzy = true)
public int profileLevel = 1;
@ -4226,6 +4246,22 @@ public class SessionVariable implements Serializable, Writable {
return forceJniScanner;
}
public String getIgnoreSplitType() {
return ignoreSplitType;
}
public void checkIgnoreSplitType(String value) {
try {
IgnoreSplitType.valueOf(value);
} catch (Exception e) {
throw new UnsupportedOperationException("We only support `NONE`, `IGNORE_JNI` and `IGNORE_NATIVE`");
}
}
public boolean getUseConsistentHashForExternalScan() {
return useConsistentHashForExternalScan;
}
public void setForceJniScanner(boolean force) {
forceJniScanner = force;
}

View File

@ -48,4 +48,9 @@ public interface Split {
void setAlternativeHosts(List<String> alternativeHosts);
default String getConsistentHashString() {
return getPathString();
}
void setTargetSplitSize(Long targetSplitSize);
}

View File

@ -739,4 +739,99 @@ public class FederationBackendPolicyTest {
return entries1.containsAll(entries2) && entries2.containsAll(entries1);
}
@Test
public void testSplitWeight() {
FileSplit fileSplit = new FileSplit(new LocationPath("s1"), 0, 1000, 1000, 0, null, Collections.emptyList());
fileSplit.setSelfSplitWeight(1000L);
fileSplit.setTargetSplitSize(10L);
Assert.assertEquals(100L, fileSplit.getSplitWeight().getRawValue(), 100L);
fileSplit.setTargetSplitSize(10000000L);
Assert.assertEquals(1L, fileSplit.getSplitWeight().getRawValue());
fileSplit.setTargetSplitSize(2000L);
Assert.assertEquals(50, fileSplit.getSplitWeight().getRawValue());
}
@Test
public void testBiggerSplit() throws UserException {
SystemInfoService service = new SystemInfoService();
Backend backend1 = new Backend(1L, "172.30.0.100", 9050);
backend1.setAlive(true);
service.addBackend(backend1);
Backend backend2 = new Backend(2L, "172.30.0.106", 9050);
backend2.setAlive(true);
service.addBackend(backend2);
Backend backend3 = new Backend(3L, "172.30.0.118", 9050);
backend3.setAlive(true);
service.addBackend(backend3);
new MockUp<Env>() {
@Mock
public SystemInfoService getCurrentSystemInfo() {
return service;
}
};
List<Split> splits = new ArrayList<>();
splits.add(genFileSplit("s1", 1000000L, 1000L)); // belong 2
splits.add(genFileSplit("s2", 100000L, 1000L)); // belong 2
splits.add(genFileSplit("s3", 200000L, 1000L)); // belong 2
splits.add(genFileSplit("s4", 300000L, 1000L)); // belong 2
splits.add(genFileSplit("s5", 800000L, 1000L)); // belong 1
FederationBackendPolicy policy = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
// Set these options to ensure that the consistent hash algorithm is consistent.
policy.setEnableSplitsRedistribution(false);
Config.split_assigner_min_consistent_hash_candidate_num = 1;
policy.init();
Multimap<Backend, Split> assignment = policy.computeScanRangeAssignment(splits);
Map<Backend, List<Split>> backendListMap = mergeAssignment(assignment);
backendListMap.forEach((k, v) -> {
if (k.getId() == 1) {
Assert.assertEquals(800000L, v.stream().mapToLong(Split::getLength).sum());
} else if (k.getId() == 2) {
Assert.assertEquals(1600000L, v.stream().mapToLong(Split::getLength).sum());
}
});
Config.split_assigner_min_consistent_hash_candidate_num = 1;
FederationBackendPolicy policy2 = new FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
policy2.init();
Multimap<Backend, Split> assignment2 = policy2.computeScanRangeAssignment(splits);
Map<Backend, List<Split>> backendListMap2 = mergeAssignment(assignment2);
backendListMap2.forEach((k, v) -> {
if (k.getId() == 1) {
Assert.assertEquals(900000L, v.stream().mapToLong(Split::getLength).sum());
} else if (k.getId() == 2) {
Assert.assertEquals(500000L, v.stream().mapToLong(Split::getLength).sum());
} else if (k.getId() == 3) {
Assert.assertEquals(1000000L, v.stream().mapToLong(Split::getLength).sum());
}
});
}
private Map<Backend, List<Split>> mergeAssignment(Multimap<Backend, Split> ass) {
HashMap<Backend, List<Split>> map = new HashMap<>();
ass.forEach((k, v) -> {
if (map.containsKey(k)) {
map.get(k).add(v);
} else {
ArrayList<Split> splits = new ArrayList<>();
splits.add(v);
map.put(k, splits);
}
});
return map;
}
private FileSplit genFileSplit(String path, long length, long targetSplit) {
FileSplit s = new FileSplit(new LocationPath(path), 0, length, length, 0, null, Collections.emptyList());
s.setSelfSplitWeight(length);
s.setTargetSplitSize(targetSplit);
return s;
}
}