[enhance](policy) Support to change table's storage policy if the two policy has same resource (#23665)

This commit is contained in:
AlexYue
2023-09-01 11:25:27 +08:00
committed by GitHub
parent d6450a3f1c
commit d96bc2de1a
10 changed files with 613 additions and 13 deletions

View File

@ -102,6 +102,7 @@ Or associate a storage policy with an existing partition
```
ALTER TABLE create_table_partition MODIFY PARTITION (*) SET("storage_policy"="test_policy");
```
**Note**: If the user specifies different storage policies for the entire table and certain partitions during table creation, the storage policy set for the partitions will be ignored, and all partitions of the table will use the table's policy. If you need a specific partition to have a different policy than the others, you can modify it by associating the partition with the desired storage policy, as mentioned earlier in the context of modifying an existing partition.
For details, please refer to the [resource](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-RESOURCE.md), [policy](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md), create table, alter and other documents in the docs directory
### Some restrictions

View File

@ -101,6 +101,7 @@ ALTER TABLE create_table_not_have_policy set ("storage_policy" = "test_policy");
```
ALTER TABLE create_table_partition MODIFY PARTITION (*) SET("storage_policy"="test_policy");
```
**注意**,如果用户在建表时给整张table和部分partition指定了不同的storage policy,partition设置的storage policy会被无视,整张表的所有partition都会使用table的policy. 如果您需要让某个partition的policy和别的不同,则可以使用上文中对一个已存在的partition,关联storage policy的方式修改.
具体可以参考docs目录下[resource](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-RESOURCE.md)、 [policy](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md)、 [create table](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)、 [alter table](../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN.md)等文档,里面有详细介绍
### 一些限制

View File

