Add 3 counters for ExecNode:
ExecTime - Total execution time(excluding the execution time of children).
OutputBytes - The total number of bytes output to parent.
BlockCount - The total count of blocks output to parent.
For comparison predicate, two arguments must be cast to datetime and push down to storage if either one is date type. This PR disables predicate push-down for this case.
this pr is aim to update for filter_by_select logic and change delete limit
only support scala type in delete statement where condition
only support column nullable and predict column support filter_by_select logic, because we can not push down non-scala type to storage layer to pack in predict column but do filter logic
Iceberg has its own metadata information, which includes count statistics for table data. If the table does not contain equli'ty delete, we can get the count data of the current table directly from the count statistics.
For load request, there are 2 tuples on scan node, input tuple and output tuple.
The input tuple is for reading file, and it will be converted to output tuple based on user specified column mappings.
And the broker load support different column mapping in different data description to same table(or partition).
So for each scanner, the output tuples are same but the input tuple can be different.
The previous implements save the input tuple in scan node level, causing different scanner using same input tuple,
which is incorrect.
This PR remove the input tuple from scan node and save them in each scanners.
Optimization "select count(*) from table" stmtement , push down "count" type to BE.
support file type : parquet ,orc in hive .
1. 4kfiles , 60kwline num
before: 1 min 37.70 sec
after: 50.18 sec
2. 50files , 60kwline num
before: 1.12 sec
after: 0.82 sec
Co-authored-by: Jerry Hu <mrhhsg@gmail.com>
1. Filtering is done at the sending end rather than the receiving end
2. Projection is done at the sending end rather than the receiving end
3. Each sender can use different shuffle policies to send data
For pipeline, olap table sink close is divided into three stages, try_close() --> pending_finish() --> close()
only after all node channels are done or canceled, pending_finish() will return false, close() will start.
this will avoid block pipeline on close().
In close, check the index channel intolerable failure status after each node channel failure,
if intolerable failure is true, the close will be terminated in advance, and all node channels will be canceled to avoid meaningless blocking.
Currently, there are many profiles using add child profile to orgnanize profile into blocks. But it is wrong. Child profile will have a total time counter. Actually, what we should use is just a label.
- MemoryUsage:
- HashTable: 23.98 KB
- SerializeKeyArena: 446.75 KB
Add a new macro ADD_LABEL_COUNTER to add just a label in the profile.
---------
Co-authored-by: yiguolei <yiguolei@gmail.com>
1 Currently, Node's total timer couter has timed twice(in Open and alloc_resource), this may cause timer in profile is not correct.
2 Add more timer to find more code which may cost much time.
Refactoring the filtering conditions in the current ExecNode from an expression tree to an array can simplify the process of adding runtime filters. It eliminates the need for complex merge operations and removes the requirement for the frontend to combine expressions into a single entity.
By representing the filtering conditions as an array, each condition can be treated individually, making it easier to add runtime filters without the need for complex merging logic. The array can store the individual conditions, and the runtime filter logic can iterate through the array to apply the filters as needed.
This refactoring simplifies the codebase, improves readability, and reduces the complexity associated with handling filtering conditions and adding runtime filters. It separates the conditions into discrete entities, enabling more straightforward manipulation and management within the execution node.
Firstly, to reduce memory usage, we do not pre-allocate blocks, instead we lazily allocate block when upper call get_free_block. And when upper call return_free_block to return free block, we add the block to a queue for memory reuse, and we will free the blocks in the queue when the scanner_context was closed instead of destructed.
Secondly, to limit the memory usage of the scanner, we introduce a variable _free_blocks_capacity to indicate the current number of free blocks available to the scanners. The number of scanners that can be scheduled will be calculated based on this value.
ssb flat test
previous
lineorder 1.2G:
load time: 3s, query time: 0.355s
lineorder 5.8G:
load time: 330s, query time: 0.970s
load time: 349s, query time: 0.949s
load time: 349s, query time: 0.955s
load time: 360s, query time: 0.889s (pipeline enabled)
after
lineorder 1.2G:
load time: 3s, query time: 0.349s
lineorder 5.8G:
load time: 342s, query time: 0.929s
load time: 337s, query time: 0.913s
load time: 345s, query time: 0.946s
load time: 346s, query time: 0.865s (pipeline enabled)
1. Remove an exec node method corresponding to a span and replace it with an exec node corresponding to a span;
2. Fix some problems with tracing in pipeline.
Co-authored-by: yiguolei <yiguolei@gmail.com>
Currently, exec node save exprcontext**, but the object is in object pool, the code is very unclear. we could just use exprcontext*.
TabletSink and LoadChannel in BE are M: N relationship,
Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink, so usually all LoadChannel runtime profiles are saved on each TabletSink, and the timeliness of the same LoadChannel profile saved on different TabletSinks is different, and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself, and ensures to update the latest LoadChannel profile according to the timestamp.