This commit is contained in:
@ -185,38 +185,28 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
}
|
||||
}
|
||||
|
||||
private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) {
|
||||
private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table, String nowPartitionName) {
|
||||
if (!table.isAutoBucket()) {
|
||||
return property.getBuckets();
|
||||
}
|
||||
|
||||
// auto bucket
|
||||
// get all history partitions
|
||||
List<Partition> partitions = Lists.newArrayList();
|
||||
ArrayList<Long> partitionSizeArray = Lists.newArrayList();
|
||||
RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
|
||||
List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
|
||||
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
|
||||
for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
|
||||
Partition partition = table.getPartition(idToItem.getKey());
|
||||
if (partition != null) {
|
||||
partitions.add(partition);
|
||||
}
|
||||
}
|
||||
|
||||
// no exist history partition
|
||||
if (partitions.size() == 0) {
|
||||
return property.getBuckets();
|
||||
}
|
||||
|
||||
ArrayList<Long> partitionSizeArray = Lists.newArrayList();
|
||||
for (Partition partition : partitions) {
|
||||
if (partition.getVisibleVersion() >= 2) {
|
||||
// exclude current partition because its data isn't enough one week/day/hour.
|
||||
if (partition != null && !partition.getName().equals(nowPartitionName)
|
||||
&& partition.getVisibleVersion() >= 2) {
|
||||
partitionSizeArray.add(partition.getAllDataSize(true));
|
||||
}
|
||||
}
|
||||
|
||||
// no exist history partition data
|
||||
if (partitionSizeArray.size() == 0) {
|
||||
if (partitionSizeArray.isEmpty()) {
|
||||
return property.getBuckets();
|
||||
}
|
||||
|
||||
@ -249,6 +239,12 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
int hotPartitionNum = dynamicPartitionProperty.getHotPartitionNum();
|
||||
String storagePolicyName = dynamicPartitionProperty.getStoragePolicy();
|
||||
|
||||
String nowPartitionPrevBorder = DynamicPartitionUtil.getPartitionRangeString(
|
||||
dynamicPartitionProperty, now, 0, partitionFormat);
|
||||
String nowPartitionName = dynamicPartitionProperty.getPrefix()
|
||||
+ DynamicPartitionUtil.getFormattedPartitionName(dynamicPartitionProperty.getTimeZone(),
|
||||
nowPartitionPrevBorder, dynamicPartitionProperty.getTimeUnit());
|
||||
|
||||
for (; idx <= dynamicPartitionProperty.getEnd(); idx++) {
|
||||
String prevBorder = DynamicPartitionUtil.getPartitionRangeString(
|
||||
dynamicPartitionProperty, now, idx, partitionFormat);
|
||||
@ -322,7 +318,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
|
||||
DistributionDesc distributionDesc = null;
|
||||
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
|
||||
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable);
|
||||
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, nowPartitionName);
|
||||
if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
|
||||
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
|
||||
List<String> distColumnNames = new ArrayList<>();
|
||||
|
||||
@ -93,6 +93,7 @@ public class AutoBucketUtils {
|
||||
if (bucketsNum < bucketsNumByPartitionSize && bucketsNum < beNum) {
|
||||
bucketsNum = beNum;
|
||||
}
|
||||
bucketsNum = Math.min(bucketsNum, Config.autobucket_max_buckets);
|
||||
logger.debug("AutoBucketsUtil: final bucketsNum {}", bucketsNum);
|
||||
return bucketsNum;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user