diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 6002f4eea6..c70295e131 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -736,11 +736,22 @@ Status VFileScanner::_get_next_reader() { // JNI reader can only push down column value range bool push_down_predicates = !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI; - if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params && - range.table_format_params.table_format_type == "hudi") { - if (range.table_format_params.hudi_params.delta_logs.empty()) { + if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) { + if (range.table_format_params.table_format_type == "hudi" && + range.table_format_params.hudi_params.delta_logs.empty()) { // fall back to native reader if there is no log file format_type = TFileFormatType::FORMAT_PARQUET; + } else if (range.table_format_params.table_format_type == "paimon" && + !range.table_format_params.paimon_params.__isset.paimon_split) { + // use native reader + auto format = range.table_format_params.paimon_params.file_format; + if (format == "orc") { + format_type = TFileFormatType::FORMAT_ORC; + } else if (format == "parquet") { + format_type = TFileFormatType::FORMAT_PARQUET; + } else { + return Status::InternalError("Not supported paimon file format: {}", format); + } } } bool need_to_get_parsed_schema = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java index 06d205ff22..318e2bac30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/PaimonProperties.java @@ -25,6 +25,7 @@ import java.util.Map; public class PaimonProperties { public static final String WAREHOUSE = "warehouse"; + public static final String FILE_FORMAT = "file.format"; public static final String PAIMON_PREFIX = "paimon"; public static final String PAIMON_CATALOG_TYPE = "metastore"; public static final String HIVE_METASTORE_URIS = "uri"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java index 1e23d2f781..cf82290199 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonScanNode.java @@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; @@ -41,15 +42,20 @@ import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.AbstractFileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.utils.InstantiationUtil; +import java.io.IOException; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -102,7 +108,11 @@ public class PaimonScanNode extends FileQueryScanNode { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value()); TPaimonFileDesc fileDesc = new TPaimonFileDesc(); - fileDesc.setPaimonSplit(encodeObjectToString(paimonSplit.getSplit())); + org.apache.paimon.table.source.Split split = paimonSplit.getSplit(); + if (split != null) { + fileDesc.setPaimonSplit(encodeObjectToString(split)); + } + fileDesc.setFileFormat(source.getFileFormat()); fileDesc.setPaimonPredicate(encodeObjectToString(predicates)); fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName()) .collect(Collectors.joining(","))); @@ -127,13 +137,52 @@ public class PaimonScanNode extends FileQueryScanNode { List paimonSplits = readBuilder.withFilter(predicates) .withProjection(projected) .newScan().plan().splits(); + boolean supportNative = supportNativeReader(); for (org.apache.paimon.table.source.Split split : paimonSplits) { - PaimonSplit paimonSplit = new PaimonSplit(split); - splits.add(paimonSplit); + if (supportNative && split instanceof DataSplit) { + DataSplit dataSplit = (DataSplit) split; + Optional> optRowFiles = dataSplit.convertToRawFiles(); + if (optRowFiles.isPresent()) { + List rawFiles = optRowFiles.get(); + for (RawFile file : rawFiles) { + LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties()); + Path finalDataFilePath = locationPath.toScanRangeLocation(); + try { + splits.addAll( + splitFile( + finalDataFilePath, + 0, + null, + file.length(), + -1, + true, + null, + PaimonSplit.PaimonSplitCreator.DEFAULT)); + } catch (IOException e) { + throw new UserException("Paimon error to split file: " + e.getMessage(), e); + } + } + } else { + splits.add(new PaimonSplit(split)); + } + } else { + splits.add(new PaimonSplit(split)); + } } return splits; } + private boolean supportNativeReader() { + String fileFormat = source.getFileFormat().toLowerCase(); + switch (fileFormat) { + case "orc": + case "parquet": + return true; + default: + return false; + } + } + //When calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet, // Therefore, paimon_column_names is temporarily reset here @Override @@ -157,8 +206,8 @@ public class PaimonScanNode extends FileQueryScanNode { @Override public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException { - //todo: no use - return TFileType.FILE_S3; + return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() -> + new DdlException("Unknown file location " + location + " for paimon table ")); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java index 2f55e30c08..fa838350c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSource.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.PaimonExternalTable; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileAttributes; @@ -61,4 +62,8 @@ public class PaimonSource { public ExternalCatalog getCatalog() { return paimonExtTable.getCatalog(); } + + public String getFileFormat() { + return originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java index 8ecf539db9..13263fb584 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/paimon/PaimonSplit.java @@ -18,21 +18,30 @@ package org.apache.doris.planner.external.paimon; import org.apache.doris.planner.external.FileSplit; +import org.apache.doris.planner.external.SplitCreator; import org.apache.doris.planner.external.TableFormatType; import org.apache.hadoop.fs.Path; import org.apache.paimon.table.source.Split; +import java.util.List; + public class PaimonSplit extends FileSplit { private Split split; private TableFormatType tableFormatType; public PaimonSplit(Split split) { - super(new Path("dummyPath"), 0, 0, 0, null, null); + super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null); this.split = split; this.tableFormatType = TableFormatType.PAIMON; } + public PaimonSplit(Path file, long start, long length, long fileLength, String[] hosts, + List partitionList) { + super(file, start, length, fileLength, hosts, partitionList); + this.tableFormatType = TableFormatType.PAIMON; + } + public Split getSplit() { return split; } @@ -49,4 +58,19 @@ public class PaimonSplit extends FileSplit { this.tableFormatType = tableFormatType; } + public static class PaimonSplitCreator implements SplitCreator { + + static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator(); + + @Override + public org.apache.doris.spi.Split create(Path path, + long start, + long length, + long fileLength, + long modificationTime, + String[] hosts, + List partitionValues) { + return new PaimonSplit(path, start, length, fileLength, hosts, partitionValues); + } + } } diff --git a/fe/pom.xml b/fe/pom.xml index ef0747931c..dbb691ddf2 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -221,7 +221,7 @@ under the License. ${fe.dir}/../ 1.2-SNAPSHOT UTF-8 - 1.0.2 + 1.0.3 1.8 1.8 @@ -343,7 +343,7 @@ under the License. 2.3.2 - 0.5.0-incubating + 0.6.0-incubating 3.4.4 395 diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0e35f6ebc5..23da0f23eb 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -311,6 +311,7 @@ struct TPaimonFileDesc { 8: optional i64 db_id 9: optional i64 tbl_id 10: optional i64 last_update_time + 11: optional string file_format } struct TMaxComputeFileDesc {