diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java index ac08f01f31..5e3bd20c0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java @@ -53,7 +53,10 @@ public class DropStatsStmt extends DdlStmt { private Set columnNames; // Flag to drop external table row count in table_statistics. private boolean dropTableRowCount; + private boolean isAllColumns; + private long catalogId; + private long dbId; private long tblId; public DropStatsStmt(boolean dropExpired) { @@ -100,10 +103,13 @@ public class DropStatsStmt extends DdlStmt { DatabaseIf db = catalog.getDbOrAnalysisException(dbName); TableIf table = db.getTableOrAnalysisException(tblName); tblId = table.getId(); + dbId = db.getId(); + catalogId = catalog.getId(); // check permission checkAnalyzePriv(db.getFullName(), table.getName()); // check columnNames if (columnNames != null) { + isAllColumns = false; for (String cName : columnNames) { if (table.getColumn(cName) == null) { ErrorReport.reportAnalysisException( @@ -115,6 +121,7 @@ public class DropStatsStmt extends DdlStmt { } } } else { + isAllColumns = true; columnNames = table.getColumns().stream().map(Column::getName).collect(Collectors.toSet()); } } @@ -123,10 +130,22 @@ public class DropStatsStmt extends DdlStmt { return tblId; } + public long getDbId() { + return dbId; + } + + public long getCatalogIdId() { + return catalogId; + } + public Set getColumnNames() { return columnNames; } + public boolean isAllColumns() { + return isAllColumns; + } + public boolean dropTableRowCount() { return dropTableRowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9a98576b11..1a1ceb799c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -95,9 +95,12 @@ import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.qe.VariableMgr; import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor; +import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.InvalidateStatsTarget; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticsCacheKey; +import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.statistics.query.QueryStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -3041,8 +3044,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request) throws TException { - StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); - Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName); + InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key, InvalidateStatsTarget.class); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + TableStatsMeta tableStats = analysisManager.findTableStatsStatus(target.tableId); + if (tableStats == null) { + return new TStatus(TStatusCode.OK); + } + analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId, target.columns, tableStats); return new TStatus(TStatusCode.OK); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 8f927694dc..fe64fb1414 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -58,6 +58,9 @@ import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -646,23 +649,16 @@ public class AnalysisManager implements Writable { } Set cols = dropStatsStmt.getColumnNames(); + long catalogId = dropStatsStmt.getCatalogIdId(); + long dbId = dropStatsStmt.getDbId(); long tblId = dropStatsStmt.getTblId(); TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId()); if (tableStats == null) { return; } - if (cols == null) { - tableStats.reset(); - } else { - dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn); - StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); - for (String col : cols) { - statisticsCache.syncInvalidate(tblId, -1L, col); - } - tableStats.updatedTime = 0; - } - tableStats.userInjected = false; - logCreateTableStats(tableStats); + invalidateLocalStats(catalogId, dbId, tblId, cols, tableStats); + // Drop stats ddl is master only operation. + invalidateRemoteStats(catalogId, dbId, tblId, cols, dropStatsStmt.isAllColumns()); StatisticsRepository.dropStatistics(tblId, cols); } @@ -671,15 +667,55 @@ public class AnalysisManager implements Writable { if (tableStats == null) { return; } + long catalogId = table.getDatabase().getCatalog().getId(); + long dbId = table.getDatabase().getId(); + long tableId = table.getId(); Set cols = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()); + invalidateLocalStats(catalogId, dbId, tableId, cols, tableStats); + // Drop stats ddl is master only operation. + invalidateRemoteStats(catalogId, dbId, tableId, cols, true); + StatisticsRepository.dropStatistics(table.getId(), cols); + } + + public void invalidateLocalStats(long catalogId, long dbId, long tableId, + Set columns, TableStatsMeta tableStats) { + if (tableStats == null) { + return; + } StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); - for (String col : cols) { - tableStats.removeColumn(col); - statisticsCache.syncInvalidate(table.getId(), -1L, col); + if (columns == null) { + TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId); + columns = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()); + } + for (String column : columns) { + tableStats.removeColumn(column); + statisticsCache.invalidate(tableId, -1, column); } tableStats.updatedTime = 0; - logCreateTableStats(tableStats); - StatisticsRepository.dropStatistics(table.getId(), cols); + tableStats.userInjected = false; + } + + public void invalidateRemoteStats(long catalogId, long dbId, long tableId, + Set columns, boolean isAllColumns) { + InvalidateStatsTarget target = new InvalidateStatsTarget(catalogId, dbId, tableId, columns, isAllColumns); + TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest(); + request.key = GsonUtils.GSON.toJson(target); + StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); + SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode(); + boolean success = true; + for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) { + // Skip master + if (selfNode.equals(frontend.getHost())) { + continue; + } + success = success && statisticsCache.invalidateStats(frontend, request); + } + if (!success) { + // If any rpc failed, use edit log to sync table stats to non-master FEs. + LOG.warn("Failed to invalidate all remote stats by rpc for table {}, use edit log.", tableId); + TableStatsMeta tableStats = findTableStatsStatus(tableId); + logCreateTableStats(tableStats); + } } public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java new file mode 100644 index 0000000000..e49048f894 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import com.google.gson.annotations.SerializedName; + +import java.util.Set; + +public class InvalidateStatsTarget { + + @SerializedName("catalogId") + public final long catalogId; + + @SerializedName("dbId") + public final long dbId; + + @SerializedName("tableId") + public final long tableId; + + @SerializedName("columns") + public final Set columns; + + public InvalidateStatsTarget(long catalogId, long dbId, long tableId, Set columns, boolean isAllColumns) { + this.catalogId = catalogId; + this.dbId = dbId; + this.tableId = tableId; + if (isAllColumns) { + this.columns = null; + } else { + this.columns = columns; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index fbec9a60fa..0cf2808222 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -137,19 +137,6 @@ public class StatisticsCache { columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName)); } - public void syncInvalidate(long tblId, long idxId, String colName) { - StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, colName); - columnStatisticsCache.synchronous().invalidate(cacheKey); - TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest(); - request.key = GsonUtils.GSON.toJson(cacheKey); - for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) { - if (StatisticsUtil.isMaster(frontend)) { - continue; - } - invalidateStats(frontend, request); - } - } - public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) { columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(statistic)); } @@ -261,7 +248,7 @@ public class StatisticsCache { } @VisibleForTesting - public void invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) { + public boolean invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) { TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort()); FrontendService.Client client = null; try { @@ -269,11 +256,13 @@ public class StatisticsCache { client.invalidateStatsCache(request); } catch (Throwable t) { LOG.warn("Failed to sync invalidate to follower: {}", address, t); + return false; } finally { if (client != null) { ClientPool.frontendPool.returnObject(address, client); } } + return true; } public void putCache(StatisticsCacheKey k, ColumnStatistic c) {