From 74dfdc00dcc1ee7066dcdea22bc482116ca57404 Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Sun, 19 Mar 2023 21:30:21 +0900 Subject: [PATCH] [nerids](statistics) remove lock in statistics cache loader #17833 remove the redandunt lock in the CacheLoader, since it use the forkjoinpool in default Add execute time log for collect stats Avoid submit duplicate task, when there already has a task to load for the same column --- .../statistics/StatisticsCacheLoader.java | 118 +++++++++--------- 1 file changed, 59 insertions(+), 59 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java index 73d38b7db8..c592a9b4eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java @@ -33,6 +33,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; public class StatisticsCacheLoader implements AsyncCacheLoader { @@ -47,70 +49,68 @@ public class StatisticsCacheLoader implements AsyncCacheLoader> + inProgressing = new ConcurrentHashMap<>(); + @Override public @NonNull CompletableFuture asyncLoad(@NonNull StatisticsCacheKey key, @NonNull Executor executor) { - synchronized (LOCK) { - if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) { - try { - LOCK.wait(); - } catch (InterruptedException e) { - LOG.warn("Ignore interruption", e); - } - } - CUR_RUNNING_LOAD++; - return CompletableFuture.supplyAsync(() -> { - ColumnLevelStatisticCache statistic = new ColumnLevelStatisticCache(); - try { - Map params = new HashMap<>(); - params.put("tblId", String.valueOf(key.tableId)); - params.put("idxId", String.valueOf(key.idxId)); - params.put("colId", String.valueOf(key.colName)); - - List columnStatistics; - List columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_COLUMN_STATISTICS)); - try { - columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult); - } catch (Exception e) { - LOG.warn("Failed to deserialize column statistics", e); - throw new CompletionException(e); - } - if (CollectionUtils.isEmpty(columnStatistics)) { - statistic.setColumnStatistic(ColumnStatistic.UNKNOWN); - } else { - statistic.setColumnStatistic(columnStatistics.get(0)); - } - - List histogramStatistics; - List histogramResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_HISTOGRAM_STATISTICS)); - try { - histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult); - } catch (Exception e) { - LOG.warn("Failed to deserialize histogram statistics", e); - throw new CompletionException(e); - } - if (!CollectionUtils.isEmpty(histogramStatistics)) { - statistic.setHistogram(histogramStatistics.get(0)); - } - } finally { - synchronized (LOCK) { - CUR_RUNNING_LOAD--; - LOCK.notify(); - } - } - - return statistic; - }); + CompletableFuture future = inProgressing.get(key); + if (future != null) { + return future; } + future = CompletableFuture.supplyAsync(() -> { + long startTime = System.currentTimeMillis(); + try { + LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName, + startTime); + ColumnLevelStatisticCache statistic = new ColumnLevelStatisticCache(); + Map params = new HashMap<>(); + params.put("tblId", String.valueOf(key.tableId)); + params.put("idxId", String.valueOf(key.idxId)); + params.put("colId", String.valueOf(key.colName)); + + List columnStatistics; + List columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_COLUMN_STATISTICS)); + try { + columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult); + } catch (Exception e) { + LOG.warn("Failed to deserialize column statistics", e); + throw new CompletionException(e); + } + if (CollectionUtils.isEmpty(columnStatistics)) { + statistic.setColumnStatistic(ColumnStatistic.UNKNOWN); + } else { + statistic.setColumnStatistic(columnStatistics.get(0)); + } + + List histogramStatistics; + List histogramResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_HISTOGRAM_STATISTICS)); + try { + histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult); + } catch (Exception e) { + LOG.warn("Failed to deserialize histogram statistics", e); + throw new CompletionException(e); + } + if (!CollectionUtils.isEmpty(histogramStatistics)) { + statistic.setHistogram(histogramStatistics.get(0)); + } + return statistic; + } finally { + long endTime = System.currentTimeMillis(); + LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName, + endTime, endTime - startTime); + inProgressing.remove(key); + } + }); + inProgressing.put(key, future); + return future; } }