[feat](stats) Support to delete expired stats periodically (#18614)
Support to delete expired stats periodically and manually. default cleaner running interval is 2 days Manually clean syntax is ```sql DROP EXPIRED STATS ``` TODO: 1. process external catalog's stats 2. run drop at the appointed time 3. sleep a short time after drop one batch
This commit is contained in:
@ -355,6 +355,7 @@ terminal String
|
||||
KW_EXCEPT,
|
||||
KW_EXCLUDE,
|
||||
KW_EXISTS,
|
||||
KW_EXPIRED,
|
||||
KW_EXPORT,
|
||||
KW_EXTENDED,
|
||||
KW_EXTERNAL,
|
||||
@ -3015,7 +3016,11 @@ drop_stmt ::=
|
||||
/* statistics */
|
||||
| KW_DROP KW_STATS opt_table_name:tbl opt_col_list:cols opt_partition_names:partitionNames
|
||||
{:
|
||||
RESULT = new DropTableStatsStmt(tbl, partitionNames, cols);
|
||||
RESULT = new DropStatsStmt(tbl, partitionNames, cols);
|
||||
:}
|
||||
| KW_DROP KW_EXPIRED KW_STATS
|
||||
{:
|
||||
RESULT = new DropStatsStmt(true);
|
||||
:}
|
||||
;
|
||||
|
||||
@ -7356,6 +7361,8 @@ keyword ::=
|
||||
{: RESULT = id; :}
|
||||
| KW_CURRENT_CATALOG:id
|
||||
{: RESULT = id; :}
|
||||
| KW_EXPIRED: id
|
||||
{: RESULT = id; :}
|
||||
;
|
||||
|
||||
// Identifier that contain keyword
|
||||
|
||||
@ -42,11 +42,14 @@ import java.util.stream.Collectors;
|
||||
* Manually drop statistics for tables or partitions.
|
||||
* Table or partition can be specified, if neither is specified,
|
||||
* all statistics under the current database will be deleted.
|
||||
*
|
||||
* <p>
|
||||
* syntax:
|
||||
* DROP STATS [TableName [PARTITIONS(partitionNames)]];
|
||||
* DROP [EXPIRED] STATS [TableName [PARTITIONS(partitionNames)]];
|
||||
*/
|
||||
public class DropTableStatsStmt extends DdlStmt {
|
||||
public class DropStatsStmt extends DdlStmt {
|
||||
|
||||
public final boolean dropExpired;
|
||||
|
||||
private final TableName tableName;
|
||||
private final PartitionNames partitionNames;
|
||||
private final List<String> columnNames;
|
||||
@ -56,17 +59,27 @@ public class DropTableStatsStmt extends DdlStmt {
|
||||
private final Set<Long> tbIds = Sets.newHashSet();
|
||||
private final Set<Long> partitionIds = Sets.newHashSet();
|
||||
|
||||
public DropTableStatsStmt(TableName tableName,
|
||||
public DropStatsStmt(boolean dropExpired) {
|
||||
this.dropExpired = dropExpired;
|
||||
this.tableName = null;
|
||||
this.partitionNames = null;
|
||||
this.columnNames = null;
|
||||
}
|
||||
|
||||
public DropStatsStmt(TableName tableName,
|
||||
PartitionNames partitionNames, List<String> columnNames) {
|
||||
this.tableName = tableName;
|
||||
this.partitionNames = partitionNames;
|
||||
this.columnNames = columnNames;
|
||||
dropExpired = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws UserException {
|
||||
super.analyze(analyzer);
|
||||
|
||||
if (dropExpired) {
|
||||
return;
|
||||
}
|
||||
if (tableName != null) {
|
||||
tableName.analyze(analyzer);
|
||||
|
||||
@ -44,10 +44,10 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -808,4 +808,8 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public Map<Long, Table> getIdToTable() {
|
||||
return new HashMap<>(idToTable);
|
||||
}
|
||||
}
|
||||
|
||||
@ -216,6 +216,7 @@ import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.statistics.AnalysisManager;
|
||||
import org.apache.doris.statistics.AnalysisTaskScheduler;
|
||||
import org.apache.doris.statistics.StatisticsCache;
|
||||
import org.apache.doris.statistics.StatisticsCleaner;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.FQDNManager;
|
||||
import org.apache.doris.system.Frontend;
|
||||
@ -450,6 +451,8 @@ public class Env {
|
||||
|
||||
private ResourceGroupMgr resourceGroupMgr;
|
||||
|
||||
private StatisticsCleaner statisticsCleaner;
|
||||
|
||||
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
|
||||
if (nodeType == null) {
|
||||
// get all
|
||||
@ -650,6 +653,7 @@ public class Env {
|
||||
this.fqdnManager = new FQDNManager(systemInfo);
|
||||
if (!isCheckpointCatalog) {
|
||||
this.analysisManager = new AnalysisManager();
|
||||
this.statisticsCleaner = new StatisticsCleaner();
|
||||
}
|
||||
this.globalFunctionMgr = new GlobalFunctionMgr();
|
||||
this.resourceGroupMgr = new ResourceGroupMgr();
|
||||
@ -874,6 +878,9 @@ public class Env {
|
||||
// If not using bdb, we need to notify the FE type transfer manually.
|
||||
notifyNewFETypeTransfer(FrontendNodeType.MASTER);
|
||||
}
|
||||
if (statisticsCleaner != null) {
|
||||
statisticsCleaner.start();
|
||||
}
|
||||
}
|
||||
|
||||
// wait until FE is ready.
|
||||
@ -5358,4 +5365,8 @@ public class Env {
|
||||
public GlobalFunctionMgr getGlobalFunctionMgr() {
|
||||
return globalFunctionMgr;
|
||||
}
|
||||
|
||||
public StatisticsCleaner getStatisticsCleaner() {
|
||||
return statisticsCleaner;
|
||||
}
|
||||
}
|
||||
|
||||
@ -576,6 +576,10 @@ public class OlapTable extends Table {
|
||||
return indexIdToMeta;
|
||||
}
|
||||
|
||||
public Map<Long, MaterializedIndexMeta> getCopyOfIndexIdToMeta() {
|
||||
return new HashMap<>(indexIdToMeta);
|
||||
}
|
||||
|
||||
public Map<Long, MaterializedIndexMeta> getCopiedIndexIdToMeta() {
|
||||
return new HashMap<>(indexIdToMeta);
|
||||
}
|
||||
|
||||
@ -992,5 +992,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
}
|
||||
internalCatalog = (InternalCatalog) idToCatalog.get(InternalCatalog.INTERNAL_CATALOG_ID);
|
||||
}
|
||||
|
||||
public Map<Long, CatalogIf> getIdToCatalog() {
|
||||
return idToCatalog;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -3516,4 +3516,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
LOG.info("finished replay databases from image");
|
||||
return newChecksum;
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<Long, Database> getIdToDb() {
|
||||
return new ConcurrentHashMap<>(idToDb);
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,7 +86,7 @@ import org.apache.doris.analysis.DropRepositoryStmt;
|
||||
import org.apache.doris.analysis.DropResourceStmt;
|
||||
import org.apache.doris.analysis.DropRoleStmt;
|
||||
import org.apache.doris.analysis.DropSqlBlockRuleStmt;
|
||||
import org.apache.doris.analysis.DropTableStatsStmt;
|
||||
import org.apache.doris.analysis.DropStatsStmt;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.analysis.DropUserStmt;
|
||||
import org.apache.doris.analysis.GrantStmt;
|
||||
@ -327,9 +327,8 @@ public class DdlExecutor {
|
||||
env.getAuth().alterUser((AlterUserStmt) ddlStmt);
|
||||
} else if (ddlStmt instanceof CleanProfileStmt) {
|
||||
ProfileManager.getInstance().cleanProfile();
|
||||
} else if (ddlStmt instanceof DropTableStatsStmt) {
|
||||
DropTableStatsStmt stmt = (DropTableStatsStmt) ddlStmt;
|
||||
StatisticsRepository.dropTableStatistics(stmt);
|
||||
} else if (ddlStmt instanceof DropStatsStmt) {
|
||||
env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt);
|
||||
} else {
|
||||
throw new DdlException("Unknown statement.");
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.analysis.AnalyzeStmt;
|
||||
import org.apache.doris.analysis.DropStatsStmt;
|
||||
import org.apache.doris.analysis.ShowAnalyzeStmt;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -270,4 +271,12 @@ public class AnalysisManager {
|
||||
}
|
||||
}
|
||||
|
||||
public void dropStats(DropStatsStmt dropStatsStmt) {
|
||||
if (dropStatsStmt.dropExpired) {
|
||||
Env.getCurrentEnv().getStatisticsCleaner().clear();
|
||||
return;
|
||||
}
|
||||
StatisticsRepository.dropTableStatistics(dropStatsStmt);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -63,4 +63,8 @@ public class StatisticConstants {
|
||||
|
||||
public static final long PRELOAD_RETRY_INTERVAL_IN_SECONDS = TimeUnit.SECONDS.toMillis(10);
|
||||
|
||||
public static final int ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS = 7;
|
||||
|
||||
public static final int FETCH_LIMIT = 10000;
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,260 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedIndexMeta;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Maintenance the internal statistics table.
|
||||
* Delete rows that corresponding DB/Table/Column not exists anymore.
|
||||
*/
|
||||
public class StatisticsCleaner extends MasterDaemon {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(StatisticsCleaner.class);
|
||||
|
||||
private OlapTable colStatsTbl;
|
||||
private OlapTable histStatsTbl;
|
||||
|
||||
private Map<Long, CatalogIf> idToCatalog;
|
||||
|
||||
/* Internal DB only */
|
||||
private Map<Long, Database> idToDb;
|
||||
|
||||
/* Internal tbl only */
|
||||
private Map<Long, Table> idToTbl;
|
||||
|
||||
private Map<Long, MaterializedIndexMeta> idToMVIdx;
|
||||
|
||||
public StatisticsCleaner() {
|
||||
super("Statistics Table Cleaner",
|
||||
TimeUnit.HOURS.toMillis(StatisticConstants.STATISTIC_CLEAN_INTERVAL_IN_HOURS));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
return;
|
||||
}
|
||||
clear();
|
||||
}
|
||||
|
||||
public synchronized void clear() {
|
||||
if (!init()) {
|
||||
return;
|
||||
}
|
||||
clear(colStatsTbl);
|
||||
clear(histStatsTbl);
|
||||
}
|
||||
|
||||
private void clear(OlapTable statsTbl) {
|
||||
ExpiredStats expiredStats = null;
|
||||
do {
|
||||
expiredStats = findExpiredStats(statsTbl);
|
||||
deleteExpiredStats(expiredStats);
|
||||
} while (!expiredStats.isEmpty());
|
||||
}
|
||||
|
||||
private boolean init() {
|
||||
try {
|
||||
colStatsTbl =
|
||||
(OlapTable) StatisticsUtil
|
||||
.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
|
||||
SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME,
|
||||
StatisticConstants.STATISTIC_TBL_NAME);
|
||||
histStatsTbl =
|
||||
(OlapTable) StatisticsUtil
|
||||
.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
|
||||
SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME,
|
||||
StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to init stats cleaner", t);
|
||||
return false;
|
||||
}
|
||||
|
||||
idToCatalog = Env.getCurrentEnv().getCatalogMgr().getIdToCatalog();
|
||||
idToDb = Env.getCurrentEnv().getInternalCatalog().getIdToDb();
|
||||
idToTbl = constructTblMap();
|
||||
idToMVIdx = constructIdxMap();
|
||||
return true;
|
||||
}
|
||||
|
||||
private Map<Long, Table> constructTblMap() {
|
||||
Map<Long, Table> idToTbl = new HashMap<>();
|
||||
for (Database db : idToDb.values()) {
|
||||
idToTbl.putAll(db.getIdToTable());
|
||||
}
|
||||
return idToTbl;
|
||||
}
|
||||
|
||||
private Map<Long, MaterializedIndexMeta> constructIdxMap() {
|
||||
Map<Long, MaterializedIndexMeta> idToMVIdx = new HashMap<>();
|
||||
for (Table t : idToTbl.values()) {
|
||||
if (t instanceof OlapTable) {
|
||||
OlapTable olapTable = (OlapTable) t;
|
||||
olapTable.getCopyOfIndexIdToMeta()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(idx -> idx.getValue().getDefineStmt() != null)
|
||||
.forEach(e -> idToMVIdx.put(e.getKey(), e.getValue()));
|
||||
}
|
||||
}
|
||||
return idToMVIdx;
|
||||
}
|
||||
|
||||
private void deleteExpiredStats(ExpiredStats expiredStats) {
|
||||
doDelete("catalog_id", expiredStats.expiredCatalog.stream()
|
||||
.map(String::valueOf).collect(Collectors.toList()));
|
||||
doDelete("db_id", expiredStats.expiredDatabase.stream()
|
||||
.map(String::valueOf).collect(Collectors.toList()));
|
||||
doDelete("tbl_id", expiredStats.expiredTable.stream()
|
||||
.map(String::valueOf).collect(Collectors.toList()));
|
||||
doDelete("idx_id", expiredStats.expiredIdxId.stream()
|
||||
.map(String::valueOf).collect(Collectors.toList()));
|
||||
doDelete("id", expiredStats.ids.stream()
|
||||
.map(String::valueOf).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
private void doDelete(String/*col name*/ colName, List<String> pred) {
|
||||
String deleteTemplate = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME
|
||||
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE ${left} IN (${right})";
|
||||
if (CollectionUtils.isEmpty(pred)) {
|
||||
return;
|
||||
}
|
||||
String right = pred.stream().map(s -> "'" + s + "'").collect(Collectors.joining(","));
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("left", colName);
|
||||
params.put("right", right);
|
||||
String sql = new StringSubstitutor(params).replace(deleteTemplate);
|
||||
try {
|
||||
StatisticsUtil.execUpdate(sql);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to delete expired stats!", e);
|
||||
}
|
||||
}
|
||||
|
||||
public ExpiredStats findExpiredStats(OlapTable statsTbl) {
|
||||
ExpiredStats expiredStats = new ExpiredStats();
|
||||
long rowCount = statsTbl.getRowCount();
|
||||
long pos = 0;
|
||||
while (pos < rowCount
|
||||
&& !expiredStats.isFull()) {
|
||||
List<ResultRow> rows = StatisticsRepository.fetchStatsFullName(StatisticConstants.FETCH_LIMIT, pos);
|
||||
pos += StatisticConstants.FETCH_LIMIT;
|
||||
for (ResultRow r : rows) {
|
||||
try {
|
||||
String id = r.getColumnValue("id");
|
||||
long catalogId = Long.parseLong(r.getColumnValue("catalog_id"));
|
||||
if (!idToCatalog.containsKey(catalogId)) {
|
||||
expiredStats.expiredCatalog.add(catalogId);
|
||||
continue;
|
||||
}
|
||||
long dbId = Long.parseLong(r.getColumnValue("db_id"));
|
||||
if (!idToDb.containsKey(dbId)) {
|
||||
expiredStats.expiredDatabase.add(dbId);
|
||||
continue;
|
||||
}
|
||||
long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
|
||||
if (!idToTbl.containsKey(tblId)) {
|
||||
expiredStats.expiredTable.add(tblId);
|
||||
continue;
|
||||
}
|
||||
|
||||
long idxId = Long.parseLong(r.getColumnValue("idx_id"));
|
||||
if (idxId != -1 && !idToMVIdx.containsKey(idxId)) {
|
||||
expiredStats.expiredIdxId.add(idxId);
|
||||
continue;
|
||||
}
|
||||
|
||||
Table t = idToTbl.get(tblId);
|
||||
String colId = r.getColumnValue("col_id");
|
||||
if (t.getColumn(colId) == null) {
|
||||
expiredStats.ids.add(id);
|
||||
continue;
|
||||
}
|
||||
if (!(t instanceof OlapTable)) {
|
||||
continue;
|
||||
}
|
||||
OlapTable olapTable = (OlapTable) t;
|
||||
String partIdStr = r.getColumnValue("part_id");
|
||||
if (partIdStr == null) {
|
||||
continue;
|
||||
}
|
||||
long partId = Long.parseLong(partIdStr);
|
||||
if (!olapTable.getPartitionIds().contains(partId)) {
|
||||
expiredStats.ids.add(id);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error occurred when retrieving expired stats", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return expiredStats;
|
||||
}
|
||||
|
||||
private static class ExpiredStats {
|
||||
Set<Long> expiredCatalog = new HashSet<>();
|
||||
Set<Long> expiredDatabase = new HashSet<>();
|
||||
Set<Long> expiredTable = new HashSet<>();
|
||||
|
||||
Set<Long> expiredIdxId = new HashSet<>();
|
||||
|
||||
Set<String> ids = new HashSet<>();
|
||||
|
||||
public boolean isFull() {
|
||||
return expiredCatalog.size() >= Config.expr_children_limit
|
||||
|| expiredDatabase.size() >= Config.expr_children_limit
|
||||
|| expiredTable.size() >= Config.expr_children_limit
|
||||
|| expiredIdxId.size() >= Config.expr_children_limit
|
||||
|| ids.size() >= Config.expr_children_limit;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return expiredCatalog.isEmpty()
|
||||
&& expiredDatabase.isEmpty()
|
||||
&& expiredTable.isEmpty()
|
||||
&& expiredIdxId.isEmpty()
|
||||
&& ids.size() < Config.expr_children_limit / 100;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -18,7 +18,7 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.analysis.AlterColumnStatsStmt;
|
||||
import org.apache.doris.analysis.DropTableStatsStmt;
|
||||
import org.apache.doris.analysis.DropStatsStmt;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -90,6 +90,12 @@ public class StatisticsRepository {
|
||||
+ " ORDER BY update_time DESC LIMIT "
|
||||
+ StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE;
|
||||
|
||||
private static final String FETCH_STATS_FULL_NAME =
|
||||
"SELECT id, catalog_id, db_id, tbl_id, idx_id, col_id, part_id FROM "
|
||||
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.STATISTIC_TBL_NAME
|
||||
+ " ORDER BY update_time "
|
||||
+ "LIMIT ${limit} OFFSET ${offset}";
|
||||
|
||||
public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
|
||||
ResultRow resultRow = queryColumnStatisticById(tableId, colName);
|
||||
if (resultRow == null) {
|
||||
@ -272,7 +278,7 @@ public class StatisticsRepository {
|
||||
.updateColStatsCache(objects.table.getId(), -1, colName, builder.build());
|
||||
}
|
||||
|
||||
public static void dropTableStatistics(DropTableStatsStmt dropTableStatsStmt) {
|
||||
public static void dropTableStatistics(DropStatsStmt dropTableStatsStmt) {
|
||||
Long dbId = dropTableStatsStmt.getDbId();
|
||||
Set<Long> tbIds = dropTableStatsStmt.getTbIds();
|
||||
Set<String> cols = dropTableStatsStmt.getColumnNames();
|
||||
@ -284,4 +290,11 @@ public class StatisticsRepository {
|
||||
public static List<ResultRow> fetchRecentStatsUpdatedCol() {
|
||||
return StatisticsUtil.execStatisticQuery(FETCH_RECENT_STATS_UPDATED_COL);
|
||||
}
|
||||
|
||||
public static List<ResultRow> fetchStatsFullName(long limit, long offset) {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("limit", String.valueOf(limit));
|
||||
params.put("offset", String.valueOf(offset));
|
||||
return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,120 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Maintenance the internal statistics table.
|
||||
* Delete rows that corresponding DB/Table/Column not exists anymore.
|
||||
*/
|
||||
public class StatisticsTableCleaner extends MasterDaemon {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(StatisticsTableCleaner.class);
|
||||
|
||||
public StatisticsTableCleaner() {
|
||||
super("Statistics Table Cleaner",
|
||||
StatisticConstants.STATISTIC_CLEAN_INTERVAL_IN_HOURS * 3600 * 1000);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
if (Env.getCurrentEnv().isMaster()) {
|
||||
deleteExpiredStatistics();
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteExpiredStatistics() {
|
||||
List<Database> databases = Env.getCurrentEnv().getInternalCatalog().getDbs();
|
||||
deleteByDB(databases.stream().map(Database::getId).map(String::valueOf).collect(Collectors.toList()));
|
||||
List<String> tblIds = new ArrayList<>();
|
||||
List<String> colIds = new ArrayList<>();
|
||||
List<String> partitionIds = new ArrayList<>();
|
||||
for (Database database : databases) {
|
||||
List<Table> tables = database.getTables();
|
||||
for (Table table : tables) {
|
||||
tblIds.add(String.valueOf(table.getId()));
|
||||
if (table instanceof OlapTable) {
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
partitionIds.addAll(olapTable.getPartitionIds()
|
||||
.stream().map(String::valueOf).collect(Collectors.toList()));
|
||||
}
|
||||
colIds.addAll(table.getColumns().stream().map(Column::getName)
|
||||
.map(String::valueOf).collect(Collectors.toList()));
|
||||
}
|
||||
}
|
||||
deleteByTblId(tblIds);
|
||||
deleteByColId(colIds);
|
||||
deleteByPartitionId(partitionIds);
|
||||
}
|
||||
|
||||
private void deleteByPartitionId(List<String> partitionIds) {
|
||||
deleteExpired("part_id", partitionIds);
|
||||
}
|
||||
|
||||
private void deleteByTblId(List<String> tblIds) {
|
||||
deleteExpired("tbl_id", tblIds);
|
||||
}
|
||||
|
||||
private void deleteByDB(List<String> dbIds) {
|
||||
deleteExpired("db_id", dbIds);
|
||||
}
|
||||
|
||||
private void deleteByColId(List<String> colId) {
|
||||
deleteExpired("col_id", colId);
|
||||
}
|
||||
|
||||
private void deleteExpired(String colName, List<String> constants) {
|
||||
// TODO: must promise count of children of predicate is less than the FE limits.
|
||||
String deleteTemplate = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME
|
||||
+ "." + StatisticConstants.STATISTIC_TBL_NAME + "WHERE ${colName} NOT IN ${predicate}";
|
||||
StringJoiner predicateBuilder = new StringJoiner(",", "(", ")");
|
||||
constants.forEach(predicateBuilder::add);
|
||||
Map<String, String> map = new HashMap<String, String>() {
|
||||
{
|
||||
put("colName", colName);
|
||||
put("predicate", predicateBuilder.toString());
|
||||
}
|
||||
};
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(map);
|
||||
try {
|
||||
StatisticsUtil.execUpdate(stringSubstitutor.replace(deleteTemplate));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Remove expired statistics failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -489,6 +489,7 @@ import org.apache.doris.qe.SqlModeHelper;
|
||||
keywordMap.put("execute", new Integer(SqlParserSymbols.KW_EXECUTE));
|
||||
keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
|
||||
keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE));
|
||||
keywordMap.put("expired", new Integer(SqlParserSymbols.KW_EXPIRED));
|
||||
}
|
||||
|
||||
// map from token id to token description
|
||||
|
||||
@ -2,8 +2,24 @@
|
||||
-- !sql --
|
||||
5 4 0 1 8 20
|
||||
5 4 0 1 8 20
|
||||
5 4 0 1 8 20
|
||||
5 4 0 1 8 20
|
||||
5 4 0 1 8 20
|
||||
5 4 0 1 8 20
|
||||
5 5 0 1 7 5
|
||||
5 5 0 1 7 5
|
||||
5 5 0 1 7 5
|
||||
5 5 0 1 7 5
|
||||
5 5 0 1 7 5
|
||||
5 5 0 1 7 5
|
||||
5 5 0 1 9 20
|
||||
5 5 0 1 9 20
|
||||
5 5 0 1 9 20
|
||||
5 5 0 1 9 20
|
||||
5 5 0 1 9 20
|
||||
5 5 0 1 9 20
|
||||
|
||||
-- !sql --
|
||||
5 5 0 1 7 5
|
||||
5 5 0 1 7 5
|
||||
|
||||
|
||||
@ -16,12 +16,79 @@
|
||||
// under the License.
|
||||
|
||||
suite("analyze_test") {
|
||||
|
||||
def dbName1 = "analyze_test_db_1"
|
||||
|
||||
def tblName1 = "${dbName1}.analyze_test_tbl_1"
|
||||
|
||||
def dbName2 = "analyze_test_db_2"
|
||||
|
||||
def tblName2 = "${dbName2}.analyze_test_tbl_2"
|
||||
|
||||
def dbName3 = "analyze_test_db_3"
|
||||
|
||||
def tblName3 = "${dbName3}.analyze_test_tbl_3"
|
||||
|
||||
|
||||
sql """
|
||||
DROP TABLE IF EXISTS test_table_alter_column_stats
|
||||
DROP DATABASE IF EXISTS ${dbName1};
|
||||
"""
|
||||
sql """CREATE TABLE test_table_alter_column_stats (col1 varchar(11451) not null, col2 int not null, col3 int not null)
|
||||
UNIQUE KEY(col1)
|
||||
DISTRIBUTED BY HASH(col1)
|
||||
|
||||
|
||||
sql """
|
||||
CREATE DATABASE ${dbName1};
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP DATABASE IF EXISTS ${dbName2}
|
||||
"""
|
||||
|
||||
sql """
|
||||
CREATE DATABASE ${dbName2};
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP DATABASE IF EXISTS ${dbName3}
|
||||
"""
|
||||
|
||||
sql """
|
||||
CREATE DATABASE ${dbName3};
|
||||
"""
|
||||
|
||||
|
||||
sql """
|
||||
DROP TABLE IF EXISTS ${tblName1}
|
||||
"""
|
||||
|
||||
sql """CREATE TABLE ${tblName1} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null)
|
||||
UNIQUE KEY(analyze_test_col1)
|
||||
DISTRIBUTED BY HASH(analyze_test_col1)
|
||||
BUCKETS 3
|
||||
PROPERTIES(
|
||||
"replication_num"="1",
|
||||
"enable_unique_key_merge_on_write"="true"
|
||||
);"""
|
||||
|
||||
sql """
|
||||
DROP TABLE IF EXISTS ${tblName2}
|
||||
"""
|
||||
|
||||
sql """CREATE TABLE ${tblName2} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null)
|
||||
UNIQUE KEY(analyze_test_col1)
|
||||
DISTRIBUTED BY HASH(analyze_test_col1)
|
||||
BUCKETS 3
|
||||
PROPERTIES(
|
||||
"replication_num"="1",
|
||||
"enable_unique_key_merge_on_write"="true"
|
||||
);"""
|
||||
|
||||
sql """
|
||||
DROP TABLE IF EXISTS ${tblName3}
|
||||
"""
|
||||
|
||||
sql """CREATE TABLE ${tblName3} (analyze_test_col1 varchar(11451) not null, analyze_test_col2 int not null, analyze_test_col3 int not null)
|
||||
UNIQUE KEY(analyze_test_col1)
|
||||
DISTRIBUTED BY HASH(analyze_test_col1)
|
||||
BUCKETS 3
|
||||
PROPERTIES(
|
||||
"replication_num"="1",
|
||||
@ -29,23 +96,66 @@ suite("analyze_test") {
|
||||
);"""
|
||||
|
||||
|
||||
sql """insert into test_table_alter_column_stats values(1, 2, 3);"""
|
||||
sql """insert into test_table_alter_column_stats values(4, 5, 6);"""
|
||||
sql """insert into test_table_alter_column_stats values(7, 1, 9);"""
|
||||
sql """insert into test_table_alter_column_stats values(3, 8, 2);"""
|
||||
sql """insert into test_table_alter_column_stats values(5, 2, 1);"""
|
||||
|
||||
sql """delete from __internal_schema.column_statistics where col_id in ('col1', 'col2', 'col3')"""
|
||||
sql """insert into ${tblName1} values(1, 2, 3);"""
|
||||
sql """insert into ${tblName1} values(4, 5, 6);"""
|
||||
sql """insert into ${tblName1} values(7, 1, 9);"""
|
||||
sql """insert into ${tblName1} values(3, 8, 2);"""
|
||||
sql """insert into ${tblName1} values(5, 2, 1);"""
|
||||
|
||||
sql """insert into ${tblName2} values(1, 2, 3);"""
|
||||
sql """insert into ${tblName2} values(4, 5, 6);"""
|
||||
sql """insert into ${tblName2} values(7, 1, 9);"""
|
||||
sql """insert into ${tblName2} values(3, 8, 2);"""
|
||||
sql """insert into ${tblName2} values(5, 2, 1);"""
|
||||
|
||||
sql """insert into ${tblName3} values(1, 2, 3);"""
|
||||
sql """insert into ${tblName3} values(4, 5, 6);"""
|
||||
sql """insert into ${tblName3} values(7, 1, 9);"""
|
||||
sql """insert into ${tblName3} values(3, 8, 2);"""
|
||||
sql """insert into ${tblName3} values(5, 2, 1);"""
|
||||
|
||||
sql """
|
||||
analyze sync table test_table_alter_column_stats;
|
||||
delete from __internal_schema.column_statistics where col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3')
|
||||
"""
|
||||
|
||||
sql """
|
||||
analyze sync table ${tblName1};
|
||||
"""
|
||||
|
||||
sql """
|
||||
analyze sync table ${tblName2};
|
||||
"""
|
||||
|
||||
sql """
|
||||
analyze sync table ${tblName3};
|
||||
"""
|
||||
|
||||
order_qt_sql """
|
||||
select count, ndv, null_count, min, max, data_size_in_bytes from __internal_schema.column_statistics where
|
||||
col_id in ('col1', 'col2', 'col3') order by col_id
|
||||
col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3') order by col_id
|
||||
"""
|
||||
|
||||
sql """
|
||||
ALTER TABLE ${tblName3} DROP COLUMN analyze_test_col3;
|
||||
"""
|
||||
sql """
|
||||
DROP TABLE IF EXISTS test_table_alter_column_stats
|
||||
ALTER TABLE ${tblName3} DROP COLUMN analyze_test_col2;
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP TABLE ${tblName2}
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP DATABASE ${dbName1}
|
||||
"""
|
||||
|
||||
sql """
|
||||
DROP EXPIRED STATS
|
||||
"""
|
||||
|
||||
order_qt_sql """
|
||||
select count, ndv, null_count, min, max, data_size_in_bytes from __internal_schema.column_statistics where
|
||||
col_id in ('analyze_test_col1', 'analyze_test_col2', 'analyze_test_col3') order by col_id
|
||||
"""
|
||||
}
|
||||
Reference in New Issue
Block a user