1.Reconstruct the logic of decode to read parquet. The parquet reader first reads the data according to the parquet physical type, and then performs a type conversion.
2.Support hive alter table.
The sink who creates the delta writer may be closed while other sinks still using this delta writer.
The parent profile is deconstructed and when the last sink trying to update the profile, it will meet use-after-free.
To address this issue, we record the profile number in delta writer,
and the last sink who close the delta writer will create and update the profile.
`ParquetReader` confuses logical/physical/slot id of columns. If only reading the scalar types, there's nothing wrong, but when reading complex types, `RowGroup` and `PageIndex` will get wrong statistics. Therefore, if the query contains complex types and pushed-down predicates, the probability of the result set is incorrect.
* [Improve](performance) introduce SchemaCache to cache TabletSchame & Schema
1. When the system is under high-concurrency load with wide table point queries, the frequent memory allocation and deallocation of Schema become evident system bottlenecks. Additionally, the initialization of TabletSchema and Schema also becomes a CPU hotspot.Therefore, the introduction of a SchemaCache is implemented to cache these resources for reuse.
2. Make some variables wrapped with std::unique<unique_ptr>
Performance:
| 状态 | QPS | 平均响应时间 (avg) | P99 响应时间 |
|------------------|-----|------------------|-------------|
| 开启 SchemaCache | 501 | 20ms | 34ms |
| 关闭 SchemaCache | 321 | 31ms | 61ms |
* handle schema change with schema version
* remove useless header
* rebase
Refactoring the filtering conditions in the current ExecNode from an expression tree to an array can simplify the process of adding runtime filters. It eliminates the need for complex merge operations and removes the requirement for the frontend to combine expressions into a single entity.
By representing the filtering conditions as an array, each condition can be treated individually, making it easier to add runtime filters without the need for complex merging logic. The array can store the individual conditions, and the runtime filter logic can iterate through the array to apply the filters as needed.
This refactoring simplifies the codebase, improves readability, and reduces the complexity associated with handling filtering conditions and adding runtime filters. It separates the conditions into discrete entities, enabling more straightforward manipulation and management within the execution node.
Currently, there are some useless includes in the codebase. We can use a tool named include-what-you-use to optimize these includes. By using a strict include-what-you-use policy, we can get lots of benefits from it.
disallow call new method explicitly
force to use create_shared or create_unique to use shared ptr
placement new is allowed
reference https://abseil.io/tips/42 to add factory method to all class.
I think we should follow this guide because if throw exception in new method, the program will terminate.
---------
Co-authored-by: yiguolei <yiguolei@gmail.com>
Currently, there are some useless includes in the codebase. We can use a tool named include-what-you-use to optimize these includes. By using a strict include-what-you-use policy, we can get lots of benefits from it.
Fix tow bugs:
1. Enabling file caching requires both `FE session` and `BE` configurations(enable_file_cache=true) to be enabled.
2. `ParquetReader` has not used `IOContext` previously, but `CachedRemoteFileReader::read_at` needs `IOContext` after PR(#17586).
Problem:
1. FE will split the parquet file into split. So a file can have several splits.
2. BE will scan each split, read the footer of the parquet file.
3. If 2 splits belongs to a same parquet file, the footer of this file will be read twice.
This PR mainly changes:
1. Use kv cache to cache the footer of parquet file.
2. The kv cache is belong to a scan node, so all parquet reader belong to this scan node will share same kv cache.
3. In cache, the key is "meta_file_path", the value is parsed thrift footer.
The KV Cache is sharded into mutlti sub cache.
So that different file can use different sub cache, avoid blocking each other
In my test, a query with 26 splits can reduce the footer parse time from 4s -> 1s
See #17764 for details
I have tested:
- Unit test for local/s3/hdfs/broker file system: be/test/io/fs/file_system_test.cpp
- Outfile to local/s3/hdfs/broker.
- Load from local/s3/hdfs/broker.
- Query file on local/s3/hdfs/broker file system, with table value function and catalog.
- Backup/Restore with local/s3/hdfs/broker file system
Not test:
- cold & host data separation case.
There are many type definitions in BE. Should unify the type system and simplify the development.
---------
Co-authored-by: yiguolei <yiguolei@gmail.com>
Support iceberg schema evolution for parquet file format.
Iceberg use unique id for each column to support schema evolution.
To support this feature in Doris, FE side need to get the current column id for each column and send the ids to be side.
Be read column id from parquet key_value_metadata, set the changed column name in Block to match the name in parquet file before reading data. And set the name back after reading data.
Since Filesystem inherited std::enable_shared_from_this , it is dangerous to create native point of FileSystem.
To avoid this behavior, making the constructor of XxxFileSystem a private method and using the static method create(...) to get a new FileSystem object.
The main purpose of this pr is to import `fileCache` for lakehouse reading remote files.
Use the local disk as the cache for reading remote file, so the next time this file is read,
the data can be obtained directly from the local disk.
In addition, this pr includes a few other minor changes
Import File Cache:
1. The imported `fileCache` is called `block_file_cache`, which uses lru replacement policy.
2. Implement a new FileRereader `CachedRemoteFilereader`, so that the logic of `file cache` is hidden under `CachedRemoteFilereader`.
Other changes:
1. Add a new interface `fs()` for `FileReader`.
2. `IOContext` adds some statistical information to count the situation of `FileCache`
Co-authored-by: Lightman <31928846+Lchangliang@users.noreply.github.com>
Fix three bugs when read iceberg v2 tables:
1. The `delete position` in `delete file` represents the position of delete row in the entire file, but the `read range` in
`RowGroupReader` represents the position in current row group. Therefore, we need to subtract the position of first
row of current row group from `delete position`.
2. When only reading the partition columns, `RowGroupReader` skips processing the `delete position`.
3. If the `delete position` has delete all rows in a row group, the `read range` is empty, but we read the whole row
group in such case.
Optimize four performance issues:
1. We change `delete position` to `delete range`, and then merge `delete range` and `read range` into the final read
ranges. This process is too tedious and time-consuming. . we can merge `delete position` and `read range` directly.
2. `delete position` is ordered in a `delete file`, so we can use merge-sort, instead of ordered-set.
3. Initialize `RowGroupReader` when reading, instead of initialize all row groups when opening a `ParquetReader`, to
save memory usage, and the same as `IcebergReader`.
4. Change the recursive call of `_do_lazy_read` to loop logic.