During load process, the same operation are performed on all replicas such as sort and aggregation,
which are resource-intensive.
Concurrent data load would consume much CPU and memory resources.
It's better to perform write process (writing data into MemTable and then data flush) on single replica
and synchronize data files to other replicas before transaction finished.
In some cases, query mem tracker does not exist in BE when transmit block. This will result in a null pointer for get query mem tracker in brpc transmit_block
When the length of `Tuple/Block data` is greater than 2G, serialize the protoBuf request and embed the
`Tuple/Block data` into the controller attachment and transmit it through http brpc.
This is to avoid errors when the length of the protoBuf request exceeds 2G:
`Bad request, error_text=[E1003]Fail to compress request`.
In #7164, `Tuple/Block data` was put into attachment and sent via default `baidu_std brpc`,
but when the attachment exceeds 2G, it will be truncated. There is no 2G limit for sending via `http brpc`.
Also, in #7921, consider putting `Tuple/Block data` into attachment transport by default, as this theoretically
reduces one serialization and improves performance. However, the test found that the performance did not improve,
but the memory peak increased due to the addition of a memory copy.
This CL mainly changes:
1. Reducing the rpc timeout problem caused by rpc waiting for the worker thread of brpc.
1. Merge multiple fragment instances on the same BE to send requests to reduce the number of send fragment rpcs
2. If fragments size >= 3, use 2 phase RPC: one is to send all fragments, two is to start these fragments. So that there
will be at most 2 RPC for each query on one BE.
3. Set the timeout of send fragment rpc to the query timeout to ensure the consistency of users' expectation of query timeout period.
4. Do not close the connection anymore when rpc timeout occurs.
5. Change some log level from info to debug to simplify the fe.log content.
NOTICE:
1. Change the definition of execPlanFragment rpc, must first upgrade BE.
3. Remove FE config `remote_fragment_exec_timeout_ms`
1. Fix LoadTask, ChunkAllocator, TabletMeta, Brpc, the accuracy of memory track.
2. Modified some MemTracker names, deleted some unnecessary trackers, and improved readability.
3. More powerful MemTracker debugging capabilities.
4. Avoid creating TabletColumn temporary objects and improve BE startup time by 8%.
5. Fix some other details.
1. fix track bthread
- Bthread, a high performance M:N thread library used by brpc. In Doris, a brpc server response runs on one bthread, possibly on multiple pthreads. Currently, MemTracker consumption relies on pthread local variables (TLS).
- This caused pthread TLS MemTracker confusion when switching pthread TLS MemTracker in brpc server response. So replacing pthread TLS with bthread TLS in the brpc server response saves the MemTracker.
Ref: 731730da85/docs/en/server.md (bthread-local)
2. fix track vectorized query
- Added track mmap. Currently, mmap allocates memory in many places of the vectorized execution engine.
- Refactored ThreadContext to avoid dependency conflicts and make it easier to debug.
- Fix some bugs.
This PR mainly changes:
1. Change the define of PBlock
The new PBlock consists of a set of PColumnMeta and a binary buffer.
The PColumnMeta records the metadata information of all columns in the Block,
while the buffer stores the serialized binary data of all columns.
2. Refactor the serialize/deserialize method of data type
Rewrite the `serialize()/deserialize()` of IDataType. And also add
a new method `get_uncompressed_serialized_bytes()` to get the total length
of uncompressed serialized data of a column.
3. Rewrite the serialize/deserialize method of Block
Now, when serializing a Block to PBlock, it will first get the total length
of uncompressed serialized data of all columns in this Block, and then allocate
the memory to write the serialized data to the buffer.
4. Use brpc attachment to transmit the serialized column data
Support implement UDF through GRPC protocol. This brings several benefits:
1. The udf implementation language is not limited to c++, users can use any familiar language to implement udf
2. UDF is decoupled from Doris, udf will not cause doris coredump, udf computing resources are separated from doris, and doris services are not affected
But RPC's UDF has a fixed overhead, so its performance is much slower than C++ UDF, especially when the amount of data is large.
Create function like
```
CREATE FUNCTION rpc_add(INT, INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int",
"OBJECT_FILE"="127.0.0.1:9999",
"TYPE"="RPC"
);
```
Function service need to implement `check_fn` and `fn_call` methods
Note:
THIS IS AN EXPERIMENTAL FEATURE, THE INTERFACE AND DATA STRUCTURE MAY BE CHANGED IN FUTURE !!!
Currently, if we encounter a problem with a replica of a tablet during the load process,
such as a write error, rpc error, -235, etc., it will cause the entire load job to fail,
which results in a significant reduction in Doris' fault tolerance.
This PR mainly changes:
1. refined the judgment of failed replicas in the load process, so that the failure of a few replicas will not affect the normal completion of the load job.
2. fix a bug introduced from #7754 that may cause BE coredump
# Proposed changes
Issue Number: close#6238
Co-authored-by: HappenLee <happenlee@hotmail.com>
Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
Co-authored-by: Zhengguo Yang <yangzhgg@gmail.com>
Co-authored-by: wangbo <506340561@qq.com>
Co-authored-by: emmymiao87 <522274284@qq.com>
Co-authored-by: Pxl <952130278@qq.com>
Co-authored-by: zhangstar333 <87313068+zhangstar333@users.noreply.github.com>
Co-authored-by: thinker <zchw100@qq.com>
Co-authored-by: Zeno Yang <1521564989@qq.com>
Co-authored-by: Wang Shuo <wangshuo128@gmail.com>
Co-authored-by: zhoubintao <35688959+zbtzbtzbt@users.noreply.github.com>
Co-authored-by: Gabriel <gabrielleebuaa@gmail.com>
Co-authored-by: xinghuayu007 <1450306854@qq.com>
Co-authored-by: weizuo93 <weizuo@apache.org>
Co-authored-by: yiguolei <guoleiyi@tencent.com>
Co-authored-by: anneji-dev <85534151+anneji-dev@users.noreply.github.com>
Co-authored-by: awakeljw <993007281@qq.com>
Co-authored-by: taberylyang <95272637+taberylyang@users.noreply.github.com>
Co-authored-by: Cui Kaifeng <48012748+azurenake@users.noreply.github.com>
## Problem Summary:
### 1. Some code from clickhouse
**ClickHouse is an excellent implementation of the vectorized execution engine database,
so here we have referenced and learned a lot from its excellent implementation in terms of
data structure and function implementation.
We are based on ClickHouse v19.16.2.2 and would like to thank the ClickHouse community and developers.**
The following comment has been added to the code from Clickhouse, eg:
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/AggregationCommon.h
// and modified by Doris
### 2. Support exec node and query:
* vaggregation_node
* vanalytic_eval_node
* vassert_num_rows_node
* vblocking_join_node
* vcross_join_node
* vempty_set_node
* ves_http_scan_node
* vexcept_node
* vexchange_node
* vintersect_node
* vmysql_scan_node
* vodbc_scan_node
* volap_scan_node
* vrepeat_node
* vschema_scan_node
* vselect_node
* vset_operation_node
* vsort_node
* vunion_node
* vhash_join_node
You can run exec engine of SSB/TPCH and 70% TPCDS stand query test set.
### 3. Data Model
Vec Exec Engine Support **Dup/Agg/Unq** table, Support Block Reader Vectorized.
Segment Vec is working in process.
### 4. How to use
1. Set the environment variable `set enable_vectorized_engine = true; `(required)
2. Set the environment variable `set batch_size = 4096; ` (recommended)
### 5. Some diff from origin exec engine
https://github.com/doris-vectorized/doris-vectorized/issues/294
## Checklist(Required)
1. Does it affect the original behavior: (No)
2. Has unit tests been added: (Yes)
3. Has document been added or modified: (No)
4. Does it need to update dependencies: (No)
5. Are there any changes that cannot be rolled back: (Yes)
1. Fix some memory leaks
2. Remove redundant and invalid code
3. Fix some buggy writes to reduce extra memory copies and return null pointers to string
4. Reframing the naming to make the structure clearer
Transfer RowBatch in Protobuf Request to Controller Attachment,
when the maximum length of the RowBatch in the Protobuf Request is exceeded.
This can avoid reaching the upper limit of the Protobuf Request length (2G),
and it is expected that performance can be improved.
This is beacuse of an const MAX_PHYSICAL_PACKET_LENGTH in fe should be 2^24 -1,
but it is set as 2^24 -2 by mistake.
2. Fix bitmap_to_string may failed when the result is large than 2G
Added bprc stub cache check and reset api, used to test whether the bprc stub cache is available, and reset the bprc stub cache
add a config used for auto check and reset bprc stub
Add a use_path_style property for S3
Upgrade hadoop-common and hadoop-aws to 2.8.0 to support path style property
Fix some S3 URI bugs
Add some logs for tracing load process.
1 Avoid sending tasks if there is no data to be consumed
By fetching latest offset of partition before sending tasks.(Fix [Optimize] Avoid too many abort task in routine load job #6803 )
2 Add a preCheckNeedSchedule phase in update() of routine load.
To avoid taking write lock of job for long time when getting all kafka partitions from kafka server.
3 Upgrade librdkafka's version to 1.7.0 to fix a bug of "Local: Unknown partition"
See offsetsForTimes fails with 'Local: Unknown partition' edenhill/librdkafka#3295
4 Avoid unnecessary storage migration task if there is no that storage medium on BE.
Fix [Bug] Too many unnecessary storage migration tasks #6804
In some cases, the query plan thrift structure of a query may be very large
(for example, when there are many columns in SQL), resulting in a large number
of "send fragment timeout" errors.
This PR adds an FE config to control whether to transmit the query plan in a compressed format.
Using compressed format transmission can reduce the size by ~50%. But it may reduce
the concurrency by ~10%. Therefore, in the high concurrency small query scenario,
you can choose to turn off compaction.
1. Fix a memory leak in `collect_iterator.cpp` (Fix#6700)
2. Add a new BE config `max_segment_num_per_rowset` to limit the num of segment in new rowset.(Fix#6701)
3. Make the error msg of stream load more friendly.
## Proposed changes
Add transaction for the operation of insert. It will cost less time than non-transaction(it will cost 1/1000 time) when you want to insert a amount of rows.
### Syntax
```
BEGIN [ WITH LABEL label];
INSERT INTO table_name ...
[COMMIT | ROLLBACK];
```
### Example
commit a transaction:
```
begin;
insert into Tbl values(11, 22, 33);
commit;
```
rollback a transaction:
```
begin;
insert into Tbl values(11, 22, 33);
rollback;
```
commit a transaction with label:
```
begin with label test_label;
insert into Tbl values(11, 22, 33);
commit;
```
### Description
```
begin: begin a transaction, the next insert will execute in the transaction until commit/rollback;
commit: commit the transaction, the data in the transaction will be inserted into the table;
rollback: abort the transaction, nothing will be inserted into the table;
```
### The main realization principle:
```
1. begin a transaction in the session. next sql is executed in the transaction;
2. insert sql will be parser and get the database name and table name, they will be used to select a be and create a pipe to accept data;
3. all inserted values will be sent to the be and write into the pipe;
4. a thread will get the data from the pipe, then write them to disk;
5. commit will complete this transaction and make these data visible;
6. rollback will abort this transaction
```
### Some restrictions on the use of update syntax.
1. Only ```insert``` can be called in a transaction.
2. If something error happened, ```commit``` will not succeed, it will ```rollback``` directly;
3. By default, if part of insert in the transaction is invalid, ```commit``` will only insert the other correct data into the table.
4. If you need ```commit``` return failed when any insert in the transaction is invalid, you need execute ```set enable_insert_strict = true``` before ```begin```.
At present, some constant expression calculations are implemented on the FE side,
but they are incomplete, and some expressions cannot be completely consistent with
the value calculated by BE (such as part of the time function)
Therefore, we provide a way to pass all the constants in SQL to BE for calculation,
and then begin to analyze and plan SQL. This method can also solve the problem that some
complex constant calculations issued by BI cannot be processed on the FE side.
Here through a session variable enable_fold_constant_by_be to control this function,
which is disabled by default.
1. support in/bloomfilter/minmax
2. support broadcast/shuffle/bucket shuffle/colocate join
3. opt memory use and cpu cache miss while build runtime filter
4. opt memory use in left semi join (works well on tpcds-95)
Support when creating a kafka routine load, start consumption from a specified point in time instead of a specific offset.
eg:
```
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.kafka_default_offsets" = "2021-10-10 11:00:00"
);
or
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"
);
```
This PR also reconstructed the analysis method of properties when creating or altering
routine load jobs, and unified the analysis process in the `RoutineLoadDataSourceProperties` class.
1. Reduce lock conflicts in RuntimeProfile of be;
2. can view query profile when the query is executing;
3. reduce wait time for 'show proc /current_queries'.
In the previous broker load, multiple OlapTableSinks would send data to the same LoadChannel,
and because of the lock granularity problem, LoadChannel could only process these requests serially,
which made it impossible to make full use of cluster resources.
This CL modifies the related locks so that LoadChannel can process these requests in parallel.
In the test, with a size of 20G, the load speed of 334 million rows of data in 3 nodes has been
increased from 9min to 5min, and after enabling 2 concurrency, it can be increased to 3min.
Also modify the profile of load job.
This CL mainly changes:
1. Avoid repeated sending of common components in Fragments
In the previous implementation, a query may generate multiple Fragments,
these Fragments contain some common information, such as DescriptorTable.
Fragment will be sent to BE in a certain order, so these public information will be sent repeatedly
and generated repeatedly on the BE side.
In some complex SQL, these public information may be very large,
thereby increasing the execution time of Fragment.
So I improved this. For multiple Fragments sent to the same BE, only the first Fragment will carry
these public information, and it will be cached on the BE side, and subsequent Fragments
no longer need to carry this information.
In the local test, the execution time of some complex SQL can be reduced from 3 seconds to 1 second.
2. Add the time-consuming part of FE logic in Profile
Including SQL analysis, planning, Fragment scheduling and sending on the FE side, and the time to fetch data.
1. Find the cache node by SQL Key, then find the corresponding partition data by Partition Key, and then decide whether to hit Cache by LastVersion and LastVersionTime
2. Refers to the classic cache algorithm LRU, which is the least recently used algorithm, using a three-layer data structure to achieve
3. The Cache elimination algorithm is implemented by ensuring the range of the partition as much as possible, to avoid the situation of partition discontinuity, which will reduce the hit rate of the Cache partition,
4. Use the two thresholds of maximum memory and elastic memory to control to avoid frequent elimination of data
Using attachement strategy of brpc to send packet with big size.
BRPC send packet should serialize it first and then send it.
If we send one batch with big size, it will encounter a connection failed.
So we can use attachment strategy to bypass the problem and eliminate
the serialization cost.
Mainly contains the following modifications:
1. Use `std::unique_ptr` to replace some naked pointers
2. Modify some methods from member-method to local-static-function
3. Modify some methods do not need to be public to private
4. Some formatting changes: such as wrapping lines that are too long
5. Remove some useless variables
6. Add or modify some comments for easier understanding
No functional changes in this patch.