[Refactor][doc] add doc for auth management and conf (#8951)
@ -456,7 +456,7 @@ Cgroups assigned to doris
### `doris_max_scan_key_num`
* Type: int
* Description: Used to limit the maximum number of scan keys that a scan node can split in a query request. When a conditional query request reaches the scan node, the scan node will try to split the conditions related to the key column in the query condition into multiple scan key ranges. After that, these scan key ranges will be assigned to multiple scanner threads for data scanning. A larger value usually means that more scanner threads can be used to increase the parallelism of the scanning operation. However, in high concurrency scenarios, too many threads may bring greater scheduling overhead and system load, and will slow down the query response speed. An empirical value is 50. This configuration can be configured separately at the session level. For details, please refer to the description of `max_scan_key_num` in [Variables](../variables.md).
* Description: Used to limit the maximum number of scan keys that a scan node can split in a query request. When a conditional query request reaches the scan node, the scan node will try to split the conditions related to the key column in the query condition into multiple scan key ranges. After that, these scan key ranges will be assigned to multiple scanner threads for data scanning. A larger value usually means that more scanner threads can be used to increase the parallelism of the scanning operation. However, in high concurrency scenarios, too many threads may bring greater scheduling overhead and system load, and will slow down the query response speed. An empirical value is 50. This configuration can be configured separately at the session level. For details, please refer to the description of `max_scan_key_num` in [Variables](../../advanced/variables.md).
* Default value: 1024
When the concurrency cannot be improved in high concurrency scenarios, try to reduce this value and observe the impact.
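As the description notes, the limit can also be overridden per session; a minimal sketch from a MySQL client, assuming the session variable described in the linked Variables document:

```sql
-- Session-level override of the BE-side default described above.
SET max_scan_key_num = 50;
-- Or globally, for all new sessions:
SET GLOBAL max_scan_key_num = 50;
-- Check the current value.
SHOW VARIABLES LIKE 'max_scan_key_num';
```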
@ -787,7 +787,7 @@ The maximum external scan cache batch count, which means that the cache max_memo
### `max_pushdown_conditions_per_column`
* Type: int
* Description: Used to limit the maximum number of conditions that can be pushed down to the storage engine for a single column in a query request. During the execution of the query plan, the filter conditions on some columns can be pushed down to the storage engine, so that the index information in the storage engine can be used for data filtering, reducing the amount of data that needs to be scanned by the query. Such as equivalent conditions, conditions in IN predicates, etc. In most cases, this parameter only affects queries containing IN predicates. Such as `WHERE colA IN (1,2,3,4, ...)`. A larger number means that more conditions in the IN predicate can be pushed to the storage engine, but too many conditions may cause an increase in random reads, and in some cases may reduce query efficiency. This configuration can be individually configured for session level. For details, please refer to the description of `max_pushdown_conditions_per_column` in [Variables](../ variables.md).
* Description: Used to limit the maximum number of conditions that can be pushed down to the storage engine for a single column in a query request. During the execution of the query plan, the filter conditions on some columns can be pushed down to the storage engine, so that the index information in the storage engine can be used for data filtering, reducing the amount of data that needs to be scanned by the query. Such as equivalent conditions, conditions in IN predicates, etc. In most cases, this parameter only affects queries containing IN predicates. Such as `WHERE colA IN (1,2,3,4, ...)`. A larger number means that more conditions in the IN predicate can be pushed to the storage engine, but too many conditions may cause an increase in random reads, and in some cases may reduce query efficiency. This configuration can be individually configured for session level. For details, please refer to the description of `max_pushdown_conditions_per_column` in [Variables](../../advanced/variables.html).
* Default value: 1024
* Example
@ -1154,7 +1154,7 @@ Cache for storage page size
* 3./home/disk2/doris, indicates capacity limit is disk capacity, HDD(default)
eg.2: `storage_root_path=/home/disk1/doris,medium:hdd,capacity:50;/home/disk2/doris,medium:ssd,capacity:50`
* 1./home/disk1/doris,medium:hdd,capacity:10,capacity limit is 10GB, HDD;
* 2./home/disk2/doris,medium:ssd,capacity:50,capacity limit is 50GB, SSD;
@ -1,4 +1,4 @@

---
---
{
"title": "User Property",
"language": "en"

@ -221,4 +221,6 @@ Here are some usage scenarios of Doris privilege system.

Doris itself does not support blacklists, only whitelists, but we can simulate a blacklist in some way. Suppose a user named `user@'192.%'` is created first, which allows users from `192.*` to log in. If you now want to prohibit logins from `192.168.10.1`, you can create another user `user@'192.168.10.1'` and set a new password. Since `192.168.10.1` has a higher priority than `192.%`, the old password can no longer be used to log in from `192.168.10.1`.

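A minimal sketch of the simulation described above (passwords are placeholders):

```sql
-- Allow logins from the 192.* network segment.
CREATE USER 'user'@'192.%' IDENTIFIED BY 'password_a';
-- "Blacklist" 192.168.10.1 by creating a more specific entry with a different password.
CREATE USER 'user'@'192.168.10.1' IDENTIFIED BY 'password_b';
```
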
## More help

For more detailed syntax and best practices of permission management, please refer to the [GRANT](../sql-manual/sql-reference-v2/Account-Management-Statements/GRANT.html) command manual. You can also enter `HELP GRANT` at the command line of the MySQL client for more help.

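For example, a typical grant might look like the following sketch (the database name and privilege are placeholders; see the manual above for the full syntax):

```sql
-- Grant read-only access on all tables in example_db to the user created above.
GRANT SELECT_PRIV ON example_db.* TO 'user'@'192.%';
-- Review the privileges held by that user identity.
SHOW GRANTS FOR 'user'@'192.%';
```
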
@ -24,4 +24,180 @@ specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->

# Broker

Broker is an optional process in the Doris cluster. It is mainly used to support Doris to read and write files or directories on remote storage, such as HDFS, BOS, and AFS.

Broker provides services through an RPC service port. It is a stateless JVM process responsible for encapsulating some POSIX-like file operations for reading and writing on remote storage, such as open, pread, pwrite, and so on.
In addition, the Broker does not record any other information, so the connection information, file information, permission information, and so on of the remote storage need to be passed to the Broker process as parameters in the RPC call, so that the Broker can read and write files correctly.

Broker only acts as a data channel and does not participate in any calculation, so it takes up little memory. Usually one or more Broker processes are deployed in a Doris system, and Brokers of the same type form a group identified by a **Broker name**.

Broker's position in the Doris system architecture is as follows:

```
+----+   +----+
| FE |   | BE |
+-^--+   +--^-+
  |         |
  |         |
+-v---------v-+
|   Broker    |
+------^------+
       |
       |
+------v------+
|HDFS/BOS/AFS |
+-------------+
```

This document mainly introduces the parameters that Broker needs when accessing different remote storages, such as connection information, authorization information, and so on.

## Supported Storage System

Different types of brokers support different storage systems.

1. Community HDFS

   * Support simple authentication access
   * Support kerberos authentication access
   * Support HDFS HA mode access

2. Baidu HDFS / AFS (not supported by open source version)

   * Support UGI simple authentication access

3. Baidu Object Storage BOS (not supported by open source version)

   * Support AK / SK authentication access

## Function provided by Broker

1. [Broker Load](../data-operate/import/import-way/broker-load-manual.html)
2. [Export](../data-operate/export/export-manual.html)
3. [Backup](../admin-manual/data-admin/backup.md)

## Broker Information

Broker information includes two parts: **Broker name** and **Certification information**. The general syntax is as follows:

```
WITH BROKER "broker_name"
(
    "username" = "xxx",
    "password" = "yyy",
    "other_prop" = "prop_value",
    ...
);
```

### Broker Name

Usually the user needs to specify an existing Broker Name through the `WITH BROKER "broker_name"` clause in the operation command.
Broker Name is a name that the user specifies when adding a Broker process through the `ALTER SYSTEM ADD BROKER` command, as sketched below.
A name usually corresponds to one or more Broker processes, and Doris selects an available Broker process by name.
You can use the `SHOW BROKER` command to view the Brokers that currently exist in the cluster.

**Note: Broker Name is just a user-defined name and does not represent the type of Broker.**

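A minimal sketch of registering and inspecting Brokers (the host names and ports below are placeholders, not values from this document):

```sql
-- Register two broker processes under one Broker Name; Doris picks an available one by name.
ALTER SYSTEM ADD BROKER broker_name "broker_host1:8000", "broker_host2:8000";
-- List the brokers currently known to the cluster.
SHOW BROKER;
-- Remove them when they are no longer needed.
ALTER SYSTEM DROP BROKER broker_name "broker_host1:8000", "broker_host2:8000";
```
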
### Certification Information

Different broker types and different access methods require different authentication information.
Authentication information is usually provided as key-value pairs in the property map after `WITH BROKER "broker_name"`.

#### Community HDFS

1. Simple Authentication

Simple authentication means that Hadoop configures `hadoop.security.authentication` to `simple`.

Access HDFS as the system user, or set the environment variable `HADOOP_USER_NAME` when starting the Broker.

```
(
    "username" = "user",
    "password" = ""
);
```

Just leave the password blank.

2. Kerberos Authentication

This authentication method requires the following information:

* `hadoop.security.authentication`: Specify the authentication method as kerberos.
* `kerberos_principal`: Specify the principal of kerberos.
* `kerberos_keytab`: Specify the path to the keytab file for kerberos. The file must be an absolute path to a file on the server where the broker process is located, and must be accessible by the Broker process.
* `kerberos_keytab_content`: Specify the base64-encoded content of the kerberos keytab file. This is mutually exclusive with `kerberos_keytab`; configure only one of them.

Examples are as follows:

```
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab" = "/home/doris/my.keytab"
)
```

```
(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab_content" = "ASDOWHDLAWIDJHWLDKSALDJSDIWALD"
)
```

If Kerberos authentication is used, the [krb5.conf](https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html) file is required when deploying the Broker process.
The krb5.conf file contains Kerberos configuration information. Normally, you should install your krb5.conf file in the directory /etc. You can override the default location by setting the environment variable KRB5_CONFIG.
An example of the contents of the krb5.conf file is as follows:
```
[libdefaults]
    default_realm = DORIS.HADOOP
    default_tkt_enctypes = des3-hmac-sha1 des-cbc-crc
    default_tgs_enctypes = des3-hmac-sha1 des-cbc-crc
    dns_lookup_kdc = true
    dns_lookup_realm = false

[realms]
    DORIS.HADOOP = {
        kdc = kerberos-doris.hadoop.service:7005
    }
```

3. HDFS HA Mode

This configuration is used to access an HDFS cluster deployed in HA mode.

* `dfs.nameservices`: Specify the name of the hdfs service, custom, such as "dfs.nameservices" = "my_ha".
* `dfs.ha.namenodes.xxx`: Custom namenode names. Multiple names are separated by commas, where xxx is the custom name in `dfs.nameservices`, such as "dfs.ha.namenodes.my_ha" = "my_nn".
* `dfs.namenode.rpc-address.xxx.nn`: Specify the rpc address of the namenode, where nn represents a namenode name configured in `dfs.ha.namenodes.xxx`, such as "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port".
* `dfs.client.failover.proxy.provider`: Specify the provider for the client to connect to the namenode. The default is org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.

Examples are as follows:

```
(
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
```

The HA mode can be combined with the previous two authentication methods for cluster access. If you access HA HDFS with simple authentication:

```
(
    "username" = "user",
    "password" = "passwd",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
```

The configuration for accessing the HDFS cluster can be written to the hdfs-site.xml file. When users use the Broker process to read data from the HDFS cluster, they only need to fill in the cluster file path and authentication information.

@ -26,3 +26,80 @@ under the License.
|
||||
|
||||
# Bucket Shuffle Join
|
||||
|
||||
Bucket Shuffle Join is a new function officially added in Doris 0.14. Its purpose is to provide local optimization for some join queries, reduce the time spent on data transmission between nodes, and speed up queries.

Its design and implementation can be found in [ISSUE 4394](https://github.com/apache/incubator-doris/issues/4394).
|
||||
|
||||
## Noun Interpretation
|
||||
|
||||
* FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
|
||||
* BE: Backend, Doris's back-end node. Responsible for query execution and data storage.
|
||||
* Left table: the left table in join query. Perform probe expr. The order can be adjusted by join reorder.
|
||||
* Right table: the right table in join query. Perform build expr. The order can be adjusted by join reorder.
|
||||
|
||||
## Principle
|
||||
The conventional distributed join methods supported by Doris are `Shuffle Join` and `Broadcast Join`. Both of these joins incur some network overhead.
|
||||
|
||||
For example, suppose there is a join query between table A and table B, using hash join. The cost of the different join types is as follows:
* **Broadcast Join**: If, according to the data distribution, table A has three HashJoinNodes executing, table B needs to be sent to all three of them. Its network overhead is `3B`, and its memory overhead is `3B`.
* **Shuffle Join**: Shuffle Join distributes the data of tables A and B across the nodes of the cluster according to hash calculation, so its network overhead is `A + B` and its memory overhead is `B`.
|
||||
|
||||
The data distribution information of each Doris table is saved in FE. If the join statement hits the data distribution column of the left table, we should use the data distribution information to reduce the network and memory overhead of the join query. This is the source of the idea of bucket shuffle join.
|
||||
|
||||

|
||||
|
||||
The picture above shows how Bucket Shuffle Join works. The SQL query joins table A with table B, and the equi-join expression hits the data distribution column of A. Based on the data distribution information of table A, Bucket Shuffle Join sends the data of table B to the corresponding data storage and compute nodes of table A. The cost of Bucket Shuffle Join is as follows:
|
||||
|
||||
* network cost: ``` B < min(3B, A + B) ```
|
||||
|
||||
* memory cost: ``` B <= min(3B, B) ```
|
||||
|
||||
Therefore, compared with Broadcast Join and Shuffle Join, Bucket Shuffle Join has an obvious performance advantage: it reduces the time spent on data transmission between nodes and the memory cost of the join. Compared with Doris's original join methods, it has the following advantages:
|
||||
|
||||
* First of all, Bucket Shuffle Join reduces the network and memory cost, which gives some join queries better performance, especially when FE can perform partition pruning and bucket pruning on the left table.
* Secondly, unlike Colocate Join, it is not intrusive to the data distribution of tables and is transparent to users. There is no mandatory requirement for the data distribution of the table, so it is less likely to cause data skew.
|
||||
* Finally, it can provide more optimization space for join reorder.
|
||||
|
||||
## Usage
|
||||
|
||||
### Set session variable
|
||||
|
||||
Set session variable `enable_bucket_shuffle_join` to `true`, FE will automatically plan queries that can be converted to Bucket Shuffle Join.
|
||||
|
||||
```
|
||||
set enable_bucket_shuffle_join = true;
|
||||
```
|
||||
|
||||
In FE's distributed query planning, the priority order is Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join. However, if the user explicitly hints the type of join, for example:
|
||||
|
||||
```
|
||||
select * from test join [shuffle] baseall on test.k1 = baseall.k1;
|
||||
```
|
||||
the above order of preference will not take effect.
|
||||
|
||||
The session variable is set to `true` by default in version 0.14, while it needs to be set to `true` manually in version 0.13.
|
||||
|
||||
### View the type of join
|
||||
|
||||
You can use the `explain` command to check whether the join is a Bucket Shuffle Join
|
||||
|
||||
```sql
|
||||
| 2:HASH JOIN |
|
||||
| | join op: INNER JOIN (BUCKET_SHUFFLE) |
|
||||
| | hash predicates: |
|
||||
| | colocate: false, reason: table not in the same group |
|
||||
| | equal join conjunct: `test`.`k1` = `baseall`.`k1`
|
||||
```
|
||||
|
||||
The join type shown indicates that the join method to be used is `BUCKET_SHUFFLE`.
|
||||
|
||||
## Planning rules of Bucket Shuffle Join
|
||||
|
||||
In most scenarios, users only need to turn on the session variable by default to transparently use the performance improvement brought by this join method. However, if we understand the planning rules of Bucket Shuffle Join, we can use it to write more efficient SQL.
|
||||
|
||||
* Bucket Shuffle Join only works when the join condition is an equi-join. The reason is similar to Colocate Join: they both rely on hashing to determine the data distribution.
* The bucket columns of the two tables must be included in the equi-join condition. When the bucket column of the left table is used in an equi-join condition, it has a high probability of being planned as a Bucket Shuffle Join (see the sketch after this list).
* Because the hash values of different data types are calculated differently, Bucket Shuffle Join requires that the bucket column type of the left table and the corresponding equi-join column type of the right table be consistent, otherwise the corresponding plan cannot be generated.
* Bucket Shuffle Join only works on Doris native OLAP tables. ODBC, MySQL, and ES external tables cannot be planned as Bucket Shuffle Join when used as the left table.
* For partitioned tables, because the data distribution rules of each partition may be different, Bucket Shuffle Join can only be guaranteed when the left table is pruned to a single partition. Therefore, in SQL execution, use `where` conditions as much as possible to make the partition pruning policy effective.
* If the left table is a colocate table, the data distribution rules of each partition are determined, so Bucket Shuffle Join can perform better on colocate tables.
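A minimal sketch of a query shape that satisfies these rules, using hypothetical tables `t1` and `t2` whose bucket column `k1` appears in the equi-join condition with matching types:

```sql
CREATE TABLE t1 (k1 INT, v1 INT) DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 8
PROPERTIES("replication_num" = "1");

CREATE TABLE t2 (k1 INT, v2 INT) DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 16
PROPERTIES("replication_num" = "1");

-- With enable_bucket_shuffle_join = true, the plan for this join is expected
-- to show BUCKET_SHUFFLE, as in the explain output above.
EXPLAIN SELECT * FROM t1 JOIN t2 ON t1.k1 = t2.k1;
```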

new-docs/en/advanced/cache/partition-cache.md
@ -26,3 +26,235 @@ under the License.
|
||||
|
||||
# Partition Cache
|
||||
|
||||
## Demand scenario
|
||||
|
||||
In most data analysis scenarios, data is written less and read more: it is written once and read frequently. For example, the dimensions and indicators involved in a report are calculated once in the early morning, but the report page is accessed hundreds or even thousands of times every day, so the result set is very suitable for caching. In data analysis or BI applications, the following business scenarios exist:
|
||||
|
||||
- **High concurrency scenarios**: Doris supports high concurrency well, but a single server cannot carry too high a QPS
- **Kanban for complex charts**: complex dashboards or large-screen applications where the data comes from multiple tables and each page issues dozens of queries; although each query takes only tens of milliseconds, the overall query time adds up to several seconds
- **Trend analysis**: queries over a given date range with indicators displayed by day, such as querying the trend of the number of users in the last 7 days; this type of query scans a large amount of data over a wide range and often takes tens of seconds
- **Repeated user queries**: if the product has no anti-reload mechanism, users may repeatedly refresh the page by mistake, resulting in a large number of repeated SQL submissions
|
||||
|
||||
In the above four scenarios, the solution at the application layer is to put the query results in Redis, update the cache periodically or manually refresh the cache by the user, but this solution has the following problems:
|
||||
|
||||
- **Data inconsistency**, unable to perceive the update of data, causing users to often see old data
|
||||
- **Low hit rate**, cache the entire query result, if the data is written in real time, the cache is frequently invalidated, the hit rate is low and the system load is heavy
|
||||
- **Additional cost**, the introduction of external cache components will bring system complexity and increase additional costs
|
||||
|
||||
## Solution
|
||||
|
||||
This partitioned caching strategy can solve the above problems, giving priority to ensuring data consistency. On this basis, the cache granularity is refined and the hit rate is improved. Therefore, it has the following characteristics:
|
||||
|
||||
- Users do not need to worry about data consistency, cache invalidation is controlled by version, and the cached data is consistent with the data queried from BE
|
||||
- No additional components and costs, cached results are stored in BE's memory, users can adjust the cache memory size as needed
|
||||
- Implemented two caching strategies, SQLCache and PartitionCache, the latter has a finer cache granularity
|
||||
- Use consistent hashing to solve the problem of BE nodes going online and offline. The caching algorithm in BE is an improved LRU
|
||||
|
||||
## SQLCache
|
||||
|
||||
SQLCache stores and retrieves the cache according to the SQL signature, the partition ID of the queried table, and the latest version of the partition. The combination of the three determines a cached data set. If any one changes, such as SQL changes, such as query fields or conditions are different, or the version changes after the data is updated, the cache will not be hit.
|
||||
|
||||
If multiple tables are joined, use the latest updated partition ID and the latest version number. If one of the tables is updated, the partition ID or version number will be different, and the cache will also not be hit.
|
||||
|
||||
SQLCache is more suitable for T+1 update scenarios. Data is updated in the early morning. The results obtained from the BE for the first query are put into the cache, and subsequent identical queries are obtained from the cache. Real-time update data can also be used, but there may be a problem of low hit rate. You can refer to the following PartitionCache.
|
||||
|
||||
## PartitionCache
|
||||
|
||||
### Design Principles
|
||||
|
||||
1. SQL can be split in parallel, Q = Q1 ∪ Q2 ... ∪ Qn, R= R1 ∪ R2 ... ∪ Rn, Q is the query statement, R is the result set
|
||||
2. Split into read-only partitions and updatable partitions, read-only partitions are cached, and update partitions are not cached
|
||||
|
||||
For example, to query the number of users per day in the last 7 days with the table partitioned by date, data is only written to today's partition, and the data in all other partitions is fixed. For the same query SQL, the indicators read from partitions that are not being updated are fixed. As shown below, when the number of users in the last 7 days is queried on 2020-03-09, the data from 2020-03-03 to 2020-03-07 comes from the cache, the first query for 2020-03-08 comes from the partition and subsequent queries come from the cache, and 2020-03-09 comes from the partition because it is continuously being written that day.
|
||||
|
||||
Therefore, when querying N days of data where only the most recent D days are updated, daily queries that differ only in the date range only need to read D partitions; the rest comes from the cache, which can effectively reduce the cluster load and the query time.
|
||||
|
||||
```sql
|
||||
MySQL [(none)]> SELECT eventdate,count(userid) FROM testdb.appevent WHERE eventdate>="2020-03-03" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY eventdate;
|
||||
+------------+-----------------+
|
||||
| eventdate | count(`userid`) |
|
||||
+------------+-----------------+
|
||||
| 2020-03-03 | 15 |
|
||||
| 2020-03-04 | 20 |
|
||||
| 2020-03-05 | 25 |
|
||||
| 2020-03-06 | 30 |
|
||||
| 2020-03-07 | 35 |
|
||||
| 2020-03-08 | 40 | //First from partition, subsequent from cache
|
||||
| 2020-03-09 | 25 | //from partition
|
||||
+------------+-----------------+
|
||||
7 rows in set (0.02 sec)
|
||||
```
|
||||
|
||||
In PartitionCache, the first-level key of the cache is the 128-bit MD5 signature of the SQL after the partition condition is removed. The following is the rewritten SQL to be signed:
|
||||
|
||||
```sql
|
||||
SELECT eventdate,count(userid) FROM testdb.appevent GROUP BY eventdate ORDER BY eventdate;
|
||||
```
|
||||
|
||||
The cached second-level key is the content of the partition field of the query result set, such as the content of the eventdate column of the query result above, and the auxiliary information of the second-level key is the version number and version update time of the partition.
|
||||
|
||||
The following demonstrates the process of executing the above SQL for the first time on 2020-03-09:
|
||||
|
||||
1. Get data from cache
|
||||
|
||||
```text
|
||||
+------------+-----------------+
|
||||
| 2020-03-03 | 15 |
|
||||
| 2020-03-04 | 20 |
|
||||
| 2020-03-05 | 25 |
|
||||
| 2020-03-06 | 30 |
|
||||
| 2020-03-07 | 35 |
|
||||
+------------+-----------------+
|
||||
```
|
||||
|
||||
1. Rewritten SQL to get the remaining data from BE
|
||||
|
||||
```sql
|
||||
SELECT eventdate,count(userid) FROM testdb.appevent WHERE eventdate>="2020-03-08" AND eventdate<="2020-03-09" GROUP BY eventdate ORDER BY eventdate;
|
||||
|
||||
+------------+-----------------+
|
||||
| 2020-03-08 | 40 |
|
||||
+------------+-----------------+
|
||||
| 2020-03-09 | 25 |
|
||||
+------------+-----------------+
|
||||
```
|
||||
|
||||
1. The last data sent to the terminal
|
||||
|
||||
```text
|
||||
+------------+-----------------+
|
||||
| eventdate | count(`userid`) |
|
||||
+------------+-----------------+
|
||||
| 2020-03-03 | 15 |
|
||||
| 2020-03-04 | 20 |
|
||||
| 2020-03-05 | 25 |
|
||||
| 2020-03-06 | 30 |
|
||||
| 2020-03-07 | 35 |
|
||||
| 2020-03-08 | 40 |
|
||||
| 2020-03-09 | 25 |
|
||||
+------------+-----------------+
|
||||
```
|
||||
|
||||
1. data sent to cache
|
||||
|
||||
```text
|
||||
+------------+-----------------+
|
||||
| 2020-03-08 | 40 |
|
||||
+------------+-----------------+
|
||||
```
|
||||
|
||||
Partition cache is suitable for partitioning by date, some partitions are updated in real time, and the query SQL is relatively fixed.
|
||||
|
||||
The partition field can also be another field, but it must be ensured that only a small number of partitions are updated.
|
||||
|
||||
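For reference, a hypothetical shape of such a date-partitioned table; the real `testdb.appevent` schema is not shown in this document, so this is only a sketch:

```sql
-- One partition per day; only today's partition keeps receiving writes.
CREATE TABLE testdb.appevent (
    eventdate DATE NOT NULL,
    userid BIGINT NOT NULL
) DUPLICATE KEY(eventdate, userid)
PARTITION BY RANGE(eventdate)
(
    PARTITION p20200308 VALUES LESS THAN ('2020-03-09'),
    PARTITION p20200309 VALUES LESS THAN ('2020-03-10')
)
DISTRIBUTED BY HASH(userid) BUCKETS 8
PROPERTIES("replication_num" = "1");
```
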
### Some restrictions
|
||||
|
||||
- Only OlapTable is supported; other tables such as MySQL external tables have no version information and cannot sense whether the data has been updated
- Only grouping by the partition field is supported; grouping by other fields is not, because data grouped by other fields may be updated, which would invalidate the cache
- Only cache hits on the first part of the result set, the last part of the result set, or the entire result set are supported; the result set cannot be split into several segments by the cached data
|
||||
|
||||
## How to use
|
||||
|
||||
### Enable SQLCache
|
||||
|
||||
Make sure cache_enable_sql_mode=true in fe.conf (default is true)
|
||||
|
||||
```text
|
||||
vim fe/conf/fe.conf
|
||||
cache_enable_sql_mode=true
|
||||
```
|
||||
|
||||
Setting variables in MySQL command line
|
||||
|
||||
```sql
|
||||
MySQL [(none)]> set [global] enable_sql_cache=true;
|
||||
```
|
||||
|
||||
Note: the optional `global` keyword sets the global variable rather than only the current session variable
|
||||
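To confirm what the current session will use, the cache-related variables can be inspected; a sketch (the exact variable list may differ between versions):

```sql
SHOW VARIABLES LIKE '%cache%';
```
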
|
||||
### Enable PartitionCache
|
||||
|
||||
Make sure cache_enable_partition_mode=true in fe.conf (default is true)
|
||||
|
||||
```text
|
||||
vim fe/conf/fe.conf
|
||||
cache_enable_partition_mode=true
|
||||
```
|
||||
|
||||
Setting variables in MySQL command line
|
||||
|
||||
```sql
|
||||
MySQL [(none)]> set [global] enable_partition_cache=true;
|
||||
```
|
||||
|
||||
If two caching strategies are enabled at the same time, the following parameters need to be paid attention to:
|
||||
|
||||
```text
|
||||
cache_last_version_interval_second=900
|
||||
```
|
||||
|
||||
If the time since the latest partition version is greater than cache_last_version_interval_second, the entire query result is cached (SQLCache). If it is less than this interval and the query meets the conditions of PartitionCache, the data is cached according to PartitionCache.
|
||||
|
||||
### Monitoring
|
||||
|
||||
FE monitoring items:
|
||||
|
||||
```text
|
||||
query_table //Number of tables in Query
|
||||
query_olap_table //Number of Olap tables in Query
|
||||
cache_mode_sql //Identify the number of queries whose cache mode is sql
|
||||
cache_hit_sql //The number of Cache hits by Query with mode sql
|
||||
query_mode_partition //Identify the number of queries whose cache mode is Partition
|
||||
cache_hit_partition //Number of queries hit by Partition
|
||||
partition_all //All partitions scanned in Query
|
||||
partition_hit //Number of partitions hit by Cache
|
||||
|
||||
Cache hit ratio = (cache_hit_sql + cache_hit_partition) / query_olap_table
|
||||
Partition hit ratio = partition_hit / partition_all
|
||||
```
|
||||
|
||||
BE's monitoring items:
|
||||
|
||||
```text
|
||||
query_cache_memory_total_byte //Cache memory size
|
||||
query_query_cache_sql_total_count //Number of SQL in Cache
|
||||
query_cache_partition_total_count //Number of Cache partitions
|
||||
|
||||
SQL average data size = cache_memory_total / cache_sql_total
|
||||
Partition average data size = cache_memory_total / cache_partition_total
|
||||
```
|
||||
|
||||
Other monitoring: You can view the CPU and memory indicators of the BE node, the Query Percentile and other indicators in the Query statistics from Grafana, and adjust the Cache parameters to achieve business goals.
|
||||
|
||||
### Optimization parameters
|
||||
|
||||
The configuration item cache_result_max_row_count of FE, the maximum number of rows in the cache for the query result set, can be adjusted according to the actual situation, but it is recommended not to set it too large to avoid taking up too much memory, and the result set exceeding this size will not be cached.
|
||||
|
||||
```text
|
||||
vim fe/conf/fe.conf
|
||||
cache_result_max_row_count=3000
|
||||
```
|
||||
|
||||
The BE configuration item cache_max_partition_count refers to the maximum number of partitions cached per SQL. If tables are partitioned by date, it can cache data for more than 2 years. If you want to keep the cache for a longer time, please set this parameter to a larger value and adjust cache_result_max_row_count at the same time.
|
||||
|
||||
```text
|
||||
vim be/conf/be.conf
|
||||
cache_max_partition_count=1024
|
||||
```
|
||||
|
||||
The cache memory setting in BE consists of two parameters, query_cache_max_size and query_cache_elasticity_size (in MB). If the memory exceeds query_cache_max_size + query_cache_elasticity_size, BE starts to clean up and keeps the memory below query_cache_max_size. These two parameters can be set according to the number of BE nodes, node memory size, and cache hit rate.
|
||||
|
||||
```text
|
||||
query_cache_max_size_mb=256
|
||||
query_cache_elasticity_size_mb=128
|
||||
```
|
||||
|
||||
Calculation method:
|
||||
|
||||
If 10K queries are cached, each query caches 1000 rows, each row is 128 bytes, distributed on 10 BEs, then each BE requires 128M memory (10K*1000*128/10).
|
||||
|
||||
## Unfinished Matters
|
||||
|
||||
- Can the data of T+1 also be cached by Partition? Currently not supported
|
||||
- Similar SQL, 2 indicators were queried before, but now 3 indicators are queried. Can the cache of 2 indicators be used? Not currently supported
|
||||
- Partition by date, but need to aggregate data by week dimension, is PartitionCache available? Not currently supported
|
||||
|
||||
@ -26,3 +26,384 @@ under the License.
|
||||
|
||||
# Colocation Join
|
||||
|
||||
Colocation Join is a new feature introduced in Doris 0.9. The purpose of this paper is to provide local optimization for some Join queries to reduce data transmission time between nodes and speed up queries.
|
||||
|
||||
The original design, implementation and effect can be referred to [ISSUE 245](https://github.com/apache/incubator-doris/issues/245).
|
||||
|
||||
The Colocation Join function has undergone a revision, and its design and use are slightly different from the original design. This document mainly introduces Colocation Join's principle, implementation, usage and precautions.
|
||||
|
||||
## Noun Interpretation
|
||||
|
||||
* FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
|
||||
* BE: Backend, Doris's back-end node. Responsible for query execution and data storage.
|
||||
* Colocation Group (CG): A CG contains one or more tables. Tables within the same group have the same Colocation Group Schema and the same data fragmentation distribution.
|
||||
* Colocation Group Schema (CGS): Used to describe the tables in a CG and the general schema information related to Colocation, including the bucket column type, the number of buckets, and the number of replicas.
|
||||
|
||||
## Principle
|
||||
|
||||
The Colocation Join function is to make a CG of a set of tables with the same CGS. Ensure that the corresponding data fragments of these tables will fall on the same BE node. When tables in CG perform Join operations on bucket columns, local data Join can be directly performed to reduce data transmission time between nodes.
|
||||
|
||||
The data of a table is eventually assigned to a bucket by hashing the bucket column values and taking the result modulo the number of buckets. Assuming that a table has 8 buckets, there are eight buckets `[0, 1, 2, 3, 4, 5, 6, 7]`. We call such a sequence a `Buckets Sequence`. Each Bucket has one or more Tablets. When a table is a single-partition table, there is only one Tablet in a Bucket. If it is a multi-partition table, there will be more than one.
|
||||
|
||||
In order for a table to have the same data distribution, the table in the same CG must ensure the following attributes are the same:
|
||||
|
||||
1. Bucket columns and number of buckets
|
||||
|
||||
Bucket columns are the columns specified in `DISTRIBUTED BY HASH(col1, col2, ...)` in the table creation statement. Bucket columns determine which column values are used to hash the data of a table into different Tablets. Tables in the same CG must have bucket columns of identical type and number, and an identical number of buckets, so that the data shards of multiple tables can be mapped to one another one by one.
|
||||
|
||||
2. Number of copies
|
||||
|
||||
The number of replicas of all partitions of all tables in the same CG must be the same. If it is inconsistent, there may be a Tablet replica that has no corresponding replica of the other tables' shards on the same BE.
|
||||
|
||||
Tables in the same CG do not require consistency in the number, scope, and type of partition columns.
|
||||
|
||||
After the bucket columns and the number of buckets are fixed, the tables in the same CG have the same Buckets Sequence. The number of replicas determines the number of replicas of the Tablets in each bucket and which BEs they are stored on. Suppose the Buckets Sequence is `[0, 1, 2, 3, 4, 5, 6, 7]` and there are 4 BE nodes `[A, B, C, D]`. A possible data distribution is as follows:
|
||||
|
||||
```
|
||||
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
|
||||
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
|
||||
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
|
||||
| A | | B | | C | | D | | A | | B | | C | | D |
|
||||
| | | | | | | | | | | | | | | |
|
||||
| B | | C | | D | | A | | B | | C | | D | | A |
|
||||
| | | | | | | | | | | | | | | |
|
||||
| C | | D | | A | | B | | C | | D | | A | | B |
|
||||
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
|
||||
```
|
||||
|
||||
The data of all tables in a CG is distributed uniformly according to the above rules, which ensures that data with the same bucket column values is on the same BE node, so local data Join can be performed.
|
||||
|
||||
## Usage
|
||||
|
||||
### Establishment of tables
|
||||
|
||||
When creating a table, you can specify the attribute `"colocate_with"="group_name"` in `PROPERTIES`, which means that the table is a Colocation Join table and belongs to a specified Colocation Group.
|
||||
|
||||
Examples:
|
||||
|
||||
```
|
||||
CREATE TABLE tbl (k1 int, v1 int sum)
|
||||
DISTRIBUTED BY HASH(k1)
|
||||
BUCKETS 8
|
||||
PROPERTIES(
|
||||
"colocate_with" = "group1"
|
||||
);
|
||||
```
|
||||
|
||||
If the specified group does not exist, Doris automatically creates a group that contains only the current table. If the Group already exists, Doris checks whether the current table satisfies the Colocation Group Schema. If satisfied, the table is created and added to the Group. At the same time, tables create fragments and replicas based on existing data distribution rules in Groups.
|
||||
Group belongs to a database, and its name is unique in a database. Internal storage is the full name of Group `dbId_groupName`, but users only perceive groupName.
|
||||
|
||||
### Delete table
|
||||
|
||||
When the last table in a Group is completely deleted (completely deleted means deleted from the recycle bin; usually, after a table is dropped with the `DROP TABLE` command, it stays in the recycle bin for one day by default before being deleted), the Group is deleted automatically as well.
|
||||
|
||||
### View Group
|
||||
|
||||
The following command allows you to view the existing Group information in the cluster.
|
||||
|
||||
```
|
||||
SHOW PROC '/colocation_group';
|
||||
|
||||
+-------------+--------------+--------------+------------+----------------+----------+----------+
|
||||
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
|
||||
+-------------+--------------+--------------+------------+----------------+----------+----------+
|
||||
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
|
||||
+-------------+--------------+--------------+------------+----------------+----------+----------+
|
||||
```
|
||||
|
||||
* GroupId: The cluster-wide unique identifier of a Group, with the db id in the first half and the group id in the second half.
* GroupName: The full name of the Group.
* TableIds: The list of IDs of the tables contained in the Group.
* BucketsNum: Number of buckets.
* ReplicationNum: Number of replicas.
* DistCols: Distribution columns.
* IsStable: Whether the Group is stable (for the definition of stability, see the section `Colocation Replica Balancing and Repair`).
|
||||
|
||||
You can further view the data distribution of a group by following commands:
|
||||
|
||||
```
|
||||
SHOW PROC '/colocation_group/10005.10008';
|
||||
|
||||
+-------------+---------------------+
|
||||
| BucketIndex | BackendIds |
|
||||
+-------------+---------------------+
|
||||
| 0 | 10004, 10002, 10001 |
|
||||
| 1 | 10003, 10002, 10004 |
|
||||
| 2 | 10002, 10004, 10001 |
|
||||
| 3 | 10003, 10002, 10004 |
|
||||
| 4 | 10002, 10004, 10003 |
|
||||
| 5 | 10003, 10002, 10001 |
|
||||
| 6 | 10003, 10004, 10001 |
|
||||
| 7 | 10003, 10004, 10002 |
|
||||
+-------------+---------------------+
|
||||
```
|
||||
|
||||
* BucketIndex: Subscript to the bucket sequence.
|
||||
* Backend Ids: A list of BE node IDs where data fragments are located in buckets.
|
||||
|
||||
> The above commands require ADMIN privileges. Normal user view is not supported at this time.
|
||||
|
||||
### Modify Colocate Group
|
||||
|
||||
You can modify the Colocation Group property of a table that has been created. Examples:
|
||||
|
||||
`ALTER TABLE tbl SET ("colocate_with" = "group2");`
|
||||
|
||||
* If the table has not previously specified a Group, the command checks the Schema and adds the table to the Group (if the Group does not exist, it will be created).
|
||||
* If other groups are specified before the table, the command first removes the table from the original group and adds a new group (if the group does not exist, it will be created).
|
||||
|
||||
You can also delete the Colocation attribute of a table by following commands:
|
||||
|
||||
`ALTER TABLE tbl SET ("colocate_with" = "");`
|
||||
|
||||
### Other related operations
|
||||
|
||||
When ADD PARTITION is performed on a table with the Colocation attribute, or when its number of replicas is modified, Doris checks whether the modification violates the Colocation Group Schema and rejects it if it does.
|
||||
|
||||
## Colocation Replica Balancing and Repair

The replica distribution of Colocation tables must follow the distribution specified in the Group, so their replica repair and balancing differ from those of ordinary shards.
|
||||
|
||||
A Group has a Stable attribute. When Stable is true, no shards of the tables in the Group are being changed and the Colocation feature can be used normally. When Stable is false, some tables in the Group are being repaired or migrated, and Colocation Join of the related tables degenerates into an ordinary Join.
|
||||
|
||||
### Replica Repair
|
||||
|
||||
Replicas can only be stored on the specified BE nodes. So when a BE becomes unavailable (down, decommissioned, etc.), a new BE needs to be found to replace it. Doris will first look for the BE with the lowest load as the replacement. After the replacement, all data shards of the Bucket that were on the old BE will be repaired. During the migration, the Group is marked Unstable.
|
||||
|
||||
### Replica Balancing
|
||||
|
||||
Doris tries to distribute the shards of Colocation tables evenly across all BE nodes. For ordinary tables, replica balancing works at the granularity of a single replica, that is, it is enough to find a lower-load BE node for each replica alone. The balancing of a Colocation table works at the Bucket granularity, that is, all replicas within a Bucket are migrated together. We adopt a simple balancing algorithm that distributes the Buckets Sequence evenly over all BEs according to the number of replicas only, regardless of the actual size of the replicas. The specific algorithm can be found in the code comments of `ColocateTableBalancer.java`.
|
||||
|
||||
> Note 1: Current Colocation replica balancing and repair algorithms may not work well for heterogeneously deployed Doris clusters. So-called heterogeneous deployment means that the BE nodes' disk capacity, number of disks, or disk type (SSD and HDD) is inconsistent. In the case of heterogeneous deployment, small BE nodes and large BE nodes may store the same number of replicas.
|
||||
>
|
||||
> Note 2: When a group is in an Unstable state, the Join of the table in it will degenerate into a normal Join. At this time, the query performance of the cluster may be greatly reduced. If you do not want the system to balance automatically, you can set the FE configuration item `disable_colocate_balance` to prohibit automatic balancing. Then open it at the right time. (See Section `Advanced Operations` for details)
|
||||
|
||||
## Query
|
||||
|
||||
The Colocation table is queried in the same way as ordinary tables, and users do not need to perceive Colocation attributes. If the Group in which the Colocation table is located is in an Unstable state, it will automatically degenerate to a normal Join.
|
||||
|
||||
Examples are given to illustrate:
|
||||
|
||||
Table 1:
|
||||
|
||||
```
|
||||
CREATE TABLE `tbl1` (
|
||||
`k1` date NOT NULL COMMENT "",
|
||||
`k2` int(11) NOT NULL COMMENT "",
|
||||
`v1` int(11) SUM NOT NULL COMMENT ""
|
||||
) ENGINE=OLAP
|
||||
AGGREGATE KEY(`k1`, `k2`)
|
||||
PARTITION BY RANGE(`k1`)
|
||||
(
|
||||
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
|
||||
PARTITION p2 VALUES LESS THAN ('2019-06-30')
|
||||
)
|
||||
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
|
||||
PROPERTIES (
|
||||
"colocate_with" = "group1"
|
||||
);
|
||||
```
|
||||
|
||||
Table 2:
|
||||
|
||||
```
|
||||
CREATE TABLE `tbl2` (
|
||||
`k1` datetime NOT NULL COMMENT "",
|
||||
`k2` int(11) NOT NULL COMMENT "",
|
||||
`v1` double SUM NOT NULL COMMENT ""
|
||||
) ENGINE=OLAP
|
||||
AGGREGATE KEY(`k1`, `k2`)
|
||||
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
|
||||
PROPERTIES (
|
||||
"colocate_with" = "group1"
|
||||
);
|
||||
```
|
||||
|
||||
View the query plan:
|
||||
|
||||
```
|
||||
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
|
||||
|
||||
+----------------------------------------------------+
|
||||
| Explain String |
|
||||
+----------------------------------------------------+
|
||||
| PLAN FRAGMENT 0 |
|
||||
| OUTPUT EXPRS:`tbl1`.`k1` | |
|
||||
| PARTITION: RANDOM |
|
||||
| |
|
||||
| RESULT SINK |
|
||||
| |
|
||||
| 2:HASH JOIN |
|
||||
| | join op: INNER JOIN |
|
||||
| | hash predicates: |
|
||||
| | colocate: true |
|
||||
| | `tbl1`.`k2` = `tbl2`.`k2` |
|
||||
| | tuple ids: 0 1 |
|
||||
| | |
|
||||
| |----1:OlapScanNode |
|
||||
| | TABLE: tbl2 |
|
||||
| | PREAGGREGATION: OFF. Reason: null |
|
||||
| | partitions=0/1 |
|
||||
| | rollup: null |
|
||||
| | buckets=0/0 |
|
||||
| | cardinality=-1 |
|
||||
| | avgRowSize=0.0 |
|
||||
| | numNodes=0 |
|
||||
| | tuple ids: 1 |
|
||||
| | |
|
||||
| 0:OlapScanNode |
|
||||
| TABLE: tbl1 |
|
||||
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
|
||||
| partitions=0/2 |
|
||||
| rollup: null |
|
||||
| buckets=0/0 |
|
||||
| cardinality=-1 |
|
||||
| avgRowSize=0.0 |
|
||||
| numNodes=0 |
|
||||
| tuple ids: 0 |
|
||||
+----------------------------------------------------+
|
||||
```
|
||||
|
||||
If Colocation Join works, the Hash Join Node will show `colocate: true`.
|
||||
|
||||
If not, the query plan is as follows:
|
||||
|
||||
```
|
||||
+----------------------------------------------------+
|
||||
| Explain String |
|
||||
+----------------------------------------------------+
|
||||
| PLAN FRAGMENT 0 |
|
||||
| OUTPUT EXPRS:`tbl1`.`k1` | |
|
||||
| PARTITION: RANDOM |
|
||||
| |
|
||||
| RESULT SINK |
|
||||
| |
|
||||
| 2:HASH JOIN |
|
||||
| | join op: INNER JOIN (BROADCAST) |
|
||||
| | hash predicates: |
|
||||
| | colocate: false, reason: group is not stable |
|
||||
| | `tbl1`.`k2` = `tbl2`.`k2` |
|
||||
| | tuple ids: 0 1 |
|
||||
| | |
|
||||
| |----3:EXCHANGE |
|
||||
| | tuple ids: 1 |
|
||||
| | |
|
||||
| 0:OlapScanNode |
|
||||
| TABLE: tbl1 |
|
||||
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
|
||||
| partitions=0/2 |
|
||||
| rollup: null |
|
||||
| buckets=0/0 |
|
||||
| cardinality=-1 |
|
||||
| avgRowSize=0.0 |
|
||||
| numNodes=0 |
|
||||
| tuple ids: 0 |
|
||||
| |
|
||||
| PLAN FRAGMENT 1 |
|
||||
| OUTPUT EXPRS: |
|
||||
| PARTITION: RANDOM |
|
||||
| |
|
||||
| STREAM DATA SINK |
|
||||
| EXCHANGE ID: 03 |
|
||||
| UNPARTITIONED |
|
||||
| |
|
||||
| 1:OlapScanNode |
|
||||
| TABLE: tbl2 |
|
||||
| PREAGGREGATION: OFF. Reason: null |
|
||||
| partitions=0/1 |
|
||||
| rollup: null |
|
||||
| buckets=0/0 |
|
||||
| cardinality=-1 |
|
||||
| avgRowSize=0.0 |
|
||||
| numNodes=0 |
|
||||
| tuple ids: 1 |
|
||||
+----------------------------------------------------+
|
||||
```
|
||||
|
||||
The HASH JOIN node displays the corresponding reason: `colocate: false, reason: group is not stable`. At the same time, an EXCHANGE node will be generated.
|
||||
|
||||
|
||||
## Advanced Operations
|
||||
|
||||
### FE Configuration Item
|
||||
|
||||
* disable\_colocate\_relocate
|
||||
|
||||
Whether to close Doris's automatic Colocation replica repair. The default is false, i.e. not closed. This parameter only affects the replica repair of the Colocation table, but does not affect the normal table.
|
||||
|
||||
* disable\_colocate\_balance
|
||||
|
||||
Whether to turn off automatic Colocation replica balancing for Doris. The default is false, i.e. not turned off. This parameter only affects the replica balancing of Colocation tables, not ordinary tables.
|
||||
|
||||
Users can set these configurations at runtime. See `HELP ADMIN SHOW CONFIG;` and `HELP ADMIN SET CONFIG;` (a runtime sketch is given after this list).
|
||||
|
||||
* disable\_colocate\_join
|
||||
|
||||
Whether to turn off the Colocation Join function or not. In 0.10 and previous versions, the default is true, that is, closed. In a later version, it will default to false, that is, open.
|
||||
|
||||
* use\_new\_tablet\_scheduler
|
||||
|
||||
In 0.10 and previous versions, the new replica scheduling logic is incompatible with the Colocation Join function, so in 0.10 and previous versions, if `disable_colocate_join = false`, you need to set `use_new_tablet_scheduler = false`, that is, close the new replica scheduler. In later versions, `use_new_tablet_scheduler` will be equal to true.
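A runtime sketch using the commands referenced above (configuration values set this way are typically not persisted across FE restarts):

```sql
-- Inspect the current value.
ADMIN SHOW FRONTEND CONFIG LIKE 'disable_colocate_balance';
-- Temporarily disable automatic Colocation balancing.
ADMIN SET FRONTEND CONFIG ("disable_colocate_balance" = "true");
```
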
|
||||
|
||||
### HTTP Restful API
|
||||
|
||||
Doris provides several HTTP Restful APIs related to Colocation Join for viewing and modifying Colocation Group.
|
||||
|
||||
The API is implemented on the FE side and accessed using `fe_host: fe_http_port`. ADMIN privileges are required.
|
||||
|
||||
1. View all Colocation information for the cluster
|
||||
|
||||
```
|
||||
GET /api/colocate
|
||||
|
||||
Return the internal Colocation info in JSON format:
|
||||
|
||||
{
|
||||
"msg": "success",
|
||||
"code": 0,
|
||||
"data": {
|
||||
"infos": [
|
||||
["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"]
|
||||
],
|
||||
"unstableGroupIds": [],
|
||||
"allGroupIds": [{
|
||||
"dbId": 10003,
|
||||
"grpId": 12002
|
||||
}]
|
||||
},
|
||||
"count": 0
|
||||
}
|
||||
```
|
||||
2. Mark Group as Stable or Unstable
|
||||
|
||||
* Mark as Stable
|
||||
|
||||
```
|
||||
POST /api/colocate/group_stable?db_id=10005&group_id=10008
|
||||
|
||||
Returns: 200
|
||||
```
|
||||
|
||||
* Mark as Unstable
|
||||
|
||||
```
|
||||
DELETE /api/colocate/group_stable?db_id=10005&group_id=10008
|
||||
|
||||
Returns: 200
|
||||
```
|
||||
|
||||
3. Setting Data Distribution for Group
|
||||
|
||||
This interface can forcibly set the bucket distribution of a Group.
|
||||
|
||||
```
|
||||
POST /api/colocate/bucketseq?db_id=10005&group_id=10008
|
||||
|
||||
Body:
|
||||
[[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]
|
||||
|
||||
Returns: 200
|
||||
```
|
||||
Body is a Buckets Sequence represented by a nested array and the ID of the BE where the fragments are distributed in each Bucket.
|
||||
|
||||
Note that using this command, you may need to set the FE configuration `disable_colocate_relocate` and `disable_colocate_balance` to true. That is to shut down the system for automatic Colocation replica repair and balancing. Otherwise, it may be automatically reset by the system after modification.
|
||||
|
||||
@ -26,3 +26,134 @@ under the License.
|
||||
|
||||
# Orthogonal BITMAP calculation
|
||||
|
||||
## Background
|
||||
|
||||
The original bitmap aggregate functions designed by Doris are general-purpose, but they perform poorly for intersections and unions of bitmaps with cardinalities above the hundreds-of-millions level. Examining the bitmap aggregate function logic in the backend BE reveals two main reasons: first, when the bitmap cardinality is large and the bitmap data size exceeds 1 GB, network/disk IO processing takes a long time; second, after the scan, all backend BE instances transmit their data to the top-level node for the intersection and union operation, which puts pressure on that single top-level node and makes it the processing bottleneck.
|
||||
|
||||
The solution is to divide the bitmap column values according to the range, and the values of different ranges are stored in different buckets, so as to ensure that the bitmap values of different buckets are orthogonal and the data distribution is more uniform. In the case of query, the orthogonal bitmap in different buckets is firstly aggregated and calculated, and then the top-level node directly combines and summarizes the aggregated calculated values and outputs them. This will greatly improve the computing efficiency and solve the bottleneck problem of the top single node computing.
|
||||
|
||||
## User guide
|
||||
|
||||
1. Create a table and add hid column to represent bitmap column value ID range as hash bucket column
|
||||
2. Usage scenarios
|
||||
|
||||
### Create table
|
||||
|
||||
We need to use the aggregation model when building the table. The data type is bitmap, and the aggregation function is bitmap_union.
|
||||
```
|
||||
CREATE TABLE `user_tag_bitmap` (
|
||||
`tag` bigint(20) NULL COMMENT "user tag",
|
||||
`hid` smallint(6) NULL COMMENT "Bucket ID",
|
||||
`user_id` bitmap BITMAP_UNION NULL COMMENT ""
|
||||
) ENGINE=OLAP
|
||||
AGGREGATE KEY(`tag`, `hid`)
|
||||
COMMENT "OLAP"
|
||||
DISTRIBUTED BY HASH(`hid`) BUCKETS 3
|
||||
```
|
||||
The HID column is added to the table schema to indicate the ID range as a hash bucket column.
|
||||
|
||||
Note: the number of hid values and the number of buckets should be set reasonably; the number of hid values should be at least 5 times the number of buckets, so that the hash bucketing of the data is as balanced as possible
|
||||
|
||||
|
||||
### Data Load
|
||||
|
||||
```
|
||||
LOAD LABEL user_tag_bitmap_test
|
||||
(
|
||||
DATA INFILE('hdfs://abc')
|
||||
INTO TABLE user_tag_bitmap
|
||||
COLUMNS TERMINATED BY ','
|
||||
(tmp_tag, tmp_user_id)
|
||||
SET (
|
||||
tag = tmp_tag,
|
||||
hid = ceil(tmp_user_id/5000000),
|
||||
user_id = to_bitmap(tmp_user_id)
|
||||
)
|
||||
)
|
||||
...
|
||||
```
|
||||
|
||||
Data format:
|
||||
|
||||
```
|
||||
11111111,1
|
||||
11111112,2
|
||||
11111113,3
|
||||
11111114,4
|
||||
...
|
||||
```
|
||||
|
||||
Note: the first column represents the user tags, which have been converted from Chinese into numbers
|
||||
|
||||
When loading data, vertically cut the bitmap value range of the user. For example, the hid value of the user ID in the range of 1-5000000 is the same, and the row with the same HID value will be allocated into a sub-bucket, so that the bitmap value in each sub-bucket is orthogonal. On the UDAF implementation of bitmap, the orthogonal feature of bitmap value in the bucket can be used to perform intersection union calculation, and the calculation results will be shuffled to the top node for aggregation.
|
||||
|
||||
#### orthogonal_bitmap_intersect
|
||||
|
||||
The bitmap intersection function
|
||||
|
||||
Syntax:
|
||||
|
||||
orthogonal_bitmap_intersect(bitmap_column, column_to_filter, filter_values)
|
||||
|
||||
Parameters:
|
||||
|
||||
the first parameter is the bitmap column, the second parameter is the dimension column for filtering, and the third parameter is the variable length parameter, which means different values of the filter dimension column
|
||||
|
||||
Explain:
|
||||
|
||||
Based on this table schema, the query plan has two levels of aggregation. In the first level, the BE nodes (update and serialize phase) first hash-aggregate the keys by `filter_values`, and then intersect the bitmaps of all keys. The results are serialized and sent to the second-level BE node (merge and finalize phase), which iteratively unions all the bitmap values received from the first-level nodes.
|
||||
|
||||
Example:
|
||||
|
||||
```
|
||||
select BITMAP_COUNT(orthogonal_bitmap_intersect(user_id, tag, 13080800, 11110200)) from user_tag_bitmap where tag in (13080800, 11110200);
|
||||
|
||||
```
|
||||
|
||||
#### orthogonal_bitmap_intersect_count
|
||||
|
||||
Calculates the count of the bitmap intersection. The syntax is the same as the original `intersect_count`, but the implementation is different.
|
||||
|
||||
Syntax:
|
||||
|
||||
orthogonal_bitmap_intersect_count(bitmap_column, column_to_filter, filter_values)
|
||||
|
||||
Parameters:
|
||||
|
||||
The first parameter is the bitmap column, the second parameter is the dimension column for filtering, and the third parameter is the variable length parameter, which means different values of the filter dimension column
|
||||
|
||||
Explain:
|
||||
|
||||
Based on this table schema, the query-plan aggregation is divided into two levels. In the first level, the BE nodes (update and serialize phase) first hash-aggregate the keys by `filter_values`, then intersect the bitmaps of all keys and count the intersection result. The count values are serialized and sent to the second-level BE node (merge and finalize phase), which sums all the count values received from the first-level nodes.
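Example (the same tag values as in the `orthogonal_bitmap_intersect` example above):

```sql
select orthogonal_bitmap_intersect_count(user_id, tag, 13080800, 11110200) from user_tag_bitmap where tag in (13080800, 11110200);
```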
|
||||
|
||||
|
||||
#### orthogonal_bitmap_union_count
|
||||
|
||||
Calculates the count of the bitmap union. The syntax is the same as the original `bitmap_union_count`, but the implementation is different.
|
||||
|
||||
Syntax:
|
||||
|
||||
orthogonal_bitmap_union_count(bitmap_column)
|
||||
|
||||
Explain:
|
||||
|
||||
Based on this table schema, this function is also divided into two levels. In the first level, the BE nodes (update and serialize phase) union all the bitmaps and then count the result. The count values are serialized and sent to the second-level BE node (merge and finalize phase), which sums all the count values received from the first-level nodes.
|
||||
|
||||
|
||||
### Applicable scenarios
|
||||
|
||||
These functions suit any scenario where bitmaps can be computed orthogonally, such as calculating retention, funnel analysis, and user profiling.
|
||||
|
||||
Crowd selection:
|
||||
|
||||
```
|
||||
select orthogonal_bitmap_intersect_count(user_id, tag, 13080800, 11110200) from user_tag_bitmap where tag in (13080800, 11110200);
|
||||
|
||||
Note: 13080800 and 11110200 represent user labels
|
||||
```
|
||||
|
||||
Calculate the deduplication value for user_id:
|
||||
|
||||
```
|
||||
select orthogonal_bitmap_union_count(user_id) from user_tag_bitmap where tag in (13080800, 11110200);
|
||||
```
|
||||
|
||||
@ -26,3 +26,263 @@ under the License.
|
||||
|
||||
# Temporary partition
|
||||
|
||||
Since version 0.12, Doris supports temporary partitioning.
|
||||
|
||||
A temporary partition belongs to a partitioned table. Only partitioned tables can create temporary partitions.
|
||||
|
||||
## Rules
|
||||
|
||||
* The partition columns of a temporary partition are the same as those of the formal partitions and cannot be modified.
* The partition ranges of all temporary partitions of a table cannot overlap, but the ranges of temporary partitions and formal partitions can overlap.
* The partition name of a temporary partition cannot be the same as that of any formal partition or other temporary partition.
|
||||
|
||||
## Supported operations
|
||||
|
||||
The temporary partition supports add, delete, and replace operations.
|
||||
|
||||
### Add temporary partition
|
||||
|
||||
You can add temporary partitions to a table with the `ALTER TABLE ADD TEMPORARY PARTITION` statement:
|
||||
|
||||
```
|
||||
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN ("2020-02-01");
|
||||
|
||||
ALTER TABLE tbl2 ADD TEMPORARY PARTITION tp1 VALUES [("2020-01-01"), ("2020-02-01"));
|
||||
|
||||
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES LESS THAN ("2020-02-01")
|
||||
("in_memory" = "true", "replication_num" = "1")
|
||||
DISTRIBUTED BY HASH (k1) BUCKETS 5;
|
||||
|
||||
ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai");
|
||||
|
||||
ALTER TABLE tbl4 ADD TEMPORARY PARTITION tp1 VALUES IN ((1, "Beijing"), (1, "Shanghai"));
|
||||
|
||||
ALTER TABLE tbl3 ADD TEMPORARY PARTITION tp1 VALUES IN ("Beijing", "Shanghai")
|
||||
("in_memory" = "true", "replication_num" = "1")
|
||||
DISTRIBUTED BY HASH(k1) BUCKETS 5;
|
||||
|
||||
```
|
||||
|
||||
See `HELP ALTER TABLE;` for more help and examples.
|
||||
|
||||
Some instructions for adding operations:
|
||||
|
||||
* Adding a temporary partition is similar to adding a formal partition. The partition range of the temporary partition is independent of the formal partition.
|
||||
* A temporary partition can independently specify some attributes, such as the number of buckets, the number of replicas, whether it is an in-memory table, and the storage medium.
|
||||
|
||||
### Delete temporary partition
|
||||
|
||||
A table's temporary partition can be dropped with the `ALTER TABLE DROP TEMPORARY PARTITION` statement:
|
||||
|
||||
```
|
||||
ALTER TABLE tbl1 DROP TEMPORARY PARTITION tp1;
|
||||
```
|
||||
|
||||
See `HELP ALTER TABLE;` for more help and examples.
|
||||
|
||||
Some instructions for the delete operation:
|
||||
|
||||
* Deleting the temporary partition will not affect the data of the formal partition.
|
||||
|
||||
### Replace partition
|
||||
|
||||
You can replace formal partitions of a table with temporary partitions with the `ALTER TABLE REPLACE PARTITION` statement.
|
||||
|
||||
```
|
||||
ALTER TABLE tbl1 REPLACE PARTITION (p1) WITH TEMPORARY PARTITION (tp1);
|
||||
|
||||
ALTER TABLE tbl1 REPLACE PARTITION (p1, p2) WITH TEMPORARY PARTITION (tp1, tp2, tp3);
|
||||
|
||||
ALTER TABLE tbl1 REPLACE PARTITION (p1, p2) WITH TEMPORARY PARTITION (tp1, tp2)
|
||||
PROPERTIES (
|
||||
"strict_range" = "false",
|
||||
"use_temp_partition_name" = "true"
|
||||
);
|
||||
```
|
||||
|
||||
See `HELP ALTER TABLE;` for more help and examples.
|
||||
|
||||
The replace operation has two special optional parameters:
|
||||
|
||||
1. `strict_range`
|
||||
|
||||
The default is true.
|
||||
|
||||
For Range partitions, when this parameter is true, the range union of all formal partitions being replaced must be identical to the range union of the replacing temporary partitions. When set to false, you only need to ensure that the ranges of the new formal partitions do not overlap with other formal partitions after replacement.
|
||||
|
||||
For List partitions, this parameter is always true, and the enumeration values of all formal partitions being replaced must be identical to the enumeration values of the replacing temporary partitions.
|
||||
|
||||
Here are some examples:
|
||||
|
||||
* Example 1
|
||||
|
||||
Range of partitions p1, p2, p3 to be replaced (=> union):
|
||||
|
||||
```
|
||||
(10, 20), [20, 30), [40, 50) => [10, 30), [40, 50)
|
||||
```
|
||||
|
||||
Replace the range of partitions tp1, tp2 (=> union):
|
||||
|
||||
```
|
||||
(10, 30), [40, 45), [45, 50) => [10, 30), [40, 50)
|
||||
```
|
||||
|
||||
The union of ranges is the same, so you can use tp1 and tp2 to replace p1, p2, p3.
|
||||
|
||||
* Example 2
|
||||
|
||||
Range of partition p1 to be replaced (=> union):
|
||||
|
||||
```
|
||||
(10, 50) => [10, 50)
|
||||
```
|
||||
|
||||
Replace the range of partitions tp1, tp2 (=> union):
|
||||
|
||||
```
|
||||
(10, 30), [40, 50) => [10, 30), [40, 50)
|
||||
```
|
||||
|
||||
The union of ranges is not the same. If `strict_range` is true, you cannot use tp1 and tp2 to replace p1. If false, and the two partition ranges `[10, 30), [40, 50)` and the other formal partitions do not overlap, they can be replaced.
|
||||
|
||||
* Example 3
|
||||
|
||||
Enumerated values of partitions p1, p2 to be replaced (=> union).
|
||||
|
||||
```
|
||||
(1, 2, 3), (4, 5, 6) => (1, 2, 3, 4, 5, 6)
|
||||
```
|
||||
|
||||
Replace the enumerated values of partitions tp1, tp2, tp3 (=> union).
|
||||
|
||||
```
|
||||
(1, 2, 3), (4), (5, 6) => (1, 2, 3, 4, 5, 6)
|
||||
```
|
||||
|
||||
The enumeration values are the same, you can use tp1, tp2, tp3 to replace p1, p2
|
||||
|
||||
* Example 4
|
||||
|
||||
Enumerated values of partitions p1, p2, p3 to be replaced (=> union).
|
||||
|
||||
```
|
||||
(("1", "beijing"), ("1", "shanghai")), (("2", "beijing"), ("2", "shanghai")), (("3", "beijing"), ("3", "shanghai")) => (("1", "beijing"), ("3", "shanghai")) "), ("1", "shanghai"), ("2", "beijing"), ("2", "shanghai"), ("3", "beijing"), ("3", "shanghai"))
|
||||
```
|
||||
|
||||
Replace the enumerated values of partitions tp1, tp2 (=> union).
|
||||
|
||||
```
|
||||
(("1", "beijing"), ("1", "shanghai")), (("2", "beijing"), ("2", "shanghai"), ("3", "beijing"), ("3", "shanghai")) => (("1", "beijing") , ("1", "shanghai"), ("2", "beijing"), ("2", "shanghai"), ("3", "beijing"), ("3", "shanghai"))
|
||||
```
|
||||
|
||||
The enumeration values are the same, you can use tp1, tp2 to replace p1, p2, p3
|
||||
|
||||
2. `use_temp_partition_name`
|
||||
|
||||
The default is false. When this parameter is false, and the number of partitions to be replaced is the same as the number of replacement partitions, the name of the formal partition after the replacement remains unchanged. If true, after replacement, the name of the formal partition is the name of the replacement partition. Here are some examples:
|
||||
|
||||
* Example 1
|
||||
|
||||
```
|
||||
ALTER TABLE tbl1 REPLACE PARTITION (p1) WITH TEMPORARY PARTITION (tp1);
|
||||
```
|
||||
|
||||
`use_temp_partition_name` is false by default. After replacement, the partition name is still p1, but the related data and attributes are replaced with tp1.
|
||||
|
||||
If `use_temp_partition_name` is set to true, the partition name becomes tp1 after replacement, and the p1 partition no longer exists.
|
||||
|
||||
* Example 2
|
||||
|
||||
```
|
||||
ALTER TABLE tbl1 REPLACE PARTITION (p1, p2) WITH TEMPORARY PARTITION (tp1);
|
||||
```
|
||||
|
||||
`use_temp_partition_name` is false by default, but this parameter is ignored because the number of partitions being replaced differs from the number of replacing partitions. After the replacement, the partition name is tp1, and p1 and p2 no longer exist.
|
||||
|
||||
Some instructions for the replacement operation:
|
||||
|
||||
* After the partition is replaced successfully, the replaced partition will be deleted and cannot be recovered.
|
||||
|
||||
## Load and query of temporary partitions
|
||||
|
||||
Users can load data into temporary partitions or specify temporary partitions for querying.
|
||||
|
||||
1. Load temporary partition
|
||||
|
||||
The syntax for specifying a temporary partition is slightly different depending on the load method. Here is a simple illustration through an example:
|
||||
|
||||
```
|
||||
INSERT INTO tbl TEMPORARY PARTITION (tp1, tp2, ...) SELECT ....
|
||||
```
|
||||
|
||||
```
|
||||
curl --location-trusted -u root: -H "label: 123" -H "temporary_partition: tp1, tp2, ..." -T testData http://host:port/api/testDb/testTbl/_stream_load
|
||||
```
|
||||
|
||||
```
|
||||
LOAD LABEL example_db.label1
|
||||
(
|
||||
DATA INFILE ("hdfs: // hdfs_host: hdfs_port / user / palo / data / input / file")
|
||||
INTO TABLE `my_table`
|
||||
TEMPORARY PARTITION (tp1, tp2, ...)
|
||||
...
|
||||
)
|
||||
WITH BROKER hdfs ("username" = "hdfs_user", "password" = "hdfs_password");
|
||||
```
|
||||
|
||||
```
|
||||
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
|
||||
COLUMNS (k1, k2, k3, v1, v2, v3 = k1 * 100),
|
||||
TEMPORARY PARTITIONS (tp1, tp2, ...),
|
||||
WHERE k1 > 100
|
||||
PROPERTIES
|
||||
(...)
|
||||
FROM KAFKA
|
||||
(...);
|
||||
```
|
||||
|
||||
2. Query the temporary partition
|
||||
|
||||
```
|
||||
SELECT ... FROM
|
||||
tbl1 TEMPORARY PARTITION (tp1, tp2, ...)
|
||||
JOIN
|
||||
tbl2 TEMPORARY PARTITION (tp1, tp2, ...)
|
||||
ON ...
|
||||
WHERE ...;
|
||||
```
|
||||
|
||||
## Relationship to other operations
|
||||
|
||||
### DROP
|
||||
|
||||
* After using the `DROP` operation to directly drop the database or table, you can recover the database or table (within a limited time) through the `RECOVER` command, but the temporary partition will not be recovered.
|
||||
* After the formal partition is dropped using the `ALTER` command, the partition can be recovered by the `RECOVER` command (within a limited time). Operating a formal partition is not related to a temporary partition.
|
||||
* After the temporary partition is dropped using the `ALTER` command, the temporary partition cannot be recovered through the `RECOVER` command.
|
||||
|
||||
### TRUNCATE
|
||||
|
||||
* Using the `TRUNCATE` command to empty a table also deletes the table's temporary partitions, which cannot be recovered.
* Using the `TRUNCATE` command to empty a formal partition does not affect temporary partitions.
* The `TRUNCATE` command cannot be used to empty a temporary partition.
|
||||
|
||||
### ALTER
|
||||
|
||||
* When the table has a temporary partition, you cannot use the `ALTER` command to perform Schema Change, Rollup, etc. on the table.
|
||||
* You cannot add temporary partitions to a table while the table is undergoing an alter operation.
|
||||
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. Atomic overwrite
|
||||
|
||||
In some cases, a user wants to rewrite the data of a certain partition, but if the partition is dropped first and then loaded, there is a window during which the data is not visible. In this case, the user can first create a corresponding temporary partition, load the new data into the temporary partition, and then atomically replace the original partition with the `REPLACE` operation, as sketched below. For atomic overwrite of non-partitioned tables, please refer to the [Replace Table Document](./alter-table-replace-table.md).
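A minimal sketch of this flow, assuming a table `tbl1` with a Range partition `p1` covering `[2020-01-01, 2020-02-01)` and a staging table `staging_tbl` holding the new data (table and partition names are illustrative):

```sql
-- Create a temporary partition with the same range as the formal partition p1.
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp1 VALUES [("2020-01-01"), ("2020-02-01"));

-- Load the new data into the temporary partition only.
INSERT INTO tbl1 TEMPORARY PARTITION (tp1) SELECT * FROM staging_tbl;

-- Atomically swap the temporary partition in; p1 keeps its name by default.
ALTER TABLE tbl1 REPLACE PARTITION (p1) WITH TEMPORARY PARTITION (tp1);
```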
|
||||
|
||||
2. Modify the number of buckets
|
||||
|
||||
In some cases, the user used an inappropriate number of buckets when creating a partition. The user can first create a temporary partition corresponding to the partition range and specify a new number of buckets. Then use the `INSERT INTO` command to load the data of the formal partition into the temporary partition. Through the replacement operation, the original partition is replaced atomically to achieve the purpose.
|
||||
|
||||
3. Merge or split partitions
|
||||
|
||||
In some cases, users want to modify the range of partitions, such as merging two partitions or splitting a large partition into several smaller ones. The user can first create temporary partitions covering the merged or split ranges, load the data of the formal partitions into the temporary partitions with the `INSERT INTO` command, and then atomically replace the original partitions with the replace operation, for example as sketched below.
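A minimal sketch of merging two monthly Range partitions into one, assuming formal partitions `p202001` `[2020-01-01, 2020-02-01)` and `p202002` `[2020-02-01, 2020-03-01)` and the usual `PARTITION` clause in SELECT (names and ranges are illustrative):

```sql
-- One temporary partition that spans both formal partitions.
ALTER TABLE tbl1 ADD TEMPORARY PARTITION tp202001_02 VALUES [("2020-01-01"), ("2020-03-01"));

-- Copy the existing data into the temporary partition.
INSERT INTO tbl1 TEMPORARY PARTITION (tp202001_02)
SELECT * FROM tbl1 PARTITION (p202001, p202002);

-- Replace the two formal partitions with the single merged one.
-- strict_range can stay at its default (true) because the range unions match.
ALTER TABLE tbl1 REPLACE PARTITION (p202001, p202002) WITH TEMPORARY PARTITION (tp202001_02);
```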
|
||||
@ -1,6 +1,6 @@
|
||||
---
|
||||
{
|
||||
"title": "Resource Management",
|
||||
"title": "Resource management",
|
||||
"language": "en"
|
||||
}
|
||||
---
|
||||
@ -24,4 +24,131 @@ specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# Resource Management
|
||||
|
||||
In order to save the compute and storage resources in the Doris cluster, Doris sometimes needs to reference external resources for related work, such as Spark/GPU for queries, HDFS/S3 for external storage, Spark/MapReduce for ETL, or an ODBC driver for connecting to external storage. Therefore, Doris needs a resource management mechanism to manage these external resources.
|
||||
|
||||
## Fundamental Concept
|
||||
|
||||
A resource contains basic information such as name and type. The name is globally unique. Different types of resources contain different attributes. Please refer to the introduction of each resource for details.
|
||||
|
||||
Resources can only be created and deleted by users with `admin` permission. A resource belongs to the entire Doris cluster. Users with `admin` permission can grant the permission to use a resource to other users. Please refer to `HELP GRANT` or the Doris documentation.
|
||||
|
||||
|
||||
## Operation Of Resource
|
||||
|
||||
There are three main commands for resource management: `create resource`, `drop resource` and `show resources`. They are to create, delete and check resources. The specific syntax of these three commands can be viewed by executing `help CMD` after MySQL client connects to Doris.
|
||||
|
||||
1. CREATE RESOURCE
|
||||
|
||||
This statement is used to create a resource. For details, please refer to [CREATE RESOURCE](../sql-manual/sql-reference-v2/Data-Definition-Statements/Create/CREATE-RESOURCE.html).
|
||||
|
||||
2. DROP RESOURCE
|
||||
|
||||
This command can delete an existing resource. For details, see [DROP RESOURCE](../sql-manual/sql-reference-v2/Data-Definition-Statements/Drop/DROP-RESOURCE.html).
|
||||
|
||||
3. SHOW RESOURCES
|
||||
|
||||
This command can view the resources that the user has permission to use. For details, see [SHOW RESOURCES](../sql-manual/sql-reference-v2/Show-Statements/SHOW-RESOURCES.html).
|
||||
|
||||
## Resources Supported
|
||||
|
||||
Currently, Doris supports:

* Spark resource: used for ETL work
* ODBC resource: used to query and import data from external tables
|
||||
|
||||
The following shows how the two resources are used.
|
||||
|
||||
### Spark
|
||||
|
||||
#### Parameter
|
||||
|
||||
##### Spark Parameters:
|
||||
|
||||
`spark.master`: Required. Currently supports yarn and spark://host:port.
|
||||
|
||||
`spark.submit.deployMode`: Required. The deployment mode of the Spark program; supports cluster and client.
|
||||
|
||||
`spark.hadoop.yarn.resourcemanager.address`: required when master is yarn.
|
||||
|
||||
`spark.hadoop.fs.defaultFS`: required when master is yarn.
|
||||
|
||||
Other parameters are optional, refer to: http://spark.apache.org/docs/latest/configuration.html.
|
||||
|
||||
##### If Spark is used for ETL, the following parameters also need to be specified:
|
||||
|
||||
`working_dir`: The directory used by ETL. Required when Spark is used as an ETL resource. For example: hdfs://host:port/tmp/doris.
|
||||
|
||||
`broker`: The name of the broker. Required when Spark is used as an ETL resource. The broker must be configured in advance with the `ALTER SYSTEM ADD BROKER` command (see the example after the parameter list below).
|
||||
|
||||
* `broker.property_key`: The authentication information the broker needs when reading the intermediate files generated by ETL.
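For instance, a broker process can be registered ahead of time as follows (the broker name and host/port are placeholders):

```sql
-- Register a broker named broker0 so that it can be referenced by "broker" = "broker0".
ALTER SYSTEM ADD BROKER broker0 "broker_host:8000";
```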
|
||||
|
||||
|
||||
|
||||
#### Example
|
||||
|
||||
Create a Spark resource named `spark0` in yarn cluster mode.
|
||||
|
||||
|
||||
```sql
|
||||
CREATE EXTERNAL RESOURCE "spark0"
|
||||
PROPERTIES
|
||||
(
|
||||
"type" = "spark",
|
||||
"spark.master" = "yarn",
|
||||
"spark.submit.deployMode" = "cluster",
|
||||
"spark.jars" = "xxx.jar,yyy.jar",
|
||||
"spark.files" = "/tmp/aaa,/tmp/bbb",
|
||||
"spark.executor.memory" = "1g",
|
||||
"spark.yarn.queue" = "queue0",
|
||||
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
|
||||
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
|
||||
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
|
||||
"broker" = "broker0",
|
||||
"broker.username" = "user0",
|
||||
"broker.password" = "password0"
|
||||
);
|
||||
```
|
||||
|
||||
### ODBC
|
||||
|
||||
#### Parameter
|
||||
|
||||
##### ODBC Parameters:
|
||||
|
||||
`type`: Required, must be `odbc_catalog`. As the type identifier of resource.
|
||||
|
||||
`user`: The user name of the external table, required.
|
||||
|
||||
`password`: The user password of the external table, required.
|
||||
|
||||
`host`: The ip address of the external table, required.
|
||||
|
||||
`port`: The port of the external table, required.
|
||||
|
||||
`odbc_type`: Indicates the type of the external database. Currently, Doris supports `MySQL` and `Oracle`, and more databases may be supported in the future. Required for ODBC external tables that reference this resource, and optional for the old MySQL external tables.
|
||||
|
||||
`driver`: Indicates the driver dynamic library used by the ODBC external table. Required for ODBC external tables that reference this resource, and optional for the old MySQL external tables.
|
||||
|
||||
For the usage of ODBC resource, please refer to [ODBC of Doris](../extending-doris/odbc-of-doris.html)
|
||||
|
||||
|
||||
#### Example
|
||||
|
||||
Create the ODBC resource of Oracle, named `oracle_odbc`.
|
||||
|
||||
```sql
|
||||
CREATE EXTERNAL RESOURCE `oracle_odbc`
|
||||
PROPERTIES (
|
||||
"type" = "odbc_catalog",
|
||||
"host" = "192.168.0.1",
|
||||
"port" = "8086",
|
||||
"user" = "test",
|
||||
"password" = "test",
|
||||
"database" = "test",
|
||||
"odbc_type" = "oracle",
|
||||
"driver" = "Oracle 19 ODBC driver"
|
||||
);
|
||||
```
|
||||
|
||||
@ -26,3 +26,71 @@ under the License.
|
||||
|
||||
# Time zone
|
||||
|
||||
Doris supports multiple time zone settings
|
||||
|
||||
## Noun Interpretation
|
||||
|
||||
* FE: Frontend, the front-end node of Doris. Responsible for metadata management and request access.
|
||||
* BE: Backend, Doris's back-end node. Responsible for query execution and data storage.
|
||||
|
||||
## Basic concepts
|
||||
|
||||
There are multiple time zone related parameters in Doris
|
||||
|
||||
* `system_time_zone`:
|
||||
|
||||
When the server starts, it will be set automatically according to the time zone set by the machine, which cannot be modified after setting.
|
||||
|
||||
* `time_zone`:
|
||||
|
||||
Server current time zone, set it at session level or global level.
|
||||
|
||||
## Specific operations
|
||||
|
||||
1. `SHOW VARIABLES LIKE '%time_zone%'`
|
||||
|
||||
View the current time zone related configuration
|
||||
|
||||
2. `SET time_zone = 'Asia/Shanghai'`
|
||||
|
||||
This command sets the time zone at the session level; the setting becomes invalid after the connection is closed.
|
||||
|
||||
3. `SET global time_zone = 'Asia/Shanghai'`
|
||||
|
||||
This command sets the time zone parameter at the global level. The FE persists the parameter, so it remains in effect after the connection is closed.
|
||||
|
||||
### Impact of time zone
|
||||
|
||||
Time zone setting affects the display and storage of time zone sensitive values.
|
||||
|
||||
It includes the values displayed by time functions such as `NOW()` or `CURTIME()`, as well as the time values in `SHOW LOAD` and `SHOW BACKENDS` statements.
|
||||
|
||||
However, it does not affect the `LESS THAN VALUE` of the time-type partition column in the `CREATE TABLE` statement, nor does it affect the display of values stored as `DATE/DATETIME` type.
|
||||
|
||||
Functions affected by time zone:
|
||||
|
||||
* `FROM_UNIXTIME`: Given a UTC timestamp, returns the date and time in the specified time zone. For example, `FROM_UNIXTIME(0)` returns `1970-01-01 08:00:00` in the CST time zone.
|
||||
|
||||
* `UNIX_TIMESTAMP`: Given a date and time in the specified time zone, returns the UTC timestamp. For example, in the CST time zone, `UNIX_TIMESTAMP('1970-01-01 08:00:00')` returns `0`.
|
||||
|
||||
* `CURTIME`: Returns the current time in the specified time zone.

* `NOW`: Returns the current date and time in the specified time zone.
|
||||
|
||||
* `CONVERT_TZ`: Converts a date and time from one specified time zone to another.
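A quick illustration of these functions (results assume the session `time_zone` is `Asia/Shanghai`):

```sql
SELECT FROM_UNIXTIME(0);                              -- 1970-01-01 08:00:00
SELECT UNIX_TIMESTAMP('1970-01-01 08:00:00');         -- 0
SELECT CONVERT_TZ('2019-08-01 13:21:03', 'Asia/Shanghai', 'America/Los_Angeles');
-- 2019-07-31 22:21:03 (America/Los_Angeles is UTC-7 in August)
```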
|
||||
|
||||
## Restrictions
|
||||
|
||||
Time zone values can be given in several formats, case-insensitive:
|
||||
|
||||
* A string representing UTC offset, such as '+10:00' or '-6:00'.
|
||||
|
||||
* Standard time zone formats, such as "Asia/Shanghai", "America/Los_Angeles"
|
||||
|
||||
* Abbreviated time zone formats such as MET and CTT are not supported. Because the abbreviated time zone is ambiguous in different scenarios, it is not recommended to use it.
|
||||
|
||||
* For compatibility, Doris still supports the abbreviated time zone CST, which is internally converted to "Asia/Shanghai", the Chinese standard time zone.
|
||||
|
||||
## Time zone format list
|
||||
|
||||
[List of TZ database time zones](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones)
|
||||
|
||||
@ -26,4 +26,478 @@ under the License.
|
||||
|
||||
# Variable
|
||||
|
||||
This document focuses on currently supported variables.
|
||||
|
||||
Variables in Doris refer to variable settings in MySQL. However, some of the variables are only used to be compatible with some MySQL client protocols, and do not produce their actual meaning in the MySQL database.
|
||||
|
||||
## Variable setting and viewing
|
||||
|
||||
### View
|
||||
|
||||
All or specified variables can be viewed via `SHOW VARIABLES [LIKE 'xxx'];`. Such as:
|
||||
|
||||
```
|
||||
SHOW VARIABLES;
|
||||
SHOW VARIABLES LIKE '%time_zone%';
|
||||
```
|
||||
|
||||
### Settings
|
||||
|
||||
Some variables can be set at global-level or session-only. For global-level, the set value will be used in subsequent new session connections. For session-only, the variable only works for the current session.
|
||||
|
||||
For session-only, set by the `SET var_name=xxx;` statement. Such as:
|
||||
|
||||
```
|
||||
SET exec_mem_limit = 137438953472;
|
||||
SET forward_to_master = true;
|
||||
SET time_zone = "Asia/Shanghai";
|
||||
```
|
||||
|
||||
For global-level, set by `SET GLOBAL var_name=xxx;`. Such as:
|
||||
|
||||
```
|
||||
SET GLOBAL exec_mem_limit = 137438953472
|
||||
```
|
||||
|
||||
> Note 1: Only ADMIN users can set variable at global-level.
|
||||
> Note 2: Global-level variables do not affect variable values in the current session, only variables in new sessions.
|
||||
|
||||
Variables that support both session-level and global-level setting include:
|
||||
|
||||
* `time_zone`
|
||||
* `wait_timeout`
|
||||
* `sql_mode`
|
||||
* `enable_profile`
|
||||
* `query_timeout`
|
||||
* `exec_mem_limit`
|
||||
* `batch_size`
|
||||
* `parallel_fragment_exec_instance_num`
|
||||
* `parallel_exchange_instance_num`
|
||||
* `allow_partition_column_nullable`
|
||||
* `insert_visible_timeout_ms`
|
||||
* `enable_fold_constant_by_be`
|
||||
|
||||
Variables that support only global-level setting include:
|
||||
|
||||
* `default_rowset_type`
|
||||
|
||||
At the same time, variable settings also support constant expressions. Such as:
|
||||
|
||||
```
|
||||
SET exec_mem_limit = 10 * 1024 * 1024 * 1024;
|
||||
SET forward_to_master = concat('tr', 'u', 'e');
|
||||
```
|
||||
|
||||
### Set variables in the query statement
|
||||
|
||||
In some scenarios, we may need to set variables specifically for certain queries.
|
||||
The SET_VAR hint sets the session value of a system variable temporarily (for the duration of a single statement). Examples:
|
||||
|
||||
```
|
||||
SELECT /*+ SET_VAR(exec_mem_limit = 8589934592) */ name FROM people ORDER BY name;
|
||||
SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3);
|
||||
```
|
||||
|
||||
Note that the hint comment must start with `/*+` and can only be placed immediately after the SELECT keyword.
|
||||
|
||||
## Supported variables
|
||||
|
||||
* `SQL_AUTO_IS_NULL`
|
||||
|
||||
Used for compatible JDBC connection pool C3P0. No practical effect.
|
||||
|
||||
* `auto_increment_increment`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `autocommit`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `batch_size`
|
||||
|
||||
Used to specify the number of rows of a single packet transmitted by each node during query execution. By default, the number of rows of a packet is 1024 rows. That is, after the source node generates 1024 rows of data, it is packaged and sent to the destination node.
|
||||
|
||||
A larger number of rows will increase the throughput of the query in the case of scanning large data volumes, but may increase the query delay in small query scenario. At the same time, it also increases the memory overhead of the query. The recommended setting range is 1024 to 4096.
|
||||
|
||||
* `character_set_client`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `character_set_connection`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `character_set_results`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `character_set_server`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `codegen_level`
|
||||
|
||||
Used to set the level of LLVM codegen. (Not currently in effect).
|
||||
|
||||
* `collation_connection`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `collation_database`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `collation_server`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `delete_without_partition`
|
||||
|
||||
When set to true, the partition clause is not required when using the delete command to delete data from a partitioned table; the delete operation is automatically applied to all partitions.

Note, however, that applying the delete to all partitions automatically may trigger a large number of subtasks and make the command take a long time. If it is not necessary, it is not recommended to turn this on.
|
||||
|
||||
* `disable_colocate_join`
|
||||
|
||||
Controls whether the [Colocation Join](./colocation-join.md) function is enabled. The default is false, which means that the feature is enabled. True means that the feature is disabled. When this feature is disabled, the query plan will not attempt to perform a Colocation Join.
|
||||
|
||||
* `enable_bucket_shuffle_join`
|
||||
|
||||
Controls whether the [Bucket Shuffle Join](./bucket-shuffle-join.md) function is enabled. The default is true, which means that the feature is enabled. False means that the feature is disabled. When this feature is disabled, the query plan will not attempt to perform a Bucket Shuffle Join.
|
||||
|
||||
* `disable_streaming_preaggregations`
|
||||
|
||||
Controls whether streaming pre-aggregation is turned on. The default is false, which means streaming pre-aggregation is enabled. Currently this cannot be configured and is always enabled by default.
|
||||
|
||||
* `enable_insert_strict`
|
||||
|
||||
Used to set the `strict` mode when loading data via INSERT statement. The default is false, which means that the `strict` mode is not turned on. For an introduction to this mode, see [here](./load-data/insert-into-manual.md).
|
||||
|
||||
* `enable_spilling`
|
||||
|
||||
Used to set whether to enable external sorting. The default is false, which turns off the feature. This feature is enabled when the user does not specify a LIMIT condition for the ORDER BY clause and also sets `enable_spilling` to true. When this feature is enabled, the temporary data is stored in the `doris-scratch/` directory of the BE data directory and the temporary data is cleared after the query is completed.
|
||||
|
||||
This feature is mainly used for sorting operations with large amounts of data using limited memory.
|
||||
|
||||
Note that this feature is experimental and does not guarantee stability. Please turn it on carefully.
|
||||
|
||||
* `exec_mem_limit`
|
||||
|
||||
Used to set the memory limit for a single query. The default is 2GB. The value can be specified in units of B/K/KB/M/MB/G/GB/T/TB/P/PB; if no unit is given, bytes (B) are assumed.
|
||||
|
||||
This parameter is used to limit the memory that can be used by an instance of a single query fragment in a query plan. A query plan may have multiple instances, and a BE node may execute one or more instances. Therefore, this parameter does not accurately limit the memory usage of a query across the cluster, nor does it accurately limit the memory usage of a query on a single BE node. The specific needs need to be judged according to the generated query plan.
|
||||
|
||||
Usually, only some blocking nodes (such as sorting node, aggregation node, and join node) consume more memory, while in other nodes (such as scan node), data is streamed and does not occupy much memory.
|
||||
|
||||
When a `Memory Exceed Limit` error occurs, you can try to increase the parameter exponentially, such as 4G, 8G, 16G, and so on.
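For example, to raise the per-instance limit for the current session (the value below is 8 GB expressed in bytes):

```sql
SET exec_mem_limit = 8589934592;  -- 8 GB
```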
|
||||
|
||||
* `forward_to_master`
|
||||
|
||||
Used to set whether some commands are forwarded to the Master FE node for execution. The default is `true`, which means they are forwarded. There are multiple FE nodes in Doris, one of which is the Master node. Usually users can connect to any FE node for full-featured operation, but some detailed information can only be obtained from the Master FE node.
|
||||
|
||||
For example, if the `SHOW BACKENDS;` command is not forwarded to the Master FE node, only basic information such as whether a node is alive can be seen; forwarding it to the Master FE returns more detailed information, including the node start time and the last heartbeat time.
|
||||
|
||||
The commands currently affected by this parameter are as follows:
|
||||
|
||||
1. `SHOW FRONTENDS;`
|
||||
|
||||
Forward to Master to view the last heartbeat information.
|
||||
|
||||
2. `SHOW BACKENDS;`
|
||||
|
||||
Forward to Master to view startup time, last heartbeat information, and disk capacity information.
|
||||
|
||||
3. `SHOW BROKERS;`
|
||||
|
||||
Forward to Master to view the start time and last heartbeat information.
|
||||
|
||||
4. `SHOW TABLET;`/`ADMIN SHOW REPLICA DISTRIBUTION;`/`ADMIN SHOW REPLICA STATUS;`
|
||||
|
||||
Forward to Master to view the tablet information stored in the Master FE metadata. Under normal circumstances, the tablet information in different FE metadata should be consistent. When a problem occurs, this method can be used to compare the difference between the current FE and Master FE metadata.
|
||||
|
||||
5. `SHOW PROC;`
|
||||
|
||||
Forward to Master to view information about the relevant PROC stored in the Master FE metadata. Mainly used for metadata comparison.
|
||||
|
||||
* `init_connect`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `interactive_timeout`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `enable_profile`
|
||||
|
||||
Used to set whether you need to view the profile of the query. The default is false, which means no profile is required.
|
||||
|
||||
By default, the BE sends a profile to the FE for viewing errors only if an error occurs in the query. A successful query will not send a profile. Sending a profile will incur a certain amount of network overhead, which is detrimental to a high concurrent query scenario.
|
||||
|
||||
When the user wants to analyze the profile of a query, the query can be sent after this variable is set to true. After the query is finished, you can view the profile on the web page of the currently connected FE:
|
||||
|
||||
`fe_host:fe_http_port/query`
|
||||
|
||||
It will display the most recent 100 queries for which `enable_profile` was set to true.
|
||||
|
||||
* `language`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `license`
|
||||
|
||||
Show Doris's license. No other effect.
|
||||
|
||||
* `load_mem_limit`
|
||||
|
||||
Used to specify the memory limit of the load operation. The default is 0, which means that this variable is not used, and `exec_mem_limit` is used as the memory limit for the load operation.
|
||||
|
||||
This variable is usually used for INSERT operations, because an INSERT operation has both a query part and a load part. If the user does not set this variable, the memory limits of both the query part and the load part are `exec_mem_limit`. Otherwise, the query part of the INSERT is limited by `exec_mem_limit`, and the load part is limited by `load_mem_limit`.
|
||||
|
||||
For other load methods, such as BROKER LOAD, STREAM LOAD, the memory limit still uses `exec_mem_limit`.
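For example, to give the load part of INSERT statements its own limit in the current session (4 GB expressed in bytes; the value is illustrative):

```sql
SET load_mem_limit = 4294967296;  -- 4 GB for the load part of INSERT
```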
|
||||
|
||||
* `lower_case_table_names`
|
||||
|
||||
Used to control whether the user table name is case-sensitive.
|
||||
|
||||
A value of 0 makes the table name case-sensitive. The default is 0.
|
||||
|
||||
When the value is 1, the table name is case insensitive. Doris will convert the table name to lowercase when storing and querying.
|
||||
The advantage is that any case of table name can be used in one statement. The following SQL is correct:
|
||||
```
|
||||
mysql> show tables;
|
||||
+------------------+
|
||||
| Tables_in_testdb |
|
||||
+------------------+
|
||||
| cost |
|
||||
+------------------+
|
||||
mysql> select * from COST where COst.id < 100 order by cost.id;
|
||||
```
|
||||
The disadvantage is that the table name specified in the table creation statement cannot be obtained after table creation. The table name viewed by 'show tables' is lower case of the specified table name.
|
||||
|
||||
When the value is 2, the table name is case insensitive. Doris stores the table name specified in the table creation statement and converts it to lowercase for comparison during query.
|
||||
The advantage is that the table name viewed by 'show tables' is the table name specified in the table creation statement;
|
||||
The disadvantage is that only one case of table name can be used in the same statement. For example, the table name 'cost' can be used to query the 'cost' table:
|
||||
```
|
||||
mysql> select * from COST where COST.id < 100 order by COST.id;
|
||||
```
|
||||
|
||||
This variable is compatible with MySQL and must be configured at cluster initialization by specifying `lower_case_table_names=` in fe.conf. It cannot be modified by the `set` statement after cluster initialization is complete, nor can it be modified by restarting or upgrading the cluster.
|
||||
|
||||
The system view table names in information_schema are case-insensitive and behave as 2 when the value of `lower_case_table_names` is 0.
|
||||
|
||||
|
||||
|
||||
* `max_allowed_packet`
|
||||
|
||||
Used for compatible JDBC connection pool C3P0. No practical effect.
|
||||
|
||||
* `max_pushdown_conditions_per_column`
|
||||
|
||||
For the specific meaning of this variable, please refer to the description of `max_pushdown_conditions_per_column` in [BE Configuration](./config/be_config.md). This variable is set to -1 by default, which means that the configuration value in `be.conf` is used. If the setting is greater than 0, the query in the current session will use the variable value, and ignore the configuration value in `be.conf`.
|
||||
|
||||
* `max_scan_key_num`
|
||||
|
||||
For the specific meaning of this variable, please refer to the description of `doris_max_scan_key_num` in [BE Configuration](./config/be_config.md). This variable is set to -1 by default, which means that the configuration value in `be.conf` is used. If the setting is greater than 0, the query in the current session will use the variable value, and ignore the configuration value in `be.conf`.
|
||||
|
||||
* `net_buffer_length`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `net_read_timeout`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `net_write_timeout`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `parallel_exchange_instance_num`
|
||||
|
||||
Used to set the number of exchange nodes used by an upper node to receive data from the lower node in the execution plan. The default is -1, which means that the number of exchange nodes is equal to the number of execution instances of the lower nodes (default behavior). When the setting is greater than 0 and less than the number of execution instances of the lower node, the number of exchange nodes is equal to the set value.
|
||||
|
||||
In a distributed query execution plan, the upper node usually has one or more exchange nodes for receiving data from the execution instances of the lower nodes on different BEs. Usually the number of exchange nodes is equal to the number of execution instances of the lower nodes.
|
||||
|
||||
In some aggregate query scenarios, if the amount of data to be scanned at the bottom is large, but the amount of data after aggregation is small, you can try to modify this variable to a smaller value, which can reduce the resource overhead of such queries. Such as the scenario of aggregation query on the DUPLICATE KEY data model.
|
||||
|
||||
* `parallel_fragment_exec_instance_num`
|
||||
|
||||
For the scan node, set its number of instances to execute on each BE node. The default is 1.
|
||||
|
||||
A query plan typically produces a set of scan ranges, the range of data that needs to be scanned. These data are distributed across multiple BE nodes. A BE node will have one or more scan ranges. By default, a set of scan ranges for each BE node is processed by only one execution instance. When the machine resources are abundant, you can increase the variable and let more execution instances process a set of scan ranges at the same time, thus improving query efficiency.
|
||||
|
||||
The number of scan instances determines the number of other execution nodes in the upper layer, such as aggregate nodes and join nodes. Therefore, it is equivalent to increasing the concurrency of the entire query plan execution. Modifying this parameter will help improve the efficiency of large queries, but larger values will consume more machine resources, such as CPU, memory, and disk IO.
|
||||
|
||||
* `query_cache_size`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `query_cache_type`
|
||||
|
||||
Used for compatible JDBC connection pool C3P0. No practical effect.
|
||||
|
||||
* `query_timeout`
|
||||
|
||||
Used to set the query timeout. This variable applies to all query statements in the current connection, as well as INSERT statements. The default is 5 minutes, in seconds.
|
||||
|
||||
* `resource_group`
|
||||
|
||||
Not used.
|
||||
|
||||
* `send_batch_parallelism`
|
||||
|
||||
Used to set the default parallelism for sending batches when executing an INSERT statement. If the parallelism exceeds `max_send_batch_parallelism_per_job` in the BE configuration, the coordinating BE will use the value of `max_send_batch_parallelism_per_job`.
|
||||
|
||||
* `sql_mode`
|
||||
|
||||
Used to specify SQL mode to accommodate certain SQL dialects. For the SQL mode, see [here](./sql-mode.md).
|
||||
|
||||
* `sql_safe_updates`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `sql_select_limit`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `system_time_zone`
|
||||
|
||||
Displays the current system time zone. Cannot be changed.
|
||||
|
||||
* `time_zone`
|
||||
|
||||
Used to set the time zone of the current session. The time zone has an effect on the results of certain time functions. For the time zone, see [here](./time-zone.md).
|
||||
|
||||
* `tx_isolation`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `tx_read_only`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `transaction_read_only`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `transaction_isolation`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `version`
|
||||
|
||||
Used for compatibility with MySQL clients. No practical effect.
|
||||
|
||||
* `performance_schema`
|
||||
|
||||
Used for compatibility with MySQL JDBC 8.0.16 or later version. No practical effect.
|
||||
|
||||
* `version_comment`
|
||||
|
||||
Used to display the version of Doris. Cannot be changed.
|
||||
|
||||
* `wait_timeout`
|
||||
|
||||
Used to set the idle timeout of a connection. When an idle connection does not interact with Doris for this length of time, Doris actively closes the connection. The default is 8 hours, in seconds.
|
||||
|
||||
* `default_rowset_type`
|
||||
|
||||
Used for setting the default storage format of Backends storage engine. Valid options: alpha/beta
|
||||
|
||||
* `use_v2_rollup`
|
||||
|
||||
Used to control the sql query to use segment v2 rollup index to get data. This variable is only used for validation when upgrading to segment v2 feature. Otherwise, not recommended to use.
|
||||
|
||||
* `rewrite_count_distinct_to_bitmap_hll`
|
||||
|
||||
Whether to rewrite count distinct queries of bitmap and HLL types as bitmap_union_count and hll_union_agg.
|
||||
|
||||
* `prefer_join_method`
|
||||
|
||||
When choosing the join method(broadcast join or shuffle join), if the broadcast join cost and shuffle join cost are equal, which join method should we prefer.
|
||||
|
||||
Currently, the optional values for this variable are "broadcast" or "shuffle".
|
||||
|
||||
* `allow_partition_column_nullable`
|
||||
|
||||
Whether to allow the partition column to be NULL when creating the table. The default is true, which means NULL is allowed. false means the partition column must be defined as NOT NULL.
|
||||
|
||||
* `insert_visible_timeout_ms`
|
||||
|
||||
When executing an INSERT statement, Doris waits for the transaction to be committed and visible after the import is completed.
This parameter controls the timeout (in milliseconds) of waiting for the transaction to become visible. The default value is 10000, and the minimum value is 1000.
|
||||
|
||||
* `enable_exchange_node_parallel_merge`
|
||||
|
||||
In a sort query, when an upper-level node receives ordered data from lower-level nodes, it merges the data on the exchange node to ensure that the final result is ordered. However, when a single thread merges multiple channels of data and the data volume is large, the single-threaded merge on the exchange node becomes a bottleneck.

Doris optimizes this case: if there are many data channels from the lower level, the exchange node starts multiple threads for parallel merging to speed up the sorting process. This parameter is false by default, which means the exchange node does not use parallel merge sort, in order to reduce the extra CPU and memory consumption.
|
||||
|
||||
* `extract_wide_range_expr`
|
||||
|
||||
Used to control whether the 'Wide Common Factors' rewrite rule is turned on. There are two values: true and false. It is on by default.
|
||||
|
||||
* `enable_fold_constant_by_be`
|
||||
|
||||
Used to control the calculation method of constant folding. The default is `false`, that is, calculation is performed in `FE`; if it is set to `true`, it will be calculated by `BE` through `RPC` request.
|
||||
|
||||
* `cpu_resource_limit`
|
||||
|
||||
Used to limit the resource overhead of a query. This is an experimental feature. The current implementation is to limit the number of scan threads for a query on a single node. The number of scan threads is limited, and the data returned from the bottom layer slows down, thereby limiting the overall computational resource overhead of the query. Assuming it is set to 2, a query can use up to 2 scan threads on a single node.
|
||||
|
||||
This parameter will override the effect of `parallel_fragment_exec_instance_num`. That is, assuming that `parallel_fragment_exec_instance_num` is set to 4, and this parameter is set to 2. Then 4 execution instances on a single node will share up to 2 scanning threads.
|
||||
|
||||
This parameter will be overridden by the `cpu_resource_limit` configuration in the user property.
|
||||
|
||||
The default is -1, which means no limit.
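For example, a hedged illustration of combining the two settings discussed above in one session:

```sql
SET parallel_fragment_exec_instance_num = 4;
SET cpu_resource_limit = 2;  -- the 4 instances on one BE share at most 2 scan threads
```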
|
||||
|
||||
* `disable_join_reorder`
|
||||
|
||||
Used to turn off all automatic join reorder algorithms in the system. There are two values: true and false. It is off by default, which means the system's automatic join reorder algorithms are used. When set to true, the system disables all automatic reordering and executes the join in the original table order of the SQL.
|
||||
|
||||
* `enable_infer_predicate`
|
||||
|
||||
Used to control whether to perform predicate derivation. There are two values: true and false. It is turned off by default, that is, the system does not perform predicate derivation, and uses the original predicate to perform related operations. After it is set to true, predicate expansion is performed.
|
||||
|
||||
* `return_object_data_as_binary`
|
||||
Used to control whether bitmap/HLL results are returned in the SELECT result. In a SELECT INTO OUTFILE statement, if the export file format is CSV, the bitmap/HLL data is base64-encoded; if it is the Parquet file format, the data is stored as a byte array.
|
||||
|
||||
* `block_encryption_mode`
|
||||
The `block_encryption_mode` variable controls the block encryption mode. The default setting is empty, which is equivalent to `AES_128_ECB` when AES is used and `SM4_128_ECB` when SM4 is used.

Available values:
|
||||
|
||||
```
|
||||
AES_128_ECB,
|
||||
AES_192_ECB,
|
||||
AES_256_ECB,
|
||||
AES_128_CBC,
|
||||
AES_192_CBC,
|
||||
AES_256_CBC,
|
||||
AES_128_CFB,
|
||||
AES_192_CFB,
|
||||
AES_256_CFB,
|
||||
AES_128_CFB1,
|
||||
AES_192_CFB1,
|
||||
AES_256_CFB1,
|
||||
AES_128_CFB8,
|
||||
AES_192_CFB8,
|
||||
AES_256_CFB8,
|
||||
AES_128_CFB128,
|
||||
AES_192_CFB128,
|
||||
AES_256_CFB128,
|
||||
AES_128_CTR,
|
||||
AES_192_CTR,
|
||||
AES_256_CTR,
|
||||
AES_128_OFB,
|
||||
AES_192_OFB,
|
||||
AES_256_OFB,
|
||||
SM4_128_ECB,
|
||||
SM4_128_CBC,
|
||||
SM4_128_CFB128,
|
||||
SM4_128_OFB,
|
||||
SM4_128_CTR,
|
||||
```
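For instance, to pick a specific mode for the current session (a sketch; the variable only affects subsequent encryption function calls):

```sql
SET block_encryption_mode = 'AES_256_CBC';
```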
|
||||
|
||||
|
||||
@ -24,4 +24,50 @@ specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# User Configuration Items

This document mainly introduces the User-level configuration items. The scope of a User-level configuration is a single user. Each user can set their own user properties, and they do not affect each other.

## Viewing configuration items

After the FE is started, you can view the User configuration items in the MySQL client with the following command:

`SHOW PROPERTY [FOR user] [LIKE key pattern]`

The detailed syntax can be queried with the command `help show property;`.

## Setting configuration items

After the FE is started, you can modify the User configuration items in the MySQL client with the following command:

`SET PROPERTY [FOR 'user'] 'key' = 'value' [, 'key' = 'value']`

The detailed syntax can be queried with the command `help set property;`.

User-level configuration items only take effect for the specified user and do not affect the configuration of other users.

## Examples

1. Modify the `max_user_connections` of user Billie

    Use `SHOW PROPERTY FOR 'Billie' LIKE '%max_user_connections%';` to check that the current maximum number of connections for user Billie is 100.

    Use `SET PROPERTY FOR 'Billie' 'max_user_connections' = '200';` to change the current maximum number of connections for user Billie to 200.

## List of configuration items

### max_user_connections

The maximum number of connections for the user. The default value is 100. In general, this parameter does not need to be changed unless the query concurrency exceeds the default value.

### max_query_instances

The number of instances the user can use at the same point in time. The default is -1; a value less than or equal to 0 means the configuration `default_max_query_instances` is used.

### resource

### quota

### default_load_cluster

### load_cluster
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
---
|
||||
{
|
||||
"title": "用户及权限管理",
|
||||
"title": "权限管理",
|
||||
"language": "zh-CN"
|
||||
}
|
||||
---
|
||||
@ -24,4 +24,196 @@ specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# Privilege Management

Doris's new privilege management system follows MySQL's privilege management mechanism. It supports table-level fine-grained privilege control, role-based access control, and a whitelist mechanism.

## Glossary

1. User identity (user_identity)

    In the privilege system, a user is identified as a User Identity. A user identity consists of two parts: username and userhost. The username is the user name, composed of upper- and lower-case English letters. The userhost is the IP from which the user connects. A user_identity is presented as username@'userhost', meaning the username coming from userhost.

    Another form of user_identity is username@['domain'], where domain is a domain name that can be resolved into a set of IPs through DNS or BNS (Baidu Naming Service). It is eventually expanded into a set of username@'userhost' entries, so in the rest of this document we uniformly use username@'userhost'.

2. Privilege

    The objects that privileges apply to are nodes, databases, and tables. Different privileges represent permission for different operations.

3. Role

    Doris can create custom-named roles. A role can be regarded as a collection of privileges. A newly created user can be assigned a role and automatically receives the privileges of that role. Subsequent privilege changes to the role are also reflected in the privileges of all users who belong to that role.

4. User property (user_property)

    User properties are attached directly to a user, not to a user identity. That is, cmy@'192.%' and cmy@['domain'] share the same set of user properties, which belong to the user cmy, not to cmy@'192.%' or cmy@['domain'].

    User properties include, but are not limited to: the user's maximum number of connections, load cluster configuration, and so on.
|
||||
|
||||
## Supported operations

1. Create a user: CREATE USER
2. Drop a user: DROP USER
3. Grant privileges: GRANT
4. Revoke privileges: REVOKE
5. Create a role: CREATE ROLE
6. Drop a role: DROP ROLE
7. View the current user's privileges: SHOW GRANTS
8. View the privileges of all users: SHOW ALL GRANTS
9. View created roles: SHOW ROLES
10. View user properties: SHOW PROPERTY

For detailed help on the above commands, connect to Doris with the MySQL client and use `help + command`, for example `HELP CREATE USER`.

## Privilege types

Doris currently supports the following privileges:

1. Node_priv

    Node change privilege. Includes operations such as adding, dropping, and decommissioning FE, BE, and BROKER nodes. Currently this privilege can only be granted to the Root user.

2. Grant_priv

    Privilege change privilege. Allows operations including granting and revoking privileges and adding/dropping/changing users and roles.

3. Select_priv

    Read-only privilege on databases and tables.

4. Load_priv

    Write privilege on databases and tables. Includes Load, Insert, Delete, etc.

5. Alter_priv

    Change privilege on databases and tables. Includes renaming databases/tables, adding/dropping/changing columns, adding/dropping partitions, and other operations.

6. Create_priv

    Privilege to create databases, tables, and views.

7. Drop_priv

    Privilege to drop databases, tables, and views.

8. Usage_priv

    Privilege to use resources.
|
||||
|
||||
## Privilege levels

According to the scope of application, database and table privileges are divided into the following three levels:

1. GLOBAL LEVEL: Global privileges, i.e., privileges granted on `*.*` with the GRANT statement. The granted privileges apply to any table in any database.
2. DATABASE LEVEL: Database-level privileges, i.e., privileges granted on `db.*` with the GRANT statement. The granted privileges apply to any table in the specified database.
3. TABLE LEVEL: Table-level privileges, i.e., privileges granted on `db.tbl` with the GRANT statement. The granted privileges apply to the specified table in the specified database.

Resource privileges are divided into the following two levels:

1. GLOBAL LEVEL: Global privileges, i.e., privileges granted on `*` with the GRANT statement. The granted privileges apply to all resources.
2. RESOURCE LEVEL: Resource-level privileges, i.e., privileges granted on `resource_name` with the GRANT statement. The granted privileges apply to the specified resource.
|
||||
|
||||
## Notes on ADMIN/GRANT privileges

ADMIN_PRIV and GRANT_PRIV both carry the privilege to **grant privileges**, which makes them special. The operations related to these two privileges are described one by one here.

1. CREATE USER
    - A user with ADMIN privilege, or GRANT privilege at any level, can create a new user.
2. DROP USER
    - Only a user with ADMIN privilege can drop a user.
3. CREATE/DROP ROLE
    - Only a user with ADMIN privilege can create or drop a role.
4. GRANT/REVOKE
    - A user with ADMIN privilege, or GLOBAL-level GRANT privilege, can grant or revoke the privileges of any user.
    - A user with DATABASE-level GRANT privilege can grant or revoke any user's privileges on the specified database.
    - A user with TABLE-level GRANT privilege can grant or revoke any user's privileges on the specified table in the specified database.
5. SET PASSWORD
    - A user with ADMIN privilege, or GLOBAL-level GRANT privilege, can set the password of any user.
    - An ordinary user can set the password of its own corresponding UserIdentity, which can be viewed with the `SELECT CURRENT_USER();` command.
    - A user with non-GLOBAL-level GRANT privilege cannot set the password of an existing user; it can only specify a password when creating a user.
|
||||
|
||||
## Miscellaneous

1. When Doris is initialized, the following users and roles are automatically created:
    1. operator role: this role has Node_priv and Admin_priv, i.e., all privileges over Doris. In a later version we may restrict this role to Node_priv only, i.e., node change privileges, to meet certain cloud deployment requirements.
    2. admin role: this role has Admin_priv, i.e., all privileges except node changes.
    3. root@'%': the root user, allowed to log in from any host, with the operator role.
    4. admin@'%': the admin user, allowed to log in from any host, with the admin role.
2. Dropping the default roles or users, or changing their privileges, is not supported.

3. There is one and only one user with the operator role. Multiple users with the admin role can be created.

4. Notes on operations that may cause conflicts

    1. Conflict between domain and IP:

        Suppose the following user is created:

        CREATE USER cmy@['domain'];

        and granted:

        GRANT SELECT_PRIV ON *.* TO cmy@['domain']

        The domain is resolved into two IPs: ip1 and ip2.

        Suppose that later we grant cmy@'ip1' separately:

        GRANT ALTER_PRIV ON *.* TO cmy@'ip1';

        Then the privileges of cmy@'ip1' are changed to SELECT_PRIV, ALTER_PRIV, and when we change the privileges of cmy@['domain'] again, cmy@'ip1' will not follow that change.

    2. Conflict between duplicate IPs:

        Suppose the following users are created:

        CREATE USER cmy@'%' IDENTIFIED BY "12345";

        CREATE USER cmy@'192.%' IDENTIFIED BY "abcde";

        In terms of priority, '192.%' takes precedence over '%'. Therefore, when user cmy tries to log in to Doris from the machine 192.168.1.1 with the password '12345', the login will be rejected.

5. Forgotten password

    If you forget the password and cannot log in to Doris, you can log in to Doris without a password on the machine where the Doris FE node is located, using the following command:

    `mysql-client -h 127.0.0.1 -P query_port -uroot`

    After logging in, you can reset the password with the SET PASSWORD command.

6. No user can reset the password of the root user, except the root user itself.

7. ADMIN_PRIV can only be granted or revoked at the GLOBAL level.

8. Having GLOBAL-level GRANT_PRIV is effectively equivalent to having ADMIN_PRIV, because GRANT_PRIV at this level can grant arbitrary privileges. Please use it with caution.

9. `current_user()` and `user()`

    Users can view `current_user` and `user` with `SELECT current_user();` and `SELECT user();` respectively. `current_user` indicates which identity the current user was authenticated as, while `user` is the user's actual `user_identity`. For example:

    Suppose the user `user1@'192.%'` is created, and then a user user1 from 192.168.10.1 logs in to the system. In this case, `current_user` is `user1@'192.%'`, while `user` is `user1@'192.168.10.1'`.

    All privileges are granted to a `current_user`, and the real user has all the privileges of the corresponding `current_user`.
|
||||
|
||||
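A short sketch of note 9, assuming `user1@'192.%'` exists and the client connects from 192.168.10.1:

```sql
SELECT current_user();  -- returns 'user1'@'192.%': the identity used to pass authentication
SELECT user();          -- returns 'user1'@'192.168.10.1': the actual user_identity of this session
```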
## Best Practices

Here are some example usage scenarios of the Doris privilege system.

1. Scenario 1

    The users of a Doris cluster fall into administrators (Admin), development engineers (RD), and users (Client). Administrators hold all privileges over the whole cluster and are mainly responsible for cluster setup, node management, and so on. Development engineers are responsible for business modeling, including creating databases and tables, importing and modifying data, and so on. Users access different databases and tables to retrieve data.

    In this scenario, administrators can be granted the ADMIN privilege or the GRANT privilege. RD can be granted CREATE, DROP, ALTER, LOAD, and SELECT privileges on any or specified databases and tables. Client can be granted the SELECT privilege on any or specified databases and tables. At the same time, creating different roles can simplify granting privileges to multiple users (see the combined sketch after this list).

2. Scenario 2

    A cluster hosts multiple lines of business, and each business may use one or more databases. Each business needs to manage its own users. In this scenario, the administrator can create a user with DATABASE-level GRANT privilege for each database. That user can only grant privileges on the specified database to other users.

3. Blacklist

    Doris itself does not support blacklists, only whitelists, but a blacklist can be simulated in certain ways. Suppose a user named `cmy@'192.%'` is created first, allowing users from `192.*` to log in. If you then want to block logins from `192.168.10.1`, create another user `cmy@'192.168.10.1'` and set a new password. Because `192.168.10.1` has a higher priority than `192.%`, logins from `192.168.10.1` can no longer use the old password.
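A minimal sketch of the three scenarios above; every role, user, database, and password name is an illustrative placeholder:

```sql
-- Scenario 1: simplify authorization with roles.
CREATE ROLE rd_role;
GRANT CREATE_PRIV, DROP_PRIV, ALTER_PRIV, LOAD_PRIV, SELECT_PRIV ON business_db.* TO ROLE 'rd_role';
CREATE ROLE client_role;
GRANT SELECT_PRIV ON business_db.* TO ROLE 'client_role';
CREATE USER 'rd1'@'%' IDENTIFIED BY 'rd_passwd' DEFAULT ROLE 'rd_role';
CREATE USER 'client1'@'%' IDENTIFIED BY 'client_passwd' DEFAULT ROLE 'client_role';

-- Scenario 2: a per-database administrator with DATABASE-level GRANT privilege.
CREATE USER 'db1_admin'@'%' IDENTIFIED BY 'admin_passwd';
GRANT GRANT_PRIV ON db1.* TO 'db1_admin'@'%';

-- Simulated blacklist: override the wildcard entry with a host-specific user.
CREATE USER 'cmy'@'192.168.10.1' IDENTIFIED BY 'another_passwd';
```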
## More Help

For more detailed syntax and best practices on privilege management, see the [GRANTS](../sql-manual/sql-reference-v2/Account-Management-Statements/GRANT.html) command manual. You can also type `HELP GRANTS` at the MySQL client command line for more help.
@ -24,4 +24,170 @@ specific language governing permissions and limitations

under the License.

-->

# Broker

Broker is an optional process in a Doris cluster. It is mainly used to support Doris in reading and writing files and directories on remote storage, such as HDFS, BOS, and AFS.

Broker provides its service through an RPC port. It is a stateless Java process that wraps POSIX-like file operations, such as open, pread, and pwrite, for read and write access to remote storage. Beyond that, Broker records no other information, so connection information for the remote storage, file information, permission information, and so on must all be passed to the Broker process as parameters in the RPC call for Broker to read and write files correctly.

Broker only acts as a data channel and does not participate in any computation, so it needs relatively little memory. Usually one or more Broker processes are deployed in a Doris system, and Brokers of the same type form a group identified by a **name (Broker name)**.

The position of Broker in the Doris architecture is as follows:

```text
+----+   +----+
| FE |   | BE |
+-^--+   +--^-+
  |         |
  |         |
+-v---------v-+
|   Broker    |
+------^------+
       |
       |
+------v------+
|HDFS/BOS/AFS |
+-------------+
```

This document mainly describes the parameters Broker needs when accessing different remote storage systems, such as connection information and authentication information.

## Supported Storage Systems

Different Broker types support different storage systems.

1. Community edition HDFS
   - Supports access with simple authentication
   - Supports access with Kerberos authentication
   - Supports access to HDFS in HA mode
2. Baidu HDFS/AFS (not supported by the open-source edition)
   - Supports access with UGI simple authentication
3. Baidu Object Storage BOS (not supported by the open-source edition)
   - Supports access with AK/SK authentication

## Operations That Require Broker

1. [Broker Load](../data-operate/import/import-way/broker-load-manual.html)
2. [Export](../data-operate/export/export-manual.html)
3. [Data Backup](../admin-manual/data-admin/backup.html)

## Broker Information

Broker information consists of two parts: the **name (Broker name)** and the **authentication information**. The general syntax is as follows:

```text
WITH BROKER "broker_name"
(
    "username" = "xxx",
    "password" = "yyy",
    "other_prop" = "prop_value",
    ...
);
```

### Name

Usually the user specifies an existing Broker Name through the `WITH BROKER "broker_name"` clause of the operation command. The Broker Name is the name specified when the Broker process was added with the `ALTER SYSTEM ADD BROKER` command. One name usually corresponds to one or more Broker processes, and Doris selects an available Broker process by name. You can use the `SHOW BROKER` command to view the Brokers that currently exist in the cluster.

**Note: the Broker Name is just a user-defined name and does not indicate the Broker type.**
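A small sketch of adding and inspecting Brokers; the Broker name `broker0` and the `host:port` values are placeholders:

```sql
-- Register two Broker processes under the same Broker Name.
ALTER SYSTEM ADD BROKER broker0 "broker_host1:8000", "broker_host2:8000";

-- List the Brokers currently registered in the cluster.
SHOW BROKER;

-- Remove a Broker process that is no longer needed.
ALTER SYSTEM DROP BROKER broker0 "broker_host1:8000";
```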
### Authentication Information

Different Broker types and different access methods require different authentication information. The authentication information is usually provided as key-value pairs in the Property Map that follows `WITH BROKER "broker_name"`.

#### Community Edition HDFS

1. Simple authentication

    Simple authentication means that the Hadoop configuration `hadoop.security.authentication` is set to `simple`.

    Use a system user to access HDFS, or add `HADOOP_USER_NAME` to the environment variables when starting the Broker.

    ```text
    (
        "username" = "user",
        "password" = ""
    );
    ```

    Just leave the password empty.

2. Kerberos authentication

    This authentication method requires the following information:

    - `hadoop.security.authentication`: specify the authentication method as kerberos.
    - `kerberos_principal`: specify the Kerberos principal.
    - `kerberos_keytab`: specify the path of the Kerberos keytab file. It must be the absolute path of a file on the server where the Broker process runs, and the file must be accessible to the Broker process.
    - `kerberos_keytab_content`: specify the base64-encoded content of the Kerberos keytab file. Configure either this or `kerberos_keytab`; only one of the two is needed.

    Examples:

    ```text
    (
        "hadoop.security.authentication" = "kerberos",
        "kerberos_principal" = "doris@YOUR.COM",
        "kerberos_keytab" = "/home/doris/my.keytab"
    )
    ```

    ```text
    (
        "hadoop.security.authentication" = "kerberos",
        "kerberos_principal" = "doris@YOUR.COM",
        "kerberos_keytab_content" = "ASDOWHDLAWIDJHWLDKSALDJSDIWALD"
    )
    ```

    If Kerberos authentication is used, the [krb5.conf](https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html) file is required when deploying the Broker process. The krb5.conf file contains Kerberos configuration information. Normally you should install the krb5.conf file in the /etc directory; you can override the default location by setting the KRB5_CONFIG environment variable. An example of the krb5.conf file content is as follows:

    ```text
    [libdefaults]
        default_realm = DORIS.HADOOP
        default_tkt_enctypes = des3-hmac-sha1 des-cbc-crc
        default_tgs_enctypes = des3-hmac-sha1 des-cbc-crc
        dns_lookup_kdc = true
        dns_lookup_realm = false

    [realms]
        DORIS.HADOOP = {
            kdc = kerberos-doris.hadoop.service:7005
        }
    ```

3. HDFS HA mode

    This configuration is used to access an HDFS cluster deployed in HA mode.

    - `dfs.nameservices`: specify the name of the HDFS service, user-defined, e.g. "dfs.nameservices" = "my_ha".
    - `dfs.ha.namenodes.xxx`: user-defined NameNode names, multiple names separated by commas, where xxx is the custom name from `dfs.nameservices`, e.g. "dfs.ha.namenodes.my_ha" = "my_nn".
    - `dfs.namenode.rpc-address.xxx.nn`: specify the RPC address of a NameNode, where nn is one of the NameNode names configured in `dfs.ha.namenodes.xxx`, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port".
    - `dfs.client.failover.proxy.provider`: specify the provider the client uses to connect to the NameNode, default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.

    An example is as follows:

    ```text
    (
        "dfs.nameservices" = "my_ha",
        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )
    ```

    HA mode can be combined with the two preceding authentication methods for cluster access, for example accessing HA HDFS with simple authentication:

    ```text
    (
        "username"="user",
        "password"="passwd",
        "dfs.nameservices" = "my_ha",
        "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
        "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
        "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )
    ```

The HDFS cluster configuration can also be written into the hdfs-site.xml file. When users read data from the HDFS cluster through the Broker process, they then only need to fill in the cluster's file path and the authentication information.
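For reference, a hedged sketch of how this authentication information is passed in practice, here inside a Broker Load statement; the label, table, file path, and Broker name are illustrative placeholders:

```sql
LOAD LABEL example_db.label_broker_demo
(
    DATA INFILE("hdfs://my_ha/user/doris/data/input/file.csv")
    INTO TABLE example_tbl
    COLUMNS TERMINATED BY ","
)
-- Simple authentication combined with HDFS HA access, as described above.
WITH BROKER "broker0"
(
    "username" = "user",
    "password" = "passwd",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
```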
@ -24,4 +24,125 @@ specific language governing permissions and limitations

under the License.

-->

# Resource Management

To save compute and storage resources inside the Doris cluster, Doris needs to bring in external resources to do related work: Spark/GPU for queries, HDFS/S3 for external storage, Spark/MapReduce for ETL, connecting to external storage via ODBC, and so on. Therefore, a resource management mechanism is introduced to manage the external resources used by Doris.

## Basic Concepts

A resource contains basic information such as a name and a type. The name is globally unique. Different types of resources have different properties; refer to the introduction of each resource for details.

Resources can only be created and dropped by users with the `admin` privilege. A resource belongs to the entire Doris cluster. A user with the `admin` privilege can grant the usage privilege `usage_priv` to ordinary users, for example as sketched below; see `HELP GRANT` or the privilege documentation for details.
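A minimal sketch of granting resource usage to an ordinary user; the resource, user, and role names are placeholders:

```sql
-- Allow user0 to reference the resource named spark0.
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";

-- Or grant the usage privilege to a role instead of a single user.
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
```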
## Operations

Resource management has three main commands: `CREATE RESOURCE`, `DROP RESOURCE`, and `SHOW RESOURCES`, which create, drop, and show resources respectively. The exact syntax of these three commands can be viewed by connecting to Doris with a MySQL client and running `HELP cmd`.

1. CREATE RESOURCE

    This statement creates a resource. See [CREATE RESOURCE](../sql-manual/sql-reference-v2/Data-Definition-Statements/Create/CREATE-RESOURCE.html) for details.

2. DROP RESOURCE

    This command drops an existing resource. See [DROP RESOURCE](../sql-manual/sql-reference-v2/Data-Definition-Statements/Drop/DROP-RESOURCE.html) for details.

3. SHOW RESOURCES

    This command shows the resources that the user has permission to use. See [SHOW RESOURCES](../sql-manual/sql-reference-v2/Show-Statements/SHOW-RESOURCES.html) for details.

## Supported Resources

Doris currently supports:

- Spark resources: perform ETL work.
- ODBC resources: query and import data from external tables.

The usage of the two resource types is shown below.

### Spark

#### Parameters

##### Spark-related parameters:

`spark.master`: required; currently supports yarn and spark://host:port.

`spark.submit.deployMode`: the deploy mode of the Spark program; required; supports cluster and client.

`spark.hadoop.yarn.resourcemanager.address`: required when master is yarn.

`spark.hadoop.fs.defaultFS`: required when master is yarn.

Other parameters are optional; refer to http://spark.apache.org/docs/latest/configuration.html.

##### If Spark is used for ETL, the following parameters also need to be specified:

`working_dir`: the directory used by ETL. Required when Spark is used as an ETL resource, e.g. hdfs://host:port/tmp/doris.

`broker`: the Broker name. Required when Spark is used as an ETL resource, and it must be configured in advance with the `ALTER SYSTEM ADD BROKER` command.

- `broker.property_key`: authentication information and other properties the Broker needs when it reads the intermediate files generated by ETL.

#### Example

Create a Spark resource named spark0 in yarn cluster mode.

```sql
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
    "type" = "spark",
    "spark.master" = "yarn",
    "spark.submit.deployMode" = "cluster",
    "spark.jars" = "xxx.jar,yyy.jar",
    "spark.files" = "/tmp/aaa,/tmp/bbb",
    "spark.executor.memory" = "1g",
    "spark.yarn.queue" = "queue0",
    "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
    "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
    "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
    "broker" = "broker0",
    "broker.username" = "user0",
    "broker.password" = "password0"
);
```
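As a usage note, a hedged sketch of how a Spark Load job might reference this resource via the `WITH RESOURCE` clause; the label, table, and file path are illustrative placeholders:

```sql
LOAD LABEL example_db.label_spark_demo
(
    DATA INFILE("hdfs://127.0.0.1:10000/user/doris/data/input/file.csv")
    INTO TABLE example_tbl
    COLUMNS TERMINATED BY ","
)
WITH RESOURCE 'spark0'
(
    "spark.executor.memory" = "2g"
)
PROPERTIES
(
    "timeout" = "3600"
);
```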
### ODBC

#### Parameters

##### ODBC-related parameters:

`type`: required, and must be `odbc_catalog`. It identifies the resource type.

`user`: the account of the external table; required.

`password`: the password for the external table; required.

`host`: the IP address for connecting to the external table; required.

`port`: the port for connecting to the external table; required.

`odbc_type`: indicates the type of the external table. Doris currently supports `mysql` and `oracle`, and may support more databases in the future. Required for ODBC external tables that reference this resource; optional for legacy MySQL external tables.

`driver`: indicates the ODBC driver library used by the external table. Required for ODBC external tables that reference this resource; optional for legacy MySQL external tables.

For details on usage, refer to [ODBC of Doris](https://doris.apache.org/zh-CN/extending-doris/odbc-of-doris.html).

#### Example

Create an ODBC resource for Oracle, an `odbc_catalog` resource named `oracle_odbc`.

```sql
CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
    "type" = "odbc_catalog",
    "host" = "192.168.0.1",
    "port" = "8086",
    "user" = "test",
    "password" = "test",
    "database" = "test",
    "odbc_type" = "oracle",
    "driver" = "Oracle 19 ODBC driver"
);
```
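To show how such a resource is typically consumed, here is a hedged sketch of an ODBC external table that references it; the table name, columns, and the remote database/table are hypothetical:

```sql
CREATE EXTERNAL TABLE `ext_oracle_demo` (
    `k1` DECIMAL(9, 3),
    `k2` CHAR(10)
) ENGINE = ODBC
COMMENT "ODBC external table referencing the oracle_odbc resource"
PROPERTIES (
    "odbc_catalog_resource" = "oracle_odbc",
    "database" = "test",
    "table" = "baseall"
);
```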