[enhancement](nereids) Sync stats across FE cluster after analyze #21482

Before this PR, if a user connected to a follower and analyzed a table, the stats would not get cached in the follower FE, since the Analyze stmt would be forwarded to the master and the follower would still lazily load stats into its cache. After this PR, once analyze finishes on the master, the master syncs the stats to all followers and updates each follower's stats cache.
Load partition stats to col stats
This commit is contained in:
AKIRA
2023-07-11 20:09:02 +08:00
committed by GitHub
parent 8ffa21a157
commit ed410034c6
12 changed files with 201 additions and 59 deletions

View File

@ -64,6 +64,7 @@ import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
@ -71,6 +72,8 @@ import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@ -164,6 +167,7 @@ import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableQueryStats;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.DatabaseTransactionMgr;
@ -2670,4 +2674,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
@Override
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException {
    // RPC entry point invoked by the master FE after an analyze finishes
    // (see StatisticsCache.syncLoadColStats): deserialize the Gson-encoded
    // cache key and column statistic and install them in the local cache.
    try {
        StatisticsCacheKey key = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
        ColumnStatistic columnStatistic = GsonUtils.GSON.fromJson(request.colStats, ColumnStatistic.class);
        Env.getCurrentEnv().getStatisticsCache().putCache(key, columnStatistic);
        return new TStatus(TStatusCode.OK);
    } catch (Throwable t) {
        // A malformed or missing payload (Gson.fromJson returns null for null
        // input and throws JsonSyntaxException for bad JSON) previously escaped
        // as an unhandled exception; report it as an error status instead so
        // the caller's RPC completes cleanly.
        return new TStatus(TStatusCode.INTERNAL_ERROR);
    }
}
}

View File

