[feature] add remote storage policy config for create table properties (#10159)

Add remote storage policy config for create table properties. It will set storage policy for table and partitions in `CREATE TABLE` and `ALTER TABLE`.
This policy will be used when partition is being migrated from local to remote.
grammy:
1.
`CREATE TABLE TblPxy1
(...)
ENGINE=olap
DISTRIBUTED BY HASH (aa) BUCKETS 1
PROPERTIES(
    "remote_storage_policy" = "testPolicy3"
);`
2.
`ALTER TABLE TblPxy01 SET ("remote_storage_policy" = "testPolicy3");`
3.
`ALTER TABLE TblPxy01 MODIFY PARTITION p2 SET ("remote_storage_policy" = "testPolicy3");`
This commit is contained in:
pengxiangyu
2022-06-20 12:42:23 +08:00
committed by GitHub
parent acf07d8966
commit 087fc596b1
19 changed files with 288 additions and 187 deletions

View File

@ -0,0 +1,74 @@
---
{
"title": "DROP-MATERIALIZED-VIEW",
"language": "en"
}
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## DROP-POLICY
### Name
DROP POLICY
### Description
drop policy for row or storage
#### ROW POLICY
Grammar:
1. Drop row policy
```sql
DROP ROW POLICY test_row_policy_1 on table1 [FOR user];
```
2. Drop storage policy
```sql
DROP STORAGE POLICY policy_name1
```
### Example
1. Drop the row policy for table1 named test_row_policy_1
```sql
DROP ROW POLICY test_row_policy_1 on table1
```
2. Drop the row policy for table1 using by user test
```sql
DROP ROW POLICY test_row_policy_1 on table1 for test
```
3. Drop the storage policy named policy_name1
```sql
DROP STORAGE POLICY policy_name1
```
### Keywords
DROP, POLICY
### Best Practice

View File

@ -38,10 +38,16 @@ DROP POLICY
语法:
1. 删除行安全策略
```sql
DROP ROW POLICY test_row_policy_1 on table1 [FOR user];
```
2. 删除冷热数据存储策略
```sql
DROP STORAGE POLICY policy_name1
```
### Example
1. 删除 table1 的 test_row_policy_1
@ -56,6 +62,10 @@ DROP ROW POLICY test_row_policy_1 on table1 [FOR user];
DROP ROW POLICY test_row_policy_1 on table1 for test
```
3. 删除 policy_name1 对应的冷热数据存储策略
```sql
DROP STORAGE POLICY policy_name1
```
### Keywords
DROP, POLICY

View File

@ -690,14 +690,11 @@ public class Alter {
DateLiteral dateLiteral = new DateLiteral(dataProperty.getCooldownTimeMs(),
TimeUtils.getTimeZone(), Type.DATETIME);
newProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, dateLiteral.getStringValue());
newProperties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE,
dataProperty.getRemoteStorageResourceName());
DateLiteral dateLiteral1 = new DateLiteral(dataProperty.getRemoteCooldownTimeMs(),
TimeUtils.getTimeZone(), Type.DATETIME);
newProperties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME, dateLiteral1.getStringValue());
newProperties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_POLICY, dataProperty.getRemoteStoragePolicy());
newProperties.putAll(properties);
// 4.3 analyze new properties
DataProperty newDataProperty = PropertyAnalyzer.analyzeDataProperty(newProperties, null);
DataProperty newDataProperty =
PropertyAnalyzer.analyzeDataProperty(newProperties, DataProperty.DEFAULT_DATA_PROPERTY);
// 1. date property
if (newDataProperty != null) {

View File

@ -1532,6 +1532,10 @@ public class SchemaChangeHandler extends AlterHandler {
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
Catalog.getCurrentCatalog().modifyTableReplicaAllocation(db, olapTable, properties);
return;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_POLICY)) {
olapTable.setRemoteStoragePolicy(
properties.get(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_POLICY));
return;
}
}

View File

@ -96,18 +96,13 @@ public class ModifyPartitionClause extends AlterTableClause {
// 3. in_memory
// 4. tablet type
private void checkProperties(Map<String, String> properties) throws AnalysisException {
// 1. data property, can not modify partition property remote_storage_resource
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
throw new AnalysisException("Do not support modify partition data property `remote_storage_resource`.");
}
// 2. replica allocation
// 1. replica allocation
PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
// 3. in memory
// 2. in memory
PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY, false);
// 4. tablet type
// 3. tablet type
PropertyAnalyzer.analyzeTabletType(properties);
}

View File

@ -87,10 +87,10 @@ public class ModifyTablePropertiesClause extends AlterTableClause {
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
this.needTableStable = false;
this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_POLICY)) {
// do nothing, just check valid.
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TABLET_TYPE)) {
throw new AnalysisException("Alter tablet type not supported");
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
throw new AnalysisException("Alter table remote_storage_resource is not supported.");
} else {
throw new AnalysisException("Unknown table property: " + properties.keySet());
}

View File

@ -2853,11 +2853,11 @@ public class Catalog {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\" = \"");
sb.append(olapTable.getStorageFormat()).append("\"");
// remote storage resource
String remoteStorageResource = olapTable.getRemoteStorageResource();
if (!Strings.isNullOrEmpty(remoteStorageResource)) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE).append("\" = \"");
sb.append(remoteStorageResource).append("\"");
// remote storage
String remoteStoragePolicy = olapTable.getRemoteStoragePolicy();
if (!Strings.isNullOrEmpty(remoteStoragePolicy)) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_POLICY).append("\" = \"");
sb.append(remoteStoragePolicy).append("\"");
}
// compression type
if (olapTable.getCompressionType() != TCompressionType.LZ4F) {

View File

@ -42,10 +42,8 @@ public class DataProperty implements Writable {
private TStorageMedium storageMedium;
@SerializedName(value = "cooldownTimeMs")
private long cooldownTimeMs;
@SerializedName(value = "remoteStorageResourceName")
private String remoteStorageResourceName;
@SerializedName(value = "remoteCooldownTimeMs")
private long remoteCooldownTimeMs;
@SerializedName(value = "remoteStoragePolicy")
private String remoteStoragePolicy;
private DataProperty() {
// for persist
@ -59,16 +57,20 @@ public class DataProperty implements Writable {
} else {
this.cooldownTimeMs = MAX_COOLDOWN_TIME_MS;
}
this.remoteStorageResourceName = "";
this.remoteCooldownTimeMs = MAX_COOLDOWN_TIME_MS;
this.remoteStoragePolicy = "";
}
public DataProperty(TStorageMedium medium, long cooldown,
String remoteStorageResourceName, long remoteCooldownTimeMs) {
/**
* DataProperty construction.
*
* @param medium storage medium for the init storage of the table
* @param cooldown cool down time for SSD->HDD
* @param remoteStoragePolicy remote storage policy for remote storage
*/
public DataProperty(TStorageMedium medium, long cooldown, String remoteStoragePolicy) {
this.storageMedium = medium;
this.cooldownTimeMs = cooldown;
this.remoteStorageResourceName = remoteStorageResourceName;
this.remoteCooldownTimeMs = remoteCooldownTimeMs;
this.remoteStoragePolicy = remoteStoragePolicy;
}
public TStorageMedium getStorageMedium() {
@ -79,12 +81,8 @@ public class DataProperty implements Writable {
return cooldownTimeMs;
}
public long getRemoteCooldownTimeMs() {
return remoteCooldownTimeMs;
}
public String getRemoteStorageResourceName() {
return remoteStorageResourceName;
public String getRemoteStoragePolicy() {
return remoteStoragePolicy;
}
public static DataProperty read(DataInput in) throws IOException {
@ -106,13 +104,12 @@ public class DataProperty implements Writable {
public void readFields(DataInput in) throws IOException {
storageMedium = TStorageMedium.valueOf(Text.readString(in));
cooldownTimeMs = in.readLong();
remoteStorageResourceName = "";
remoteCooldownTimeMs = MAX_COOLDOWN_TIME_MS;
remoteStoragePolicy = "";
}
@Override
public int hashCode() {
return Objects.hash(storageMedium, cooldownTimeMs, remoteStorageResourceName, remoteCooldownTimeMs);
return Objects.hash(storageMedium, cooldownTimeMs, remoteStoragePolicy);
}
@Override
@ -129,8 +126,7 @@ public class DataProperty implements Writable {
return this.storageMedium == other.storageMedium
&& this.cooldownTimeMs == other.cooldownTimeMs
&& this.remoteCooldownTimeMs == other.remoteCooldownTimeMs
&& this.remoteStorageResourceName.equals(other.remoteStorageResourceName);
&& this.remoteStoragePolicy.equals(other.remoteStoragePolicy);
}
@Override
@ -138,8 +134,7 @@ public class DataProperty implements Writable {
StringBuilder sb = new StringBuilder();
sb.append("Storage medium[").append(this.storageMedium).append("]. ");
sb.append("cool down[").append(TimeUtils.longToTimeString(cooldownTimeMs)).append("]. ");
sb.append("remote storage resource name[").append(this.remoteStorageResourceName).append("]. ");
sb.append("remote cool down[").append(TimeUtils.longToTimeString(remoteCooldownTimeMs)).append("].");
sb.append("remote storage policy[").append(this.remoteStoragePolicy).append("]. ");
return sb.toString();
}
}

View File

@ -1547,12 +1547,17 @@ public class OlapTable extends Table {
tableProperty.buildDataSortInfo();
}
public void setRemoteStorageResource(String resourceName) {
/**
* set remote storage policy for table.
*
* @param remoteStoragePolicy remote storage policy name
*/
public void setRemoteStoragePolicy(String remoteStoragePolicy) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty.setRemoteStorageResource(resourceName);
tableProperty.buildRemoteStorageResource();
tableProperty.setRemoteStoragePolicy(remoteStoragePolicy);
tableProperty.buildRemoteStoragePolicy();
}
// return true if partition with given name already exist, both in partitions and temp partitions.
@ -1722,11 +1727,16 @@ public class OlapTable extends Table {
return tableProperty.getDataSortInfo();
}
public String getRemoteStorageResource() {
/**
* get remote storage policy name.
*
* @return remote storage policy name for this table.
*/
public String getRemoteStoragePolicy() {
if (tableProperty == null) {
return "";
}
return tableProperty.getRemoteStorageResource();
return tableProperty.getRemoteStoragePolicy();
}
// For non partitioned table:

View File

@ -31,6 +31,9 @@ import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
@ -43,10 +46,8 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
@ -99,36 +100,12 @@ public class ResourceMgr implements Writable {
throw new DdlException("Resource(" + resourceName + ") does not exist");
}
// Check whether the resource is in use before deleting it, except spark resource
List<String> usedTables = new ArrayList<>();
List<Long> dbIds = Catalog.getCurrentCatalog().getDbIds();
for (Long dbId : dbIds) {
Optional<Database> database = Catalog.getCurrentCatalog().getDb(dbId);
database.ifPresent(db -> {
List<Table> tables = db.getTablesOnIdOrder();
for (Table table : tables) {
if (table instanceof OdbcTable) {
// odbc resource
if (resourceName.equals(((OdbcTable) table).getOdbcCatalogResourceName())) {
usedTables.add(db.getFullName() + "." + table.getName());
}
} else if (table instanceof OlapTable) {
// remote resource, such as s3 resource
PartitionInfo partitionInfo = ((OlapTable) table).getPartitionInfo();
List<Long> partitionIds = ((OlapTable) table).getPartitionIds();
for (Long partitionId : partitionIds) {
DataProperty dataProperty = partitionInfo.getDataProperty(partitionId);
if (resourceName.equals(dataProperty.getRemoteStorageResourceName())) {
usedTables.add(db.getFullName() + "." + table.getName());
break;
}
}
}
}
});
}
if (usedTables.size() > 0) {
LOG.warn("Can not drop resource, since it's used in tables {}", usedTables);
throw new DdlException("Can not drop resource, since it's used in tables " + usedTables);
StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, null);
checkedStoragePolicy.setStorageResource(resourceName);
if (Catalog.getCurrentCatalog().getPolicyMgr().existPolicy(checkedStoragePolicy)) {
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy);
LOG.warn("Can not drop resource, since it's used in policy {}", policy.getPolicyName());
throw new DdlException("Can not drop resource, since it's used in policy " + policy.getPolicyName());
}
nameToResource.remove(resourceName);
// log drop

View File

@ -72,8 +72,8 @@ public class TableProperty implements Writable {
private DataSortInfo dataSortInfo = new DataSortInfo();
// remote storage resource, for cold data
private String remoteStorageResource;
// remote storage policy, for cold data
private String remoteStoragePolicy;
public TableProperty(Map<String, String> properties) {
this.properties = properties;
@ -162,8 +162,8 @@ public class TableProperty implements Writable {
return this;
}
public TableProperty buildRemoteStorageResource() {
remoteStorageResource = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE, "");
public TableProperty buildRemoteStoragePolicy() {
remoteStoragePolicy = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_POLICY, "");
return this;
}
@ -184,10 +184,9 @@ public class TableProperty implements Writable {
replicaAlloc.toCreateStmt());
}
public void setRemoteStorageResource(String resourceName) {
this.remoteStorageResource = resourceName;
properties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE,
resourceName);
public void setRemoteStoragePolicy(String remotePolicyName) {
this.remoteStoragePolicy = remotePolicyName;
properties.put(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_POLICY, remotePolicyName);
}
public ReplicaAllocation getReplicaAllocation() {
@ -232,8 +231,8 @@ public class TableProperty implements Writable {
return dataSortInfo;
}
public String getRemoteStorageResource() {
return remoteStorageResource;
public String getRemoteStoragePolicy() {
return remoteStoragePolicy;
}
public TCompressionType getCompressionType() {
@ -264,7 +263,7 @@ public class TableProperty implements Writable {
.buildInMemory()
.buildStorageFormat()
.buildDataSortInfo()
.buildRemoteStorageResource()
.buildRemoteStoragePolicy()
.buildCompressionType();
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) {
// get replica num from property map and create replica allocation

View File

@ -66,8 +66,7 @@ public class PartitionsProcDir implements ProcDirInterface {
.add("PartitionId").add("PartitionName")
.add("VisibleVersion").add("VisibleVersionTime")
.add("State").add("PartitionKey").add("Range").add("DistributionKey")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime")
.add("RemoteStorageResource").add("RemoteStorageCooldownTime")
.add("Buckets").add("ReplicationNum").add("StorageMedium").add("CooldownTime").add("RemoteStoragePolicy")
.add("LastConsistencyCheckTime").add("DataSize").add("IsInMemory").add("ReplicaAllocation")
.build();
@ -273,8 +272,7 @@ public class PartitionsProcDir implements ProcDirInterface {
DataProperty dataProperty = tblPartitionInfo.getDataProperty(partitionId);
partitionInfo.add(dataProperty.getStorageMedium().name());
partitionInfo.add(TimeUtils.longToTimeString(dataProperty.getCooldownTimeMs()));
partitionInfo.add(dataProperty.getRemoteStorageResourceName());
partitionInfo.add(TimeUtils.longToTimeString(dataProperty.getRemoteCooldownTimeMs()));
partitionInfo.add(dataProperty.getRemoteStoragePolicy());
partitionInfo.add(TimeUtils.longToTimeString(partition.getLastCheckTime()));

View File

@ -26,11 +26,13 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TSortType;
@ -50,8 +52,6 @@ import java.util.Map;
import java.util.Set;
public class PropertyAnalyzer {
private static final Logger LOG = LogManager.getLogger(PropertyAnalyzer.class);
private static final String COMMA_SEPARATOR = ",";
public static final String PROPERTIES_SHORT_KEY = "short_key";
public static final String PROPERTIES_REPLICATION_NUM = "replication_num";
@ -59,7 +59,6 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_STORAGE_TYPE = "storage_type";
public static final String PROPERTIES_STORAGE_MEDIUM = "storage_medium";
public static final String PROPERTIES_STORAGE_COOLDOWN_TIME = "storage_cooldown_time";
public static final String PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME = "remote_storage_cooldown_time";
// for 1.x -> 2.x migration
public static final String PROPERTIES_VERSION_INFO = "version_info";
// for restore
@ -67,8 +66,6 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_BF_COLUMNS = "bloom_filter_columns";
public static final String PROPERTIES_BF_FPP = "bloom_filter_fpp";
private static final double MAX_FPP = 0.05;
private static final double MIN_FPP = 0.0001;
public static final String PROPERTIES_COLUMN_SEPARATOR = "column_separator";
public static final String PROPERTIES_LINE_DELIMITER = "line_delimiter";
@ -89,7 +86,7 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_INMEMORY = "in_memory";
public static final String PROPERTIES_REMOTE_STORAGE_RESOURCE = "remote_storage_resource";
public static final String PROPERTIES_REMOTE_STORAGE_POLICY = "remote_storage_policy";
public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
@ -109,6 +106,11 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_DISABLE_LOAD = "disable_load";
private static final Logger LOG = LogManager.getLogger(PropertyAnalyzer.class);
private static final String COMMA_SEPARATOR = ",";
private static final double MAX_FPP = 0.05;
private static final double MIN_FPP = 0.0001;
public static DataProperty analyzeDataProperty(Map<String, String> properties, DataProperty oldDataProperty)
throws AnalysisException {
if (properties == null || properties.isEmpty()) {
@ -117,13 +119,11 @@ public class PropertyAnalyzer {
TStorageMedium storageMedium = null;
long cooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
String remoteStorageResourceName = "";
long remoteCooldownTimeStamp = DataProperty.MAX_COOLDOWN_TIME_MS;
String remoteStoragePolicy = "";
boolean hasMedium = false;
boolean hasCooldown = false;
boolean hasRemoteStorageResource = false;
boolean hasRemoteCooldown = false;
boolean hasRemoteStoragePolicy = false;
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
@ -142,31 +142,23 @@ public class PropertyAnalyzer {
if (cooldownTimeStamp != DataProperty.MAX_COOLDOWN_TIME_MS) {
hasCooldown = true;
}
} else if (!hasRemoteStorageResource && key.equalsIgnoreCase(PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
} else if (!hasRemoteStoragePolicy && key.equalsIgnoreCase(PROPERTIES_REMOTE_STORAGE_POLICY)) {
if (!Strings.isNullOrEmpty(value)) {
hasRemoteStorageResource = true;
remoteStorageResourceName = value;
}
} else if (!hasRemoteCooldown && key.equalsIgnoreCase(PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME)) {
DateLiteral dateLiteral = new DateLiteral(value, Type.DATETIME);
remoteCooldownTimeStamp = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
if (remoteCooldownTimeStamp != DataProperty.MAX_COOLDOWN_TIME_MS) {
hasRemoteCooldown = true;
hasRemoteStoragePolicy = true;
remoteStoragePolicy = value;
}
}
} // end for properties
// Check properties
if (!hasCooldown && !hasMedium) {
if (!hasCooldown && !hasMedium && !hasRemoteStoragePolicy) {
return oldDataProperty;
}
properties.remove(PROPERTIES_STORAGE_MEDIUM);
properties.remove(PROPERTIES_STORAGE_COOLDOWN_TIME);
properties.remove(PROPERTIES_REMOTE_STORAGE_RESOURCE);
properties.remove(PROPERTIES_REMOTE_STORAGE_COOLDOWN_TIME);
properties.remove(PROPERTIES_REMOTE_STORAGE_POLICY);
if (hasCooldown && !hasMedium) {
throw new AnalysisException("Invalid data property. storage medium property is not found");
@ -190,30 +182,28 @@ public class PropertyAnalyzer {
cooldownTimeStamp = currentTimeMs + Config.storage_cooldown_second * 1000L;
}
// check remote_storage_resource and remote_storage_cooldown_time
if ((!hasRemoteCooldown && hasRemoteStorageResource) || (hasRemoteCooldown && !hasRemoteStorageResource)) {
throw new AnalysisException("Invalid data property, "
+ "`remote_storage_resource` and `remote_storage_cooldown_time` must be used together.");
}
if (hasRemoteStorageResource && hasRemoteCooldown) {
if (hasRemoteStoragePolicy) {
// check remote resource
Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(remoteStorageResourceName);
if (resource == null) {
throw new AnalysisException("Invalid data property, "
+ "`remote_storage_resource` [" + remoteStorageResourceName + "] dose not exist.");
StoragePolicy checkedPolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy);
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedPolicy);
if (!(policy instanceof StoragePolicy)) {
throw new AnalysisException("No PolicyStorage: " + remoteStoragePolicy);
}
StoragePolicy storagePolicy = (StoragePolicy) policy;
// check remote storage cool down timestamp
if (remoteCooldownTimeStamp <= currentTimeMs) {
throw new AnalysisException("Remote storage cool down time should later than now");
}
if (hasCooldown && (remoteCooldownTimeStamp <= cooldownTimeStamp)) {
throw new AnalysisException(
"`remote_storage_cooldown_time` should later than `storage_cooldown_time`.");
if (storagePolicy.getCooldownDatetime() != null) {
if (storagePolicy.getCooldownDatetime().getTime() <= currentTimeMs) {
throw new AnalysisException("Remote storage cool down time should later than now");
}
if (hasCooldown && (storagePolicy.getCooldownDatetime().getTime() <= cooldownTimeStamp)) {
throw new AnalysisException("`remote_storage_cooldown_time`"
+ " should later than `storage_cooldown_time`.");
}
}
}
Preconditions.checkNotNull(storageMedium);
return new DataProperty(storageMedium, cooldownTimeStamp, remoteStorageResourceName, remoteCooldownTimeStamp);
return new DataProperty(storageMedium, cooldownTimeStamp, remoteStoragePolicy);
}
public static short analyzeShortKeyColumnCount(Map<String, String> properties) throws AnalysisException {
@ -501,19 +491,26 @@ public class PropertyAnalyzer {
return defaultVal;
}
// analyze remote storage resource
public static String analyzeRemoteStorageResource(Map<String, String> properties) throws AnalysisException {
String resourceName = "";
if (properties != null && properties.containsKey(PROPERTIES_REMOTE_STORAGE_RESOURCE)) {
resourceName = properties.get(PROPERTIES_REMOTE_STORAGE_RESOURCE);
// check resource existence
Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
if (resource == null) {
throw new AnalysisException("Resource does not exist, name: " + resourceName);
/**
* analyze remote storage policy.
*
* @param properties property for table
* @return remote storage policy name
* @throws AnalysisException policy name doesn't exist
*/
public static String analyzeRemoteStoragePolicy(Map<String, String> properties) throws AnalysisException {
String remoteStoragePolicy = "";
if (properties != null && properties.containsKey(PROPERTIES_REMOTE_STORAGE_POLICY)) {
remoteStoragePolicy = properties.get(PROPERTIES_REMOTE_STORAGE_POLICY);
// check remote storage policy existence
StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy);
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy);
if (!(policy instanceof StoragePolicy)) {
throw new AnalysisException("StoragePolicy: " + remoteStoragePolicy + " does not exist.");
}
}
return resourceName;
return remoteStoragePolicy;
}
// analyze property like : "type" = "xxx";

View File

@ -1828,8 +1828,8 @@ public class InternalDataSource implements DataSourceIf {
olapTable.setIsInMemory(isInMemory);
// set remote storage
String resourceName = PropertyAnalyzer.analyzeRemoteStorageResource(properties);
olapTable.setRemoteStorageResource(resourceName);
String remoteStoragePolicy = PropertyAnalyzer.analyzeRemoteStoragePolicy(properties);
olapTable.setRemoteStoragePolicy(remoteStoragePolicy);
TTabletType tabletType;
try {
@ -1838,7 +1838,6 @@ public class InternalDataSource implements DataSourceIf {
throw new DdlException(e.getMessage());
}
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
// if this is an unpartitioned table, we should analyze data property and replication num here.
// if this is a partitioned table, there properties are already analyzed

View File

@ -62,13 +62,21 @@ public class DropPolicyLog implements Writable {
* Generate delete logs through stmt.
**/
public static DropPolicyLog fromDropStmt(DropPolicyStmt stmt) throws AnalysisException {
String curDb = stmt.getTableName().getDb();
if (curDb == null) {
curDb = ConnectContext.get().getDatabase();
switch (stmt.getType()) {
case STORAGE:
return new DropPolicyLog(-1, -1, stmt.getType(), stmt.getPolicyName(), null);
case ROW:
String curDb = stmt.getTableName().getDb();
if (curDb == null) {
curDb = ConnectContext.get().getDatabase();
}
Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
return new DropPolicyLog(db.getId(), table.getId(), stmt.getType(),
stmt.getPolicyName(), stmt.getUser());
default:
throw new AnalysisException("Invalid policy type: " + stmt.getType().name());
}
Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
return new DropPolicyLog(db.getId(), table.getId(), stmt.getType(), stmt.getPolicyName(), stmt.getUser());
}
@Override

View File

@ -136,16 +136,44 @@ public class PolicyMgr implements Writable {
return userPolicySet.contains(user);
}
private boolean existPolicy(Policy checkedPolicy) {
/**
* Check whether the policy exist.
*
* @param checkedPolicy policy condition to check
* @return exist or not
*/
public boolean existPolicy(Policy checkedPolicy) {
List<Policy> policies = getPoliciesByType(checkedPolicy.getType());
return policies.stream().anyMatch(policy -> policy.matchPolicy(checkedPolicy));
}
/**
* CCheck whether the policy exist for the DropPolicyLog.
*
* @param checkedDropPolicy policy log condition to check
* @return exist or not
*/
private boolean existPolicy(DropPolicyLog checkedDropPolicy) {
List<Policy> policies = getPoliciesByType(checkedDropPolicy.getType());
return policies.stream().anyMatch(policy -> policy.matchPolicy(checkedDropPolicy));
}
/**
* Get policy by type and name.
*
* @param checkedPolicy condition to get policy
* @return Policy in typeToPolicyMap
*/
public Policy getPolicy(Policy checkedPolicy) {
List<Policy> policies = getPoliciesByType(checkedPolicy.getType());
for (Policy policy : policies) {
if (policy.matchPolicy(checkedPolicy)) {
return policy;
}
}
return null;
}
private List<Policy> getPoliciesByType(PolicyTypeEnum policyType) {
if (typeToPolicyMap == null) {
return new ArrayList<>();

View File

@ -147,8 +147,12 @@ public class StoragePolicy extends Policy {
props = Catalog.getCurrentCatalog().getResourceMgr().getResource(this.storageResource).toString();
}
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String cooldownDatetimeStr = "";
if (this.cooldownDatetime != null) {
cooldownDatetimeStr = df.format(this.cooldownDatetime);
}
return Lists.newArrayList(this.policyName, this.type.name(), this.storageResource,
df.format(this.cooldownDatetime), this.cooldownTtl, props);
cooldownDatetimeStr, this.cooldownTtl, props);
}
@Override
@ -166,7 +170,9 @@ public class StoragePolicy extends Policy {
return false;
}
StoragePolicy storagePolicy = (StoragePolicy) checkedPolicyCondition;
return checkMatched(storagePolicy.getType(), storagePolicy.getPolicyName());
return (storagePolicy.getStorageResource() == null
|| storagePolicy.getStorageResource().equals(this.storageResource))
&& checkMatched(storagePolicy.getType(), storagePolicy.getPolicyName());
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.doris.alter;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DateLiteral;
@ -198,6 +199,18 @@ public class AlterTest {
+ " \"s3_connection_timeout_ms\" = \"1000\"\n"
+ ");");
createRemoteStoragePolicy("CREATE STORAGE POLICY testPolicy\n"
+ "PROPERTIES(\n"
+ " \"storage_resource\" = \"remote_s3\",\n"
+ " \"cooldown_datetime\" = \"2100-05-10 00:00:00\"\n"
+ ");");
createRemoteStoragePolicy("CREATE STORAGE POLICY testPolicy2\n"
+ "PROPERTIES(\n"
+ " \"storage_resource\" = \"remote_s3\",\n"
+ " \"cooldown_ttl\" = \"1d\"\n"
+ ");");
createTable("CREATE TABLE test.tbl_remote\n"
+ "(\n"
+ " k1 date,\n"
@ -217,9 +230,8 @@ public class AlterTest {
+ " 'replication_num' = '1',\n"
+ " 'in_memory' = 'false',\n"
+ " 'storage_medium' = 'SSD',\n"
+ " 'storage_cooldown_time' = '2122-04-01 20:24:00',\n"
+ " 'remote_storage_resource' = 'remote_s3',\n"
+ " 'remote_storage_cooldown_time' = '2122-12-01 20:23:00'"
+ " 'storage_cooldown_time' = '2100-05-09 00:00:00',\n"
+ " 'remote_storage_policy' = 'testPolicy'\n"
+ ");");
}
@ -239,6 +251,11 @@ public class AlterTest {
Catalog.getCurrentCatalog().getResourceMgr().createResource(stmt);
}
private static void createRemoteStoragePolicy(String sql) throws Exception {
CreatePolicyStmt stmt = (CreatePolicyStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Catalog.getCurrentCatalog().getPolicyMgr().createPolicy(stmt);
}
private static void alterTable(String sql, boolean expectedException) throws Exception {
try {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
@ -447,13 +464,13 @@ public class AlterTest {
stmt = "alter table test.tbl4 modify partition (p3, p4) set ('storage_medium' = 'HDD')";
DateLiteral dateLiteral = new DateLiteral("2999-12-31 00:00:00", Type.DATETIME);
long cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "");
partitionList = Lists.newArrayList(p3, p4);
for (Partition partition : partitionList) {
Assert.assertEquals(oldDataProperty, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
}
alterTable(stmt, false);
DataProperty newDataProperty = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "", DataProperty.MAX_COOLDOWN_TIME_MS);
DataProperty newDataProperty = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "");
for (Partition partition : partitionList) {
Assert.assertEquals(newDataProperty, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
}
@ -466,7 +483,7 @@ public class AlterTest {
dateLiteral = new DateLiteral("2100-12-31 00:00:00", Type.DATETIME);
cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DataProperty newDataProperty1 = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
DataProperty newDataProperty1 = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "");
partitionList = Lists.newArrayList(p1, p2);
for (Partition partition : partitionList) {
Assert.assertEquals(newDataProperty1, tbl4.getPartitionInfo().getDataProperty(partition.getId()));
@ -492,11 +509,9 @@ public class AlterTest {
Partition p3 = tblRemote.getPartition("p3");
Partition p4 = tblRemote.getPartition("p4");
DateLiteral dateLiteral = new DateLiteral("2122-04-01 20:24:00", Type.DATETIME);
DateLiteral dateLiteral = new DateLiteral("2100-05-09 00:00:00", Type.DATETIME);
long cooldownTimeMs = dateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DateLiteral dateLiteral1 = new DateLiteral("2122-12-01 20:23:00", Type.DATETIME);
long remoteCooldownTimeMs = dateLiteral1.unixTimestamp(TimeUtils.getTimeZone());
DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "remote_s3", remoteCooldownTimeMs);
DataProperty oldDataProperty = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "testPolicy");
List<Partition> partitionList = Lists.newArrayList(p2, p3, p4);
for (Partition partition : partitionList) {
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
@ -507,7 +522,7 @@ public class AlterTest {
alterTable(stmt, false);
DateLiteral newDateLiteral = new DateLiteral("2100-04-01 22:22:22", Type.DATETIME);
long newCooldownTimeMs = newDateLiteral.unixTimestamp(TimeUtils.getTimeZone());
DataProperty dataProperty2 = new DataProperty(TStorageMedium.SSD, newCooldownTimeMs, "remote_s3", remoteCooldownTimeMs);
DataProperty dataProperty2 = new DataProperty(TStorageMedium.SSD, newCooldownTimeMs, "testPolicy");
for (Partition partition : partitionList) {
Assert.assertEquals(dataProperty2, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
@ -516,33 +531,22 @@ public class AlterTest {
// alter storage_medium
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('storage_medium' = 'HDD')";
alterTable(stmt, false);
DataProperty dataProperty1 = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "remote_s3", remoteCooldownTimeMs);
DataProperty dataProperty1 = new DataProperty(
TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "testPolicy");
for (Partition partition : partitionList) {
Assert.assertEquals(dataProperty1, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
// alter remote_storage
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('remote_storage_resource' = 'remote_s3_1')";
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('remote_storage_policy' = 'testPolicy3')";
alterTable(stmt, true);
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
// alter remote_storage_cooldown_time
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ('remote_storage_cooldown_time' = '2122-12-01 20:23:00')";
alterTable(stmt, false);
DateLiteral newRemoteDate = new DateLiteral("2122-12-01 20:23:00", Type.DATETIME);
long newRemoteCooldownTimeMs = newRemoteDate.unixTimestamp(TimeUtils.getTimeZone());
DataProperty dataProperty4 = new DataProperty(TStorageMedium.HDD, DataProperty.MAX_COOLDOWN_TIME_MS, "remote_s3", newRemoteCooldownTimeMs);
for (Partition partition : partitionList) {
Assert.assertEquals(dataProperty4, tblRemote.getPartitionInfo().getDataProperty(partition.getId()));
}
Assert.assertEquals(oldDataProperty, tblRemote.getPartitionInfo().getDataProperty(p1.getId()));
// alter recover to old state
stmt = "alter table test.tbl_remote modify partition (p2, p3, p4) set ("
+ "'storage_medium' = 'SSD', "
+ "'storage_cooldown_time' = '2122-04-01 20:24:00', "
+ "'remote_storage_cooldown_time' = '2122-12-01 20:23:00'"
+ "'storage_cooldown_time' = '2100-05-09 00:00:00'"
+ ")";
alterTable(stmt, false);
for (Partition partition : partitionList) {

View File

@ -26,7 +26,7 @@ import org.junit.Test;
public class DataPropertyTest {
@Test
public void tesCooldownTimeMs() throws Exception {
public void testCooldownTimeMs() throws Exception {
Config.default_storage_medium = "ssd";
DataProperty dataProperty = DataProperty.DEFAULT_DATA_PROPERTY;
Assert.assertNotEquals(DataProperty.MAX_COOLDOWN_TIME_MS, dataProperty.getCooldownTimeMs());
@ -35,7 +35,7 @@ public class DataPropertyTest {
Assert.assertNotEquals(DataProperty.MAX_COOLDOWN_TIME_MS, dataProperty.getCooldownTimeMs());
long storageCooldownTimeMs = System.currentTimeMillis() + 24 * 3600 * 1000L;
dataProperty = new DataProperty(TStorageMedium.SSD, storageCooldownTimeMs, "", DataProperty.MAX_COOLDOWN_TIME_MS);
dataProperty = new DataProperty(TStorageMedium.SSD, storageCooldownTimeMs, "");
Assert.assertEquals(storageCooldownTimeMs, dataProperty.getCooldownTimeMs());
dataProperty = new DataProperty(TStorageMedium.HDD);