- Add two new types to stream load boker load: **csv_with_names** and **csv_with_name_sand_types**
- Add two new types to export: **csv_with_names** and **csv_with_names_and_types**
Add a new column-type to speed up the approximation of quantiles.
1. The new column-type is named `quantile_state` with fixed aggregation function `quantile_union`, which stores the intermediate results of pre-aggregated approximation calculations for quantiles.
2. support pre-aggregation of new column-type and quantile_state related functions.
1. support SHOW LAST INSERT
In the current implementation, the insert operation returns a json string to describe the result information
of the insert. But this information is in the session track field of the mysql protocol,
and it is difficult to obtain programmatically.
Therefore, I provide a new syntax `show last insert` to explicitly obtain the result of the latest insert operation,
and return a normal query result set to facilitate the user to obtain the result information of the insert.
2. the `ReturnRows` field in fe.audit.log of insert operation will be set to the loaded row num of the insert.
3. Fix a bug described in #8354
In some scenarios, users cannot find a suitable hash key to avoid data skew, so we need to provide an additional data distribution for olap table to avoid data skew
example:
CREATE TABLE random_table
(
siteid INT DEFAULT '10',
citycode SMALLINT,
username VARCHAR(32) DEFAULT '',
pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY random BUCKETS 10
PROPERTIES("replication_num" = "1");
Co-authored-by: caiconghui1 <caiconghui1@jd.com>
* [improvement](show) Support that user can use show data skew statement instead of admin
This PR mainly do two things:
1. Support that user can use show data skew statement instead of admin
2. Fix fe ut failed caused by pr [improvement](rewrite) Make RewriteDateLiteralRule to be compatible with mysql #7876 and pr [feature-wip](iceberg) Step1: Support create Iceberg external table #7391
Co-authored-by: caiconghui1 <caiconghui1@jd.com>
Close related #7389
Support create Iceberg external table in Doris.
This is the first step to support Iceberg external table.
### Create Iceberg external table
This pr describes two ways to create Iceberg external tables. Both ways do not require explicitly specifying column definitions, Doris automatically converts them based on Iceberg's column definitions.
1. Create an Iceberg external table directly
```sql
CREATE [EXTERNAL] TABLE table_name
ENGINE = ICEBERG
[COMMENT "comment"]
PROPERTIES (
"iceberg.database" = "iceberg_db_name",
"iceberg.table" = "icberg_table_name",
"iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
```
2. Create an Iceberg database and automatically create all the tables under that db.
```sql
CREATE DATABASE db_name
[COMMENT "comment"]
PROPERTIES (
"iceberg.database" = "iceberg_db_name",
"iceberg.hive.metastore.uris" = "thrift://192.168.0.1:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
```
### Show table creation
1. For individual tables you can view them with `help show create table`.
```sql
mysql> show create table iceberg_db.logs_1;
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logs_1 | CREATE TABLE `logs_1` (
`level` varchar(-1) NOT NULL COMMENT "null",
`event_time` datetime NOT NULL COMMENT "null",
`message` varchar(-1) NOT NULL COMMENT "null"
) ENGINE=ICEBERG
COMMENT "ICEBERG"
PROPERTIES (
"iceberg.database" = "doris",
"iceberg.table" = "logs_1",
"iceberg.hive.metastore.uris" = "thrift://10.10.10.10:9087",
"iceberg.catalog.type" = "HIVE_CATALOG"
) |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
2. For Iceberg database, you can view it with `help show table creation`.
```sql
mysql> show table creation from iceberg_db;
+--------+---------+---------------------+---------------------------------------------------------+
| Table | Status | Create Time | Error Msg |
+--------+---------+---------------------+---------------------------------------------------------+
| logs | fail | 2021-12-14 13:50:10 | Cannot convert unknown type to Doris type: list<string> |
| logs_1 | success | 2021-12-14 13:50:10 | |
+--------+---------+---------------------+---------------------------------------------------------+
2 rows in set (0.00 sec)
```
This is a new syntax.
Show table creation records in Iceberg database:
Syntax:
```sql
SHOW TABLE CREATION [FROM db] [LIKE mask]
```
Add a new field `Lag` in result of `show routine load` stmt.
`Lag: {"0":10, "1":0}` means kafka partition 0 has 10 msg behind and partition 1 is update-to-date.
```
EXPORT TABLE xxx
...
PROPERTIES
(
"label" = "mylabel",
...
);
```
And than user can use label to get the info by SHOW EXPORT stmt:
```
show export from db where label="mylabel";
```
For compatibility, if not specified, a random label will be used. And for history jobs, the label will be "export_job_id";
Not like LOAD stmt, here we specify label in `properties` because this will not cause grammatical conflicts,
and there is no need to modify the meta version of the metadata.
```
alter routine load for cmy2 from kafka("kafka_broker_list" = "ip2:9094", "kafka_topic" = "my_topic");
```
This is useful when the kafka broker list or topic has been changed.
Also modify `show create routine load`, support showing "kafka_partitions" and "kafka_offsets".
[Update] Support update syntax
The current update syntax only supports updating the filtered data of a single table.
Syntax:
* UPDATE table_reference
* SET assignment_list
* [WHERE where_condition]
*
* value:
* {expr}
*
* assignment:
* col_name = value
*
* assignment_list:
* assignment [, assignment] ...
Example
Update unique_table
set v1=1
where k1=1
New Frontend Config: enable_concurrent_update
This configuration is used to control whether multi update stmt can be executed concurrently in one table.
Default value is false which means A table can only have one update task being executed at the same time.
If users want to update the same table concurrently,
they need to modify the configuration value to true and restart the master frontend.
Concurrent updates may cause write conflicts, the result is uncertain, please be careful.
The main realization principle:
1. Read the rows that meet the conditions according to the conditions set by where clause.
2. Modify the result of the row according to the set clause.
3. Write the modified row back to the table.
Some restrictions on the use of update syntax.
1. Only the unique table can be updated
2. Only the value column of the unique table can be updated
3. The where clause currently only supports single tables
Possible risks:
1. Since the current implementation method is a row update,
when the same table is updated concurrently, there may be concurrency conflicts which may cause the incorrect result.
2. Once the conditions of the where clause are unsatisfactory, it is likely to cause a full table scan and affect query performance.
Please pay attention to whether the column in the where clause can match the index when using it.
[Docs][Update] Add update document and sql-reference
Fixed#6229
## Proposed changes
Add transaction for the operation of insert. It will cost less time than non-transaction(it will cost 1/1000 time) when you want to insert a amount of rows.
### Syntax
```
BEGIN [ WITH LABEL label];
INSERT INTO table_name ...
[COMMIT | ROLLBACK];
```
### Example
commit a transaction:
```
begin;
insert into Tbl values(11, 22, 33);
commit;
```
rollback a transaction:
```
begin;
insert into Tbl values(11, 22, 33);
rollback;
```
commit a transaction with label:
```
begin with label test_label;
insert into Tbl values(11, 22, 33);
commit;
```
### Description
```
begin: begin a transaction, the next insert will execute in the transaction until commit/rollback;
commit: commit the transaction, the data in the transaction will be inserted into the table;
rollback: abort the transaction, nothing will be inserted into the table;
```
### The main realization principle:
```
1. begin a transaction in the session. next sql is executed in the transaction;
2. insert sql will be parser and get the database name and table name, they will be used to select a be and create a pipe to accept data;
3. all inserted values will be sent to the be and write into the pipe;
4. a thread will get the data from the pipe, then write them to disk;
5. commit will complete this transaction and make these data visible;
6. rollback will abort this transaction
```
### Some restrictions on the use of update syntax.
1. Only ```insert``` can be called in a transaction.
2. If something error happened, ```commit``` will not succeed, it will ```rollback``` directly;
3. By default, if part of insert in the transaction is invalid, ```commit``` will only insert the other correct data into the table.
4. If you need ```commit``` return failed when any insert in the transaction is invalid, you need execute ```set enable_insert_strict = true``` before ```begin```.
In BE, when a problem happened, in the log, we can find the database id, table id, partition id,
but no database name, table name, partition name.
In FE, there also no way to find database name/table name/partition name accourding to
database id/table id/partition id. Therefore, this patch add 3 new commands:
1. show database id;
mysql> show database 10002;
+----------------------+
| DbName |
+----------------------+
| default_cluster:test |
+----------------------+
2. show table id;
mysql> show table 11100;
+----------------------+-----------+-------+
| DbName | TableName | DbId |
+----------------------+-----------+-------+
| default_cluster:test | table2 | 10002 |
+----------------------+-----------+-------+
3. show partition id;
mysql> show partition 11099;
+----------------------+-----------+---------------+-------+---------+
| DbName | TableName | PartitionName | DbId | TableId |
+----------------------+-----------+---------------+-------+---------+
| default_cluster:test | table2 | p201708 | 10002 | 11100 |
+----------------------+-----------+---------------+-------+---------+
Support when creating a kafka routine load, start consumption from a specified point in time instead of a specific offset.
eg:
```
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.kafka_default_offsets" = "2021-10-10 11:00:00"
);
or
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"
);
```
This PR also reconstructed the analysis method of properties when creating or altering
routine load jobs, and unified the analysis process in the `RoutineLoadDataSourceProperties` class.
Currently, the `show data` does not support sorting. When the number of tables increases, it is inconvenient to manage. Need to support sorting
like:
```
mysql> show data order by ReplicaCount desc,Size asc;
+-----------+-------------+--------------+
| TableName | Size | ReplicaCount |
+-----------+-------------+--------------+
| table_c | 3.102 KB | 40 |
| table_d | .000 | 20 |
| table_b | 324.000 B | 20 |
| table_a | 1.266 KB | 10 |
| Total | 4.684 KB | 90 |
| Quota | 1024.000 GB | 1073741824 |
| Left | 1024.000 GB | 1073741734 |
+-----------+-------------+--------------+
```
1. Support where clause in export stmt which only export selected rows.
The syntax is following:
Export table [table name]
where [expr]
To xxx
xxxx
It will filter table rows.
Only rows that meet the where condition can be exported.
2. Support utf8 separator
3. Support export to local
The syntax is following:
Export table [table name]
To (file:///xxx/xx/xx)
If user export rows to local, the broker properties is not requried.
User only need to create a local folder to store data, and fill in the path of the folder starting with file://
Change-Id: Ib7e7ece5accb3e359a67310b0bf006d42cd3f6f5
Support conditional filtering of original data in broker load and routine load
eg:
```
LOAD LABEL `label1`
(
DATA INFILE ('bos://cmy-repo/1.csv')
INTO TABLE tbl2
COLUMNS TERMINATED BY '\t'
(event_day, product_id, ocpc_stage, user_id)
SET (
ocpc_stage = ocpc_stage + 100
)
PRECEDING FILTER user_id = 1381035
WHERE ocpc_stage > 30
)
...
```
Support delete statement like:
1. delete from table partitions(p1, p2) where xxx; // apply to p1, p2
2. delete from table where xxx; // apply to all partitions
Also remove code about the deprecated sync/async delete job.
This CL changes FE meta version to 94
add a flag of fuzzy_parse, if the json file all object keys are the same and has same order, we only need to parse the first row, and then use index instead key to parse value
The name and another config name are close to each other and are indistinguishable.
So this pr modify the name.
The document description has also been changed