See #17764 for details
I have tested:
- Unit test for local/s3/hdfs/broker file system: be/test/io/fs/file_system_test.cpp
- Outfile to local/s3/hdfs/broker.
- Load from local/s3/hdfs/broker.
- Query file on local/s3/hdfs/broker file system, with table value function and catalog.
- Backup/Restore with local/s3/hdfs/broker file system
Not tested:
- cold & hot data separation case.
`CachedRemoteFileReader` used a fixed segment size (`file_cache_max_file_segment_size` = 4MB) to cache remote file blocks. However, the column size in a row group/stripe may be smaller than 10KB if a Parquet/ORC file has many columns, resulting in particularly serious read amplification. For example:
Q1 in ClickBench: `select count(*) from hits`
```
- FileCache: 0ns
- IOHitCacheNum: 552
- IOTotalNum: 835
- ReadFromFileCacheBytes: 19.98 MB
- ReadFromWriteCacheBytes: 0.00
- ReadTotalBytes: 29.52 MB
- SkipCacheBytes: 0.00
- WriteInFileCacheBytes: 915.77 MB
- WriteInFileCacheNum: 283
```
Only 30MB of data is needed, but 900MB+ is read from HDFS. The query time of Q1 (single scan thread) increased from **5.17s** to **24.45s** with the file cache enabled.
Therefore, this PR introduces a dynamic segment size based on the `read_size` of the data. To prevent IO that is too small or too large, the segment size is limited to the range [4096, file_cache_max_file_segment_size].
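As a rough sketch of the clamping idea (not the actual Doris code; the function name and signature below are hypothetical, only the 4096-byte floor and `file_cache_max_file_segment_size` come from the description above):
```cpp
#include <algorithm>
#include <cstddef>

// Hypothetical helper: derive the cache segment size from the current read
// instead of always using the fixed 4 MB maximum.
size_t pick_segment_size(size_t read_size, size_t file_cache_max_file_segment_size) {
    constexpr size_t kMinSegmentSize = 4096;  // lower bound from the PR description
    // Clamp into [4096, file_cache_max_file_segment_size]: a tiny column chunk
    // no longer forces a 4 MB download, and a huge read does not create an
    // oversized cache segment.
    return std::clamp(read_size, kMinSegmentSize, file_cache_max_file_segment_size);
}
```
With this change, a 10KB column chunk caches a segment of roughly 10KB instead of a full 4MB block, which is where the drop in `WriteInFileCacheBytes` (915.77MB to 45.66MB) comes from.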
Q1 in ClickBench takes **5.66s** with the file cache enabled. The performance is almost the same as with the cache disabled, and the amount of data read from HDFS is reduced to 45MB.
```
- FileCache: 0ns
- IOHitCacheNum: 297
- IOTotalNum: 835
- ReadFromFileCacheBytes: 8.73 MB
- ReadFromWriteCacheBytes: 0.00
- ReadTotalBytes: 29.52 MB
- SkipCacheBytes: 0.00
- WriteInFileCacheBytes: 45.66 MB
- WriteInFileCacheNum: 544
```
## Remaining Problems
Small queries may result in a large number of small files (at least 4KB each), and the `BE` stores too much metadata for cached segments.
## Bug Fix
`FileCachePolicy` in `FileReaderOptions` is a const reference, but the argument passed in `FileFactory::create_file_reader` is a temporary variable, resulting in a segmentation fault.
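A minimal, self-contained illustration of this bug pattern (the type names here are simplified stand-ins, not the actual Doris classes): a const-reference member outlives the temporary it was bound to, while storing the policy by value fixes the lifetime.
```cpp
#include <iostream>

struct FileCachePolicy {
    int max_segment_size = 4 * 1024 * 1024;
};

// Buggy shape: the const reference member dangles once the temporary passed
// to the constructor is destroyed at the end of the full expression.
struct ReaderOptionsByRef {
    const FileCachePolicy& policy;
    explicit ReaderOptionsByRef(const FileCachePolicy& p) : policy(p) {}
};

// Fixed shape: the policy is copied, so its lifetime matches the options object.
struct ReaderOptionsByValue {
    FileCachePolicy policy;
    explicit ReaderOptionsByValue(const FileCachePolicy& p) : policy(p) {}
};

int main() {
    ReaderOptionsByValue ok(FileCachePolicy{});   // copy survives the temporary
    std::cout << ok.policy.max_segment_size << "\n";

    ReaderOptionsByRef bad(FileCachePolicy{});    // reference binds to a temporary
    // Reading bad.policy here is undefined behavior: the temporary is gone.
    // std::cout << bad.policy.max_segment_size << "\n";
}
```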
Since `FileSystem` inherits from `std::enable_shared_from_this`, it is dangerous to create a raw pointer to a `FileSystem`.
To avoid this, the constructor of each XxxFileSystem is made private, and the static method `create(...)` is used to obtain a new FileSystem object.
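A minimal sketch of that pattern (the class name and members below are illustrative, not the exact Doris signatures): with a private constructor, every instance is guaranteed to be owned by a `shared_ptr`, so `shared_from_this()` is always valid.
```cpp
#include <memory>
#include <string>

// Illustrative only: a file system type that inherits enable_shared_from_this.
class DemoFileSystem : public std::enable_shared_from_this<DemoFileSystem> {
public:
    // The only way to obtain an instance; it is always heap-allocated and owned
    // by a shared_ptr, so shared_from_this() never misbehaves. std::make_shared
    // cannot be used here because the constructor is private.
    static std::shared_ptr<DemoFileSystem> create(std::string root) {
        return std::shared_ptr<DemoFileSystem>(new DemoFileSystem(std::move(root)));
    }

    std::shared_ptr<DemoFileSystem> getptr() { return shared_from_this(); }

private:
    explicit DemoFileSystem(std::string root) : _root(std::move(root)) {}
    std::string _root;
};

int main() {
    // DemoFileSystem fs("/tmp");               // does not compile: ctor is private
    auto fs = DemoFileSystem::create("/tmp");   // safe: owned by shared_ptr
    auto same = fs->getptr();                   // valid shared_from_this()
}
```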
The main purpose of this PR is to introduce `fileCache` for lakehouse reads of remote files.
The local disk is used as a cache for remote file reads, so the next time the file is read,
the data can be fetched directly from the local disk.
In addition, this PR includes a few other minor changes.
Introduce File Cache:
1. The introduced `fileCache` is called `block_file_cache`, which uses an LRU replacement policy.
2. Implement a new FileReader, `CachedRemoteFileReader`, so that the `file cache` logic is hidden behind `CachedRemoteFileReader` (see the sketch after this list).
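A rough sketch of that wrapper idea under simplified assumptions (the real `CachedRemoteFileReader` and `block_file_cache` interfaces in Doris differ; every type and signature below is invented for illustration): the cached reader consults the local block cache first and only falls back to the remote reader on a miss.
```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <memory>
#include <optional>
#include <vector>

// Simplified stand-in for the real Doris reader interface.
struct FileReader {
    virtual ~FileReader() = default;
    // Read `len` bytes at `offset` into `out`; returns the number of bytes read.
    virtual size_t read_at(size_t offset, size_t len, char* out) = 0;
};

// Toy in-memory cache keyed by offset; the real block_file_cache persists
// segments on local disk and evicts them with an LRU policy.
struct BlockFileCacheSketch {
    std::map<size_t, std::vector<char>> blocks;
    std::optional<std::vector<char>> get(size_t offset, size_t len) {
        auto it = blocks.find(offset);
        if (it != blocks.end() && it->second.size() >= len) {
            return std::vector<char>(it->second.begin(), it->second.begin() + len);
        }
        return std::nullopt;
    }
    void put(size_t offset, std::vector<char> data) { blocks[offset] = std::move(data); }
};

// The wrapper hides all cache handling: callers only see a FileReader.
class CachedRemoteFileReaderSketch : public FileReader {
public:
    CachedRemoteFileReaderSketch(std::unique_ptr<FileReader> remote, BlockFileCacheSketch* cache)
            : _remote(std::move(remote)), _cache(cache) {}

    size_t read_at(size_t offset, size_t len, char* out) override {
        if (auto hit = _cache->get(offset, len)) {              // served from local cache
            std::copy(hit->begin(), hit->end(), out);
            return hit->size();
        }
        std::vector<char> buf(len);
        size_t n = _remote->read_at(offset, len, buf.data());   // remote (s3/hdfs/...) read
        buf.resize(n);
        _cache->put(offset, buf);                               // warm the cache for next time
        std::copy(buf.begin(), buf.end(), out);
        return n;
    }

private:
    std::unique_ptr<FileReader> _remote;
    BlockFileCacheSketch* _cache;
};
```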
Other changes:
1. Add a new `fs()` interface to `FileReader`.
2. Add statistics to `IOContext` to track `FileCache` usage.
Co-authored-by: Lightman <31928846+Lchangliang@users.noreply.github.com>
1. Refactor file reader creation in `FileFactory` for simplicity.
Previously, `FileFactory` had too many `create_file_reader` interfaces.
They are now unified into two categories: the interface used by the previous BrokerScanNode
and the interface used by the new FileScanNode (see the sketch after this list).
The creation of readers that read from a `StreamLoadPipe` is also separated from that of readers that read files.
2. Modify `StreamLoadPlanner` on the FE side to support ExternalFileScanNode.
3. For generic readers, the file reader is now created inside the reader rather than passed in from outside.
4. Add test cases for CSV stream load; the behavior is the same as with the old broker scanner.
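To make the split concrete, here is a hedged sketch of the resulting shape (all type names and signatures are invented for illustration and do not match the real `FileFactory`):
```cpp
#include <memory>

// Invented stand-ins, not the real Doris thrift structs.
struct FileReader { virtual ~FileReader() = default; };
struct DummyReader : FileReader {};

struct TBrokerRangeDesc {};  // range descriptor used by the old BrokerScanNode path
struct TFileRangeDesc {};    // range descriptor used by the new FileScanNode path
struct StreamLoadPipe {};    // in-memory pipe used by stream load

class FileFactorySketch {
public:
    // Category 1: readers for the legacy BrokerScanNode.
    static std::unique_ptr<FileReader> create_file_reader(const TBrokerRangeDesc&) {
        return std::make_unique<DummyReader>();
    }
    // Category 2: readers for the new FileScanNode.
    static std::unique_ptr<FileReader> create_file_reader(const TFileRangeDesc&) {
        return std::make_unique<DummyReader>();
    }
    // Pipe-backed readers are created separately from file-backed readers.
    static std::unique_ptr<FileReader> create_pipe_reader(std::shared_ptr<StreamLoadPipe>) {
        return std::make_unique<DummyReader>();
    }
};
```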
Refactor of scanners. Support broker load.
This PR is part of the scanner refactoring tasks. It provides support for broker load using the new VFileScanner.
Work is still in progress.
This is an example of an HMS catalog on S3:
```sql
CREATE CATALOG hms_catalog PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://localhost:9083",
    "AWS_ACCESS_KEY" = "your access key",
    "AWS_SECRET_KEY" = "your secret key",
    "AWS_ENDPOINT" = "s3 endpoint",
    "AWS_REGION" = "s3-region",
    "fs.s3a.paging.maximum" = "1000"
);
```
All of these parameters are required.