[nereids](statistics) remove lock in statistics cache loader #17833
Remove the redundant lock in the CacheLoader, since it uses the ForkJoinPool by default. Add an execution-time log for stats collection. Avoid submitting a duplicate task when there is already a task loading stats for the same column.
This commit is contained in:
@ -33,6 +33,8 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CompletionException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKey, ColumnLevelStatisticCache> {
|
||||
@ -47,70 +49,68 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
|
||||
+ "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE "
|
||||
+ "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";
|
||||
|
||||
private static int CUR_RUNNING_LOAD = 0;
|
||||
|
||||
private static final Object LOCK = new Object();
|
||||
|
||||
// TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists.
|
||||
|
||||
private final ConcurrentMap<StatisticsCacheKey, CompletableFuture<ColumnLevelStatisticCache>>
|
||||
inProgressing = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public @NonNull CompletableFuture<ColumnLevelStatisticCache> asyncLoad(@NonNull StatisticsCacheKey key,
|
||||
@NonNull Executor executor) {
|
||||
synchronized (LOCK) {
|
||||
if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) {
|
||||
try {
|
||||
LOCK.wait();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Ignore interruption", e);
|
||||
}
|
||||
}
|
||||
CUR_RUNNING_LOAD++;
|
||||
return CompletableFuture.supplyAsync(() -> {
|
||||
ColumnLevelStatisticCache statistic = new ColumnLevelStatisticCache();
|
||||
|
||||
try {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("tblId", String.valueOf(key.tableId));
|
||||
params.put("idxId", String.valueOf(key.idxId));
|
||||
params.put("colId", String.valueOf(key.colName));
|
||||
|
||||
List<ColumnStatistic> columnStatistics;
|
||||
List<ResultRow> columnResult =
|
||||
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
|
||||
.replace(QUERY_COLUMN_STATISTICS));
|
||||
try {
|
||||
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to deserialize column statistics", e);
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
if (CollectionUtils.isEmpty(columnStatistics)) {
|
||||
statistic.setColumnStatistic(ColumnStatistic.UNKNOWN);
|
||||
} else {
|
||||
statistic.setColumnStatistic(columnStatistics.get(0));
|
||||
}
|
||||
|
||||
List<Histogram> histogramStatistics;
|
||||
List<ResultRow> histogramResult =
|
||||
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
|
||||
.replace(QUERY_HISTOGRAM_STATISTICS));
|
||||
try {
|
||||
histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to deserialize histogram statistics", e);
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(histogramStatistics)) {
|
||||
statistic.setHistogram(histogramStatistics.get(0));
|
||||
}
|
||||
} finally {
|
||||
synchronized (LOCK) {
|
||||
CUR_RUNNING_LOAD--;
|
||||
LOCK.notify();
|
||||
}
|
||||
}
|
||||
|
||||
return statistic;
|
||||
});
|
||||
CompletableFuture<ColumnLevelStatisticCache> future = inProgressing.get(key);
|
||||
if (future != null) {
|
||||
return future;
|
||||
}
|
||||
future = CompletableFuture.supplyAsync(() -> {
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName,
|
||||
startTime);
|
||||
ColumnLevelStatisticCache statistic = new ColumnLevelStatisticCache();
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("tblId", String.valueOf(key.tableId));
|
||||
params.put("idxId", String.valueOf(key.idxId));
|
||||
params.put("colId", String.valueOf(key.colName));
|
||||
|
||||
List<ColumnStatistic> columnStatistics;
|
||||
List<ResultRow> columnResult =
|
||||
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
|
||||
.replace(QUERY_COLUMN_STATISTICS));
|
||||
try {
|
||||
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to deserialize column statistics", e);
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
if (CollectionUtils.isEmpty(columnStatistics)) {
|
||||
statistic.setColumnStatistic(ColumnStatistic.UNKNOWN);
|
||||
} else {
|
||||
statistic.setColumnStatistic(columnStatistics.get(0));
|
||||
}
|
||||
|
||||
List<Histogram> histogramStatistics;
|
||||
List<ResultRow> histogramResult =
|
||||
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
|
||||
.replace(QUERY_HISTOGRAM_STATISTICS));
|
||||
try {
|
||||
histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to deserialize histogram statistics", e);
|
||||
throw new CompletionException(e);
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(histogramStatistics)) {
|
||||
statistic.setHistogram(histogramStatistics.get(0));
|
||||
}
|
||||
return statistic;
|
||||
} finally {
|
||||
long endTime = System.currentTimeMillis();
|
||||
LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName,
|
||||
endTime, endTime - startTime);
|
||||
inProgressing.remove(key);
|
||||
}
|
||||
});
|
||||
inProgressing.put(key, future);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user