From 474295cf31513775fc08bde748b07d2f8cef691a Mon Sep 17 00:00:00 2001 From: yujun Date: Thu, 27 Jun 2024 22:30:13 +0800 Subject: [PATCH] [chore](autobucket) add autobucket test and log #36874 (#36907) cherry pick from #36874 --- .../clone/DynamicPartitionScheduler.java | 47 +++++++++++++------ .../catalog/DynamicPartitionTableTest.java | 38 +++++++++++++++ 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java index 8c5f4f669c..38d68c320e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java @@ -71,6 +71,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * This class is used to periodically add or drop partition on an olapTable which specify dynamic partition properties @@ -186,7 +187,7 @@ public class DynamicPartitionScheduler extends MasterDaemon { } private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table, - String nowPartitionName, boolean executeFirstTime) { + String partitionName, String nowPartitionName, boolean executeFirstTime) { // if execute first time, all partitions no contain data if (!table.isAutoBucket() || executeFirstTime) { return property.getBuckets(); @@ -194,27 +195,41 @@ public class DynamicPartitionScheduler extends MasterDaemon { // auto bucket // get all history partitions - ArrayList partitionSizeArray = Lists.newArrayList(); RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo()); List> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet()); idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint())); - for (Map.Entry idToItem : idToItems) { - Partition partition = table.getPartition(idToItem.getKey()); - // exclude current partition because its data isn't enough one week/day/hour. - if (partition != null && !partition.getName().equals(nowPartitionName) - && partition.getVisibleVersion() >= 2) { - partitionSizeArray.add(partition.getAllDataSize(true)); - } - } + List partitions = idToItems.stream() + .map(entry -> table.getPartition(entry.getKey())) + .filter(partition -> partition != null && !partition.getName().equals(nowPartitionName) + && partition.getVisibleVersion() >= 2) + .collect(Collectors.toList()); // no exist history partition data - if (partitionSizeArray.isEmpty()) { + if (partitions.isEmpty()) { + LOG.info("autobucket use property's buckets due to all partitions no data, table: [{}-{}], " + + "partition: {}, buckets num: {}", + table.getName(), table.getId(), partitionName, property.getBuckets()); return property.getBuckets(); } + ArrayList partitionSizeArray = partitions.stream() + .map(partition -> partition.getAllDataSize(true)) + .collect(Collectors.toCollection(ArrayList::new)); + long estimatePartitionSize = getNextPartitionSize(partitionSizeArray); // plus 5 for uncompressed data - long uncompressedPartitionSize = getNextPartitionSize(partitionSizeArray) * 5; - return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets); + long uncompressedPartitionSize = estimatePartitionSize * 5; + int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets); + LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, " + + " estimate partition size: {}, last partitions(partition name, local size, remote size): {}", + partitions.size(), table.getName(), table.getId(), partitionName, bucketsNum, + estimatePartitionSize, + partitions.stream() + .skip(Math.max(0, partitions.size() - 7)) + .map(partition -> "(" + partition.getName() + ", " + partition.getDataSize(true) + + ", " + partition.getRemoteDataSize() + ")") + .collect(Collectors.toList())); + + return bucketsNum; } private ArrayList getAddPartitionClause(Database db, OlapTable olapTable, @@ -320,7 +335,8 @@ public class DynamicPartitionScheduler extends MasterDaemon { DistributionDesc distributionDesc = null; DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo(); - int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, nowPartitionName, executeFirstTime); + int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName, + nowPartitionName, executeFirstTime); if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) { HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; List distColumnNames = new ArrayList<>(); @@ -488,7 +504,8 @@ public class DynamicPartitionScheduler extends MasterDaemon { return dropPartitionClauses; } - private void executeDynamicPartition(Collection> dynamicPartitionTableInfoCol, + // make public just for fe ut + public void executeDynamicPartition(Collection> dynamicPartitionTableInfoCol, boolean executeFirstTime) throws DdlException { Iterator> iterator = dynamicPartitionTableInfoCol.iterator(); while (iterator.hasNext()) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index d457be0324..c2f1383732 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -22,11 +22,13 @@ import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.clone.DynamicPartitionScheduler; +import org.apache.doris.clone.RebalancerTestUtil; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TStorageMedium; @@ -46,6 +48,7 @@ import java.time.format.DateTimeFormatter; import java.util.Calendar; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.GregorianCalendar; import java.util.Iterator; @@ -1719,4 +1722,39 @@ public class DynamicPartitionTableTest { + ");"; ExceptionChecker.expectThrowsNoException(() -> createTable(createOlapTblStmt2)); } + + @Test + public void testAutoBuckets() throws Exception { + String createOlapTblStmt = " CREATE TABLE test.test_autobucket_dynamic_partition \n" + + " (k1 DATETIME)\n" + + " PARTITION BY RANGE (k1) () DISTRIBUTED BY HASH (k1) BUCKETS AUTO\n" + + " PROPERTIES (\n" + + " \"dynamic_partition.enable\" = \"true\",\n" + + " \"dynamic_partition.time_unit\" = \"YEAR\",\n" + + " \"dynamic_partition.end\" = \"1\",\n" + + " \"dynamic_partition.prefix\" = \"p\",\n" + + " \"replication_allocation\" = \"tag.location.default: 1\"\n" + + ")"; + ExceptionChecker.expectThrowsNoException(() -> createTable(createOlapTblStmt)); + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("test"); + OlapTable table = (OlapTable) db.getTableOrAnalysisException("test_autobucket_dynamic_partition"); + List partitions = Lists.newArrayList(table.getAllPartitions()); + Assert.assertEquals(2, partitions.size()); + for (Partition partition : partitions) { + Assert.assertEquals(FeConstants.default_bucket_num, partition.getDistributionInfo().getBucketNum()); + partition.setVisibleVersionAndTime(2L, System.currentTimeMillis()); + } + RebalancerTestUtil.updateReplicaDataSize(1, 1, 1); + + String alterStmt = + "alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')"; + ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt)); + List> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(db.getId(), table.getId())); + Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false); + + partitions = Lists.newArrayList(table.getAllPartitions()); + partitions.sort(Comparator.comparing(Partition::getId)); + Assert.assertEquals(3, partitions.size()); + Assert.assertEquals(1, partitions.get(2).getDistributionInfo().getBucketNum()); + } }