[feat](stats) Support analyze with sample automatically (#23978)

1. Analyze with sample automatically when the table size is greater than huge_table_lower_bound_size_in_bytes (5 GB by default). Users can disable this feature via the FE option enable_auto_sample.
2. Support grammar like `ANALYZE TABLE test WITH FULL` to force a full analyze regardless of table size.
3. Fix bugs where table stats were not updated properly when stats were dropped or when only a few columns were analyzed.
Author: AKIRA
Date: 2023-09-13 20:42:10 +09:00
Committed by: GitHub
Parent: 05722b4cfd
Commit: 786a721e03
21 changed files with 383 additions and 155 deletions


@@ -6074,6 +6074,13 @@ with_analysis_properties ::=
put("period.cron", cron_expr);
}};
:}
| KW_FULL
{:
RESULT = new HashMap<String, String>() {{
put(AnalyzeProperties.PROPERTY_FORCE_FULL, "true");
}};
:}
;
opt_with_analysis_properties ::=


@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
// TODO: Remove map
public class AnalyzeProperties {
public static final String PROPERTY_SYNC = "sync";
@@ -42,6 +43,8 @@ public class AnalyzeProperties {
public static final String PROPERTY_ANALYSIS_TYPE = "analysis.type";
public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";
public static final String PROPERTY_FORCE_FULL = "force.full";
public static final AnalyzeProperties DEFAULT_PROP = new AnalyzeProperties(new HashMap<String, String>() {
{
put(AnalyzeProperties.PROPERTY_SYNC, "false");
@@ -67,6 +70,7 @@ public class AnalyzeProperties {
.add(PROPERTY_ANALYSIS_TYPE)
.add(PROPERTY_PERIOD_SECONDS)
.add(PROPERTY_PERIOD_CRON)
.add(PROPERTY_FORCE_FULL)
.build();
public AnalyzeProperties(Map<String, String> properties) {
@@ -264,6 +268,10 @@ public class AnalyzeProperties {
|| properties.containsKey(PROPERTY_SAMPLE_ROWS);
}
public boolean forceFull() {
return properties.containsKey(PROPERTY_FORCE_FULL);
}
public String toSQL() {
StringBuilder sb = new StringBuilder();
sb.append("PROPERTIES(");

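The new `force.full` property rides through the existing analyze-properties map, and `forceFull()` tests only for the key's presence, never its value. A minimal stand-alone sketch of that round trip (toy class, not the Doris code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the force.full round trip: the KW_FULL grammar branch puts
// the key into the map, and forceFull() checks key presence only, so even
// "force.full" = "false" would count as forced-full.
public class ForceFullSketch {
    static final String PROPERTY_FORCE_FULL = "force.full";

    static boolean forceFull(Map<String, String> properties) {
        return properties.containsKey(PROPERTY_FORCE_FULL);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(PROPERTY_FORCE_FULL, "true"); // what `WITH FULL` emits
        System.out.println(forceFull(props));   // true
    }
}
```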

@@ -93,4 +93,8 @@ public class AnalyzeStmt extends StatementBase {
public CronExpression getCron() {
return analyzeProperties.getCron();
}
public boolean forceFull() {
return analyzeProperties.forceFull();
}
}


@@ -181,6 +181,9 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
throw new AnalysisException("Automatic collection "
+ "and period statistics collection cannot be set at same time");
}
if (analyzeProperties.isSample() && analyzeProperties.forceFull()) {
throw new AnalysisException("Impossible to analyze with sample and full simultaneously");
}
}
private void checkColumn() throws AnalysisException {


@@ -144,7 +144,7 @@ public class ShowTableStatsStmt extends ShowStmt {
row.add(tableStatistic.analysisMethod.toString());
row.add(tableStatistic.analysisType.toString());
row.add(new Date(tableStatistic.updatedTime).toString());
row.add(tableStatistic.columns);
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());
result.add(row);
return new ShowResultSet(getMetaData(), result);


@@ -1132,24 +1132,39 @@ public class OlapTable extends Table {
if (rowCount == 0) {
return false;
}
long updateRows = tblStats.updatedRows.get();
if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
.stream()
.map(Column::getName)
.collect(Collectors.toSet()))) {
return true;
}
// long updateRows = tblStats.updatedRows.get();
long updateRows = Math.abs(tblStats.rowCount - rowCount);
int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows);
return tblHealth < Config.table_stats_health_threshold;
}
@Override
public Set<String> findReAnalyzeNeededPartitions() {
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId());
if (tableStats == null) {
return getPartitionNames().stream().map(this::getPartition)
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
TableIf table = this;
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
Set<String> allPartitions = table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
if (tableStats == null) {
return table.getBaseSchema().stream().collect(Collectors.toMap(Column::getName, v -> allPartitions));
}
return getPartitionNames().stream()
.map(this::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName)
.collect(Collectors.toSet());
Map<String, Set<String>> colToPart = new HashMap<>();
for (Column col : table.getBaseSchema()) {
long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName());
Set<String> partitions = table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName)
.collect(Collectors.toSet());
colToPart.put(col.getName(), partitions);
}
return colToPart;
}
@Override

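`needReAnalyzeTable` above falls back to `|storedRowCount - currentRowCount|` as the changed-row estimate (the `updatedRows` counter is commented out) and re-analyzes when table health drops below `Config.table_stats_health_threshold`. The formula of `StatisticsUtil.getTableHealth` is not shown in this diff; below is a plausible sketch assuming health is the percentage of rows left unchanged:

```java
// Hypothetical stand-in for StatisticsUtil.getTableHealth; the real formula
// may differ. Health here = percentage of rows assumed unchanged.
public final class TableHealthSketch {
    static int getTableHealth(long rowCount, long updateRows) {
        if (updateRows >= rowCount) {
            return 0; // everything changed: stats fully stale
        }
        return (int) (100 - 100.0 * updateRows / rowCount);
    }

    public static void main(String[] args) {
        long rowCount = 1000;
        long updateRows = Math.abs(900 - rowCount); // the diff's fallback estimate
        System.out.println(getTableHealth(rowCount, updateRows)); // 90
    }
}
```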

@@ -580,7 +580,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
}
@Override
public Set<String> findReAnalyzeNeededPartitions() {
return Collections.emptySet();
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
return Collections.emptyMap();
}
}


@@ -34,6 +34,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -139,7 +140,7 @@ public interface TableIf {
boolean needReAnalyzeTable(TableStats tblStats);
Set<String> findReAnalyzeNeededPartitions();
Map<String, Set<String>> findReAnalyzeNeededPartitions();
void write(DataOutput out) throws IOException;
@@ -244,5 +245,10 @@ public interface TableIf {
default long getLastUpdateTime() {
return -1L;
}
default long getDataSize() {
// TODO: Each TableIf implementation should provide its own data size.
return 0;
}
}

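`findReAnalyzeNeededPartitions` changes from a flat partition set to a per-column map, so each column is refreshed against its own last analyze time. A self-contained sketch of the new contract with toy types (not the Doris classes):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model: a partition needs re-analyze for a column iff it has data and
// its visible version time is at or after that column's last update time.
public class ReAnalyzeSketch {
    record Partition(String name, boolean hasData, long visibleVersionTime) {}

    static Map<String, Set<String>> findReAnalyzeNeededPartitions(
            List<String> columns, List<Partition> partitions,
            Map<String, Long> colLastUpdatedTime) {
        Map<String, Set<String>> colToPart = new HashMap<>();
        for (String col : columns) {
            long lastUpdate = colLastUpdatedTime.getOrDefault(col, 0L);
            Set<String> stale = partitions.stream()
                    .filter(Partition::hasData)
                    .filter(p -> p.visibleVersionTime() >= lastUpdate)
                    .map(Partition::name)
                    .collect(Collectors.toSet());
            colToPart.put(col, stale);
        }
        return colToPart;
    }

    public static void main(String[] args) {
        List<Partition> parts = List.of(
                new Partition("p1", true, 100L), new Partition("p2", true, 300L));
        // col1 analyzed at t=200; col2 never analyzed (defaults to 0).
        System.out.println(findReAnalyzeNeededPartitions(
                List.of("col1", "col2"), parts, Map.of("col1", 200L)));
        // e.g. {col1=[p2], col2=[p1, p2]}
    }
}
```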

@@ -50,10 +50,12 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* External table represent tables that are not self-managed by Doris.
@@ -388,10 +390,10 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
}
@Override
public Set<String> findReAnalyzeNeededPartitions() {
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
HashSet<String> partitions = Sets.newHashSet();
// TODO: Find a way to collect external table partitions that need to be analyzed.
partitions.add("Dummy Partition");
return partitions;
return getBaseSchema().stream().collect(Collectors.toMap(Column::getName, k -> partitions));
}
}


@@ -27,6 +27,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.quartz.CronExpression;
@@ -167,13 +168,16 @@ public class AnalysisInfo implements Writable {
public CronExpression cronExpression;
@SerializedName("forceFull")
public final boolean forceFull;
public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogName, String dbName, String tblName,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
CronExpression cronExpression) {
CronExpression cronExpression, boolean forceFull) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
@@ -204,6 +208,7 @@
if (cronExpression != null) {
this.cronExprStr = cronExpression.getCronExpression();
}
this.forceFull = forceFull;
}
@Override
@@ -214,11 +219,11 @@
sj.add("DBName: " + dbName);
sj.add("TableName: " + tblName);
sj.add("ColumnName: " + colName);
sj.add("TaskType: " + analysisType.toString());
sj.add("TaskMode: " + analysisMode.toString());
sj.add("TaskMethod: " + analysisMethod.toString());
sj.add("TaskType: " + analysisType);
sj.add("TaskMode: " + analysisMode);
sj.add("TaskMethod: " + analysisMethod);
sj.add("Message: " + message);
sj.add("CurrentState: " + state.toString());
sj.add("CurrentState: " + state);
if (samplePercent > 0) {
sj.add("SamplePercent: " + samplePercent);
}
@@ -240,6 +245,10 @@
if (periodTimeInMs > 0) {
sj.add("periodTimeInMs: " + StatisticsUtil.getReadableTime(periodTimeInMs));
}
if (StringUtils.isNotEmpty(cronExprStr)) {
sj.add("cronExpr: " + cronExprStr);
}
sj.add("forceFull: " + forceFull);
return sj.toString();
}


@@ -59,6 +59,8 @@ public class AnalysisInfoBuilder {
private CronExpression cronExpression;
private boolean forceFull;
public AnalysisInfoBuilder() {
}
@@ -90,6 +92,7 @@
partitionOnly = info.partitionOnly;
samplingPartition = info.samplingPartition;
cronExpression = info.cronExpression;
forceFull = info.forceFull;
}
public AnalysisInfoBuilder setJobId(long jobId) {
@@ -226,37 +229,14 @@
this.cronExpression = cronExpression;
}
public void setForceFull(boolean forceFull) {
this.forceFull = forceFull;
}
public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, cronExpression);
}
public AnalysisInfoBuilder copy() {
return new AnalysisInfoBuilder()
.setJobId(jobId)
.setTaskId(taskId)
.setTaskIds(taskIds)
.setCatalogName(catalogName)
.setDbName(dbName)
.setTblName(tblName)
.setColToPartitions(colToPartitions)
.setColName(colName)
.setIndexId(indexId)
.setJobType(jobType)
.setAnalysisMode(analysisMode)
.setAnalysisMethod(analysisMethod)
.setAnalysisType(analysisType)
.setSamplePercent(samplePercent)
.setSampleRows(sampleRows)
.setPeriodTimeInMs(periodTimeInMs)
.setMaxBucketNum(maxBucketNum)
.setMessage(message)
.setLastExecTimeInMs(lastExecTimeInMs)
.setTimeCostInMs(timeCostInMs)
.setState(state)
.setScheduleType(scheduleType)
.setExternalTableLevelTask(externalTableLevelTask);
externalTableLevelTask, partitionOnly, samplingPartition, cronExpression, forceFull);
}
}


@@ -29,7 +29,6 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
@@ -113,7 +112,7 @@ public class AnalysisManager extends Daemon implements Writable {
// Track and control sync analyze tasks; kept in memory only
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
private final Map<Long, TableStats> idToTblStatsStatus = new ConcurrentHashMap<>();
private final Map<Long, TableStats> idToTblStats = new ConcurrentHashMap<>();
private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
AnalysisInfo info = w.info;
@@ -175,7 +174,6 @@
return null;
};
private final Function<TaskStatusWrapper, Void> systemJobStatusUpdater = w -> {
AnalysisInfo info = w.info;
info.state = w.taskState;
@@ -407,8 +405,7 @@
* TODO Supports incremental collection of statistics from materialized views
*/
private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
Set<String> partitionNames, AnalysisType analysisType,
AnalysisMode analysisMode) throws DdlException {
Set<String> partitionNames, AnalysisType analysisType) throws DdlException {
long tableId = table.getId();
Map<String, Set<String>> columnToPartitions = columnNames.stream()
@@ -452,8 +449,7 @@
}
if (analysisType == AnalysisType.FUNDAMENTALS) {
Set<String> reAnalyzeNeededPartitions = findReAnalyzeNeededPartitions(table);
columnToPartitions.replaceAll((k, v) -> reAnalyzeNeededPartitions);
return table.findReAnalyzeNeededPartitions();
}
return columnToPartitions;
@@ -502,7 +498,7 @@
infoBuilder.setScheduleType(scheduleType);
infoBuilder.setLastExecTimeInMs(0);
infoBuilder.setCronExpression(cronExpression);
infoBuilder.setForceFull(stmt.forceFull());
if (analysisMethod == AnalysisMethod.SAMPLE) {
infoBuilder.setSamplePercent(samplePercent);
infoBuilder.setSampleRows(sampleRows);
@@ -519,7 +515,7 @@
infoBuilder.setPeriodTimeInMs(periodTimeInMs);
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, columnNames,
partitionNames, analysisType, analysisMode);
partitionNames, analysisType);
infoBuilder.setColToPartitions(colToPartitions);
infoBuilder.setTaskIds(Lists.newArrayList());
@@ -685,17 +681,24 @@
Env.getCurrentEnv().getStatisticsCleaner().clear();
return;
}
Set<String> cols = dropStatsStmt.getColumnNames();
long tblId = dropStatsStmt.getTblId();
TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats != null) {
tableStats.updatedTime = 0;
replayUpdateTableStatsStatus(tableStats);
if (tableStats == null) {
return;
}
if (cols == null) {
tableStats.reset();
} else {
dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn);
for (String col : cols) {
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
}
}
logCreateTableStats(tableStats);
StatisticsRepository.dropStatistics(tblId, cols);
for (String col : cols) {
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
}
}
public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException {
@@ -875,7 +878,7 @@
AnalysisManager analysisManager = new AnalysisManager();
readAnalysisInfo(in, analysisManager.analysisJobInfoMap, true);
readAnalysisInfo(in, analysisManager.analysisTaskInfoMap, false);
readIdToTblStats(in, analysisManager.idToTblStatsStatus);
readIdToTblStats(in, analysisManager.idToTblStats);
return analysisManager;
}
@@ -910,8 +913,8 @@
}
private void writeTableStats(DataOutput out) throws IOException {
out.writeInt(idToTblStatsStatus.size());
for (Entry<Long, TableStats> entry : idToTblStatsStatus.entrySet()) {
out.writeInt(idToTblStats.size());
for (Entry<Long, TableStats> entry : idToTblStats.entrySet()) {
entry.getValue().write(out);
}
}
@@ -922,12 +925,12 @@
}
public TableStats findTableStatsStatus(long tblId) {
return idToTblStatsStatus.get(tblId);
return idToTblStats.get(tblId);
}
// Invoke this when a load transaction finishes.
public void updateUpdatedRows(long tblId, long rows) {
TableStats statsStatus = idToTblStatsStatus.get(tblId);
TableStats statsStatus = idToTblStats.get(tblId);
if (statsStatus != null) {
statsStatus.updatedRows.addAndGet(rows);
}
@@ -939,7 +942,7 @@
}
public void replayUpdateTableStatsStatus(TableStats tableStats) {
idToTblStatsStatus.put(tableStats.tblId, tableStats);
idToTblStats.put(tableStats.tblId, tableStats);
}
public void logCreateTableStats(TableStats tableStats) {
@@ -951,20 +954,4 @@
systemJobInfoMap.put(jobInfo.jobId, jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}
@VisibleForTesting
protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
TableStats tableStats = findTableStatsStatus(table.getId());
if (tableStats == null) {
return table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
}
return table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName)
.collect(Collectors.toSet());
}
}

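The reworked DROP STATS path distinguishes dropping everything (reset the whole `TableStats` record) from dropping named columns (remove just those columns and invalidate their cache entries). A toy restatement of the branching, with persistence reduced to comments:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the DROP STATS branching above; edit-log persistence and the
// statistics repository are out of scope here.
public class DropStatsSketch {
    final Map<String, Long> colLastUpdatedTime = new ConcurrentHashMap<>();
    long updatedTime;

    void dropStats(Set<String> cols) {
        if (cols == null) {
            // reset(): zero updatedTime and every column's last update time.
            updatedTime = 0;
            colLastUpdatedTime.replaceAll((k, v) -> 0L);
        } else {
            // Forget only the named columns; the real code also invalidates
            // each column's statistics cache entry.
            cols.forEach(colLastUpdatedTime::remove);
        }
        // The real code then logs the new TableStats and drops the matching
        // rows from the statistics repository.
    }
}
```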

@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.QueryState;
@@ -215,14 +216,21 @@ public abstract class BaseAnalysisTask {
}
protected String getSampleExpression() {
if (info.analysisMethod == AnalysisMethod.FULL) {
if (info.forceFull) {
return "";
}
// TODO Add sampling methods for external tables
int sampleRows = info.sampleRows;
if (info.analysisMethod == AnalysisMethod.FULL) {
if (Config.enable_auto_sample && tbl.getDataSize() > Config.huge_table_lower_bound_size_in_bytes) {
sampleRows = Config.huge_table_default_sample_rows;
} else {
return "";
}
}
if (info.samplePercent > 0) {
return String.format("TABLESAMPLE(%d PERCENT)", info.samplePercent);
} else {
return String.format("TABLESAMPLE(%d ROWS)", info.sampleRows);
return String.format("TABLESAMPLE(%d ROWS)", sampleRows);
}
}

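`getSampleExpression` now has three outcomes: a forced-full job never samples, a FULL job on a huge table is silently downgraded to sampling when `enable_auto_sample` is on, and an explicit SAMPLE job keeps its percent or row count. A compact restatement with the `Config` values passed as parameters (names illustrative):

```java
public class SampleExprSketch {
    static String sampleExpr(boolean forceFull, boolean fullMethod, long dataSize,
                             boolean enableAutoSample, long hugeBound,
                             long autoSampleRows, int samplePercent, int sampleRows) {
        if (forceFull) {
            return ""; // ANALYZE ... WITH FULL: never sample
        }
        long rows = sampleRows;
        if (fullMethod) {
            if (enableAutoSample && dataSize > hugeBound) {
                rows = autoSampleRows; // huge table: downgrade to sampling
            } else {
                return ""; // small table: keep the full scan
            }
        }
        return samplePercent > 0
                ? String.format("TABLESAMPLE(%d PERCENT)", samplePercent)
                : String.format("TABLESAMPLE(%d ROWS)", rows);
    }

    public static void main(String[] args) {
        // 6 GB FULL job, auto-sample on, 5 GB bound: downgraded to 200000 rows.
        System.out.println(sampleExpr(false, true, 6_000_000_000L,
                true, 5_368_709_120L, 200_000L, 0, 0));
    }
}
```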

@@ -23,6 +23,7 @@ import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
@@ -64,11 +65,15 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
+ "MIN(`${colName}`) AS min, "
+ "MAX(`${colName}`) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName}";
+ "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName} ${sampleExpr}";
// Cache stats for each partition; they are inserted into column_statistics in batches.
private final List<List<ColStatsData>> buf = new ArrayList<>();
@VisibleForTesting
public OlapAnalysisTask() {
}
public OlapAnalysisTask(AnalysisInfo info) {
super(info);
}
@@ -116,7 +121,7 @@
public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, String> params) throws Exception {
long startTime = System.currentTimeMillis();
LOG.debug("analyze task {} start at {}", info.toString(), new Date());
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) {
List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
for (List<String> group : sqlGroups) {
if (killed) {

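The `${sampleExpr}` placeholder appended to the per-partition SQL template is filled the same way as the other placeholders, via map-based substitution. A sketch of how the final statement comes out, assuming commons-text `StringSubstitutor` on the classpath (values illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.text.StringSubstitutor;

// Demonstrates placeholder filling for the partition-analyze template above.
public class TemplateDemo {
    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("dbName", "db");
        params.put("tblName", "tbl");
        params.put("partitionName", "p1");
        params.put("sampleExpr", "TABLESAMPLE(10 ROWS)");
        String template = "SELECT COUNT(1) FROM `${dbName}`.`${tblName}` "
                + "PARTITION ${partitionName} ${sampleExpr}";
        System.out.println(new StringSubstitutor(params).replace(template));
        // SELECT COUNT(1) FROM `db`.`tbl` PARTITION p1 TABLESAMPLE(10 ROWS)
    }
}
```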

@@ -23,18 +23,19 @@ import org.apache.doris.analysis.VariableExpr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
@@ -57,7 +58,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
public StatisticsAutoCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes),
new AnalysisTaskExecutor(Config.full_auto_analyze_simultaneously_running_task_num));
}
@@ -103,36 +104,57 @@
}
}
public List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) {
protected List<AnalysisInfo> constructAnalysisInfo(DatabaseIf<? extends TableIf> db) {
List<AnalysisInfo> analysisInfos = new ArrayList<>();
for (TableIf table : db.getTables()) {
if (table instanceof View) {
if (skip(table)) {
continue;
}
TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(),
table.getName());
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
.setCatalogName(db.getCatalog().getName())
.setDbName(db.getFullName())
.setTblName(tableName.getTbl())
.setColName(
table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(
Column::getName).collect(Collectors.joining(","))
)
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL)
.setScheduleType(AnalysisInfo.ScheduleType.ONCE)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())
.setJobType(JobType.SYSTEM).build();
analysisInfos.add(jobInfo);
createAnalyzeJobForTbl(db, analysisInfos, table);
}
return analysisInfos;
}
// Return true if auto analyze should be skipped for this table this time.
protected boolean skip(TableIf table) {
if (!(table instanceof OlapTable || table instanceof ExternalTable)) {
return true;
}
if (table.getDataSize() < Config.huge_table_lower_bound_size_in_bytes) {
return false;
}
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
return System.currentTimeMillis() - tableStats.updatedTime < Config.huge_table_auto_analyze_interval_in_millis;
}
protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
List<AnalysisInfo> analysisInfos, TableIf table) {
AnalysisMethod analysisMethod = table.getDataSize() > Config.huge_table_lower_bound_size_in_bytes
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(),
table.getName());
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
.setCatalogName(db.getCatalog().getName())
.setDbName(db.getFullName())
.setTblName(tableName.getTbl())
.setColName(
table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(
Column::getName).collect(Collectors.joining(","))
)
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
.setSamplePercent(Config.huge_table_default_sample_rows)
.setScheduleType(AnalysisInfo.ScheduleType.ONCE)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())
.setLastExecTimeInMs(System.currentTimeMillis())
.setJobType(JobType.SYSTEM).build();
analysisInfos.add(jobInfo);
}
@VisibleForTesting
protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
TableIf table = StatisticsUtil
@@ -144,35 +166,15 @@ public class StatisticsAutoCollector extends StatisticsCollector {
return null;
}
Set<String> needRunPartitions = table.findReAnalyzeNeededPartitions();
Map<String, Set<String>> needRunPartitions = table.findReAnalyzeNeededPartitions();
if (needRunPartitions.isEmpty()) {
return null;
}
return getAnalysisJobInfo(jobInfo, table, needRunPartitions);
return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build();
}
@VisibleForTesting
protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
Set<String> needRunPartitions) {
Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
if (colToPartitions == null) {
for (Column c : table.getColumns()) {
newColToPartitions.put(c.getName(), needRunPartitions);
}
} else {
colToPartitions.keySet().forEach(colName -> {
Column column = table.getColumn(colName);
if (column != null) {
newColToPartitions.put(colName, needRunPartitions);
}
});
}
return new AnalysisInfoBuilder(jobInfo)
.setColToPartitions(newColToPartitions).build();
}
private boolean checkAnalyzeTime(LocalTime now) {
try {
@@ -215,7 +217,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
}
private SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception {
SessionVariable sessionVariable = VariableMgr.newSessionVariable();
SessionVariable sessionVariable = VariableMgr.newSessionVariable();
VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL);
VariableMgr.getValue(sessionVariable, variableExpr);
return sessionVariable;

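`skip()` throttles auto analyze only for huge tables that were analyzed recently. Note that `findTableStatsStatus` can return null for a huge table that has never been analyzed, in which case the dereference of `tableStats.updatedTime` above would throw; the sketch below adds the null guard the intent seems to call for:

```java
// Guarded restatement of skip(); lastAnalyzeTime == null models a table
// with no TableStats record yet (never analyzed).
public class SkipSketch {
    static boolean skip(long dataSize, long hugeBound, Long lastAnalyzeTime,
                        long hugeIntervalMs, long nowMs) {
        if (dataSize < hugeBound) {
            return false; // small tables are always eligible
        }
        if (lastAnalyzeTime == null) {
            return false; // huge but never analyzed: analyze it now
        }
        return nowMs - lastAnalyzeTime < hugeIntervalMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Huge table, never analyzed: not skipped.
        System.out.println(skip(6_000_000_000L, 5_368_709_120L, null, 43_200_000L, now));
    }
}
```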

@@ -29,6 +29,9 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
public class TableStats implements Writable {
@@ -57,8 +60,8 @@ public class TableStats implements Writable {
@SerializedName("updateTime")
public long updatedTime;
@SerializedName("columns")
public String columns;
@SerializedName("colLastUpdatedTime")
private ConcurrentMap<String, Long> colLastUpdatedTime = new ConcurrentHashMap<>();
@SerializedName("trigger")
public JobType jobType;
@@ -72,7 +75,14 @@
analysisMethod = analyzedJob.analysisMethod;
analysisType = analyzedJob.analysisType;
updatedTime = System.currentTimeMillis();
columns = analyzedJob.colName;
String cols = analyzedJob.colName;
// The colName field of the analyzed job is formatted like "[col1, col2]"; remove the brackets here
if (analyzedJob.colName.startsWith("[") && analyzedJob.colName.endsWith("]")) {
cols = cols.substring(1, cols.length() - 1);
}
for (String col : cols.split(",")) {
colLastUpdatedTime.put(col, updatedTime);
}
jobType = analyzedJob.jobType;
}
@@ -84,6 +94,28 @@
public static TableStats read(DataInput dataInput) throws IOException {
String json = Text.readString(dataInput);
return GsonUtils.GSON.fromJson(json, TableStats.class);
TableStats tableStats = GsonUtils.GSON.fromJson(json, TableStats.class);
// Might be null, counterintuitively, for compatibility with data written by older versions
if (tableStats.colLastUpdatedTime == null) {
tableStats.colLastUpdatedTime = new ConcurrentHashMap<>();
}
return tableStats;
}
public long findColumnLastUpdateTime(String colName) {
return colLastUpdatedTime.getOrDefault(colName, 0L);
}
public void removeColumn(String colName) {
colLastUpdatedTime.remove(colName);
}
public Set<String> analyzeColumns() {
return colLastUpdatedTime.keySet();
}
public void reset() {
updatedTime = 0;
colLastUpdatedTime.replaceAll((k, v) -> 0L);
}
}

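The `TableStats` constructor derives per-column timestamps by stripping the brackets from the job's `colName` and splitting on commas. Note that `"[col1, col2]".split(",")` leaves a leading space on every name after the first; a sketch with the trimming that clean map keys would need:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Bracket-stripping as in the constructor above, plus a trim that the diff
// does not do but that " col2"-style keys would otherwise require.
public class ColNameParseSketch {
    static Set<String> parseColumns(String colName) {
        String cols = colName;
        if (cols.startsWith("[") && cols.endsWith("]")) {
            cols = cols.substring(1, cols.length() - 1);
        }
        return Arrays.stream(cols.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(parseColumns("[col1, col2]")); // e.g. [col1, col2]
    }
}
```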

@@ -158,6 +158,10 @@ public class StatisticsUtil {
}
public static AutoCloseConnectContext buildConnectContext() {
return buildConnectContext(false);
}
public static AutoCloseConnectContext buildConnectContext(boolean limitScan) {
ConnectContext connectContext = new ConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
@@ -168,6 +172,7 @@
sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.setEnableNereidsPlanner(false);
sessionVariable.enableProfile = false;
sessionVariable.enableScanRunSerial = limitScan;
sessionVariable.queryTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
sessionVariable.insertTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
sessionVariable.enableFileCache = false;


@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class OlapAnalysisTaskTest {
@Test
public void testAutoSample(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked TableIf tableIf) {
new Expectations() {
{
tableIf.getDataSize();
result = 60_0000_0000L;
}
};
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder()
.setAnalysisMethod(AnalysisMethod.FULL);
OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
olapAnalysisTask.info = analysisInfoBuilder.build();
olapAnalysisTask.tbl = tableIf;
Config.enable_auto_sample = true;
String sampleExpr = olapAnalysisTask.getSampleExpression();
Assertions.assertEquals("TABLESAMPLE(200000 ROWS)", sampleExpr);
new Expectations() {
{
tableIf.getDataSize();
result = 1_0000_0000L;
}
};
sampleExpr = olapAnalysisTask.getSampleExpression();
Assertions.assertEquals("", sampleExpr);
analysisInfoBuilder.setSampleRows(10);
analysisInfoBuilder.setAnalysisMethod(AnalysisMethod.SAMPLE);
olapAnalysisTask.info = analysisInfoBuilder.build();
sampleExpr = olapAnalysisTask.getSampleExpression();
Assertions.assertEquals("TABLESAMPLE(10 ROWS)", sampleExpr);
}
}


@@ -24,6 +24,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.View;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
@@ -39,14 +40,17 @@ import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.util.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class StatisticsAutoCollectorTest {
@@ -145,17 +149,24 @@ public class StatisticsAutoCollectorTest {
new MockUp<OlapTable>() {
@Mock
protected Set<String> findReAnalyzeNeededPartitions() {
protected Map<String, Set<String>> findReAnalyzeNeededPartitions() {
Set<String> partitionNames = new HashSet<>();
partitionNames.add("p1");
partitionNames.add("p2");
return partitionNames;
Map<String, Set<String>> map = new HashMap<>();
map.put("col1", partitionNames);
return map;
}
@Mock
public long getRowCount() {
return 100;
}
@Mock
public List<Column> getBaseSchema() {
return Lists.newArrayList(new Column("col1", Type.INT), new Column("col2", Type.INT));
}
};
new MockUp<StatisticsUtil>() {
@@ -198,7 +209,8 @@
.setDbName("db")
.setTblName("tbl").build();
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
// Uncomment this once updatedRows is ready
// Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
}
}