[Fix](statistics)Fix bug and improve auto analyze. (#27626)

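1. Implement needReAnalyzeTable for ExternalTable. For now, an external table that has already been analyzed is not reanalyzed until the interval configured by external_table_auto_analyze_interval_in_millis (24 hours by default) has elapsed.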
2. In HiveMetaStoreCache.loadPartitions, handle the empty-iterator case to avoid an IndexOutOfBoundsException.
3. Wrap the body of the SHOW ANALYZE handling loop in a try-catch, so that when one table fails (for example, its catalog was dropped and the table can no longer be found), the other tables are still shown.
4. For now, only OlapTable and Hive HMSExternalTable support sample analyze; an exception is thrown for other table types.
5. In StatisticsCollector, call constructJob after createTableLevelTaskForExternalTable to avoid NPE.
This commit is contained in:
Jibing-Li
2023-11-27 22:13:48 +08:00
committed by GitHub
parent 7ac97c1650
commit 2076d2b390
9 changed files with 118 additions and 44 deletions

View File

@ -392,9 +392,18 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
/**
 * Decides whether auto analyze should collect statistics for this external table again.
 *
 * @param tblStats statistics metadata previously recorded for this table; may be null
 * @return true when {@code tblStats} is null, when {@code tblStats.analyzeColumns()} does not
 *         contain the name of every base-schema column whose type is supported, or when more than
 *         {@code StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis()} milliseconds have
 *         passed since {@code tblStats.updatedTime}; false otherwise
 */
@Override
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
    // TODO: Find a more precise way to decide if this external table needs to be reanalyzed.
    // For now the decision is based only on missing statistics and on elapsed time.
    if (tblStats == null) {
        // No statistics metadata was passed in, so the table is treated as needing analysis.
        return true;
    }
    // Every column with a supported type must already be covered by the recorded statistics.
    if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
            .stream()
            .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
            .map(Column::getName)
            .collect(Collectors.toSet()))) {
        return true;
    }
    // Otherwise re-analyze only after the configured interval has elapsed since the last update.
    return System.currentTimeMillis()
            - tblStats.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
}
@Override

View File

@ -320,6 +320,10 @@ public class HiveMetaStoreCache {
}
private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends PartitionCacheKey> keys) {
Map<PartitionCacheKey, HivePartition> ret = new HashMap<>();
if (keys == null || !keys.iterator().hasNext()) {
return ret;
}
PartitionCacheKey oneKey = Iterables.get(keys, 0);
String dbName = oneKey.getDbName();
String tblName = oneKey.getTblName();
@ -341,7 +345,6 @@ public class HiveMetaStoreCache {
}).collect(Collectors.toList());
List<Partition> partitions = catalog.getClient().getPartitions(dbName, tblName, partitionNames);
// Compose the return result map.
Map<PartitionCacheKey, HivePartition> ret = new HashMap<>();
for (Partition partition : partitions) {
StorageDescriptor sd = partition.getSd();
ret.put(new PartitionCacheKey(dbName, tblName, partition.getValues()),

View File

@ -448,6 +448,9 @@ public class SessionVariable implements Serializable, Writable {
// Session variable name for the auto-analyze interval of huge tables
// (NOTE(review): the annotation binding this name is not fully visible here - confirm).
public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
= "huge_table_auto_analyze_interval_in_millis";
// Session variable name bound to externalTableAutoAnalyzeIntervalInMillis:
// the minimum interval between automatic ANALYZE runs on external tables.
public static final String EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
= "external_table_auto_analyze_interval_in_millis";
// Session variable name for the table statistics health threshold.
public static final String TABLE_STATS_HEALTH_THRESHOLD
= "table_stats_health_threshold";
@ -1366,6 +1369,12 @@ public class SessionVariable implements Serializable, Writable {
+ "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once."})
public long hugeTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(12);
// Global session variable: minimum interval, in milliseconds, between automatic ANALYZE runs
// on the same external table. Defaults to 24 hours.
@VariableMgr.VarAttr(name = EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL,
        description = {"控制对外表的自动ANALYZE的最小时间间隔,在该时间间隔内的外表仅ANALYZE一次",
                // The trailing space keeps the two concatenated sentences from running together.
                "This controls the minimum time interval for automatic ANALYZE on external tables. "
                        + "Within this interval, external tables are analyzed only once."})
public long externalTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(24);
@VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = VariableMgr.GLOBAL,
description = {"取值在0-100之间,当自上次统计信息收集操作之后"
+ "数据更新量达到 (100 - table_stats_health_threshold)% ,认为该表的统计信息已过时",

View File

@ -2622,44 +2622,51 @@ public class ShowExecutor {
List<List<String>> resultRows = Lists.newArrayList();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
for (AnalysisInfo analysisInfo : results) {
List<String> row = new ArrayList<>();
row.add(String.valueOf(analysisInfo.jobId));
CatalogIf<? extends DatabaseIf<? extends TableIf>> c = StatisticsUtil.findCatalog(analysisInfo.catalogId);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(analysisInfo.dbId);
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
if (databaseIf.isPresent()) {
Optional<? extends TableIf> table = databaseIf.get().getTable(analysisInfo.tblId);
row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
} else {
row.add("DB may get deleted");
}
row.add(analysisInfo.colName);
row.add(analysisInfo.jobType.toString());
row.add(analysisInfo.analysisType.toString());
row.add(analysisInfo.message);
row.add(TimeUtils.DATETIME_FORMAT.format(
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
ZoneId.systemDefault())));
row.add(analysisInfo.state.toString());
try {
row.add(showStmt.isAuto()
? analysisInfo.progress
: Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
List<String> row = new ArrayList<>();
row.add(String.valueOf(analysisInfo.jobId));
CatalogIf<? extends DatabaseIf<? extends TableIf>> c
= StatisticsUtil.findCatalog(analysisInfo.catalogId);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(analysisInfo.dbId);
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
if (databaseIf.isPresent()) {
Optional<? extends TableIf> table = databaseIf.get().getTable(analysisInfo.tblId);
row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
} else {
row.add("DB may get deleted");
}
row.add(analysisInfo.colName);
row.add(analysisInfo.jobType.toString());
row.add(analysisInfo.analysisType.toString());
row.add(analysisInfo.message);
row.add(TimeUtils.DATETIME_FORMAT.format(
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.lastExecTimeInMs),
ZoneId.systemDefault())));
row.add(analysisInfo.state.toString());
try {
row.add(showStmt.isAuto()
? analysisInfo.progress
: Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
} catch (Exception e) {
row.add("N/A");
LOG.warn("Failed to get progress for job: {}", analysisInfo, e);
}
row.add(analysisInfo.scheduleType.toString());
LocalDateTime startTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.startTime),
java.time.ZoneId.systemDefault());
LocalDateTime endTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.endTime),
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
resultRows.add(row);
} catch (Exception e) {
row.add("N/A");
LOG.warn("Failed to get progress for job: {}", analysisInfo, e);
LOG.warn("Failed to get analyze info for table {}.{}.{}, reason: {}",
analysisInfo.catalogId, analysisInfo.dbId, analysisInfo.tblId, e.getMessage());
continue;
}
row.add(analysisInfo.scheduleType.toString());
LocalDateTime startTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.startTime),
java.time.ZoneId.systemDefault());
LocalDateTime endTime =
LocalDateTime.ofInstant(Instant.ofEpochMilli(analysisInfo.endTime),
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
resultRows.add(row);
}
resultSet = new ShowResultSet(showStmt.getMetaData(), resultRows);
}

View File

@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
@ -336,6 +337,12 @@ public class AnalysisManager implements Writable {
// No statistics need to be collected or updated
return null;
}
// Only OlapTable and Hive HMSExternalTable support sample analyze.
if ((stmt.getSamplePercent() > 0 || stmt.getSampleRows() > 0) && !canSample(stmt.getTable())) {
String message = String.format("Table %s doesn't support sample analyze.", stmt.getTable().getName());
LOG.info(message);
throw new DdlException(message);
}
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
@ -1085,4 +1092,20 @@ public class AnalysisManager implements Writable {
/**
 * Reports whether {@code analysisJobIdToTaskMap} still holds at least one entry.
 *
 * @return true if the map is not empty, false otherwise
 */
public boolean hasUnFinished() {
    boolean noJobsTracked = analysisJobIdToTaskMap.isEmpty();
    return !noJobsTracked;
}
/**
 * Tells whether sample analyze is supported for the given table.
 * For now this is the case only for OlapTable and for HMSExternalTable whose DLA type is HIVE.
 *
 * @param table the table to check
 * @return true if the given table can do sample analyze, false otherwise
 */
public boolean canSample(TableIf table) {
    if (table instanceof OlapTable) {
        return true;
    }
    if (!(table instanceof HMSExternalTable)) {
        return false;
    }
    HMSExternalTable hmsTable = (HMSExternalTable) table;
    return hmsTable.getDlaType().equals(HMSExternalTable.DLAType.HIVE);
}
}

View File

@ -90,6 +90,8 @@ public class StatisticConstants {
// Auto-analyze interval constant for huge tables: 12 hours, expressed in milliseconds.
public static final long HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = TimeUnit.HOURS.toMillis(12);
// Auto-analyze interval constant for external tables: 24 hours, expressed in milliseconds.
// StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis() returns it when the session
// variable cannot be read.
public static final long EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = TimeUnit.HOURS.toMillis(24);
// NOTE(review): presumably the default for the table_stats_health_threshold variable - confirm.
public static final int TABLE_STATS_HEALTH_THRESHOLD = 60;
// 43200 seconds equals 12 hours.
public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;

View File

@ -22,7 +22,7 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
@ -107,17 +107,28 @@ public class StatisticsAutoCollector extends StatisticsCollector {
protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) {
List<AnalysisInfo> analysisInfos = new ArrayList<>();
for (TableIf table : db.getTables()) {
if (skip(table)) {
try {
if (skip(table)) {
continue;
}
createAnalyzeJobForTbl(db, analysisInfos, table);
} catch (Throwable t) {
LOG.warn("Failed to analyze table {}.{}.{}",
db.getCatalog().getName(), db.getFullName(), table.getName(), t);
continue;
}
createAnalyzeJobForTbl(db, analysisInfos, table);
}
return analysisInfos;
}
// return true if skip auto analyze this time.
protected boolean skip(TableIf table) {
if (!(table instanceof OlapTable || table instanceof ExternalTable)) {
if (!(table instanceof OlapTable || table instanceof HMSExternalTable)) {
return true;
}
// For now, only support Hive HMS table auto collection.
if (table instanceof HMSExternalTable
&& !((HMSExternalTable) table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
return true;
}
if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {

View File

@ -78,10 +78,10 @@ public abstract class StatisticsCollector extends MasterDaemon {
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false);
}
Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks);
analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
}

View File

@ -906,6 +906,16 @@ public class StatisticsUtil {
return StatisticConstants.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
}
/**
 * Reads the auto-analyze interval for external tables from the global session variable.
 *
 * @return the value of {@code externalTableAutoAnalyzeIntervalInMillis}, or
 *         {@code StatisticConstants.EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS}
 *         when the lookup throws an exception
 */
public static long getExternalTableAutoAnalyzeIntervalInMillis() {
    long intervalInMillis = StatisticConstants.EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
    try {
        intervalInMillis = findConfigFromGlobalSessionVar(
                SessionVariable.EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
                .externalTableAutoAnalyzeIntervalInMillis;
    } catch (Exception e) {
        // Lookup failed; the default assigned above is kept.
        LOG.warn("Failed to get value of externalTableAutoAnalyzeIntervalInMillis, return default", e);
    }
    return intervalInMillis;
}
public static long getTableStatsHealthThreshold() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.TABLE_STATS_HEALTH_THRESHOLD)