[feature](paimon)support native reader (#29339)

Support native reader fro paimon.

Upgrade paimon 0.5 to 0.6 : apache/doris-shade#32
This commit is contained in:
wuwenchi
2024-01-04 14:31:48 +08:00
committed by GitHub
parent d8a08dad90
commit bfe65565d8
7 changed files with 102 additions and 11 deletions

View File

@ -25,6 +25,7 @@ import java.util.Map;
public class PaimonProperties {
public static final String WAREHOUSE = "warehouse";
public static final String FILE_FORMAT = "file.format";
public static final String PAIMON_PREFIX = "paimon";
public static final String PAIMON_CATALOG_TYPE = "metastore";
public static final String HIVE_METASTORE_URIS = "uri";

View File

@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
@ -41,15 +42,20 @@ import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.InstantiationUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -102,7 +108,11 @@ public class PaimonScanNode extends FileQueryScanNode {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
fileDesc.setPaimonSplit(encodeObjectToString(paimonSplit.getSplit()));
org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
if (split != null) {
fileDesc.setPaimonSplit(encodeObjectToString(split));
}
fileDesc.setFileFormat(source.getFileFormat());
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
.collect(Collectors.joining(",")));
@ -127,13 +137,52 @@ public class PaimonScanNode extends FileQueryScanNode {
List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.withFilter(predicates)
.withProjection(projected)
.newScan().plan().splits();
boolean supportNative = supportNativeReader();
for (org.apache.paimon.table.source.Split split : paimonSplits) {
PaimonSplit paimonSplit = new PaimonSplit(split);
splits.add(paimonSplit);
if (supportNative && split instanceof DataSplit) {
DataSplit dataSplit = (DataSplit) split;
Optional<List<RawFile>> optRowFiles = dataSplit.convertToRawFiles();
if (optRowFiles.isPresent()) {
List<RawFile> rawFiles = optRowFiles.get();
for (RawFile file : rawFiles) {
LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties());
Path finalDataFilePath = locationPath.toScanRangeLocation();
try {
splits.addAll(
splitFile(
finalDataFilePath,
0,
null,
file.length(),
-1,
true,
null,
PaimonSplit.PaimonSplitCreator.DEFAULT));
} catch (IOException e) {
throw new UserException("Paimon error to split file: " + e.getMessage(), e);
}
}
} else {
splits.add(new PaimonSplit(split));
}
} else {
splits.add(new PaimonSplit(split));
}
}
return splits;
}
private boolean supportNativeReader() {
String fileFormat = source.getFileFormat().toLowerCase();
switch (fileFormat) {
case "orc":
case "parquet":
return true;
default:
return false;
}
}
//When calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, paimon_column_names is temporarily reset here
@Override
@ -157,8 +206,8 @@ public class PaimonScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
//todo: no use
return TFileType.FILE_S3;
return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for paimon table "));
}
@Override

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.PaimonExternalTable;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
@ -61,4 +62,8 @@ public class PaimonSource {
public ExternalCatalog getCatalog() {
return paimonExtTable.getCatalog();
}
public String getFileFormat() {
return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc");
}
}

View File

@ -18,21 +18,30 @@
package org.apache.doris.planner.external.paimon;
import org.apache.doris.planner.external.FileSplit;
import org.apache.doris.planner.external.SplitCreator;
import org.apache.doris.planner.external.TableFormatType;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.table.source.Split;
import java.util.List;
public class PaimonSplit extends FileSplit {
private Split split;
private TableFormatType tableFormatType;
public PaimonSplit(Split split) {
super(new Path("dummyPath"), 0, 0, 0, null, null);
super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
}
public PaimonSplit(Path file, long start, long length, long fileLength, String[] hosts,
List<String> partitionList) {
super(file, start, length, fileLength, hosts, partitionList);
this.tableFormatType = TableFormatType.PAIMON;
}
public Split getSplit() {
return split;
}
@ -49,4 +58,19 @@ public class PaimonSplit extends FileSplit {
this.tableFormatType = tableFormatType;
}
public static class PaimonSplitCreator implements SplitCreator {
static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
@Override
public org.apache.doris.spi.Split create(Path path,
long start,
long length,
long fileLength,
long modificationTime,
String[] hosts,
List<String> partitionValues) {
return new PaimonSplit(path, start, length, fileLength, hosts, partitionValues);
}
}
}