[improvement](statistics) Analyze partition columns when new partition loaded data for the first time. (#29154)
The first time load data to a partition, we need to analyze the partition columns even when the health rate is high. Because if not, the min max value of the column may not include the new partition values, which may cause bad plan.
This commit is contained in:
@ -54,6 +54,7 @@ public class ShowTableStatsStmt extends ShowStmt {
|
||||
.add("updated_time")
|
||||
.add("columns")
|
||||
.add("trigger")
|
||||
.add("new_partition")
|
||||
.build();
|
||||
|
||||
private final TableName tableName;
|
||||
@ -149,6 +150,7 @@ public class ShowTableStatsStmt extends ShowStmt {
|
||||
row.add(formattedDateTime);
|
||||
row.add(tableStatistic.analyzeColumns().toString());
|
||||
row.add(tableStatistic.jobType.toString());
|
||||
row.add(String.valueOf(tableStatistic.newPartitionLoaded.get()));
|
||||
result.add(row);
|
||||
return new ShowResultSet(getMetaData(), result);
|
||||
}
|
||||
|
||||
@ -933,6 +933,15 @@ public class AnalysisManager implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
// Set to true means new partition loaded data
|
||||
public void setNewPartitionLoaded(long tblId) {
|
||||
TableStatsMeta statsStatus = idToTblStats.get(tblId);
|
||||
if (statsStatus != null) {
|
||||
statsStatus.newPartitionLoaded.set(true);
|
||||
logCreateTableStats(statsStatus);
|
||||
}
|
||||
}
|
||||
|
||||
public void updateTableStatsStatus(TableStatsMeta tableStats) {
|
||||
replayUpdateTableStatsStatus(tableStats);
|
||||
logCreateTableStats(tableStats);
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -38,6 +39,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.LocalTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -152,8 +154,8 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
return false;
|
||||
}
|
||||
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
|
||||
// means it's never got analyzed
|
||||
if (tableStats == null) {
|
||||
// means it's never got analyzed or new partition loaded data.
|
||||
if (tableStats == null || tableStats.newPartitionLoaded.get()) {
|
||||
return false;
|
||||
}
|
||||
return System.currentTimeMillis()
|
||||
@ -191,8 +193,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
|
||||
@VisibleForTesting
|
||||
protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
|
||||
TableIf table = StatisticsUtil
|
||||
.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId);
|
||||
TableIf table = StatisticsUtil.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId);
|
||||
// Skip tables that are too width.
|
||||
if (table.getBaseSchema().size() > StatisticsUtil.getAutoAnalyzeTableWidthThreshold()) {
|
||||
return null;
|
||||
@ -200,16 +201,27 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
|
||||
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
|
||||
TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId());
|
||||
if (!table.needReAnalyzeTable(tblStats)) {
|
||||
return null;
|
||||
|
||||
Map<String, Set<String>> needRunPartitions = null;
|
||||
String colNames = jobInfo.colName;
|
||||
if (table.needReAnalyzeTable(tblStats)) {
|
||||
needRunPartitions = table.findReAnalyzeNeededPartitions();
|
||||
} else if (table instanceof OlapTable && tblStats.newPartitionLoaded.get()) {
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
needRunPartitions = new HashMap<>();
|
||||
Set<String> partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream()
|
||||
.map(Column::getName).collect(Collectors.toSet());
|
||||
colNames = partitionColumnNames.stream().collect(Collectors.joining(","));
|
||||
Set<String> partitionNames = olapTable.getAllPartitions().stream()
|
||||
.map(Partition::getName).collect(Collectors.toSet());
|
||||
for (String column : partitionColumnNames) {
|
||||
needRunPartitions.put(column, partitionNames);
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, Set<String>> needRunPartitions = table.findReAnalyzeNeededPartitions();
|
||||
|
||||
if (needRunPartitions.isEmpty()) {
|
||||
if (needRunPartitions == null || needRunPartitions.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build();
|
||||
return new AnalysisInfoBuilder(jobInfo).setColName(colNames).setColToPartitions(needRunPartitions).build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
@ -37,6 +38,7 @@ import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -67,6 +69,9 @@ public class TableStatsMeta implements Writable {
|
||||
@SerializedName("trigger")
|
||||
public JobType jobType;
|
||||
|
||||
@SerializedName("newPartitionLoaded")
|
||||
public AtomicBoolean newPartitionLoaded = new AtomicBoolean(false);
|
||||
|
||||
@VisibleForTesting
|
||||
public TableStatsMeta() {
|
||||
tblId = 0;
|
||||
@ -154,6 +159,15 @@ public class TableStatsMeta implements Writable {
|
||||
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
|
||||
.map(Column::getName).collect(Collectors.toSet()))) {
|
||||
updatedRows.set(0);
|
||||
newPartitionLoaded.set(false);
|
||||
}
|
||||
if (tableIf instanceof OlapTable) {
|
||||
PartitionInfo partitionInfo = ((OlapTable) tableIf).getPartitionInfo();
|
||||
if (partitionInfo != null && analyzedJob.colToPartitions.keySet()
|
||||
.containsAll(partitionInfo.getPartitionColumns().stream()
|
||||
.map(Column::getName).collect(Collectors.toSet()))) {
|
||||
newPartitionLoaded.set(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1830,6 +1830,7 @@ public class DatabaseTransactionMgr {
|
||||
|
||||
private boolean updateCatalogAfterVisible(TransactionState transactionState, Database db) {
|
||||
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
|
||||
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
|
||||
for (TableCommitInfo tableCommitInfo : transactionState.getIdToTableCommitInfos().values()) {
|
||||
long tableId = tableCommitInfo.getTableId();
|
||||
OlapTable table = (OlapTable) db.getTableNullable(tableId);
|
||||
@ -1889,6 +1890,10 @@ public class DatabaseTransactionMgr {
|
||||
} // end for indices
|
||||
long version = partitionCommitInfo.getVersion();
|
||||
long versionTime = partitionCommitInfo.getVersionTime();
|
||||
if (partition.getVisibleVersion() == Partition.PARTITION_INIT_VERSION
|
||||
&& version > Partition.PARTITION_INIT_VERSION) {
|
||||
analysisManager.setNewPartitionLoaded(tableId);
|
||||
}
|
||||
partition.updateVisibleVersionAndTime(version, versionTime);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("transaction state {} set partition {}'s version to [{}]",
|
||||
@ -1896,7 +1901,6 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
}
|
||||
}
|
||||
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
|
||||
Map<Long, Long> tableIdToTotalNumDeltaRows = transactionState.getTableIdToTotalNumDeltaRows();
|
||||
Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
|
||||
tableIdToTotalNumDeltaRows
|
||||
|
||||
@ -298,8 +298,13 @@ public class StatisticsAutoCollectorTest {
|
||||
};
|
||||
// A very huge table has been updated recently, so we should skip it this time
|
||||
stats.updatedTime = System.currentTimeMillis() - 1000;
|
||||
stats.newPartitionLoaded = new AtomicBoolean();
|
||||
stats.newPartitionLoaded.set(true);
|
||||
StatisticsAutoCollector autoCollector = new StatisticsAutoCollector();
|
||||
// Test new partition loaded data for the first time. Not skip.
|
||||
Assertions.assertFalse(autoCollector.skip(olapTable));
|
||||
stats.newPartitionLoaded.set(false);
|
||||
// Assertions.assertTrue(autoCollector.skip(olapTable));
|
||||
// The update of this huge table is long time ago, so we shouldn't skip it this time
|
||||
stats.updatedTime = System.currentTimeMillis()
|
||||
- StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis() - 10000;
|
||||
|
||||
@ -2576,6 +2576,42 @@ PARTITION `p599` VALUES IN (599)
|
||||
assertEquals("\'name1\'", result[0][6])
|
||||
assertEquals("\'name3\'", result[0][7])
|
||||
|
||||
// Test partititon load data for the first time.
|
||||
sql """
|
||||
CREATE TABLE `partition_test` (
|
||||
`id` INT NOT NULL,
|
||||
`name` VARCHAR(25) NOT NULL,
|
||||
`comment` VARCHAR(152) NULL
|
||||
) ENGINE=OLAP
|
||||
DUPLICATE KEY(`id`)
|
||||
COMMENT 'OLAP'
|
||||
PARTITION BY RANGE(`id`)
|
||||
(PARTITION p1 VALUES [("0"), ("100")),
|
||||
PARTITION p2 VALUES [("100"), ("200")),
|
||||
PARTITION p3 VALUES [("200"), ("300")))
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1");
|
||||
"""
|
||||
|
||||
sql """analyze table partition_test with sync"""
|
||||
sql """insert into partition_test values (1, '1', '1')"""
|
||||
def partition_result = sql """show table stats partition_test"""
|
||||
assertEquals(partition_result[0][6], "true")
|
||||
assertEquals(partition_result[0][0], "1")
|
||||
sql """analyze table partition_test with sync"""
|
||||
partition_result = sql """show table stats partition_test"""
|
||||
assertEquals(partition_result[0][6], "false")
|
||||
sql """insert into partition_test values (101, '1', '1')"""
|
||||
partition_result = sql """show table stats partition_test"""
|
||||
assertEquals(partition_result[0][6], "true")
|
||||
sql """analyze table partition_test(id) with sync"""
|
||||
partition_result = sql """show table stats partition_test"""
|
||||
assertEquals(partition_result[0][6], "false")
|
||||
sql """insert into partition_test values (102, '1', '1')"""
|
||||
partition_result = sql """show table stats partition_test"""
|
||||
assertEquals(partition_result[0][6], "false")
|
||||
|
||||
// Test trigger type.
|
||||
sql """DROP DATABASE IF EXISTS trigger"""
|
||||
sql """CREATE DATABASE IF NOT EXISTS trigger"""
|
||||
|
||||
Reference in New Issue
Block a user