[feature](stats) Support full auto analyze (#21192)
1. Auto-analyze all tables except internal tables. 2. Make the resources used by analyze jobs configurable.
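The new knobs land in Config.java in the hunks below. For reference, a minimal fe.conf sketch with the defaults from this diff (these @ConfField entries are not marked mutable, so they are read at FE startup):

    enable_full_auto_analyze = true
    full_auto_analyze_start_time = 00:00:00
    full_auto_analyze_end_time = 23:59:59
    statistics_sql_parallel_exec_instance_num = 1
    statistics_sql_mem_limit_in_bytes = 2147483648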
@@ -1444,12 +1444,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static int cbo_default_sample_percentage = 10;
 
-    /*
-     * if true, will allow the system to collect statistics automatically
-     */
-    @ConfField(mutable = true, masterOnly = true)
-    public static boolean enable_auto_collect_statistics = true;
-
     /*
      * the system automatically checks the time interval for statistics
      */
@@ -2025,4 +2019,20 @@ public class Config extends ConfigBase {
             "Hive行数估算分区采样数",
             "Sample size for hive row count estimation."})
     public static int hive_stats_partition_sample_size = 3000;
+
+    @ConfField
+    public static boolean enable_full_auto_analyze = true;
+
+    @ConfField
+    public static String full_auto_analyze_start_time = "00:00:00";
+
+    @ConfField
+    public static String full_auto_analyze_end_time = "23:59:59";
+
+    @ConfField
+    public static int statistics_sql_parallel_exec_instance_num = 1;
+
+    @ConfField
+    public static long statistics_sql_mem_limit_in_bytes = 2L * 1024 * 1024 * 1024;
+
 }
@@ -24,6 +24,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +42,14 @@ public class AnalyzeProperties {
     public static final String PROPERTY_ANALYSIS_TYPE = "analysis.type";
     public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";
 
+    public static final AnalyzeProperties DEFAULT_PROP = new AnalyzeProperties(new HashMap<String, String>() {
+        {
+            put(AnalyzeProperties.PROPERTY_SYNC, "false");
+            put(AnalyzeProperties.PROPERTY_AUTOMATIC, "false");
+            put(AnalyzeProperties.PROPERTY_ANALYSIS_TYPE, AnalysisType.FUNDAMENTALS.toString());
+        }
+    });
+
     private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
             .add(PROPERTY_SYNC)
             .add(PROPERTY_INCREMENTAL)
@@ -253,8 +253,13 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
     }
 
     private void checkAnalyzePriv(String dbName, String tblName) throws AnalysisException {
+        ConnectContext ctx = ConnectContext.get();
+        // a null ConnectContext means this is a system-triggered analyze
+        if (ctx == null) {
+            return;
+        }
         if (!Env.getCurrentEnv().getAccessManager()
-                .checkTblPriv(ConnectContext.get(), dbName, tblName, PrivPredicate.SELECT)) {
+                .checkTblPriv(ctx, dbName, tblName, PrivPredicate.SELECT)) {
             ErrorReport.reportAnalysisException(
                     ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
                     "ANALYZE",
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -161,4 +162,8 @@ public interface CatalogIf<T extends DatabaseIf> {
         log.setProps(getProperties());
         return log;
     }
+
+    // Return a copy of all db collection.
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Collection<DatabaseIf> getAllDbs();
 }
@@ -61,8 +61,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
@@ -1086,5 +1088,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
     public Map<Long, CatalogIf> getIdToCatalog() {
         return idToCatalog;
     }
+
+    public Set<CatalogIf> getCopyOfCatalog() {
+        return new HashSet<>(idToCatalog.values());
+    }
 }
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.external.EsExternalDatabase;
@@ -55,6 +56,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -547,4 +550,9 @@ public abstract class ExternalCatalog
         }
         return ret;
     }
+
+    @Override
+    public Collection<DatabaseIf> getAllDbs() {
+        return new HashSet<>(idToDb.values());
+    }
 }
@@ -2960,4 +2960,9 @@ public class InternalCatalog implements CatalogIf<Database> {
     public ConcurrentHashMap<Long, Database> getIdToDb() {
         return new ConcurrentHashMap<>(idToDb);
     }
+
+    @Override
+    public Collection<DatabaseIf> getAllDbs() {
+        return new HashSet<>(idToDb.values());
+    }
 }
@@ -18,6 +18,7 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeDBStmt;
+import org.apache.doris.analysis.AnalyzeProperties;
 import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.DropAnalyzeJobStmt;
@@ -162,6 +163,14 @@ public class AnalysisManager extends Daemon implements Writable {
 
     public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException {
         DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
+        List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, analyzeDBStmt.getAnalyzeProperties());
+        if (!analyzeDBStmt.isSync()) {
+            sendJobId(analysisInfos, proxy);
+        }
+    }
+
+    public List<AnalysisInfo> buildAnalysisInfosForDB(DatabaseIf<TableIf> db, AnalyzeProperties analyzeProperties)
+            throws DdlException {
         List<TableIf> tbls = db.getTables();
         List<AnalysisInfo> analysisInfos = new ArrayList<>();
         db.readLock();
@@ -171,9 +180,9 @@ public class AnalysisManager extends Daemon implements Writable {
                 if (table instanceof View) {
                     continue;
                 }
-                TableName tableName = new TableName(analyzeDBStmt.getCtlIf().getName(), db.getFullName(),
+                TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(),
                         table.getName());
-                AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeDBStmt.getAnalyzeProperties(), tableName,
+                AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName,
                         table.getBaseSchema().stream().map(
                                 Column::getName).collect(
                                         Collectors.toList()), db.getId(), table);
@@ -187,13 +196,10 @@ public class AnalysisManager extends Daemon implements Writable {
             for (AnalyzeTblStmt analyzeTblStmt : analyzeStmts) {
                 analysisInfos.add(buildAndAssignJob(analyzeTblStmt));
             }
-            if (!analyzeDBStmt.isSync()) {
-                sendJobId(analysisInfos, proxy);
-            }
         } finally {
             db.readUnlock();
         }
-
+        return analysisInfos;
     }
 
     // Each analyze stmt corresponds to an analysis job.
@@ -245,7 +251,7 @@ public class AnalysisManager extends Daemon implements Writable {
     }
 
     // Analysis job created by the system
-    public void createAnalysisJob(AnalysisInfo info) throws DdlException {
+    public void createSystemAnalysisJob(AnalysisInfo info) throws DdlException {
         AnalysisInfo jobInfo = buildAnalysisJobInfo(info);
         if (jobInfo.colToPartitions.isEmpty()) {
             // No statistics need to be collected or updated
@@ -17,14 +17,17 @@
 
 package org.apache.doris.statistics;
 
-import org.apache.doris.analysis.DdlStmt;
+import org.apache.doris.analysis.AnalyzeProperties;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.statistics.AnalysisInfo.JobType;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
@@ -32,6 +35,9 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -56,14 +62,44 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
         if (!StatisticsUtil.statsTblAvailable()) {
             return;
         }
-        if (Config.enable_auto_collect_statistics) {
+
+        if (!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
+            return;
+        }
+
+        if (!Config.enable_full_auto_analyze) {
             analyzePeriodically();
             analyzeAutomatically();
+        } else {
+            analyzeAll();
         }
     }
 
-    public void autoAnalyzeStats(DdlStmt ddlStmt) {
-        // TODO Monitor some DDL statements, and then trigger automatic analysis tasks
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private void analyzeAll() {
+        Set<CatalogIf> catalogs = Env.getCurrentEnv().getCatalogMgr().getCopyOfCatalog();
+        for (CatalogIf ctl : catalogs) {
+            try {
+                Collection<DatabaseIf> dbs = ctl.getAllDbs();
+                for (DatabaseIf<TableIf> databaseIf : dbs) {
+                    if (StatisticConstants.STATISTICS_DB_BLACK_LIST.contains(databaseIf.getFullName())) {
+                        continue;
+                    }
+                    AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
+                    List<AnalysisInfo> analysisInfos = analysisManager.buildAnalysisInfosForDB(databaseIf,
+                            AnalyzeProperties.DEFAULT_PROP);
+                    for (AnalysisInfo analysisInfo : analysisInfos) {
+                        analysisInfo = getReAnalyzeRequiredPart(analysisInfo);
+                        if (analysisInfo == null) {
+                            continue;
+                        }
+                        analysisManager.createSystemAnalysisJob(analysisInfo);
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.warn("Failed to analyze all statistics.", t);
+            }
+        }
     }
 
     private void analyzePeriodically() {
@@ -72,7 +108,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
             List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
             for (AnalysisInfo jobInfo : jobInfos) {
                 jobInfo = new AnalysisInfoBuilder(jobInfo).setJobType(JobType.SYSTEM).build();
-                analysisManager.createAnalysisJob(jobInfo);
+                analysisManager.createSystemAnalysisJob(jobInfo);
             }
         } catch (DdlException e) {
             LOG.warn("Failed to periodically analyze the statistics." + e);
@@ -85,12 +121,12 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
         for (AnalysisInfo jobInfo : jobInfos) {
             AnalysisInfo checkedJobInfo = null;
             try {
-                checkedJobInfo = checkAutomaticJobInfo(jobInfo);
+                checkedJobInfo = getReAnalyzeRequiredPart(jobInfo);
                 if (checkedJobInfo != null) {
-                    analysisManager.createAnalysisJob(checkedJobInfo);
+                    analysisManager.createSystemAnalysisJob(checkedJobInfo);
                 }
             } catch (Throwable t) {
-                LOG.warn("Failed to create analyze job: {}", checkedJobInfo);
+                LOG.warn("Failed to create analyze job: {}", checkedJobInfo, t);
             }
 
         }
@@ -116,7 +152,7 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
      * @return new job info after check
      * @throws Throwable failed to check
      */
-    private AnalysisInfo checkAutomaticJobInfo(AnalysisInfo jobInfo) throws Throwable {
+    private AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) throws Throwable {
         long lastExecTimeInMs = jobInfo.lastExecTimeInMs;
         TableIf table = StatisticsUtil
                 .findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
@@ -212,4 +248,21 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
         return new AnalysisInfoBuilder(jobInfo)
                 .setColToPartitions(newColToPartitions).build();
     }
+
+    private boolean checkAnalyzeTime(LocalTime now) {
+        try {
+            DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
+            LocalTime start = LocalTime.parse(Config.full_auto_analyze_start_time, timeFormatter);
+            LocalTime end = LocalTime.parse(Config.full_auto_analyze_end_time, timeFormatter);
+
+            if (start.isAfter(end) && (now.isAfter(start) || now.isBefore(end))) {
+                return true;
+            } else {
+                return now.isAfter(start) && now.isBefore(end);
+            }
+        } catch (DateTimeParseException e) {
+            LOG.warn("Parse analyze start/end time format fail", e);
+            return true;
+        }
+    }
 }
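Note that checkAnalyzeTime deliberately handles windows that wrap past midnight: when the start is after the end (say 23:00:00-02:00:00), a time is in-window if it is after the start or before the end, and a malformed config value fails open (returns true). A self-contained sketch of the same predicate for quick verification (hypothetical demo class, not part of the commit):

    import java.time.LocalTime;

    public class AnalyzeWindowDemo {
        // Mirrors checkAnalyzeTime: a window whose start is after its end wraps past midnight.
        static boolean inWindow(LocalTime start, LocalTime end, LocalTime now) {
            if (start.isAfter(end)) {
                return now.isAfter(start) || now.isBefore(end);
            }
            return now.isAfter(start) && now.isBefore(end);
        }

        public static void main(String[] args) {
            LocalTime start = LocalTime.parse("23:00:00");
            LocalTime end = LocalTime.parse("02:00:00");
            System.out.println(inWindow(start, end, LocalTime.parse("23:30:00"))); // true
            System.out.println(inWindow(start, end, LocalTime.parse("01:30:00"))); // true
            System.out.println(inWindow(start, end, LocalTime.parse("12:00:00"))); // false
        }
    }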
@@ -165,10 +165,10 @@ public class StatisticsUtil {
         ConnectContext connectContext = new ConnectContext();
         SessionVariable sessionVariable = connectContext.getSessionVariable();
         sessionVariable.internalSession = true;
-        sessionVariable.setMaxExecMemByte(StatisticConstants.STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES);
+        sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
         sessionVariable.setEnableInsertStrict(true);
-        sessionVariable.parallelExecInstanceNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
-        sessionVariable.parallelPipelineTaskNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
+        sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
+        sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
         sessionVariable.setEnableNereidsPlanner(false);
         sessionVariable.enableProfile = false;
         connectContext.setEnv(Env.getCurrentEnv());
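With this hunk, each internal statistics query runs with parallelExecInstanceNum = 1 and a memory cap of 2L * 1024 * 1024 * 1024 = 2,147,483,648 bytes (2 GiB) by default, both now read from Config rather than the compile-time StatisticConstants values.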