diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 2bfd114b02..9fdeaab511 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -355,6 +355,7 @@ terminal String KW_EXCEPT, KW_EXCLUDE, KW_EXISTS, + KW_EXPIRED, KW_EXPORT, KW_EXTENDED, KW_EXTERNAL, @@ -3015,7 +3016,11 @@ drop_stmt ::= /* statistics */ | KW_DROP KW_STATS opt_table_name:tbl opt_col_list:cols opt_partition_names:partitionNames {: - RESULT = new DropTableStatsStmt(tbl, partitionNames, cols); + RESULT = new DropStatsStmt(tbl, partitionNames, cols); + :} + | KW_DROP KW_EXPIRED KW_STATS + {: + RESULT = new DropStatsStmt(true); :} ; @@ -7356,6 +7361,8 @@ keyword ::= {: RESULT = id; :} | KW_CURRENT_CATALOG:id {: RESULT = id; :} + | KW_EXPIRED: id + {: RESULT = id; :} ; // Identifier that contain keyword diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java similarity index 92% rename from fe/fe-core/src/main/java/org/apache/doris/analysis/DropTableStatsStmt.java rename to fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java index f36da7e021..da67165bf8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java @@ -42,11 +42,14 @@ import java.util.stream.Collectors; * Manually drop statistics for tables or partitions. * Table or partition can be specified, if neither is specified, * all statistics under the current database will be deleted. - * + *

