[opt](FileCache) use modification time to determine whether the file has changed (#18906)
Get the last modification time from the file status, and use the combination of the path and the modification time to generate the cache identifier. When a file is changed, its modification time changes as well, so the former cache entry (keyed by path + modification time) becomes invalid.
This commit is contained in:
@ -233,6 +233,7 @@ public class HiveMetaStoreClientHelper {
|
||||
brokerFileStatus.setIsDir(fileLocation.isDirectory());
|
||||
brokerFileStatus.setIsSplitable(true);
|
||||
brokerFileStatus.setSize(fileLocation.getSize());
|
||||
brokerFileStatus.setModificationTime(fileLocation.getModificationTime());
|
||||
// filePath.toUri().getPath() = "/path/to/partition/file_name"
|
||||
// eg: /home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse
|
||||
// + /dae.db/customer/state=CA/city=SanJose/000000_0
|
||||
|
||||
@ -102,6 +102,7 @@ public class BrokerUtil {
|
||||
if (r.isFile()) {
|
||||
TBrokerFileStatus status = new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile());
|
||||
status.setBlockSize(r.getBlockSize());
|
||||
status.setModificationTime(r.getModificationTime());
|
||||
fileStatuses.add(status);
|
||||
}
|
||||
}
|
||||
|
||||
@ -324,6 +324,7 @@ public class HiveMetaStoreCache {
|
||||
// Convert the hadoop split to Doris Split.
|
||||
for (int i = 0; i < splits.length; i++) {
|
||||
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
|
||||
// todo: get modification time
|
||||
result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null));
|
||||
}
|
||||
}
|
||||
@ -802,6 +803,7 @@ public class HiveMetaStoreCache {
|
||||
status.setPath(file.getPath());
|
||||
status.length = file.getSize();
|
||||
status.blockSize = file.getBlockSize();
|
||||
status.modificationTime = file.getModificationTime();
|
||||
files.add(status);
|
||||
}
|
||||
|
||||
@ -823,6 +825,7 @@ public class HiveMetaStoreCache {
|
||||
Path path;
|
||||
long length;
|
||||
long blockSize;
|
||||
long modificationTime;
|
||||
}
|
||||
|
||||
@Data
|
||||
|
||||
@ -581,7 +581,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
|
||||
|
||||
List<TBrokerFileStatus> fileStatus = rep.getFiles();
|
||||
for (TBrokerFileStatus tFile : fileStatus) {
|
||||
RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0);
|
||||
RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0, tFile.getModificationTime());
|
||||
result.add(file);
|
||||
}
|
||||
LOG.info("finished to list remote path {}. get files: {}", remotePath, result);
|
||||
|
||||
@ -33,25 +33,36 @@ public class RemoteFile {
|
||||
// A large file will split into multiple blocks. The blocks are transparent to the user.
|
||||
// Default block size for HDFS 2.x is 128M.
|
||||
private final long blockSize;
|
||||
private long modificationTime;
|
||||
private Path path;
|
||||
BlockLocation[] blockLocations;
|
||||
|
||||
public RemoteFile(String name, boolean isFile, long size, long blockSize) {
|
||||
this(name, null, isFile, !isFile, size, blockSize, null);
|
||||
this(name, null, isFile, !isFile, size, blockSize, 0, null);
|
||||
}
|
||||
|
||||
public RemoteFile(String name, boolean isFile, long size, long blockSize, long modificationTime) {
|
||||
this(name, null, isFile, !isFile, size, blockSize, modificationTime, null);
|
||||
}
|
||||
|
||||
public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, BlockLocation[] blockLocations) {
|
||||
this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, blockLocations);
|
||||
this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, 0, blockLocations);
|
||||
}
|
||||
|
||||
public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, long modificationTime,
|
||||
BlockLocation[] blockLocations) {
|
||||
this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, modificationTime, blockLocations);
|
||||
}
|
||||
|
||||
public RemoteFile(String name, Path path, boolean isFile, boolean isDirectory,
|
||||
long size, long blockSize, BlockLocation[] blockLocations) {
|
||||
long size, long blockSize, long modificationTime, BlockLocation[] blockLocations) {
|
||||
Preconditions.checkState(!Strings.isNullOrEmpty(name));
|
||||
this.name = name;
|
||||
this.isFile = isFile;
|
||||
this.isDirectory = isDirectory;
|
||||
this.size = size;
|
||||
this.blockSize = blockSize;
|
||||
this.modificationTime = modificationTime;
|
||||
this.path = path;
|
||||
this.blockLocations = blockLocations;
|
||||
}
|
||||
@ -80,6 +91,10 @@ public class RemoteFile {
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public long getModificationTime() {
|
||||
return modificationTime;
|
||||
}
|
||||
|
||||
public BlockLocation[] getBlockLocations() {
|
||||
return blockLocations;
|
||||
}
|
||||
|
||||
@ -58,8 +58,8 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
|
||||
List<RemoteFile> locations = new ArrayList<>();
|
||||
while (locatedFiles.hasNext()) {
|
||||
LocatedFileStatus fileStatus = locatedFiles.next();
|
||||
RemoteFile location = new RemoteFile(fileStatus.getPath(), fileStatus.isDirectory(),
|
||||
fileStatus.getLen(), fileStatus.getBlockSize(), fileStatus.getBlockLocations());
|
||||
RemoteFile location = new RemoteFile(fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(),
|
||||
fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations());
|
||||
locations.add(location);
|
||||
}
|
||||
return new RemoteFiles(locations);
|
||||
|
||||
@ -79,7 +79,7 @@ public class S3FileSystem extends ObjFileSystem {
|
||||
RemoteFile remoteFile = new RemoteFile(
|
||||
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
|
||||
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
|
||||
fileStatus.getBlockSize());
|
||||
fileStatus.getBlockSize(), fileStatus.getModificationTime());
|
||||
result.add(remoteFile);
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
|
||||
@ -431,7 +431,7 @@ public class DFSFileSystem extends RemoteFileSystem {
|
||||
RemoteFile remoteFile = new RemoteFile(
|
||||
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
|
||||
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
|
||||
fileStatus.getBlockSize());
|
||||
fileStatus.getBlockSize(), fileStatus.getModificationTime());
|
||||
result.add(remoteFile);
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
|
||||
@ -337,6 +337,7 @@ public class FileGroupInfo {
|
||||
rangeDesc.setSize(fileStatus.size);
|
||||
rangeDesc.setFileSize(fileStatus.size);
|
||||
}
|
||||
rangeDesc.setModificationTime(fileStatus.getModificationTime());
|
||||
return rangeDesc;
|
||||
}
|
||||
}
|
||||
|
||||
@ -306,6 +306,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
// need full path
|
||||
rangeDesc.setPath(fileSplit.getPath().toString());
|
||||
}
|
||||
rangeDesc.setModificationTime(fileSplit.getModificationTime());
|
||||
return rangeDesc;
|
||||
}
|
||||
|
||||
|
||||
@ -212,7 +212,7 @@ public class FileScanNode extends ExternalScanNode {
|
||||
}
|
||||
|
||||
protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
|
||||
boolean splittable, List<String> partitionValues) throws IOException {
|
||||
long modificationTime, boolean splittable, List<String> partitionValues) throws IOException {
|
||||
if (blockLocations == null) {
|
||||
blockLocations = new BlockLocation[0];
|
||||
}
|
||||
@ -226,7 +226,7 @@ public class FileScanNode extends ExternalScanNode {
|
||||
if (!splittable) {
|
||||
LOG.debug("Path {} is not splittable.", path);
|
||||
String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
|
||||
result.add(new FileSplit(path, 0, length, length, hosts, partitionValues));
|
||||
result.add(new FileSplit(path, 0, length, length, modificationTime, hosts, partitionValues));
|
||||
return result;
|
||||
}
|
||||
long bytesRemaining;
|
||||
@ -234,12 +234,14 @@ public class FileScanNode extends ExternalScanNode {
|
||||
bytesRemaining -= splitSize) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
|
||||
result.add(new FileSplit(path, length - bytesRemaining, splitSize, length, hosts, partitionValues));
|
||||
result.add(new FileSplit(path, length - bytesRemaining, splitSize,
|
||||
length, modificationTime, hosts, partitionValues));
|
||||
}
|
||||
if (bytesRemaining != 0L) {
|
||||
int location = getBlockIndex(blockLocations, length - bytesRemaining);
|
||||
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
|
||||
result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining, length, hosts, partitionValues));
|
||||
result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
|
||||
length, modificationTime, hosts, partitionValues));
|
||||
}
|
||||
|
||||
LOG.debug("Path {} includes {} splits.", path, result.size());
|
||||
|
||||
@ -34,6 +34,7 @@ public class FileSplit implements Split {
|
||||
// -1 means unset.
|
||||
// If the file length is not set, the file length will be fetched from the file system.
|
||||
protected long fileLength;
|
||||
protected long modificationTime;
|
||||
protected String[] hosts;
|
||||
protected TableFormatType tableFormatType;
|
||||
// The values of partitions.
|
||||
@ -42,15 +43,21 @@ public class FileSplit implements Split {
|
||||
protected List<String> partitionValues;
|
||||
|
||||
public FileSplit(Path path, long start, long length, long fileLength,
|
||||
String[] hosts, List<String> partitionValues) {
|
||||
long modificationTime, String[] hosts, List<String> partitionValues) {
|
||||
this.path = path;
|
||||
this.start = start;
|
||||
this.length = length;
|
||||
this.fileLength = fileLength;
|
||||
this.modificationTime = modificationTime;
|
||||
this.hosts = hosts == null ? new String[0] : hosts;
|
||||
this.partitionValues = partitionValues;
|
||||
}
|
||||
|
||||
public FileSplit(Path path, long start, long length, long fileLength,
|
||||
String[] hosts, List<String> partitionValues) {
|
||||
this(path, start, length, fileLength, 0, hosts, partitionValues);
|
||||
}
|
||||
|
||||
public String[] getHosts() {
|
||||
return hosts;
|
||||
}
|
||||
|
||||
@ -170,7 +170,7 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
boolean isSplittable = fileCacheValue.isSplittable();
|
||||
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
|
||||
allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
|
||||
status.getBlockLocations(), status.getLength(),
|
||||
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
|
||||
isSplittable, fileCacheValue.getPartitionValues()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -111,7 +111,7 @@ public class TVFScanNode extends FileQueryScanNode {
|
||||
Path path = new Path(fileStatus.getPath());
|
||||
try {
|
||||
splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(),
|
||||
fileStatus.isSplitable, null));
|
||||
fileStatus.getModificationTime(), fileStatus.isSplitable, null));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("get file split failed for TVF: {}", path, e);
|
||||
throw new UserException(e);
|
||||
@ -119,15 +119,4 @@ public class TVFScanNode extends FileQueryScanNode {
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
private void addFileSplits(Path path, long fileSize, long splitSize, List<Split> splits) {
|
||||
long bytesRemaining;
|
||||
for (bytesRemaining = fileSize; (double) bytesRemaining / (double) splitSize > 1.1D;
|
||||
bytesRemaining -= splitSize) {
|
||||
splits.add(new FileSplit(path, fileSize - bytesRemaining, splitSize, fileSize, new String[0], null));
|
||||
}
|
||||
if (bytesRemaining != 0L) {
|
||||
splits.add(new FileSplit(path, fileSize - bytesRemaining, bytesRemaining, fileSize, new String[0], null));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import java.util.List;
|
||||
|
||||
@Data
|
||||
public class IcebergSplit extends FileSplit {
|
||||
// File path will be changed if the file is modified, so there's no need to get modification time.
|
||||
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts) {
|
||||
super(file, start, length, fileLength, hosts, null);
|
||||
}
|
||||
|
||||
@ -457,6 +457,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
|
||||
fileRangeDesc.setStartOffset(0);
|
||||
fileRangeDesc.setSize(firstFile.getSize());
|
||||
fileRangeDesc.setFileSize(firstFile.getSize());
|
||||
fileRangeDesc.setModificationTime(firstFile.getModificationTime());
|
||||
// set TFileScanRange
|
||||
TFileScanRange fileScanRange = new TFileScanRange();
|
||||
fileScanRange.addToRanges(fileRangeDesc);
|
||||
|
||||
Reference in New Issue
Block a user