diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java index e7e488a6e7..ac5896bb06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BasicAsyncCacheLoader.java @@ -22,26 +22,17 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.checkerframework.checker.nullness.qual.NonNull; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; public abstract class BasicAsyncCacheLoader implements AsyncCacheLoader { private static final Logger LOG = LogManager.getLogger(BasicAsyncCacheLoader.class); - private final Map> inProgressing = new HashMap<>(); - @Override public @NonNull CompletableFuture asyncLoad( @NonNull K key, @NonNull Executor executor) { - CompletableFutureWithCreateTime cfWrapper = inProgressing.get(key); - if (cfWrapper != null) { - return cfWrapper.cf; - } CompletableFuture future = CompletableFuture.supplyAsync(() -> { long startTime = System.currentTimeMillis(); try { @@ -49,49 +40,10 @@ public abstract class BasicAsyncCacheLoader implements AsyncCacheLoader(System.currentTimeMillis(), future)); return future; } protected abstract V doLoad(K k); - - private static class CompletableFutureWithCreateTime extends CompletableFuture { - - public final long startTime; - public final CompletableFuture cf; - private final long expiredTimeMilli = TimeUnit.MINUTES.toMillis(30); - - public CompletableFutureWithCreateTime(long startTime, CompletableFuture cf) { - this.startTime = startTime; - this.cf = cf; - } - - public boolean isExpired() { - return System.currentTimeMillis() - startTime > expiredTimeMilli; - } - } - - private void putIntoIProgressing(K k, CompletableFutureWithCreateTime v) { - synchronized (inProgressing) { - inProgressing.put(k, v); - } - } - - private void removeFromIProgressing(K k) { - synchronized (inProgressing) { - inProgressing.remove(k); - } - } - - public void removeExpiredInProgressing() { - // Quite simple logic that would complete very fast. - // Lock on object to avoid ConcurrentModificationException. - synchronized (inProgressing) { - inProgressing.entrySet().removeIf(e -> e.getValue().isExpired()); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java index e33cff3107..a281f9b0ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java @@ -18,7 +18,6 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.qe.InternalQueryExecutionException; import org.apache.doris.statistics.util.StatisticsUtil; @@ -27,19 +26,11 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Optional; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; public class ColumnStatisticsCacheLoader extends BasicAsyncCacheLoader> { private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class); - private static final ThreadPoolExecutor singleThreadPool = ThreadPoolManager.newDaemonFixedThreadPool( - StatisticConstants.RETRY_LOAD_THREAD_POOL_SIZE, - StatisticConstants.RETRY_LOAD_QUEUE_SIZE, "STATS_RELOAD", - true, - new DiscardOldestPolicy()); - @Override protected Optional doLoad(StatisticsCacheKey key) { Optional columnStatistic = Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 20f25eb3e9..62e11f5c9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -44,7 +44,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; public class StatisticsCache { @@ -75,21 +74,6 @@ public class StatisticsCache { .executor(threadPool) .buildAsync(histogramCacheLoader); - { - threadPool.submit(() -> { - while (true) { - try { - columnStatisticsCacheLoader.removeExpiredInProgressing(); - histogramCacheLoader.removeExpiredInProgressing(); - } catch (Throwable t) { - // IGNORE - } - Thread.sleep(TimeUnit.MINUTES.toMillis(15)); - } - - }); - } - public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) { ConnectContext ctx = ConnectContext.get(); if (ctx != null && ctx.getSessionVariable().internalSession) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java index 00d233ff54..f23b93624b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java @@ -266,7 +266,7 @@ public class CacheTest extends TestWithFeService { Assertions.assertEquals(6, columnStatistic.minValue); Assertions.assertEquals(7, columnStatistic.maxValue); } else { - System.out.println("Cached is not loaded, skip test."); + System.out.println("Cache is not loaded, skip test."); } } catch (Throwable t) { t.printStackTrace();