cherry pick from #36874
This commit is contained in:
@ -71,6 +71,7 @@ import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This class is used to periodically add or drop partition on an olapTable which specify dynamic partition properties
|
||||
@ -186,7 +187,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
}
|
||||
|
||||
private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table,
|
||||
String nowPartitionName, boolean executeFirstTime) {
|
||||
String partitionName, String nowPartitionName, boolean executeFirstTime) {
|
||||
// if execute first time, all partitions no contain data
|
||||
if (!table.isAutoBucket() || executeFirstTime) {
|
||||
return property.getBuckets();
|
||||
@ -194,27 +195,41 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
|
||||
// auto bucket
|
||||
// get all history partitions
|
||||
ArrayList<Long> partitionSizeArray = Lists.newArrayList();
|
||||
RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
|
||||
List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
|
||||
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
|
||||
for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
|
||||
Partition partition = table.getPartition(idToItem.getKey());
|
||||
// exclude current partition because its data isn't enough one week/day/hour.
|
||||
if (partition != null && !partition.getName().equals(nowPartitionName)
|
||||
&& partition.getVisibleVersion() >= 2) {
|
||||
partitionSizeArray.add(partition.getAllDataSize(true));
|
||||
}
|
||||
}
|
||||
List<Partition> partitions = idToItems.stream()
|
||||
.map(entry -> table.getPartition(entry.getKey()))
|
||||
.filter(partition -> partition != null && !partition.getName().equals(nowPartitionName)
|
||||
&& partition.getVisibleVersion() >= 2)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// no exist history partition data
|
||||
if (partitionSizeArray.isEmpty()) {
|
||||
if (partitions.isEmpty()) {
|
||||
LOG.info("autobucket use property's buckets due to all partitions no data, table: [{}-{}], "
|
||||
+ "partition: {}, buckets num: {}",
|
||||
table.getName(), table.getId(), partitionName, property.getBuckets());
|
||||
return property.getBuckets();
|
||||
}
|
||||
|
||||
ArrayList<Long> partitionSizeArray = partitions.stream()
|
||||
.map(partition -> partition.getAllDataSize(true))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
long estimatePartitionSize = getNextPartitionSize(partitionSizeArray);
|
||||
// plus 5 for uncompressed data
|
||||
long uncompressedPartitionSize = getNextPartitionSize(partitionSizeArray) * 5;
|
||||
return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets);
|
||||
long uncompressedPartitionSize = estimatePartitionSize * 5;
|
||||
int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets);
|
||||
LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, "
|
||||
+ " estimate partition size: {}, last partitions(partition name, local size, remote size): {}",
|
||||
partitions.size(), table.getName(), table.getId(), partitionName, bucketsNum,
|
||||
estimatePartitionSize,
|
||||
partitions.stream()
|
||||
.skip(Math.max(0, partitions.size() - 7))
|
||||
.map(partition -> "(" + partition.getName() + ", " + partition.getDataSize(true)
|
||||
+ ", " + partition.getRemoteDataSize() + ")")
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
return bucketsNum;
|
||||
}
|
||||
|
||||
private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
|
||||
@ -320,7 +335,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
|
||||
DistributionDesc distributionDesc = null;
|
||||
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
|
||||
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, nowPartitionName, executeFirstTime);
|
||||
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName,
|
||||
nowPartitionName, executeFirstTime);
|
||||
if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
|
||||
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
|
||||
List<String> distColumnNames = new ArrayList<>();
|
||||
@ -488,7 +504,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
return dropPartitionClauses;
|
||||
}
|
||||
|
||||
private void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol,
|
||||
// make public just for fe ut
|
||||
public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol,
|
||||
boolean executeFirstTime) throws DdlException {
|
||||
Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfoCol.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
|
||||
Reference in New Issue
Block a user