@ -54,11 +54,9 @@ import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@ -189,20 +187,21 @@ public class Alter {
boolean needProcessOutsideTableLock = false;
if (currentAlterOps.checkTableStoragePolicy(alterClauses)) {
String tableStoragePolicy = olapTable.getStoragePolicy();
if (!tableStoragePolicy.isEmpty()) {
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
// If the two policy has one same resource, then it's safe for the table to change policy
// There would only be the cooldown ttl or cooldown time would be affected
if (!Env.getCurrentEnv().getPolicyMgr()
.checkStoragePolicyIfSameResource(tableStoragePolicy, currentStoragePolicy)
&& !tableStoragePolicy.isEmpty()) {
for (Partition partition : olapTable.getAllPartitions()) {
for (Tablet tablet : partition.getBaseIndex().getTablets()) {
for (Replica replica : tablet.getReplicas()) {
if (replica.getRowCount() > 0 || replica.getDataSize() > 0) {
throw new DdlException("Do not support alter table's storage policy , this table ["
+ olapTable.getName() + "] has storage policy " + tableStoragePolicy
+ ", the table need to be empty.");
}
}
if (Partition.PARTITION_INIT_VERSION < partition.getVisibleVersion()) {
throw new DdlException("Do not support alter table's storage policy , this table ["
+ olapTable.getName() + "] has storage policy " + tableStoragePolicy
+ ", the table need to be empty.");
}
}
}
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
// check currentStoragePolicy resource exist.
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy);

View File

@ -98,6 +98,10 @@ public class DataProperty implements Writable, GsonPostProcessable {
return storagePolicy;
}
public void setStoragePolicy(String storagePolicy) {
this.storagePolicy = storagePolicy;
}
public boolean isStorageMediumSpecified() {
return storageMediumSpecified;
}

View File

@ -1834,6 +1834,7 @@ public class OlapTable extends Table {
TableProperty tableProperty = getOrCreatTableProperty();
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, storagePolicy);
tableProperty.buildStoragePolicy();
partitionInfo.refreshTableStoragePolicy(storagePolicy);
}
public String getStoragePolicy() {

View File

@ -230,6 +230,13 @@ public class PartitionInfo implements Writable {
idToDataProperty.put(partitionId, newDataProperty);
}
public void refreshTableStoragePolicy(String storagePolicy) {
idToStoragePolicy.replaceAll((k, v) -> storagePolicy);
idToDataProperty.entrySet().forEach(entry -> {
entry.getValue().setStoragePolicy(storagePolicy);
});
}
public String getStoragePolicy(long partitionId) {
return idToStoragePolicy.getOrDefault(partitionId, "");
}

View File

@ -2223,7 +2223,12 @@ public class InternalCatalog implements CatalogIf<Database> {
"Can not create UNIQUE KEY table that enables Merge-On-write"
+ " with storage policy(" + storagePolicy + ")");
}
olapTable.setStoragePolicy(storagePolicy);
// Consider one situation: if the table has no storage policy but some partitions
// have their own storage policy then it might be erased by the following function.
// So we only set the storage policy if the table's policy is not null or empty
if (!Strings.isNullOrEmpty(storagePolicy)) {
olapTable.setStoragePolicy(storagePolicy);
}
TTabletType tabletType;
try {

View File

@ -533,4 +533,15 @@ public class PolicyMgr implements Writable {
readUnlock();
}
}
public boolean checkStoragePolicyIfSameResource(String policyName, String anotherPolicyName) {
Optional<Policy> policy = findPolicy(policyName, PolicyTypeEnum.STORAGE);
Optional<Policy> policy1 = findPolicy(anotherPolicyName, PolicyTypeEnum.STORAGE);
if (policy1.isPresent() && policy.isPresent()) {
StoragePolicy storagePolicy = (StoragePolicy) policy.get();
StoragePolicy storagePolicy1 = (StoragePolicy) policy1.get();
return storagePolicy1.getStorageResource().equals(storagePolicy.getStorageResource());
}
return false;
}
}

View File

@ -0,0 +1,288 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import groovy.json.JsonSlurper
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.time.LocalDate;
suite("add_drop_partition") {
def fetchBeHttp = { check_func, meta_url ->
def i = meta_url.indexOf("/api")
String endPoint = meta_url.substring(0, i)
String metaUri = meta_url.substring(i)
i = endPoint.lastIndexOf('/')
endPoint = endPoint.substring(i + 1)
httpTest {
endpoint endPoint
uri metaUri
op "get"
check check_func
}
}
// data_sizes is one arrayList<Long>, t is tablet
def fetchDataSize = { data_sizes, t ->
def tabletId = t[0]
String meta_url = t[17]
def clos = { respCode, body ->
logger.info("test ttl expired resp Code {}", "${respCode}".toString())
assertEquals("${respCode}".toString(), "200")
String out = "${body}".toString()
def obj = new JsonSlurper().parseText(out)
data_sizes[0] = obj.local_data_size
data_sizes[1] = obj.remote_data_size
}
fetchBeHttp(clos, meta_url.replace("header", "data_size"))
}
// used as passing out parameter to fetchDataSize
List<Long> sizes = [-1, -1]
def tableName = "tbl1"
sql """ DROP TABLE IF EXISTS ${tableName} """
def check_storage_policy_exist = { name->
def polices = sql"""
show storage policy;
"""
for (p in polices) {
if (name == p[0]) {
return true;
}
}
return false;
}
def resource_name = "test_add_drop_partition_resource"
def policy_name= "test_add_drop_partition_policy"
if (check_storage_policy_exist(policy_name)) {
sql """
DROP STORAGE POLICY ${policy_name}
"""
}
def has_resouce = sql """
SHOW RESOURCES WHERE NAME = "${resource_name}";
"""
if (has_resouce.size() > 0) {
sql """
DROP RESOURCE ${resource_name}
"""
}
sql """
CREATE RESOURCE IF NOT EXISTS "${resource_name}"
PROPERTIES(
"type"="s3",
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_REGION" = "${getS3Region()}",
"AWS_ROOT_PATH" = "regression/cooldown",
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_MAX_CONNECTIONS" = "50",
"AWS_REQUEST_TIMEOUT_MS" = "3000",
"AWS_CONNECTION_TIMEOUT_MS" = "1000",
"AWS_BUCKET" = "${getS3BucketName()}",
"s3_validity_check" = "true"
);
"""
sql """
CREATE STORAGE POLICY IF NOT EXISTS ${policy_name}
PROPERTIES(
"storage_resource" = "${resource_name}",
"cooldown_ttl" = "300"
)
"""
// test one replica
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`k1` int,
`k2` date
)
PARTITION BY RANGE(k2)(
partition p1 VALUES LESS THAN ("2014-01-01"),
partition p2 VALUES LESS THAN ("2015-01-01"),
partition p3 VALUES LESS THAN ("2016-01-01")
)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES
(
"replication_num" = "1",
"storage_policy" = "${policy_name}"
);
"""
sql """
insert into ${tableName} values(1, "2013-01-01");
"""
sql """
insert into ${tableName} values(1, "2014-01-01");
"""
sql """
insert into ${tableName} values(1, "2015-01-01");
"""
// show tablets from table, 获取第一个tablet的 LocalDataSize1
def tablets = sql """
SHOW TABLETS FROM ${tableName}
"""
log.info( "test tablets not empty")
assertTrue(tablets.size() > 0)
fetchDataSize(sizes, tablets[0])
def LocalDataSize1 = sizes[0]
def RemoteDataSize1 = sizes[1]
log.info( "test local size {} not zero, remote size {}", LocalDataSize1, RemoteDataSize1)
assertTrue(LocalDataSize1 != 0)
log.info( "test remote size is zero")
assertEquals(RemoteDataSize1, 0)
def originLocalDataSize1 = LocalDataSize1;
// 等待10min,show tablets from table, 预期not_use_storage_policy_tablet_list 的 RemoteDataSize 为0,LocalDataSize不为0
sleep(600000)
tablets = sql """
SHOW TABLETS FROM ${tableName}
"""
log.info( "test tablets not empty")
fetchDataSize(sizes, tablets[0])
while (sizes[1] == 0) {
log.info( "test remote size is zero, sleep 10s")
sleep(10000)
tablets = sql """
SHOW TABLETS FROM ${tableName}
"""
fetchDataSize(sizes, tablets[0])
}
assertTrue(tablets.size() > 0)
LocalDataSize1 = sizes[0]
RemoteDataSize1 = sizes[1]
Long sleepTimes = 0;
while (RemoteDataSize1 != originLocalDataSize1 && sleepTimes < 60) {
log.info( "test remote size is same with origin size, sleep 10s")
sleep(10000)
tablets = sql """
SHOW TABLETS FROM
"""
fetchDataSize(sizes, tablets[0])
LocalDataSize1 = sizes[0]
RemoteDataSize1 = sizes[1]
sleepTimes += 1
}
log.info( "test local size is zero")
assertEquals(LocalDataSize1, 0)
log.info( "test remote size not zero")
assertEquals(RemoteDataSize1, originLocalDataSize1)
// 12列是storage policy
def partitions = sql "show partitions from ${tableName}"
for (par in partitions) {
assertTrue(par[12] == "${policy_name}")
}
try_sql """
drop storage policy add_policy;
"""
try_sql """
drop resource add_resource;
"""
sql """
CREATE RESOURCE IF NOT EXISTS "add_resource"
PROPERTIES(
"type"="s3",
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_REGION" = "${getS3Region()}",
"AWS_ROOT_PATH" = "regression/cooldown",
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_MAX_CONNECTIONS" = "50",
"AWS_REQUEST_TIMEOUT_MS" = "3000",
"AWS_CONNECTION_TIMEOUT_MS" = "1000",
"AWS_BUCKET" = "${getS3BucketName()}",
"s3_validity_check" = "true"
);
"""
try_sql """
create storage policy tmp_policy
PROPERTIES( "storage_resource" = "add_resource", "cooldown_ttl" = "300");
"""
// can not set to one policy with different resource
try {
sql """alter table ${tableName} set ("storage_policy" = "add_policy");"""
} catch (java.sql.SQLException t) {
assertTrue(true)
}
sql """
CREATE STORAGE POLICY IF NOT EXISTS add_policy1
PROPERTIES(
"storage_resource" = "${resource_name}",
"cooldown_ttl" = "60"
)
"""
sql """alter table ${tableName} set ("storage_policy" = "add_policy1");"""
// wait for report
sleep(300000)
partitions = sql "show partitions from ${tableName}"
for (par in partitions) {
assertTrue(par[12] == "add_policy1")
}
sql """
alter table ${tableName} ADD PARTITION np
VALUES LESS THAN ("2016-01-01");
"""
sql """
insert into ${tableName} values(1, "2016-01-01");
"""
partitions = sql "show partitions from ${tableName}"
for (par in partitions) {
assertTrue(par[12] == "add_policy1")
}
sql """
sql * from ${tableName}
"""
sql """
DROP TABLE ${tableName}
"""
sql """
drop storage policy add_policy;
"""
sql """
drop storage policy add_policy1;
"""
sql """
drop resource add_resource;
"""
}

View File

@ -0,0 +1,283 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import groovy.json.JsonSlurper
import org.codehaus.groovy.runtime.IOGroovyMethods
import java.time.LocalDate;
suite("cold_heat_dynamic_partition") {
def fetchBeHttp = { check_func, meta_url ->
def i = meta_url.indexOf("/api")
String endPoint = meta_url.substring(0, i)
String metaUri = meta_url.substring(i)
i = endPoint.lastIndexOf('/')
endPoint = endPoint.substring(i + 1)
httpTest {
endpoint endPoint
uri metaUri
op "get"
check check_func
}
}
// data_sizes is one arrayList<Long>, t is tablet
def fetchDataSize = { data_sizes, t ->
def tabletId = t[0]
String meta_url = t[17]
def clos = { respCode, body ->
logger.info("test ttl expired resp Code {}", "${respCode}".toString())
assertEquals("${respCode}".toString(), "200")
String out = "${body}".toString()
def obj = new JsonSlurper().parseText(out)
data_sizes[0] = obj.local_data_size
data_sizes[1] = obj.remote_data_size
}
fetchBeHttp(clos, meta_url.replace("header", "data_size"))
}
// used as passing out parameter to fetchDataSize
List<Long> sizes = [-1, -1]
def tableName = "tbl2"
sql """ DROP TABLE IF EXISTS ${tableName} """
def check_storage_policy_exist = { name->
def polices = sql"""
show storage policy;
"""
for (p in polices) {
if (name == p[0]) {
return true;
}
}
return false;
}
def resource_name = "test_dynamic_partition_resource"
def policy_name= "test_dynamic_partition_policy"
if (check_storage_policy_exist(policy_name)) {
sql """
DROP STORAGE POLICY ${policy_name}
"""
}
def has_resouce = sql """
SHOW RESOURCES WHERE NAME = "${resource_name}";
"""
if (has_resouce.size() > 0) {
sql """
DROP RESOURCE IF EXISTS ${resource_name}
"""
}
sql """
CREATE RESOURCE IF NOT EXISTS "${resource_name}"
PROPERTIES(
"type"="s3",
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_REGION" = "${getS3Region()}",
"AWS_ROOT_PATH" = "regression/cooldown",
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_MAX_CONNECTIONS" = "50",
"AWS_REQUEST_TIMEOUT_MS" = "3000",
"AWS_CONNECTION_TIMEOUT_MS" = "1000",
"AWS_BUCKET" = "${getS3BucketName()}",
"s3_validity_check" = "true"
);
"""
sql """
CREATE STORAGE POLICY IF NOT EXISTS ${policy_name}
PROPERTIES(
"storage_resource" = "${resource_name}",
"cooldown_ttl" = "300"
)
"""
// test one replica
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE ${tableName} (
`k1` int,
`k2` date
)
PARTITION BY RANGE(k2)()
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES
(
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "1",
"dynamic_partition.replication_num" = "1",
"dynamic_partition.start" = "-3",
"storage_policy" = "${policy_name}",
"replication_num" = "1"
);
"""
LocalDate currentDate = LocalDate.now();
LocalDate currentDatePlusOne = currentDate.plusDays(1);
LocalDate currentDatePlusTwo = currentDate.plusDays(2);
LocalDate currentDatePlusThree = currentDate.plusDays(3);
sql """
insert into ${tableName} values(1, "${currentDate.toString()}");
"""
sql """
insert into ${tableName} values(1, "${currentDatePlusOne.toString()}");
"""
sql """
insert into ${tableName} values(1, "${currentDatePlusTwo.toString()}");
"""
sql """
insert into ${tableName} values(1, "${currentDatePlusThree.toString()}");
"""
// show tablets from table, 获取第一个tablet的 LocalDataSize1
def tablets = sql """
SHOW TABLETS FROM ${tableName}
"""
log.info( "test tablets not empty")
assertTrue(tablets.size() > 0)
fetchDataSize(sizes, tablets[0])
def LocalDataSize1 = sizes[0]
def RemoteDataSize1 = sizes[1]
log.info( "test local size {} not zero, remote size {}", LocalDataSize1, RemoteDataSize1)
assertTrue(LocalDataSize1 != 0)
log.info( "test remote size is zero")
assertEquals(RemoteDataSize1, 0)
def originLocalDataSize1 = LocalDataSize1;
// 等待10min,show tablets from table, 预期not_use_storage_policy_tablet_list 的 RemoteDataSize 为0,LocalDataSize不为0
sleep(600000)
tablets = sql """
SHOW TABLETS FROM ${tableName}
"""
log.info( "test tablets not empty")
fetchDataSize(sizes, tablets[0])
while (sizes[1] == 0) {
log.info( "test remote size is zero, sleep 10s")
sleep(10000)
tablets = sql """
SHOW TABLETS FROM ${tableName}
"""
fetchDataSize(sizes, tablets[0])
}
assertTrue(tablets.size() > 0)
LocalDataSize1 = sizes[0]
RemoteDataSize1 = sizes[1]
Long sleepTimes = 0;
while (RemoteDataSize1 != originLocalDataSize1 && sleepTimes < 60) {
log.info( "test remote size is same with origin size, sleep 10s")
sleep(10000)
tablets = sql """
SHOW TABLETS FROM
"""
fetchDataSize(sizes, tablets[0])
LocalDataSize1 = sizes[0]
RemoteDataSize1 = sizes[1]
sleepTimes += 1
}
log.info( "test local size is zero")
assertEquals(LocalDataSize1, 0)
log.info( "test remote size not zero")
assertEquals(RemoteDataSize1, originLocalDataSize1)
// 12列是storage policy
def partitions = sql "show partitions from ${tableName}"
for (par in partitions) {
assertTrue(par[12] == "${policy_name}")
}
try_sql """
drop storage policy tmp_policy;
"""
try_sql """
drop resource tmp_resource;
"""
sql """
CREATE RESOURCE IF NOT EXISTS "tmp_resource"
PROPERTIES(
"type"="s3",
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_REGION" = "${getS3Region()}",
"AWS_ROOT_PATH" = "regression/cooldown",
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_MAX_CONNECTIONS" = "50",
"AWS_REQUEST_TIMEOUT_MS" = "3000",
"AWS_CONNECTION_TIMEOUT_MS" = "1000",
"AWS_BUCKET" = "${getS3BucketName()}",
"s3_validity_check" = "true"
);
"""
try_sql """
create storage policy tmp_policy
PROPERTIES( "storage_resource" = "tmp_resource", "cooldown_ttl" = "300");
"""
// can not set to one policy with different resource
try {
sql """alter table ${tableName} set ("storage_policy" = "tmp_policy");"""
} catch (java.sql.SQLException t) {
assertTrue(true)
}
sql """
CREATE STORAGE POLICY IF NOT EXISTS tmp_policy1
PROPERTIES(
"storage_resource" = "${resource_name}",
"cooldown_ttl" = "60"
)
"""
sql """alter table ${tableName} set ("storage_policy" = "tmp_policy1");"""
// wait for report
sleep(300000)
partitions = sql "show partitions from ${tableName}"
for (par in partitions) {
assertTrue(par[12] == "tmp_policy1")
}
sql """
sql * from ${tableName}
"""
sql """
DROP TABLE ${tableName}
"""
sql """
drop storage policy tmp_policy;
"""
sql """
drop storage policy tmp_policy1;
"""
sql """
drop resource tmp_resource;
"""
}