From dc7b2015f535f08414ae82c8b2d1c8701371f6aa Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Mon, 27 Mar 2023 12:09:35 +0900 Subject: [PATCH] eh (#18122) --- .../doris/statistics/StatisticsCache.java | 32 +++++++++- .../statistics/StatisticsCacheLoader.java | 62 +++++++++++++++---- 2 files changed, 81 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index c6486459a6..9add3c1bc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -17,6 +17,9 @@ package org.apache.doris.statistics; +//import org.apache.doris.common.ThreadPoolManager; + +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.qe.ConnectContext; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; @@ -26,16 +29,43 @@ import org.apache.logging.log4j.Logger; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class StatisticsCache { private static final Logger LOG = LogManager.getLogger(StatisticsCache.class); + /** + * Use a standalone thread pool to avoid interference between this and any other jdk function + * that use the thread of ForkJoinPool#common in the system. + */ + private final ThreadPoolExecutor threadPool + = ThreadPoolManager.newDaemonFixedThreadPool( + 10, Integer.MAX_VALUE, "STATS_FETCH", true); + + private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader(); + private final AsyncLoadingCache cache = Caffeine.newBuilder() .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE) .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS)) .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL)) - .buildAsync(new StatisticsCacheLoader()); + .executor(threadPool) + .buildAsync(cacheLoader); + + { + threadPool.submit(() -> { + while (true) { + try { + cacheLoader.removeExpiredInProgressing(); + Thread.sleep(TimeUnit.MINUTES.toMillis(15)); + } catch (Throwable t) { + // IGNORE + } + } + + }); + } public ColumnStatistic getColumnStatistics(long tblId, String colName) { ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java index c592a9b4eb..3417356b78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java @@ -33,9 +33,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; public class StatisticsCacheLoader implements AsyncCacheLoader { @@ -51,18 +50,17 @@ public class StatisticsCacheLoader implements AsyncCacheLoader> - inProgressing = new ConcurrentHashMap<>(); + private final Map + inProgressing = new HashMap<>(); @Override public @NonNull CompletableFuture asyncLoad(@NonNull StatisticsCacheKey key, @NonNull Executor executor) { - - CompletableFuture future = inProgressing.get(key); - if (future != null) { - return future; + CompletableFutureWithCreateTime cfWrapper = inProgressing.get(key); + if (cfWrapper != null) { + return cfWrapper.cf; } - future = CompletableFuture.supplyAsync(() -> { + CompletableFuture future = CompletableFuture.supplyAsync(() -> { long startTime = System.currentTimeMillis(); try { LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName, @@ -107,10 +105,50 @@ public class StatisticsCacheLoader implements AsyncCacheLoader e.getValue().isExpired()); + } + } + + /** + * To make sure any item in the inProgressing would finally be removed to avoid potential mem leak. + */ + private static class CompletableFutureWithCreateTime extends CompletableFuture { + + private static final long EXPIRED_TIME_MILLI = TimeUnit.MINUTES.toMillis(30); + + public final long startTime; + public final CompletableFuture cf; + + public CompletableFutureWithCreateTime(long startTime, CompletableFuture cf) { + this.startTime = startTime; + this.cf = cf; + } + + public boolean isExpired() { + return System.currentTimeMillis() - startTime > EXPIRED_TIME_MILLI; + } + } }