@ -21,14 +21,20 @@ import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ColumnStatistic {
@ -69,12 +75,19 @@ public class ColumnStatistic {
UNSUPPORTED_TYPE.add(Type.LAMBDA_FUNCTION);
}
@SerializedName("count")
public final double count;
@SerializedName("ndv")
public final double ndv;
@SerializedName("numNulls")
public final double numNulls;
@SerializedName("dataSize")
public final double dataSize;
@SerializedName("avgSizeByte")
public final double avgSizeByte;
@SerializedName("minValue")
public final double minValue;
@SerializedName("maxValue")
public final double maxValue;
public final boolean isUnKnown;
/*
@ -102,9 +115,12 @@ public class ColumnStatistic {
public final LiteralExpr minExpr;
public final LiteralExpr maxExpr;
@SerializedName("histogram")
// assign value when do stats estimation.
public final Histogram histogram;
public final Map<Long, ColumnStatistic> partitionIdToColStats = new HashMap<>();
public ColumnStatistic(double count, double ndv, ColumnStatistic original, double avgSizeByte,
double numNulls, double dataSize, double minValue, double maxValue,
double selectivity, LiteralExpr minExpr, LiteralExpr maxExpr, boolean isUnKnown, Histogram histogram) {
@ -123,6 +139,27 @@ public class ColumnStatistic {
this.histogram = histogram;
}
/**
 * Build a ColumnStatistic from a batch of stats-table rows: the row whose
 * "part_id" column is NULL carries the table-level statistic; every other row
 * is a per-partition statistic keyed by its partition id.
 *
 * Returns ColumnStatistic.UNKNOWN on any deserialization failure, including
 * the case where the batch contains no table-level row.
 */
public static ColumnStatistic fromResultRow(List<ResultRow> resultRows) {
    Map<Long, ColumnStatistic> partitionIdToColStats = new HashMap<>();
    ColumnStatistic columnStatistic = null;
    try {
        for (ResultRow resultRow : resultRows) {
            String partId = resultRow.getColumnValue("part_id");
            if (partId == null) {
                // Table-level row (part_id IS NULL).
                columnStatistic = fromResultRow(resultRow);
            } else {
                partitionIdToColStats.put(Long.parseLong(partId), fromResultRow(resultRow));
            }
        }
    } catch (Throwable t) {
        LOG.warn("Failed to deserialize column stats", t);
        return ColumnStatistic.UNKNOWN;
    }
    if (columnStatistic == null) {
        // Previously Preconditions.checkState threw IllegalStateException here,
        // bypassing the catch above; degrade to UNKNOWN like every other
        // failure path of this method instead of crashing the caller.
        LOG.warn("No table-level (part_id IS NULL) stats row found, return UNKNOWN");
        return ColumnStatistic.UNKNOWN;
    }
    columnStatistic.partitionIdToColStats.putAll(partitionIdToColStats);
    return columnStatistic;
}
// TODO: use thrift
public static ColumnStatistic fromResultRow(ResultRow resultRow) {
try {
@ -138,7 +175,8 @@ public class ColumnStatistic {
columnStatisticBuilder.setNumNulls(Double.parseDouble(nullCount));
columnStatisticBuilder.setDataSize(Double
.parseDouble(resultRow.getColumnValueWithDefault("data_size_in_bytes", "0")));
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getCount() == 0
? 0 : columnStatisticBuilder.getDataSize()
/ columnStatisticBuilder.getCount());
long catalogId = Long.parseLong(resultRow.getColumnValue("catalog_id"));
long idxId = Long.parseLong(resultRow.getColumnValue("idx_id"));
@ -385,4 +423,13 @@ public class ColumnStatistic {
public boolean isUnKnown() {
return isUnKnown;
}
/**
 * Populate {@code partitionIdToColStats} with the per-partition statistics of
 * one column, loaded from the internal statistics table.
 *
 * @throws DdlException propagated from the statistics query
 */
public void loadPartitionStats(long tableId, long idxId, String colName) throws DdlException {
    for (ResultRow row : StatisticsRepository.loadPartStats(tableId, idxId, colName)) {
        // loadPartStats filters on "part_id IS NOT NULL", so part_id is expected
        // to be a parsable partition id here.
        long partitionId = Long.parseLong(row.getColumnValue("part_id"));
        partitionIdToColStats.put(partitionId, ColumnStatistic.fromResultRow(row));
    }
}
}

View File

@ -19,33 +19,24 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.FeConstants;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<ColumnStatistic>> {
private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class);
private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
+ "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";
@Override
protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
// Load from statistics table.
Optional<ColumnStatistic> columnStatistic = loadFromStatsTable(String.valueOf(key.tableId),
String.valueOf(key.idxId), key.colName);
Optional<ColumnStatistic> columnStatistic = loadFromStatsTable(key.tableId,
key.idxId, key.colName);
if (columnStatistic.isPresent()) {
return columnStatistic;
}
@ -61,26 +52,19 @@ public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<
return columnStatistic;
}
private Optional<ColumnStatistic> loadFromStatsTable(String tableId, String idxId, String colName) {
Map<String, String> params = new HashMap<>();
params.put("tblId", tableId);
params.put("idxId", idxId);
params.put("colId", colName);
List<ColumnStatistic> columnStatistics;
List<ResultRow> columnResult =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(QUERY_COLUMN_STATISTICS));
private Optional<ColumnStatistic> loadFromStatsTable(long tableId, long idxId, String colName) {
List<ResultRow> columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName);
ColumnStatistic columnStatistics;
try {
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult);
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResults);
} catch (Exception e) {
LOG.warn("Exception to deserialize column statistics", e);
return Optional.empty();
}
if (CollectionUtils.isEmpty(columnStatistics)) {
if (columnStatistics == null) {
return Optional.empty();
} else {
return Optional.of(columnStatistics.get(0));
return Optional.of(columnStatistics);
}
}
}

View File

@ -179,4 +179,9 @@ public class Histogram {
Bucket lastBucket = buckets.get(buckets.size() - 1);
return lastBucket.preSum + lastBucket.count;
}
// Render the histogram as its JSON serialization (delegates to
// serializeToJson) so logs and debug output show the full bucket contents.
@Override
public String toString() {
return serializeToJson(this);
}
}

View File

@ -97,7 +97,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
execSQL(sql);
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}
@VisibleForTesting

View File

