[Improvement](multi catalog, statistics)Support two level external statistics cache loader (#20906)
The current column statistic cache loader is to load data from column_statistics olap table. This pr is to change the cache loader logic to First load from column_statistics olap table, if no data was loaded, then load from table metadata. This is mainly to support fetch statistics data for external catalog using HMS or Iceberg api. This is the first PR, next pr will implement the fetch logic for different external catalogs.
This commit is contained in:
@ -1747,7 +1747,7 @@ public class Config extends ConfigBase {
|
||||
* Otherwise, use external catalog metadata.
|
||||
*/
|
||||
@ConfField(mutable = true)
|
||||
public static boolean collect_external_table_stats_by_sql = false;
|
||||
public static boolean collect_external_table_stats_by_sql = true;
|
||||
|
||||
/**
|
||||
* Max num of same name meta informatntion in catalog recycle bin.
|
||||
|
||||
@ -29,6 +29,7 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.persist.CreateTableInfo;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
@ -435,6 +436,11 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CatalogIf getCatalog() {
|
||||
return Env.getCurrentInternalCatalog();
|
||||
}
|
||||
|
||||
public List<Table> getTables() {
|
||||
return new ArrayList<>(idToTable.values());
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
@ -232,4 +233,6 @@ public interface DatabaseIf<T extends TableIf> {
|
||||
}
|
||||
|
||||
void dropTable(String tableName);
|
||||
|
||||
CatalogIf getCatalog();
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.external.hudi.HudiTable;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -47,6 +48,7 @@ import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
@ -542,4 +544,14 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
|
||||
}
|
||||
return Math.max(cardinality, 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatabaseIf getDatabase() {
|
||||
return Env.getCurrentInternalCatalog().getDbNullable(qualifiedDbName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ColumnStatistic> getColumnStatistic() {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
@ -30,6 +31,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -128,6 +130,10 @@ public interface TableIf {
|
||||
|
||||
long estimatedRowCount();
|
||||
|
||||
DatabaseIf getDatabase();
|
||||
|
||||
Optional<ColumnStatistic> getColumnStatistic();
|
||||
|
||||
/**
|
||||
* Doris table type.
|
||||
*/
|
||||
|
||||
@ -25,6 +25,7 @@ import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.InitDatabaseLog;
|
||||
import org.apache.doris.persist.gson.GsonPostProcessable;
|
||||
@ -334,4 +335,9 @@ public abstract class ExternalDatabase<T extends ExternalTable>
|
||||
public void createTable(String tableName, long tableId) {
|
||||
throw new NotImplementedException("createTable() is not implemented");
|
||||
}
|
||||
|
||||
@Override
|
||||
public CatalogIf getCatalog() {
|
||||
return extCatalog;
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.catalog.external;
|
||||
|
||||
import org.apache.doris.alter.AlterCancelException;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
@ -33,6 +34,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
@ -45,6 +47,7 @@ import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
@ -317,6 +320,17 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DatabaseIf getDatabase() {
|
||||
return catalog.getDbNullable(dbName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ColumnStatistic> getColumnStatistic() {
|
||||
// TODO: Implement this interface for all kinds of external table.
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Should only be called in ExternalCatalog's getSchema(),
|
||||
* which is called from schema cache.
|
||||
|
||||
@ -18,17 +18,16 @@
|
||||
package org.apache.doris.catalog.external;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.catalog.HudiUtils;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.HiveAnalysisTask;
|
||||
import org.apache.doris.statistics.IcebergAnalysisTask;
|
||||
import org.apache.doris.statistics.StatisticsRepository;
|
||||
import org.apache.doris.statistics.TableStatistic;
|
||||
import org.apache.doris.thrift.THiveTable;
|
||||
import org.apache.doris.thrift.TTableDescriptor;
|
||||
@ -51,6 +50,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -392,11 +392,15 @@ public class HMSExternalTable extends ExternalTable {
|
||||
@Override
|
||||
public long estimatedRowCount() {
|
||||
try {
|
||||
TableStatistic tableStatistic = StatisticsRepository.fetchTableLevelStats(id);
|
||||
return tableStatistic.rowCount;
|
||||
} catch (DdlException e) {
|
||||
return 1;
|
||||
Optional<TableStatistic> tableStatistics = Env.getCurrentEnv().getStatisticsCache().getTableStatistics(
|
||||
catalog.getId(), catalog.getDbOrAnalysisException(dbName).getId(), id);
|
||||
if (tableStatistics.isPresent()) {
|
||||
return tableStatistics.get().rowCount;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("Fail to get row count for table %s", name), e);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
|
||||
|
||||
@ -122,6 +122,8 @@ import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.AbstractMap.SimpleEntry;
|
||||
import java.util.Collections;
|
||||
@ -138,6 +140,8 @@ import java.util.stream.Collectors;
|
||||
public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
|
||||
public static double DEFAULT_AGGREGATE_RATIO = 0.5;
|
||||
public static double DEFAULT_COLUMN_NDV_RATIO = 0.5;
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(StatsCalculator.class);
|
||||
private final GroupExpression groupExpression;
|
||||
|
||||
private boolean forbidUnknownColStats = false;
|
||||
@ -529,7 +533,21 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
|
||||
} else if (isPlayNereidsDump) {
|
||||
return ColumnStatistic.UNKNOWN;
|
||||
} else {
|
||||
return Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(table.getId(), colName);
|
||||
long catalogId;
|
||||
long dbId;
|
||||
try {
|
||||
catalogId = table.getDatabase().getCatalog().getId();
|
||||
dbId = table.getDatabase().getId();
|
||||
} catch (Exception e) {
|
||||
// Use -1 for catalog id and db id when failed to get them from metadata.
|
||||
// This is OK because catalog id and db id is not in the hashcode function of ColumnStatistics cache
|
||||
// and the table id is globally unique.
|
||||
LOG.debug(String.format("Fail to get catalog id and db id for table %s", table.getName()));
|
||||
catalogId = -1;
|
||||
dbId = -1;
|
||||
}
|
||||
return Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(
|
||||
catalogId, dbId, table.getId(), colName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -695,7 +695,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
long tblId = dropStatsStmt.getTblId();
|
||||
StatisticsRepository.dropStatistics(tblId, cols);
|
||||
for (String col : cols) {
|
||||
Env.getCurrentEnv().getStatisticsCache().invidate(tblId, -1L, col);
|
||||
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
@ -30,7 +32,6 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletionException;
|
||||
|
||||
public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<ColumnStatistic>> {
|
||||
|
||||
@ -42,20 +43,39 @@ public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<
|
||||
|
||||
@Override
|
||||
protected Optional<ColumnStatistic> doLoad(StatisticsCacheKey key) {
|
||||
// Load from statistics table.
|
||||
Optional<ColumnStatistic> columnStatistic = loadFromStatsTable(String.valueOf(key.tableId),
|
||||
String.valueOf(key.idxId), key.colName);
|
||||
if (columnStatistic.isPresent()) {
|
||||
return columnStatistic;
|
||||
}
|
||||
// Load from data source metadata
|
||||
try {
|
||||
TableIf table = Env.getCurrentEnv().getCatalogMgr().getCatalog(key.catalogId)
|
||||
.getDbOrMetaException(key.dbId).getTableOrMetaException(key.tableId);
|
||||
columnStatistic = table.getColumnStatistic();
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("Exception to get column statistics by metadata. [Catalog:%d, DB:%d, Table:%d]",
|
||||
key.catalogId, key.dbId, key.tableId), e);
|
||||
}
|
||||
return columnStatistic;
|
||||
}
|
||||
|
||||
private Optional<ColumnStatistic> loadFromStatsTable(String tableId, String idxId, String colName) {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("tblId", String.valueOf(key.tableId));
|
||||
params.put("idxId", String.valueOf(key.idxId));
|
||||
params.put("colId", String.valueOf(key.colName));
|
||||
params.put("tblId", tableId);
|
||||
params.put("idxId", idxId);
|
||||
params.put("colId", colName);
|
||||
|
||||
List<ColumnStatistic> columnStatistics;
|
||||
List<ResultRow> columnResult =
|
||||
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
|
||||
.replace(QUERY_COLUMN_STATISTICS));
|
||||
.replace(QUERY_COLUMN_STATISTICS));
|
||||
try {
|
||||
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResult);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to deserialize column statistics", e);
|
||||
throw new CompletionException(e);
|
||||
LOG.warn("Exception to deserialize column statistics", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
if (CollectionUtils.isEmpty(columnStatistics)) {
|
||||
return Optional.empty();
|
||||
|
||||
@ -256,7 +256,11 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
|
||||
|
||||
private Map<String, String> buildTableStatsParams(String partId) {
|
||||
Map<String, String> commonParams = new HashMap<>();
|
||||
commonParams.put("id", String.valueOf(tbl.getId()));
|
||||
String id = StatisticsUtil.constructId(tbl.getId(), -1);
|
||||
if (!partId.equals("NULL")) {
|
||||
id = StatisticsUtil.constructId(id, partId);
|
||||
}
|
||||
commonParams.put("id", id);
|
||||
commonParams.put("catalogId", String.valueOf(catalog.getId()));
|
||||
commonParams.put("dbId", String.valueOf(db.getId()));
|
||||
commonParams.put("tblId", String.valueOf(tbl.getId()));
|
||||
|
||||
@ -63,7 +63,8 @@ public class OlapScanStatsDerive extends BaseStatsDerive {
|
||||
for (Map.Entry<Id, String> entry : slotIdToTableIdAndColumnName.entrySet()) {
|
||||
String colName = entry.getValue();
|
||||
ColumnStatistic statistic =
|
||||
Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(table.getId(), colName);
|
||||
Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(
|
||||
table.getDatabase().getCatalog().getId(), table.getDatabase().getId(), table.getId(), colName);
|
||||
if (!statistic.isUnKnown) {
|
||||
rowCount = statistic.count;
|
||||
}
|
||||
|
||||
@ -51,6 +51,7 @@ public class StatisticsCache {
|
||||
|
||||
private final ColumnStatisticsCacheLoader columnStatisticsCacheLoader = new ColumnStatisticsCacheLoader();
|
||||
private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader();
|
||||
private final TableStatisticsCacheLoader tableStatisticsCacheLoader = new TableStatisticsCacheLoader();
|
||||
|
||||
private final AsyncLoadingCache<StatisticsCacheKey, Optional<ColumnStatistic>> columnStatisticsCache =
|
||||
Caffeine.newBuilder()
|
||||
@ -68,12 +69,20 @@ public class StatisticsCache {
|
||||
.executor(threadPool)
|
||||
.buildAsync(histogramCacheLoader);
|
||||
|
||||
private final AsyncLoadingCache<StatisticsCacheKey, Optional<TableStatistic>> tableStatisticsCache =
|
||||
Caffeine.newBuilder()
|
||||
.maximumSize(Config.stats_cache_size)
|
||||
.expireAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
|
||||
.executor(threadPool)
|
||||
.buildAsync(tableStatisticsCacheLoader);
|
||||
|
||||
{
|
||||
threadPool.submit(() -> {
|
||||
while (true) {
|
||||
try {
|
||||
columnStatisticsCacheLoader.removeExpiredInProgressing();
|
||||
histogramCacheLoader.removeExpiredInProgressing();
|
||||
tableStatisticsCacheLoader.removeExpiredInProgressing();
|
||||
} catch (Throwable t) {
|
||||
// IGNORE
|
||||
}
|
||||
@ -83,16 +92,17 @@ public class StatisticsCache {
|
||||
});
|
||||
}
|
||||
|
||||
public ColumnStatistic getColumnStatistics(long tblId, String colName) {
|
||||
return getColumnStatistics(tblId, -1, colName).orElse(ColumnStatistic.UNKNOWN);
|
||||
public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, String colName) {
|
||||
return getColumnStatistics(catalogId, dbId, tblId, -1, colName).orElse(ColumnStatistic.UNKNOWN);
|
||||
}
|
||||
|
||||
public Optional<ColumnStatistic> getColumnStatistics(long tblId, long idxId, String colName) {
|
||||
public Optional<ColumnStatistic> getColumnStatistics(long catalogId, long dbId,
|
||||
long tblId, long idxId, String colName) {
|
||||
ConnectContext ctx = ConnectContext.get();
|
||||
if (ctx != null && ctx.getSessionVariable().internalSession) {
|
||||
return Optional.empty();
|
||||
}
|
||||
StatisticsCacheKey k = new StatisticsCacheKey(tblId, idxId, colName);
|
||||
StatisticsCacheKey k = new StatisticsCacheKey(catalogId, dbId, tblId, idxId, colName);
|
||||
try {
|
||||
CompletableFuture<Optional<ColumnStatistic>> f = columnStatisticsCache.get(k);
|
||||
if (f.isDone()) {
|
||||
@ -125,7 +135,23 @@ public class StatisticsCache {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public void invidate(long tblId, long idxId, String colName) {
|
||||
public Optional<TableStatistic> getTableStatistics(long catalogId, long dbId, long tableId) {
|
||||
ConnectContext ctx = ConnectContext.get();
|
||||
if (ctx != null && ctx.getSessionVariable().internalSession) {
|
||||
return Optional.empty();
|
||||
}
|
||||
StatisticsCacheKey k = new StatisticsCacheKey(catalogId, dbId, tableId);
|
||||
try {
|
||||
CompletableFuture<Optional<TableStatistic>> f = tableStatisticsCache.get(k);
|
||||
// Synchronous return the cache value for table row count.
|
||||
return f.get();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unexpected exception while returning Histogram", e);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
public void invalidate(long tblId, long idxId, String colName) {
|
||||
columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
|
||||
}
|
||||
|
||||
|
||||
@ -24,7 +24,10 @@ public class StatisticsCacheKey {
|
||||
|
||||
/**
|
||||
* May be index id either, since they are natively same in the code.
|
||||
* catalogId and dbId are not included in the hashCode. Because tableId is globally unique.
|
||||
*/
|
||||
public final long catalogId;
|
||||
public final long dbId;
|
||||
public final long tableId;
|
||||
public final long idxId;
|
||||
public final String colName;
|
||||
@ -36,6 +39,16 @@ public class StatisticsCacheKey {
|
||||
}
|
||||
|
||||
public StatisticsCacheKey(long tableId, long idxId, String colName) {
|
||||
this(-1, -1, tableId, idxId, colName);
|
||||
}
|
||||
|
||||
public StatisticsCacheKey(long catalogId, long dbId, long tableId) {
|
||||
this(catalogId, dbId, tableId, -1, "");
|
||||
}
|
||||
|
||||
public StatisticsCacheKey(long catalogId, long dbId, long tableId, long idxId, String colName) {
|
||||
this.catalogId = catalogId;
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
this.idxId = idxId;
|
||||
this.colName = colName;
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.DdlException;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
|
||||
public class TableStatisticsCacheLoader extends StatisticsCacheLoader<Optional<TableStatistic>> {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(TableStatisticsCacheLoader.class);
|
||||
|
||||
@Override
|
||||
protected Optional<TableStatistic> doLoad(StatisticsCacheKey key) {
|
||||
try {
|
||||
TableStatistic tableStatistic = StatisticsRepository.fetchTableLevelStats(key.tableId);
|
||||
return Optional.of(tableStatistic);
|
||||
} catch (DdlException e) {
|
||||
LOG.debug("Fail to get table line number from table_statistics table. "
|
||||
+ "Will try to get from data source.", e);
|
||||
}
|
||||
// Get row count by call TableIf interface getRowCount
|
||||
// when statistic table doesn't contain a record for this table.
|
||||
try {
|
||||
TableIf table = Env.getCurrentEnv().getCatalogMgr().getCatalog(key.catalogId)
|
||||
.getDbOrDdlException(key.dbId).getTableOrAnalysisException(key.tableId);
|
||||
long rowCount = table.getRowCount();
|
||||
long lastAnalyzeTimeInMs = System.currentTimeMillis();
|
||||
String updateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(lastAnalyzeTimeInMs));
|
||||
Optional.of(new TableStatistic(rowCount, lastAnalyzeTimeInMs, updateTime));
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("Fail to get row count for table %d", key.tableId), e);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
@ -18,8 +18,13 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.catalog.external.HMSExternalDatabase;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
@ -61,10 +66,10 @@ public class CacheTest extends TestWithFeService {
|
||||
}
|
||||
};
|
||||
StatisticsCache statisticsCache = new StatisticsCache();
|
||||
ColumnStatistic c = statisticsCache.getColumnStatistics(1, "col");
|
||||
ColumnStatistic c = statisticsCache.getColumnStatistics(-1, -1, 1, "col");
|
||||
Assertions.assertTrue(c.isUnKnown);
|
||||
Thread.sleep(100);
|
||||
c = statisticsCache.getColumnStatistics(1, "col");
|
||||
c = statisticsCache.getColumnStatistics(-1, -1, 1, "col");
|
||||
Assertions.assertTrue(c.isUnKnown);
|
||||
}
|
||||
|
||||
@ -125,10 +130,10 @@ public class CacheTest extends TestWithFeService {
|
||||
}
|
||||
};
|
||||
StatisticsCache statisticsCache = new StatisticsCache();
|
||||
ColumnStatistic columnStatistic = statisticsCache.getColumnStatistics(0, "col");
|
||||
ColumnStatistic columnStatistic = statisticsCache.getColumnStatistics(-1, -1, 0, "col");
|
||||
Assertions.assertTrue(columnStatistic.isUnKnown);
|
||||
Thread.sleep(1000);
|
||||
columnStatistic = statisticsCache.getColumnStatistics(0, "col");
|
||||
columnStatistic = statisticsCache.getColumnStatistics(-1, -1, 0, "col");
|
||||
Assertions.assertEquals(1, columnStatistic.count);
|
||||
Assertions.assertEquals(2, columnStatistic.ndv);
|
||||
Assertions.assertEquals(10, columnStatistic.maxValue);
|
||||
@ -236,4 +241,60 @@ public class CacheTest extends TestWithFeService {
|
||||
Histogram histogram = statisticsCache.getHistogram(0, "col");
|
||||
Assertions.assertNotNull(histogram);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadFromMeta(@Mocked Env env,
|
||||
@Mocked CatalogMgr mgr,
|
||||
@Mocked HMSExternalCatalog catalog,
|
||||
@Mocked HMSExternalDatabase db,
|
||||
@Mocked HMSExternalTable table) throws Exception {
|
||||
new MockUp<StatisticsUtil>() {
|
||||
|
||||
@Mock
|
||||
public Column findColumn(long catalogId, long dbId, long tblId, long idxId, String columnName) {
|
||||
return new Column("abc", PrimitiveType.BIGINT);
|
||||
}
|
||||
|
||||
@Mock
|
||||
public List<ResultRow> execStatisticQuery(String sql) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
new MockUp<Env>() {
|
||||
@Mock
|
||||
public Env getCurrentEnv() {
|
||||
return env;
|
||||
}
|
||||
};
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
env.getCatalogMgr();
|
||||
result = mgr;
|
||||
|
||||
mgr.getCatalog(1);
|
||||
result = catalog;
|
||||
|
||||
catalog.getDbOrMetaException(1);
|
||||
result = db;
|
||||
|
||||
db.getTableOrMetaException(1);
|
||||
result = table;
|
||||
|
||||
table.getColumnStatistic();
|
||||
result = new ColumnStatistic(1, 2, null, 3, 4, 5, 6, 7, 8, null, null, false, null);
|
||||
}
|
||||
};
|
||||
StatisticsCache statisticsCache = new StatisticsCache();
|
||||
ColumnStatistic columnStatistic = statisticsCache.getColumnStatistics(1, 1, 1, "col");
|
||||
Thread.sleep(3000);
|
||||
columnStatistic = statisticsCache.getColumnStatistics(1, 1, 1, "col");
|
||||
Assertions.assertEquals(1, columnStatistic.count);
|
||||
Assertions.assertEquals(2, columnStatistic.ndv);
|
||||
Assertions.assertEquals(3, columnStatistic.avgSizeByte);
|
||||
Assertions.assertEquals(4, columnStatistic.numNulls);
|
||||
Assertions.assertEquals(5, columnStatistic.dataSize);
|
||||
Assertions.assertEquals(6, columnStatistic.minValue);
|
||||
Assertions.assertEquals(7, columnStatistic.maxValue);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user