From 111185407cd2ebf9a90ad1ebcd9c4314ae350ccd Mon Sep 17 00:00:00 2001 From: wudongliang <46414265+DongLiang-0@users.noreply.github.com> Date: Tue, 19 Dec 2023 16:37:34 +0800 Subject: [PATCH] [Improve](tvf)jni-avro support split file (#27933) --- .../vec/exec/format/avro/avro_jni_reader.cpp | 4 +++ .../org/apache/doris/avro/AvroFileCache.java | 29 ++++++++++++------- .../org/apache/doris/avro/AvroJNIScanner.java | 8 +++++ .../org/apache/doris/avro/AvroProperties.java | 3 ++ .../org/apache/doris/avro/AvroReader.java | 5 +--- 5 files changed, 34 insertions(+), 15 deletions(-) diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp index f3cff19c04..0877e8406b 100644 --- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp +++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp @@ -89,6 +89,10 @@ Status AvroJNIReader::init_fetch_table_reader( if (type == TFileType::FILE_S3) { required_param.insert(_params.properties.begin(), _params.properties.end()); } + required_param.insert( + std::make_pair("split_start_offset", std::to_string(_range.start_offset))); + required_param.insert(std::make_pair("split_size", std::to_string(_range.size))); + required_param.insert(std::make_pair("split_file_size", std::to_string(_range.file_size))); required_param.insert(std::make_pair("uri", _range.path)); _jni_connector = std::make_unique("org/apache/doris/avro/AvroJNIScanner", required_param, column_names); diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java index 0ffcb9bc99..bcec73b87b 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroFileCache.java @@ -44,30 +44,37 @@ public class AvroFileCache { public static class AvroFileMeta { private final String schema; private Set requiredFields; - // TODO split file - private String splitInfo; + private Long splitStartOffset; + private Long splitSize; AvroFileMeta(String schema) { this.schema = schema; } - AvroFileMeta(String schema, String splitInfo) { - this.schema = schema; - this.splitInfo = splitInfo; - } - public String getSchema() { return schema; } - public String getSplitInfo() { - return splitInfo; - } - public void setRequiredFields(Set requiredFields) { this.requiredFields = requiredFields; } + public void setSplitStartOffset(Long splitStartOffset) { + this.splitStartOffset = splitStartOffset; + } + + public void setSplitSize(Long splitSize) { + this.splitSize = splitSize; + } + + public Long getSplitStartOffset() { + return this.splitStartOffset; + } + + public Long getSplitSize() { + return this.splitSize; + } + public Set getRequiredFields() { return requiredFields; } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java index 9cfaa09262..a1b573cf5b 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java @@ -74,6 +74,9 @@ public class AvroJNIScanner extends JniScanner { private AvroFileMeta avroFileMeta; private AvroWrapper> inputPair; private NullWritable ignore; + private Long splitStartOffset; + private Long splitSize; + private Long splitFileSize; /** * Call by JNI for get table data or get table schema @@ -100,6 +103,9 @@ public class AvroJNIScanner extends JniScanner { this.fieldInspectors = new ObjectInspector[requiredFields.length]; this.inputPair = new AvroWrapper<>(null); this.ignore = NullWritable.get(); + this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET)); + this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE)); + this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE)); } } @@ -171,6 +177,8 @@ public class AvroJNIScanner extends JniScanner { avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri); avroFileMeta = AvroFileCache.getAvroFileMeta(avroFileCacheKey); avroFileMeta.setRequiredFields(requiredFieldSet); + avroFileMeta.setSplitStartOffset(splitStartOffset); + avroFileMeta.setSplitSize(splitSize); initFieldInspector(); initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize); } catch (Exception e) { diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java index 7c6cbbcbb4..416b7d2589 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java @@ -38,5 +38,8 @@ public class AvroProperties { protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; protected static final String FS_S3A_REGION = "fs.s3a.region"; + protected static final String SPLIT_START_OFFSET = "split_start_offset"; + protected static final String SPLIT_SIZE = "split_size"; + protected static final String SPLIT_FILE_SIZE = "split_file_size"; } diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java index 9180cf617f..52a78188ea 100644 --- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java +++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroReader.java @@ -31,7 +31,6 @@ import org.apache.avro.mapred.AvroJob; import org.apache.avro.mapred.AvroRecordReader; import org.apache.avro.mapred.AvroWrapper; import org.apache.avro.mapred.Pair; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; @@ -72,9 +71,7 @@ public abstract class AvroReader { protected void openDataReader(AvroFileMeta avroFileMeta) throws IOException { JobConf job = new JobConf(); projectionSchema(job, avroFileMeta); - FileStatus fileStatus = fileSystem.getFileStatus(path); - // TODO split file - FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getLen(), job); + FileSplit fileSplit = new FileSplit(path, avroFileMeta.getSplitStartOffset(), avroFileMeta.getSplitSize(), job); dataReader = new AvroRecordReader<>(job, fileSplit); LOG.debug("success open avro data reader."); }