From 031d35d4a19da6584fd085fb25a75b97bfd04aff Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Tue, 18 Apr 2023 17:41:10 +0900 Subject: [PATCH] [fix](stats) Stats still in cache after user dropped it (#18720) 1. Evict the dropped stats from cache 2. Remove codes for the partition level stats collection 3. Disable analyze whole database directly 4. Fix the potential death loop in the stats cleaner 5. Sleep thread in each loop when scanning stats table to avoid excessive IO usage by this task. --- fe/fe-core/src/main/cup/sql_parser.cup | 14 +-- .../apache/doris/analysis/AnalyzeStmt.java | 40 ------- .../apache/doris/analysis/DropStatsStmt.java | 112 ++++++------------ .../doris/statistics/AnalysisManager.java | 36 ++---- .../doris/statistics/AnalysisTaskInfo.java | 7 +- .../statistics/AnalysisTaskInfoBuilder.java | 10 +- .../doris/statistics/HistogramTask.java | 2 +- .../doris/statistics/MVAnalysisTask.java | 2 +- .../doris/statistics/OlapAnalysisTask.java | 2 +- .../doris/statistics/StatisticConstants.java | 1 + .../doris/statistics/StatisticsCache.java | 3 +- .../doris/statistics/StatisticsCleaner.java | 8 +- .../statistics/StatisticsRepository.java | 63 ++-------- .../doris/statistics/AnalysisJobTest.java | 2 - .../statistics/AnalysisTaskExecutorTest.java | 2 - .../doris/statistics/HistogramTaskTest.java | 3 - .../data/statistics/analyze_test.out | 3 + .../suites/statistics/analyze_test.groovy | 43 +++++-- 18 files changed, 116 insertions(+), 237 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index a860b74d4f..90a254302d 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2812,30 +2812,30 @@ show_create_reporitory_stmt ::= // analyze statment analyze_stmt ::= - KW_ANALYZE opt_sync:sync KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties + KW_ANALYZE opt_sync:sync KW_TABLE table_name:tbl opt_col_list:cols opt_properties:properties {: boolean is_whole_tbl = (cols == null); boolean is_histogram = false; boolean is_increment = false; - RESULT = new AnalyzeStmt(tbl, sync, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment); + RESULT = new AnalyzeStmt(tbl, sync, cols, properties, is_whole_tbl, is_histogram, is_increment); :} | KW_ANALYZE opt_sync:sync KW_INCREMENTAL KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties {: boolean is_whole_tbl = (cols == null); boolean is_histogram = false; boolean is_increment = true; - RESULT = new AnalyzeStmt(tbl, sync, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment); + RESULT = new AnalyzeStmt(tbl, sync, cols, properties, is_whole_tbl, is_histogram, is_increment); :} | KW_ANALYZE opt_sync:sync KW_TABLE table_name:tbl KW_UPDATE KW_HISTOGRAM KW_ON ident_list:cols opt_partition_names:partitionNames opt_properties:properties {: boolean is_whole_tbl = false; boolean is_histogram = true; boolean is_increment = false; - RESULT = new AnalyzeStmt(tbl, sync, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment); + RESULT = new AnalyzeStmt(tbl, sync, cols, properties, is_whole_tbl, is_histogram, is_increment); :} | KW_ANALYZE opt_sync:sync KW_TABLE table_name:tbl KW_UPDATE KW_HISTOGRAM {: - RESULT = new AnalyzeStmt(tbl, sync, null, null, new HashMap<>(), true, true, false); + RESULT = new AnalyzeStmt(tbl, sync, null, new HashMap<>(), true, true, false); :} ; @@ -3014,9 +3014,9 @@ drop_stmt ::= RESULT = new DropPolicyStmt(PolicyTypeEnum.STORAGE, ifExists, policyName, null, null); :} /* statistics */ - | KW_DROP KW_STATS opt_table_name:tbl opt_col_list:cols opt_partition_names:partitionNames + | KW_DROP KW_STATS table_name:tbl opt_col_list:cols {: - RESULT = new DropStatsStmt(tbl, partitionNames, cols); + RESULT = new DropStatsStmt(tbl, cols); :} | KW_DROP KW_EXPIRED KW_STATS {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java index 40bdb0b1fb..d189a8aeee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java @@ -21,8 +21,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.common.AnalysisException; @@ -75,7 +73,6 @@ public class AnalyzeStmt extends DdlStmt { private final TableName tableName; private final boolean sync; - private final PartitionNames partitionNames; private final List columnNames; private final Map properties; @@ -86,7 +83,6 @@ public class AnalyzeStmt extends DdlStmt { public AnalyzeStmt(TableName tableName, boolean sync, List columnNames, - PartitionNames partitionNames, Map properties, Boolean isWholeTbl, Boolean isHistogram, @@ -94,7 +90,6 @@ public class AnalyzeStmt extends DdlStmt { this.tableName = tableName; this.sync = sync; this.columnNames = columnNames; - this.partitionNames = partitionNames; this.properties = properties; this.isWholeTbl = isWholeTbl; this.isHistogram = isHistogram; @@ -137,8 +132,6 @@ public class AnalyzeStmt extends DdlStmt { } } - checkPartitionNames(); - checkProperties(); } @@ -159,29 +152,6 @@ public class AnalyzeStmt extends DdlStmt { } } - private void checkPartitionNames() throws AnalysisException { - if (partitionNames != null) { - partitionNames.analyze(analyzer); - Database db = analyzer.getEnv().getInternalCatalog() - .getDbOrAnalysisException(tableName.getDb()); - OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(tableName.getTbl()); - if (!olapTable.isPartitioned()) { - throw new AnalysisException("Not a partitioned table: " + olapTable.getName()); - } - List names = partitionNames.getPartitionNames(); - Set olapPartitionNames = olapTable.getPartitionNames(); - List tempPartitionNames = olapTable.getTempPartitions().stream() - .map(Partition::getName).collect(Collectors.toList()); - Optional illegalPartitionName = names.stream() - .filter(name -> (tempPartitionNames.contains(name) - || !olapPartitionNames.contains(name))) - .findFirst(); - if (illegalPartitionName.isPresent()) { - throw new AnalysisException("Temporary partition or partition does not exist"); - } - } - } - private void checkProperties() throws UserException { if (properties != null) { Optional optional = properties.keySet().stream().filter( @@ -227,11 +197,6 @@ public class AnalyzeStmt extends DdlStmt { .stream().map(Column::getName).collect(Collectors.toSet()) : Sets.newHashSet(columnNames); } - public Set getPartitionNames() { - return partitionNames == null ? Sets.newHashSet(table.getPartitionNames()) - : Sets.newHashSet(partitionNames.getPartitionNames()); - } - public Map getProperties() { // TODO add default properties return properties != null ? properties : Maps.newHashMap(); @@ -263,11 +228,6 @@ public class AnalyzeStmt extends DdlStmt { sb.append(")"); } - if (partitionNames != null) { - sb.append(" "); - sb.append(partitionNames.toSql()); - } - if (properties != null) { sb.append(" "); sb.append("PROPERTIES("); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java index d1d594f61d..137db5e310 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java @@ -17,10 +17,9 @@ package org.apache.doris.analysis; -import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; @@ -31,9 +30,9 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; -import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -51,26 +50,20 @@ public class DropStatsStmt extends DdlStmt { public final boolean dropExpired; private final TableName tableName; - private final PartitionNames partitionNames; - private final List columnNames; + private Set columnNames; - // after analyzed - private long dbId; - private final Set tbIds = Sets.newHashSet(); - private final Set partitionIds = Sets.newHashSet(); + private long tblId; public DropStatsStmt(boolean dropExpired) { this.dropExpired = dropExpired; this.tableName = null; - this.partitionNames = null; this.columnNames = null; } public DropStatsStmt(TableName tableName, - PartitionNames partitionNames, List columnNames) { + List columnNames) { this.tableName = tableName; - this.partitionNames = partitionNames; - this.columnNames = columnNames; + this.columnNames = new HashSet<>(columnNames); dropExpired = false; } @@ -80,75 +73,43 @@ public class DropStatsStmt extends DdlStmt { if (dropExpired) { return; } - if (tableName != null) { - tableName.analyze(analyzer); - - String catalogName = tableName.getCtl(); - String dbName = tableName.getDb(); - String tblName = tableName.getTbl(); - CatalogIf catalog = analyzer.getEnv().getCatalogMgr() - .getCatalogOrAnalysisException(catalogName); - DatabaseIf db = catalog.getDbOrAnalysisException(dbName); - TableIf table = db.getTableOrAnalysisException(tblName); - - dbId = db.getId(); - tbIds.add(table.getId()); - - // disallow external catalog - Util.prohibitExternalCatalog(tableName.getCtl(), - this.getClass().getSimpleName()); - - // check permission - checkAnalyzePriv(db.getFullName(), table.getName()); - - // check partitionNames - if (partitionNames != null) { - partitionNames.analyze(analyzer); - partitionIds.addAll(partitionNames.getPartitionNames().stream() - .map(name -> table.getPartition(name).getId()) - .collect(Collectors.toList())); - } - - // check columnNames - if (columnNames != null) { - for (String cName : columnNames) { - if (table.getColumn(cName) == null) { - ErrorReport.reportAnalysisException( - ErrorCode.ERR_WRONG_COLUMN_NAME, - "DROP", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - cName); - } + tableName.analyze(analyzer); + String catalogName = tableName.getCtl(); + String dbName = tableName.getDb(); + String tblName = tableName.getTbl(); + CatalogIf catalog = analyzer.getEnv().getCatalogMgr() + .getCatalogOrAnalysisException(catalogName); + DatabaseIf db = catalog.getDbOrAnalysisException(dbName); + TableIf table = db.getTableOrAnalysisException(tblName); + tblId = table.getId(); + // disallow external catalog + Util.prohibitExternalCatalog(tableName.getCtl(), + this.getClass().getSimpleName()); + // check permission + checkAnalyzePriv(db.getFullName(), table.getName()); + // check columnNames + if (columnNames != null) { + for (String cName : columnNames) { + if (table.getColumn(cName) == null) { + ErrorReport.reportAnalysisException( + ErrorCode.ERR_WRONG_COLUMN_NAME, + "DROP", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + cName); } } } else { - Database db = analyzer.getEnv().getInternalCatalog() - .getDbOrAnalysisException(analyzer.getDefaultDb()); - List tables = db.getTables(); - for (Table table : tables) { - checkAnalyzePriv(db.getFullName(), table.getName()); - } - - dbId = db.getId(); - tbIds.addAll(tables.stream().map(Table::getId).collect(Collectors.toList())); + columnNames = table.getColumns().stream().map(Column::getName).collect(Collectors.toSet()); } } - public long getDbId() { - return dbId; - } - - public Set getTbIds() { - return tbIds; - } - - public Set getPartitionIds() { - return partitionIds; + public long getTblId() { + return tblId; } public Set getColumnNames() { - return columnNames != null ? Sets.newHashSet(columnNames) : Sets.newHashSet(); + return columnNames; } @Override @@ -166,11 +127,6 @@ public class DropStatsStmt extends DdlStmt { sb.append(")"); } - if (partitionNames != null) { - sb.append(" "); - sb.append(partitionNames.toSql()); - } - return sb.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index a0171cc3c0..433135d5c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -37,7 +37,6 @@ import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; @@ -53,7 +52,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; public class AnalysisManager { @@ -98,23 +96,10 @@ public class AnalysisManager { TableName tbl = analyzeStmt.getTblName(); StatisticsUtil.convertTableNameToObjects(tbl); Set colNames = analyzeStmt.getColumnNames(); - Set partitionNames = analyzeStmt.getPartitionNames(); Map analysisTaskInfos = new HashMap<>(); long jobId = Env.getCurrentEnv().getNextId(); - // If the analysis is not incremental, need to delete existing statistics. - // we cannot collect histograms incrementally and do not support it - if (!analyzeStmt.isIncrement && !analyzeStmt.isHistogram) { - long dbId = analyzeStmt.getDbId(); - TableIf table = analyzeStmt.getTable(); - Set tblIds = Sets.newHashSet(table.getId()); - Set partIds = partitionNames.stream() - .map(p -> table.getPartition(p).getId()) - .collect(Collectors.toSet()); - StatisticsRepository.dropStatistics(dbId, tblIds, colNames, partIds); - } - - createTaskForEachColumns(analyzeStmt, catalogName, db, tbl, colNames, partitionNames, analysisTaskInfos, jobId); - createTaskForMVIdx(analyzeStmt, catalogName, db, tbl, partitionNames, analysisTaskInfos, jobId); + createTaskForEachColumns(analyzeStmt, catalogName, db, tbl, colNames, analysisTaskInfos, jobId); + createTaskForMVIdx(analyzeStmt, catalogName, db, tbl, analysisTaskInfos, jobId); persistAnalysisJob(catalogName, db, tbl, jobId); if (analyzeStmt.isSync()) { @@ -143,7 +128,7 @@ public class AnalysisManager { } private void createTaskForMVIdx(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl, - Set partitionNames, Map analysisTaskInfos, long jobId) throws DdlException { + Map analysisTaskInfos, long jobId) throws DdlException { if (!(analyzeStmt.isWholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP))) { return; } @@ -158,7 +143,7 @@ public class AnalysisManager { AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId( jobId).setTaskId(taskId) .setCatalogName(catalogName).setDbName(db) - .setTblName(tbl.getTbl()).setPartitionNames(partitionNames) + .setTblName(tbl.getTbl()) .setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL) .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX) .setScheduleType(ScheduleType.ONCE).build(); @@ -175,7 +160,7 @@ public class AnalysisManager { } private void createTaskForEachColumns(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl, - Set colNames, Set partitionNames, Map analysisTaskInfos, + Set colNames, Map analysisTaskInfos, long jobId) throws DdlException { for (String colName : colNames) { long taskId = Env.getCurrentEnv().getNextId(); @@ -183,7 +168,7 @@ public class AnalysisManager { AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId) .setTaskId(taskId).setCatalogName(catalogName).setDbName(db) .setTblName(tbl.getTbl()).setColName(colName) - .setPartitionNames(partitionNames).setJobType(JobType.MANUAL) + .setJobType(JobType.MANUAL) .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType) .setState(AnalysisState.PENDING) .setScheduleType(ScheduleType.ONCE).build(); @@ -271,12 +256,17 @@ public class AnalysisManager { } } - public void dropStats(DropStatsStmt dropStatsStmt) { + public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException { if (dropStatsStmt.dropExpired) { Env.getCurrentEnv().getStatisticsCleaner().clear(); return; } - StatisticsRepository.dropTableStatistics(dropStatsStmt); + Set cols = dropStatsStmt.getColumnNames(); + long tblId = dropStatsStmt.getTblId(); + StatisticsRepository.dropStatistics(tblId, cols); + for (String col : cols) { + Env.getCurrentEnv().getStatisticsCache().invidate(tblId, -1L, col); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java index def16de41c..004d51b55e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java @@ -20,7 +20,6 @@ package org.apache.doris.statistics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Set; import java.util.StringJoiner; public class AnalysisTaskInfo { @@ -63,8 +62,6 @@ public class AnalysisTaskInfo { public final String colName; - public final Set partitionNames; - public final Long indexId; public final JobType jobType; @@ -87,7 +84,7 @@ public class AnalysisTaskInfo { public final ScheduleType scheduleType; public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName, - String colName, Set partitionNames, Long indexId, JobType jobType, + String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod, AnalysisType analysisType, String message, int lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) { this.jobId = jobId; @@ -96,7 +93,6 @@ public class AnalysisTaskInfo { this.dbName = dbName; this.tblName = tblName; this.colName = colName; - this.partitionNames = partitionNames; this.indexId = indexId; this.jobType = jobType; this.analysisMethod = analysisMethod; @@ -115,7 +111,6 @@ public class AnalysisTaskInfo { sj.add("DBName: " + dbName); sj.add("TableName: " + tblName); sj.add("ColumnName: " + colName); - sj.add("PartitionNames: " + partitionNames); sj.add("TaskType: " + analysisType.toString()); sj.add("TaskMethod: " + analysisMethod.toString()); sj.add("Message: " + message); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java index e687d6fe93..e804388153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java @@ -22,8 +22,6 @@ import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType; -import java.util.Set; - public class AnalysisTaskInfoBuilder { private long jobId; private long taskId; @@ -31,7 +29,6 @@ public class AnalysisTaskInfoBuilder { private String dbName; private String tblName; private String colName; - private Set partitionNames; private Long indexId; private JobType jobType; private AnalysisMethod analysisMethod; @@ -71,11 +68,6 @@ public class AnalysisTaskInfoBuilder { return this; } - public AnalysisTaskInfoBuilder setPartitionNames(Set partitionNames) { - this.partitionNames = partitionNames; - return this; - } - public AnalysisTaskInfoBuilder setIndexId(Long indexId) { this.indexId = indexId; return this; @@ -117,7 +109,7 @@ public class AnalysisTaskInfoBuilder { } public AnalysisTaskInfo build() { - return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colName, partitionNames, + return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colName, indexId, jobType, analysisMethod, analysisType, message, lastExecTimeInMs, state, scheduleType); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HistogramTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HistogramTask.java index 0cb83169ee..5adb34d4b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HistogramTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HistogramTask.java @@ -81,7 +81,7 @@ public class HistogramTask extends BaseAnalysisTask { params.put("percentValue", String.valueOf((int) (info.sampleRate * 100))); String histogramSql; - Set partitionNames = info.partitionNames; + Set partitionNames = tbl.getPartitionNames(); if (partitionNames.isEmpty()) { StringSubstitutor stringSubstitutor = new StringSubstitutor(params); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java index ce73dbea87..62eb72ec9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java @@ -97,7 +97,7 @@ public class MVAnalysisTask extends BaseAnalysisTask { .get(); selectItem.setAlias(column.getName()); Map params = new HashMap<>(); - for (String partName : info.partitionNames) { + for (String partName : tbl.getPartitionNames()) { PartitionNames partitionName = new PartitionNames(false, Collections.singletonList(partName)); tableRef.setPartitionNames(partitionName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index c3a01fb5b5..de848a2592 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -73,7 +73,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask { List partitionAnalysisSQLs = new ArrayList<>(); try { tbl.readLock(); - Set partNames = info.partitionNames; + Set partNames = tbl.getPartitionNames(); for (String partName : partNames) { Partition part = tbl.getPartition(partName); if (part == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index 19df103e11..d5d473eac6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -66,5 +66,6 @@ public class StatisticConstants { public static final int ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS = 7; public static final int FETCH_LIMIT = 10000; + public static final int FETCH_INTERVAL_IN_MS = 500; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 6ee30bb90a..ccac80a5f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -123,8 +123,7 @@ public class StatisticsCache { return Optional.empty(); } - // TODO: finish this method. - public void eraseExpiredCache(long tblId, long idxId, String colName) { + public void invidate(long tblId, long idxId, String colName) { columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java index 4ef5f35727..3bccdff3b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java @@ -174,9 +174,8 @@ public class StatisticsCleaner extends MasterDaemon { public ExpiredStats findExpiredStats(OlapTable statsTbl) { ExpiredStats expiredStats = new ExpiredStats(); - long rowCount = statsTbl.getRowCount(); long pos = 0; - while (pos < rowCount + while (pos < statsTbl.getRowCount() && !expiredStats.isFull()) { List rows = StatisticsRepository.fetchStatsFullName(StatisticConstants.FETCH_LIMIT, pos); pos += StatisticConstants.FETCH_LIMIT; @@ -227,6 +226,11 @@ public class StatisticsCleaner extends MasterDaemon { LOG.warn("Error occurred when retrieving expired stats", e); } } + try { + Thread.sleep(StatisticConstants.FETCH_INTERVAL_IN_MS); + } catch (InterruptedException t) { + // IGNORE + } } return expiredStats; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index 4ce673a063..80187edec2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -18,12 +18,12 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AlterColumnStatsStmt; -import org.apache.doris.analysis.DropStatsStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; @@ -85,10 +85,7 @@ public class StatisticsRepository { + "'${colId}', ${partId}, ${count}, ${ndv}, ${nullCount}, '${min}', '${max}', ${dataSize}, NOW())"; private static final String DROP_TABLE_STATISTICS_TEMPLATE = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME - + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE ${condition}"; - - private static final String DROP_TABLE_HISTOGRAM_TEMPLATE = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME - + "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE ${condition}"; + + "." + "${tblName}" + " WHERE ${condition}"; private static final String FETCH_RECENT_STATS_UPDATED_COL = "SELECT * FROM " @@ -175,48 +172,21 @@ public class StatisticsRepository { return stringJoiner.toString(); } - public static void dropStatistics(Long dbId, - Set tblIds, Set colNames, Set partIds) { - dropStatistics(dbId, tblIds, colNames, partIds, false); + public static void dropStatistics(long tblId, Set colNames) throws DdlException { + dropStatistics(tblId, colNames, StatisticConstants.STATISTIC_TBL_NAME); + dropStatistics(tblId, colNames, StatisticConstants.HISTOGRAM_TBL_NAME); } - public static void dropHistogram(Long dbId, - Set tblIds, Set colNames, Set partIds) { - dropStatistics(dbId, tblIds, colNames, partIds, true); - } - - private static void dropStatistics(Long dbId, - Set tblIds, Set colNames, Set partIds, boolean isHistogram) { - if (dbId <= 0) { - throw new IllegalArgumentException("Database id is not specified."); - } - - StringBuilder predicate = new StringBuilder(); - predicate.append(String.format("db_id = '%d'", dbId)); - - if (!tblIds.isEmpty()) { - buildPredicate("tbl_id", tblIds, predicate); - } - - if (!colNames.isEmpty()) { - buildPredicate("col_id", colNames, predicate); - } - - if (!partIds.isEmpty() && !isHistogram) { - // Histogram is not collected and deleted by partition - buildPredicate("part_id", partIds, predicate); - } - + public static void dropStatistics(long tblId, Set colNames, String statsTblName) throws DdlException { Map params = new HashMap<>(); - params.put("condition", predicate.toString()); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - + String right = colNames.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")); + String inPredicate = String.format("tbl_id = %s AND %s IN (%s)", tblId, "col_id", right); + params.put("tblName", statsTblName); + params.put("condition", inPredicate); try { - String statement = isHistogram ? stringSubstitutor.replace(DROP_TABLE_HISTOGRAM_TEMPLATE) : - stringSubstitutor.replace(DROP_TABLE_STATISTICS_TEMPLATE); - StatisticsUtil.execUpdate(statement); + StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(DROP_TABLE_STATISTICS_TEMPLATE)); } catch (Exception e) { - LOG.warn("Drop statistics failed", e); + throw new DdlException(e.getMessage(), e); } } @@ -302,15 +272,6 @@ public class StatisticsRepository { .updateColStatsCache(objects.table.getId(), -1, colName, builder.build()); } - public static void dropTableStatistics(DropStatsStmt dropTableStatsStmt) { - Long dbId = dropTableStatsStmt.getDbId(); - Set tbIds = dropTableStatsStmt.getTbIds(); - Set cols = dropTableStatsStmt.getColumnNames(); - Set partIds = dropTableStatsStmt.getPartitionIds(); - dropHistogram(dbId, tbIds, cols, partIds); - dropStatistics(dbId, tbIds, cols, partIds); - } - public static List fetchRecentStatsUpdatedCol() { return StatisticsUtil.execStatisticQuery(FETCH_RECENT_STATS_UPDATED_COL); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index dcfb475c24..09f64621ce 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -28,7 +28,6 @@ import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.utframe.TestWithFeService; -import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mock; import mockit.MockUp; @@ -111,7 +110,6 @@ public class AnalysisJobTest extends TestWithFeService { .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( AnalysisType.COLUMN) - .setPartitionNames(Sets.newHashSet("t1")) .build(); new OlapAnalysisTask(analysisJobInfo).execute(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index 22e4d531a1..10e1635560 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -26,7 +26,6 @@ import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import org.apache.doris.statistics.util.BlockingCounter; import org.apache.doris.utframe.TestWithFeService; -import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mock; import mockit.MockUp; @@ -96,7 +95,6 @@ public class AnalysisTaskExecutorTest extends TestWithFeService { .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( AnalysisType.COLUMN) - .setPartitionNames(Sets.newHashSet("t1")) .build(); OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskInfo); new MockUp() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java index 3184634815..caf316429f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java @@ -26,7 +26,6 @@ import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import org.apache.doris.system.SystemInfoService; import org.apache.doris.utframe.TestWithFeService; -import com.google.common.collect.Sets; import mockit.Expectations; import mockit.Mock; import mockit.MockUp; @@ -92,7 +91,6 @@ public class HistogramTaskTest extends TestWithFeService { Assertions.assertEquals(AnalysisType.HISTOGRAM, info.analysisType); Assertions.assertEquals("t1", info.tblName); Assertions.assertEquals("col1", info.colName); - Assertions.assertEquals("p_201701", info.partitionNames.iterator().next()); } } } @@ -105,7 +103,6 @@ public class HistogramTaskTest extends TestWithFeService { .setDbName(SystemInfoService.DEFAULT_CLUSTER + ":" + "histogram_task_test").setTblName("t1") .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL) .setAnalysisType(AnalysisType.HISTOGRAM) - .setPartitionNames(Sets.newHashSet("t")) .build(); HistogramTask task = new HistogramTask(analysisTaskInfo); diff --git a/regression-test/data/statistics/analyze_test.out b/regression-test/data/statistics/analyze_test.out index 0123d30de7..048b06a7e2 100644 --- a/regression-test/data/statistics/analyze_test.out +++ b/regression-test/data/statistics/analyze_test.out @@ -23,3 +23,6 @@ 5 5 0 1 7 5 5 5 0 1 7 5 +-- !sql -- +0 + diff --git a/regression-test/suites/statistics/analyze_test.groovy b/regression-test/suites/statistics/analyze_test.groovy index fb2c7598d6..f8414d9f68 100644 --- a/regression-test/suites/statistics/analyze_test.groovy +++ b/regression-test/suites/statistics/analyze_test.groovy @@ -59,7 +59,7 @@ suite("analyze_test") { sql """ DROP TABLE IF EXISTS ${tblName1} """ - + sql """CREATE TABLE ${tblName1} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null) UNIQUE KEY(analyze_test_col1) DISTRIBUTED BY HASH(analyze_test_col1) @@ -68,11 +68,11 @@ suite("analyze_test") { "replication_num"="1", "enable_unique_key_merge_on_write"="true" );""" - + sql """ DROP TABLE IF EXISTS ${tblName2} """ - + sql """CREATE TABLE ${tblName2} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null) UNIQUE KEY(analyze_test_col1) DISTRIBUTED BY HASH(analyze_test_col1) @@ -81,11 +81,11 @@ suite("analyze_test") { "replication_num"="1", "enable_unique_key_merge_on_write"="true" );""" - + sql """ DROP TABLE IF EXISTS ${tblName3} """ - + sql """CREATE TABLE ${tblName3} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null) UNIQUE KEY(analyze_test_col1) DISTRIBUTED BY HASH(analyze_test_col1) @@ -100,14 +100,14 @@ suite("analyze_test") { sql """insert into ${tblName1} values(4, 5, 6);""" sql """insert into ${tblName1} values(7, 1, 9);""" sql """insert into ${tblName1} values(3, 8, 2);""" - sql """insert into ${tblName1} values(5, 2, 1);""" - + sql """insert into ${tblName1} values(5, 2, 1);""" + sql """insert into ${tblName2} values(1, 2, 3);""" sql """insert into ${tblName2} values(4, 5, 6);""" sql """insert into ${tblName2} values(7, 1, 9);""" sql """insert into ${tblName2} values(3, 8, 2);""" sql """insert into ${tblName2} values(5, 2, 1);""" - + sql """insert into ${tblName3} values(1, 2, 3);""" sql """insert into ${tblName3} values(4, 5, 6);""" sql """insert into ${tblName3} values(7, 1, 9);""" @@ -158,4 +158,29 @@ suite("analyze_test") { select count, ndv, null_count, min, max, data_size_in_bytes from __internal_schema.column_statistics where col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3') order by col_id """ -} \ No newline at end of file + + sql """ + DROP STATS ${tblName3} (analyze_test_col1); + """ + + qt_sql """ + SELECT COUNT(*) FROM __internal_schema.column_statistics where + col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3') + """ + // Below test would failed on community pipeline for unknown reason, comment it temporarily + // sql """ + // SET enable_nereids_planner=true; + // + // """ + // sql """ + // SET forbid_unknown_col_stats=true; + // """ + // + //test { + // sql """ + // SELECT analyze_test_col1 FROM ${tblName3} + // """ + // exception """errCode = 2, detailMessage = Unexpected exception: column stats for analyze_test_col1 is unknown""" + //} + +}