From 2308881e9f593d2fc87d397c7e11917eb5bbedd6 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Fri, 29 Dec 2023 14:36:48 +0800 Subject: [PATCH] [improvement](statistics) Analyze partition columns when new partition loaded data for the first time. (#29154) The first time load data to a partition, we need to analyze the partition columns even when the health rate is high. Because if not, the min max value of the column may not include the new partition values, which may cause bad plan. --- .../doris/analysis/ShowTableStatsStmt.java | 2 ++ .../doris/statistics/AnalysisManager.java | 9 +++++ .../statistics/StatisticsAutoCollector.java | 34 ++++++++++++------ .../doris/statistics/TableStatsMeta.java | 14 ++++++++ .../transaction/DatabaseTransactionMgr.java | 6 +++- .../StatisticsAutoCollectorTest.java | 5 +++ .../suites/statistics/analyze_stats.groovy | 36 +++++++++++++++++++ 7 files changed, 94 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java index 3a80daebc9..284b6248b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java @@ -54,6 +54,7 @@ public class ShowTableStatsStmt extends ShowStmt { .add("updated_time") .add("columns") .add("trigger") + .add("new_partition") .build(); private final TableName tableName; @@ -149,6 +150,7 @@ public class ShowTableStatsStmt extends ShowStmt { row.add(formattedDateTime); row.add(tableStatistic.analyzeColumns().toString()); row.add(tableStatistic.jobType.toString()); + row.add(String.valueOf(tableStatistic.newPartitionLoaded.get())); result.add(row); return new ShowResultSet(getMetaData(), result); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 0a52cc8fae..63ae94e37d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -933,6 +933,15 @@ public class AnalysisManager implements Writable { } } + // Set to true means new partition loaded data + public void setNewPartitionLoaded(long tblId) { + TableStatsMeta statsStatus = idToTblStats.get(tblId); + if (statsStatus != null) { + statsStatus.newPartitionLoaded.set(true); + logCreateTableStats(statsStatus); + } + } + public void updateTableStatsStatus(TableStatsMeta tableStats) { replayUpdateTableStatsStatus(tableStats); logCreateTableStats(tableStats); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 17d666b7ea..fe90a2a59d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; @@ -38,6 +39,7 @@ import org.apache.logging.log4j.Logger; import java.time.LocalTime; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -152,8 +154,8 @@ public class StatisticsAutoCollector extends StatisticsCollector { return false; } TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); - // means it's never got analyzed - if (tableStats == null) { + // means it's never got analyzed or new partition loaded data. + if (tableStats == null || tableStats.newPartitionLoaded.get()) { return false; } return System.currentTimeMillis() @@ -191,8 +193,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { @VisibleForTesting protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) { - TableIf table = StatisticsUtil - .findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId); + TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId); // Skip tables that are too width. if (table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) { return null; @@ -200,16 +201,27 @@ public class StatisticsAutoCollector extends StatisticsCollector { AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager(); TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId()); - if (!table.needReAnalyzeTable(tblStats)) { - return null; + + Map> needRunPartitions = null; + String colNames = jobInfo.colName; + if (table.needReAnalyzeTable(tblStats)) { + needRunPartitions = table.findReAnalyzeNeededPartitions(); + } else if (table instanceof OlapTable && tblStats.newPartitionLoaded.get()) { + OlapTable olapTable = (OlapTable) table; + needRunPartitions = new HashMap<>(); + Set partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream() + .map(Column::getName).collect(Collectors.toSet()); + colNames = partitionColumnNames.stream().collect(Collectors.joining(",")); + Set partitionNames = olapTable.getAllPartitions().stream() + .map(Partition::getName).collect(Collectors.toSet()); + for (String column : partitionColumnNames) { + needRunPartitions.put(column, partitionNames); + } } - Map> needRunPartitions = table.findReAnalyzeNeededPartitions(); - - if (needRunPartitions.isEmpty()) { + if (needRunPartitions == null || needRunPartitions.isEmpty()) { return null; } - - return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build(); + return new AnalysisInfoBuilder(jobInfo).setColName(colNames).setColToPartitions(needRunPartitions).build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index eb6672ffe1..00878adcc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -19,6 +19,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -37,6 +38,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -67,6 +69,9 @@ public class TableStatsMeta implements Writable { @SerializedName("trigger") public JobType jobType; + @SerializedName("newPartitionLoaded") + public AtomicBoolean newPartitionLoaded = new AtomicBoolean(false); + @VisibleForTesting public TableStatsMeta() { tblId = 0; @@ -154,6 +159,15 @@ public class TableStatsMeta implements Writable { .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) .map(Column::getName).collect(Collectors.toSet()))) { updatedRows.set(0); + newPartitionLoaded.set(false); + } + if (tableIf instanceof OlapTable) { + PartitionInfo partitionInfo = ((OlapTable) tableIf).getPartitionInfo(); + if (partitionInfo != null && analyzedJob.colToPartitions.keySet() + .containsAll(partitionInfo.getPartitionColumns().stream() + .map(Column::getName).collect(Collectors.toSet()))) { + newPartitionLoaded.set(false); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 5bb954289c..f0217e71d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1830,6 +1830,7 @@ public class DatabaseTransactionMgr { private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) { Set errorReplicaIds = transactionState.getErrorReplicas(); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) { long tableId = tableCommitInfo.getTableId(); OlapTable table = (OlapTable) db.getTableNullable(tableId); @@ -1889,6 +1890,10 @@ public class DatabaseTransactionMgr { } // end for indices long version = partitionCommitInfo.getVersion(); long versionTime = partitionCommitInfo.getVersionTime(); + if (partition.getVisibleVersion() == Partition.PARTITION_INIT_VERSION + && version > Partition.PARTITION_INIT_VERSION) { + analysisManager.setNewPartitionLoaded(tableId); + } partition.updateVisibleVersionAndTime(version, versionTime); if (LOG.isDebugEnabled()) { LOG.debug("transaction state {} set partition {}'s version to [{}]", @@ -1896,7 +1901,6 @@ public class DatabaseTransactionMgr { } } } - AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); Map tableIdToTotalNumDeltaRows = transactionState.getTableIdToTotalNumDeltaRows(); Map tableIdToNumDeltaRows = Maps.newHashMap(); tableIdToTotalNumDeltaRows diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java index 87342202fb..f6def60908 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -298,8 +298,13 @@ public class StatisticsAutoCollectorTest { }; // A very huge table has been updated recently, so we should skip it this time stats.updatedTime = System.currentTimeMillis() - 1000; + stats.newPartitionLoaded = new AtomicBoolean(); + stats.newPartitionLoaded.set(true); StatisticsAutoCollector autoCollector = new StatisticsAutoCollector(); + // Test new partition loaded data for the first time. Not skip. Assertions.assertFalse(autoCollector.skip(olapTable)); + stats.newPartitionLoaded.set(false); + // Assertions.assertTrue(autoCollector.skip(olapTable)); // The update of this huge table is long time ago, so we shouldn't skip it this time stats.updatedTime = System.currentTimeMillis() - StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000; diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index 64967280ce..7bacd4c833 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -2576,6 +2576,42 @@ PARTITION `p599` VALUES IN (599) assertEquals("\'name1\'", result[0][6]) assertEquals("\'name3\'", result[0][7]) + // Test partititon load data for the first time. + sql """ + CREATE TABLE `partition_test` ( + `id` INT NOT NULL, + `name` VARCHAR(25) NOT NULL, + `comment` VARCHAR(152) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + COMMENT 'OLAP' + PARTITION BY RANGE(`id`) + (PARTITION p1 VALUES [("0"), ("100")), + PARTITION p2 VALUES [("100"), ("200")), + PARTITION p3 VALUES [("200"), ("300"))) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + + sql """analyze table partition_test with sync""" + sql """insert into partition_test values (1, '1', '1')""" + def partition_result = sql """show table stats partition_test""" + assertEquals(partition_result[0][6], "true") + assertEquals(partition_result[0][0], "1") + sql """analyze table partition_test with sync""" + partition_result = sql """show table stats partition_test""" + assertEquals(partition_result[0][6], "false") + sql """insert into partition_test values (101, '1', '1')""" + partition_result = sql """show table stats partition_test""" + assertEquals(partition_result[0][6], "true") + sql """analyze table partition_test(id) with sync""" + partition_result = sql """show table stats partition_test""" + assertEquals(partition_result[0][6], "false") + sql """insert into partition_test values (102, '1', '1')""" + partition_result = sql """show table stats partition_test""" + assertEquals(partition_result[0][6], "false") + // Test trigger type. sql """DROP DATABASE IF EXISTS trigger""" sql """CREATE DATABASE IF NOT EXISTS trigger"""