[Improve](tvf)jni-avro support split file (#27933)
This commit is contained in:
@ -89,6 +89,10 @@ Status AvroJNIReader::init_fetch_table_reader(
|
||||
if (type == TFileType::FILE_S3) {
|
||||
required_param.insert(_params.properties.begin(), _params.properties.end());
|
||||
}
|
||||
required_param.insert(
|
||||
std::make_pair("split_start_offset", std::to_string(_range.start_offset)));
|
||||
required_param.insert(std::make_pair("split_size", std::to_string(_range.size)));
|
||||
required_param.insert(std::make_pair("split_file_size", std::to_string(_range.file_size)));
|
||||
required_param.insert(std::make_pair("uri", _range.path));
|
||||
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
|
||||
required_param, column_names);
|
||||
|
||||
@ -44,30 +44,37 @@ public class AvroFileCache {
|
||||
public static class AvroFileMeta {
|
||||
private final String schema;
|
||||
private Set<String> requiredFields;
|
||||
// TODO split file
|
||||
private String splitInfo;
|
||||
private Long splitStartOffset;
|
||||
private Long splitSize;
|
||||
|
||||
AvroFileMeta(String schema) {
|
||||
this.schema = schema;
|
||||
}
|
||||
|
||||
AvroFileMeta(String schema, String splitInfo) {
|
||||
this.schema = schema;
|
||||
this.splitInfo = splitInfo;
|
||||
}
|
||||
|
||||
public String getSchema() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
public String getSplitInfo() {
|
||||
return splitInfo;
|
||||
}
|
||||
|
||||
public void setRequiredFields(Set<String> requiredFields) {
|
||||
this.requiredFields = requiredFields;
|
||||
}
|
||||
|
||||
public void setSplitStartOffset(Long splitStartOffset) {
|
||||
this.splitStartOffset = splitStartOffset;
|
||||
}
|
||||
|
||||
public void setSplitSize(Long splitSize) {
|
||||
this.splitSize = splitSize;
|
||||
}
|
||||
|
||||
public Long getSplitStartOffset() {
|
||||
return this.splitStartOffset;
|
||||
}
|
||||
|
||||
public Long getSplitSize() {
|
||||
return this.splitSize;
|
||||
}
|
||||
|
||||
public Set<String> getRequiredFields() {
|
||||
return requiredFields;
|
||||
}
|
||||
|
||||
@ -74,6 +74,9 @@ public class AvroJNIScanner extends JniScanner {
|
||||
private AvroFileMeta avroFileMeta;
|
||||
private AvroWrapper<Pair<Integer, Long>> inputPair;
|
||||
private NullWritable ignore;
|
||||
private Long splitStartOffset;
|
||||
private Long splitSize;
|
||||
private Long splitFileSize;
|
||||
|
||||
/**
|
||||
* Call by JNI for get table data or get table schema
|
||||
@ -100,6 +103,9 @@ public class AvroJNIScanner extends JniScanner {
|
||||
this.fieldInspectors = new ObjectInspector[requiredFields.length];
|
||||
this.inputPair = new AvroWrapper<>(null);
|
||||
this.ignore = NullWritable.get();
|
||||
this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET));
|
||||
this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE));
|
||||
this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE));
|
||||
}
|
||||
}
|
||||
|
||||
@ -171,6 +177,8 @@ public class AvroJNIScanner extends JniScanner {
|
||||
avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri);
|
||||
avroFileMeta = AvroFileCache.getAvroFileMeta(avroFileCacheKey);
|
||||
avroFileMeta.setRequiredFields(requiredFieldSet);
|
||||
avroFileMeta.setSplitStartOffset(splitStartOffset);
|
||||
avroFileMeta.setSplitSize(splitSize);
|
||||
initFieldInspector();
|
||||
initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -38,5 +38,8 @@ public class AvroProperties {
|
||||
protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
|
||||
protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
|
||||
protected static final String FS_S3A_REGION = "fs.s3a.region";
|
||||
protected static final String SPLIT_START_OFFSET = "split_start_offset";
|
||||
protected static final String SPLIT_SIZE = "split_size";
|
||||
protected static final String SPLIT_FILE_SIZE = "split_file_size";
|
||||
|
||||
}
|
||||
|
||||
@ -31,7 +31,6 @@ import org.apache.avro.mapred.AvroJob;
|
||||
import org.apache.avro.mapred.AvroRecordReader;
|
||||
import org.apache.avro.mapred.AvroWrapper;
|
||||
import org.apache.avro.mapred.Pair;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
@ -72,9 +71,7 @@ public abstract class AvroReader {
|
||||
protected void openDataReader(AvroFileMeta avroFileMeta) throws IOException {
|
||||
JobConf job = new JobConf();
|
||||
projectionSchema(job, avroFileMeta);
|
||||
FileStatus fileStatus = fileSystem.getFileStatus(path);
|
||||
// TODO split file
|
||||
FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getLen(), job);
|
||||
FileSplit fileSplit = new FileSplit(path, avroFileMeta.getSplitStartOffset(), avroFileMeta.getSplitSize(), job);
|
||||
dataReader = new AvroRecordReader<>(job, fileSplit);
|
||||
LOG.debug("success open avro data reader.");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user