[feature](cold-hot) support s3 resource (#8808)

Add cold hot support in FE meta, support alter resource DDL in FE
This commit is contained in:
qiye
2022-04-13 09:52:03 +08:00
committed by GitHub
parent 64cf64d1f8
commit bca121333e
45 changed files with 1871 additions and 471 deletions

View File

@ -623,6 +623,7 @@ module.exports = [
initialOpenGroupIndex: -1,
children: [
"ALTER DATABASE",
"ALTER RESOURCE",
"ALTER TABLE",
"ALTER VIEW",
"BACKUP",
@ -634,6 +635,7 @@ module.exports = [
"CREATE INDEX",
"CREATE MATERIALIZED VIEW",
"CREATE REPOSITORY",
"CREATE RESOURCE",
"CREATE TABLE LIKE",
"CREATE TABLE",
"CREATE VIEW",
@ -643,6 +645,7 @@ module.exports = [
"DROP INDEX",
"DROP MATERIALIZED VIEW",
"DROP REPOSITORY",
"DROP RESOURCE",
"DROP TABLE",
"DROP VIEW",
"HLL",
@ -651,6 +654,7 @@ module.exports = [
"REFRESH TABLE",
"RESTORE",
"SHOW ENCRYPTKEYS",
"SHOW RESOURCES",
"TRUNCATE TABLE",
"create-function",
"drop-function",

View File

@ -637,6 +637,7 @@ module.exports = [
initialOpenGroupIndex: -1,
children: [
"ALTER DATABASE",
"ALTER RESOURCE",
"ALTER TABLE",
"ALTER VIEW",
"BACKUP",

View File

@ -25,114 +25,117 @@ under the License.
-->
# ALTER SYSTEM
## Description
This statement is used to operate on nodes in a system. (Administrator only!)
Grammar:
1) Adding nodes (without multi-tenant functionality, add in this way)
ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
2) Adding idle nodes (that is, adding BACKEND that does not belong to any cluster)
ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
3) Adding nodes to a cluster
ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...];
4) Delete nodes
ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
5) Node offline
ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
6) Add Broker
ALTER SYSTEM ADD BROKER broker_name "host:port"[,"host:port"...];
7) Drop Broker
ALTER SYSTEM DROP BROKER broker_name "host:port"[,"host:port"...];
8) Delete all Brokers
ALTER SYSTEM DROP ALL BROKER broker_name
9) Set up a Load error hub for centralized display of import error information
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES ("key" = "value"[, ...]);
10) Modify property of BE
ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
This statement is used to operate on nodes in a system. (Administrator only!)
Syntax:
1) Adding nodes (without multi-tenant functionality, add in this way)
ALTER SYSTEM ADD BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
2) Adding idle nodes (that is, adding BACKEND that does not belong to any cluster)
ALTER SYSTEM ADD FREE BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
3) Adding nodes to a cluster
ALTER SYSTEM ADD BACKEND TO cluster_name "host:heartbeat_port"[,"host:heartbeat_port"...];
4) Delete nodes
ALTER SYSTEM DROP BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
5) Node offline
ALTER SYSTEM DECOMMISSION BACKEND "host:heartbeat_port"[,"host:heartbeat_port"...];
6) Add Broker
ALTER SYSTEM ADD BROKER broker_name "host:port"[,"host:port"...];
7) Drop Broker
ALTER SYSTEM DROP BROKER broker_name "host:port"[,"host:port"...];
8) Delete all Brokers
ALTER SYSTEM DROP ALL BROKER broker_name
9) Set up a Load error hub for centralized display of import error information
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES ("key" = "value"[, ...]);
10) Modify property of BE
ALTER SYSTEM MODIFY BACKEND "host:heartbeat_port" SET ("key" = "value"[, ...]);
Explain:
1) Host can be hostname or IP address
2) heartbeat_port is the heartbeat port of the node
3) Adding and deleting nodes are synchronous operations. These two operations do not take the existing data on the node into account; the node is removed directly from the metadata, so use them with caution.
4) The decommission operation is used to take a node offline safely. This operation is asynchronous. If successful, the node will eventually be removed from the metadata; if it fails, the decommission will not be completed.
5) The offline operation of the node can be cancelled manually. See CANCEL DECOMMISSION for details
6) Load error hub:
Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES.
If you need to delete the current load error hub, you can set type to null.
1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement.
Explain:
1) Host can be hostname or IP address
2) heartbeat_port is the heartbeat port of the node
3) Adding and deleting nodes are synchronous operations. These two operations do not take the existing data on the node into account; the node is removed directly from the metadata, so use them with caution.
4) The decommission operation is used to take a node offline safely. This operation is asynchronous. If successful, the node will eventually be removed from the metadata; if it fails, the decommission will not be completed.
5) The offline operation of the node can be cancelled manually. See CANCEL DECOMMISSION for details
6) Load error hub:
Currently, two types of Hub are supported: Mysql and Broker. You need to specify "type" = "mysql" or "type" = "broker" in PROPERTIES.
If you need to delete the current load error hub, you can set type to null.
1) When using the Mysql type, the error information generated when importing will be inserted into the specified MySQL library table, and then the error information can be viewed directly through the show load warnings statement.
Hub of Mysql type needs to specify the following parameters:
host: mysql host
port: mysql port
user: mysql user
password: mysql password
database: mysql database
table: mysql table
2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed
Hub of Broker type needs to specify the following parameters:
Broker: Name of broker
Path: Remote Storage Path
Other properties: Other information necessary to access remote storage, such as authentication information.
7) Modify BE node attributes currently supports the following attributes:
1. tag.location:Resource tag
2. disable_query: Query disabled attribute
3. disable_load: Load disabled attribute
Hub of Mysql type needs to specify the following parameters:
host: mysql host
port: mysql port
user: mysql user
password: mysql password
database: mysql database
table: mysql table
## Example
2) When the Broker type is used, the error information generated when importing will form a file and be written to the designated remote storage system through the broker. Make sure that the corresponding broker is deployed
Hub of Broker type needs to specify the following parameters:
Broker: Name of broker
Path: Remote Storage Path
Other properties: Other information necessary to access remote storage, such as authentication information.
7) Modify BE node attributes currently supports the following attributes:
1. tag.location:Resource tag
2. disable_query: Query disabled attribute
3. disable_load: Load disabled attribute
## example
1. Add a node
ALTER SYSTEM ADD BACKEND "host:port";
2. Adding an idle node
ALTER SYSTEM ADD FREE BACKEND "host:port";
3. Delete two nodes
ALTER SYSTEM DROP BACKEND "host1:port", "host2:port";
4. Decommission two nodes
ALTER SYSTEM DECOMMISSION BACKEND "host1:port", "host2:port";
5. Add two HDFS Brokers
ALTER SYSTEM ADD BROKER hdfs "host1:port", "host2:port";
6. Add a load error hub of Mysql type
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
("type"= "mysql",
"host" = "192.168.1.17"
"port" = "3306",
"User" = "my" name,
"password" = "my_passwd",
"database" = "doris_load",
"table" = "load_errors"
);
7. Add a load error hub of Broker type
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
("type"= "broker",
"Name" = BOS,
"path" = "bos://backup-cmy/logs",
"bos_endpoint" ="http://gz.bcebos.com",
"bos_accesskey" = "069fc278xxxxxx24ddb522",
"bos_secret_accesskey"="700adb0c6xxxxxx74d59eaa980a"
);
8. Delete the current load error hub
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
("type"= "null");
9. Modify BE resource tag
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
10. Modify the query disabled attribute of BE
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true");
11. Modify the load disabled attribute of BE
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true");
1. Add a node
ALTER SYSTEM ADD BACKEND "host:port";
2. Adding an idle node
ALTER SYSTEM ADD FREE BACKEND "host:port";
3. Delete two nodes
ALTER SYSTEM DROP BACKEND "host1:port", "host2:port";
4. Decommission two nodes
ALTER SYSTEM DECOMMISSION BACKEND "host1:port", "host2:port";
5. Add two HDFS Brokers
ALTER SYSTEM ADD BROKER hdfs "host1:port", "host2:port";
6. Add a load error hub of Mysql type
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
("type"= "mysql",
"host" = "192.168.1.17"
"port" = "3306",
"User" = "my" name,
"password" = "my_passwd",
"database" = "doris_load",
"table" = "load_errors"
);
7. Add a load error hub of Broker type
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
("type"= "broker",
"Name" = BOS,
"path" = "bos://backup-cmy/logs",
"bos_endpoint" ="http://gz.bcebos.com",
"bos_accesskey" = "069fc278xxxxxx24ddb522",
"bos_secret_accesskey"="700adb0c6xxxxxx74d59eaa980a"
);
8. Delete the current load error hub
ALTER SYSTEM SET LOAD ERRORS HUB PROPERTIES
("type"= "null");
9. Modify BE resource tag
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("tag.location" = "group_a");
10. Modify the query disabled attribute of BE
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_query" = "true");
11. Modify the load disabled attribute of BE
ALTER SYSTEM MODIFY BACKEND "host1:9050" SET ("disable_load" = "true");
## keyword
ALTER, SYSTEM, BACKEND, BROKER, FREE

View File

@ -0,0 +1,48 @@
---
{
"title": "ALTER RESOURCE",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# ALTER RESOURCE
## Description
This statement is used to modify an existing resource. Only the root or admin user can modify resources.
Syntax:
ALTER RESOURCE 'resource_name'
PROPERTIES ("key"="value", ...);
Note: The resource type does not support modification.
## Example
1. Modify the working directory of the Spark resource named spark0:
ALTER RESOURCE 'spark0' PROPERTIES ("working_dir" = "hdfs://127.0.0.1:10000/tmp/doris_new");
2. Modify the maximum number of connections to the S3 resource named remote_s3:
ALTER RESOURCE 'remote_s3' PROPERTIES ("s3_max_connections" = "100");
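A hypothetical illustration of the restriction above: the following statement is rejected, because the resource type cannot be changed once the resource exists.
ALTER RESOURCE 'remote_s3' PROPERTIES ("type" = "spark");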
## keyword
ALTER, RESOURCE

View File

@ -71,6 +71,7 @@ under the License.
1) The following attributes of the modified partition are currently supported.
- storage_medium
- storage_cooldown_time
- remote_storage_cooldown_time
- replication_num
- in_memory
2) For single-partition tables, partition_name is the same as the table name.
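A minimal sketch of modifying these attributes, assuming a table example_db.example_table with a partition p1 that already has a remote storage resource attached (both names are hypothetical):
ALTER TABLE example_db.example_table MODIFY PARTITION p1
SET ("storage_medium" = "HDD", "remote_storage_cooldown_time" = "2022-12-01 00:00:00");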

View File

@ -0,0 +1,134 @@
---
{
"title": "CREATE RESOURCE",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# CREATE RESOURCE
## Description
This statement is used to create a resource. Only the root or admin user can create resources. Currently supports Spark, ODBC, S3 external resources.
In the future, other external resources may be added to Doris for use, such as Spark/GPU for query, HDFS/S3 for external storage, MapReduce for ETL, etc.
Syntax:
CREATE [EXTERNAL] RESOURCE "resource_name"
PROPERTIES ("key"="value", ...);
Explanation:
1. The type of resource needs to be specified in PROPERTIES "type" = "[spark|odbc_catalog|s3]", currently supports spark, odbc_catalog, s3.
2. The PROPERTIES varies according to the resource type, see the example for details.
## Example
1. Create a Spark resource named spark0 in yarn cluster mode.
````
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
````
Spark related parameters are as follows:
- spark.master: Required, currently supports yarn, spark://host:port.
- spark.submit.deployMode: The deployment mode of the Spark program, required, supports both cluster and client.
- spark.hadoop.yarn.resourcemanager.address: Required when master is yarn.
- spark.hadoop.fs.defaultFS: Required when master is yarn.
- Other parameters are optional, refer to http://spark.apache.org/docs/latest/configuration.html
working_dir and broker need to be specified when Spark is used for ETL, as described below:
working_dir: The directory used by the ETL. Required when spark is used as an ETL resource. For example: hdfs://host:port/tmp/doris.
broker: broker name. Required when spark is used as an ETL resource. Configuration needs to be done in advance using the `ALTER SYSTEM ADD BROKER` command.
broker.property_key: The authentication information that the broker needs to specify when reading the intermediate file generated by ETL.
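Once created, the Spark resource is referenced by name in a Spark Load job. The sketch below is only illustrative; the label, target table and HDFS path are hypothetical and not part of this document:
````
LOAD LABEL example_db.label1
(
    DATA INFILE("hdfs://127.0.0.1:10000/user/doris/data/input_file")
    INTO TABLE example_table
)
WITH RESOURCE 'spark0'
(
    "spark.executor.memory" = "2g"
)
PROPERTIES
(
    "timeout" = "3600"
);
````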
2. Create an ODBC resource
````
CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"odbc_type" = "oracle",
"driver" = "Oracle 19 ODBC driver"
);
````
The relevant parameters of ODBC are as follows:
- host: IP address of the external database
- driver: The driver name of the ODBC external table, which must be the same as the driver name in be/conf/odbcinst.ini.
- odbc_type: the type of the external database, currently supports oracle, mysql, postgresql
- user: username of the external database
- password: the password information of the corresponding user
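Once created, an ODBC resource is referenced from an ODBC external table through the `odbc_catalog_resource` property (see the 6.2 example in the CREATE TABLE documentation). A minimal sketch, with hypothetical database, table and column names:
````
CREATE EXTERNAL TABLE example_db.table_oracle
(
    k1 DATE,
    k2 INT
)
ENGINE=ODBC
PROPERTIES
(
    "odbc_catalog_resource" = "oracle_odbc",
    "database" = "test",
    "table" = "some_oracle_table"
);
````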
3. Create S3 resource
````
CREATE RESOURCE "remote_s3"
PROPERTIES
(
"type" = "s3",
"s3_endpoint" = "http://bj.s3.com",
"s3_region" = "bj",
"s3_root_path" = "/path/to/root",
"s3_access_key" = "bbb",
"s3_secret_key" = "aaaa",
"s3_max_connections" = "50",
"s3_request_timeout_ms" = "3000",
"s3_connection_timeout_ms" = "1000"
);
````
S3 related parameters are as follows:
- required
- s3_endpoint: s3 endpoint
- s3_region: s3 region
- s3_root_path: s3 root directory
- s3_access_key: s3 access key
- s3_secret_key: s3 secret key
- optional
- s3_max_connections: the maximum number of s3 connections, the default is 50
- s3_request_timeout_ms: s3 request timeout, in milliseconds, the default is 3000
- s3_connection_timeout_ms: s3 connection timeout, in milliseconds, the default is 1000
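If only the required parameters are specified, the optional ones fall back to the defaults listed above. A minimal sketch with placeholder values:
````
CREATE RESOURCE "minimal_s3"
PROPERTIES
(
    "type" = "s3",
    "s3_endpoint" = "http://s3.example.com",
    "s3_region" = "example-region",
    "s3_root_path" = "/path/to/root",
    "s3_access_key" = "xxxx",
    "s3_secret_key" = "yyyy"
);
````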
## keyword
CREATE, RESOURCE

View File

@ -297,18 +297,24 @@ Syntax:
PROPERTIES (
"storage_medium" = "[SSD|HDD]",
["storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
["remote_storage_resource" = "xxx"],
["remote_storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
["replication_num" = "3"],
["replication_allocation" = "xxx"]
["replication_allocation" = "xxx"]
)
```
storage_medium: SSD or HDD. The default initial storage medium can be specified by `default_storage_medium=xxx` in the FE configuration file `fe.conf`; if not specified, HDD is used by default.
Note: when the FE configuration `enable_strict_storage_medium_check` is `True`, the CREATE TABLE statement will fail with `Failed to find enough host in all backends with storage medium is SSD|HDD` if no backend with the corresponding storage medium exists in the cluster.
storage_cooldown_time: If storage_medium is SSD, data will be automatically moved to HDD when the cooldown time expires.
Default is 30 days.
Format: "yyyy-MM-dd HH:mm:ss"
replication_num: Replication number of a partition. Default is 3.
replication_allocation: Specify the distribution of replicas according to the resource tag.
storage_medium: SSD or HDD. The default initial storage medium can be specified by `default_storage_medium=xxx` in the FE configuration file `fe.conf`; if not specified, HDD is used by default.
Note: when the FE configuration `enable_strict_storage_medium_check` is `True`, the CREATE TABLE statement will fail with `Failed to find enough host in all backends with storage medium is SSD|HDD` if no backend with the corresponding storage medium exists in the cluster.
storage_cooldown_time: If storage_medium is SSD, data will be automatically moved to HDD when the cooldown time expires.
Default is 30 days.
Format: "yyyy-MM-dd HH:mm:ss"
remote_storage_resource: The remote storage resource name, which needs to be used in conjunction with the remote_storage_cooldown_time parameter.
remote_storage_cooldown_time: Used in conjunction with remote_storage_resource. Indicates the time when the partition's data stored locally expires.
Does not expire by default. Must be later than storage_cooldown_time if used together with it.
The format is: "yyyy-MM-dd HH:mm:ss"
replication_num: Replication number of a partition. Default is 3.
replication_allocation: Specify the distribution of replicas according to the resource tag.
If the table is not range-partitioned, these properties take effect at the table level; otherwise they take effect at the partition level.
User can specify different properties for different partitions with the `ADD PARTITION` or `MODIFY PARTITION` statements, as sketched below.
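A sketch of per-partition properties, assuming the range-partitioned table from the examples below (partition name and values are hypothetical):
```
ALTER TABLE example_db.table_range
ADD PARTITION p4 VALUES LESS THAN ("2015-01-01")
("storage_medium" = "HDD");

ALTER TABLE example_db.table_range
MODIFY PARTITION p4 SET ("replication_num" = "1");
```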
@ -353,7 +359,7 @@ Syntax:
dynamic_partition.reserved_history_periods: Used to specify the range of reserved history periods
```
5) You can create multiple Rollups in bulk when building a table
5) You can create multiple Rollups in bulk when building a table
Syntax:
```
ROLLUP (rollup_name (column_name1, column_name2, ...)
@ -405,68 +411,89 @@ Syntax:
"storage_cooldown_time" = "2015-06-04 00:00:00"
);
```
3. Create an olap table with range partitioning, distributed by hash. Records with the same key coexist. Set the initial storage medium and cooldown time; use the default columnar storage.
1) LESS THAN
3. Create an olap table, distributed by hash, with an aggregation key. Also set the storage medium and cooldown time,
and set up a remote storage resource and the cold-data cooldown time.
```
CREATE TABLE example_db.table_range
CREATE TABLE example_db.table_hash
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
k1 BIGINT,
k2 LARGEINT,
v1 VARCHAR(2048) REPLACE,
v2 SMALLINT SUM DEFAULT "10"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1)
(
PARTITION p1 VALUES LESS THAN ("2014-01-01"),
PARTITION p2 VALUES LESS THAN ("2014-06-01"),
PARTITION p3 VALUES LESS THAN ("2014-12-01")
)
DISTRIBUTED BY HASH(k2) BUCKETS 32
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD", "storage_cooldown_time" = "2015-06-04 00:00:00"
"storage_medium" = "SSD",
"storage_cooldown_time" = "2015-06-04 00:00:00",
"remote_storage_resource" = "remote_s3",
"remote_storage_cooldown_time" = "2015-12-04 00:00:00"
);
```
Explain:
This statement will create 3 partitions:
```
( { MIN }, {"2014-01-01"} )
[ {"2014-01-01"}, {"2014-06-01"} )
[ {"2014-06-01"}, {"2014-12-01"} )
```
Data outside these ranges will not be loaded.
```
2) Fixed Range
```
CREATE TABLE table_range
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1, k2, k3)
(
PARTITION p1 VALUES [("2014-01-01", "10", "200"), ("2014-01-01", "20", "300")),
PARTITION p2 VALUES [("2014-06-01", "100", "200"), ("2014-07-01", "100", "300"))
)
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD"
);
```
4. Create an olap table with list partitioning, distributed by hash. Records with the same key coexist. Set the initial storage medium and cooldown time; use the default columnar storage.
4. Create an olap table with range partitioning, distributed by hash. Records with the same key coexist. Set the initial storage medium and cooldown time; use the default columnar storage.
1) LESS THAN
```
CREATE TABLE example_db.table_range
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1)
(
PARTITION p1 VALUES LESS THAN ("2014-01-01"),
PARTITION p2 VALUES LESS THAN ("2014-06-01"),
PARTITION p3 VALUES LESS THAN ("2014-12-01")
)
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD", "storage_cooldown_time" = "2015-06-04 00:00:00"
);
```
Explain:
This statement will create 3 partitions:
```
( { MIN }, {"2014-01-01"} )
[ {"2014-01-01"}, {"2014-06-01"} )
[ {"2014-06-01"}, {"2014-12-01"} )
```
Data outside these ranges will not be loaded.
2) Fixed Range
```
CREATE TABLE table_range
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1, k2, k3)
(
PARTITION p1 VALUES [("2014-01-01", "10", "200"), ("2014-01-01", "20", "300")),
PARTITION p2 VALUES [("2014-06-01", "100", "200"), ("2014-07-01", "100", "300"))
)
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD"
);
```
5. Create an olap table with list partitioning, distributed by hash. Records with the same key coexist. Set the initial storage medium and cooldown time; use the default columnar storage.
1) Single column partition
@ -540,9 +567,9 @@ Syntax:
Data that is not within these partition enumeration values will be filtered as illegal data
5. Create a mysql table
5.1 Create MySQL table directly from external table information
```
6. Create a mysql table
6.1 Create MySQL table directly from external table information
```
CREATE EXTERNAL TABLE example_db.table_mysql
(
k1 DATE,
@ -561,21 +588,20 @@ Syntax:
"database" = "mysql_db_test",
"table" = "mysql_table_test"
)
```
```
5.2 Create MySQL table with external ODBC catalog resource
```
CREATE EXTERNAL RESOURCE "mysql_resource"
PROPERTIES
(
"type" = "odbc_catalog",
"user" = "mysql_user",
"password" = "mysql_passwd",
"host" = "127.0.0.1",
"port" = "8239"
);
```
```
6.2 Create MySQL table with external ODBC catalog resource
```
CREATE EXTERNAL RESOURCE "mysql_resource"
PROPERTIES
(
"type" = "odbc_catalog",
"user" = "mysql_user",
"password" = "mysql_passwd",
"host" = "127.0.0.1",
"port" = "8239"
);
CREATE EXTERNAL TABLE example_db.table_mysql
(
k1 DATE,
@ -590,10 +616,10 @@ Syntax:
"odbc_catalog_resource" = "mysql_resource",
"database" = "mysql_db_test",
"table" = "mysql_table_test"
)
```
);
```
6. Create a broker table, with data files on HDFS, columns separated by "|" and lines delimited by "\n"
7. Create a broker table, with data files on HDFS, columns separated by "|" and lines delimited by "\n"
```
CREATE EXTERNAL TABLE example_db.table_broker (
@ -616,7 +642,7 @@ Syntax:
);
```
7. Create a table with an HLL column
8. Create a table with an HLL column
```
CREATE TABLE example_db.example_table
@ -631,7 +657,7 @@ Syntax:
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
8. Create a table with a BITMAP_UNION column
9. Create a table with a BITMAP_UNION column
```
CREATE TABLE example_db.example_table
@ -645,21 +671,21 @@ Syntax:
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
9. Create a table with QUANTILE_UNION column (the origin value of **v1** and **v2** columns must be **numeric** types)
10. Create a table with QUANTILE_UNION column (the origin value of **v1** and **v2** columns must be **numeric** types)
```
CREATE TABLE example_db.example_table
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 QUANTILE_STATE QUANTILE_UNION,
v2 QUANTILE_STATE QUANTILE_UNION
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
10. Create two colocate join tables.
```
CREATE TABLE example_db.example_table
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 QUANTILE_STATE QUANTILE_UNION,
v2 QUANTILE_STATE QUANTILE_UNION
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
11. Create two colocate join tables.
```
CREATE TABLE `t1` (
@ -682,7 +708,7 @@ Syntax:
);
```
11. Create a broker table, with file on BOS.
12. Create a broker table, with file on BOS.
```
CREATE EXTERNAL TABLE example_db.table_broker (
@ -700,7 +726,7 @@ Syntax:
);
```
12. Create a table with a bitmap index
13. Create a table with a bitmap index
```
CREATE TABLE example_db.table_hash
@ -717,7 +743,7 @@ Syntax:
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
13. Create a dynamic partitioning table (dynamic partitioning needs to be enabled in FE configuration), which creates partitions 3 days in advance every day. For example, if today is '2020-01-08', partitions named 'p20200108', 'p20200109', 'p20200110', 'p20200111' will be created.
14. Create a dynamic partitioning table (dynamic partitioning needs to be enabled in FE configuration), which creates partitions 3 days in advance every day. For example, if today is '2020-01-08', partitions named 'p20200108', 'p20200109', 'p20200110', 'p20200111' will be created.
```
[types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; )
@ -726,29 +752,29 @@ Syntax:
[types: [DATE]; keys: [2020-01-11]; ‥types: [DATE]; keys: [2020-01-12]; )
```
```
CREATE TABLE example_db.dynamic_partition
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1) ()
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
);
```
14. Create a table with rollup index
```
```
CREATE TABLE example_db.dynamic_partition
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1) ()
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
);
```
15. Create a table with rollup index
```
CREATE TABLE example_db.rolup_index_table
(
event_day DATE,
@ -765,11 +791,11 @@ Syntax:
r3(event_day)
)
PROPERTIES("replication_num" = "3");
```
```
15. Create an in-memory table:
16. Create an in-memory table:
```
```
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
@ -783,10 +809,10 @@ Syntax:
COMMENT "my first doris table"
DISTRIBUTED BY HASH(k1) BUCKETS 32
PROPERTIES ("in_memory"="true");
```
```
16. Create a hive external table
```
17. Create a hive external table
```
CREATE TABLE example_db.table_hive
(
k1 TINYINT,
@ -800,11 +826,11 @@ Syntax:
"table" = "hive_table_name",
"hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
```
```
17. Specify the replica distribution of the table through replication_allocation
18. Specify the replica distribution of the table through replication_allocation
```
```
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
@ -812,9 +838,9 @@ Syntax:
)
DISTRIBUTED BY HASH(k1) BUCKETS 32
PROPERTIES (
"replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
);
"replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
);
CREATE TABLE example_db.dynamic_partition
(
k1 DATE,
@ -833,11 +859,11 @@ Syntax:
"dynamic_partition.buckets" = "32",
"dynamic_partition."replication_allocation" = "tag.location.group_a:3"
);
```
```
17. Create an Iceberg external table
19. Create an Iceberg external table
```
```
CREATE TABLE example_db.t_iceberg
ENGINE=ICEBERG
PROPERTIES (
@ -846,7 +872,7 @@ Syntax:
"iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
```
```
## keyword

View File

@ -0,0 +1,46 @@
---
{
"title": "DROP RESOURCE",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# DROP RESOURCE
## Description
This statement is used to delete an existing resource. Only the root or admin user can delete resources.
Syntax:
DROP RESOURCE 'resource_name'
Note: ODBC/S3 resources that are in use cannot be deleted.
## Example
1. Delete the Spark resource named spark0:
DROP RESOURCE 'spark0';
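2. Delete the S3 resource named remote_s3 (a hypothetical example; the statement is rejected if the resource is still in use):
DROP RESOURCE 'remote_s3';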
## keyword
DROP, RESOURCE

View File

@ -25,17 +25,19 @@ under the License.
-->
# SHOW RESOURCES
## description
This statement is used to display the resources that the user has permission to use. Ordinary users can only display the resources with permission, while root or admin users can display all the resources.
## Description
This statement is used to display the resources that the user has permission to use.
Ordinary users can only display the resources with permission, while root or admin users can display all the resources.
Grammar
Syntax:
SHOW RESOURCES
[
WHERE
[NAME [ = "your_resource_name" | LIKE "name_matcher"]]
[RESOURCETYPE = ["SPARK"]]
[RESOURCETYPE = ["[spark|odbc_catalog|s3]"]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];
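For instance, a sketch that combines the optional clauses above to list only S3 resources, ordered by name with paging (the limit and offset values are arbitrary):
SHOW RESOURCES WHERE RESOURCETYPE = "s3" ORDER BY Name LIMIT 10 OFFSET 0;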
@ -48,7 +50,8 @@ under the License.
5) If LIMIT is specified, limit matching records are displayed. Otherwise, it is all displayed.
6) If OFFSET is specified, the query results are displayed starting with the offset offset. The offset is 0 by default.
## example
## Example
1. Display all resources that the current user has permissions on
SHOW RESOURCES;
@ -60,5 +63,5 @@ under the License.
## keyword
SHOW, RESOURCES
SHOW RESOURCES, RESOURCES

View File

@ -0,0 +1,48 @@
---
{
"title": "ALTER RESOURCE",
"language": "zh-CN"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# ALTER RESOURCE
## Description
该语句用于修改一个已有的资源。仅 root 或 admin 用户可以修改资源。
语法:
ALTER RESOURCE 'resource_name'
PROPERTIES ("key"="value", ...);
注意:resource type 不支持修改。
## Example
1. 修改名为 spark0 的 Spark 资源的工作目录:
ALTER RESOURCE 'spark0' PROPERTIES ("working_dir" = "hdfs://127.0.0.1:10000/tmp/doris_new");
2. 修改名为 remote_s3 的 S3 资源的最大连接数:
ALTER RESOURCE 'remote_s3' PROPERTIES ("s3_max_connections" = "100");
## keyword
ALTER, RESOURCE

View File

@ -71,6 +71,7 @@ under the License.
1) 当前支持修改分区的下列属性:
- storage_medium
- storage_cooldown_time
- remote_storage_cooldown_time
- replication_num
- in_memory
2) 对于单分区表,partition_name 同表名。

View File

@ -25,21 +25,27 @@ under the License.
-->
# CREATE RESOURCE
## description
该语句用于创建资源。仅 root 或 admin 用户可以创建资源。目前仅支持 Spark 外部资源。将来其他外部资源可能会加入到 Doris 中使用,如 Spark/GPU 用于查询,HDFS/S3 用于外部存储,MapReduce 用于 ETL 等。
## Description
该语句用于创建资源。仅 root 或 admin 用户可以创建资源。目前支持 Spark, ODBC, S3 外部资源。
将来其他外部资源可能会加入到 Doris 中使用,如 Spark/GPU 用于查询,HDFS/S3 用于外部存储,MapReduce 用于 ETL 等。
语法:
CREATE [EXTERNAL] RESOURCE "resource_name"
PROPERTIES ("key"="value", ...);
说明:
1. PROPERTIES中需要指定资源的类型 "type" = "spark",目前支持 spark。
1. PROPERTIES中需要指定资源的类型 "type" = "[spark|odbc_catalog|s3]",目前支持 spark, odbc_catalog, s3
2. 根据资源类型的不同 PROPERTIES 有所不同,具体见示例。
## example
## Example
1. 创建yarn cluster 模式,名为 spark0 的 Spark 资源。
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
```
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
@ -53,21 +59,74 @@ under the License.
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
);
```
Spark 相关参数如下:
1. spark.master: 必填,目前支持yarn,spark://host:port。
2. spark.submit.deployMode: Spark 程序的部署模式,必填,支持 cluster,client 两种。
3. spark.hadoop.yarn.resourcemanager.address: master为yarn时必填。
4. spark.hadoop.fs.defaultFS: master为yarn时必填。
5. 其他参数为可选,参考http://spark.apache.org/docs/latest/configuration.html
Spark 用于 ETL 时需要指定 working_dir 和 broker。说明如下:
working_dir: ETL 使用的目录。spark作为ETL资源使用时必填。例如:hdfs://host:port/tmp/doris。
broker: broker 名字。spark作为ETL资源使用时必填。需要使用`ALTER SYSTEM ADD BROKER` 命令提前完成配置。
broker.property_key: broker读取ETL生成的中间文件时需要指定的认证信息等。
Spark 相关参数如下:
- spark.master: 必填,目前支持yarn,spark://host:port。
- spark.submit.deployMode: Spark 程序的部署模式,必填,支持 cluster,client 两种。
- spark.hadoop.yarn.resourcemanager.address: master为yarn时必填。
- spark.hadoop.fs.defaultFS: master为yarn时必填。
- 其他参数为可选,参考http://spark.apache.org/docs/latest/configuration.html
Spark 用于 ETL 时需要指定 working_dir 和 broker。说明如下:
working_dir: ETL 使用的目录。spark作为ETL资源使用时必填。例如:hdfs://host:port/tmp/doris。
broker: broker 名字。spark作为ETL资源使用时必填。需要使用`ALTER SYSTEM ADD BROKER` 命令提前完成配置。
broker.property_key: broker读取ETL生成的中间文件时需要指定的认证信息等。
2. 创建 ODBC resource
```
CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
"type" = "odbc_catalog",
"host" = "192.168.0.1",
"port" = "8086",
"user" = "test",
"password" = "test",
"database" = "test",
"odbc_type" = "oracle",
"driver" = "Oracle 19 ODBC driver"
);
```
ODBC 的相关参数如下:
- hosts:外表数据库的IP地址
- driver:ODBC外表的Driver名,该名字需要和be/conf/odbcinst.ini中的Driver名一致。
- odbc_type:外表数据库的类型,当前支持oracle, mysql, postgresql
- user:外表数据库的用户名
- password:对应用户的密码信息
3. 创建 S3 resource
```
CREATE RESOURCE "remote_s3"
PROPERTIES
(
"type" = "s3",
"s3_endpoint" = "http://bj.s3.com",
"s3_region" = "bj",
"s3_root_path" = "/path/to/root",
"s3_access_key" = "bbb",
"s3_secret_key" = "aaaa",
"s3_max_connections" = "50",
"s3_request_timeout_ms" = "3000",
"s3_connection_timeout_ms" = "1000"
);
```
S3 相关参数如下:
- 必需参数
- s3_endpoint:s3 endpoint
- s3_region:s3 region
- s3_root_path:s3 根目录
- s3_access_key:s3 access key
- s3_secret_key:s3 secret key
- 可选参数
- s3_max_connections:s3 最大连接数量,默认为 50
- s3_request_timeout_ms:s3 请求超时时间,单位毫秒,默认为 3000
- s3_connection_timeout_ms:s3 连接超时时间,单位毫秒,默认为 1000
## keyword
CREATE, RESOURCE
CREATE, RESOURCE

View File

@ -308,18 +308,24 @@ under the License.
PROPERTIES (
"storage_medium" = "[SSD|HDD]",
["storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
["remote_storage_resource" = "xxx"],
["remote_storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
["replication_num" = "3"]
["replication_allocation" = "xxx"]
)
```
storage_medium: 于指定该分区的初始存储介质,可选择 SSD 或 HDD。默认初始存储介质可通过fe的配置文件 `fe.conf` 中指定 `default_storage_medium=xxx`,如果没有指定,则默认为 HDD。
注意:当FE配置项 `enable_strict_storage_medium_check` 为 `True` 时,若集群中没有设置对应的存储介质时,建表语句会报错 `Failed to find enough host in all backends with storage medium is SSD|HDD`.
storage_cooldown_time: 设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
默认存放 30 天。
格式为:"yyyy-MM-dd HH:mm:ss"
replication_num: 指定分区的副本数。默认为 3
replication_allocation: 按照资源标签来指定副本分布
storage_medium: 于指定该分区的初始存储介质,可选择 SSD 或 HDD。默认初始存储介质可通过fe的配置文件 `fe.conf` 中指定 `default_storage_medium=xxx`,如果没有指定,则默认为 HDD。
注意:当FE配置项 `enable_strict_storage_medium_check` 为 `True` 时,若集群中没有设置对应的存储介质时,建表语句会报错 `Failed to find enough host in all backends with storage medium is SSD|HDD`.
storage_cooldown_time: 设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
默认存放 30 天。
格式为:"yyyy-MM-dd HH:mm:ss"
remote_storage_resource: 远端存储资源名称,需要与 remote_storage_cooldown_time 参数搭配使用
remote_storage_cooldown_time: 与 remote_storage_resource 搭配使用。表示该分区在本地存储的到期时间
默认不过期。如果与 storage_cooldown_time 搭配使用必须晚于该时间。
格式为:"yyyy-MM-dd HH:mm:ss"
replication_num: 指定分区的副本数。默认为 3。
replication_allocation: 按照资源标签来指定副本分布。
当表为单分区表时,这些属性为表的属性。
当表为两级分区时,这些属性为附属于每一个分区。
@ -329,31 +335,31 @@ under the License.
bloom filter 索引仅适用于查询条件为 in 和 equal 的情况,该列的值越分散效果越好
目前只支持以下情况的列:除了 TINYINT FLOAT DOUBLE 类型以外的 key 列及聚合方法为 REPLACE 的 value 列
```
```
PROPERTIES (
"bloom_filter_columns"="k1,k2,k3"
)
```
```
3) 如果希望使用 Colocate Join 特性,需要在 properties 中指定
```
PROPERTIES (
"colocate_with"="table1"
)
```
```
PROPERTIES (
"colocate_with"="table1"
)
```
4) 如果希望使用动态分区特性,需要在properties 中指定。注意:动态分区只支持 RANGE 分区
```
PROPERTIES (
"dynamic_partition.enable" = "true|false",
"dynamic_partition.time_unit" = "HOUR|DAY|WEEK|MONTH",
"dynamic_partition.start" = "${integer_value}",
"dynamic_partition.end" = "${integer_value}",
"dynamic_partition.prefix" = "${string_value}",
"dynamic_partition.buckets" = "${integer_value}
```
```
PROPERTIES (
"dynamic_partition.enable" = "true|false",
"dynamic_partition.time_unit" = "HOUR|DAY|WEEK|MONTH",
"dynamic_partition.start" = "${integer_value}",
"dynamic_partition.end" = "${integer_value}",
"dynamic_partition.prefix" = "${string_value}",
"dynamic_partition.buckets" = "${integer_value}
```
dynamic_partition.enable: 用于指定表级别的动态分区功能是否开启。默认为 true。
dynamic_partition.time_unit: 用于指定动态添加分区的时间单位,可选择为HOUR(小时),DAY(天),WEEK(周),MONTH(月)。
注意:以小时为单位的分区列,数据类型不能为 DATE。
@ -375,20 +381,20 @@ under the License.
6) 如果希望使用 内存表 特性,需要在 properties 中指定
```
```
PROPERTIES (
"in_memory"="true"
)
```
```
当 in_memory 属性为 true 时,Doris会尽可能将该表的数据和索引Cache到BE 内存中
7) 创建UNIQUE_KEYS表时,可以指定一个sequence列,当KEY列相同时,将按照sequence列进行REPLACE(较大值替换较小值,否则无法替换)
```
```
PROPERTIES (
"function_column.sequence_type" = 'Date',
);
```
```
sequence_type用来指定sequence列的类型,可以为整型和时间类型
## example
@ -428,8 +434,29 @@ under the License.
"storage_cooldown_time" = "2015-06-04 00:00:00"
);
```
3. 创建一个 olap 表,使用 Hash 分桶,使用列存,相同key的记录进行覆盖,设置初始存储介质和冷却时间
设置远端存储和冷数据存储介质
3. 创建一个 olap 表,使用 Range 分区,使用Hash分桶,默认使用列存,
```
CREATE TABLE example_db.table_hash
(
k1 BIGINT,
k2 LARGEINT,
v1 VARCHAR(2048) REPLACE,
v2 SMALLINT SUM DEFAULT "10"
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD",
"storage_cooldown_time" = "2015-06-04 00:00:00",
"remote_storage_resource" = "remote_s3",
"remote_storage_cooldown_time" = "2015-12-04 00:00:00"
);
```
4. 创建一个 olap 表,使用 Range 分区,使用Hash分桶,默认使用列存,
相同key的记录同时存在,设置初始存储介质和冷却时间
1)LESS THAN
@ -492,7 +519,7 @@ under the License.
);
```
4. 创建一个 olap 表,使用 List 分区,使用Hash分桶,默认使用列存,
5. 创建一个 olap 表,使用 List 分区,使用Hash分桶,默认使用列存,
相同key的记录同时存在,设置初始存储介质和冷却时间
1)单列分区
@ -567,10 +594,10 @@ under the License.
不在这些分区枚举值内的数据将视为非法数据被过滤
5. 创建一个 mysql 表
6. 创建一个 mysql 表
5.1 直接通过外表信息创建mysql表
```
6.1 直接通过外表信息创建mysql表
```
CREATE EXTERNAL TABLE example_db.table_mysql
(
k1 DATE,
@ -589,21 +616,19 @@ under the License.
"database" = "mysql_db_test",
"table" = "mysql_table_test"
)
```
```
5.2 通过External Catalog Resource创建mysql表
```
CREATE EXTERNAL RESOURCE "mysql_resource"
PROPERTIES
(
"type" = "odbc_catalog",
"user" = "mysql_user",
"password" = "mysql_passwd",
"host" = "127.0.0.1",
"port" = "8239"
);
```
```
6.2 通过External Catalog Resource创建mysql表
```
CREATE EXTERNAL RESOURCE "mysql_resource"
PROPERTIES
(
"type" = "odbc_catalog",
"user" = "mysql_user",
"password" = "mysql_passwd",
"host" = "127.0.0.1",
"port" = "8239"
);
CREATE EXTERNAL TABLE example_db.table_mysql
(
k1 DATE,
@ -619,11 +644,11 @@ under the License.
"database" = "mysql_db_test",
"table" = "mysql_table_test"
)
```
```
6. 创建一个数据文件存储在HDFS上的 broker 外部表, 数据使用 "|" 分割,"\n" 换行
7. 创建一个数据文件存储在HDFS上的 broker 外部表, 数据使用 "|" 分割,"\n" 换行
```
```
CREATE EXTERNAL TABLE example_db.table_broker (
k1 DATE,
k2 INT,
@ -642,11 +667,11 @@ under the License.
"username" = "hdfs_user",
"password" = "hdfs_password"
)
```
```
7. 创建一张含有HLL列的表
8. 创建一张含有HLL列的表
```
```
CREATE TABLE example_db.example_table
(
k1 TINYINT,
@ -657,11 +682,11 @@ under the License.
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
```
8. 创建一张含有BITMAP_UNION聚合类型的表(v1和v2列的原始数据类型必须是TINYINT,SMALLINT,INT)
9. 创建一张含有BITMAP_UNION聚合类型的表(v1和v2列的原始数据类型必须是TINYINT,SMALLINT,INT)
```
```
CREATE TABLE example_db.example_table
(
k1 TINYINT,
@ -672,26 +697,26 @@ under the License.
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
```
1. 创建一张含有QUANTILE_UNION聚合类型的表(v1和v2列的原始数据类型必须是数值类型)
10. 创建一张含有QUANTILE_UNION聚合类型的表(v1和v2列的原始数据类型必须是数值类型)
```
CREATE TABLE example_db.example_table
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 QUANTILE_STATE QUANTILE_UNION,
v2 QUANTILE_STATE QUANTILE_UNION
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
```
CREATE TABLE example_db.example_table
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 QUANTILE_STATE QUANTILE_UNION,
v2 QUANTILE_STATE QUANTILE_UNION
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
10. 创建两张支持Colocate Join的表t1 和t2
11. 创建两张支持Colocate Join的表t1 和t2
```
```
CREATE TABLE `t1` (
`id` int(11) COMMENT "",
`value` varchar(8) COMMENT ""
@ -711,11 +736,11 @@ under the License.
PROPERTIES (
"colocate_with" = "t1"
);
```
```
11. 创建一个数据文件存储在BOS上的 broker 外部表
12. 创建一个数据文件存储在BOS上的 broker 外部表
```
```
CREATE EXTERNAL TABLE example_db.table_broker (
k1 DATE
)
@ -729,11 +754,11 @@ under the License.
"bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
"bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
)
```
```
12. 创建一个带有bitmap 索引的表
13. 创建一个带有bitmap 索引的表
```
```
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
@ -746,18 +771,18 @@ under the License.
AGGREGATE KEY(k1, k2)
COMMENT "my first doris table"
DISTRIBUTED BY HASH(k1) BUCKETS 32;
```
```
13. 创建一个动态分区表(需要在FE配置中开启动态分区功能),该表每天提前创建3天的分区,并删除3天前的分区。例如今天为`2020-01-08`,则会创建分区名为`p20200108`, `p20200109`, `p20200110`, `p20200111`的分区. 分区范围分别为:
14. 创建一个动态分区表(需要在FE配置中开启动态分区功能),该表每天提前创建3天的分区,并删除3天前的分区。例如今天为`2020-01-08`,则会创建分区名为`p20200108`, `p20200109`, `p20200110`, `p20200111`的分区. 分区范围分别为:
```
[types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; )
[types: [DATE]; keys: [2020-01-09]; ‥types: [DATE]; keys: [2020-01-10]; )
[types: [DATE]; keys: [2020-01-10]; ‥types: [DATE]; keys: [2020-01-11]; )
[types: [DATE]; keys: [2020-01-11]; ‥types: [DATE]; keys: [2020-01-12]; )
```
```
[types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; )
[types: [DATE]; keys: [2020-01-09]; ‥types: [DATE]; keys: [2020-01-10]; )
[types: [DATE]; keys: [2020-01-10]; ‥types: [DATE]; keys: [2020-01-11]; )
[types: [DATE]; keys: [2020-01-11]; ‥types: [DATE]; keys: [2020-01-12]; )
```
```
```
CREATE TABLE example_db.dynamic_partition
(
k1 DATE,
@ -778,10 +803,10 @@ under the License.
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
);
```
```
14. 创建一个带有rollup索引的表
```
15. 创建一个带有rollup索引的表
```
CREATE TABLE example_db.rollup_index_table
(
event_day DATE,
@ -798,10 +823,10 @@ under the License.
r3(event_day)
)
PROPERTIES("replication_num" = "3");
```
15. 创建一个内存表
```
16. 创建一个内存表
```
```
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
@ -815,11 +840,11 @@ under the License.
COMMENT "my first doris table"
DISTRIBUTED BY HASH(k1) BUCKETS 32
PROPERTIES ("in_memory"="true");
```
```
16. 创建一个hive外部表
17. 创建一个hive外部表
```
```
CREATE TABLE example_db.table_hive
(
k1 TINYINT,
@ -833,11 +858,11 @@ under the License.
"table" = "hive_table_name",
"hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
```
```
17. 通过 replication_allocation 指定表的副本分布
18. 通过 replication_allocation 指定表的副本分布
```
```
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
@ -845,8 +870,8 @@ under the License.
)
DISTRIBUTED BY HASH(k1) BUCKETS 32
PROPERTIES (
"replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
);
"replication_allocation"="tag.location.group_a:1, tag.location.group_b:2"
);
CREATE TABLE example_db.dynamic_partition
@ -867,11 +892,11 @@ under the License.
"dynamic_partition.buckets" = "32",
"dynamic_partition."replication_allocation" = "tag.location.group_a:3"
);
```
```
17. 创建一个 Iceberg 外表
19. 创建一个 Iceberg 外表
```
```
CREATE TABLE example_db.t_iceberg
ENGINE=ICEBERG
PROPERTIES (
@ -880,7 +905,7 @@ under the License.
"iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
```
```
## keyword

View File

@ -25,15 +25,20 @@ under the License.
-->
# DROP RESOURCE
## description
## Description
该语句用于删除一个已有的资源。仅 root 或 admin 用户可以删除资源。
语法:
DROP RESOURCE 'resource_name'
## example
注意:正在使用的 ODBC/S3 资源无法删除。
## Example
1. 删除名为 spark0 的 Spark 资源:
DROP RESOURCE 'spark0';
## keyword
DROP, RESOURCE
DROP, RESOURCE

View File

@ -25,30 +25,32 @@ under the License.
-->
# SHOW RESOURCES
## description
## Description
该语句用于展示用户有使用权限的资源。普通用户仅能展示有使用权限的资源,root 或 admin 用户会展示所有的资源。
语法
语法:
SHOW RESOURCES
[
WHERE
[NAME [ = "your_resource_name" | LIKE "name_matcher"]]
[RESOURCETYPE = ["SPARK"]]
[RESOURCETYPE = ["[spark|odbc_catalog|s3]"]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];
说明:
1) 如果使用 NAME LIKE,则会匹配Name包含name_matcher的Resource
1) 如果使用 NAME LIKE,则会匹配 Name 包含 name_matcher 的 Resource
2) 如果使用 NAME = ,则精确匹配指定的 Name
3) 如果指定了RESOURCETYPE,则匹配对应的Resource类型
3) 如果指定了 RESOURCETYPE,则匹配对应的 Resource 类型
4) 可以使用 ORDER BY 对任意列组合进行排序
5) 如果指定了 LIMIT,则显示 limit 条匹配记录。否则全部显示
6) 如果指定了 OFFSET,则从偏移量offset开始显示查询结果。默认情况下偏移量为0。
6) 如果指定了 OFFSET,则从偏移量 offset 开始显示查询结果。默认情况下偏移量为 0。
## Example
## example
1. 展示当前用户拥有权限的所有Resource
SHOW RESOURCES;
@ -60,5 +62,5 @@ under the License.
## keyword
SHOW RESOURCES
SHOW RESOURCES, RESOURCES

View File

@ -34,9 +34,11 @@ public final class FeMetaVersion {
public static final int VERSION_106 = 106;
// support stream load 2PC
public static final int VERSION_107 = 107;
// add storage_cold_medium and remote_storage_resource_name in DataProperty
public static final int VERSION_108 = 108;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_107;
public static final int VERSION_CURRENT = VERSION_108;
// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
// these clause will be useless and we could remove them

View File

@ -858,6 +858,10 @@ alter_stmt ::=
{:
RESULT = new AlterDatabasePropertyStmt(dbName, map);
:}
| KW_ALTER KW_RESOURCE ident_or_text:resourceName opt_properties:properties
{:
RESULT = new AlterResourceStmt(resourceName, properties);
:}
| KW_ALTER KW_ROUTINE KW_LOAD KW_FOR job_label:jobLabel opt_properties:jobProperties
opt_datasource_properties:datasourceProperties
{:

View File

@ -24,6 +24,7 @@ import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.ModifyColumnCommentClause;
@ -51,6 +52,7 @@ import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.View;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
@ -59,6 +61,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
@ -79,6 +82,7 @@ import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -654,15 +658,13 @@ public class Alter {
}
// get value from properties here
// 1. data property
DataProperty newDataProperty = PropertyAnalyzer.analyzeDataProperty(properties, null);
// 2. replica allocation
// 1. replica allocation
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
Catalog.getCurrentSystemInfo().checkReplicaAllocation(db.getClusterName(), replicaAlloc);
// 3. in memory
// 2. in memory
boolean newInMemory = PropertyAnalyzer.analyzeBooleanProp(properties,
PropertyAnalyzer.PROPERTIES_INMEMORY, false);
// 4. tablet type
// 3. tablet type
TTabletType tTabletType =
PropertyAnalyzer.analyzeTabletType(properties);
@ -670,6 +672,23 @@ public class Alter {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (String partitionName : partitionNames) {
Partition partition = olapTable.getPartition(partitionName);
// 4. data property
// 4.1 get old data property from partition
DataProperty dataProperty = partitionInfo.getDataProperty(partition.getId());
// 4.2 combine the old properties with new ones
Map<String, String> newProperties = new HashMap<>();
newProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, dataProperty.getStorageMedium().name());
DateLiteral dateLiteral = new DateLiteral(dataProperty.getCooldownTimeMs(),
TimeUtils.getTimeZone(), Type.DATETIME);
newProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, dateLiteral.getStringValue());
newProperties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE, dataProperty.getRemoteStorageResourceName());
DateLiteral dateLiteral1 = new DateLiteral(dataProperty.getRemoteCooldownTimeMs(),
TimeUtils.getTimeZone(), Type.DATETIME);
newProperties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME, dateLiteral1.getStringValue());
newProperties.putAll(properties);
// 4.3 analyze new properties
DataProperty newDataProperty = PropertyAnalyzer.analyzeDataProperty(newProperties, null);
// 4.4 set the new data property
if (newDataProperty != null) {
partitionInfo.setDataProperty(partition.getId(), newDataProperty);

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import java.util.Map;
public class AlterResourceStmt extends DdlStmt {
private static final String TYPE = "type";
private final String resourceName;
private final Map<String, String> properties;
public AlterResourceStmt(String resourceName, Map<String, String> properties) {
this.resourceName = resourceName;
this.properties = properties;
}
public String getResourceName() {
return resourceName;
}
public Map<String, String> getProperties() {
return properties;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// check auth
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
if (properties == null || properties.isEmpty()) {
throw new AnalysisException("Resource properties can't be null");
}
// check type in properties
if (properties.containsKey(TYPE)) {
throw new AnalysisException("Can not change resource type.");
}
// check resource existence
Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
if (resource == null) {
throw new AnalysisException("Unknown resource: " + resourceName);
}
// check properties
resource.checkProperties(properties);
}
@Override
public String toSql() {
StringBuffer sb = new StringBuffer();
sb.append("ALTER RESOURCE '").append(resourceName).append("' ");
sb.append("PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
return sb.toString();
}
}

View File

@ -18,13 +18,11 @@
package org.apache.doris.analysis;
import org.apache.doris.alter.AlterOpType;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -98,10 +96,10 @@ public class ModifyPartitionClause extends AlterTableClause {
// 3. in_memory
// 4. tablet type
private void checkProperties(Map<String, String> properties) throws AnalysisException {
// 1. data property
DataProperty newDataProperty = null;
newDataProperty = PropertyAnalyzer.analyzeDataProperty(properties, DataProperty.DEFAULT_DATA_PROPERTY);
Preconditions.checkNotNull(newDataProperty);
// 1. data property, can not modify partition property remote_storage_resource
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
throw new AnalysisException("Do not support modify partition data property `remote_storage_resource`.");
}
// 2. replica allocation
PropertyAnalyzer.analyzeReplicaAllocation(properties, "");

View File

@ -88,6 +88,8 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TABLET_TYPE)) {
throw new AnalysisException("Alter tablet type not supported");
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
throw new AnalysisException("Alter table remote_storage_resource is not supported.");
} else {
throw new AnalysisException("Unknown table property: " + properties.keySet());
}

View File

@ -3722,6 +3722,10 @@ public class Catalog {
boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false);
olapTable.setIsInMemory(isInMemory);
// set remote storage
String resourceName = PropertyAnalyzer.analyzeRemoteStorageResource(properties);
olapTable.setRemoteStorageResource(resourceName);
TTabletType tabletType;
try {
tabletType = PropertyAnalyzer.analyzeTabletType(properties);
@ -4246,6 +4250,13 @@ public class Catalog {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\" = \"");
sb.append(olapTable.getStorageFormat()).append("\"");
// remote storage resource
String remoteStorageResource = olapTable.getRemoteStorageResource();
if (!Strings.isNullOrEmpty(remoteStorageResource)) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE).append("\" = \"");
sb.append(remoteStorageResource).append("\"");
}
sb.append("\n)");
} else if (table.getType() == TableType.MYSQL) {
MysqlTable mysqlTable = (MysqlTable) table;
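As a result, SHOW CREATE TABLE on an OLAP table bound to a remote resource now reproduces the binding in its PROPERTIES clause; a sketch of the emitted fragment, assuming a resource named remote_s3 and with the unrelated properties elided:

    PROPERTIES (
    ...
    "remote_storage_resource" = "remote_s3"
    )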
View File
@ -18,10 +18,13 @@
package org.apache.doris.catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TStorageMedium;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
@ -30,13 +33,18 @@ import java.io.IOException;
public class DataProperty implements Writable {
public static final DataProperty DEFAULT_DATA_PROPERTY = new DataProperty(
"SSD".equalsIgnoreCase(Config.default_storage_medium) ? TStorageMedium.SSD : TStorageMedium.HDD);
"SSD".equalsIgnoreCase(Config.default_storage_medium) ? TStorageMedium.SSD : TStorageMedium.HDD
);
public static final long MAX_COOLDOWN_TIME_MS = 253402271999000L; // 9999-12-31 23:59:59
@SerializedName(value = "storageMedium")
private TStorageMedium storageMedium;
@SerializedName(value = "cooldownTimeMs")
private long cooldownTimeMs;
@SerializedName(value = "remoteStorageResourceName")
private String remoteStorageResourceName;
@SerializedName(value = "remoteCooldownTimeMs")
private long remoteCooldownTimeMs;
private DataProperty() {
// for persist
@ -50,11 +58,16 @@ public class DataProperty implements Writable {
} else {
this.cooldownTimeMs = MAX_COOLDOWN_TIME_MS;
}
this.remoteStorageResourceName = "";
this.remoteCooldownTimeMs = MAX_COOLDOWN_TIME_MS;
}
public DataProperty(TStorageMedium medium, long cooldown) {
public DataProperty(TStorageMedium medium, long cooldown,
String remoteStorageResourceName, long remoteCooldownTimeMs) {
this.storageMedium = medium;
this.cooldownTimeMs = cooldown;
this.remoteStorageResourceName = remoteStorageResourceName;
this.remoteCooldownTimeMs = remoteCooldownTimeMs;
}
public TStorageMedium getStorageMedium() {
@ -65,7 +78,19 @@ public class DataProperty implements Writable {
return cooldownTimeMs;
}
public long getRemoteCooldownTimeMs() {
return remoteCooldownTimeMs;
}
public String getRemoteStorageResourceName() {
return remoteStorageResourceName;
}
public static DataProperty read(DataInput in) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_108) {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, DataProperty.class);
}
DataProperty dataProperty = new DataProperty();
dataProperty.readFields(in);
return dataProperty;
@ -73,13 +98,15 @@ public class DataProperty implements Writable {
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, storageMedium.name());
out.writeLong(cooldownTimeMs);
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public void readFields(DataInput in) throws IOException {
storageMedium = TStorageMedium.valueOf(Text.readString(in));
cooldownTimeMs = in.readLong();
remoteStorageResourceName = "";
remoteCooldownTimeMs = MAX_COOLDOWN_TIME_MS;
}
@Override
@ -95,14 +122,18 @@ public class DataProperty implements Writable {
DataProperty other = (DataProperty) obj;
return this.storageMedium == other.storageMedium
&& this.cooldownTimeMs == other.cooldownTimeMs;
&& this.cooldownTimeMs == other.cooldownTimeMs
&& this.remoteCooldownTimeMs == other.remoteCooldownTimeMs
&& this.remoteStorageResourceName.equals(other.remoteStorageResourceName);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Storage medium[").append(this.storageMedium).append("]. ");
sb.append("cool down[").append(TimeUtils.longToTimeString(cooldownTimeMs)).append("].");
sb.append("cool down[").append(TimeUtils.longToTimeString(cooldownTimeMs)).append("]. ");
sb.append("remote storage resource name[").append(this.remoteStorageResourceName).append("]. ");
sb.append("remote cool down[").append(TimeUtils.longToTimeString(remoteCooldownTimeMs)).append("].");
return sb.toString();
}
}
View File
@ -17,6 +17,7 @@
package org.apache.doris.catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
@ -93,6 +94,33 @@ public class OdbcCatalogResource extends Resource {
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
// modify properties
replaceIfEffectiveValue(this.configs, HOST, properties.get(HOST));
replaceIfEffectiveValue(this.configs, PORT, properties.get(PORT));
replaceIfEffectiveValue(this.configs, USER, properties.get(USER));
replaceIfEffectiveValue(this.configs, PASSWORD, properties.get(PASSWORD));
replaceIfEffectiveValue(this.configs, TYPE, properties.get(TYPE));
replaceIfEffectiveValue(this.configs, DRIVER, properties.get(DRIVER));
}
@Override
public void checkProperties(Map<String, String> properties) throws AnalysisException {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
// check properties
copiedProperties.remove(HOST);
copiedProperties.remove(PORT);
copiedProperties.remove(USER);
copiedProperties.remove(PASSWORD);
copiedProperties.remove(TYPE);
copiedProperties.remove(DRIVER);
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown ODBC catalog resource properties: " + copiedProperties);
}
}
public String getProperties(String propertiesKey) {
// check the properties key
String value = configs.get(propertiesKey);
View File
@ -1536,6 +1536,14 @@ public class OlapTable extends Table {
tableProperty.buildDataSortInfo();
}
public void setRemoteStorageResource(String resourceName) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty.setRemoteStorageResource(resourceName);
tableProperty.buildRemoteStorageResource();
}
// return true if partition with given name already exist, both in partitions and temp partitions.
// return false otherwise
public boolean checkPartitionNameExist(String partitionName) {
@ -1688,6 +1696,13 @@ public class OlapTable extends Table {
return tableProperty.getDataSortInfo();
}
public String getRemoteStorageResource() {
if (tableProperty == null) {
return "";
}
return tableProperty.getRemoteStorageResource();
}
// For non partitioned table:
// The table's distribute hash columns need to be a subset of the aggregate columns.
//
View File
@ -17,7 +17,9 @@
package org.apache.doris.catalog;
import com.google.common.base.Strings;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.io.DeepCopy;
@ -42,7 +44,8 @@ public abstract class Resource implements Writable {
public enum ResourceType {
UNKNOWN,
SPARK,
ODBC_CATALOG;
ODBC_CATALOG,
S3;
public static ResourceType fromString(String resourceType) {
for (ResourceType type : ResourceType.values()) {
@ -68,20 +71,35 @@ public abstract class Resource implements Writable {
}
public static Resource fromStmt(CreateResourceStmt stmt) throws DdlException {
Resource resource = null;
ResourceType type = stmt.getResourceType();
Resource resource = getResourceInstance(stmt.getResourceType(), stmt.getResourceName());
resource.setProperties(stmt.getProperties());
return resource;
}
/**
* Get a resource instance by resource type and name
* @param type resource type declared in the CREATE RESOURCE statement
* @param name resource name
* @return a new Resource of the given type, with no properties set yet
* @throws DdlException if the resource type is not supported
*/
private static Resource getResourceInstance(ResourceType type, String name) throws DdlException {
Resource resource;
switch (type) {
case SPARK:
resource = new SparkResource(stmt.getResourceName());
resource = new SparkResource(name);
break;
case ODBC_CATALOG:
resource = new OdbcCatalogResource(stmt.getResourceName());
resource = new OdbcCatalogResource(name);
break;
case S3:
resource = new S3Resource(name);
break;
default:
throw new DdlException("Only support Spark resource.");
throw new DdlException("Unknown resource type: " + type);
}
resource.setProperties(stmt.getProperties());
return resource;
}
@ -93,6 +111,26 @@ public abstract class Resource implements Writable {
return type;
}
/**
* Modify properties in child resources
* @param properties property key/value pairs to apply to this resource
* @throws DdlException if the modification is not allowed
*/
public abstract void modifyProperties(Map<String, String> properties) throws DdlException;
/**
* Check properties in child resources
* @param properties properties to validate against the keys this resource supports
* @throws AnalysisException if an unknown property is present
*/
public abstract void checkProperties(Map<String, String> properties) throws AnalysisException;
protected void replaceIfEffectiveValue(Map<String, String> properties, String key, String value) {
if (!Strings.isNullOrEmpty(value)) {
properties.put(key, value);
}
}
/**
* Set and check the properties in child resources
*/
View File
@ -17,6 +17,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.DropResourceStmt;
import org.apache.doris.catalog.Resource.ResourceType;
@ -42,8 +43,10 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import com.google.gson.annotations.SerializedName;
@ -69,14 +72,16 @@ public class ResourceMgr implements Writable {
}
public void createResource(CreateResourceStmt stmt) throws DdlException {
if (stmt.getResourceType() != ResourceType.SPARK && stmt.getResourceType() != ResourceType.ODBC_CATALOG) {
throw new DdlException("Only support SPARK and ODBC_CATALOG resource.");
if (stmt.getResourceType() != ResourceType.SPARK
&& stmt.getResourceType() != ResourceType.ODBC_CATALOG
&& stmt.getResourceType() != ResourceType.S3) {
throw new DdlException("Only support SPARK, ODBC_CATALOG and REMOTE_STORAGE resource.");
}
Resource resource = Resource.fromStmt(stmt);
createResource(resource);
// log add
Catalog.getCurrentCatalog().getEditLog().logCreateResource(resource);
LOG.info("create resource success. resource: {}", resource);
LOG.info("Create resource success. Resource: {}", resource);
}
public void createResource(Resource resource) throws DdlException {
@ -91,14 +96,46 @@ public class ResourceMgr implements Writable {
}
public void dropResource(DropResourceStmt stmt) throws DdlException {
String name = stmt.getResourceName();
if (nameToResource.remove(name) == null) {
throw new DdlException("Resource(" + name + ") does not exist");
String resourceName = stmt.getResourceName();
if (!nameToResource.containsKey(resourceName)) {
throw new DdlException("Resource(" + resourceName + ") does not exist");
}
// Check whether the resource is still referenced by any table before deleting it (Spark resources are not checked here)
List<String> usedTables = new ArrayList<>();
List<Long> dbIds = Catalog.getCurrentCatalog().getDbIds();
for (Long dbId : dbIds) {
Optional<Database> database = Catalog.getCurrentCatalog().getDb(dbId);
database.ifPresent(db -> {
List<Table> tables = db.getTablesOnIdOrder();
for (Table table : tables) {
if (table instanceof OdbcTable) {
// odbc resource
if (resourceName.equals(((OdbcTable) table).getOdbcCatalogResourceName())) {
usedTables.add(db.getFullName() + "." + table.getName());
}
} else if (table instanceof OlapTable) {
// remote resource, such as s3 resource
PartitionInfo partitionInfo = ((OlapTable) table).getPartitionInfo();
List<Long> partitionIds = ((OlapTable) table).getPartitionIds();
for (Long partitionId : partitionIds) {
DataProperty dataProperty = partitionInfo.getDataProperty(partitionId);
if (resourceName.equals(dataProperty.getRemoteStorageResourceName())) {
usedTables.add(db.getFullName() + "." + table.getName());
break;
}
}
}
}
});
}
if (usedTables.size() > 0) {
LOG.warn("Can not drop resource, since it's used in tables {}", usedTables);
throw new DdlException("Can not drop resource, since it's used in tables " + usedTables);
}
nameToResource.remove(resourceName);
// log drop
Catalog.getCurrentCatalog().getEditLog().logDropResource(new DropResourceOperationLog(name));
LOG.info("drop resource success. resource name: {}", name);
Catalog.getCurrentCatalog().getEditLog().logDropResource(new DropResourceOperationLog(resourceName));
LOG.info("Drop resource success. Resource resourceName: {}", resourceName);
}
// Drop resource whether successful or not
@ -114,6 +151,26 @@ public class ResourceMgr implements Writable {
nameToResource.remove(operationLog.getName());
}
public void alterResource(AlterResourceStmt stmt) throws DdlException {
String resourceName = stmt.getResourceName();
Map<String, String> properties = stmt.getProperties();
if (!nameToResource.containsKey(resourceName)) {
throw new DdlException("Resource(" + resourceName + ") dose not exist.");
}
Resource resource = nameToResource.get(resourceName);
resource.modifyProperties(properties);
// log alter
Catalog.getCurrentCatalog().getEditLog().logAlterResource(resource);
LOG.info("Alter resource success. Resource: {}", resource);
}
public void replayAlterResource(Resource resource) {
nameToResource.put(resource.getName(), resource);
}
public boolean containsResource(String name) {
return nameToResource.containsKey(name);
}
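Seen from the DDL side, the new guard turns an in-use drop into a hard failure; a sketch using the resource and table names from the tests added later in this commit (the message text comes from the DdlException above):

    DROP RESOURCE remote_s3
    -- rejected while a partition of test.tbl_remote still references it:
    -- Can not drop resource, since it's used in tables [default_cluster:test.tbl_remote]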
View File
@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.util.Map;
/**
* S3 resource for olap table
*
* Syntax:
* CREATE RESOURCE "remote_s3"
* PROPERTIES
* (
* "type" = "s3",
* "s3_endpoint" = "bj",
* "s3_region" = "bj",
* "s3_root_path" = "/path/to/root",
* "s3_access_key" = "bbb",
* "s3_secret_key" = "aaaa",
* "s3_max_connections" = "50",
* "s3_request_timeout_ms" = "3000",
* "s3_connection_timeout_ms" = "1000"
* );
*/
public class S3Resource extends Resource {
// required
private static final String S3_ENDPOINT = "s3_endpoint";
private static final String S3_REGION = "s3_region";
private static final String S3_ROOT_PATH = "s3_root_path";
private static final String S3_ACCESS_KEY = "s3_access_key";
private static final String S3_SECRET_KEY = "s3_secret_key";
// optional
private static final String S3_MAX_CONNECTIONS = "s3_max_connections";
private static final String S3_REQUEST_TIMEOUT_MS = "s3_request_timeout_ms";
private static final String S3_CONNECTION_TIMEOUT_MS = "s3_connection_timeout_ms";
private static final String DEFAULT_S3_MAX_CONNECTIONS = "50";
private static final String DEFAULT_S3_REQUEST_TIMEOUT_MS = "3000";
private static final String DEFAULT_S3_CONNECTION_TIMEOUT_MS = "1000";
@SerializedName(value = "properties")
private Map<String, String> properties;
public S3Resource(String name) {
this(name, Maps.newHashMap());
}
public S3Resource(String name, Map<String, String> properties) {
super(name, ResourceType.S3);
this.properties = properties;
}
public String getProperty(String propertyKey) {
return properties.get(propertyKey);
}
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
this.properties = properties;
// check properties
// required
checkRequiredProperty(S3_ENDPOINT);
checkRequiredProperty(S3_REGION);
checkRequiredProperty(S3_ROOT_PATH);
checkRequiredProperty(S3_ACCESS_KEY);
checkRequiredProperty(S3_SECRET_KEY);
// optional
checkOptionalProperty(S3_MAX_CONNECTIONS, DEFAULT_S3_MAX_CONNECTIONS);
checkOptionalProperty(S3_REQUEST_TIMEOUT_MS, DEFAULT_S3_REQUEST_TIMEOUT_MS);
checkOptionalProperty(S3_CONNECTION_TIMEOUT_MS, DEFAULT_S3_CONNECTION_TIMEOUT_MS);
}
private void checkRequiredProperty(String propertyKey) throws DdlException {
String value = properties.get(propertyKey);
if (Strings.isNullOrEmpty(value)) {
throw new DdlException("Missing [" + propertyKey + "] in properties.");
}
}
private void checkOptionalProperty(String propertyKey, String defaultValue) {
this.properties.putIfAbsent(propertyKey, defaultValue);
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
// modify properties
replaceIfEffectiveValue(this.properties, S3_ENDPOINT, properties.get(S3_ENDPOINT));
replaceIfEffectiveValue(this.properties, S3_REGION, properties.get(S3_REGION));
replaceIfEffectiveValue(this.properties, S3_ROOT_PATH, properties.get(S3_ROOT_PATH));
replaceIfEffectiveValue(this.properties, S3_ACCESS_KEY, properties.get(S3_ACCESS_KEY));
replaceIfEffectiveValue(this.properties, S3_SECRET_KEY, properties.get(S3_SECRET_KEY));
replaceIfEffectiveValue(this.properties, S3_MAX_CONNECTIONS, properties.get(S3_MAX_CONNECTIONS));
replaceIfEffectiveValue(this.properties, S3_REQUEST_TIMEOUT_MS, properties.get(S3_REQUEST_TIMEOUT_MS));
replaceIfEffectiveValue(this.properties, S3_CONNECTION_TIMEOUT_MS, properties.get(S3_CONNECTION_TIMEOUT_MS));
}
@Override
public void checkProperties(Map<String, String> properties) throws AnalysisException {
// check properties
Map<String, String> copiedProperties = Maps.newHashMap(properties);
copiedProperties.remove(S3_ENDPOINT);
copiedProperties.remove(S3_REGION);
copiedProperties.remove(S3_ROOT_PATH);
copiedProperties.remove(S3_ACCESS_KEY);
copiedProperties.remove(S3_SECRET_KEY);
copiedProperties.remove(S3_MAX_CONNECTIONS);
copiedProperties.remove(S3_REQUEST_TIMEOUT_MS);
copiedProperties.remove(S3_CONNECTION_TIMEOUT_MS);
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown S3 resource properties: " + copiedProperties);
}
}
@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
for (Map.Entry<String, String> entry : properties.entrySet()) {
// it is dangerous to expose the access key in SHOW RESOURCES,
// so we replace the real value with an empty string
if (entry.getKey().equals(S3_ACCESS_KEY)) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), ""));
} else {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
}
}
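Combined with the ALTER RESOURCE path in ResourceMgr, an existing S3 resource can be re-pointed without recreating it; a sketch mirroring the region change exercised in ResourceMgrTest below (the type can never be changed, and any key outside the s3_* set above is rejected by checkProperties):

    ALTER RESOURCE 'remote_s3' PROPERTIES("s3_region" = "sh")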
View File
@ -19,6 +19,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
@ -206,6 +207,11 @@ public class SparkResource extends Resource {
return;
}
// update properties
updateProperties(properties);
}
private void updateProperties(Map<String, String> properties) throws DdlException {
// update spark configs
if (properties.containsKey(SPARK_MASTER)) {
throw new DdlException("Cannot change spark master");
@ -291,6 +297,24 @@ public class SparkResource extends Resource {
return brokerProperties;
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
updateProperties(properties);
}
@Override
public void checkProperties(Map<String, String> properties) throws AnalysisException {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
copiedProperties.keySet().removeAll(getSparkConfig(properties).keySet());
copiedProperties.keySet().removeAll(getBrokerProperties(properties).keySet());
copiedProperties.remove(BROKER);
copiedProperties.remove(WORKING_DIR);
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown spark resource properties: " + copiedProperties);
}
}
@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
View File
@ -72,6 +72,9 @@ public class TableProperty implements Writable {
private DataSortInfo dataSortInfo = new DataSortInfo();
// remote storage resource, for cold data
private String remoteStorageResource;
public TableProperty(Map<String, String> properties) {
this.properties = properties;
}
@ -159,6 +162,11 @@ public class TableProperty implements Writable {
return this;
}
public TableProperty buildRemoteStorageResource() {
remoteStorageResource = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE, "");
return this;
}
public void modifyTableProperties(Map<String, String> modifyProperties) {
properties.putAll(modifyProperties);
removeDuplicateReplicaNumProperty();
@ -176,6 +184,12 @@ public class TableProperty implements Writable {
replicaAlloc.toCreateStmt());
}
public void setRemoteStorageResource(String resourceName) {
this.remoteStorageResource = resourceName;
properties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE,
resourceName);
}
public ReplicaAllocation getReplicaAllocation() {
return replicaAlloc;
}
@ -214,6 +228,10 @@ public class TableProperty implements Writable {
return dataSortInfo;
}
public String getRemoteStorageResource() {
return remoteStorageResource;
}
public void buildReplicaAllocation() {
try {
// Must copy the properties because "analyzeReplicaAllocation" will remove the property
@ -237,7 +255,8 @@ public class TableProperty implements Writable {
.executeBuildDynamicProperty()
.buildInMemory()
.buildStorageFormat()
.buildDataSortInfo();
.buildDataSortInfo()
.buildRemoteStorageResource();
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) {
// get replica num from property map and create replica allocation
String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
View File
@ -242,7 +242,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.SSD.name());
String cooldownTime = DynamicPartitionUtil.getPartitionRangeString(property, now, offset + hotPartitionNum,
DynamicPartitionUtil.DATETIME_FORMAT);
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COLDOWN_TIME, cooldownTime);
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, cooldownTime);
}
private Range<PartitionKey> getClosedRange(Database db, OlapTable olapTable, Column partitionColumn, String partitionFormat,
View File
@ -67,6 +67,7 @@ public class PartitionsProcDir implements ProcDirInterface {
.add("VisibleVersion").add("VisibleVersionTime")
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime")
.add("RemoteStorageResource").add("RemoteStorageCooldownTime")
.add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation")
.build();
@ -269,6 +270,8 @@ public class PartitionsProcDir implements ProcDirInterface {
DataProperty dataProperty = tblPartitionInfo.getDataProperty(partitionId);
partitionInfo.add(dataProperty.getStorageMedium().name());
partitionInfo.add(TimeUtils.longToTimeString(dataProperty.getCooldownTimeMs()));
partitionInfo.add(dataProperty.getRemoteStorageResourceName());
partitionInfo.add(TimeUtils.longToTimeString(dataProperty.getRemoteCooldownTimeMs()));
partitionInfo.add(TimeUtils.longToTimeString(partition.getLastCheckTime()));
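With the two extra columns, SHOW PARTITIONS now reports the remote binding next to the local storage settings; a sketch of the relevant columns for the tbl_remote fixture used in the tests below:

    SHOW PARTITIONS FROM test.tbl_remote
    -- ... | StorageMedium | CooldownTime        | RemoteStorageResource | RemoteStorageCooldownTime | ...
    -- ... | SSD           | 2122-04-01 20:24:00 | remote_s3             | 2122-12-01 20:23:00       | ...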
View File
@ -19,12 +19,14 @@ package org.apache.doris.common.util;
import org.apache.doris.analysis.DataSortInfo;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
@ -56,7 +58,8 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_REPLICATION_ALLOCATION = "replication_allocation";
public static final String PROPERTIES_STORAGE_TYPE = "storage_type";
public static final String PROPERTIES_STORAGE_MEDIUM = "storage_medium";
public static final String PROPERTIES_STORAGE_COLDOWN_TIME = "storage_cooldown_time";
public static final String PROPERTIES_STORAGE_COOLDOWN_TIME = "storage_cooldown_time";
public static final String PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME = "remote_storage_cooldown_time";
// for 1.x -> 2.x migration
public static final String PROPERTIES_VERSION_INFO = "version_info";
// for restore
@ -85,6 +88,8 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_INMEMORY = "in_memory";
public static final String PROPERTIES_REMOTE_STORAGE_RESOURCE = "remote_storage_resource";
public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
public static final String PROPERTIES_STRICT_RANGE = "strict_range";
@ -105,15 +110,19 @@ public class PropertyAnalyzer {
public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty)
throws AnalysisException {
if (properties == null) {
if (properties == null || properties.isEmpty()) {
return oldDataProperty;
}
TStorageMedium storageMedium = null;
long coolDownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
long cooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
String remoteStorageResourceName = "";
long remoteCooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
boolean hasMedium = false;
boolean hasCooldown = false;
boolean hasRemoteStorageResource = false;
boolean hasRemoteCooldown = false;
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
@ -126,42 +135,83 @@ public class PropertyAnalyzer {
} else {
throw new AnalysisException("Invalid storage medium: " + value);
}
} else if (!hasCooldown && key.equalsIgnoreCase(PROPERTIES_STORAGE_COLDOWN_TIME)) {
hasCooldown = true;
} else if (!hasCooldown && key.equalsIgnoreCase(PROPERTIES_STORAGE_COOLDOWN_TIME)) {
DateLiteral dateLiteral = new DateLiteral(value, Type.DATETIME);
coolDownTimeStamp = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
cooldownTimeStamp = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
if (cooldownTimeStamp != DataProperty.MAX_COOLDOWN_TIME_MS) {
hasCooldown = true;
}
} else if (!hasRemoteStorageResource && key.equalsIgnoreCase(PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
if (!Strings.isNullOrEmpty(value)) {
hasRemoteStorageResource = true;
remoteStorageResourceName = value;
}
} else if (!hasRemoteCooldown && key.equalsIgnoreCase(PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME)) {
DateLiteral dateLiteral = new DateLiteral(value, Type.DATETIME);
remoteCooldownTimeStamp = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
if (remoteCooldownTimeStamp != DataProperty.MAX_COOLDOWN_TIME_MS) {
hasRemoteCooldown = true;
}
}
} // end for properties
// Check properties
if (!hasCooldown && !hasMedium) {
return oldDataProperty;
}
properties.remove(PROPERTIES_STORAGE_MEDIUM);
properties.remove(PROPERTIES_STORAGE_COLDOWN_TIME);
properties.remove(PROPERTIES_STORAGE_COOLDOWN_TIME);
properties.remove(PROPERTIES_REMOTE_STORAGE_RESOURCE);
properties.remove(PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME);
if (hasCooldown && !hasMedium) {
throw new AnalysisException("Invalid data property. storage medium property is not found");
}
if (storageMedium == TStorageMedium.HDD && hasCooldown) {
throw new AnalysisException("Can not assign cooldown timestamp to HDD storage medium");
cooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
LOG.info("Can not assign cool down timestamp to HDD storage medium, ignore user setting.");
hasCooldown = false;
}
long currentTimeMs = System.currentTimeMillis();
if (storageMedium == TStorageMedium.SSD && hasCooldown) {
if (coolDownTimeStamp <= currentTimeMs) {
throw new AnalysisException("Cooldown time should later than now");
if (cooldownTimeStamp <= currentTimeMs) {
throw new AnalysisException("Cool down time should later than now");
}
}
if (storageMedium == TStorageMedium.SSD && !hasCooldown) {
// set default cooldown time
coolDownTimeStamp = currentTimeMs + Config.storage_cooldown_second * 1000L;
cooldownTimeStamp = currentTimeMs + Config.storage_cooldown_second * 1000L;
}
// check remote_storage_resource and remote_storage_cooldown_time
if ((!hasRemoteCooldown && hasRemoteStorageResource) || (hasRemoteCooldown && !hasRemoteStorageResource)) {
throw new AnalysisException("Invalid data property, " +
"`remote_storage_resource` and `remote_storage_cooldown_time` must be used together.");
}
if (hasRemoteStorageResource && hasRemoteCooldown) {
// check remote resource
Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(remoteStorageResourceName);
if (resource == null) {
throw new AnalysisException("Invalid data property, " +
"`remote_storage_resource` [" + remoteStorageResourceName + "] dose not exist.");
}
// check remote storage cool down timestamp
if (remoteCooldownTimeStamp <= currentTimeMs) {
throw new AnalysisException("Remote storage cool down time should later than now");
}
if (hasCooldown && (remoteCooldownTimeStamp <= cooldownTimeStamp)) {
throw new AnalysisException("`remote_storage_cooldown_time` should later than `storage_cooldown_time`.");
}
}
Preconditions.checkNotNull(storageMedium);
return new DataProperty(storageMedium, coolDownTimeStamp);
return new DataProperty(storageMedium, cooldownTimeStamp, remoteStorageResourceName, remoteCooldownTimeStamp);
}
public static short analyzeShortKeyColumnCount(Map<String, String> properties) throws AnalysisException {
@ -421,6 +471,21 @@ public class PropertyAnalyzer {
return defaultVal;
}
// analyze remote storage resource
public static String analyzeRemoteStorageResource(Map<String, String> properties) throws AnalysisException {
String resourceName = "";
if (properties != null && properties.containsKey(PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
resourceName = properties.get(PROPERTIES_REMOTE_STORAGE_RESOURCE);
// check resource existence
Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
if (resource == null) {
throw new AnalysisException("Resource does not exist, name: " + resourceName);
}
}
return resourceName;
}
// analyze property like : "type" = "xxx";
public static String analyzeType(Map<String, String> properties) throws AnalysisException {
String type = null;
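Taken together, analyzeDataProperty requires remote_storage_resource and remote_storage_cooldown_time to be set as a pair, the named resource to exist, and the remote cooldown to lie after both the current time and storage_cooldown_time; a property block that satisfies these rules, adapted from the tbl_remote fixture in the tests below:

    PROPERTIES (
        "replication_num" = "1",
        "storage_medium" = "SSD",
        "storage_cooldown_time" = "2122-04-01 20:24:00",
        "remote_storage_resource" = "remote_s3",
        "remote_storage_cooldown_time" = "2122-12-01 20:23:00"
    )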
View File
@ -533,7 +533,8 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_CREATE_RESOURCE: {
case OperationType.OP_CREATE_RESOURCE:
case OperationType.OP_ALTER_RESOURCE: {
data = Resource.read(in);
isRead = true;
break;
View File
@ -682,6 +682,11 @@ public class EditLog {
catalog.getResourceMgr().replayDropResource(operationLog);
break;
}
case OperationType.OP_ALTER_RESOURCE: {
final Resource resource = (Resource) journal.getData();
catalog.getResourceMgr().replayAlterResource(resource);
break;
}
case OperationType.OP_CREATE_SMALL_FILE: {
SmallFile smallFile = (SmallFile) journal.getData();
catalog.getSmallFileMgr().replayCreateFile(smallFile);
@ -1311,6 +1316,10 @@ public class EditLog {
logEdit(OperationType.OP_DROP_RESOURCE, operationLog);
}
public void logAlterResource(Resource resource) {
logEdit(OperationType.OP_ALTER_RESOURCE, resource);
}
public void logCreateSmallFile(SmallFile info) {
logEdit(OperationType.OP_CREATE_SMALL_FILE, info);
}
View File
@ -207,6 +207,7 @@ public class OperationType {
// resource 276~290
public static final short OP_CREATE_RESOURCE = 276;
public static final short OP_DROP_RESOURCE = 277;
public static final short OP_ALTER_RESOURCE = 278;
// alter external table
public static final short OP_ALTER_EXTERNAL_TABLE_SCHEMA = 280;
View File
@ -27,6 +27,7 @@ import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.OdbcCatalogResource;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.StructType;
@ -109,7 +110,8 @@ public class GsonUtils {
private static RuntimeTypeAdapterFactory<Resource> resourceTypeAdapterFactory = RuntimeTypeAdapterFactory
.of(Resource.class, "clazz")
.registerSubtype(SparkResource.class, SparkResource.class.getSimpleName())
.registerSubtype(OdbcCatalogResource.class, OdbcCatalogResource.class.getSimpleName());
.registerSubtype(OdbcCatalogResource.class, OdbcCatalogResource.class.getSimpleName())
.registerSubtype(S3Resource.class, S3Resource.class.getSimpleName());
// runtime adapter for class "AlterJobV2"
private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory
View File
@ -31,6 +31,7 @@ import org.apache.doris.analysis.AlterColumnStatsStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
import org.apache.doris.analysis.AlterSystemStmt;
@ -299,6 +300,8 @@ public class DdlExecutor {
catalog.getRefreshManager().handleRefreshTable((RefreshTableStmt) ddlStmt);
} else if (ddlStmt instanceof RefreshDbStmt) {
catalog.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
} else if (ddlStmt instanceof AlterResourceStmt) {
catalog.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
} else {
throw new DdlException("Unknown statement.");
}
View File
@ -19,8 +19,10 @@ package org.apache.doris.alter;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DropResourceStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
@ -36,6 +38,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
@ -132,7 +135,7 @@ public class AlterTest {
" 'replication_num' = '1',\n" +
" 'in_memory' = 'false',\n" +
" 'storage_medium' = 'SSD',\n" +
" 'storage_cooldown_time' = '9999-12-31 00:00:00'\n" +
" 'storage_cooldown_time' = '2999-12-31 00:00:00'\n" +
");");
createTable("CREATE TABLE test.tbl5\n" +
@ -167,6 +170,59 @@ public class AlterTest {
"\"driver\" = \"Oracle Driver\",\n" +
"\"odbc_type\" = \"oracle\"\n" +
");");
// s3 resource
createRemoteStorageResource("create resource \"remote_s3\"\n" +
"properties\n" +
"(\n" +
" \"type\" = \"s3\", \n" +
" \"s3_endpoint\" = \"bj\",\n" +
" \"s3_region\" = \"bj\",\n" +
" \"s3_root_path\" = \"/path/to/root\",\n" +
" \"s3_access_key\" = \"bbb\",\n" +
" \"s3_secret_key\" = \"aaaa\",\n" +
" \"s3_max_connections\" = \"50\",\n" +
" \"s3_request_timeout_ms\" = \"3000\",\n" +
" \"s3_connection_timeout_ms\" = \"1000\"\n" +
");");
createRemoteStorageResource("create resource \"remote_s3_1\"\n" +
"properties\n" +
"(\n" +
" \"type\" = \"s3\", \n" +
" \"s3_endpoint\" = \"bj\",\n" +
" \"s3_region\" = \"bj\",\n" +
" \"s3_root_path\" = \"/path/to/root\",\n" +
" \"s3_access_key\" = \"bbb\",\n" +
" \"s3_secret_key\" = \"aaaa\",\n" +
" \"s3_max_connections\" = \"50\",\n" +
" \"s3_request_timeout_ms\" = \"3000\",\n" +
" \"s3_connection_timeout_ms\" = \"1000\"\n" +
");");
createTable("CREATE TABLE test.tbl_remote\n" +
"(\n" +
" k1 date,\n" +
" k2 int,\n" +
" v1 int sum\n" +
")\n" +
"PARTITION BY RANGE(k1)\n" +
"(\n" +
" PARTITION p1 values less than('2020-02-01'),\n" +
" PARTITION p2 values less than('2020-03-01'),\n" +
" PARTITION p3 values less than('2020-04-01'),\n" +
" PARTITION p4 values less than('2020-05-01')\n" +
")\n" +
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
"PROPERTIES" +
"(" +
" 'replication_num' = '1',\n" +
" 'in_memory' = 'false',\n" +
" 'storage_medium' = 'SSD',\n" +
" 'storage_cooldown_time' = '2122-04-01 20:24:00',\n" +
" 'remote_storage_resource' = 'remote_s3',\n" +
" 'remote_storage_cooldown_time' = '2122-12-01 20:23:00'" +
");");
}
@AfterClass
@ -180,6 +236,11 @@ public class AlterTest {
Catalog.getCurrentCatalog().createTable(createTableStmt);
}
private static void createRemoteStorageResource(String sql) throws Exception {
CreateResourceStmt stmt = (CreateResourceStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Catalog.getCurrentCatalog().getResourceMgr().createResource(stmt);
}
private static void alterTable(String sql, boolean expectedException) throws Exception {
try {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
@ -383,21 +444,37 @@ public class AlterTest {
}
Assert.assertEquals(false, tbl4.getPartitionInfo().getIsInMemory(p4.getId()));
// batch update storage_medium and storage_cool_down properties
stmt = "alter table test.tbl4 modify partition (p2, p3, p4) set ('storage_medium' = 'HDD')";
DateLiteral dateLiteral = new DateLiteral("9999-12-31 00:00:00", Type.DATETIME);
long coolDownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, coolDownTimeMs);
partitionList = Lists.newArrayList(p2, p3, p4);
// batch update storage_medium and storage_cooldown properties
// alter storage_medium
stmt = "alter table test.tbl4 modify partition (p3, p4) set ('storage_medium' = 'HDD')";
DateLiteral dateLiteral = new DateLiteral("2999-12-31 00:00:00", Type.DATETIME);
long cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
partitionList = Lists.newArrayList(p3, p4);
for (Partition partition : partitionList) {
Assert.assertEquals(oldDataProperty, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
}
alterTable(stmt, false);
DataProperty newDataProperty = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS);
DataProperty newDataProperty = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "", DataProperty.MAX_COOLDOWN_TIME_MS);
for (Partition partition : partitionList) {
Assert.assertEquals(newDataProperty, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(oldDataProperty, tbl4.getPartitionInfo().getDataProperty(p1.getId()));
Assert.assertEquals(oldDataProperty, tbl4.getPartitionInfo().getDataProperty(p2.getId()));
// alter cooldown_time
stmt = "alter table test.tbl4 modify partition (p1, p2) set ('storage_cooldown_time' = '2100-12-31 00:00:00')";
alterTable(stmt, false);
dateLiteral = new DateLiteral("2100-12-31 00:00:00", Type.DATETIME);
cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DataProperty newDataProperty1 = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
partitionList = Lists.newArrayList(p1, p2);
for (Partition partition : partitionList) {
Assert.assertEquals(newDataProperty1, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(newDataProperty, tbl4.getPartitionInfo().getDataProperty(p3.getId()));
Assert.assertEquals(newDataProperty, tbl4.getPartitionInfo().getDataProperty(p4.getId()));
// batch update range partitions' properties with *
stmt = "alter table test.tbl4 modify partition (*) set ('replication_num' = '1')";
@ -408,6 +485,75 @@ public class AlterTest {
}
}
@Test
public void testAlterRemoteStorageTableDataProperties() throws Exception {
Database db = Catalog.getCurrentCatalog().getDbOrMetaException("default_cluster:test");
OlapTable tblRemote = (OlapTable) db.getTableOrMetaException("tbl_remote");
Partition p1 = tblRemote.getPartition("p1");
Partition p2 = tblRemote.getPartition("p2");
Partition p3 = tblRemote.getPartition("p3");
Partition p4 = tblRemote.getPartition("p4");
DateLiteral dateLiteral = new DateLiteral("2122-04-01 20:24:00", Type.DATETIME);
long cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DateLiteral dateLiteral1 = new DateLiteral("2122-12-01 20:23:00", Type.DATETIME);
long remoteCooldownTimeMs = dateLiteral1.unixTimestamp(TimeUtils.getTimeZone());
DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "remote_s3", remoteCooldownTimeMs);
List<Partition> partitionList = Lists.newArrayList(p2, p3, p4);
for (Partition partition : partitionList) {
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
// alter cooldown_time
String stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('storage_cooldown_time' = '2100-04-01 22:22:22')";
alterTable(stmt, false);
DateLiteral newDateLiteral = new DateLiteral("2100-04-01 22:22:22", Type.DATETIME);
long newCooldownTimeMs = newDateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DataProperty dataProperty2 = new DataProperty(TStorageMedium.SSD, newCooldownTimeMs, "remote_s3", remoteCooldownTimeMs);
for (Partition partition : partitionList) {
Assert.assertEquals(dataProperty2, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
// alter storage_medium
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('storage_medium' = 'HDD')";
alterTable(stmt, false);
DataProperty dataProperty1 = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "remote_s3", remoteCooldownTimeMs);
for (Partition partition : partitionList) {
Assert.assertEquals(dataProperty1, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
// alter remote_storage
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('remote_storage_resource' = 'remote_s3_1')";
alterTable(stmt, true);
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
// alter remote_storage_cooldown_time
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('remote_storage_cooldown_time' = '2122-12-01 20:23:00')";
alterTable(stmt, false);
DateLiteral newRemoteDate = new DateLiteral("2122-12-01 20:23:00", Type.DATETIME);
long newRemoteCooldownTimeMs = newRemoteDate.unixTimestamp(TimeUtils.getTimeZone());
DataProperty dataProperty4 = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "remote_s3", newRemoteCooldownTimeMs);
for (Partition partition : partitionList) {
Assert.assertEquals(dataProperty4, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
// alter recover to old state
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set (" +
"'storage_medium' = 'SSD', " +
"'storage_cooldown_time' = '2122-04-01 20:24:00', " +
"'remote_storage_cooldown_time' = '2122-12-01 20:23:00'" +
")";
alterTable(stmt, false);
for (Partition partition : partitionList) {
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
}
@Test
public void testDynamicPartitionDropAndAdd() throws Exception {
// test day range
@ -890,4 +1036,11 @@ public class AlterTest {
Assert.assertEquals("tbl1", odbcTable.getOdbcTableName());
Assert.assertEquals("MySQL", odbcTable.getOdbcDriver());
}
@Test(expected = DdlException.class)
public void testDropInUseResource() throws Exception {
String sql = "drop resource remote_s3";
DropResourceStmt stmt = (DropResourceStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Catalog.getCurrentCatalog().getResourceMgr().dropResource(stmt);
}
}
View File
@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ResourceType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PaloAuth;
@ -39,12 +40,14 @@ public class CreateResourceStmtTest {
private Analyzer analyzer;
private String resourceName1;
private String resourceName2;
private String resourceName3;
@Before()
public void setUp() {
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
resourceName1 = "spark0";
resourceName2 = "odbc";
resourceName3 = "s3";
}
@Test
@ -74,6 +77,14 @@ public class CreateResourceStmtTest {
Assert.assertEquals(Resource.ResourceType.ODBC_CATALOG, stmt.getResourceType());
Assert.assertEquals("CREATE EXTERNAL RESOURCE 'odbc' PROPERTIES(\"type\" = \"odbc_catalog\")", stmt.toSql());
properties = Maps.newHashMap();
properties.put("type", "s3");
stmt = new CreateResourceStmt(true, resourceName3, properties);
stmt.analyze(analyzer);
Assert.assertEquals(resourceName3, stmt.getResourceName());
Assert.assertEquals(ResourceType.S3, stmt.getResourceType());
Assert.assertEquals("CREATE EXTERNAL RESOURCE 's3' PROPERTIES(\"type\" = \"s3\")", stmt.toSql());
}
@Test(expected = AnalysisException.class)
View File
@ -33,8 +33,9 @@ public class DataPropertyTest {
dataProperty = new DataProperty(TStorageMedium.SSD);
Assert.assertNotEquals(DataProperty.MAX_COOLDOWN_TIME_MS, dataProperty.getCooldownTimeMs());
dataProperty = new DataProperty(TStorageMedium.SSD, System.currentTimeMillis() + 24 * 3600 * 1000L);
Assert.assertEquals(System.currentTimeMillis() + 24 * 3600 * 1000L, dataProperty.getCooldownTimeMs());
long storageCooldownTimeMs = System.currentTimeMillis() + 24 * 3600 * 1000L;
dataProperty = new DataProperty(TStorageMedium.SSD, storageCooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
Assert.assertEquals(storageCooldownTimeMs, dataProperty.getCooldownTimeMs());
dataProperty = new DataProperty(TStorageMedium.HDD);
Assert.assertEquals(DataProperty.MAX_COOLDOWN_TIME_MS, dataProperty.getCooldownTimeMs());
View File
@ -18,6 +18,7 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.AlterResourceStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.DropResourceStmt;
@ -36,35 +37,67 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class ResourceMgrTest {
private String name;
private String type;
// spark resource
private String master;
private String sparkResName;
private String sparkRestype;
private String workingDir;
private String broker;
private Map<String, String> properties;
private Map<String, String> sparkProperties;
// s3 resource
private String s3ResName;
private String s3ResType;
private String s3Endpoint;
private String s3Region;
private String s3RootPath;
private String s3AccessKey;
private String s3SecretKey;
private String s3MaxConnections;
private String s3ReqTimeoutMs;
private String s3ConnTimeoutMs;
private Map<String, String> s3Properties;
private Analyzer analyzer;
@Before
public void setUp() {
name = "spark0";
type = "spark";
sparkResName = "spark0";
sparkRestype = "spark";
master = "spark://127.0.0.1:7077";
workingDir = "hdfs://127.0.0.1/tmp/doris";
broker = "broker0";
properties = Maps.newHashMap();
properties.put("type", type);
properties.put("spark.master", master);
properties.put("spark.submit.deployMode", "cluster");
properties.put("working_dir", workingDir);
properties.put("broker", broker);
sparkProperties = Maps.newHashMap();
sparkProperties.put("type", sparkRestype);
sparkProperties.put("spark.master", master);
sparkProperties.put("spark.submit.deployMode", "cluster");
sparkProperties.put("working_dir", workingDir);
sparkProperties.put("broker", broker);
s3ResName = "s30";
s3ResType = "s3";
s3Endpoint = "aaa";
s3Region = "bj";
s3RootPath = "/path/to/root";
s3AccessKey = "xxx";
s3SecretKey = "yyy";
s3MaxConnections = "50";
s3ReqTimeoutMs = "3000";
s3ConnTimeoutMs = "1000";
s3Properties = new HashMap<>();
s3Properties.put("type", s3ResType);
s3Properties.put("s3_endpoint", s3Endpoint);
s3Properties.put("s3_region", s3Region);
s3Properties.put("s3_root_path", s3RootPath);
s3Properties.put("s3_access_key", s3AccessKey);
s3Properties.put("s3_secret_key", s3SecretKey);
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
}
@Test
public void testAddDropResource(@Injectable BrokerMgr brokerMgr, @Injectable EditLog editLog,
public void testAddAlterDropResource(@Injectable BrokerMgr brokerMgr, @Injectable EditLog editLog,
@Mocked Catalog catalog, @Injectable PaloAuth auth) throws UserException {
new Expectations() {
{
@ -81,22 +114,54 @@ public class ResourceMgrTest {
}
};
// spark resource
// add
ResourceMgr mgr = new ResourceMgr();
CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
CreateResourceStmt stmt = new CreateResourceStmt(true, sparkResName, sparkProperties);
stmt.analyze(analyzer);
Assert.assertEquals(0, mgr.getResourceNum());
mgr.createResource(stmt);
Assert.assertEquals(1, mgr.getResourceNum());
Assert.assertTrue(mgr.containsResource(name));
SparkResource resource = (SparkResource) mgr.getResource(name);
Assert.assertTrue(mgr.containsResource(sparkResName));
SparkResource resource = (SparkResource) mgr.getResource(sparkResName);
Assert.assertNotNull(resource);
Assert.assertEquals(broker, resource.getBroker());
// alter
workingDir = "hdfs://127.0.0.1/tmp/doris_new";
Map<String, String> copiedSparkProperties = Maps.newHashMap(sparkProperties);
copiedSparkProperties.put("working_dir", workingDir);
copiedSparkProperties.remove("spark.master");
AlterResourceStmt alterResourceStmt = new AlterResourceStmt(sparkResName, copiedSparkProperties);
mgr.alterResource(alterResourceStmt);
Assert.assertEquals(workingDir, ((SparkResource) mgr.getResource(sparkResName)).getWorkingDir());
// drop
DropResourceStmt dropStmt = new DropResourceStmt(name);
DropResourceStmt dropStmt = new DropResourceStmt(sparkResName);
mgr.dropResource(dropStmt);
Assert.assertEquals(0, mgr.getResourceNum());
// s3 resource
stmt = new CreateResourceStmt(true, s3ResName, s3Properties);
stmt.analyze(analyzer);
Assert.assertEquals(0, mgr.getResourceNum());
mgr.createResource(stmt);
Assert.assertEquals(1, mgr.getResourceNum());
// alter
s3Region = "sh";
Map<String, String> copiedS3Properties = Maps.newHashMap(s3Properties);
copiedS3Properties.put("s3_region", s3Region);
copiedS3Properties.remove("type");
alterResourceStmt = new AlterResourceStmt(s3ResName, copiedS3Properties);
mgr.alterResource(alterResourceStmt);
Assert.assertEquals(s3Region, ((S3Resource) mgr.getResource(s3ResName)).getProperty("s3_region"));
// drop
dropStmt = new DropResourceStmt(s3ResName);
mgr.dropResource(dropStmt);
Assert.assertEquals(0, mgr.getResourceNum());
}
@Test(expected = DdlException.class)
@ -117,7 +182,7 @@ public class ResourceMgrTest {
// add
ResourceMgr mgr = new ResourceMgr();
CreateResourceStmt stmt = new CreateResourceStmt(true, name, properties);
CreateResourceStmt stmt = new CreateResourceStmt(true, sparkResName, sparkProperties);
stmt.analyze(analyzer);
Assert.assertEquals(0, mgr.getResourceNum());
mgr.createResource(stmt);
@ -132,7 +197,7 @@ public class ResourceMgrTest {
// drop
ResourceMgr mgr = new ResourceMgr();
Assert.assertEquals(0, mgr.getResourceNum());
DropResourceStmt stmt = new DropResourceStmt(name);
DropResourceStmt stmt = new DropResourceStmt(sparkResName);
mgr.dropResource(stmt);
}
}
View File
@ -0,0 +1,195 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.HashMap;
import java.util.Map;
public class S3ResourceTest {
private String name;
private String type;
private String s3Endpoint;
private String s3Region;
private String s3RootPath;
private String s3AccessKey;
private String s3SecretKey;
private String s3MaxConnections;
private String s3ReqTimeoutMs;
private String s3ConnTimeoutMs;
private Map<String, String> s3Properties;
private Analyzer analyzer;
@Before
public void setUp() {
name = "s3";
type = "s3";
s3Endpoint = "aaa";
s3Region = "bj";
s3RootPath = "/path/to/root";
s3AccessKey = "xxx";
s3SecretKey = "yyy";
s3MaxConnections = "50";
s3ReqTimeoutMs = "3000";
s3ConnTimeoutMs = "1000";
s3Properties = new HashMap<>();
s3Properties.put("type", type);
s3Properties.put("s3_endpoint", s3Endpoint);
s3Properties.put("s3_region", s3Region);
s3Properties.put("s3_root_path", s3RootPath);
s3Properties.put("s3_access_key", s3AccessKey);
s3Properties.put("s3_secret_key", s3SecretKey);
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
}
@Test
public void testFromStmt(@Mocked Catalog catalog, @Injectable PaloAuth auth) throws UserException {
new Expectations() {
{
catalog.getAuth();
result = auth;
auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
result = true;
}
};
// resource with default settings
CreateResourceStmt stmt = new CreateResourceStmt(true, name, s3Properties);
stmt.analyze(analyzer);
S3Resource s3Resource = (S3Resource) Resource.fromStmt(stmt);
Assert.assertEquals(name, s3Resource.getName());
Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("s3_endpoint"));
Assert.assertEquals(s3Region, s3Resource.getProperty("s3_region"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("s3_root_path"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("s3_access_key"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("s3_secret_key"));
Assert.assertEquals(s3MaxConnections, s3Resource.getProperty("s3_max_connections"));
Assert.assertEquals(s3ReqTimeoutMs, s3Resource.getProperty("s3_request_timeout_ms"));
Assert.assertEquals(s3ConnTimeoutMs, s3Resource.getProperty("s3_connection_timeout_ms"));
// with no default settings
s3Properties.put("s3_max_connections", "100");
s3Properties.put("s3_request_timeout_ms", "2000");
s3Properties.put("s3_connection_timeout_ms", "2000");
stmt = new CreateResourceStmt(true, name, s3Properties);
stmt.analyze(analyzer);
s3Resource = (S3Resource) Resource.fromStmt(stmt);
Assert.assertEquals(name, s3Resource.getName());
Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("s3_endpoint"));
Assert.assertEquals(s3Region, s3Resource.getProperty("s3_region"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("s3_root_path"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("s3_access_key"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("s3_secret_key"));
Assert.assertEquals("100", s3Resource.getProperty("s3_max_connections"));
Assert.assertEquals("2000", s3Resource.getProperty("s3_request_timeout_ms"));
Assert.assertEquals("2000", s3Resource.getProperty("s3_connection_timeout_ms"));
}
@Test(expected = DdlException.class)
public void testAbnormalResource(@Mocked Catalog catalog, @Injectable PaloAuth auth) throws UserException {
new Expectations() {
{
catalog.getAuth();
result = auth;
auth.checkGlobalPriv((ConnectContext) any, PrivPredicate.ADMIN);
result = true;
}
};
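// Removing the required s3_root_path property should make Resource.fromStmt throw DdlException.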
s3Properties.remove("s3_root_path");
CreateResourceStmt stmt = new CreateResourceStmt(true, name, s3Properties);
stmt.analyze(analyzer);
Resource.fromStmt(stmt);
}
@Test
public void testSerialization() throws Exception {
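// Pin the current FE meta version for this thread so write()/read() use the latest serialization format.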
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
metaContext.setThreadLocalInfo();
// 1. write
File s3File = new File("./s3Resource");
s3File.createNewFile();
DataOutputStream s3Dos = new DataOutputStream(new FileOutputStream(s3File));
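// s3Resource1 is written without any properties; s3Resource2 below carries an explicit property map.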
S3Resource s3Resource1 = new S3Resource("s3_1");
s3Resource1.write(s3Dos);
Map<String, String> properties = new HashMap<>();
properties.put("s3_endpoint", "aaa");
properties.put("s3_region", "bbb");
properties.put("s3_root_path", "/path/to/root");
properties.put("s3_access_key", "xxx");
properties.put("s3_secret_key", "yyy");
S3Resource s3Resource2 = new S3Resource("s3_2");
s3Resource2.setProperties(properties);
s3Resource2.write(s3Dos);
s3Dos.flush();
s3Dos.close();
// 2. read
DataInputStream s3Dis = new DataInputStream(new FileInputStream(s3File));
S3Resource rS3Resource1 = (S3Resource) S3Resource.read(s3Dis);
S3Resource rS3Resource2 = (S3Resource) S3Resource.read(s3Dis);
Assert.assertEquals("s3_1", rS3Resource1.getName());
Assert.assertEquals("s3_2", rS3Resource2.getName());
Assert.assertEquals("aaa", rS3Resource2.getProperty("s3_endpoint"));
Assert.assertEquals("bbb", rS3Resource2.getProperty("s3_region"));
Assert.assertEquals("/path/to/root", rS3Resource2.getProperty("s3_root_path"));
Assert.assertEquals("xxx", rS3Resource2.getProperty("s3_access_key"));
Assert.assertEquals("yyy", rS3Resource2.getProperty("s3_secret_key"));
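// s3Resource2 never set these optional properties, so the defaults should survive the write/read round trip.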
Assert.assertEquals("50", rS3Resource2.getProperty("s3_max_connections"));
Assert.assertEquals("3000", rS3Resource2.getProperty("s3_request_timeout_ms"));
Assert.assertEquals("1000", rS3Resource2.getProperty("s3_connection_timeout_ms"));
// 3. delete
s3Dis.close();
s3File.delete();
}
}

View File

@ -37,7 +37,6 @@ import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.thrift.TStorageMedium;
@ -59,7 +58,6 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
//import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@ -67,7 +65,6 @@ import java.util.stream.LongStream;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
//import static com.google.common.collect.MoreCollectors.onlyElement;
public class DiskRebalanceTest {
private static final Logger LOG = LogManager.getLogger(DiskRebalanceTest.class);

View File

@ -146,7 +146,7 @@ public class PropertyAnalyzerTest {
Map<String, String> properties = Maps.newHashMap();
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, "SSD");
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COLDOWN_TIME, tomorrowTimeStr);
properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, tomorrowTimeStr);
DataProperty dataProperty = PropertyAnalyzer.analyzeDataProperty(properties, new DataProperty(TStorageMedium.SSD));
// avoid UT failures caused by time zone differences
DateLiteral dateLiteral = new DateLiteral(tomorrowTimeStr, Type.DATETIME);