diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index b0a2d2e327..a5ebf34645 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -585,10 +585,18 @@ Status VFileScanner::_get_next_reader() { _current_range_path = range.path; // create reader for specific format - // TODO: add json, avro Status init_status; - // TODO: use data lake type - switch (_params.format_type) { + TFileFormatType::type format_type = _params.format_type; + // JNI reader can only push down column value range + bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI; + if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params && + range.table_format_params.table_format_type == "hudi") { + if (range.table_format_params.hudi_params.delta_logs.empty()) { + // fall back to native reader if there is no log file + format_type = TFileFormatType::FORMAT_PARQUET; + } + } + switch (format_type) { case TFileFormatType::FORMAT_JNI: { if (_real_tuple_desc->table_desc()->table_type() == ::doris::TTableType::type::MAX_COMPUTE_TABLE) { @@ -625,7 +633,7 @@ Status VFileScanner::_get_next_reader() { SCOPED_TIMER(_open_reader_timer); RETURN_IF_ERROR(parquet_reader->open()); } - if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) { + if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) { _push_down_conjuncts.resize(_conjuncts.size()); for (size_t i = 0; i != _conjuncts.size(); ++i) { RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); @@ -660,7 +668,7 @@ Status VFileScanner::_get_next_reader() { std::unique_ptr orc_reader = OrcReader::create_unique( _profile, _state, _params, range, _state->query_options().batch_size, _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat); - if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) { + if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) { _push_down_conjuncts.resize(_conjuncts.size()); for (size_t i = 0; i != _conjuncts.size(); ++i) { RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i])); diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml index e07e5aedb5..71564e4be3 100644 --- a/fe/be-java-extensions/hudi-scanner/pom.xml +++ b/fe/be-java-extensions/hudi-scanner/pom.xml @@ -37,6 +37,12 @@ under the License. org.apache.doris java-common ${project.version} + + + fe-common + org.apache.doris + + @@ -64,6 +70,10 @@ under the License. org.apache.avro avro + + hudi-hadoop-mr + org.apache.hudi + @@ -92,7 +102,6 @@ under the License. commons-io commons-io - hudi-scanner diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java index 4b4a7bbcf7..4a7ea36e44 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiColumnValue.java @@ -25,10 +25,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -62,6 +65,11 @@ public class HudiColumnValue implements ColumnValue { return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData); } + @Override + public boolean canGetStringAsBytes() { + return true; + } + @Override public boolean isNull() { return false; @@ -143,6 +151,16 @@ public class HudiColumnValue implements ColumnValue { @Override public byte[] getBytes() { + // Get bytes directly if fieldData is BytesWritable or Text to avoid decoding&encoding + if (fieldData instanceof BytesWritable) { + return ((BytesWritable) fieldData).getBytes(); + } + if (fieldData instanceof Text) { + return ((Text) fieldData).getBytes(); + } + if (fieldData instanceof String) { + return ((String) fieldData).getBytes(StandardCharsets.UTF_8); + } return (byte[]) inspectObject(); } diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java index b559f2a0af..14a412dccb 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/MockJniScanner.java @@ -50,6 +50,11 @@ public class MockJniScanner extends JniScanner { this.j = j; } + @Override + public boolean canGetStringAsBytes() { + return false; + } + @Override public boolean isNull() { return false; diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java index 0495179260..fa2e268366 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ColumnValue.java @@ -27,6 +27,9 @@ import java.util.List; * Column value in vector column */ public interface ColumnValue { + // Get bytes directly when reading string value to avoid decoding&encoding + boolean canGetStringAsBytes(); + boolean isNull(); boolean getBoolean(); diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java index f9b35b2352..4553d29e18 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/ScanPredicate.java @@ -118,6 +118,11 @@ public class ScanPredicate { } } + @Override + public boolean canGetStringAsBytes() { + return false; + } + @Override public String toString() { return inspectObject().toString(); diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java index 70b358a87f..f65ea8fae7 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/VectorColumn.java @@ -597,7 +597,11 @@ public class VectorColumn { case CHAR: case VARCHAR: case STRING: - appendStringAndOffset(o.getString()); + if (o.canGetStringAsBytes()) { + appendBytesAndOffset(o.getBytes()); + } else { + appendStringAndOffset(o.getString()); + } break; case BINARY: appendBytesAndOffset(o.getBytes()); diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java index 4c3aff97b4..a2237b4cc4 100644 --- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java +++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeColumnValue.java @@ -57,6 +57,11 @@ public class MaxComputeColumnValue implements ColumnValue { this.idx = 0; } + @Override + public boolean canGetStringAsBytes() { + return false; + } + @Override public boolean isNull() { return column.isNull(idx); diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java index e0a89c9db2..d10c876de0 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonColumnValue.java @@ -44,6 +44,11 @@ public class PaimonColumnValue implements ColumnValue { this.record = record; } + @Override + public boolean canGetStringAsBytes() { + return false; + } + @Override public boolean getBoolean() { return record.getBoolean(idx); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 498401b05b..ed004ff170 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -244,9 +244,10 @@ public abstract class FileQueryScanNode extends FileScanNode { // set hdfs params for hdfs file type. Map locationProperties = getLocationProperties(); - if (fileFormatType == TFileFormatType.FORMAT_JNI) { + if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) { params.setProperties(locationProperties); - } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { + } + if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { String fsName = getFsName(inputSplit); THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); tHdfsParams.setFsName(fsName); @@ -259,8 +260,6 @@ public abstract class FileQueryScanNode extends FileScanNode { } params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); } - } else if (locationType == TFileType.FILE_S3) { - params.setProperties(locationProperties); } List pathPartitionKeys = getPathPartitionKeys(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index 41b9ac80b4..fdb50e78a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -30,6 +30,7 @@ import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.TableFormatType; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.THudiFileDesc; @@ -68,6 +69,8 @@ public class HudiScanNode extends HiveScanNode { private final boolean isCowTable; + private long noLogsSplitNum = 0; + /** * External file scan node for Query Hudi table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -140,8 +143,11 @@ public class HudiScanNode extends HiveScanNode { public List getSplits() throws UserException { if (isCowTable) { // skip hidden files start with "." - return super.getSplits().stream().filter(split -> !((FileSplit) split).getPath().getName().startsWith(".")) + List cowSplits = super.getSplits().stream() + .filter(split -> !((FileSplit) split).getPath().getName().startsWith(".")) .collect(Collectors.toList()); + noLogsSplitNum = cowSplits.size(); + return cowSplits; } HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable); @@ -211,6 +217,9 @@ public class HudiScanNode extends HiveScanNode { List logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath).map(Path::toString) .collect(Collectors.toList()); + if (logs.isEmpty()) { + noLogsSplitNum++; + } HudiSplit split = new HudiSplit(new Path(filePath), 0, fileSize, fileSize, new String[0], partition.getPartitionValues()); @@ -233,4 +242,10 @@ public class HudiScanNode extends HiveScanNode { } return splits; } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + return super.getNodeExplainString(prefix, detailLevel) + + String.format("%shudiNativeReadSplits=%d/%d\n", prefix, noLogsSplitNum, inputSplitsNum); + } }