@ -17,11 +17,19 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
@ -97,7 +105,7 @@ public class StatisticsCache {
}
public Optional<ColumnStatistic> getColumnStatistics(long catalogId, long dbId,
long tblId, long idxId, String colName) {
long tblId, long idxId, String colName) {
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().internalSession) {
return Optional.empty();
@ -203,42 +211,78 @@ public class StatisticsCache {
}
for (ResultRow r : recentStatsUpdatedCols) {
try {
String tblId = r.getColumnValue("tbl_id");
String idxId = r.getColumnValue("idx_id");
long tblId = Long.parseLong(r.getColumnValue("tbl_id"));
long idxId = Long.parseLong(r.getColumnValue("idx_id"));
String colId = r.getColumnValue("col_id");
final StatisticsCacheKey k =
new StatisticsCacheKey(Long.parseLong(tblId), Long.parseLong(idxId), colId);
new StatisticsCacheKey(tblId, idxId, colId);
final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>() {
@Override
public Optional<ColumnStatistic> get() throws InterruptedException, ExecutionException {
return Optional.of(c);
}
@Override
public boolean isDone() {
return true;
}
@Override
public boolean complete(Optional<ColumnStatistic> value) {
return true;
}
@Override
public Optional<ColumnStatistic> join() {
return Optional.of(c);
}
};
if (c.isUnKnown) {
continue;
}
columnStatisticsCache.put(k, f);
c.loadPartitionStats(tblId, idxId, colId);
putCache(k, c);
} catch (Throwable t) {
LOG.warn("Error when preheating stats cache", t);
}
}
}
/**
 * Synchronously load one column's statistics from the internal stats table
 * into this FE's cache, then push each loaded statistic to every follower FE
 * via the updateStatsCache RPC so followers do not have to lazily reload it.
 *
 * Called on the master after an analyze task finishes (see OlapAnalysisTask).
 */
public void syncLoadColStats(long tableId, long idxId, String colName) {
List<ResultRow> columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName);
for (ResultRow r : columnResults) {
// NOTE(review): the key is identical for every row of this loop, so if
// loadColStats returns more than one row (e.g. per-partition rows — confirm
// the query filters them), later rows overwrite earlier cache entries and
// each row triggers its own full follower broadcast.
final StatisticsCacheKey k =
new StatisticsCacheKey(tableId, idxId, colName);
final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
// Reference comparison is intentional: fromResultRow returns the shared
// UNKNOWN sentinel on deserialization failure.
if (c == ColumnStatistic.UNKNOWN) {
continue;
}
putCache(k, c);
// Both the key and the statistic travel as Gson JSON strings; the follower
// side (FrontendServiceImpl.updateStatsCache) deserializes them back.
TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest = new TUpdateFollowerStatsCacheRequest();
updateFollowerStatsCacheRequest.key = GsonUtils.GSON.toJson(k);
updateFollowerStatsCacheRequest.colStats = GsonUtils.GSON.toJson(c);
for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(),
frontend.getRpcPort());
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(address);
client.updateStatsCache(updateFollowerStatsCacheRequest);
} catch (Throwable t) {
// Best-effort: a follower that misses the push falls back to lazy loading.
LOG.warn("Failed to sync stats to follower: {}", address, t);
} finally {
if (client != null) {
// NOTE(review): the client is returned to the pool even after a failed
// call — consider invalidating a client whose connection threw; confirm
// ClientPool semantics.
ClientPool.frontendPool.returnObject(address, client);
}
}
}
}
}
/**
 * Install a ready column statistic into the cache, bypassing the async loader.
 * UNKNOWN statistics are dropped so a later read falls through to the loader.
 *
 * @param k cache key (table/index/column)
 * @param c the statistic to cache; ignored when {@code c.isUnKnown}
 */
public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
    // Check the guard before doing any work (the original built the future first).
    if (c.isUnKnown) {
        return;
    }
    // completedFuture gives the same get()/join()/isDone() behavior the previous
    // hand-rolled anonymous CompletableFuture subclass faked, but is genuinely
    // completed: the old subclass never completed its underlying future, so any
    // dependent stage (thenApply/whenComplete) registered on it would never run.
    columnStatisticsCache.put(k, CompletableFuture.completedFuture(Optional.of(c)));
}
}

View File

