[improvement](statistics) Optimize drop stats operation (#30144)

Previously, the drop stats operation called isMaster() (columns * followers) times and issued the same number of RPCs to drop remote column stats. This PR reduces the number of RPC calls and uses a more efficient way to identify the master node instead of calling isMaster().
This commit is contained in:
Jibing-Li
2024-01-22 13:20:34 +08:00
committed by yiguolei
parent 3e73933857
commit 62a46876b6
5 changed files with 133 additions and 33 deletions

View File

@ -53,7 +53,10 @@ public class DropStatsStmt extends DdlStmt {
private Set<String> columnNames;
// Flag to drop external table row count in table_statistics.
private boolean dropTableRowCount;
private boolean isAllColumns;
private long catalogId;
private long dbId;
private long tblId;
public DropStatsStmt(boolean dropExpired) {
@ -100,10 +103,13 @@ public class DropStatsStmt extends DdlStmt {
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrAnalysisException(tblName);
tblId = table.getId();
dbId = db.getId();
catalogId = catalog.getId();
// check permission
checkAnalyzePriv(db.getFullName(), table.getName());
// check columnNames
if (columnNames != null) {
isAllColumns = false;
for (String cName : columnNames) {
if (table.getColumn(cName) == null) {
ErrorReport.reportAnalysisException(
@ -115,6 +121,7 @@ public class DropStatsStmt extends DdlStmt {
}
}
} else {
isAllColumns = true;
columnNames = table.getColumns().stream().map(Column::getName).collect(Collectors.toSet());
}
}
@ -123,10 +130,22 @@ public class DropStatsStmt extends DdlStmt {
return tblId;
}
/**
 * Returns the id of the database that owns the target table.
 * The value is captured from the resolved database while the statement is analyzed.
 */
public long getDbId() {
    return this.dbId;
}
/**
 * Returns the id of the catalog that owns the target table.
 * The value is captured from the resolved catalog while the statement is analyzed.
 * The doubled "Id" in the method name is kept because existing callers use it.
 */
public long getCatalogIdId() {
    return this.catalogId;
}
/**
 * Returns the names of the columns whose stats should be dropped.
 * When the statement named no columns, analysis fills this set with
 * every column of the target table.
 */
public Set<String> getColumnNames() {
    return this.columnNames;
}
/**
 * Returns true when the statement did not list explicit columns, i.e. the
 * drop applies to all columns of the table; false when columns were given.
 */
public boolean isAllColumns() {
    return this.isAllColumns;
}
/**
 * Returns the flag that tells whether the external table row count in
 * table_statistics should be dropped as well.
 */
public boolean dropTableRowCount() {
    return this.dropTableRowCount;
}

View File

@ -95,9 +95,12 @@ import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.InvalidateStatsTarget;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@ -3041,8 +3044,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
@Override
public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request) throws TException {
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName);
InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key, InvalidateStatsTarget.class);
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
TableStatsMeta tableStats = analysisManager.findTableStatsStatus(target.tableId);
if (tableStats == null) {
return new TStatus(TStatusCode.OK);
}
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId, target.columns, tableStats);
return new TStatus(TStatusCode.OK);
}

View File

@ -58,6 +58,9 @@ import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@ -646,23 +649,16 @@ public class AnalysisManager implements Writable {
}
Set<String> cols = dropStatsStmt.getColumnNames();
long catalogId = dropStatsStmt.getCatalogIdId();
long dbId = dropStatsStmt.getDbId();
long tblId = dropStatsStmt.getTblId();
TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
}
if (cols == null) {
tableStats.reset();
} else {
dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn);
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
for (String col : cols) {
statisticsCache.syncInvalidate(tblId, -1L, col);
}
tableStats.updatedTime = 0;
}
tableStats.userInjected = false;
logCreateTableStats(tableStats);
invalidateLocalStats(catalogId, dbId, tblId, cols, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tblId, cols, dropStatsStmt.isAllColumns());
StatisticsRepository.dropStatistics(tblId, cols);
}
@ -671,15 +667,55 @@ public class AnalysisManager implements Writable {
if (tableStats == null) {
return;
}
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
Set<String> cols = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
invalidateLocalStats(catalogId, dbId, tableId, cols, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
StatisticsRepository.dropStatistics(table.getId(), cols);
}
public void invalidateLocalStats(long catalogId, long dbId, long tableId,
Set<String> columns, TableStatsMeta tableStats) {
if (tableStats == null) {
return;
}
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
for (String col : cols) {
tableStats.removeColumn(col);
statisticsCache.syncInvalidate(table.getId(), -1L, col);
if (columns == null) {
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
columns = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
}
for (String column : columns) {
tableStats.removeColumn(column);
statisticsCache.invalidate(tableId, -1, column);
}
tableStats.updatedTime = 0;
logCreateTableStats(tableStats);
StatisticsRepository.dropStatistics(table.getId(), cols);
tableStats.userInjected = false;
}
/**
 * Asks every other frontend to invalidate its stats for the given table with a
 * single RPC per frontend, instead of one RPC per column.
 *
 * <p>If any RPC fails, the table stats meta is written to the edit log as a fallback
 * so that non-master frontends still receive the updated table stats.
 *
 * @param catalogId    id of the catalog that owns the table
 * @param dbId         id of the database that owns the table
 * @param tableId      id of the table whose stats are invalidated
 * @param columns      columns to invalidate; ignored when {@code isAllColumns} is true
 * @param isAllColumns when true the request carries no column list, which tells the
 *                     receiver to invalidate all columns of the table
 */
public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
        Set<String> columns, boolean isAllColumns) {
    InvalidateStatsTarget target = new InvalidateStatsTarget(catalogId, dbId, tableId, columns, isAllColumns);
    TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest();
    request.key = GsonUtils.GSON.toJson(target);
    StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
    SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
    boolean success = true;
    for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
        // Skip the current (master) node. The host strings must be compared:
        // comparing the HostInfo object itself with a String never matches,
        // which would make this node send the RPC to itself.
        if (selfNode.getHost().equals(frontend.getHost())) {
            continue;
        }
        // Issue the RPC before folding the result into the flag, so that one failed
        // frontend does not prevent the remaining frontends from being notified.
        boolean invalidated = statisticsCache.invalidateStats(frontend, request);
        success = success && invalidated;
    }
    if (!success) {
        // If any rpc failed, use edit log to sync table stats to non-master FEs.
        LOG.warn("Failed to invalidate all remote stats by rpc for table {}, use edit log.", tableId);
        TableStatsMeta tableStats = findTableStatsStatus(tableId);
        // The table stats may be absent; there is nothing to log in that case.
        if (tableStats != null) {
            logCreateTableStats(tableStats);
        }
    }
}
public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException {

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import com.google.gson.annotations.SerializedName;
import java.util.Set;
/**
 * Immutable description of which stats to invalidate: the catalog, database and
 * table ids plus an optional set of column names.
 *
 * <p>Fields carry {@code @SerializedName} so the object can be sent as JSON.
 * A {@code null} column set stands for "all columns of the table".
 */
public class InvalidateStatsTarget {

    @SerializedName("catalogId")
    public final long catalogId;

    @SerializedName("dbId")
    public final long dbId;

    @SerializedName("tableId")
    public final long tableId;

    // Null when the target covers every column of the table.
    @SerializedName("columns")
    public final Set<String> columns;

    /**
     * @param catalogId    id of the catalog that owns the table
     * @param dbId         id of the database that owns the table
     * @param tableId      id of the target table
     * @param columns      column names to invalidate; dropped when {@code isAllColumns} is true
     * @param isAllColumns when true, {@link #columns} is stored as {@code null}
     */
    public InvalidateStatsTarget(long catalogId, long dbId, long tableId, Set<String> columns, boolean isAllColumns) {
        this.catalogId = catalogId;
        this.dbId = dbId;
        this.tableId = tableId;
        // Omit the column list entirely when all columns are targeted.
        this.columns = isAllColumns ? null : columns;
    }
}

View File

@ -137,19 +137,6 @@ public class StatisticsCache {
columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
}
public void syncInvalidate(long tblId, long idxId, String colName) {
StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, colName);
columnStatisticsCache.synchronous().invalidate(cacheKey);
TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest();
request.key = GsonUtils.GSON.toJson(cacheKey);
for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
if (StatisticsUtil.isMaster(frontend)) {
continue;
}
invalidateStats(frontend, request);
}
}
/**
 * Stores the given column statistic in the column statistics cache under the
 * key built from the table id, index id and column name.
 */
public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {
    StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, colName);
    columnStatisticsCache.synchronous().put(cacheKey, Optional.of(statistic));
}
@ -261,7 +248,7 @@ public class StatisticsCache {
}
@VisibleForTesting
public void invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) {
public boolean invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
FrontendService.Client client = null;
try {
@ -269,11 +256,13 @@ public class StatisticsCache {
client.invalidateStatsCache(request);
} catch (Throwable t) {
LOG.warn("Failed to sync invalidate to follower: {}", address, t);
return false;
} finally {
if (client != null) {
ClientPool.frontendPool.returnObject(address, client);
}
}
return true;
}
public void putCache(StatisticsCacheKey k, ColumnStatistic c) {