[fix](dynamic partition) fix dynamic partition storage medium not working (#29490)
This commit is contained in:
@ -19,7 +19,6 @@ package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.TimestampArithmeticExpr.TimeUnit;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.DynamicPartitionUtil;
|
||||
@ -27,6 +26,8 @@ import org.apache.doris.common.util.DynamicPartitionUtil.StartOfDate;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
|
||||
@ -97,7 +98,7 @@ public class DynamicPartitionProperty {
|
||||
this.reservedHistoryPeriods = properties.getOrDefault(
|
||||
RESERVED_HISTORY_PERIODS, NOT_SET_RESERVED_HISTORY_PERIODS);
|
||||
this.storagePolicy = properties.getOrDefault(STORAGE_POLICY, "");
|
||||
this.storageMedium = properties.getOrDefault(STORAGE_MEDIUM, Config.default_storage_medium);
|
||||
this.storageMedium = properties.getOrDefault(STORAGE_MEDIUM, "");
|
||||
createStartOfs(properties);
|
||||
} else {
|
||||
this.exist = false;
|
||||
@ -228,8 +229,10 @@ public class DynamicPartitionProperty {
|
||||
+ ",\n\"" + HISTORY_PARTITION_NUM + "\" = \"" + historyPartitionNum + "\""
|
||||
+ ",\n\"" + HOT_PARTITION_NUM + "\" = \"" + hotPartitionNum + "\""
|
||||
+ ",\n\"" + RESERVED_HISTORY_PERIODS + "\" = \"" + reservedHistoryPeriods + "\""
|
||||
+ ",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy + "\""
|
||||
+ ",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium + "\"";
|
||||
+ ",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy + "\"";
|
||||
if (!Strings.isNullOrEmpty(storageMedium)) {
|
||||
res += ",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium + "\"";
|
||||
}
|
||||
if (getTimeUnit().equalsIgnoreCase(TimeUnit.WEEK.toString())) {
|
||||
res += ",\n\"" + START_DAY_OF_WEEK + "\" = \"" + startOfWeek.dayOfWeek + "\"";
|
||||
} else if (getTimeUnit().equalsIgnoreCase(TimeUnit.MONTH.toString())) {
|
||||
|
||||
@ -52,6 +52,7 @@ import org.apache.doris.common.util.RangeUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Range;
|
||||
@ -351,19 +352,28 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
*/
|
||||
private void setStorageMediumProperty(HashMap<String, String> partitionProperties,
|
||||
DynamicPartitionProperty property, ZonedDateTime now, int hotPartitionNum, int offset) {
|
||||
if ((hotPartitionNum <= 0 || offset + hotPartitionNum <= 0) && !property.getStorageMedium()
|
||||
.equalsIgnoreCase("ssd")) {
|
||||
return;
|
||||
}
|
||||
String cooldownTime;
|
||||
if (property.getStorageMedium().equalsIgnoreCase("ssd")) {
|
||||
cooldownTime = TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS);
|
||||
// 1. no hot partition, then use dynamic_partition.storage_medium
|
||||
// 2. has hot partition
|
||||
// 1) dynamic_partition.storage_medium = 'ssd', then use ssd;
|
||||
// 2) otherwise
|
||||
// a. cooldown partition, then use hdd
|
||||
// b. hot partition. then use ssd
|
||||
if (hotPartitionNum <= 0 || property.getStorageMedium().equalsIgnoreCase("ssd")) {
|
||||
if (!Strings.isNullOrEmpty(property.getStorageMedium())) {
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, property.getStorageMedium());
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME,
|
||||
TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS));
|
||||
}
|
||||
} else if (offset + hotPartitionNum <= 0) {
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.HDD.name());
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME,
|
||||
TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS));
|
||||
} else {
|
||||
cooldownTime = DynamicPartitionUtil.getPartitionRangeString(
|
||||
String cooldownTime = DynamicPartitionUtil.getPartitionRangeString(
|
||||
property, now, offset + hotPartitionNum, DynamicPartitionUtil.DATETIME_FORMAT);
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.SSD.name());
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, cooldownTime);
|
||||
}
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.SSD.name());
|
||||
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, cooldownTime);
|
||||
}
|
||||
|
||||
private void setStoragePolicyProperty(HashMap<String, String> partitionProperties,
|
||||
|
||||
@ -0,0 +1,166 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import org.apache.doris.regression.suite.ClusterOptions
|
||||
|
||||
suite('test_storage_medium_has_disk') {
|
||||
def checkPartitionMedium = { table, isHdd ->
|
||||
def partitions = sql_return_maparray "SHOW PARTITIONS FROM ${table}"
|
||||
assertTrue(partitions.size() > 0)
|
||||
partitions.each { assertEquals(isHdd ? 'HDD' : 'SSD', it.StorageMedium) }
|
||||
}
|
||||
|
||||
def checkTableStaticPartition = { isPartitionTable, isHdd ->
|
||||
def table = "tbl_static_${isPartitionTable}_${isHdd}"
|
||||
def sqlText = "CREATE TABLE ${table} (k INT)"
|
||||
if (isPartitionTable) {
|
||||
sqlText += " PARTITION BY RANGE(k) (PARTITION p1 VALUES LESS THAN ('100'), PARTITION p2 VALUES LESS THAN ('200'))"
|
||||
}
|
||||
sqlText += ' DISTRIBUTED BY HASH(k) BUCKETS 5'
|
||||
if (isHdd) {
|
||||
sqlText += " PROPERTIES ('storage_medium' = 'hdd')"
|
||||
}
|
||||
sql sqlText
|
||||
checkPartitionMedium table, isHdd
|
||||
}
|
||||
|
||||
def beReportTablet = { ->
|
||||
def backendId_to_backendIP = [:]
|
||||
def backendId_to_backendHttpPort = [:]
|
||||
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
|
||||
backendId_to_backendIP.each { beId, beIp ->
|
||||
def port = backendId_to_backendHttpPort.get(beId) as int
|
||||
be_report_tablet beIp, port
|
||||
}
|
||||
sleep 8000
|
||||
}
|
||||
|
||||
def options = new ClusterOptions()
|
||||
options.feConfigs += ['default_storage_medium=SSD', 'dynamic_partition_check_interval_seconds=1']
|
||||
options.beConfigs += ['report_disk_state_interval_seconds=1']
|
||||
options.beDisks = ['HDD=1', 'SSD=1']
|
||||
docker(options) {
|
||||
// test static partitions
|
||||
for (def isPartitionTable : [true, false]) {
|
||||
for (def isHdd : [true, false]) {
|
||||
checkTableStaticPartition isPartitionTable, isHdd
|
||||
}
|
||||
}
|
||||
|
||||
//test dynamic partition without hot partitions
|
||||
def table = 'tbl_dynamic_1'
|
||||
sql """CREATE TABLE ${table} (k DATE)
|
||||
PARTITION BY RANGE(k)()
|
||||
DISTRIBUTED BY HASH (k) BUCKETS 5
|
||||
PROPERTIES
|
||||
(
|
||||
"storage_medium" = "hdd",
|
||||
"dynamic_partition.enable" = "true",
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.end" = "1",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "3",
|
||||
"dynamic_partition.replication_num" = "1",
|
||||
"dynamic_partition.create_history_partition"= "true",
|
||||
"dynamic_partition.start" = "-3"
|
||||
)
|
||||
"""
|
||||
checkPartitionMedium table, false
|
||||
|
||||
table = 'tbl_dynamic_2'
|
||||
sql """CREATE TABLE ${table} (k DATE)
|
||||
PARTITION BY RANGE(k)()
|
||||
DISTRIBUTED BY HASH (k) BUCKETS 5
|
||||
PROPERTIES
|
||||
(
|
||||
"storage_medium" = "ssd",
|
||||
"dynamic_partition.storage_medium" = "hdd",
|
||||
"dynamic_partition.enable" = "true",
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.end" = "3",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "3",
|
||||
"dynamic_partition.replication_num" = "1",
|
||||
"dynamic_partition.create_history_partition"= "true",
|
||||
"dynamic_partition.start" = "-4"
|
||||
)
|
||||
"""
|
||||
checkPartitionMedium table, true
|
||||
|
||||
// test dynamic_partition with hot partitions,
|
||||
// storage medium ssd will set all partitions's storage_medium to ssd
|
||||
table = 'tbl_dynamic_hot_1'
|
||||
sql """CREATE TABLE ${table} (k DATE)
|
||||
PARTITION BY RANGE(k)()
|
||||
DISTRIBUTED BY HASH (k) BUCKETS 5
|
||||
PROPERTIES
|
||||
(
|
||||
"dynamic_partition.enable" = "true",
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.hot_partition_num" = "1",
|
||||
"dynamic_partition.storage_medium" = "ssd",
|
||||
"dynamic_partition.end" = "3",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "3",
|
||||
"dynamic_partition.replication_num" = "1",
|
||||
"dynamic_partition.create_history_partition"= "true",
|
||||
"dynamic_partition.hot"= "true",
|
||||
"dynamic_partition.start" = "-4"
|
||||
)
|
||||
"""
|
||||
beReportTablet
|
||||
checkPartitionMedium table, false
|
||||
|
||||
// test dynamic_partition with hot partitions
|
||||
for (def i = 0; i < 2; i++) {
|
||||
table = 'tbl_dynamic_hot_2'
|
||||
sql "DROP TABLE IF EXISTS ${table} FORCE"
|
||||
sql """CREATE TABLE ${table} (k DATE)
|
||||
PARTITION BY RANGE(k)()
|
||||
DISTRIBUTED BY HASH (k) BUCKETS 5
|
||||
PROPERTIES
|
||||
(
|
||||
"dynamic_partition.storage_medium" = "hdd",
|
||||
"dynamic_partition.enable" = "true",
|
||||
"dynamic_partition.time_unit" = "DAY",
|
||||
"dynamic_partition.hot_partition_num" = "1",
|
||||
"dynamic_partition.end" = "3",
|
||||
"dynamic_partition.prefix" = "p",
|
||||
"dynamic_partition.buckets" = "3",
|
||||
"dynamic_partition.replication_num" = "1",
|
||||
"dynamic_partition.create_history_partition"= "true",
|
||||
"dynamic_partition.hot"= "true",
|
||||
"dynamic_partition.start" = "-4"
|
||||
)
|
||||
"""
|
||||
|
||||
beReportTablet
|
||||
def partitions = sql_return_maparray "SHOW PARTITIONS FROM ${table}"
|
||||
if (i == 0 && (partitions.size() != 8 || partitions.get(3).StorageMedium != 'HDD'
|
||||
|| partitions.get(4).StorageMedium != 'SSD')) {
|
||||
sleep 5000
|
||||
continue
|
||||
}
|
||||
|
||||
assertEquals(8, partitions.size())
|
||||
for (def j = 0; j < 8; j++) {
|
||||
assertEquals(j < 4 ? 'HDD' : 'SSD', partitions.get(j).StorageMedium)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user