[opt](stats) remove table stats when table has been removed (#23803)
This commit is contained in:
@ -20,6 +20,7 @@ package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
@ -168,8 +169,7 @@ public interface CatalogIf<T extends DatabaseIf> {
|
||||
}
|
||||
|
||||
// Return a copy of all db collection.
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public Collection<DatabaseIf> getAllDbs();
|
||||
public Collection<DatabaseIf<? extends TableIf>> getAllDbs();
|
||||
|
||||
public boolean enableAutoAnalyze();
|
||||
}
|
||||
|
||||
@ -289,7 +289,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
LOG.warn("Non catalog {} is found.", stmt.getCatalogName());
|
||||
return;
|
||||
}
|
||||
CatalogIf catalog = nameToCatalog.get(stmt.getCatalogName());
|
||||
CatalogIf<DatabaseIf<TableIf>> catalog = nameToCatalog.get(stmt.getCatalogName());
|
||||
if (catalog == null) {
|
||||
throw new DdlException("No catalog found with name: " + stmt.getCatalogName());
|
||||
}
|
||||
@ -299,7 +299,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
|
||||
lastDBOfCatalog.remove(stmt.getCatalogName());
|
||||
Env.getCurrentEnv().getQueryStats().clear(catalog.getId());
|
||||
|
||||
Env.getCurrentEnv().getAnalysisManager().removeExternalTableStats(catalog);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Resource;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.DeltaLakeExternalDataBase;
|
||||
import org.apache.doris.catalog.external.EsExternalDatabase;
|
||||
import org.apache.doris.catalog.external.ExternalDatabase;
|
||||
@ -588,7 +589,7 @@ public abstract class ExternalCatalog
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<DatabaseIf> getAllDbs() {
|
||||
public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
|
||||
makeSureInitialized();
|
||||
return new HashSet<>(idToDb.values());
|
||||
}
|
||||
|
||||
@ -919,6 +919,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
Env.getCurrentEnv().getEditLog().logDropTable(info);
|
||||
Env.getCurrentEnv().getQueryStats().clear(Env.getCurrentEnv().getCurrentCatalog().getId(),
|
||||
db.getId(), table.getId());
|
||||
Env.getCurrentEnv().getAnalysisManager().removeTableStats(table.getId());
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
}
|
||||
@ -3138,7 +3139,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<DatabaseIf> getAllDbs() {
|
||||
public Collection<DatabaseIf<? extends TableIf>> getAllDbs() {
|
||||
return new HashSet<>(idToDb.values());
|
||||
}
|
||||
|
||||
|
||||
@ -1107,6 +1107,10 @@ public class EditLog {
|
||||
env.getAnalysisManager().replayUpdateTableStatsStatus((TableStats) journal.getData());
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_DELETE_TABLE_STATS: {
|
||||
env.getAnalysisManager().replayTableStatsDeletion((TableStatsDeletionLog) journal.getData());
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
IOException e = new IOException();
|
||||
LOG.error("UNKNOWN Operation Type {}", opCode, e);
|
||||
@ -1944,4 +1948,8 @@ public class EditLog {
|
||||
public void logCreateTableStats(TableStats tableStats) {
|
||||
logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats);
|
||||
}
|
||||
|
||||
public void logDeleteTableStats(TableStatsDeletionLog log) {
|
||||
logEdit(OperationType.OP_DELETE_ANALYSIS_JOB, log);
|
||||
}
|
||||
}
|
||||
|
||||
@ -333,6 +333,8 @@ public class OperationType {
|
||||
|
||||
public static final short OP_UPDATE_TABLE_STATS = 455;
|
||||
|
||||
public static final short OP_DELETE_TABLE_STATS = 456;
|
||||
|
||||
/**
|
||||
* Get opcode name by op code.
|
||||
**/
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.persist;
|
||||
|
||||
import org.apache.doris.common.io.Writable;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
public class TableStatsDeletionLog implements Writable {
|
||||
|
||||
public final long id;
|
||||
|
||||
public TableStatsDeletionLog(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeLong(id);
|
||||
}
|
||||
|
||||
public static TableStatsDeletionLog read(DataInput dataInput) throws IOException {
|
||||
return new TableStatsDeletionLog(dataInput.readLong());
|
||||
}
|
||||
}
|
||||
@ -71,6 +71,8 @@ public class AnalysisInfo implements Writable {
|
||||
}
|
||||
|
||||
public enum ScheduleType {
|
||||
// Jobs created by AutoCollector are also of type `ONCE`: each runs only once and should be removed
// once its information has expired.
|
||||
ONCE,
|
||||
PERIOD,
|
||||
AUTOMATIC
|
||||
|
||||
@ -43,8 +43,10 @@ import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.Daemon;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.AnalyzeDeletionLog;
|
||||
import org.apache.doris.persist.TableStatsDeletionLog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ShowResultSet;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
@ -220,7 +222,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
}
|
||||
|
||||
private void clear() {
|
||||
clearMeta(analysisJobInfoMap, (a) ->
|
||||
clearExpiredAnalysisInfo(analysisJobInfoMap, (a) ->
|
||||
a.scheduleType.equals(ScheduleType.ONCE)
|
||||
&& System.currentTimeMillis() - a.lastExecTimeInMs
|
||||
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
|
||||
@ -228,7 +230,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id));
|
||||
return null;
|
||||
});
|
||||
clearMeta(analysisTaskInfoMap, (a) -> System.currentTimeMillis() - a.lastExecTimeInMs
|
||||
clearExpiredAnalysisInfo(analysisTaskInfoMap, (a) -> System.currentTimeMillis() - a.lastExecTimeInMs
|
||||
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
|
||||
(id) -> {
|
||||
Env.getCurrentEnv().getEditLog().logDeleteAnalysisTask(new AnalyzeDeletionLog(id));
|
||||
@ -236,7 +238,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
});
|
||||
}
|
||||
|
||||
private void clearMeta(Map<Long, AnalysisInfo> infoMap, Predicate<AnalysisInfo> isExpired,
|
||||
private void clearExpiredAnalysisInfo(Map<Long, AnalysisInfo> infoMap, Predicate<AnalysisInfo> isExpired,
|
||||
Function<Long, Void> writeLog) {
|
||||
synchronized (infoMap) {
|
||||
List<Long> expired = new ArrayList<>();
|
||||
@ -968,4 +970,36 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
public void removeExternalTableStats(CatalogIf<DatabaseIf<TableIf>> catalogIf) {
|
||||
if (FeConstants.runningUnitTest) {
|
||||
return;
|
||||
}
|
||||
Set<Long> tblSet = catalogIf.getAllDbs().stream()
|
||||
.map(DatabaseIf::getTables)
|
||||
.flatMap(Collection::stream)
|
||||
.map(t -> ((TableIf) t).getId())
|
||||
.collect(Collectors.toSet());
|
||||
List<Long> expiredTblIds = new ArrayList<>();
|
||||
for (Map.Entry<Long, TableStats> entry : idToTblStatsStatus.entrySet()) {
|
||||
if (tblSet.contains(entry.getKey())) {
|
||||
expiredTblIds.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
for (Long tblId : expiredTblIds) {
|
||||
removeTableStats(tblId);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeTableStats(long tblId) {
|
||||
if (!idToTblStatsStatus.containsKey(tblId)) {
|
||||
return;
|
||||
}
|
||||
TableStatsDeletionLog log = new TableStatsDeletionLog(tblId);
|
||||
Env.getCurrentEnv().getEditLog().logDeleteTableStats(log);
|
||||
replayTableStatsDeletion(log);
|
||||
}
|
||||
|
||||
public void replayTableStatsDeletion(TableStatsDeletionLog log) {
|
||||
idToTblStatsStatus.remove(log.id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -912,4 +912,8 @@ PARTITION `p599` VALUES IN (599)
|
||||
SHOW COLUMN CACHED STATS increment_analyze_test(id)
|
||||
"""
|
||||
expected_id_col_stats(inc_res, 6, 1)
|
||||
|
||||
sql """
|
||||
DROP TABLE regression_test_statistics.increment_analyze_test;
|
||||
"""
|
||||
}
|
||||
Reference in New Issue
Block a user