[pipelineX](filescan) Support parallel executing for external table scanning (#30121)

This commit is contained in:
Gabriel
2024-01-19 16:12:35 +08:00
committed by yiguolei
parent 21db1ecff2
commit caf7790797
3 changed files with 22 additions and 2 deletions

View File

@ -1736,6 +1736,11 @@ public class OlapScanNode extends ScanNode {
return true;
}
@Override
public int getScanRangeNum() {
return getScanTabletIds().size();
}
}

View File

@ -725,8 +725,11 @@ public abstract class ScanNode extends PlanNode {
&& context.getSessionVariable().isIgnoreStorageDataDistribution()
&& context.getSessionVariable().getEnablePipelineXEngine()
&& !fragment.isHasNullAwareLeftAntiJoin()
&& ((this instanceof OlapScanNode) && ((OlapScanNode) this).getScanTabletIds().size()
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
&& getScanRangeNum() < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
public int getScanRangeNum() {
return Integer.MAX_VALUE;
}
public boolean haveLimitAndConjunts() {

View File

@ -485,6 +485,18 @@ public abstract class FileQueryScanNode extends FileScanNode {
params.setSlotNameToSchemaPos(columnNameToPosition);
}
@Override
public int getScanRangeNum() {
Preconditions.checkNotNull(scanRangeLocations);
int i = 0;
for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
TScanRange tScanRange = tScanRangeLocations.getScanRange();
TFileScanRange tFileScanRange = tScanRange.getExtScanRange().getFileScanRange();
i += tFileScanRange.getRangesSize();
}
return i;
}
protected abstract TFileType getLocationType() throws UserException;
protected abstract TFileType getLocationType(String location) throws UserException;