From 62a46876b6284249935b1158f5bea44ed11f5428 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Mon, 22 Jan 2024 13:20:34 +0800 Subject: [PATCH] [improvement](statistics) Optimize drop stats operation (#30144) Before, drop stats operation need to call columns * followers times of isMaster() function and the same times of rpc to drop remote column stats. This pr is to reduce the rpc calls and use more efficient way to check master node instead of using isMaster() --- .../apache/doris/analysis/DropStatsStmt.java | 19 +++++ .../doris/service/FrontendServiceImpl.java | 12 +++- .../doris/statistics/AnalysisManager.java | 70 ++++++++++++++----- .../statistics/InvalidateStatsTarget.java | 48 +++++++++++++ .../doris/statistics/StatisticsCache.java | 17 +---- 5 files changed, 133 insertions(+), 33 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java index ac08f01f31..5e3bd20c0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropStatsStmt.java @@ -53,7 +53,10 @@ public class DropStatsStmt extends DdlStmt { private Set columnNames; // Flag to drop external table row count in table_statistics. private boolean dropTableRowCount; + private boolean isAllColumns; + private long catalogId; + private long dbId; private long tblId; public DropStatsStmt(boolean dropExpired) { @@ -100,10 +103,13 @@ public class DropStatsStmt extends DdlStmt { DatabaseIf db = catalog.getDbOrAnalysisException(dbName); TableIf table = db.getTableOrAnalysisException(tblName); tblId = table.getId(); + dbId = db.getId(); + catalogId = catalog.getId(); // check permission checkAnalyzePriv(db.getFullName(), table.getName()); // check columnNames if (columnNames != null) { + isAllColumns = false; for (String cName : columnNames) { if (table.getColumn(cName) == null) { ErrorReport.reportAnalysisException( @@ -115,6 +121,7 @@ public class DropStatsStmt extends DdlStmt { } } } else { + isAllColumns = true; columnNames = table.getColumns().stream().map(Column::getName).collect(Collectors.toSet()); } } @@ -123,10 +130,22 @@ public class DropStatsStmt extends DdlStmt { return tblId; } + public long getDbId() { + return dbId; + } + + public long getCatalogIdId() { + return catalogId; + } + public Set getColumnNames() { return columnNames; } + public boolean isAllColumns() { + return isAllColumns; + } + public boolean dropTableRowCount() { return dropTableRowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9a98576b11..1a1ceb799c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -95,9 +95,12 @@ import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.qe.VariableMgr; import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor; +import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.InvalidateStatsTarget; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.StatisticsCacheKey; +import org.apache.doris.statistics.TableStatsMeta; import org.apache.doris.statistics.query.QueryStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -3041,8 +3044,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @Override public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request) throws TException { - StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); - Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName); + InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key, InvalidateStatsTarget.class); + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + TableStatsMeta tableStats = analysisManager.findTableStatsStatus(target.tableId); + if (tableStats == null) { + return new TStatus(TStatusCode.OK); + } + analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId, target.columns, tableStats); return new TStatus(TStatusCode.OK); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 8f927694dc..fe64fb1414 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -58,6 +58,9 @@ import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -646,23 +649,16 @@ public class AnalysisManager implements Writable { } Set cols = dropStatsStmt.getColumnNames(); + long catalogId = dropStatsStmt.getCatalogIdId(); + long dbId = dropStatsStmt.getDbId(); long tblId = dropStatsStmt.getTblId(); TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId()); if (tableStats == null) { return; } - if (cols == null) { - tableStats.reset(); - } else { - dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn); - StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); - for (String col : cols) { - statisticsCache.syncInvalidate(tblId, -1L, col); - } - tableStats.updatedTime = 0; - } - tableStats.userInjected = false; - logCreateTableStats(tableStats); + invalidateLocalStats(catalogId, dbId, tblId, cols, tableStats); + // Drop stats ddl is master only operation. + invalidateRemoteStats(catalogId, dbId, tblId, cols, dropStatsStmt.isAllColumns()); StatisticsRepository.dropStatistics(tblId, cols); } @@ -671,15 +667,55 @@ public class AnalysisManager implements Writable { if (tableStats == null) { return; } + long catalogId = table.getDatabase().getCatalog().getId(); + long dbId = table.getDatabase().getId(); + long tableId = table.getId(); Set cols = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()); + invalidateLocalStats(catalogId, dbId, tableId, cols, tableStats); + // Drop stats ddl is master only operation. + invalidateRemoteStats(catalogId, dbId, tableId, cols, true); + StatisticsRepository.dropStatistics(table.getId(), cols); + } + + public void invalidateLocalStats(long catalogId, long dbId, long tableId, + Set columns, TableStatsMeta tableStats) { + if (tableStats == null) { + return; + } StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); - for (String col : cols) { - tableStats.removeColumn(col); - statisticsCache.syncInvalidate(table.getId(), -1L, col); + if (columns == null) { + TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId); + columns = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet()); + } + for (String column : columns) { + tableStats.removeColumn(column); + statisticsCache.invalidate(tableId, -1, column); } tableStats.updatedTime = 0; - logCreateTableStats(tableStats); - StatisticsRepository.dropStatistics(table.getId(), cols); + tableStats.userInjected = false; + } + + public void invalidateRemoteStats(long catalogId, long dbId, long tableId, + Set columns, boolean isAllColumns) { + InvalidateStatsTarget target = new InvalidateStatsTarget(catalogId, dbId, tableId, columns, isAllColumns); + TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest(); + request.key = GsonUtils.GSON.toJson(target); + StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache(); + SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode(); + boolean success = true; + for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) { + // Skip master + if (selfNode.equals(frontend.getHost())) { + continue; + } + success = success && statisticsCache.invalidateStats(frontend, request); + } + if (!success) { + // If any rpc failed, use edit log to sync table stats to non-master FEs. + LOG.warn("Failed to invalidate all remote stats by rpc for table {}, use edit log.", tableId); + TableStatsMeta tableStats = findTableStatsStatus(tableId); + logCreateTableStats(tableStats); + } } public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java new file mode 100644 index 0000000000..e49048f894 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/InvalidateStatsTarget.java @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import com.google.gson.annotations.SerializedName; + +import java.util.Set; + +public class InvalidateStatsTarget { + + @SerializedName("catalogId") + public final long catalogId; + + @SerializedName("dbId") + public final long dbId; + + @SerializedName("tableId") + public final long tableId; + + @SerializedName("columns") + public final Set columns; + + public InvalidateStatsTarget(long catalogId, long dbId, long tableId, Set columns, boolean isAllColumns) { + this.catalogId = catalogId; + this.dbId = dbId; + this.tableId = tableId; + if (isAllColumns) { + this.columns = null; + } else { + this.columns = columns; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index fbec9a60fa..0cf2808222 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -137,19 +137,6 @@ public class StatisticsCache { columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName)); } - public void syncInvalidate(long tblId, long idxId, String colName) { - StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, colName); - columnStatisticsCache.synchronous().invalidate(cacheKey); - TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest(); - request.key = GsonUtils.GSON.toJson(cacheKey); - for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) { - if (StatisticsUtil.isMaster(frontend)) { - continue; - } - invalidateStats(frontend, request); - } - } - public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) { columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(statistic)); } @@ -261,7 +248,7 @@ public class StatisticsCache { } @VisibleForTesting - public void invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) { + public boolean invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) { TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort()); FrontendService.Client client = null; try { @@ -269,11 +256,13 @@ public class StatisticsCache { client.invalidateStatsCache(request); } catch (Throwable t) { LOG.warn("Failed to sync invalidate to follower: {}", address, t); + return false; } finally { if (client != null) { ClientPool.frontendPool.returnObject(address, client); } } + return true; } public void putCache(StatisticsCacheKey k, ColumnStatistic c) {