@ -17,6 +17,8 @@
package org.apache.doris.statistics;
import com.google.gson.annotations.SerializedName;
import java.util.Objects;
import java.util.StringJoiner;
@ -26,10 +28,15 @@ public class StatisticsCacheKey {
* May be index id either, since they are natively same in the code.
* catalogId and dbId are not included in the hashCode. Because tableId is globally unique.
*/
@SerializedName("catalogId")
public final long catalogId;
@SerializedName("dbId")
public final long dbId;
@SerializedName("tableId")
public final long tableId;
@SerializedName("idxId")
public final long idxId;
@SerializedName("colName")
public final String colName;
private static final String DELIMITER = "-";

View File

@ -120,6 +120,15 @@ public class StatisticsRepository {
+ " WHERE tbl_id = ${tblId}"
+ " AND part_id IS NOT NULL";
private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
+ "tbl_id=${tblId} AND idx_id=${idxId} AND col_id='${colId}'";
private static final String QUERY_PARTITION_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
+ " tbl_id=${tblId} AND idx_id=${idxId} AND col_id='${colId}' "
+ " AND part_id IS NOT NULL";
public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
ResultRow resultRow = queryColumnStatisticById(tableId, colName);
if (resultRow == null) {
@ -420,4 +429,24 @@ public class StatisticsRepository {
return idToPartitionTableStats;
}
/**
 * Fetch the raw statistics rows for a single column: every row matching
 * tbl_id/idx_id/col_id in the internal statistics table.
 */
public static List<ResultRow> loadColStats(long tableId, long idxId, String colName) {
    Map<String, String> substitutions = new HashMap<>();
    substitutions.put("tblId", Long.toString(tableId));
    substitutions.put("idxId", Long.toString(idxId));
    substitutions.put("colId", colName);
    String query = new StringSubstitutor(substitutions).replace(QUERY_COLUMN_STATISTICS);
    return StatisticsUtil.execStatisticQuery(query);
}
/**
 * Fetch the per-partition statistics rows for a single column (rows matching
 * tbl_id/idx_id/col_id whose part_id is NOT NULL).
 */
public static List<ResultRow> loadPartStats(long tableId, long idxId, String colName) {
    Map<String, String> substitutions = new HashMap<>();
    substitutions.put("tblId", Long.toString(tableId));
    substitutions.put("idxId", Long.toString(idxId));
    substitutions.put("colId", colName);
    String query = new StringSubstitutor(substitutions).replace(QUERY_PARTITION_STATISTICS);
    return StatisticsUtil.execStatisticQuery(query);
}
}

View File

@ -148,12 +148,12 @@ public class StatisticsUtil {
.collect(Collectors.toList());
}
public static List<ColumnStatistic> deserializeToColumnStatistics(List<ResultRow> resultBatches)
public static ColumnStatistic deserializeToColumnStatistics(List<ResultRow> resultBatches)
throws Exception {
if (CollectionUtils.isEmpty(resultBatches)) {
return Collections.emptyList();
return null;
}
return resultBatches.stream().map(ColumnStatistic::fromResultRow).collect(Collectors.toList());
return ColumnStatistic.fromResultRow(resultBatches);
}
public static List<Histogram> deserializeToHistogramStatistics(List<ResultRow> resultBatches)

View File

@ -101,6 +101,7 @@ public class CacheTest extends TestWithFeService {
colNames.add("col_id");
colNames.add("min");
colNames.add("max");
colNames.add("part_id");
List<PrimitiveType> primitiveTypes = new ArrayList<>();
primitiveTypes.add(PrimitiveType.BIGINT);
primitiveTypes.add(PrimitiveType.BIGINT);
@ -113,6 +114,7 @@ public class CacheTest extends TestWithFeService {
primitiveTypes.add(PrimitiveType.VARCHAR);
primitiveTypes.add(PrimitiveType.VARCHAR);
primitiveTypes.add(PrimitiveType.VARCHAR);
primitiveTypes.add(PrimitiveType.BIGINT);
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
@ -125,6 +127,7 @@ public class CacheTest extends TestWithFeService {
values.add("8");
values.add("9");
values.add("10");
values.add(null);
ResultRow resultRow = new ResultRow(colNames, primitiveTypes, values);
return Arrays.asList(resultRow);
}

View File

@ -1055,6 +1055,11 @@ struct TGetBinlogLagResult {
2: optional i64 lag
}
// Master -> follower push of one freshly analyzed column statistic.
// Both fields are Gson-serialized JSON strings; the follower deserializes
// them in FrontendServiceImpl.updateStatsCache.
struct TUpdateFollowerStatsCacheRequest {
// JSON-serialized StatisticsCacheKey (catalog/db/table/index/column)
1: optional string key;
// JSON-serialized ColumnStatistic
2: optional string colStats;
}
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@ -1119,4 +1124,6 @@ service FrontendService {
TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request)
TGetBinlogLagResult getBinlogLag(1: TGetBinlogLagRequest request)
Status.TStatus updateStatsCache(1: TUpdateFollowerStatsCacheRequest request)
}

View File

@ -19,6 +19,10 @@ suite("test_analyze") {
String db = "regression_test_statistics"
String tbl = "analyzetestlimited_duplicate_all"
sql """
DROP TABLE IF EXISTS `${tbl}`
"""
sql """
CREATE TABLE IF NOT EXISTS `${tbl}` (
`analyzetestlimitedk3` int(11) null comment "",