This commit is contained in:
AKIRA
2023-03-27 12:09:35 +09:00
committed by GitHub
parent 2929a96224
commit dc7b2015f5
2 changed files with 81 additions and 13 deletions

View File

@ -17,6 +17,9 @@
package org.apache.doris.statistics;
//import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.qe.ConnectContext;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
@ -26,16 +29,43 @@ import org.apache.logging.log4j.Logger;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class StatisticsCache {
private static final Logger LOG = LogManager.getLogger(StatisticsCache.class);
/**
* Use a standalone thread pool to avoid interference between this and any other jdk function
* that use the thread of ForkJoinPool#common in the system.
*/
private final ThreadPoolExecutor threadPool
= ThreadPoolManager.newDaemonFixedThreadPool(
10, Integer.MAX_VALUE, "STATS_FETCH", true);
private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader();
private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder()
.maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
.expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
.refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
.buildAsync(new StatisticsCacheLoader());
.executor(threadPool)
.buildAsync(cacheLoader);
{
threadPool.submit(() -> {
while (true) {
try {
cacheLoader.removeExpiredInProgressing();
Thread.sleep(TimeUnit.MINUTES.toMillis(15));
} catch (Throwable t) {
// IGNORE
}
}
});
}
public ColumnStatistic getColumnStatistics(long tblId, String colName) {
ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName);

View File

@ -33,9 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKey, ColumnLevelStatisticCache> {
@ -51,18 +50,17 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
// TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists.
private final ConcurrentMap<StatisticsCacheKey, CompletableFuture<ColumnLevelStatisticCache>>
inProgressing = new ConcurrentHashMap<>();
private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime>
inProgressing = new HashMap<>();
@Override
public @NonNull CompletableFuture<ColumnLevelStatisticCache> asyncLoad(@NonNull StatisticsCacheKey key,
@NonNull Executor executor) {
CompletableFuture<ColumnLevelStatisticCache> future = inProgressing.get(key);
if (future != null) {
return future;
CompletableFutureWithCreateTime cfWrapper = inProgressing.get(key);
if (cfWrapper != null) {
return cfWrapper.cf;
}
future = CompletableFuture.supplyAsync(() -> {
CompletableFuture<ColumnLevelStatisticCache> future = CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName,
@ -107,10 +105,50 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
long endTime = System.currentTimeMillis();
LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName,
endTime, endTime - startTime);
inProgressing.remove(key);
removeFromIProgressing(key);
}
});
inProgressing.put(key, future);
}, executor);
putIntoIProgressing(key, new CompletableFutureWithCreateTime(System.currentTimeMillis(), future));
return future;
}
private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime v) {
synchronized (inProgressing) {
inProgressing.put(k, v);
}
}
private void removeFromIProgressing(StatisticsCacheKey k) {
synchronized (inProgressing) {
inProgressing.remove(k);
}
}
public void removeExpiredInProgressing() {
// Quite simple logic that would complete very fast.
// Lock on object to avoid ConcurrentModificationException.
synchronized (inProgressing) {
inProgressing.entrySet().removeIf(e -> e.getValue().isExpired());
}
}
/**
* To make sure any item in the inProgressing would finally be removed to avoid potential mem leak.
*/
private static class CompletableFutureWithCreateTime extends CompletableFuture<ColumnLevelStatisticCache> {
private static final long EXPIRED_TIME_MILLI = TimeUnit.MINUTES.toMillis(30);
public final long startTime;
public final CompletableFuture<ColumnLevelStatisticCache> cf;
public CompletableFutureWithCreateTime(long startTime, CompletableFuture<ColumnLevelStatisticCache> cf) {
this.startTime = startTime;
this.cf = cf;
}
public boolean isExpired() {
return System.currentTimeMillis() - startTime > EXPIRED_TIME_MILLI;
}
}
}