Add storage policy for remote storage migration (#9997)
This commit is contained in:
@ -32,11 +32,13 @@ CREATE POLICY
|
||||
|
||||
### Description
|
||||
|
||||
Create security policies and explain to view the rewritten SQL.
|
||||
Create policies,such as:
|
||||
1. Create security policies(ROW POLICY) and explain to view the rewritten SQL.
|
||||
2. Create storage migration policy(STORAGE POLICY), used for cold and hot data transform
|
||||
|
||||
#### 行安全策略
|
||||
grammar:
|
||||
#### Grammar:
|
||||
|
||||
1. ROW POLICY
|
||||
```sql
|
||||
CREATE ROW POLICY test_row_policy_1 ON test.table1
|
||||
AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
|
||||
@ -49,6 +51,21 @@ illustrate:
|
||||
- It is connected with AND between RESTRICTIVE AND PERMISSIVE
|
||||
- It cannot be created for users root and admin
|
||||
|
||||
2. STORAGE POLICY
|
||||
```sql
|
||||
CREATE STORAGE POLICY test_storage_policy_1
|
||||
PROPERTIES ("key"="value", ...);
|
||||
```
|
||||
illustrate:
|
||||
- PROPERTIES has such keys:
|
||||
1. storage_resource:storage resource name for policy
|
||||
2. cooldown_datetime:cool down time for tablet, can't be set with cooldown_ttl.
|
||||
3. cooldown_ttl:hot data stay time. The time cost between the time of tablet created and
|
||||
the time of migrated to cold data, formatted as:
|
||||
1d:1 day
|
||||
1h:1 hour
|
||||
50000: 50000 second
|
||||
|
||||
### Example
|
||||
|
||||
1. Create a set of row security policies
|
||||
@ -76,6 +93,24 @@ illustrate:
|
||||
select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd')
|
||||
```
|
||||
|
||||
2. Create policy for storage
|
||||
1. Create policy on cooldown_datetime
|
||||
```sql
|
||||
CREATE STORAGE POLICY testPolicy
|
||||
PROPERTIES(
|
||||
"storage_resource" = "s3",
|
||||
"cooldown_datetime" = "2022-06-08 00:00:00"
|
||||
);
|
||||
```
|
||||
2. Create policy on cooldown_ttl
|
||||
```sql
|
||||
CREATE STORAGE POLICY testPolicy
|
||||
PROPERTIES(
|
||||
"storage_resource" = "s3",
|
||||
"cooldown_ttl" = "1d"
|
||||
);
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
CREATE, POLICY
|
||||
|
||||
@ -32,16 +32,18 @@ CREATE POLICY
|
||||
|
||||
### Description
|
||||
|
||||
创建安全策略,explain 可以查看改写后的 SQL。
|
||||
创建策略,包含以下几种:
|
||||
|
||||
#### 行安全策略
|
||||
语法:
|
||||
1. 创建安全策略(ROW POLICY),explain 可以查看改写后的 SQL。
|
||||
2. 创建数据迁移策略(STORAGE POLICY),用于冷热数据转换。
|
||||
|
||||
#### 语法:
|
||||
|
||||
1. ROW POLICY
|
||||
```sql
|
||||
CREATE ROW POLICY test_row_policy_1 ON test.table1
|
||||
AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
|
||||
```
|
||||
|
||||
参数说明:
|
||||
|
||||
- filterType:RESTRICTIVE 将一组策略通过 AND 连接, PERMISSIVE 将一组策略通过 OR 连接
|
||||
@ -49,6 +51,20 @@ AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
|
||||
- RESTRICTIVE 和 PERMISSIVE 之间通过 AND 连接的
|
||||
- 不允许对 root 和 admin 用户创建
|
||||
|
||||
2. STORAGE POLICY
|
||||
```sql
|
||||
CREATE STORAGE POLICY test_storage_policy_1
|
||||
PROPERTIES ("key"="value", ...);
|
||||
```
|
||||
参数说明:
|
||||
- PROPERTIES中需要指定资源的类型:
|
||||
1. storage_resource:指定策略使用的storage resource名称。
|
||||
2. cooldown_datetime:热数据转为冷数据时间,不能与cooldown_ttl同时存在。
|
||||
3. cooldown_ttl:热数据持续时间。从数据分片生成时开始计算,经过指定时间后转为冷数据。支持的格式:
|
||||
1d:1天
|
||||
1h:1小时
|
||||
50000: 50000秒
|
||||
|
||||
### Example
|
||||
|
||||
1. 创建一组行安全策略
|
||||
@ -75,6 +91,23 @@ AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
|
||||
```sql
|
||||
select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd')
|
||||
```
|
||||
2. 创建数据迁移策略
|
||||
1. 指定数据冷却时间创建数据迁移策略
|
||||
```sql
|
||||
CREATE STORAGE POLICY testPolicy
|
||||
PROPERTIES(
|
||||
"storage_resource" = "s3",
|
||||
"cooldown_datetime" = "2022-06-08 00:00:00"
|
||||
);
|
||||
```
|
||||
2. 指定热数据持续时间创建数据迁移策略
|
||||
```sql
|
||||
CREATE STORAGE POLICY testPolicy
|
||||
PROPERTIES(
|
||||
"storage_resource" = "s3",
|
||||
"cooldown_ttl" = "1d"
|
||||
);
|
||||
```
|
||||
|
||||
### Keywords
|
||||
|
||||
|
||||
@ -1361,6 +1361,11 @@ create_stmt ::=
|
||||
{:
|
||||
RESULT = new CreatePolicyStmt(PolicyTypeEnum.ROW, ifNotExists, policyName, tbl, filterType, user, wherePredicate);
|
||||
:}
|
||||
/* storage policy */
|
||||
| KW_CREATE KW_STORAGE KW_POLICY opt_if_not_exists:ifNotExists ident:policyName opt_properties:properties
|
||||
{:
|
||||
RESULT = new CreatePolicyStmt(PolicyTypeEnum.STORAGE, ifNotExists, policyName, properties);
|
||||
:}
|
||||
;
|
||||
|
||||
channel_desc_list ::=
|
||||
@ -2074,6 +2079,10 @@ drop_stmt ::=
|
||||
{:
|
||||
RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, user);
|
||||
:}
|
||||
| KW_DROP KW_STORAGE KW_POLICY opt_if_exists:ifExists ident:policyName
|
||||
{:
|
||||
RESULT = new DropPolicyStmt(PolicyTypeEnum.STORAGE, ifExists, policyName, null, null);
|
||||
:}
|
||||
;
|
||||
|
||||
// Recover statement
|
||||
@ -2574,6 +2583,10 @@ show_stmt ::=
|
||||
{:
|
||||
RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, null);
|
||||
:}
|
||||
| KW_SHOW KW_STORAGE KW_POLICY
|
||||
{:
|
||||
RESULT = new ShowPolicyStmt(PolicyTypeEnum.STORAGE, null);
|
||||
:}
|
||||
;
|
||||
|
||||
show_param ::=
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.policy.FilterType;
|
||||
import org.apache.doris.policy.PolicyTypeEnum;
|
||||
@ -28,6 +29,8 @@ import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import lombok.Getter;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Create policy statement.
|
||||
* syntax:
|
||||
@ -45,17 +48,20 @@ public class CreatePolicyStmt extends DdlStmt {
|
||||
private final String policyName;
|
||||
|
||||
@Getter
|
||||
private final TableName tableName;
|
||||
private TableName tableName = null;
|
||||
|
||||
@Getter
|
||||
private final FilterType filterType;
|
||||
private FilterType filterType = null;
|
||||
|
||||
@Getter
|
||||
private final UserIdentity user;
|
||||
private UserIdentity user = null;
|
||||
|
||||
@Getter
|
||||
private Expr wherePredicate;
|
||||
|
||||
@Getter
|
||||
private Map<String, String> properties;
|
||||
|
||||
/**
|
||||
* Use for cup.
|
||||
**/
|
||||
@ -70,14 +76,31 @@ public class CreatePolicyStmt extends DdlStmt {
|
||||
this.wherePredicate = wherePredicate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use for cup.
|
||||
*/
|
||||
public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName,
|
||||
Map<String, String> properties) {
|
||||
this.type = type;
|
||||
this.ifNotExists = ifNotExists;
|
||||
this.policyName = policyName;
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
tableName.analyze(analyzer);
|
||||
user.analyze(analyzer.getClusterName());
|
||||
if (user.isRootUser() || user.isAdminUser()) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt",
|
||||
user.getQualifiedUser(), user.getHost(), tableName.getTbl());
|
||||
switch (type) {
|
||||
case STORAGE:
|
||||
break;
|
||||
case ROW:
|
||||
default:
|
||||
tableName.analyze(analyzer);
|
||||
user.analyze(analyzer.getClusterName());
|
||||
if (user.isRootUser() || user.isAdminUser()) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt",
|
||||
user.getQualifiedUser(), user.getHost(), tableName.getTbl());
|
||||
}
|
||||
}
|
||||
// check auth
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
@ -92,8 +115,16 @@ public class CreatePolicyStmt extends DdlStmt {
|
||||
if (ifNotExists) {
|
||||
sb.append("IF NOT EXISTS");
|
||||
}
|
||||
sb.append(policyName).append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType)
|
||||
.append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql());
|
||||
sb.append(policyName);
|
||||
switch (type) {
|
||||
case STORAGE:
|
||||
sb.append(" PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
|
||||
break;
|
||||
case ROW:
|
||||
default:
|
||||
sb.append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType)
|
||||
.append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql());
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,9 +54,15 @@ public class DropPolicyStmt extends DdlStmt {
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
tableName.analyze(analyzer);
|
||||
if (user != null) {
|
||||
user.analyze(analyzer.getClusterName());
|
||||
switch (type) {
|
||||
case STORAGE:
|
||||
break;
|
||||
case ROW:
|
||||
default:
|
||||
tableName.analyze(analyzer);
|
||||
if (user != null) {
|
||||
user.analyze(analyzer.getClusterName());
|
||||
}
|
||||
}
|
||||
// check auth
|
||||
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
|
||||
@ -71,9 +77,16 @@ public class DropPolicyStmt extends DdlStmt {
|
||||
if (ifExists) {
|
||||
sb.append("IF EXISTS ");
|
||||
}
|
||||
sb.append(policyName).append(" ON ").append(tableName.toSql());
|
||||
if (user != null) {
|
||||
sb.append(" FOR ").append(user.getQualifiedUser());
|
||||
sb.append(policyName);
|
||||
switch (type) {
|
||||
case STORAGE:
|
||||
break;
|
||||
case ROW:
|
||||
default:
|
||||
sb.append(" ON ").append(tableName.toSql());
|
||||
if (user != null) {
|
||||
sb.append(" FOR ").append(user.getQualifiedUser());
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@ -25,6 +25,8 @@ import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.policy.PolicyTypeEnum;
|
||||
import org.apache.doris.policy.RowPolicy;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
|
||||
@ -48,18 +50,6 @@ public class ShowPolicyStmt extends ShowStmt {
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
private static final ShowResultSetMetaData ROW_META_DATA =
|
||||
ShowResultSetMetaData.builder()
|
||||
.addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
|
||||
.addColumn(new Column("DbName", ScalarType.createVarchar(100)))
|
||||
.addColumn(new Column("TableName", ScalarType.createVarchar(100)))
|
||||
.addColumn(new Column("Type", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("FilterType", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535)))
|
||||
.addColumn(new Column("User", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535)))
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
@ -76,14 +66,26 @@ public class ShowPolicyStmt extends ShowStmt {
|
||||
public String toSql() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("SHOW ").append(type).append(" POLICY");
|
||||
if (user != null) {
|
||||
sb.append(" FOR ").append(user);
|
||||
switch (type) {
|
||||
case STORAGE:
|
||||
break;
|
||||
case ROW:
|
||||
default:
|
||||
if (user != null) {
|
||||
sb.append(" FOR ").append(user);
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShowResultSetMetaData getMetaData() {
|
||||
return ROW_META_DATA;
|
||||
switch (type) {
|
||||
case STORAGE:
|
||||
return StoragePolicy.STORAGE_META_DATA;
|
||||
case ROW:
|
||||
default:
|
||||
return RowPolicy.ROW_META_DATA;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,8 +44,7 @@ public abstract class Resource implements Writable {
|
||||
UNKNOWN,
|
||||
SPARK,
|
||||
ODBC_CATALOG,
|
||||
S3,
|
||||
STORAGE_POLICY;
|
||||
S3;
|
||||
|
||||
public static ResourceType fromString(String resourceType) {
|
||||
for (ResourceType type : ResourceType.values()) {
|
||||
@ -96,9 +95,6 @@ public abstract class Resource implements Writable {
|
||||
case S3:
|
||||
resource = new S3Resource(name);
|
||||
break;
|
||||
case STORAGE_POLICY:
|
||||
resource = new StoragePolicyResource(name);
|
||||
break;
|
||||
default:
|
||||
throw new DdlException("Unknown resource type: " + type);
|
||||
}
|
||||
|
||||
@ -72,8 +72,7 @@ public class ResourceMgr implements Writable {
|
||||
public void createResource(CreateResourceStmt stmt) throws DdlException {
|
||||
if (stmt.getResourceType() != ResourceType.SPARK
|
||||
&& stmt.getResourceType() != ResourceType.ODBC_CATALOG
|
||||
&& stmt.getResourceType() != ResourceType.S3
|
||||
&& stmt.getResourceType() != ResourceType.STORAGE_POLICY) {
|
||||
&& stmt.getResourceType() != ResourceType.S3) {
|
||||
throw new DdlException("Only support SPARK, ODBC_CATALOG and REMOTE_STORAGE resource.");
|
||||
}
|
||||
Resource resource = Resource.fromStmt(stmt);
|
||||
|
||||
@ -1,136 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Policy resource for olap table.
|
||||
* Syntax:
|
||||
* CREATE RESOURCE "storage_policy_name"
|
||||
* PROPERTIES(
|
||||
* "type"="storage_policy",
|
||||
* "cooldown_datetime" = "2022-06-01", // time when data is transfter to medium
|
||||
* "cooldown_ttl" = "1h", // data is transfter to medium after 1 hour
|
||||
* "s3_resource" = "my_s3" // point to a s3 resource
|
||||
* );
|
||||
*/
|
||||
public class StoragePolicyResource extends Resource {
|
||||
// required
|
||||
private static final String STORAGE_RESOURCE = "storage_resource";
|
||||
// optional
|
||||
private static final String COOLDOWN_DATETIME = "cooldown_datetime";
|
||||
private static final String COOLDOWN_TTL = "cooldown_ttl";
|
||||
|
||||
private static final String DEFAULT_COOLDOWN_DATETIME = "9999-01-01 00:00:00";
|
||||
private static final String DEFAULT_COOLDOWN_TTL = "1h";
|
||||
|
||||
@SerializedName(value = "properties")
|
||||
private Map<String, String> properties;
|
||||
|
||||
public StoragePolicyResource(String name) {
|
||||
this(name, Maps.newHashMap());
|
||||
}
|
||||
|
||||
public StoragePolicyResource(String name, Map<String, String> properties) {
|
||||
super(name, ResourceType.STORAGE_POLICY);
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public String getProperty(String propertyKey) {
|
||||
return properties.get(propertyKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setProperties(Map<String, String> properties) throws DdlException {
|
||||
Preconditions.checkState(properties != null);
|
||||
this.properties = properties;
|
||||
// check properties
|
||||
// required
|
||||
checkRequiredProperty(STORAGE_RESOURCE);
|
||||
// optional
|
||||
checkOptionalProperty(COOLDOWN_DATETIME, DEFAULT_COOLDOWN_DATETIME);
|
||||
checkOptionalProperty(COOLDOWN_TTL, DEFAULT_COOLDOWN_TTL);
|
||||
if (properties.containsKey(COOLDOWN_DATETIME) && properties.containsKey(COOLDOWN_TTL)
|
||||
&& !properties.get(COOLDOWN_DATETIME).isEmpty() && !properties.get(COOLDOWN_TTL).isEmpty()) {
|
||||
throw new DdlException("Only one of [" + COOLDOWN_DATETIME + "] and [" + COOLDOWN_TTL
|
||||
+ "] can be specified in properties.");
|
||||
}
|
||||
}
|
||||
|
||||
private void checkRequiredProperty(String propertyKey) throws DdlException {
|
||||
String value = properties.get(propertyKey);
|
||||
|
||||
if (Strings.isNullOrEmpty(value)) {
|
||||
throw new DdlException("Missing [" + propertyKey + "] in properties.");
|
||||
}
|
||||
}
|
||||
|
||||
private void checkOptionalProperty(String propertyKey, String defaultValue) {
|
||||
this.properties.putIfAbsent(propertyKey, defaultValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void modifyProperties(Map<String, String> properties) throws DdlException {
|
||||
if (properties.containsKey(COOLDOWN_DATETIME) && properties.containsKey(COOLDOWN_TTL)
|
||||
&& !properties.get(COOLDOWN_DATETIME).isEmpty() && !properties.get(COOLDOWN_TTL).isEmpty()) {
|
||||
throw new DdlException("Only one of [" + COOLDOWN_DATETIME + "] and [" + COOLDOWN_TTL
|
||||
+ "] can be specified in properties.");
|
||||
}
|
||||
// modify properties
|
||||
replaceIfEffectiveValue(this.properties, STORAGE_RESOURCE, properties.get(STORAGE_RESOURCE));
|
||||
replaceIfEffectiveValue(this.properties, COOLDOWN_DATETIME, properties.get(COOLDOWN_DATETIME));
|
||||
replaceIfEffectiveValue(this.properties, COOLDOWN_TTL, properties.get(COOLDOWN_TTL));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkProperties(Map<String, String> properties) throws AnalysisException {
|
||||
// check properties
|
||||
Map<String, String> copiedProperties = Maps.newHashMap(properties);
|
||||
copiedProperties.remove(STORAGE_RESOURCE);
|
||||
copiedProperties.remove(COOLDOWN_DATETIME);
|
||||
copiedProperties.remove(COOLDOWN_TTL);
|
||||
|
||||
if (!copiedProperties.isEmpty()) {
|
||||
throw new AnalysisException("Unknown policy resource properties: " + copiedProperties);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getCopiedProperties() {
|
||||
return Maps.newHashMap(properties);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void getProcNodeData(BaseProcResult result) {
|
||||
String lowerCaseType = type.name().toLowerCase();
|
||||
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
||||
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -65,7 +65,6 @@ import org.apache.doris.mysql.privilege.UserPropertyInfo;
|
||||
import org.apache.doris.plugin.PluginInfo;
|
||||
import org.apache.doris.policy.DropPolicyLog;
|
||||
import org.apache.doris.policy.Policy;
|
||||
import org.apache.doris.policy.RowPolicy;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.Frontend;
|
||||
import org.apache.doris.transaction.TransactionState;
|
||||
@ -813,7 +812,7 @@ public class EditLog {
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_CREATE_POLICY: {
|
||||
RowPolicy log = (RowPolicy) journal.getData();
|
||||
Policy log = (Policy) journal.getData();
|
||||
catalog.getPolicyMgr().replayCreate(log);
|
||||
break;
|
||||
}
|
||||
@ -1426,11 +1425,7 @@ public class EditLog {
|
||||
}
|
||||
|
||||
public void logCreatePolicy(Policy policy) {
|
||||
if (policy instanceof RowPolicy) {
|
||||
logEdit(OperationType.OP_CREATE_POLICY, policy);
|
||||
} else {
|
||||
LOG.error("invalid policy: " + policy.getType().name());
|
||||
}
|
||||
logEdit(OperationType.OP_CREATE_POLICY, policy);
|
||||
}
|
||||
|
||||
public void logDropPolicy(DropPolicyLog log) {
|
||||
|
||||
@ -37,6 +37,7 @@ import org.apache.doris.load.sync.SyncJob;
|
||||
import org.apache.doris.load.sync.canal.CanalSyncJob;
|
||||
import org.apache.doris.policy.Policy;
|
||||
import org.apache.doris.policy.RowPolicy;
|
||||
import org.apache.doris.policy.StoragePolicy;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
@ -136,7 +137,8 @@ public class GsonUtils {
|
||||
// runtime adapter for class "Policy"
|
||||
private static RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory = RuntimeTypeAdapterFactory
|
||||
.of(Policy.class, "clazz")
|
||||
.registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName());
|
||||
.registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName())
|
||||
.registerSubtype(StoragePolicy.class, StoragePolicy.class.getSimpleName());
|
||||
|
||||
// the builder of GSON instance.
|
||||
// Add any other adapters if necessary.
|
||||
|
||||
@ -77,16 +77,20 @@ public abstract class Policy implements Writable, GsonPostProcessable {
|
||||
* Trans stmt to Policy.
|
||||
**/
|
||||
public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException {
|
||||
String curDb = stmt.getTableName().getDb();
|
||||
if (curDb == null) {
|
||||
curDb = ConnectContext.get().getDatabase();
|
||||
}
|
||||
Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
|
||||
UserIdentity userIdent = stmt.getUser();
|
||||
userIdent.analyze(ConnectContext.get().getClusterName());
|
||||
switch (stmt.getType()) {
|
||||
case STORAGE:
|
||||
StoragePolicy storagePolicy = new StoragePolicy(stmt.getType(), stmt.getPolicyName());
|
||||
storagePolicy.init(stmt.getProperties());
|
||||
return storagePolicy;
|
||||
case ROW:
|
||||
default:
|
||||
String curDb = stmt.getTableName().getDb();
|
||||
if (curDb == null) {
|
||||
curDb = ConnectContext.get().getDatabase();
|
||||
}
|
||||
Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
|
||||
UserIdentity userIdent = stmt.getUser();
|
||||
userIdent.analyze(ConnectContext.get().getClusterName());
|
||||
Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
|
||||
return new RowPolicy(stmt.getType(), stmt.getPolicyName(), db.getId(), userIdent,
|
||||
stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(),
|
||||
|
||||
@ -208,6 +208,9 @@ public class PolicyMgr implements Writable {
|
||||
long currentDbId = ConnectContext.get().getCurrentDbId();
|
||||
Policy checkedPolicy = null;
|
||||
switch (showStmt.getType()) {
|
||||
case STORAGE:
|
||||
checkedPolicy = new StoragePolicy();
|
||||
break;
|
||||
case ROW:
|
||||
default:
|
||||
RowPolicy rowPolicy = new RowPolicy();
|
||||
|
||||
@ -23,10 +23,13 @@ import org.apache.doris.analysis.SqlParser;
|
||||
import org.apache.doris.analysis.SqlScanner;
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.util.SqlParserUtils;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
@ -45,6 +48,18 @@ import java.util.List;
|
||||
@Data
|
||||
public class RowPolicy extends Policy {
|
||||
|
||||
public static final ShowResultSetMetaData ROW_META_DATA =
|
||||
ShowResultSetMetaData.builder()
|
||||
.addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
|
||||
.addColumn(new Column("DbName", ScalarType.createVarchar(100)))
|
||||
.addColumn(new Column("TableName", ScalarType.createVarchar(100)))
|
||||
.addColumn(new Column("Type", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("FilterType", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535)))
|
||||
.addColumn(new Column("User", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535)))
|
||||
.build();
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(RowPolicy.class);
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,196 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.policy;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import lombok.Data;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Save policy for storage migration.
|
||||
**/
|
||||
@Data
|
||||
public class StoragePolicy extends Policy {
|
||||
|
||||
public static final ShowResultSetMetaData STORAGE_META_DATA =
|
||||
ShowResultSetMetaData.builder()
|
||||
.addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
|
||||
.addColumn(new Column("Type", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("StorageResource", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("CooldownDatetime", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("CooldownTtl", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("properties", ScalarType.createVarchar(65535)))
|
||||
.build();
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(StoragePolicy.class);
|
||||
// required
|
||||
private static final String STORAGE_RESOURCE = "storage_resource";
|
||||
// optional
|
||||
private static final String COOLDOWN_DATETIME = "cooldown_datetime";
|
||||
private static final String COOLDOWN_TTL = "cooldown_ttl";
|
||||
|
||||
@SerializedName(value = "storageResource")
|
||||
private String storageResource = null;
|
||||
|
||||
@SerializedName(value = "cooldownDatetime")
|
||||
private Date cooldownDatetime = null;
|
||||
|
||||
@SerializedName(value = "cooldownTtl")
|
||||
private String cooldownTtl = null;
|
||||
|
||||
private Map<String, String> props;
|
||||
|
||||
public StoragePolicy() {}
|
||||
|
||||
/**
|
||||
* Policy for Storage Migration.
|
||||
*
|
||||
* @param type PolicyType
|
||||
* @param policyName policy name
|
||||
* @param storageResource resource name for storage
|
||||
* @param cooldownDatetime cool down time
|
||||
* @param cooldownTtl cool down time cost after partition is created
|
||||
*/
|
||||
public StoragePolicy(final PolicyTypeEnum type, final String policyName, final String storageResource,
|
||||
final Date cooldownDatetime, final String cooldownTtl) {
|
||||
super(type, policyName);
|
||||
this.storageResource = storageResource;
|
||||
this.cooldownDatetime = cooldownDatetime;
|
||||
this.cooldownTtl = cooldownTtl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Policy for Storage Migration.
|
||||
*
|
||||
* @param type PolicyType
|
||||
* @param policyName policy name
|
||||
*/
|
||||
public StoragePolicy(final PolicyTypeEnum type, final String policyName) {
|
||||
super(type, policyName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Init props for storage policy.
|
||||
*
|
||||
* @param props properties for storage policy
|
||||
*/
|
||||
public void init(final Map<String, String> props) throws AnalysisException {
|
||||
if (props == null) {
|
||||
throw new AnalysisException("properties config is required");
|
||||
}
|
||||
checkRequiredProperty(props, STORAGE_RESOURCE);
|
||||
this.storageResource = props.get(STORAGE_RESOURCE);
|
||||
boolean hasCooldownDatetime = false;
|
||||
boolean hasCooldownTtl = false;
|
||||
if (props.containsKey(COOLDOWN_DATETIME)) {
|
||||
hasCooldownDatetime = true;
|
||||
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
try {
|
||||
this.cooldownDatetime = df.parse(props.get(COOLDOWN_DATETIME));
|
||||
} catch (ParseException e) {
|
||||
throw new AnalysisException(String.format("cooldown_datetime format error: %s",
|
||||
props.get(COOLDOWN_DATETIME)), e);
|
||||
}
|
||||
}
|
||||
if (props.containsKey(COOLDOWN_TTL)) {
|
||||
hasCooldownTtl = true;
|
||||
this.cooldownTtl = props.get(COOLDOWN_TTL);
|
||||
}
|
||||
if (hasCooldownDatetime && hasCooldownTtl) {
|
||||
throw new AnalysisException(COOLDOWN_DATETIME + " and " + COOLDOWN_TTL + " can't be set together.");
|
||||
}
|
||||
if (!hasCooldownDatetime && !hasCooldownTtl) {
|
||||
throw new AnalysisException(COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be set");
|
||||
}
|
||||
if (!Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) {
|
||||
throw new AnalysisException("storage resource doesn't exist: " + this.storageResource);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use for SHOW POLICY.
|
||||
**/
|
||||
public List<String> getShowInfo() throws AnalysisException {
|
||||
String props = "";
|
||||
if (Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) {
|
||||
props = Catalog.getCurrentCatalog().getResourceMgr().getResource(this.storageResource).toString();
|
||||
}
|
||||
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
return Lists.newArrayList(this.policyName, this.type.name(), this.storageResource,
|
||||
df.format(this.cooldownDatetime), this.cooldownTtl, props);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void gsonPostProcess() throws IOException {}
|
||||
|
||||
@Override
|
||||
public StoragePolicy clone() {
|
||||
return new StoragePolicy(this.type, this.policyName, this.storageResource,
|
||||
this.cooldownDatetime, this.cooldownTtl);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matchPolicy(Policy checkedPolicyCondition) {
|
||||
if (!(checkedPolicyCondition instanceof StoragePolicy)) {
|
||||
return false;
|
||||
}
|
||||
StoragePolicy storagePolicy = (StoragePolicy) checkedPolicyCondition;
|
||||
return checkMatched(storagePolicy.getType(), storagePolicy.getPolicyName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matchPolicy(DropPolicyLog checkedDropCondition) {
|
||||
return checkMatched(checkedDropCondition.getType(), checkedDropCondition.getPolicyName());
|
||||
}
|
||||
|
||||
/**
|
||||
* check required key in properties.
|
||||
*
|
||||
* @param props properties for storage policy
|
||||
* @param propertyKey key for property
|
||||
* @throws AnalysisException exception for properties error
|
||||
*/
|
||||
private void checkRequiredProperty(final Map<String, String> props, String propertyKey) throws AnalysisException {
|
||||
String value = props.get(propertyKey);
|
||||
|
||||
if (Strings.isNullOrEmpty(value)) {
|
||||
throw new AnalysisException("Missing [" + propertyKey + "] in properties.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInvalid() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user