1. Support some function alias of mod/fmod, adddate/add_data
2. Support some function of multi args: week, yearweek
3. Fix bug of multi args function call the DATETIME type not effective in DATE type
The two phase batch commit means:
During Stream load, after data is written, the message will be returned to the client,
the data is invisible at this point and the transaction status is PRECOMMITTED.
The data will be visible only after COMMIT is triggered by client.
1. User can invoke the following interface to trigger commit operations for transaction:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" \
http://fe_host:http_port/api/{db}/_stream_load_2pc
or
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" \
http://be_host:webserver_port/api/{db}/_stream_load_2pc
2.User can invoke the following interface to trigger abort operations for transaction:
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" \
http://fe_host:http_port/api/{db}/_stream_load_2pc
or
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" \
http://be_host:webserver_port/api/{db}/_stream_load_2pc
This PR mainly changes:
1. Change the define of PBlock
The new PBlock consists of a set of PColumnMeta and a binary buffer.
The PColumnMeta records the metadata information of all columns in the Block,
while the buffer stores the serialized binary data of all columns.
2. Refactor the serialize/deserialize method of data type
Rewrite the `serialize()/deserialize()` of IDataType. And also add
a new method `get_uncompressed_serialized_bytes()` to get the total length
of uncompressed serialized data of a column.
3. Rewrite the serialize/deserialize method of Block
Now, when serializing a Block to PBlock, it will first get the total length
of uncompressed serialized data of all columns in this Block, and then allocate
the memory to write the serialized data to the buffer.
4. Use brpc attachment to transmit the serialized column data
Support implement UDF through GRPC protocol. This brings several benefits:
1. The udf implementation language is not limited to c++, users can use any familiar language to implement udf
2. UDF is decoupled from Doris, udf will not cause doris coredump, udf computing resources are separated from doris, and doris services are not affected
But RPC's UDF has a fixed overhead, so its performance is much slower than C++ UDF, especially when the amount of data is large.
Create function like
```
CREATE FUNCTION rpc_add(INT, INT) RETURNS INT PROPERTIES (
"SYMBOL"="add_int",
"OBJECT_FILE"="127.0.0.1:9999",
"TYPE"="RPC"
);
```
Function service need to implement `check_fn` and `fn_call` methods
Note:
THIS IS AN EXPERIMENTAL FEATURE, THE INTERFACE AND DATA STRUCTURE MAY BE CHANGED IN FUTURE !!!
This PR mainly changes:
1. Fix bug when enable `transfer_data_by_brpc_attachment`
In `data_stream_sender`, we will send a serialized PRowBatch data to multiple Channels.
And if `transfer_data_by_brpc_attachment` is enabled, we will mistakenly clear the data in PRowBatch
after sending PRowBatch to the first Channel.
As a result, the following Channel cannot receive the correct data, causing an error.
So I use a separate buffer instead of `tuple_data` in PRowBatch to store the serialized data
and reuse it in multiple channels.
2. Fix bug that the the offset in serialized row batch may overflow
Use int64 to replace int32 offset. And for compatibility, add a new field `new_tuple_offsets` in PRowBatch.
Change 1: Support an adaptive runtime filter: IN_OR_BLOOM_FILTER
The processing logic is
If the number of rows in the right table < runtime_filter_max_in_num, then IN predicate will work
If the number of rows in the right table >= runtime_filter_max_in_num, then Bloom filter can take effect
Change 2: The default runtime filter is changed to filter: IN_OR_BLOOM_FILTER
Close related #7389
Support create Iceberg external table in Doris.
This is the first step to support Iceberg external table.
### Create Iceberg external table
This pr describes two ways to create Iceberg external tables. Both ways do not require explicitly specifying column definitions, Doris automatically converts them based on Iceberg's column definitions.
1. Create an Iceberg external table directly
```sql
CREATE [EXTERNAL] TABLE table_name
ENGINE = ICEBERG
[COMMENT "comment"]
PROPERTIES (
"iceberg.database" = "iceberg_db_name",
"iceberg.table" = "icberg_table_name",
"iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
```
2. Create an Iceberg database and automatically create all the tables under that db.
```sql
CREATE DATABASE db_name
[COMMENT "comment"]
PROPERTIES (
"iceberg.database" = "iceberg_db_name",
"iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
```
### Show table creation
1. For individual tables you can view them with `help show create table`.
```sql
mysql> show create table iceberg_db.logs_1;
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logs_1 | CREATE TABLE `logs_1` (
`level` varchar(-1) NOT NULL COMMENT "null",
`event_time` datetime NOT NULL COMMENT "null",
`message` varchar(-1) NOT NULL COMMENT "null"
) ENGINE=ICEBERG
COMMENT "ICEBERG"
PROPERTIES (
"iceberg.database" = "doris",
"iceberg.table" = "logs_1",
"iceberg.hive.metastore.uris" = "thrift://10.10.10.10:9087",
"iceberg.catalog.type" = "HIVE_CATALOG"
) |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
2. For Iceberg database, you can view it with `help show table creation`.
```sql
mysql> show table creation from iceberg_db;
+--------+---------+---------------------+---------------------------------------------------------+
| Table | Status | Create Time | Error Msg |
+--------+---------+---------------------+---------------------------------------------------------+
| logs | fail | 2021-12-14 13:50:10 | Cannot convert unknown type to Doris type: list<string> |
| logs_1 | success | 2021-12-14 13:50:10 | |
+--------+---------+---------------------+---------------------------------------------------------+
2 rows in set (0.00 sec)
```
This is a new syntax.
Show table creation records in Iceberg database:
Syntax:
```sql
SHOW TABLE CREATION [FROM db] [LIKE mask]
```
Currently, if we encounter a problem with a replica of a tablet during the load process,
such as a write error, rpc error, -235, etc., it will cause the entire load job to fail,
which results in a significant reduction in Doris' fault tolerance.
This PR mainly changes:
1. refined the judgment of failed replicas in the load process, so that the failure of a few replicas will not affect the normal completion of the load job.
2. fix a bug introduced from #7754 that may cause BE coredump
This PR mainly changes:
1. Help to Cancel the load job ASAP when encounter unqualified data.
Solution is described in #6318 .
Also replace some std::stringstream with fmt::memory_buffer to avoid performance issues.
2. fix a NPE bug when create user with empty host
3. fix compile warning after rebasing the master(vectorization)
# Proposed changes
Issue Number: close#6238
Co-authored-by: HappenLee <happenlee@hotmail.com>
Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
Co-authored-by: Zhengguo Yang <yangzhgg@gmail.com>
Co-authored-by: wangbo <506340561@qq.com>
Co-authored-by: emmymiao87 <522274284@qq.com>
Co-authored-by: Pxl <952130278@qq.com>
Co-authored-by: zhangstar333 <87313068+zhangstar333@users.noreply.github.com>
Co-authored-by: thinker <zchw100@qq.com>
Co-authored-by: Zeno Yang <1521564989@qq.com>
Co-authored-by: Wang Shuo <wangshuo128@gmail.com>
Co-authored-by: zhoubintao <35688959+zbtzbtzbt@users.noreply.github.com>
Co-authored-by: Gabriel <gabrielleebuaa@gmail.com>
Co-authored-by: xinghuayu007 <1450306854@qq.com>
Co-authored-by: weizuo93 <weizuo@apache.org>
Co-authored-by: yiguolei <guoleiyi@tencent.com>
Co-authored-by: anneji-dev <85534151+anneji-dev@users.noreply.github.com>
Co-authored-by: awakeljw <993007281@qq.com>
Co-authored-by: taberylyang <95272637+taberylyang@users.noreply.github.com>
Co-authored-by: Cui Kaifeng <48012748+azurenake@users.noreply.github.com>
## Problem Summary:
### 1. Some code from clickhouse
**ClickHouse is an excellent implementation of the vectorized execution engine database,
so here we have referenced and learned a lot from its excellent implementation in terms of
data structure and function implementation.
We are based on ClickHouse v19.16.2.2 and would like to thank the ClickHouse community and developers.**
The following comment has been added to the code from Clickhouse, eg:
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/AggregationCommon.h
// and modified by Doris
### 2. Support exec node and query:
* vaggregation_node
* vanalytic_eval_node
* vassert_num_rows_node
* vblocking_join_node
* vcross_join_node
* vempty_set_node
* ves_http_scan_node
* vexcept_node
* vexchange_node
* vintersect_node
* vmysql_scan_node
* vodbc_scan_node
* volap_scan_node
* vrepeat_node
* vschema_scan_node
* vselect_node
* vset_operation_node
* vsort_node
* vunion_node
* vhash_join_node
You can run exec engine of SSB/TPCH and 70% TPCDS stand query test set.
### 3. Data Model
Vec Exec Engine Support **Dup/Agg/Unq** table, Support Block Reader Vectorized.
Segment Vec is working in process.
### 4. How to use
1. Set the environment variable `set enable_vectorized_engine = true; `(required)
2. Set the environment variable `set batch_size = 4096; ` (recommended)
### 5. Some diff from origin exec engine
https://github.com/doris-vectorized/doris-vectorized/issues/294
## Checklist(Required)
1. Does it affect the original behavior: (No)
2. Has unit tests been added: (Yes)
3. Has document been added or modified: (No)
4. Does it need to update dependencies: (No)
5. Are there any changes that cannot be rolled back: (Yes)
If an load task has a relatively short timeout, then we need to ensure that
each RPC of this task does not get blocked for a long time.
And an RPC is usually blocked for two reasons.
1. handling "memory exceeds limit" in the RPC
If the system finds that the memory occupied by the load exceeds the threshold,
it will select the load channel that occupies the most memory and flush the memtable in it.
this operation is done in the RPC, which may be more time consuming.
2. close the load channel
When the load channel receives the last batch, it will end the task.
It will wait for all memtables flushes to finish synchronously. This process is also time consuming.
Therefore, this PR solves this problem by.
1. Use timeout to determine whether it is a high-priority load task
If the timeout of an load task is relatively short, then we mark it as a high-priority task.
2. not processing "memory exceeds limit" for high priority tasks
3. use a separate flush thread to flush memtable for high priority tasks.
Support merge IN predicate when exist remote target(e.g. shuffle hash join).
Remote the code that IN predicate implicit conversion to Bloom filter then exist remote target.
Close related #7546
For the first, we need to make a parameter to discribe the data is local or remote.
At then, we need to support some basic function to support the operation for remote storage.
1. Fix some memory leaks
2. Remove redundant and invalid code
3. Fix some buggy writes to reduce extra memory copies and return null pointers to string
4. Reframing the naming to make the structure clearer
Transfer RowBatch in Protobuf Request to Controller Attachment,
when the maximum length of the RowBatch in the Protobuf Request is exceeded.
This can avoid reaching the upper limit of the Protobuf Request length (2G),
and it is expected that performance can be improved.
This is beacuse of an const MAX_PHYSICAL_PACKET_LENGTH in fe should be 2^24 -1,
but it is set as 2^24 -2 by mistake.
2. Fix bitmap_to_string may failed when the result is large than 2G
If the calculation of the lateral view function is completed,
the result will be directly returned to the upper layer.
It will cause a lot of memory copy and network transmission.
The reason is that the original column that generally participates
in the lateral view is very likely to be a very long value.
If Doris still retain this column after calculating the lateral view,
it need to perform a memory copy.
However, in many cases, the upper plan node does not need the original columns of the lateral view,
so it is necessary to perform column pruning after the calculation of the lateral view,
so as to avoid useless memory copy and network transmission.
For example, the following query can prune the original column v1
```select k1, e1 from table lateral view explode_split(v1, ",") tmp as e1;```
The `outputSlotIds` in TableFunctionNode is used to store the columns that should be retained after pruning.
* Support scalar function in lateral view
The child 0 of explode_split function could be a scalar function
such as: concat(k1, ",", k2)
This pr mainly detects whether the lateral view with function satisfies the following specifications in semantics.
1. The columns in the function must all belong to the original table
2. The function must be a scalar function
1. Forbidden non-string column as params of explode_view.
The first param of explode_view must be string column(VARCHAR/CHAR/STRING)
2. N-1 n lateral views map one TableFunctionNode
The TableFunctionNode include all of fnExprs which belongs to one table.
For example:
select pageid,mycol1, mycol2 from pageAds
lateral view explode_string(col1) myTable1 as mycol1
lateral view explode_string(col2) myTable2 as mycol2;
TableFunctionNode
|----
|- fnExprList: explode_string(col1), explode_string(col2)
Users can directly query the data in the hive table in Doris, and can use join to perform complex queries without laboriously importing data from hive.
Main changes list below:
FE:
Extend HiveScanNode from BrokerScanNode
HiveMetaStoreClientHelper communicate with HIVE and HDFS.
BE:
Treate HiveScanNode as BrokerScanNode, treate HiveTable as BrokerTable.
broker_scanner.cpp: suppot read column from HDFS path.
orc_scanner.cpp: support read hdfs file.
POM:
Add hive.version=2.3.7, hive-metastore and hive-exec
Add hadoop.version=2.8.0, hadoop-hdfs
Upgrade commons-lang to fix incompatiblity of Java 9 and later.
Thrift:
Add THiveTable
Add read_by_column_def in TBrokerRangeDesc
Added bprc stub cache check and reset api, used to test whether the bprc stub cache is available, and reset the bprc stub cache
add a config used for auto check and reset bprc stub
Add a use_path_style property for S3
Upgrade hadoop-common and hadoop-aws to 2.8.0 to support path style property
Fix some S3 URI bugs
Add some logs for tracing load process.