Refactor get row count related interface, add row count cache for external table. (#31276)

This commit is contained in:
Jibing-Li
2024-02-23 13:02:13 +08:00
committed by yiguolei
parent 9508546cc8
commit 9a40b6c978
21 changed files with 180 additions and 159 deletions

View File

@ -1260,7 +1260,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public long getRowCount() {
public long fetchRowCount() {
long rowCount = 0;
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
rowCount += entry.getValue().getBaseIndex().getRowCount();
@ -1277,11 +1277,6 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return rowCount;
}
@Override
public long getCacheRowCount() {
return getRowCount();
}
@Override
public long getAvgRowLength() {
long rowCount = 0;

View File

@ -394,11 +394,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
}
public long getRowCount() {
return 0;
}
public long getCacheRowCount() {
return getRowCount();
return fetchRowCount();
}
public long getAvgRowLength() {
@ -605,24 +601,6 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
throw new NotImplementedException("createAnalysisTask not implemented");
}
/**
* for NOT-ANALYZED Olap table, return estimated row count,
* for other table, return 1
* @return estimated row count
*/
public long estimatedRowCount() {
long cardinality = 0;
if (this instanceof OlapTable) {
OlapTable table = (OlapTable) this;
for (long selectedPartitionId : table.getPartitionIds()) {
final Partition partition = table.getPartition(selectedPartitionId);
final MaterializedIndex baseIndex = partition.getBaseIndex();
cardinality += baseIndex.getRowCount();
}
}
return Math.max(cardinality, 1);
}
@Override
public DatabaseIf getDatabase() {
return Env.getCurrentInternalCatalog().getDbNullable(qualifiedDbName);
@ -649,4 +627,9 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
public List<Long> getChunkSizes() {
throw new NotImplementedException("getChunkSized not implemented");
}
@Override
public long fetchRowCount() {
return 0;
}
}

View File

@ -135,9 +135,7 @@ public interface TableIf {
long getRowCount();
// Get the exact number of rows in the internal table;
// Get the number of cached rows or estimated rows in the external table, if not, return -1.
long getCacheRowCount();
long fetchRowCount();
long getDataLength();
@ -151,7 +149,10 @@ public interface TableIf {
BaseAnalysisTask createAnalysisTask(AnalysisInfo info);
long estimatedRowCount();
// For empty table, nereids require getting 1 as row count. This is a wrap function for nereids to call getRowCount.
default long getRowCountForNereids() {
return Math.max(getRowCount(), 1);
}
DatabaseIf getDatabase();

View File

@ -44,7 +44,8 @@ import java.util.concurrent.ExecutorService;
/**
* Cache meta of external catalog
* 1. Meta for hive meta store, mainly for partition.
* 2. Table Schema cahce.
* 2. Table Schema cache.
* 3. Row count cache.
*/
public class ExternalMetaCacheMgr {
private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class);
@ -58,6 +59,8 @@ public class ExternalMetaCacheMgr {
private ExecutorService executor;
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
// all external table row count cache.
private ExternalRowCountCache rowCountCache;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
@ -68,6 +71,7 @@ public class ExternalMetaCacheMgr {
"ExternalMetaCacheMgr", 120, true);
hudiPartitionMgr = HudiPartitionMgr.get(executor);
fsCache = new FileSystemCache(executor);
rowCountCache = new ExternalRowCountCache(executor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
}
@ -114,6 +118,10 @@ public class ExternalMetaCacheMgr {
return fsCache;
}
public ExternalRowCountCache getRowCountCache() {
return rowCountCache;
}
public void removeCache(long catalogId) {
if (cacheMap.remove(catalogId) != null) {
LOG.info("remove hive metastore cache for catalog {}", catalogId);

View File

@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.statistics.BasicAsyncCacheLoader;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public class ExternalRowCountCache {
private static final Logger LOG = LogManager.getLogger(ExternalRowCountCache.class);
private final AsyncLoadingCache<RowCountKey, Optional<Long>> rowCountCache;
public ExternalRowCountCache(ExecutorService executor) {
rowCountCache = Caffeine.newBuilder()
.maximumSize(Config.max_external_table_row_count_cache_num)
.expireAfterWrite(Duration.ofMinutes(Config.external_cache_expire_time_minutes_after_access))
.executor(executor)
.buildAsync(new RowCountCacheLoader());
}
public static class RowCountKey {
private final long catalogId;
private final long dbId;
private final long tableId;
public RowCountKey(long catalogId, long dbId, long tableId) {
this.catalogId = catalogId;
this.dbId = dbId;
this.tableId = tableId;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof RowCountKey)) {
return false;
}
return ((RowCountKey) obj).tableId == this.tableId;
}
@Override
public int hashCode() {
return (int) tableId;
}
}
public static class RowCountCacheLoader extends BasicAsyncCacheLoader<RowCountKey, Optional<Long>> {
@Override
protected Optional<Long> doLoad(RowCountKey rowCountKey) {
try {
TableIf table = StatisticsUtil.findTable(rowCountKey.catalogId, rowCountKey.dbId, rowCountKey.tableId);
return Optional.of(table.fetchRowCount());
} catch (Exception e) {
LOG.warn("Failed to get table with catalogId {}, dbId {}, tableId {}", rowCountKey.catalogId,
rowCountKey.dbId, rowCountKey.tableId);
return Optional.empty();
}
}
}
/**
* Get cached row count for the given table. Return 0 if cached not loaded or table not exists.
* Cached will be loaded async.
* @param catalogId
* @param dbId
* @param tableId
* @return Cached row count or 0 if not exist
*/
public long getCachedRowCount(long catalogId, long dbId, long tableId) {
RowCountKey key = new RowCountKey(catalogId, dbId, tableId);
try {
CompletableFuture<Optional<Long>> f = rowCountCache.get(key);
if (f.isDone()) {
return f.get().orElse(0L);
}
} catch (Exception e) {
LOG.warn("Unexpected exception while returning row count", e);
}
return 0;
}
}

View File

@ -297,10 +297,16 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
@Override
public long getRowCount() {
return 0;
// All external table should get external row count from cache.
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
}
public long getCacheRowCount() {
@Override
/**
* Default return 0. Subclass need to implement this interface.
* This is called by ExternalRowCountCache to load row count cache.
*/
public long fetchRowCount() {
return 0;
}
@ -351,11 +357,6 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
throw new NotImplementedException("createAnalysisTask not implemented");
}
@Override
public long estimatedRowCount() {
return 1;
}
@Override
public DatabaseIf getDatabase() {
return catalog.getDbNullable(dbName);

View File

@ -40,7 +40,6 @@ import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.HMSAnalysisTask;
import org.apache.doris.statistics.StatsType;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
@ -146,9 +145,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
private DLAType dlaType = DLAType.UNKNOWN;
// No as precise as row count in TableStats, but better than none.
private long estimatedRowCount = -1;
// record the event update time when enable hms event listener
protected volatile long eventUpdateTime;
@ -196,7 +192,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
}
objectCreated = true;
estimatedRowCount = getRowCountFromExternalSource(true);
}
}
@ -319,24 +314,11 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return 0;
}
@Override
public long getRowCount() {
makeSureInitialized();
long rowCount = getRowCountFromExternalSource(false);
if (rowCount == -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Will estimate row count from file list.");
}
rowCount = StatisticsUtil.getRowCountFromFileList(this);
}
return rowCount;
}
private long getRowCountFromExternalSource(boolean isInit) {
private long getRowCountFromExternalSource() {
long rowCount;
switch (dlaType) {
case HIVE:
rowCount = StatisticsUtil.getHiveRowCount(this, isInit);
rowCount = StatisticsUtil.getHiveRowCount(this);
break;
case ICEBERG:
rowCount = StatisticsUtil.getIcebergRowCount(this);
@ -514,47 +496,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public long getCacheRowCount() {
//Cached accurate information
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
if (tableStats != null) {
long rowCount = tableStats.rowCount;
if (LOG.isDebugEnabled()) {
LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
}
return rowCount;
public long fetchRowCount() {
makeSureInitialized();
// Get row count from hive metastore property.
long rowCount = getRowCountFromExternalSource();
// Only hive table supports estimate row count by listing file.
if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) {
LOG.debug("Will estimate row count from file list.");
rowCount = StatisticsUtil.getRowCountFromFileList(this);
}
//estimated information
if (estimatedRowCount != -1) {
return estimatedRowCount;
}
return -1;
}
@Override
public long estimatedRowCount() {
try {
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
if (tableStats != null) {
long rowCount = tableStats.rowCount;
if (LOG.isDebugEnabled()) {
LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
}
return rowCount;
}
if (estimatedRowCount != -1) {
return estimatedRowCount;
}
// Cache the estimated row count in this structure
// though the table never get analyzed, since the row estimation might be expensive caused by RPC.
estimatedRowCount = getRowCount();
return estimatedRowCount;
} catch (Exception e) {
LOG.warn("Fail to get row count for table {}", name, e);
}
return 1;
return rowCount;
}
private void initPartitionColumns(List<Column> schema) {
@ -766,7 +717,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
@Override
public void gsonPostProcess() throws IOException {
super.gsonPostProcess();
estimatedRowCount = -1;
}
@Override

View File

@ -18,13 +18,11 @@
package org.apache.doris.datasource.jdbc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.JdbcAnalysisTask;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.logging.log4j.LogManager;
@ -111,28 +109,4 @@ public class JdbcExternalTable extends ExternalTable {
makeSureInitialized();
return new JdbcAnalysisTask(info);
}
@Override
public long getRowCount() {
makeSureInitialized();
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
if (tableStats != null) {
long rowCount = tableStats.rowCount;
if (LOG.isDebugEnabled()) {
LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
}
return rowCount;
}
return 1;
}
@Override
public long getCacheRowCount() {
return getRowCount();
}
@Override
public long estimatedRowCount() {
return getRowCount();
}
}

View File

@ -649,7 +649,7 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
.map(s -> (SlotReference) s).collect(Collectors.toSet());
Map<Expression, ColumnStatistic> columnStatisticMap = new HashMap<>();
TableIf table = catalogRelation.getTable();
double rowCount = catalogRelation.getTable().estimatedRowCount();
double rowCount = catalogRelation.getTable().getRowCountForNereids();
boolean hasUnknownCol = false;
long idxId = -1;
if (catalogRelation instanceof OlapScan) {

View File

@ -920,11 +920,7 @@ public class ShowExecutor {
// Row_format
row.add(null);
// Rows
// Use estimatedRowCount(), not getRowCount().
// because estimatedRowCount() is an async call, it will not block, and it will call getRowCount()
// finally. So that for some table(especially external table),
// we can get the row count without blocking.
row.add(String.valueOf(table.estimatedRowCount()));
row.add(String.valueOf(table.getRowCount()));
// Avg_row_length
row.add(String.valueOf(table.getAvgRowLength()));
// Data_length
@ -2540,7 +2536,7 @@ public class ShowExecutor {
tableStats == null means it's not analyzed, in this case show the estimated row count.
*/
if (tableStats == null && tableIf instanceof HMSExternalTable) {
resultSet = showTableStatsStmt.constructResultSet(tableIf.estimatedRowCount());
resultSet = showTableStatsStmt.constructResultSet(tableIf.getRowCount());
} else {
resultSet = showTableStatsStmt.constructResultSet(tableStats);
}

View File

@ -608,7 +608,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
status.setUpdateTime(table.getUpdateTime() / 1000);
status.setCheckTime(lastCheckTime / 1000);
status.setCollation("utf-8");
status.setRows(table.getCacheRowCount());
status.setRows(table.getRowCount());
status.setDataLength(table.getDataLength());
status.setAvgRowLength(table.getAvgRowLength());
tablesResult.add(status);

View File

@ -550,8 +550,7 @@ public class AnalysisManager implements Writable {
@VisibleForTesting
public void updateTableStats(AnalysisInfo jobInfo) {
TableIf tbl = StatisticsUtil.findTable(jobInfo.catalogId,
jobInfo.dbId, jobInfo.tblId);
TableIf tbl = StatisticsUtil.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId);
// External Table only update table stats when all tasks finished.
// Because it needs to get the row count from the result of row count task.
if (tbl instanceof ExternalTable && !jobInfo.state.equals(AnalysisState.FINISHED)) {
@ -559,7 +558,7 @@ public class AnalysisManager implements Writable {
}
TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
if (tableStats == null) {
updateTableStatsStatus(new TableStatsMeta(jobInfo.emptyJob ? 0 : tbl.estimatedRowCount(), jobInfo, tbl));
updateTableStatsStatus(new TableStatsMeta(jobInfo.emptyJob ? 0 : tbl.getRowCount(), jobInfo, tbl));
} else {
tableStats.update(jobInfo, tbl);
logCreateTableStats(tableStats);

View File

@ -28,15 +28,15 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
public abstract class StatisticsCacheLoader<V> implements AsyncCacheLoader<StatisticsCacheKey, V> {
public abstract class BasicAsyncCacheLoader<K, V> implements AsyncCacheLoader<K, V> {
private static final Logger LOG = LogManager.getLogger(StatisticsCacheLoader.class);
private static final Logger LOG = LogManager.getLogger(BasicAsyncCacheLoader.class);
private final Map<StatisticsCacheKey, CompletableFutureWithCreateTime<V>> inProgressing = new HashMap<>();
private final Map<K, CompletableFutureWithCreateTime<V>> inProgressing = new HashMap<>();
@Override
public @NonNull CompletableFuture<V> asyncLoad(
@NonNull StatisticsCacheKey key,
@NonNull K key,
@NonNull Executor executor) {
CompletableFutureWithCreateTime<V> cfWrapper = inProgressing.get(key);
if (cfWrapper != null) {
@ -48,8 +48,7 @@ public abstract class StatisticsCacheLoader<V> implements AsyncCacheLoader<Stati
return doLoad(key);
} finally {
long endTime = System.currentTimeMillis();
LOG.info("Query BE for column stats:{}-{} end time:{} cost time:{}", key.tableId, key.colName,
endTime, endTime - startTime);
LOG.info("Load statistic cache [{}] cost time ms:{}", key, endTime - startTime);
removeFromIProgressing(key);
}
}, executor);
@ -58,7 +57,7 @@ public abstract class StatisticsCacheLoader<V> implements AsyncCacheLoader<Stati
return future;
}
protected abstract V doLoad(StatisticsCacheKey k);
protected abstract V doLoad(K k);
private static class CompletableFutureWithCreateTime<V> extends CompletableFuture<V> {
@ -76,13 +75,13 @@ public abstract class StatisticsCacheLoader<V> implements AsyncCacheLoader<Stati
}
}
private void putIntoIProgressing(StatisticsCacheKey k, CompletableFutureWithCreateTime<V> v) {
private void putIntoIProgressing(K k, CompletableFutureWithCreateTime<V> v) {
synchronized (inProgressing) {
inProgressing.put(k, v);
}
}
private void removeFromIProgressing(StatisticsCacheKey k) {
private void removeFromIProgressing(K k) {
synchronized (inProgressing) {
inProgressing.remove(k);
}

View File

@ -30,7 +30,7 @@ import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader<Optional<ColumnStatistic>> {
public class ColumnStatisticsCacheLoader extends BasicAsyncCacheLoader<StatisticsCacheKey, Optional<ColumnStatistic>> {
private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class);

View File

@ -110,7 +110,7 @@ public class HMSAnalysisTask extends ExternalAnalysisTask {
// Estimate the row count. This value is inaccurate if the table stats is empty.
TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager()
.findTableStatsStatus(hmsExternalTable.getId());
long count = tableStatsStatus == null ? hmsExternalTable.estimatedRowCount() : tableStatsStatus.rowCount;
long count = tableStatsStatus == null ? hmsExternalTable.getRowCount() : tableStatsStatus.rowCount;
dataSize = dataSize * count / partitionNames.size();
numNulls = numNulls * count / partitionNames.size();
int ndv = ndvPartValues.size();
@ -131,7 +131,7 @@ public class HMSAnalysisTask extends ExternalAnalysisTask {
private void getHmsColumnStats() throws Exception {
TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager()
.findTableStatsStatus(hmsExternalTable.getId());
long count = tableStatsStatus == null ? hmsExternalTable.estimatedRowCount() : tableStatsStatus.rowCount;
long count = tableStatsStatus == null ? hmsExternalTable.getRowCount() : tableStatsStatus.rowCount;
Map<String, String> params = buildStatsParams("NULL");
Map<StatsType, String> statsParams = new HashMap<>();

View File

@ -31,7 +31,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
public class HistogramCacheLoader extends StatisticsCacheLoader<Optional<Histogram>> {
public class HistogramCacheLoader extends BasicAsyncCacheLoader<StatisticsCacheKey, Optional<Histogram>> {
private static final Logger LOG = LogManager.getLogger(HistogramCacheLoader.class);

View File

@ -59,7 +59,7 @@ public class OlapScanStatsDerive extends BaseStatsDerive {
Map<Id, ColumnStatistic> columnStatisticMap = new HashMap<>();
Table table = scanNode.getOlapTable();
double rowCount = table.estimatedRowCount();
double rowCount = table.getRowCountForNereids();
for (Map.Entry<Id, String> entry : slotIdToTableIdAndColumnName.entrySet()) {
String colName = entry.getValue();
// TODO. Get index id for materialized view.

View File

@ -39,7 +39,6 @@ public class StatisticConstants {
public static final int ID_LEN = 4096;
public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2;
/**
* Bucket count fot column_statistics and analysis_job table.
*/

View File

@ -73,6 +73,7 @@ public class StatisticsCacheKey {
@Override
public String toString() {
StringJoiner sj = new StringJoiner(DELIMITER);
sj.add("ColumnStats");
sj.add(String.valueOf(tableId));
sj.add(String.valueOf(idxId));
sj.add(colName);

View File

@ -553,10 +553,9 @@ public class StatisticsUtil {
* First get it from remote table parameters. If not found, estimate it : totalSize/estimatedRowSize
*
* @param table Hive HMSExternalTable to estimate row count.
* @param isInit Flag to indicate if this is called during init. To avoid recursively get schema.
* @return estimated row count
*/
public static long getHiveRowCount(HMSExternalTable table, boolean isInit) {
public static long getHiveRowCount(HMSExternalTable table) {
Map<String, String> parameters = table.getRemoteTable().getParameters();
if (parameters == null) {
return -1;
@ -569,7 +568,7 @@ public class StatisticsUtil {
return rows;
}
}
if (!parameters.containsKey(TOTAL_SIZE) || isInit) {
if (!parameters.containsKey(TOTAL_SIZE)) {
return -1;
}
// Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize
@ -579,7 +578,7 @@ public class StatisticsUtil {
estimatedRowSize += column.getDataType().getSlotSize();
}
if (estimatedRowSize == 0) {
return 1;
return -1;
}
return totalSize / estimatedRowSize;
}
@ -657,7 +656,7 @@ public class StatisticsUtil {
estimatedRowSize += column.getDataType().getSlotSize();
}
if (estimatedRowSize == 0) {
return 1;
return 0;
}
if (samplePartitionSize < totalPartitionSize) {
totalSize = totalSize * totalPartitionSize / samplePartitionSize;