* syntax: - * DROP STATS [TableName [PARTITIONS(partitionNames)]]; + * DROP STATS [TableName [PARTITIONS(partitionNames)]]; or DROP EXPIRED STATS; */ -public class DropTableStatsStmt extends DdlStmt { +public class DropStatsStmt extends DdlStmt { + + public final boolean dropExpired; + private final TableName tableName; private final PartitionNames partitionNames; private final List columnNames; @@ -56,17 +59,27 @@ public class DropTableStatsStmt extends DdlStmt { private final Set tbIds = Sets.newHashSet(); private final Set partitionIds = Sets.newHashSet(); - public DropTableStatsStmt(TableName tableName, + public DropStatsStmt(boolean dropExpired) { + this.dropExpired = dropExpired; + this.tableName = null; + this.partitionNames = null; + this.columnNames = null; + } + + public DropStatsStmt(TableName tableName, PartitionNames partitionNames, List columnNames) { this.tableName = tableName; this.partitionNames = partitionNames; this.columnNames = columnNames; + dropExpired = false; } @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); - + if (dropExpired) { + return; + } if (tableName != null) { tableName.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index d1d0b2165a..794570ffaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -44,10 +44,10 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -808,4 +808,8 @@ public class Database extends MetaObject implements Writable, DatabaseIf } return null; } + + public Map getIdToTable() { + return new 
HashMap<>(idToTable); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index d42dc61451..950d52753f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -216,6 +216,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.AnalysisTaskScheduler; import org.apache.doris.statistics.StatisticsCache; +import org.apache.doris.statistics.StatisticsCleaner; import org.apache.doris.system.Backend; import org.apache.doris.system.FQDNManager; import org.apache.doris.system.Frontend; @@ -450,6 +451,8 @@ public class Env { private ResourceGroupMgr resourceGroupMgr; + private StatisticsCleaner statisticsCleaner; + public List getFrontends(FrontendNodeType nodeType) { if (nodeType == null) { // get all @@ -650,6 +653,7 @@ public class Env { this.fqdnManager = new FQDNManager(systemInfo); if (!isCheckpointCatalog) { this.analysisManager = new AnalysisManager(); + this.statisticsCleaner = new StatisticsCleaner(); } this.globalFunctionMgr = new GlobalFunctionMgr(); this.resourceGroupMgr = new ResourceGroupMgr(); @@ -874,6 +878,9 @@ public class Env { // If not using bdb, we need to notify the FE type transfer manually. notifyNewFETypeTransfer(FrontendNodeType.MASTER); } + if (statisticsCleaner != null) { + statisticsCleaner.start(); + } } // wait until FE is ready. 
@@ -5358,4 +5365,8 @@ public class Env { public GlobalFunctionMgr getGlobalFunctionMgr() { return globalFunctionMgr; } + + public StatisticsCleaner getStatisticsCleaner() { + return statisticsCleaner; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 1aecafcc6b..96eb25bc90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -576,6 +576,10 @@ public class OlapTable extends Table { return indexIdToMeta; } + public Map getCopyOfIndexIdToMeta() { + return new HashMap<>(indexIdToMeta); + } + public Map getCopiedIndexIdToMeta() { return new HashMap<>(indexIdToMeta); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index d35a481724..b8732dd110 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -992,5 +992,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable { } internalCatalog = (InternalCatalog) idToCatalog.get(InternalCatalog.INTERNAL_CATALOG_ID); } + + public Map getIdToCatalog() { + return idToCatalog; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index ae443de561..292723e091 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3516,4 +3516,8 @@ public class InternalCatalog implements CatalogIf { LOG.info("finished replay databases from image"); return newChecksum; } + + public ConcurrentHashMap getIdToDb() { + return new ConcurrentHashMap<>(idToDb); + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 0110292e7e..773126bfca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -86,7 +86,7 @@ import org.apache.doris.analysis.DropRepositoryStmt; import org.apache.doris.analysis.DropResourceStmt; import org.apache.doris.analysis.DropRoleStmt; import org.apache.doris.analysis.DropSqlBlockRuleStmt; -import org.apache.doris.analysis.DropTableStatsStmt; +import org.apache.doris.analysis.DropStatsStmt; import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.DropUserStmt; import org.apache.doris.analysis.GrantStmt; @@ -327,9 +327,8 @@ public class DdlExecutor { env.getAuth().alterUser((AlterUserStmt) ddlStmt); } else if (ddlStmt instanceof CleanProfileStmt) { ProfileManager.getInstance().cleanProfile(); - } else if (ddlStmt instanceof DropTableStatsStmt) { - DropTableStatsStmt stmt = (DropTableStatsStmt) ddlStmt; - StatisticsRepository.dropTableStatistics(stmt); + } else if (ddlStmt instanceof DropStatsStmt) { + env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt); } else { throw new DdlException("Unknown statement."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index ee3c15eae4..a0171cc3c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeStmt; +import org.apache.doris.analysis.DropStatsStmt; import org.apache.doris.analysis.ShowAnalyzeStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Env; @@ -270,4 +271,12 @@ public class AnalysisManager { } } + public void 
dropStats(DropStatsStmt dropStatsStmt) { + if (dropStatsStmt.dropExpired) { + Env.getCurrentEnv().getStatisticsCleaner().clear(); + return; + } + StatisticsRepository.dropTableStatistics(dropStatsStmt); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index 7feff48e26..19df103e11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -63,4 +63,8 @@ public class StatisticConstants { public static final long PRELOAD_RETRY_INTERVAL_IN_SECONDS = TimeUnit.SECONDS.toMillis(10); + public static final int ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS = 7; + + public static final int FETCH_LIMIT = 10000; + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java new file mode 100644 index 0000000000..4ef5f35727 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCleaner.java @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndexMeta; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; +import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.SystemInfoService; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Maintains the internal statistics table. + * Deletes rows whose corresponding catalog/DB/table/index/column/partition no longer exists. 
+ */ +public class StatisticsCleaner extends MasterDaemon { + + private static final Logger LOG = LogManager.getLogger(StatisticsCleaner.class); + + private OlapTable colStatsTbl; + private OlapTable histStatsTbl; + + private Map idToCatalog; + + /* Internal DB only */ + private Map idToDb; + + /* Internal tbl only */ + private Map idToTbl; + + private Map idToMVIdx; + + public StatisticsCleaner() { + super("Statistics Table Cleaner", + TimeUnit.HOURS.toMillis(StatisticConstants.STATISTIC_CLEAN_INTERVAL_IN_HOURS)); + } + + @Override + protected void runAfterCatalogReady() { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + clear(); + } + + public synchronized void clear() { + if (!init()) { + return; + } + clear(colStatsTbl); + clear(histStatsTbl); + } + + private void clear(OlapTable statsTbl) { + ExpiredStats expiredStats = null; + do { + expiredStats = findExpiredStats(statsTbl); + deleteExpiredStats(expiredStats); + } while (!expiredStats.isEmpty()); + } + + private boolean init() { + try { + colStatsTbl = + (OlapTable) StatisticsUtil + .findTable(InternalCatalog.INTERNAL_CATALOG_NAME, + SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME, + StatisticConstants.STATISTIC_TBL_NAME); + histStatsTbl = + (OlapTable) StatisticsUtil + .findTable(InternalCatalog.INTERNAL_CATALOG_NAME, + SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME, + StatisticConstants.HISTOGRAM_TBL_NAME); + } catch (Throwable t) { + LOG.warn("Failed to init stats cleaner", t); + return false; + } + + idToCatalog = Env.getCurrentEnv().getCatalogMgr().getIdToCatalog(); + idToDb = Env.getCurrentEnv().getInternalCatalog().getIdToDb(); + idToTbl = constructTblMap(); + idToMVIdx = constructIdxMap(); + return true; + } + + private Map constructTblMap() { + Map idToTbl = new HashMap<>(); + for (Database db : idToDb.values()) { + idToTbl.putAll(db.getIdToTable()); + } + return idToTbl; + } + + private Map constructIdxMap() { + Map idToMVIdx = new HashMap<>(); 
+ for (Table t : idToTbl.values()) { + if (t instanceof OlapTable) { + OlapTable olapTable = (OlapTable) t; + olapTable.getCopyOfIndexIdToMeta() + .entrySet() + .stream() + .filter(idx -> idx.getValue().getDefineStmt() != null) + .forEach(e -> idToMVIdx.put(e.getKey(), e.getValue())); + } + } + return idToMVIdx; + } + + private void deleteExpiredStats(ExpiredStats expiredStats) { + doDelete("catalog_id", expiredStats.expiredCatalog.stream() + .map(String::valueOf).collect(Collectors.toList())); + doDelete("db_id", expiredStats.expiredDatabase.stream() + .map(String::valueOf).collect(Collectors.toList())); + doDelete("tbl_id", expiredStats.expiredTable.stream() + .map(String::valueOf).collect(Collectors.toList())); + doDelete("idx_id", expiredStats.expiredIdxId.stream() + .map(String::valueOf).collect(Collectors.toList())); + doDelete("id", expiredStats.ids.stream() + .map(String::valueOf).collect(Collectors.toList())); + } + + private void doDelete(String/*col name*/ colName, List pred) { + String deleteTemplate = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME + + "." 
+ StatisticConstants.STATISTIC_TBL_NAME + " WHERE ${left} IN (${right})"; + if (CollectionUtils.isEmpty(pred)) { + return; + } + String right = pred.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",")); + Map params = new HashMap<>(); + params.put("left", colName); + params.put("right", right); + String sql = new StringSubstitutor(params).replace(deleteTemplate); + try { + StatisticsUtil.execUpdate(sql); + } catch (Exception e) { + LOG.warn("Failed to delete expired stats!", e); + } + } + + public ExpiredStats findExpiredStats(OlapTable statsTbl) { + ExpiredStats expiredStats = new ExpiredStats(); + long rowCount = statsTbl.getRowCount(); + long pos = 0; + while (pos < rowCount + && !expiredStats.isFull()) { + List rows = StatisticsRepository.fetchStatsFullName(StatisticConstants.FETCH_LIMIT, pos); + pos += StatisticConstants.FETCH_LIMIT; + for (ResultRow r : rows) { + try { + String id = r.getColumnValue("id"); + long catalogId = Long.parseLong(r.getColumnValue("catalog_id")); + if (!idToCatalog.containsKey(catalogId)) { + expiredStats.expiredCatalog.add(catalogId); + continue; + } + long dbId = Long.parseLong(r.getColumnValue("db_id")); + if (!idToDb.containsKey(dbId)) { + expiredStats.expiredDatabase.add(dbId); + continue; + } + long tblId = Long.parseLong(r.getColumnValue("tbl_id")); + if (!idToTbl.containsKey(tblId)) { + expiredStats.expiredTable.add(tblId); + continue; + } + + long idxId = Long.parseLong(r.getColumnValue("idx_id")); + if (idxId != -1 && !idToMVIdx.containsKey(idxId)) { + expiredStats.expiredIdxId.add(idxId); + continue; + } + + Table t = idToTbl.get(tblId); + String colId = r.getColumnValue("col_id"); + if (t.getColumn(colId) == null) { + expiredStats.ids.add(id); + continue; + } + if (!(t instanceof OlapTable)) { + continue; + } + OlapTable olapTable = (OlapTable) t; + String partIdStr = r.getColumnValue("part_id"); + if (partIdStr == null) { + continue; + } + long partId = Long.parseLong(partIdStr); + if 
(!olapTable.getPartitionIds().contains(partId)) { + expiredStats.ids.add(id); + } + } catch (Exception e) { + LOG.warn("Error occurred when retrieving expired stats", e); + } + } + } + return expiredStats; + } + + private static class ExpiredStats { + Set expiredCatalog = new HashSet<>(); + Set expiredDatabase = new HashSet<>(); + Set expiredTable = new HashSet<>(); + + Set expiredIdxId = new HashSet<>(); + + Set ids = new HashSet<>(); + + public boolean isFull() { + return expiredCatalog.size() >= Config.expr_children_limit + || expiredDatabase.size() >= Config.expr_children_limit + || expiredTable.size() >= Config.expr_children_limit + || expiredIdxId.size() >= Config.expr_children_limit + || ids.size() >= Config.expr_children_limit; + } + + public boolean isEmpty() { + return expiredCatalog.isEmpty() + && expiredDatabase.isEmpty() + && expiredTable.isEmpty() + && expiredIdxId.isEmpty() + && ids.size() < Config.expr_children_limit / 100; + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index c18cb38d0a..865dd3ca2c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -18,7 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AlterColumnStatsStmt; -import org.apache.doris.analysis.DropTableStatsStmt; +import org.apache.doris.analysis.DropStatsStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; @@ -90,6 +90,12 @@ public class StatisticsRepository { + " ORDER BY update_time DESC LIMIT " + StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE; + private static final String FETCH_STATS_FULL_NAME = + "SELECT id, catalog_id, db_id, tbl_id, idx_id, col_id, part_id FROM " + + FeConstants.INTERNAL_DB_NAME + "." 
+ StatisticConstants.STATISTIC_TBL_NAME + + " ORDER BY update_time " + + "LIMIT ${limit} OFFSET ${offset}"; + public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) { ResultRow resultRow = queryColumnStatisticById(tableId, colName); if (resultRow == null) { @@ -272,7 +278,7 @@ public class StatisticsRepository { .updateColStatsCache(objects.table.getId(), -1, colName, builder.build()); } - public static void dropTableStatistics(DropTableStatsStmt dropTableStatsStmt) { + public static void dropTableStatistics(DropStatsStmt dropTableStatsStmt) { Long dbId = dropTableStatsStmt.getDbId(); Set tbIds = dropTableStatsStmt.getTbIds(); Set cols = dropTableStatsStmt.getColumnNames(); @@ -284,4 +290,11 @@ public class StatisticsRepository { public static List fetchRecentStatsUpdatedCol() { return StatisticsUtil.execStatisticQuery(FETCH_RECENT_STATS_UPDATED_COL); } + + public static List fetchStatsFullName(long limit, long offset) { + Map params = new HashMap<>(); + params.put("limit", String.valueOf(limit)); + params.put("offset", String.valueOf(offset)); + return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTableCleaner.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTableCleaner.java deleted file mode 100644 index bcc076ee02..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTableCleaner.java +++ /dev/null @@ -1,120 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Table; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.MasterDaemon; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.commons.text.StringSubstitutor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.StringJoiner; -import java.util.stream.Collectors; - -/** - * Maintenance the internal statistics table. - * Delete rows that corresponding DB/Table/Column not exists anymore. 
- */ -public class StatisticsTableCleaner extends MasterDaemon { - - private static final Logger LOG = LogManager.getLogger(StatisticsTableCleaner.class); - - public StatisticsTableCleaner() { - super("Statistics Table Cleaner", - StatisticConstants.STATISTIC_CLEAN_INTERVAL_IN_HOURS * 3600 * 1000); - } - - @Override - protected void runAfterCatalogReady() { - if (Env.getCurrentEnv().isMaster()) { - deleteExpiredStatistics(); - } - } - - private void deleteExpiredStatistics() { - List databases = Env.getCurrentEnv().getInternalCatalog().getDbs(); - deleteByDB(databases.stream().map(Database::getId).map(String::valueOf).collect(Collectors.toList())); - List tblIds = new ArrayList<>(); - List colIds = new ArrayList<>(); - List partitionIds = new ArrayList<>(); - for (Database database : databases) { - List
tables = database.getTables(); - for (Table table : tables) { - tblIds.add(String.valueOf(table.getId())); - if (table instanceof OlapTable) { - OlapTable olapTable = (OlapTable) table; - partitionIds.addAll(olapTable.getPartitionIds() - .stream().map(String::valueOf).collect(Collectors.toList())); - } - colIds.addAll(table.getColumns().stream().map(Column::getName) - .map(String::valueOf).collect(Collectors.toList())); - } - } - deleteByTblId(tblIds); - deleteByColId(colIds); - deleteByPartitionId(partitionIds); - } - - private void deleteByPartitionId(List partitionIds) { - deleteExpired("part_id", partitionIds); - } - - private void deleteByTblId(List tblIds) { - deleteExpired("tbl_id", tblIds); - } - - private void deleteByDB(List dbIds) { - deleteExpired("db_id", dbIds); - } - - private void deleteByColId(List colId) { - deleteExpired("col_id", colId); - } - - private void deleteExpired(String colName, List constants) { - // TODO: must promise count of children of predicate is less than the FE limits. - String deleteTemplate = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME - + "." 
+ StatisticConstants.STATISTIC_TBL_NAME + "WHERE ${colName} NOT IN ${predicate}"; - StringJoiner predicateBuilder = new StringJoiner(",", "(", ")"); - constants.forEach(predicateBuilder::add); - Map map = new HashMap() { - { - put("colName", colName); - put("predicate", predicateBuilder.toString()); - } - }; - StringSubstitutor stringSubstitutor = new StringSubstitutor(map); - try { - StatisticsUtil.execUpdate(stringSubstitutor.replace(deleteTemplate)); - } catch (Exception e) { - LOG.warn("Remove expired statistics failed", e); - } - } - -} diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 41fb69a8f1..ec6511411c 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -489,6 +489,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("execute", new Integer(SqlParserSymbols.KW_EXECUTE)); keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES)); keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE)); + keywordMap.put("expired", new Integer(SqlParserSymbols.KW_EXPIRED)); } // map from token id to token description diff --git a/regression-test/data/statistics/analyze_test.out b/regression-test/data/statistics/analyze_test.out index 249e1e6abc..0123d30de7 100644 --- a/regression-test/data/statistics/analyze_test.out +++ b/regression-test/data/statistics/analyze_test.out @@ -2,8 +2,24 @@ -- !sql -- 5 4 0 1 8 20 5 4 0 1 8 20 +5 4 0 1 8 20 +5 4 0 1 8 20 +5 4 0 1 8 20 +5 4 0 1 8 20 5 5 0 1 7 5 5 5 0 1 7 5 +5 5 0 1 7 5 +5 5 0 1 7 5 +5 5 0 1 7 5 +5 5 0 1 7 5 +5 5 0 1 9 20 +5 5 0 1 9 20 +5 5 0 1 9 20 +5 5 0 1 9 20 5 5 0 1 9 20 5 5 0 1 9 20 +-- !sql -- +5 5 0 1 7 5 +5 5 0 1 7 5 + diff --git a/regression-test/suites/statistics/analyze_test.groovy b/regression-test/suites/statistics/analyze_test.groovy index b4f194e673..fb2c7598d6 100644 --- a/regression-test/suites/statistics/analyze_test.groovy +++ 
b/regression-test/suites/statistics/analyze_test.groovy @@ -16,12 +16,79 @@ // under the License. suite("analyze_test") { + + def dbName1 = "analyze_test_db_1" + + def tblName1 = "${dbName1}.analyze_test_tbl_1" + + def dbName2 = "analyze_test_db_2" + + def tblName2 = "${dbName2}.analyze_test_tbl_2" + + def dbName3 = "analyze_test_db_3" + + def tblName3 = "${dbName3}.analyze_test_tbl_3" + + sql """ - DROP TABLE IF EXISTS test_table_alter_column_stats + DROP DATABASE IF EXISTS ${dbName1}; """ - sql """CREATE TABLE test_table_alter_column_stats (col1 varchar(11451) not null, col2 int not null, col3 int not null) - UNIQUE KEY(col1) - DISTRIBUTED BY HASH(col1) + + + sql """ + CREATE DATABASE ${dbName1}; + """ + + sql """ + DROP DATABASE IF EXISTS ${dbName2} + """ + + sql """ + CREATE DATABASE ${dbName2}; + """ + + sql """ + DROP DATABASE IF EXISTS ${dbName3} + """ + + sql """ + CREATE DATABASE ${dbName3}; + """ + + + sql """ + DROP TABLE IF EXISTS ${tblName1} + """ + + sql """CREATE TABLE ${tblName1} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null) + UNIQUE KEY(analyze_test_col1) + DISTRIBUTED BY HASH(analyze_test_col1) + BUCKETS 3 + PROPERTIES( + "replication_num"="1", + "enable_unique_key_merge_on_write"="true" + );""" + + sql """ + DROP TABLE IF EXISTS ${tblName2} + """ + + sql """CREATE TABLE ${tblName2} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null) + UNIQUE KEY(analyze_test_col1) + DISTRIBUTED BY HASH(analyze_test_col1) + BUCKETS 3 + PROPERTIES( + "replication_num"="1", + "enable_unique_key_merge_on_write"="true" + );""" + + sql """ + DROP TABLE IF EXISTS ${tblName3} + """ + + sql """CREATE TABLE ${tblName3} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null) + UNIQUE KEY(analyze_test_col1) + DISTRIBUTED BY HASH(analyze_test_col1) BUCKETS 3 PROPERTIES( "replication_num"="1", @@ -29,23 +96,66 
@@ suite("analyze_test") { );""" - sql """insert into test_table_alter_column_stats values(1, 2, 3);""" - sql """insert into test_table_alter_column_stats values(4, 5, 6);""" - sql """insert into test_table_alter_column_stats values(7, 1, 9);""" - sql """insert into test_table_alter_column_stats values(3, 8, 2);""" - sql """insert into test_table_alter_column_stats values(5, 2, 1);""" - - sql """delete from __internal_schema.column_statistics where col_id in ('col1', 'col2', 'col3')""" + sql """insert into ${tblName1} values(1, 2, 3);""" + sql """insert into ${tblName1} values(4, 5, 6);""" + sql """insert into ${tblName1} values(7, 1, 9);""" + sql """insert into ${tblName1} values(3, 8, 2);""" + sql """insert into ${tblName1} values(5, 2, 1);""" + + sql """insert into ${tblName2} values(1, 2, 3);""" + sql """insert into ${tblName2} values(4, 5, 6);""" + sql """insert into ${tblName2} values(7, 1, 9);""" + sql """insert into ${tblName2} values(3, 8, 2);""" + sql """insert into ${tblName2} values(5, 2, 1);""" + + sql """insert into ${tblName3} values(1, 2, 3);""" + sql """insert into ${tblName3} values(4, 5, 6);""" + sql """insert into ${tblName3} values(7, 1, 9);""" + sql """insert into ${tblName3} values(3, 8, 2);""" + sql """insert into ${tblName3} values(5, 2, 1);""" sql """ - analyze sync table test_table_alter_column_stats; + delete from __internal_schema.column_statistics where col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3') + """ + + sql """ + analyze sync table ${tblName1}; + """ + + sql """ + analyze sync table ${tblName2}; + """ + + sql """ + analyze sync table ${tblName3}; """ order_qt_sql """ select count, ndv, null_count, min, max, data_size_in_bytes from __internal_schema.column_statistics where - col_id in ('col1', 'col2', 'col3') order by col_id + col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3') order by col_id + """ + + sql """ + ALTER TABLE ${tblName3} DROP COLUMN analyze_test_col3; """ sql """ - 
DROP TABLE IF EXISTS test_table_alter_column_stats + ALTER TABLE ${tblName3} DROP COLUMN analyze_test_col2; + """ + + sql """ + DROP TABLE ${tblName2} + """ + + sql """ + DROP DATABASE ${dbName1} + """ + + sql """ + DROP EXPIRED STATS + """ + + order_qt_sql """ + select count, ndv, null_count, min, max, data_size_in_bytes from __internal_schema.column_statistics where + col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3') order by col_id """ } \ No newline at end of file