[refactor](stats) Remove useless async loader code. (#31380)
This commit is contained in:
@ -22,26 +22,17 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.checkerframework.checker.nullness.qual.NonNull;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class BasicAsyncCacheLoader<K, V> implements AsyncCacheLoader<K, V> {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(BasicAsyncCacheLoader.class);
|
||||
|
||||
private final Map<K, CompletableFutureWithCreateTime<V>> inProgressing = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public @NonNull CompletableFuture<V> asyncLoad(
|
||||
@NonNull K key,
|
||||
@NonNull Executor executor) {
|
||||
CompletableFutureWithCreateTime<V> cfWrapper = inProgressing.get(key);
|
||||
if (cfWrapper != null) {
|
||||
return cfWrapper.cf;
|
||||
}
|
||||
CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> {
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
@ -49,49 +40,10 @@ public abstract class BasicAsyncCacheLoader<K, V> implements AsyncCacheLoader<K,
|
||||
} finally {
|
||||
long endTime = System.currentTimeMillis();
|
||||
LOG.info("Load statistic cache [{}] cost time ms:{}", key, endTime - startTime);
|
||||
removeFromIProgressing(key);
|
||||
}
|
||||
}, executor);
|
||||
putIntoIProgressing(key,
|
||||
new CompletableFutureWithCreateTime<V>(System.currentTimeMillis(), future));
|
||||
return future;
|
||||
}
|
||||
|
||||
protected abstract V doLoad(K k);
|
||||
|
||||
private static class CompletableFutureWithCreateTime<V> extends CompletableFuture<V> {
|
||||
|
||||
public final long startTime;
|
||||
public final CompletableFuture<V> cf;
|
||||
private final long expiredTimeMilli = TimeUnit.MINUTES.toMillis(30);
|
||||
|
||||
public CompletableFutureWithCreateTime(long startTime, CompletableFuture<V> cf) {
|
||||
this.startTime = startTime;
|
||||
this.cf = cf;
|
||||
}
|
||||
|
||||
public boolean isExpired() {
|
||||
return System.currentTimeMillis() - startTime > expiredTimeMilli;
|
||||
}
|
||||
}
|
||||
|
||||
private void putIntoIProgressing(K k, CompletableFutureWithCreateTime<V> v) {
|
||||
synchronized (inProgressing) {
|
||||
inProgressing.put(k, v);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeFromIProgressing(K k) {
|
||||
synchronized (inProgressing) {
|
||||
inProgressing.remove(k);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeExpiredInProgressing() {
|
||||
// Quite simple logic that would complete very fast.
|
||||
// Lock on object to avoid ConcurrentModificationException.
|
||||
synchronized (inProgressing) {
|
||||
inProgressing.entrySet().removeIf(e -> e.getValue().isExpired());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,7 +18,6 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.qe.InternalQueryExecutionException;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
@ -27,19 +26,11 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
|
||||
|
||||
public class ColumnStatisticsCacheLoader extends BasicAsyncCacheLoader<StatisticsCacheKey, Optional<ColumnStatistic>> {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class);
|
||||
|
||||
private static final ThreadPoolExecutor singleThreadPool = ThreadPoolManager.newDaemonFixedThreadPool(
|
||||
StatisticConstants.RETRY_LOAD_THREAD_POOL_SIZE,
|
||||
StatisticConstants.RETRY_LOAD_QUEUE_SIZE, "STATS_RELOAD",
|
||||
true,
|
||||
new DiscardOldestPolicy());
|
||||
|
||||
@Override
|
||||
protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
|
||||
Optional<ColumnStatistic> columnStatistic = Optional.empty();
|
||||
|
||||
@ -44,7 +44,6 @@ import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class StatisticsCache {
|
||||
|
||||
@ -75,21 +74,6 @@ public class StatisticsCache {
|
||||
.executor(threadPool)
|
||||
.buildAsync(histogramCacheLoader);
|
||||
|
||||
{
|
||||
threadPool.submit(() -> {
|
||||
while (true) {
|
||||
try {
|
||||
columnStatisticsCacheLoader.removeExpiredInProgressing();
|
||||
histogramCacheLoader.removeExpiredInProgressing();
|
||||
} catch (Throwable t) {
|
||||
// IGNORE
|
||||
}
|
||||
Thread.sleep(TimeUnit.MINUTES.toMillis(15));
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) {
|
||||
ConnectContext ctx = ConnectContext.get();
|
||||
if (ctx != null && ctx.getSessionVariable().internalSession) {
|
||||
|
||||
@ -266,7 +266,7 @@ public class CacheTest extends TestWithFeService {
|
||||
Assertions.assertEquals(6, columnStatistic.minValue);
|
||||
Assertions.assertEquals(7, columnStatistic.maxValue);
|
||||
} else {
|
||||
System.out.println("Cached is not loaded, skip test.");
|
||||
System.out.println("Cache is not loaded, skip test.");
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
|
||||
Reference in New Issue
Block a user