[enhancement](stats) Stats preheating as FE booted (#18502)
1. Support prefetching some column stats when FE boots: load the column stats that were updated most recently, per the comment on PR #18460 from @morrySnow.
2. Refactor the stats cache, splitting the histogram cache from the column stats cache, so we avoid redundant queries against the column statistics table. For example, when only the histogram or only the column stats need updating, the previous implementation's unified cache loader would query both the column stats table and the histogram table.
3. Extract some common logic to StatsUtil.
4. Remove some useless code in unit tests; that code is hard to maintain, and it is not a good way to test the accuracy of stats estimation, per the advice from @englefly.
5. Add a field type restriction when creating analysis tasks to avoid unnecessary failures.
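For readers skimming the diff: the mechanism behind point 1 is warming a Caffeine AsyncLoadingCache with pre-completed futures, so the first plans after boot or a master failover hit the cache instead of each issuing a synchronous query against the statistics tables. A minimal self-contained sketch of that pattern follows; all names in it are illustrative, not actual Doris code.

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class PreheatSketch {
    // On a miss the loader would normally run a (slow) internal query against
    // the statistics table; preheating exists to avoid that right after boot.
    private final AsyncLoadingCache<String, Double> statsCache = Caffeine.newBuilder()
            .maximumSize(100_000)
            .expireAfterAccess(Duration.ofHours(24))
            .buildAsync(PreheatSketch::slowLoadFromStatsTable);

    // Called once when the FE becomes master: rows fetched by a single
    // "recently updated" query are installed as already-completed futures.
    public void preHeat(Map<String, Double> recentlyUpdatedStats) {
        recentlyUpdatedStats.forEach((key, value) ->
                statsCache.put(key, CompletableFuture.completedFuture(value)));
    }

    private static Double slowLoadFromStatsTable(String key) {
        return 1.0; // placeholder for the real statistics query
    }
}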
@@ -1345,6 +1345,9 @@ public class Env {
         LOG.info(msg);
         // for master, there are some new thread pools need to register metric
         ThreadPoolManager.registerAllThreadPoolMetric();
+        if (analysisManager != null) {
+            analysisManager.getStatisticsCache().preHeat();
+        }
     }

     /*
@@ -90,9 +90,9 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
 import org.apache.doris.nereids.types.DataType;
-import org.apache.doris.statistics.ColumnLevelStatisticCache;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.ColumnStatisticBuilder;
+import org.apache.doris.statistics.Histogram;
 import org.apache.doris.statistics.StatisticRange;
 import org.apache.doris.statistics.Statistics;
 import org.apache.doris.statistics.StatisticsBuilder;
@@ -177,7 +177,6 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {

     @Override
     public Statistics visitLogicalOlapScan(LogicalOlapScan olapScan, Void context) {
-        olapScan.getExpressions();
         return computeScan(olapScan);
     }

@@ -431,17 +430,22 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
         for (SlotReference slotReference : slotSet) {
             String colName = slotReference.getName();
             if (colName == null) {
-                throw new RuntimeException(String.format("Column %s not found", colName));
+                throw new RuntimeException(String.format("Invalid slot: %s", slotReference.getExprId()));
             }
-            ColumnLevelStatisticCache cache =
-                    Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(table.getId(), -1, colName);
-            if (cache == null || cache.columnStatistic == null) {
-                columnStatisticMap.put(slotReference, ColumnStatistic.UNKNOWN);
+            ColumnStatistic cache =
+                    Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(table.getId(), colName);
+            if (cache == ColumnStatistic.UNKNOWN) {
+                columnStatisticMap.put(slotReference, cache);
                 continue;
             }
-            ColumnStatisticBuilder columnStatisticBuilder =
-                    new ColumnStatisticBuilder(cache.columnStatistic).setHistogram(cache.getHistogram());
-            columnStatisticMap.put(slotReference, columnStatisticBuilder.build());
+            Histogram histogram = Env.getCurrentEnv().getStatisticsCache().getHistogram(table.getId(), colName);
+            if (histogram != null) {
+                ColumnStatisticBuilder columnStatisticBuilder =
+                        new ColumnStatisticBuilder(cache).setHistogram(histogram);
+                columnStatisticMap.put(slotReference, columnStatisticBuilder.build());
+                cache = columnStatisticBuilder.build();
+            }
+            columnStatisticMap.put(slotReference, cache);
         }
         return new Statistics(rowCount, columnStatisticMap);
     }

@@ -39,8 +39,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.StringSubstitutor;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 import java.text.SimpleDateFormat;
 import java.util.Date;

@@ -17,15 +17,12 @@

 package org.apache.doris.statistics;

-import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
+import org.apache.doris.statistics.util.StatisticsUtil;

-import com.google.common.base.Preconditions;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 import java.util.Comparator;
 import java.util.HashSet;
@@ -47,27 +44,28 @@ public class AnalysisTaskScheduler {

     private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();

-    public synchronized void schedule(AnalysisTaskInfo analysisJobInfo) {
-        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName);
-        Preconditions.checkArgument(catalog != null);
-        DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName);
-        Preconditions.checkArgument(db != null);
-        TableIf table = db.getTableNullable(analysisJobInfo.tblName);
-        Preconditions.checkArgument(table != null);
-        BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisJobInfo);
-        addToManualJobQueue(analysisTask);
-        if (analysisJobInfo.jobType.equals(JobType.MANUAL)) {
-            return;
+    public synchronized void schedule(AnalysisTaskInfo analysisTaskInfo) {
+        try {
+            TableIf table = StatisticsUtil.findTable(analysisTaskInfo.catalogName,
+                    analysisTaskInfo.dbName, analysisTaskInfo.tblName);
+            BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisTaskInfo);
+            switch (analysisTaskInfo.jobType) {
+                case MANUAL:
+                    addToManualJobQueue(analysisTask);
+                    break;
+                case SYSTEM:
+                    addToSystemQueue(analysisTask);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown job type: " + analysisTaskInfo.jobType);
+            }
+        } catch (Throwable t) {
+            Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
+                    analysisTaskInfo, AnalysisState.FAILED, t.getMessage(), System.currentTimeMillis());
         }
-        addToSystemQueue(analysisTask);
     }

     private void removeFromSystemQueue(BaseAnalysisTask analysisJobInfo) {
         if (manualJobSet.contains(analysisJobInfo)) {
             systemJobQueue.remove(analysisJobInfo);
             manualJobSet.remove(analysisJobInfo);
         }
     }
+    // Make sure invoker of this method is synchronized on object.

     private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) {
         if (systemJobSet.contains(analysisJobInfo)) {
@@ -78,6 +76,7 @@ public class AnalysisTaskScheduler {
         notify();
     }

+    // Make sure invoker of this method is synchronized on object.
     private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) {
         if (manualJobSet.contains(analysisJobInfo)) {
             return;
@@ -90,10 +89,10 @@ public class AnalysisTaskScheduler {
     public synchronized BaseAnalysisTask getPendingTasks() {
         while (true) {
             if (!manualJobQueue.isEmpty()) {
-                return manualJobQueue.poll();
+                return pollAndRemove(manualJobQueue, manualJobSet);
             }
             if (!systemJobQueue.isEmpty()) {
-                return systemJobQueue.poll();
+                return pollAndRemove(systemJobQueue, systemJobSet);
             }
             try {
                 wait();
@@ -103,4 +102,11 @@ public class AnalysisTaskScheduler {
             }
         }
     }
+
+    // Poll from queue, remove from set. Make sure invoker of this method is synchronized on object.
+    private BaseAnalysisTask pollAndRemove(Queue<BaseAnalysisTask> q, Set<BaseAnalysisTask> s) {
+        BaseAnalysisTask t = q.poll();
+        s.remove(t);
+        return t;
+    }
 }

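The manualJobQueue/manualJobSet and systemJobQueue/systemJobSet pairs above, together with the new pollAndRemove helper, implement a classic de-duplicating work queue: the set rejects re-submission of a task that is already pending, and pollAndRemove keeps the two views consistent on the consumer side. A generic sketch of the idiom, with illustrative names that are not part of the Doris code:

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// De-duplicating FIFO: offer() ignores elements already pending; take()
// releases the element so it may be enqueued again later. Methods are
// synchronized because producer and consumer run on different threads.
public class DedupQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Set<T> pending = new HashSet<>();

    public synchronized boolean offer(T t) {
        if (!pending.add(t)) {
            return false; // already queued, skip duplicate
        }
        queue.add(t);
        notify();
        return true;
    }

    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        T t = queue.poll();
        pending.remove(t);
        return t;
    }
}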
@@ -20,6 +20,7 @@ package org.apache.doris.statistics;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.qe.StmtExecutor;
@@ -29,10 +30,18 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import java.util.HashSet;
+import java.util.Set;
+
 public abstract class BaseAnalysisTask {

     public static final Logger LOG = LogManager.getLogger(BaseAnalysisTask.class);

+    /**
+     * Stats stored in the column_statistics table basically has two types, `part_id` is null which means it is
+     * aggregate from partition level stats, `part_id` is not null which means it is partition level stats.
+     * For latter, it's id field contains part id, for previous doesn't.
+     */
     protected static final String INSERT_PART_STATISTICS = "INSERT INTO "
             + "${internalDB}.${columnStatTbl}"
             + " SELECT "
@@ -93,6 +102,8 @@ public abstract class BaseAnalysisTask {

     protected AnalysisState analysisState;

+    protected Set<PrimitiveType> unsupportedType = new HashSet<>();
+
     @VisibleForTesting
     public BaseAnalysisTask() {
@@ -104,7 +115,17 @@ public abstract class BaseAnalysisTask {
         init(info);
     }

+    protected void initUnsupportedType() {
+        unsupportedType.add(PrimitiveType.HLL);
+        unsupportedType.add(PrimitiveType.BITMAP);
+        unsupportedType.add(PrimitiveType.ARRAY);
+        unsupportedType.add(PrimitiveType.MAP);
+        unsupportedType.add(PrimitiveType.JSONB);
+        unsupportedType.add(PrimitiveType.STRUCT);
+    }
+
     private void init(AnalysisTaskInfo info) {
+        initUnsupportedType();
         catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
         if (catalog == null) {
             Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
@@ -127,9 +148,11 @@ public abstract class BaseAnalysisTask {
                 || info.analysisType.equals(AnalysisType.HISTOGRAM))) {
             col = tbl.getColumn(info.colName);
             if (col == null) {
                 Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
                         info, AnalysisState.FAILED, String.format("Column with name %s not exists", info.tblName),
                         System.currentTimeMillis());
                 throw new RuntimeException(String.format("Column with name %s not exists", info.tblName));
             }
+            if (isUnsupportedType(col.getType().getPrimitiveType())) {
+                throw new RuntimeException(String.format("Column with type %s is not supported",
+                        col.getType().toString()));
+            }
         }

@@ -165,4 +188,8 @@ public abstract class BaseAnalysisTask {
         return "COUNT(1) * " + column.getType().getSlotSize();
     }

+    private boolean isUnsupportedType(PrimitiveType type) {
+        return unsupportedType.contains(type);
+    }
+
 }

@@ -1,52 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// ... (standard Apache License 2.0 header elided) ...
-
-package org.apache.doris.statistics;
-
-public class ColumnLevelStatisticCache {
-
-    public Histogram histogram;
-
-    public ColumnStatistic columnStatistic;
-
-    public ColumnLevelStatisticCache() {
-    }
-
-    public ColumnLevelStatisticCache(Histogram histogram, ColumnStatistic columnStatistic) {
-        this.histogram = histogram;
-        this.columnStatistic = columnStatistic;
-    }
-
-    public Histogram getHistogram() {
-        return histogram;
-    }
-
-    public void setHistogram(Histogram histogram) {
-        this.histogram = histogram;
-    }
-
-    public ColumnStatistic getColumnStatistic() {
-        if (columnStatistic != null) {
-            return columnStatistic;
-        }
-        return ColumnStatistic.UNKNOWN;
-    }
-
-    public void setColumnStatistic(ColumnStatistic columnStatistic) {
-        this.columnStatistic = columnStatistic;
-    }
-}
@@ -94,6 +94,7 @@ public class ColumnStatistic {
     public final LiteralExpr minExpr;
     public final LiteralExpr maxExpr;

+    // assign value when do stats estimation.
     public final Histogram histogram;

     public ColumnStatistic(double count, double ndv, double originalNdv, double avgSizeByte,
@@ -154,7 +155,8 @@ public class ColumnStatistic {
         }
         columnStatisticBuilder.setSelectivity(1.0);
         columnStatisticBuilder.setOriginalNdv(ndv);
-        Histogram histogram = Env.getCurrentEnv().getStatisticsCache().getHistogram(tblId, idxId, colName);
+        Histogram histogram = Env.getCurrentEnv().getStatisticsCache().getHistogram(tblId, idxId, colName)
+                .orElse(null);
         columnStatisticBuilder.setHistogram(histogram);
         return columnStatisticBuilder.build();
     } catch (Exception e) {

@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// ... (standard Apache License 2.0 header elided) ...
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+
+public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<ColumnStatistic>> {
+
+    private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class);
+
+    private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
+            + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
+            + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";
+
+    @Override
+    protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
+        Map<String, String> params = new HashMap<>();
+        params.put("tblId", String.valueOf(key.tableId));
+        params.put("idxId", String.valueOf(key.idxId));
+        params.put("colId", String.valueOf(key.colName));
+
+        List<ColumnStatistic> columnStatistics;
+        List<ResultRow> columnResult =
+                StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
+                        .replace(QUERY_COLUMN_STATISTICS));
+        try {
+            columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult);
+        } catch (Exception e) {
+            LOG.warn("Failed to deserialize column statistics", e);
+            throw new CompletionException(e);
+        }
+        if (CollectionUtils.isEmpty(columnStatistics)) {
+            return Optional.empty();
+        } else {
+            return Optional.of(columnStatistics.get(0));
+        }
+    }
+}
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// ... (standard Apache License 2.0 header elided) ...
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletionException;
+
+public class HistogramCacheLoader extends StatisticsCacheLoader<Optional<Histogram>> {
+
+    private static final Logger LOG = LogManager.getLogger(HistogramCacheLoader.class);
+
+    private static final String QUERY_HISTOGRAM_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
+            + "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE "
+            + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";
+
+    @Override
+    protected Optional<Histogram> doLoad(StatisticsCacheKey key) {
+        List<Histogram> histogramStatistics;
+        Map<String, String> params = new HashMap<>();
+        params.put("tblId", String.valueOf(key.tableId));
+        params.put("idxId", String.valueOf(key.idxId));
+        params.put("colId", String.valueOf(key.colName));
+
+        List<ResultRow> histogramResult =
+                StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
+                        .replace(QUERY_HISTOGRAM_STATISTICS));
+        try {
+            histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult);
+        } catch (Exception e) {
+            LOG.warn("Failed to deserialize histogram statistics", e);
+            throw new CompletionException(e);
+        }
+        if (!CollectionUtils.isEmpty(histogramStatistics)) {
+            return Optional.of(histogramStatistics.get(0));
+        }
+        return Optional.empty();
+    }
+}
@@ -109,6 +109,6 @@ public class HistogramTask extends BaseAnalysisTask {
             this.stmtExecutor.execute();
         }

-        Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), -1, col.getName());
+        Env.getCurrentEnv().getStatisticsCache().refreshHistogramSync(tbl.getId(), -1, col.getName());
     }
 }

@@ -142,7 +142,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
                 this.stmtExecutor.execute();
             }
         }
-        Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), -1, col.getName());
+        Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
     }

     private void getStatData(ColumnStatisticsData data, Map<String, String> params) {

@@ -124,7 +124,7 @@ public class MVAnalysisTask extends BaseAnalysisTask {
             params.put("type", column.getType().toString());
             StatisticsUtil.execUpdate(ANALYZE_MV_COL, params);
             Env.getCurrentEnv().getStatisticsCache()
-                    .refreshSync(meta.getIndexId(), meta.getIndexId(), column.getName());
+                    .refreshColStatsSync(meta.getIndexId(), meta.getIndexId(), column.getName());
         }
     }

@@ -94,7 +94,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
         execSQL(sql);
-        Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), -1, col.getName());
+        Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
     }

     @VisibleForTesting

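The four call-site changes above are where the cache split from point 2 of the commit message pays off: refreshing column stats no longer re-queries the histogram table, and vice versa. A hypothetical caller, just to illustrate the new call pattern (the table id and column name here are made up):

// After an ANALYZE that only rebuilt the histogram, only the histogram table
// is queried again; the column-stats cache entry stays warm. The reverse holds
// for a column-stats-only refresh.
StatisticsCache cache = Env.getCurrentEnv().getStatisticsCache();
cache.refreshHistogramSync(10001L, -1, "order_date"); // runs HistogramCacheLoader only
cache.refreshColStatsSync(10001L, -1, "order_date");  // runs ColumnStatisticsCacheLoader only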
@@ -59,9 +59,8 @@ public class StatisticConstants {
      */
     public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = TimeUnit.MINUTES.toMillis(10);

+    public static final long PRELOAD_RETRY_TIMES = 5;

-    public static final int LOAD_TASK_LIMITS = 10;

-    public static final double DEFAULT_INNER_JOIN_FACTOR = 0.1;
+    public static final long PRELOAD_RETRY_INTERVAL_IN_SECONDS = TimeUnit.SECONDS.toMillis(10);

 }

@@ -17,18 +17,21 @@

 package org.apache.doris.statistics;

-//import org.apache.doris.common.ThreadPoolManager;
-
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;

 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -44,85 +47,159 @@ public class StatisticsCache {
             = ThreadPoolManager.newDaemonFixedThreadPool(
                     10, Integer.MAX_VALUE, "STATS_FETCH", true);

-    private final StatisticsCacheLoader cacheLoader = new StatisticsCacheLoader();
+    private final ColumnStatisticsCacheLoader columnStatisticsCacheLoader = new ColumnStatisticsCacheLoader();
+    private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader();

-    private final AsyncLoadingCache<StatisticsCacheKey, ColumnLevelStatisticCache> cache = Caffeine.newBuilder()
-            .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
-            .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
-            .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
-            .executor(threadPool)
-            .buildAsync(cacheLoader);
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<ColumnStatistic>> columnStatisticsCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(columnStatisticsCacheLoader);
+
+    private final AsyncLoadingCache<StatisticsCacheKey, Optional<Histogram>> histogramCache =
+            Caffeine.newBuilder()
+                    .maximumSize(StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE)
+                    .expireAfterAccess(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
+                    .executor(threadPool)
+                    .buildAsync(histogramCacheLoader);

     {
         threadPool.submit(() -> {
             while (true) {
                 try {
-                    cacheLoader.removeExpiredInProgressing();
-                    Thread.sleep(TimeUnit.MINUTES.toMillis(15));
+                    columnStatisticsCacheLoader.removeExpiredInProgressing();
+                    histogramCacheLoader.removeExpiredInProgressing();
                 } catch (Throwable t) {
                     // IGNORE
                 }
+                Thread.sleep(TimeUnit.MINUTES.toMillis(15));
             }
         });
     }

     public ColumnStatistic getColumnStatistics(long tblId, String colName) {
-        ColumnLevelStatisticCache columnLevelStatisticCache = getColumnStatistics(tblId, -1, colName);
-        if (columnLevelStatisticCache == null) {
-            return ColumnStatistic.UNKNOWN;
-        }
-        return columnLevelStatisticCache.columnStatistic;
+        return getColumnStatistics(tblId, -1, colName).orElse(ColumnStatistic.UNKNOWN);
     }

-    public ColumnLevelStatisticCache getColumnStatistics(long tblId, long idxId, String colName) {
+    public Optional<ColumnStatistic> getColumnStatistics(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
-            return null;
+            return Optional.empty();
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<ColumnStatistic>> f = columnStatisticsCache.get(k);
             if (f.isDone() && f.get() != null) {
                 return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning ColumnStatistic", e);
         }
-        return null;
+        return Optional.empty();
     }

     public Histogram getHistogram(long tblId, String colName) {
-        return getHistogram(tblId, -1, colName);
+        return getHistogram(tblId, -1, colName).orElse(null);
     }

-    public Histogram getHistogram(long tblId, long idxId, String colName) {
+    public Optional<Histogram> getHistogram(long tblId, long idxId, String colName) {
         ConnectContext ctx = ConnectContext.get();
         if (ctx != null && ctx.getSessionVariable().internalSession) {
-            return null;
+            return Optional.empty();
         }
         StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
         try {
-            CompletableFuture<ColumnLevelStatisticCache> f = cache.get(k);
+            CompletableFuture<Optional<Histogram>> f = histogramCache.get(k);
             if (f.isDone() && f.get() != null) {
-                return f.get().getHistogram();
+                return f.get();
             }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning Histogram", e);
         }
-        return null;
+        return Optional.empty();
     }

     // TODO: finish this method.
     public void eraseExpiredCache(long tblId, long idxId, String colName) {
-        cache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
+        columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
     }

-    public void updateCache(long tblId, long idxId, String colName, ColumnLevelStatisticCache statistic) {
-        cache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), statistic);
+    public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {
+        columnStatisticsCache.synchronous().put(new StatisticsCacheKey(tblId, idxId, colName), Optional.of(statistic));
     }

-    public void refreshSync(long tblId, long idxId, String colName) {
-        cache.synchronous().refresh(new StatisticsCacheKey(tblId, idxId, colName));
+    public void refreshColStatsSync(long tblId, long idxId, String colName) {
+        columnStatisticsCache.synchronous().refresh(new StatisticsCacheKey(tblId, idxId, colName));
     }

+    public void refreshHistogramSync(long tblId, long idxId, String colName) {
+        histogramCache.synchronous().refresh(new StatisticsCacheKey(tblId, idxId, colName));
+    }
+
+    public void preHeat() {
+        threadPool.submit(this::doPreHeat);
+    }
+
+    private void doPreHeat() {
+        List<ResultRow> recentStatsUpdatedCols = null;
+        long retryTimes = 0;
+        while (retryTimes < StatisticConstants.PRELOAD_RETRY_TIMES) {
+            try {
+                recentStatsUpdatedCols = StatisticsRepository.fetchRecentStatsUpdatedCol();
+                break;
+            } catch (Throwable t) {
+                // IGNORE
+            }
+            retryTimes++;
+            try {
+                Thread.sleep(StatisticConstants.PRELOAD_RETRY_INTERVAL_IN_SECONDS);
+            } catch (Throwable t) {
+                // IGNORE
+            }
+        }

+        if (CollectionUtils.isEmpty(recentStatsUpdatedCols)) {
+            return;
+        }
+        for (ResultRow r : recentStatsUpdatedCols) {
+            try {
+                String tblId = r.getColumnValue("tbl_id");
+                String idxId = r.getColumnValue("idx_id");
+                String colId = r.getColumnValue("col_id");
+                final StatisticsCacheKey k =
+                        new StatisticsCacheKey(Long.parseLong(tblId), Long.parseLong(idxId), colId);
+                final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
+                CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>() {

+                    @Override
+                    public Optional<ColumnStatistic> get() throws InterruptedException, ExecutionException {
+                        return Optional.of(c);
+                    }

+                    @Override
+                    public boolean isDone() {
+                        return true;
+                    }

+                    @Override
+                    public boolean complete(Optional<ColumnStatistic> value) {
+                        return true;
+                    }

+                    @Override
+                    public Optional<ColumnStatistic> join() {
+                        return Optional.of(c);
+                    }
+                };
+                columnStatisticsCache.put(k, f);
+            } catch (Throwable t) {
+                LOG.warn("Error when preheating stats cache", t);
+            }
+        }
+    }
+
 }

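One detail worth calling out in doPreHeat above: the anonymous CompletableFuture subclass overrides get(), join(), and isDone() to always return the preloaded value, and makes complete() a no-op, so later complete() calls cannot change what the cache entry returns. If that behavior were not a concern, the stock factory would be the simpler way to seed an entry; a sketch reusing the diff's local names, not a proposed change to the commit:

// Seeding with an already-completed future; complete() on a future that is
// already done is likewise a no-op, so for most purposes this is equivalent.
CompletableFuture<Optional<ColumnStatistic>> preloaded =
        CompletableFuture.completedFuture(Optional.of(c));
columnStatisticsCache.put(k, preloaded);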
@@ -18,6 +18,7 @@
 package org.apache.doris.statistics;

 import java.util.Objects;
+import java.util.StringJoiner;

 public class StatisticsCacheKey {

@@ -28,6 +29,8 @@ public class StatisticsCacheKey {
     public final long idxId;
     public final String colName;

+    private static final String DELIMITER = "-";
+
     public StatisticsCacheKey(long tableId, String colName) {
         this(tableId, -1, colName);
     }
@@ -54,4 +57,13 @@ public class StatisticsCacheKey {
         StatisticsCacheKey k = (StatisticsCacheKey) obj;
         return this.tableId == k.tableId && this.idxId == k.idxId && this.colName.equals(k.colName);
     }
+
+    @Override
+    public String toString() {
+        StringJoiner sj = new StringJoiner(DELIMITER);
+        sj.add(String.valueOf(tableId));
+        sj.add(String.valueOf(idxId));
+        sj.add(colName);
+        return sj.toString();
+    }
 }

@@ -17,90 +17,35 @@

 package org.apache.doris.statistics;

-import org.apache.doris.common.FeConstants;
-import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
 import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.checkerframework.checker.nullness.qual.NonNull;

 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;

-public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKey, ColumnLevelStatisticCache> {
+public abstract class StatisticsCacheLoader<V> implements AsyncCacheLoader<StatisticsCacheKey, V> {

     private static final Logger LOG = LogManager.getLogger(StatisticsCacheLoader.class);

-    private static final String QUERY_COLUMN_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
-            + "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
-            + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";
-
-    private static final String QUERY_HISTOGRAM_STATISTICS = "SELECT * FROM " + FeConstants.INTERNAL_DB_NAME
-            + "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE "
-            + "id = CONCAT('${tblId}', '-', ${idxId}, '-', '${colId}')";
-
     // TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists.

-    private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime>
-            inProgressing = new HashMap<>();
+    private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime<V>> inProgressing = new HashMap<>();

     @Override
-    public @NonNull CompletableFuture<ColumnLevelStatisticCache> asyncLoad(@NonNull StatisticsCacheKey key,
+    public @NonNull CompletableFuture<V> asyncLoad(
+            @NonNull StatisticsCacheKey key,
             @NonNull Executor executor) {
-        CompletableFutureWithCreateTime cfWrapper = inProgressing.get(key);
+        CompletableFutureWithCreateTime<V> cfWrapper = inProgressing.get(key);
         if (cfWrapper != null) {
             return cfWrapper.cf;
         }
-        CompletableFuture<ColumnLevelStatisticCache> future = CompletableFuture.supplyAsync(() -> {
+        CompletableFuture<V> future = CompletableFuture.supplyAsync(() -> {
             long startTime = System.currentTimeMillis();
             try {
                 LOG.info("Query BE for column stats:{}-{} start time:{}", key.tableId, key.colName,
                         startTime);
-                ColumnLevelStatisticCache statistic = new ColumnLevelStatisticCache();
-                Map<String, String> params = new HashMap<>();
-                params.put("tblId", String.valueOf(key.tableId));
-                params.put("idxId", String.valueOf(key.idxId));
-                params.put("colId", String.valueOf(key.colName));
-
-                List<ColumnStatistic> columnStatistics;
-                List<ResultRow> columnResult =
-                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
-                                .replace(QUERY_COLUMN_STATISTICS));
-                try {
-                    columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult);
-                } catch (Exception e) {
-                    LOG.warn("Failed to deserialize column statistics", e);
-                    throw new CompletionException(e);
-                }
-                if (CollectionUtils.isEmpty(columnStatistics)) {
-                    statistic.setColumnStatistic(ColumnStatistic.UNKNOWN);
-                } else {
-                    statistic.setColumnStatistic(columnStatistics.get(0));
-                }
-
-                List<Histogram> histogramStatistics;
-                List<ResultRow> histogramResult =
-                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
-                                .replace(QUERY_HISTOGRAM_STATISTICS));
-                try {
-                    histogramStatistics = StatisticsUtil.deserializeToHistogramStatistics(histogramResult);
-                } catch (Exception e) {
-                    LOG.warn("Failed to deserialize histogram statistics", e);
-                    throw new CompletionException(e);
-                }
-                if (!CollectionUtils.isEmpty(histogramStatistics)) {
-                    statistic.setHistogram(histogramStatistics.get(0));
-                }
-                return statistic;
+                return doLoad(key);
             } finally {
                 long endTime = System.currentTimeMillis();
                 LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName,
@@ -108,11 +53,30 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
                 removeFromIProgressing(key);
             }
         }, executor);
-        putIntoIProgressing(key, new CompletableFutureWithCreateTime(System.currentTimeMillis(), future));
+        putIntoIProgressing(key,
+                new CompletableFutureWithCreateTime<V>(System.currentTimeMillis(), future));
         return future;
     }

-    private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime v) {
+    protected abstract V doLoad(StatisticsCacheKey k);
+
+    private static class CompletableFutureWithCreateTime<V> extends CompletableFuture<V> {
+
+        public final long startTime;
+        public final CompletableFuture<V> cf;
+        private final long expiredTimeMilli = TimeUnit.MINUTES.toMillis(30);
+
+        public CompletableFutureWithCreateTime(long startTime, CompletableFuture<V> cf) {
+            this.startTime = startTime;
+            this.cf = cf;
+        }
+
+        public boolean isExpired() {
+            return System.currentTimeMillis() - startTime > expiredTimeMilli;
+        }
+    }
+
+    private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime<V> v) {
         synchronized (inProgressing) {
             inProgressing.put(k, v);
         }
@@ -131,24 +95,4 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
             inProgressing.entrySet().removeIf(e -> e.getValue().isExpired());
         }
     }
-
-    /**
-     * To make sure any item in the inProgressing would finally be removed to avoid potential mem leak.
-     */
-    private static class CompletableFutureWithCreateTime extends CompletableFuture<ColumnLevelStatisticCache> {
-
-        private static final long EXPIRED_TIME_MILLI = TimeUnit.MINUTES.toMillis(30);
-
-        public final long startTime;
-        public final CompletableFuture<ColumnLevelStatisticCache> cf;
-
-        public CompletableFutureWithCreateTime(long startTime, CompletableFuture<ColumnLevelStatisticCache> cf) {
-            this.startTime = startTime;
-            this.cf = cf;
-        }
-
-        public boolean isExpired() {
-            return System.currentTimeMillis() - startTime > EXPIRED_TIME_MILLI;
-        }
-    }
 }

@@ -83,6 +83,13 @@ public class StatisticsRepository {
     private static final String DROP_TABLE_HISTOGRAM_TEMPLATE = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME
             + "." + StatisticConstants.HISTOGRAM_TBL_NAME + " WHERE ${condition}";

+    private static final String FETCH_RECENT_STATS_UPDATED_COL =
+            "SELECT * FROM "
+                    + FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.STATISTIC_TBL_NAME
+                    + " WHERE part_id is NULL "
+                    + " ORDER BY update_time DESC LIMIT "
+                    + StatisticConstants.STATISTICS_RECORDS_CACHE_SIZE;
+
     public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
         ResultRow resultRow = queryColumnStatisticById(tableId, colName);
         if (resultRow == null) {
@@ -103,8 +110,8 @@ public class StatisticsRepository {
             partitionIds.add(partition.getId());
         }
         return queryPartitionStatistics(dbObjects.table.getId(),
-                colName, partitionIds).stream().map(ColumnStatistic::fromResultRow).collect(
-                Collectors.toList());
+                colName, partitionIds).stream().map(ColumnStatistic::fromResultRow).collect(
+                        Collectors.toList());
     }

     public static ResultRow queryColumnStatisticById(long tblId, String colName) {
@@ -259,16 +266,8 @@ public class StatisticsRepository {
         params.put("max", max == null ? "NULL" : max);
         params.put("dataSize", String.valueOf(columnStatistic.dataSize));
         StatisticsUtil.execUpdate(INSERT_INTO_COLUMN_STATISTICS, params);
-
-        Histogram histogram = Env.getCurrentEnv().getStatisticsCache()
-                .getHistogram(objects.table.getId(), -1, colName);
-
-        ColumnLevelStatisticCache statistic = new ColumnLevelStatisticCache();
-        statistic.setHistogram(histogram);
-        statistic.setColumnStatistic(builder.build());
-
         Env.getCurrentEnv().getStatisticsCache()
-                .updateCache(objects.table.getId(), -1, colName, statistic);
+                .updateColStatsCache(objects.table.getId(), -1, colName, builder.build());
     }

     public static void dropTableStatistics(DropTableStatsStmt dropTableStatsStmt) {
@@ -279,4 +278,8 @@ public class StatisticsRepository {
         dropHistogram(dbId, tbIds, cols, partIds);
         dropStatistics(dbId, tbIds, cols, partIds);
     }
+
+    public static List<ResultRow> fetchRecentStatsUpdatedCol() {
+        return StatisticsUtil.execStatisticQuery(FETCH_RECENT_STATS_UPDATED_COL);
+    }
 }

@@ -284,6 +284,23 @@ public class StatisticsUtil {
         return tblIf.getColumn(columnName);
     }

+    @SuppressWarnings({"unchecked"})
+    public static Column findColumn(String catalogName, String dbName, String tblName, String columnName)
+            throws Throwable {
+        TableIf tableIf = findTable(catalogName, dbName, tblName);
+        return tableIf.getColumn(columnName);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static TableIf findTable(String catalogName, String dbName, String tblName) throws Throwable {
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrException(catalogName, c -> new RuntimeException("Catalog: " + c + " not exists"));
+        DatabaseIf db = catalog.getDbOrException(dbName,
+                d -> new RuntimeException("DB: " + d + " not exists"));
+        return db.getTableOrException(tblName,
+                t -> new RuntimeException("Table: " + t + " not exists"));
+    }
+
     public static boolean isNullOrEmpty(String str) {
         return Optional.ofNullable(str)
                 .map(String::trim)

@@ -17,7 +17,6 @@

 package org.apache.doris.nereids.jobs.cascades;

-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.jobs.JobContext;
@@ -53,8 +52,6 @@ public class DeriveStatsJobTest {

     @Mocked
     ConnectContext context;
-    @Mocked
-    Env env;

     SlotReference slot1;

File diff suppressed because it is too large
@@ -1,151 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// ... (standard Apache License 2.0 header elided) ...
-
-package org.apache.doris.nereids.stats;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.nereids.NereidsPlanner;
-import org.apache.doris.nereids.StatementContext;
-import org.apache.doris.nereids.datasets.tpch.TPCHUtils;
-import org.apache.doris.nereids.parser.NereidsParser;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.OriginStatement;
-import org.apache.doris.statistics.ColumnLevelStatisticCache;
-import org.apache.doris.statistics.StatisticsCache;
-import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
-import org.apache.doris.statistics.util.StatisticsUtil;
-import org.apache.doris.utframe.TestWithFeService;
-
-import mockit.Mock;
-import mockit.MockUp;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-// Assume that all column name is unique in the tested database
-// CHECKSTYLE OFF
-public abstract class TestStats extends TestWithFeService {
-
-    protected Map<String/*colname*/, ColumnLevelStatisticCache> stats = new HashMap<>();
-
-    protected List<String> cols = new ArrayList<String>() {{
-        add("id");
-        add("catalog_id");
-        add("db_id");
-        add("tbl_id");
-        add("idx_id");
-        add("col_id");
-        add("part_id");
-        add("count");
-        add("ndv");
-        add("null_count");
-        add("min");
-        add("max");
-        add("data_size_in_bytes");
-        add("update_time");
-    }};
-
-    protected List<PrimitiveType> types = new ArrayList<PrimitiveType>() {{
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.BIGINT);
-        add(PrimitiveType.BIGINT);
-        add(PrimitiveType.BIGINT);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.VARCHAR);
-        add(PrimitiveType.BIGINT);
-        add(PrimitiveType.DATETIME);
-    }};
-
-    protected List<String> values = new ArrayList<>();
-
-    protected ResultRow resultRow = null;
-
-    protected final static Map<String, Type> colType = new HashMap<>();
-
-    protected abstract void initMockedColumnsStats();
-
-    protected abstract void initQError();
-
-    protected abstract void initMockedReturnedRows();
-
-    protected abstract void initEnv() throws Exception;
-
-    protected abstract void initColNameToType();
-
-    protected Map<Integer/*query id*/, Map<PlanNodeId, Double>> mockedExactReturnedRows = new HashMap<>();
-    protected Map<Integer, Double> queryIdToQError = new HashMap<>();
-
-    protected double avgQError;
-
-    public void run() throws Exception {
-        new MockUp<StatisticsUtil>() {
-
-            @Mock
-            public Column findColumn(long catalogId, long dbId, long tblId, long idxId, String columnName) {
-                return new Column(columnName, colType.get(columnName));
-            }
-        };
-        initMockedReturnedRows();
-        initColNameToType();
-        initMockedColumnsStats();
-        new MockUp<StatisticsCache>() {
-            @Mock
-            public ColumnLevelStatisticCache getColumnStatistics(long tblId, long idxId, String colName) {
-                return stats.get(colName);
-            }
-        };
-
-        connectContext.getSessionVariable().setEnableNereidsPlanner(true);
-        connectContext.getSessionVariable().enableFallbackToOriginalPlanner = false;
-        StatsErrorEstimator statsErrorEstimator = new StatsErrorEstimator();
-        connectContext.setStatsErrorEstimator(statsErrorEstimator);
-        List<Double> qErrorList = new ArrayList<>();
-        initEnv();
-        for (int i = 0; i < TPCHUtils.SQLS.size(); i++) {
-            String sql = TPCHUtils.SQLS.get(i);
-            int sqlNumber = i + 1;
-            NereidsPlanner nereidsPlanner = new NereidsPlanner(
-                    new StatementContext(connectContext, new OriginStatement(sql, 0)));
-            NereidsParser nereidsParser = new NereidsParser();
-            nereidsPlanner.plan(nereidsParser.parseSQL(sql).get(0));
-            Map<PlanNodeId, Double> extractReturnedRows = mockedExactReturnedRows.get(sqlNumber);
-            for (Entry<PlanNodeId, Double> entry : extractReturnedRows.entrySet()) {
-                // statsErrorEstimator.setExactReturnedRow(entry.getKey(), entry.getValue());
-            }
-            qErrorList.add(statsErrorEstimator.calculateQError());
-            statsErrorEstimator = new StatsErrorEstimator();
-            connectContext.setStatsErrorEstimator(statsErrorEstimator);
-        }
-        // Assert.assertTrue(
-        //         qErrorList.stream()
-        //                 .mapToDouble(Double::doubleValue).average().orElseGet(() -> Double.POSITIVE_INFINITY)
-        //                 <= avgQError + 1);
-    }
-}
@@ -19,10 +19,16 @@ package org.apache.doris.statistics;

 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.utframe.TestWithFeService;

+import com.google.common.collect.Lists;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
@@ -40,7 +46,7 @@ import java.util.concurrent.Executor;
 public class CacheTest extends TestWithFeService {

     @Test
-    public void testColumn(@Mocked StatisticsCacheLoader cacheLoader) throws Exception {
+    public void testColumn(@Mocked ColumnStatisticsCacheLoader cacheLoader) throws Exception {
         new Expectations() {
             {
                 cacheLoader.asyncLoad((StatisticsCacheKey) any, (Executor) any);
@@ -130,6 +136,46 @@ public class CacheTest extends TestWithFeService {

+    @Test
+    public void testLoadHistogram() throws Exception {
+        new MockUp<Histogram>() {
+
+            @Mock
+            public Histogram fromResultRow(ResultRow resultRow) {
+                try {
+                    HistogramBuilder histogramBuilder = new HistogramBuilder();
+
+                    Column col = new Column("abc", PrimitiveType.DATETIME);
+
+                    Type dataType = col.getType();
+                    histogramBuilder.setDataType(dataType);
+
+                    double sampleRate = Double.parseDouble(resultRow.getColumnValue("sample_rate"));
+                    histogramBuilder.setSampleRate(sampleRate);
+
+                    String json = resultRow.getColumnValue("buckets");
+                    JsonObject jsonObj = JsonParser.parseString(json).getAsJsonObject();
+
+                    int bucketNum = jsonObj.get("num_buckets").getAsInt();
+                    histogramBuilder.setNumBuckets(bucketNum);
+
+                    List<Bucket> buckets = Lists.newArrayList();
+                    JsonArray jsonArray = jsonObj.getAsJsonArray("buckets");
+                    for (JsonElement element : jsonArray) {
+                        try {
+                            String bucketJson = element.toString();
+                            buckets.add(Bucket.deserializeFromJson(dataType, bucketJson));
+                        } catch (Throwable t) {
+                            t.printStackTrace();
+                        }
+                    }
+                    histogramBuilder.setBuckets(buckets);
+
+                    return histogramBuilder.build();
+                } catch (Exception e) {
+                    return null;
+                }
+            }
+        };
         new MockUp<StatisticsUtil>() {

             @Mock
@@ -185,7 +231,9 @@ public class CacheTest extends TestWithFeService {
         };

         StatisticsCache statisticsCache = new StatisticsCache();
+        statisticsCache.refreshHistogramSync(0, -1, "col");
         Thread.sleep(10000);
         Histogram histogram = statisticsCache.getHistogram(0, "col");
-        Assertions.assertEquals(null, histogram);
+        Assertions.assertNotNull(histogram);
+    }
 }
