diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 6d9a6a4615..41d4a5e56e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -64,6 +64,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.master.MasterImpl; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.planner.StreamLoadPlanner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; @@ -71,6 +72,8 @@ import org.apache.doris.qe.DdlExecutor; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.VariableMgr; +import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.StatisticsCacheKey; import org.apache.doris.statistics.query.QueryStats; import org.apache.doris.system.Backend; import org.apache.doris.system.Frontend; @@ -164,6 +167,7 @@ import org.apache.doris.thrift.TTableIndexQueryStats; import org.apache.doris.thrift.TTableQueryStats; import org.apache.doris.thrift.TTableStatus; import org.apache.doris.thrift.TUpdateExportTaskStatusRequest; +import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest; import org.apache.doris.thrift.TWaitingTxnStatusRequest; import org.apache.doris.thrift.TWaitingTxnStatusResult; import org.apache.doris.transaction.DatabaseTransactionMgr; @@ -2670,4 +2674,12 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + + @Override + public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException { + StatisticsCacheKey key = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); + ColumnStatistic columnStatistic = GsonUtils.GSON.fromJson(request.colStats, ColumnStatistic.class); + Env.getCurrentEnv().getStatisticsCache().putCache(key, columnStatistic); + return new TStatus(TStatusCode.OK); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index 47ab57437c..744c4af7a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -21,14 +21,20 @@ import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; public class ColumnStatistic { @@ -69,12 +75,19 @@ public class ColumnStatistic { UNSUPPORTED_TYPE.add(Type.LAMBDA_FUNCTION); } + @SerializedName("count") public final double count; + @SerializedName("ndv") public final double ndv; + @SerializedName("numNulls") public final double numNulls; + @SerializedName("dataSize") public final double dataSize; + @SerializedName("avgSizeByte") public final double avgSizeByte; + @SerializedName("minValue") public final double minValue; + @SerializedName("maxValue") public final double maxValue; public final boolean isUnKnown; /* @@ -102,9 +115,12 @@ public class ColumnStatistic { public final LiteralExpr minExpr; public final LiteralExpr maxExpr; + @SerializedName("histogram") // assign value when do stats estimation. public final Histogram histogram; + public final Map partitionIdToColStats = new HashMap<>(); + public ColumnStatistic(double count, double ndv, ColumnStatistic original, double avgSizeByte, double numNulls, double dataSize, double minValue, double maxValue, double selectivity, LiteralExpr minExpr, LiteralExpr maxExpr, boolean isUnKnown, Histogram histogram) { @@ -123,6 +139,27 @@ public class ColumnStatistic { this.histogram = histogram; } + public static ColumnStatistic fromResultRow(List resultRows) { + Map partitionIdToColStats = new HashMap<>(); + ColumnStatistic columnStatistic = null; + try { + for (ResultRow resultRow : resultRows) { + String partId = resultRow.getColumnValue("part_id"); + if (partId == null) { + columnStatistic = fromResultRow(resultRow); + } else { + partitionIdToColStats.put(Long.parseLong(partId), fromResultRow(resultRow)); + } + } + } catch (Throwable t) { + LOG.warn("Failed to deserialize column stats", t); + return ColumnStatistic.UNKNOWN; + } + Preconditions.checkState(columnStatistic != null, "Column stats is null"); + columnStatistic.partitionIdToColStats.putAll(partitionIdToColStats); + return columnStatistic; + } + // TODO: use thrift public static ColumnStatistic fromResultRow(ResultRow resultRow) { try { @@ -138,7 +175,8 @@ public class ColumnStatistic { columnStatisticBuilder.setNumNulls(Double.parseDouble(nullCount)); columnStatisticBuilder.setDataSize(Double .parseDouble(resultRow.getColumnValueWithDefault("data_size_in_bytes", "0"))); - columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize() + columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getCount() == 0 + ? 0 : columnStatisticBuilder.getDataSize() / columnStatisticBuilder.getCount()); long catalogId = Long.parseLong(resultRow.getColumnValue("catalog_id")); long idxId = Long.parseLong(resultRow.getColumnValue("idx_id")); @@ -385,4 +423,13 @@ public class ColumnStatistic { public boolean isUnKnown() { return isUnKnown; } + + public void loadPartitionStats(long tableId, long idxId, String colName) throws DdlException { + List resultRows = StatisticsRepository.loadPartStats(tableId, idxId, colName); + for (ResultRow resultRow : resultRows) { + String partId = resultRow.getColumnValue("part_id"); + ColumnStatistic columnStatistic = ColumnStatistic.fromResultRow(resultRow); + partitionIdToColStats.put(Long.parseLong(partId), columnStatistic); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java index ecd082aa11..d94a90b75f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java @@ -19,33 +19,24 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.FeConstants; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader> { private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class); - private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME - + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE " - + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')"; - @Override protected Optional doLoad(StatisticsCacheKey key) { // Load from statistics table. - Optional columnStatistic = loadFromStatsTable(String.valueOf(key.tableId), - String.valueOf(key.idxId), key.colName); + Optional columnStatistic = loadFromStatsTable(key.tableId, + key.idxId, key.colName); if (columnStatistic.isPresent()) { return columnStatistic; } @@ -61,26 +52,19 @@ public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader loadFromStatsTable(String tableId, String idxId, String colName) { - Map params = new HashMap<>(); - params.put("tblId", tableId); - params.put("idxId", idxId); - params.put("colId", colName); - - List columnStatistics; - List columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_COLUMN_STATISTICS)); + private Optional loadFromStatsTable(long tableId, long idxId, String colName) { + List columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName); + ColumnStatistic columnStatistics; try { - columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult); + columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResults); } catch (Exception e) { LOG.warn("Exception to deserialize column statistics", e); return Optional.empty(); } - if (CollectionUtils.isEmpty(columnStatistics)) { + if (columnStatistics == null) { return Optional.empty(); } else { - return Optional.of(columnStatistics.get(0)); + return Optional.of(columnStatistics); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java index b346e3f8fb..05e2c199ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Histogram.java @@ -179,4 +179,9 @@ public class Histogram { Bucket lastBucket = buckets.get(buckets.size() - 1); return lastBucket.preSum + lastBucket.count; } + + @Override + public String toString() { + return serializeToJson(this); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index e200c7befa..1f378d21c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -97,7 +97,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask { StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); execSQL(sql); - Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName()); + Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName()); } @VisibleForTesting diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index f46e19f529..284083ac01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -17,11 +17,19 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.Frontend; +import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -97,7 +105,7 @@ public class StatisticsCache { } public Optional getColumnStatistics(long catalogId, long dbId, - long tblId, long idxId, String colName) { + long tblId, long idxId, String colName) { ConnectContext ctx = ConnectContext.get(); if (ctx != null && ctx.getSessionVariable().internalSession) { return Optional.empty(); @@ -203,42 +211,78 @@ public class StatisticsCache { } for (ResultRow r : recentStatsUpdatedCols) { try { - String tblId = r.getColumnValue("tbl_id"); - String idxId = r.getColumnValue("idx_id"); + long tblId = Long.parseLong(r.getColumnValue("tbl_id")); + long idxId = Long.parseLong(r.getColumnValue("idx_id")); String colId = r.getColumnValue("col_id"); final StatisticsCacheKey k = - new StatisticsCacheKey(Long.parseLong(tblId), Long.parseLong(idxId), colId); + new StatisticsCacheKey(tblId, idxId, colId); final ColumnStatistic c = ColumnStatistic.fromResultRow(r); - CompletableFuture> f = new CompletableFuture>() { - - @Override - public Optional get() throws InterruptedException, ExecutionException { - return Optional.of(c); - } - - @Override - public boolean isDone() { - return true; - } - - @Override - public boolean complete(Optional value) { - return true; - } - - @Override - public Optional join() { - return Optional.of(c); - } - }; - if (c.isUnKnown) { - continue; - } - columnStatisticsCache.put(k, f); + c.loadPartitionStats(tblId, idxId, colId); + putCache(k, c); } catch (Throwable t) { LOG.warn("Error when preheating stats cache", t); } } } + public void syncLoadColStats(long tableId, long idxId, String colName) { + List columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName); + for (ResultRow r : columnResults) { + final StatisticsCacheKey k = + new StatisticsCacheKey(tableId, idxId, colName); + final ColumnStatistic c = ColumnStatistic.fromResultRow(r); + if (c == ColumnStatistic.UNKNOWN) { + continue; + } + putCache(k, c); + TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest = new TUpdateFollowerStatsCacheRequest(); + updateFollowerStatsCacheRequest.key = GsonUtils.GSON.toJson(k); + updateFollowerStatsCacheRequest.colStats = GsonUtils.GSON.toJson(c); + for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) { + TNetworkAddress address = new TNetworkAddress(frontend.getHost(), + frontend.getRpcPort()); + FrontendService.Client client = null; + try { + client = ClientPool.frontendPool.borrowObject(address); + client.updateStatsCache(updateFollowerStatsCacheRequest); + } catch (Throwable t) { + LOG.warn("Failed to sync stats to follower: {}", address, t); + } finally { + if (client != null) { + ClientPool.frontendPool.returnObject(address, client); + } + } + } + } + } + + public void putCache(StatisticsCacheKey k, ColumnStatistic c) { + CompletableFuture> f = new CompletableFuture>() { + + @Override + public Optional get() throws InterruptedException, ExecutionException { + return Optional.of(c); + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public boolean complete(Optional value) { + return true; + } + + @Override + public Optional join() { + return Optional.of(c); + } + }; + if (c.isUnKnown) { + return; + } + columnStatisticsCache.put(k, f); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheKey.java index 8851abc22f..e254adf4e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheKey.java @@ -17,6 +17,8 @@ package org.apache.doris.statistics; +import com.google.gson.annotations.SerializedName; + import java.util.Objects; import java.util.StringJoiner; @@ -26,10 +28,15 @@ public class StatisticsCacheKey { * May be index id either, since they are natively same in the code. * catalogId and dbId are not included in the hashCode. Because tableId is globally unique. */ + @SerializedName("catalogId") public final long catalogId; + @SerializedName("dbId") public final long dbId; + @SerializedName("tableId") public final long tableId; + @SerializedName("idxId") public final long idxId; + @SerializedName("colName") public final String colName; private static final String DELIMITER = "-"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index 53ea8e7fd2..d20bb358c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -120,6 +120,15 @@ public class StatisticsRepository { + " WHERE tbl_id = ${tblId}" + " AND part_id IS NOT NULL"; + private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME + + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE " + + "tbl_id=${tblId} AND idx_id=${idxId} AND col_id='${colId}'"; + + private static final String QUERY_PARTITION_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME + + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE " + + " tbl_id=${tblId} AND idx_id=${idxId} AND col_id='${colId}' " + + " AND part_id IS NOT NULL"; + public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) { ResultRow resultRow = queryColumnStatisticById(tableId, colName); if (resultRow == null) { @@ -420,4 +429,24 @@ public class StatisticsRepository { return idToPartitionTableStats; } + + public static List loadColStats(long tableId, long idxId, String colName) { + Map params = new HashMap<>(); + params.put("tblId", String.valueOf(tableId)); + params.put("idxId", String.valueOf(idxId)); + params.put("colId", colName); + + return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_COLUMN_STATISTICS)); + } + + public static List loadPartStats(long tableId, long idxId, String colName) { + Map params = new HashMap<>(); + params.put("tblId", String.valueOf(tableId)); + params.put("idxId", String.valueOf(idxId)); + params.put("colId", colName); + + return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_PARTITION_STATISTICS)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 6e773fc430..eef64ef1c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -148,12 +148,12 @@ public class StatisticsUtil { .collect(Collectors.toList()); } - public static List deserializeToColumnStatistics(List resultBatches) + public static ColumnStatistic deserializeToColumnStatistics(List resultBatches) throws Exception { if (CollectionUtils.isEmpty(resultBatches)) { - return Collections.emptyList(); + return null; } - return resultBatches.stream().map(ColumnStatistic::fromResultRow).collect(Collectors.toList()); + return ColumnStatistic.fromResultRow(resultBatches); } public static List deserializeToHistogramStatistics(List resultBatches) diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java index eacab62034..6ac8c7432a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java @@ -101,6 +101,7 @@ public class CacheTest extends TestWithFeService { colNames.add("col_id"); colNames.add("min"); colNames.add("max"); + colNames.add("part_id"); List primitiveTypes = new ArrayList<>(); primitiveTypes.add(PrimitiveType.BIGINT); primitiveTypes.add(PrimitiveType.BIGINT); @@ -113,6 +114,7 @@ public class CacheTest extends TestWithFeService { primitiveTypes.add(PrimitiveType.VARCHAR); primitiveTypes.add(PrimitiveType.VARCHAR); primitiveTypes.add(PrimitiveType.VARCHAR); + primitiveTypes.add(PrimitiveType.BIGINT); List values = new ArrayList<>(); values.add("1"); values.add("2"); @@ -125,6 +127,7 @@ public class CacheTest extends TestWithFeService { values.add("8"); values.add("9"); values.add("10"); + values.add(null); ResultRow resultRow = new ResultRow(colNames, primitiveTypes, values); return Arrays.asList(resultRow); } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 51dadbd8b8..0b74c10fba 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1055,6 +1055,11 @@ struct TGetBinlogLagResult { 2: optional i64 lag } +struct TUpdateFollowerStatsCacheRequest { + 1: optional string key; + 2: optional string colStats; +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1119,4 +1124,6 @@ service FrontendService { TGetMasterTokenResult getMasterToken(1: TGetMasterTokenRequest request) TGetBinlogLagResult getBinlogLag(1: TGetBinlogLagRequest request) + + Status.TStatus updateStatsCache(1: TUpdateFollowerStatsCacheRequest request) } diff --git a/regression-test/suites/statistics/analyze_stats.groovy b/regression-test/suites/statistics/analyze_stats.groovy index b486385acc..7233b13929 100644 --- a/regression-test/suites/statistics/analyze_stats.groovy +++ b/regression-test/suites/statistics/analyze_stats.groovy @@ -19,6 +19,10 @@ suite("test_analyze") { String db = "regression_test_statistics" String tbl = "analyzetestlimited_duplicate_all" + sql """ + DROP TABLE IF EXISTS `${tbl}` + """ + sql """ CREATE TABLE IF NOT EXISTS `${tbl}` ( `analyzetestlimitedk3` int(11) null comment "",