Sometimes, `show load profile` will only show part of the insert opertion's profile.
This is because we assume that for all load operation(including insert), there is only one fragment in the plan.
But actually, there will be more than 1 fragment in plan. eg:
`insert into tbl1 select * from tbl1 limit 1` will have 2 fragments.
This PR mainly changes:
1. modify the `show load profile`
Before: `show load profile "/queryid/taskid/instanceid";`
After: `show load profile "/queryid/taskid/fragmentid/instanceid";`
2. Modify the display of `ReadColumns` in OlapScanNode
Because for wide table, the line of `ReadColumns` may be too long for show in profile.
So I wrap it and each line contains at most 10 columns names.
3. Fix tvf not working with pipeline engine, follow up #18376
When processing data in hash table for right join and full outer join, if the output data rows of one hash bucket excceeds batch size, the logic when continue processing this bucket is wrong, it should differentiate between different join types.
Fix tow bugs:
1. Enabling file caching requires both `FE session` and `BE` configurations(enable_file_cache=true) to be enabled.
2. `ParquetReader` has not used `IOContext` previously, but `CachedRemoteFileReader::read_at` needs `IOContext` after PR(#17586).
Co-authored-by: ByteYue <[yj976240184@gmail.com](mailto:yj976240184@gmail.com)>
This PR is an optimization for https://github.com/apache/doris/pull/17478:
1. Change the buffer size of `LineReader` to 4MB to align with the size of prefetch buffer.
2. Lazily prefetch data in the first read to prevent wasted reading.
3. S3 block size is 32MB only, which is too small for a file split. Set 128MB as default file split size.
4. Add `_end_offset` for prefetch buffer to prevent wasted reading.
The query performance of reading data on object storage is improved by more than 3x+.
jdbc read array type get result from Doris is string, PG is java.sql.array, CK is java.lang.object
it's difficult to maintain and read the code,
so change all database's array result to string, then add a cast function from string to doris array type
We want to use file cache for caching cold data in S3.
When reading them, we want to know where the data come from and the time taken to read the datas.
So we support the metrics in olap scan node.
And for clearing the information, i also update the fields about the metrics.
1. Fix value idx in bool rle decoder
2. Iceberg table support datetimev2(3). In the previous version, we converted hive timestamp to datetimev2(0) default.
1. Introduce hadoop libhdfs
2. For Linux-X86 platform, use the hadoop libhdfs
3. For other platform, use libhdfs3, because currently we don't have hadoop libhdfs binary for other platform
Co-authored-by: adonis0147 <adonis0147@gmail.com>
In clickhouse's 4.x version of jdbc, some UInt types use special Java types, so I adapted Doris's ClickHouse JDBC External
```
com.clickhouse.data.value.UnsignedByte;
com.clickhouse.data.value.UnsignedInteger;
com.clickhouse.data.value.UnsignedLong;
com.clickhouse.data.value.UnsignedShort;
```
1. add PassNullPredicate to fix topn wrong result for NULL values
2. refactor RuntimePredicate to avoid using TCondition
3. refactor using ordering_exprs in fe and vsort_node
A framework that read data from jni scanner, which can support the data source from java ecosystem(java API).
## Java Interface
Java scanner should extends `org.apache.doris.jni.JniScanner`, implements the following methods:
```
// Initialize JniScanner
public abstract void open() throws IOException;
// Close JniScanner and release resources
public abstract void close() throws IOException;
// Scan data and save as vector table
public abstract int getNext() throws IOException;
```
See demo usage in `org.apache.doris.jni.MockJniScanner`
## c++ interface
C++ reader should use `doris::JniConnector` to get data from `org.apache.doris.jni.JniScanner`. See demo usage in `doris::MockJniReader`.
## Pushed-down predicates
Java scanner can get pushed-down predicates by `org.apache.doris.jni.vec.ScanPredicate`.
## Remaining works:
1. Implement complex nested types.
2. Read hudi MOR table as the end-to-end demo usage.
when loading big file with multi bytes line delimiter, some line record maybe incomplete because of _output_buf_limit, so this incomplete data will move to the beginning of the output buf and read more data into output buf. In this case, find line delimiter should start with no offset to avoid a bug that spilt two lines as one line.
Arena can replace MemPool in most scenarios. Except for memory reuse, MemPool supports reuse of previous memory chunks after clear, but Arena does not.
Some comparisons between MemPool and Arena:
1. Expansion
Arena is less than 128M index 2 alloc chunk; more than 128M memory, allocate 128M * n > `size`, n is equal to the minimum value that satisfies the expression;
MemPool less than 512K index 2 alloc chunk, greater than 512K memory, separately apply for a `size` length chunk
After Arena applied for a chunk larger than 128M last time, the minimum chunk applied for after that is 128M. Does this seem to be a waste of memory? MemPool is also similar. After the chunk of 512K was applied for last time, the minimum chunk of subsequent applications is 512K.
2. Alignment
MemPool defaults to 16 alignment, because memtable and other places that use int128 require 16 alignment;
Arena has no default alignment;
3. Memory reuse
Arena only supports `rollback`, which reuses the memory of the current chunk, usually the memory requested last time.
MemPool supports clear(), all chunks can be reused; or call ReturnPartialAllocation() to roll back the last requested memory; if the last chunk has no memory, search for the most free chunk for allocation
4. Realloc
Arena supports realloc contiguous memory; it also supports realloc contiguous memory from any position at the time of the last allocation. The difference between `alloc_continue` and `realloc` is:
1. Alloc_continue does not need to specify the old size, but the default old size = head->pos - range_start
2. alloc_continue supports expansion from range_start when additional_bytes is between head and pos, which is equivalent to reusing a part of memory, while realloc completely allocates a new memory
MemPool does not support realloc, but supports transferring or absorbing chunks between two MemPools
5. check mem limit
MemPool checks the mem limit, and Arena checks at the Allocator layer.
6. Support for ASAN
Arena does something extra
7. Error handling
MemPool supports returning the error message of application failure directly through `Status`, and Arena throws Exception.
Tests that Arena can consider
1. After the last applied chunk is larger than 128M, the minimum applied chunk is 128M, which seems to waste memory;
2. Support clear, memory multiplexing;
3. Increase the large list, alloc the memory larger than 128M, and the size is equal to `size`, so as to avoid the current chunk not being fully used, which is wasteful.
4. In some cases, it may be possible to allocate backwards to find chunks t
Follow #17586.
This PR mainly changes:
Remove env/
Remove FileUtils/FilesystemUtils
Some methods are moved to LocalFileSystem
Remove olap/file_cache
Add s3 client cache for s3 file system
In my test, the time of open s3 file can be reduced significantly
Fix cold/hot separation bug for s3 fs.
This is the last PR of #17764.
After this, all IO operation should be in io/fs.
Except for tests in #17586, I also tested some case related to fs io:
clone
concurrency query on local/s3/hdfs
load error log create and clean
disk metrics
PR(#17330) has changed the column type of kay and value from array to normal column, but orc&parquet reader still cast to array column, resulting in cast error.
When using JDBC Catalog to query the Doris data, because Doris does not provide the cursor reading method (that is, fetchBatchSize is invalid), Doris will send the data to the client at one time, resulting in client OOM.
The MySQL protocol provides a stream reading method. Doris can use this method to avoid OOM. The requirements of using the stream method are setting fetchbatchsize = Integer.MIN_VALUE and setting ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
Problem:
1. FE will split the parquet file into split. So a file can have several splits.
2. BE will scan each split, read the footer of the parquet file.
3. If 2 splits belongs to a same parquet file, the footer of this file will be read twice.
This PR mainly changes:
1. Use kv cache to cache the footer of parquet file.
2. The kv cache is belong to a scan node, so all parquet reader belong to this scan node will share same kv cache.
3. In cache, the key is "meta_file_path", the value is parsed thrift footer.
The KV Cache is sharded into mutlti sub cache.
So that different file can use different sub cache, avoid blocking each other
In my test, a query with 26 splits can reduce the footer parse time from 4s -> 1s
See #17764 for details
I have tested:
- Unit test for local/s3/hdfs/broker file system: be/test/io/fs/file_system_test.cpp
- Outfile to local/s3/hdfs/broker.
- Load from local/s3/hdfs/broker.
- Query file on local/s3/hdfs/broker file system, with table value function and catalog.
- Backup/Restore with local/s3/hdfs/broker file system
Not test:
- cold & host data separation case.
There are many type definitions in BE. Should unify the type system and simplify the development.
---------
Co-authored-by: yiguolei <yiguolei@gmail.com>
Before this PR when encountering null values with some columns which is specified as `NOT NULL`, null values will not be filtered,thi behavior does not match with the original load behavior.
Second column alignment logic has bug :
```
template <typename ColumnInserterFn>
void align_variant_by_name_and_type(ColumnObject& dst, const ColumnObject& src, size_t row_cnt,
ColumnInserterFn inserter) {
CHECK(dst.is_finalized() && src.is_finalized());
// Use rows() here instead of size(), since size() will check_consistency
// but we could not check_consistency since num_rows will be upgraded even
// if src and dst is empty, we just increase the num_rows of dst and fill
// num_rows of default values when meet new data
size_t num_rows = dst.rows();
```