[opt](stats) Support display of auto analyze jobs (#24135)

### Support display of auto analyze jobs

After this PR, users and DBAs can use the following syntax to check the execution status of auto analyze jobs:

```sql
SHOW AUTO ANALYZE [tbl_name] [WHERE STATE='SOME STATE']
```

The number of historical auto analyze job records to keep can be configured through the FE option `auto_analyze_job_record_count` (default: 2000).
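For reference, the option surfaces in code as `Config.auto_analyze_job_record_count` and bounds a FIFO history of completed auto jobs. A minimal sketch of that bounded history (the `SimpleQueue` added later in this PR implements the same idea plus persistence hooks; `AutoJobHistory` and `record` are hypothetical names used for illustration):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch only: evict the oldest record once the cap is reached.
class AutoJobHistory {
    private final Deque<AnalysisInfo> history = new ArrayDeque<>();

    synchronized void record(AnalysisInfo job, int cap /* auto_analyze_job_record_count */) {
        while (history.size() >= cap) {
            history.pollFirst(); // drop the oldest job record
        }
        history.offerLast(job);
    }
}
```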

### Enhance auto analyze

After this PR, analyze jobs created automatically will no longer execute outside the configured time frame.
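A minimal sketch of the window check this relies on (the same wrap-around logic as `StatisticsUtil.checkAnalyzeTime` in the diff below; the class name and concrete times are illustrative assumptions):

```java
import java.time.LocalTime;

public class AnalyzeWindowDemo {
    public static void main(String[] args) {
        LocalTime start = LocalTime.parse("23:00:00"); // e.g. full_auto_analyze_start_time
        LocalTime end = LocalTime.parse("02:00:00");   // e.g. full_auto_analyze_end_time
        LocalTime now = LocalTime.parse("01:30:00");
        // A window that crosses midnight is handled by the start.isAfter(end) branch:
        boolean inWindow = start.isAfter(end)
                ? (now.isAfter(start) || now.isBefore(end))
                : (now.isAfter(start) && now.isBefore(end));
        System.out.println(inWindow); // true: 01:30 falls inside 23:00-02:00
    }
}
```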
Author: AKIRA
Date: 2023-09-14 18:10:04 +09:00 (committed by GitHub)
Parent: 4fbb25bc55
Commit: 0be0b8ff58
19 changed files with 406 additions and 143 deletions

View File

@ -4237,13 +4237,17 @@ show_param ::=
RESULT = new ShowCreateMaterializedViewStmt(mvName, tableName);
:}
/* show analyze job */
| KW_ANALYZE opt_table_name:tbl opt_wild_where order_by_clause:orderByClause limit_clause:limitClause
| KW_ANALYZE opt_table_name:tbl opt_wild_where
{:
RESULT = new ShowAnalyzeStmt(tbl, parser.where, orderByClause, limitClause);
RESULT = new ShowAnalyzeStmt(tbl, parser.where, false);
:}
| KW_ANALYZE INTEGER_LITERAL:jobId opt_wild_where order_by_clause:orderByClause limit_clause:limitClause
| KW_ANALYZE INTEGER_LITERAL:jobId opt_wild_where
{:
RESULT = new ShowAnalyzeStmt(jobId, parser.where, orderByClause, limitClause);
RESULT = new ShowAnalyzeStmt(jobId, parser.where);
:}
| KW_AUTO KW_ANALYZE opt_table_name:tbl opt_wild_where
{:
RESULT = new ShowAnalyzeStmt(tbl, parser.where, true);
:}
| KW_ANALYZE KW_TASK KW_STATUS INTEGER_LITERAL:jobId
{:

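For orientation, the three alternatives above construct the reworked `ShowAnalyzeStmt` roughly as follows (a sketch; `tblName`, `whereExpr`, and `jobId` are placeholders for the parsed symbols):

```java
// SHOW ANALYZE [tbl] [WHERE ...]      -> manually submitted jobs
ShowAnalyzeStmt manual = new ShowAnalyzeStmt(tblName, whereExpr, false);
// SHOW ANALYZE <job_id> [WHERE ...]   -> a single manual job looked up by id
ShowAnalyzeStmt byId = new ShowAnalyzeStmt(jobId, whereExpr);
// SHOW AUTO ANALYZE [tbl] [WHERE ...] -> automatically created jobs
ShowAnalyzeStmt auto = new ShowAnalyzeStmt(tblName, whereExpr, true);
```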
View File

@ -25,7 +25,6 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
@ -35,10 +34,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
/**
* ShowAnalyzeStmt is used to show statistics job info.
* syntax:
@ -69,36 +64,30 @@ public class ShowAnalyzeStmt extends ShowStmt {
.build();
private long jobId;
private TableName dbTableName;
private Expr whereClause;
private LimitElement limitElement;
private List<OrderByElement> orderByElements;
private String stateValue;
private ArrayList<OrderByPair> orderByPairs;
private final TableName dbTableName;
private final Expr whereClause;
// extract from predicate
private String stateValue;
private final boolean auto;
public ShowAnalyzeStmt() {
}
public ShowAnalyzeStmt(TableName dbTableName,
Expr whereClause,
List<OrderByElement> orderByElements,
LimitElement limitElement) {
Expr whereClause, boolean auto) {
this.dbTableName = dbTableName;
this.whereClause = whereClause;
this.orderByElements = orderByElements;
this.limitElement = limitElement;
this.auto = auto;
}
public ShowAnalyzeStmt(long jobId,
Expr whereClause,
List<OrderByElement> orderByElements,
LimitElement limitElement) {
Expr whereClause) {
Preconditions.checkArgument(jobId > 0, "JobId must be greater than 0.");
this.jobId = jobId;
this.dbTableName = null;
this.whereClause = whereClause;
this.orderByElements = orderByElements;
this.limitElement = limitElement;
this.auto = false;
}
public long getJobId() {
@ -111,12 +100,6 @@ public class ShowAnalyzeStmt extends ShowStmt {
return stateValue;
}
public ArrayList<OrderByPair> getOrderByPairs() {
Preconditions.checkArgument(isAnalyzed(),
"The orderByPairs must be obtained after the parsing is complete");
return orderByPairs;
}
public Expr getWhereClause() {
Preconditions.checkArgument(isAnalyzed(),
"The whereClause must be obtained after the parsing is complete");
@ -124,13 +107,6 @@ public class ShowAnalyzeStmt extends ShowStmt {
return whereClause;
}
public long getLimit() {
if (limitElement != null && limitElement.hasLimit()) {
return limitElement.getLimit();
}
return -1L;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
if (!Config.enable_stats) {
@ -149,21 +125,6 @@ public class ShowAnalyzeStmt extends ShowStmt {
if (whereClause != null) {
analyzeSubPredicate(whereClause);
}
// analyze order by
if (orderByElements != null && !orderByElements.isEmpty()) {
orderByPairs = new ArrayList<>();
for (OrderByElement orderByElement : orderByElements) {
if (orderByElement.getExpr() instanceof SlotRef) {
SlotRef slotRef = (SlotRef) orderByElement.getExpr();
int index = analyzeColumn(slotRef.getColumnName());
OrderByPair orderByPair = new OrderByPair(index, !orderByElement.getIsAsc());
orderByPairs.add(orderByPair);
} else {
throw new AnalysisException("Should order by column");
}
}
}
}
@Override
@ -279,25 +240,6 @@ public class ShowAnalyzeStmt extends ShowStmt {
sb.append(whereClause.toSql());
}
// Order By clause
if (orderByElements != null) {
sb.append(" ");
sb.append("ORDER BY");
sb.append(" ");
IntStream.range(0, orderByElements.size()).forEach(i -> {
sb.append(orderByElements.get(i).getExpr().toSql());
sb.append((orderByElements.get(i).getIsAsc()) ? " ASC" : " DESC");
sb.append((i + 1 != orderByElements.size()) ? ", " : "");
});
}
if (getLimit() != -1L) {
sb.append(" ");
sb.append("LIMIT");
sb.append(" ");
sb.append(getLimit());
}
return sb.toString();
}
@ -309,4 +251,8 @@ public class ShowAnalyzeStmt extends ShowStmt {
public TableName getDbTableName() {
return dbTableName;
}
public boolean isAuto() {
return auto;
}
}

View File

@ -885,6 +885,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_PERSIST_AUTO_JOB: {
data = AnalysisInfo.read(in);
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);

View File

@ -1108,6 +1108,10 @@ public class EditLog {
env.getAnalysisManager().replayUpdateTableStatsStatus((TableStats) journal.getData());
break;
}
case OperationType.OP_PERSIST_AUTO_JOB: {
env.getAnalysisManager().replayPersistSysJob((AnalysisInfo) journal.getData());
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@ -1945,4 +1949,9 @@ public class EditLog {
public void logCreateTableStats(TableStats tableStats) {
logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats);
}
public void logAutoJob(AnalysisInfo analysisInfo) {
logEdit(OperationType.OP_PERSIST_AUTO_JOB, analysisInfo);
}
}

View File

@ -333,6 +333,8 @@ public class OperationType {
public static final short OP_UPDATE_TABLE_STATS = 455;
public static final short OP_PERSIST_AUTO_JOB = 456;
/**
* Get opcode name by op code.
**/
@ -354,4 +356,5 @@ public class OperationType {
}
return "Not Found";
}
}

View File

@ -2675,7 +2675,9 @@ public class ShowExecutor {
ZoneId.systemDefault())));
row.add(analysisInfo.state.toString());
try {
row.add(Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
row.add(showStmt.isAuto()
? analysisInfo.progress
: Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId));
} catch (Exception e) {
row.add("N/A");
LOG.warn("Failed to get progress for job: {}", analysisInfo, e);

View File

@ -2582,6 +2582,7 @@ public class StmtExecutor {
analyze(context.getSessionVariable().toThrift());
}
} catch (Exception e) {
LOG.warn("Failed to run internal SQL: {}", originStmt, e);
throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e);
}
planner.getFragments();

View File

@ -48,6 +48,7 @@ public class AnalysisInfo implements Writable {
private static final Logger LOG = LogManager.getLogger(AnalysisInfo.class);
// TODO: useless, remove it later
public enum AnalysisMode {
INCREMENTAL,
FULL
@ -166,6 +167,9 @@ public class AnalysisInfo implements Writable {
@SerializedName("cronExpr")
public String cronExprStr;
@SerializedName("progress")
public String progress;
public CronExpression cronExpression;
@SerializedName("forceFull")

View File

@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
@ -39,11 +40,13 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.AnalyzeDeletionLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
@ -52,11 +55,13 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.SimpleQueue;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -66,6 +71,7 @@ import org.quartz.CronExpression;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -73,6 +79,7 @@ import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -94,7 +101,7 @@ public class AnalysisManager extends Daemon implements Writable {
private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
// Tracking running manually submitted async tasks, keep in mem only
private final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
private StatisticsCache statisticsCache;
@ -107,13 +114,15 @@ public class AnalysisManager extends Daemon implements Writable {
private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());
// Tracking system submitted job, keep in mem only
private final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
protected final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
// Tracking and control sync analyze tasks, keep in mem only
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
private final Map<Long, TableStats> idToTblStats = new ConcurrentHashMap<>();
protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);
private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
AnalysisInfo info = w.info;
AnalysisState taskState = w.taskState;
@ -174,26 +183,44 @@ public class AnalysisManager extends Daemon implements Writable {
return null;
};
private final Function<TaskStatusWrapper, Void> systemJobStatusUpdater = w -> {
private final String progressDisplayTemplate = "%d Finished | %d Failed | %d In Progress | %d Total";
protected final Function<TaskStatusWrapper, Void> systemJobStatusUpdater = w -> {
AnalysisInfo info = w.info;
info.state = w.taskState;
info.message = w.message;
AnalysisInfo job = systemJobInfoMap.get(info.jobId);
if (job == null) {
return null;
}
for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
if (!task.info.state.equals(AnalysisState.FINISHED)) {
if (task.info.state.equals(AnalysisState.FAILED)) {
systemJobInfoMap.remove(info.jobId);
}
int failedCount = 0;
StringJoiner reason = new StringJoiner(", ");
Map<Long, BaseAnalysisTask> taskMap = analysisJobIdToTaskMap.get(info.jobId);
for (BaseAnalysisTask task : taskMap.values()) {
if (task.info.state.equals(AnalysisState.RUNNING) || task.info.state.equals(AnalysisState.PENDING)) {
return null;
}
if (task.info.state.equals(AnalysisState.FAILED)) {
failedCount++;
reason.add(task.info.message);
}
}
try {
updateTableStats(job);
} catch (Throwable e) {
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
} finally {
job.lastExecTimeInMs = System.currentTimeMillis();
job.message = reason.toString();
job.progress = String.format(progressDisplayTemplate,
taskMap.size() - failedCount, failedCount, 0, taskMap.size());
if (failedCount > 0) {
job.message = reason.toString();
job.state = AnalysisState.FAILED;
} else {
job.state = AnalysisState.FINISHED;
}
autoJobs.offer(job);
systemJobInfoMap.remove(info.jobId);
}
return null;
@ -202,7 +229,6 @@ public class AnalysisManager extends Daemon implements Writable {
private final Function<TaskStatusWrapper, Void>[] updaters =
new Function[] {userJobStatusUpdater, systemJobStatusUpdater};
public AnalysisManager() {
super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
if (!Env.isCheckpointThread()) {
@ -616,9 +642,19 @@ public class AnalysisManager extends Daemon implements Writable {
}
public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
if (stmt.isAuto()) {
// It's OK to synchronize on this field; it is only reassigned during instance init or checkpoint.
synchronized (autoJobs) {
return findShowAnalyzeResult(autoJobs, stmt);
}
}
return findShowAnalyzeResult(analysisJobInfoMap.values(), stmt);
}
protected List<AnalysisInfo> findShowAnalyzeResult(Collection<AnalysisInfo> analysisInfos, ShowAnalyzeStmt stmt) {
String state = stmt.getStateValue();
TableName tblName = stmt.getDbTableName();
return analysisJobInfoMap.values().stream()
return analysisInfos.stream()
.filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
.filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
.filter(a -> tblName == null || a.catalogName.equals(tblName.getCtl())
@ -649,7 +685,7 @@ public class AnalysisManager extends Daemon implements Writable {
break;
}
}
return String.format("%d Finished/%d Failed/%d In Progress/%d Total", finished, failed, inProgress, total);
return String.format(progressDisplayTemplate, finished, failed, inProgress, total);
}
@VisibleForTesting
@ -879,6 +915,7 @@ public class AnalysisManager extends Daemon implements Writable {
readAnalysisInfo(in, analysisManager.analysisJobInfoMap, true);
readAnalysisInfo(in, analysisManager.analysisTaskInfoMap, false);
readIdToTblStats(in, analysisManager.idToTblStats);
readAutoJobs(in, analysisManager);
return analysisManager;
}
@ -898,11 +935,18 @@ public class AnalysisManager extends Daemon implements Writable {
}
}
private static void readAutoJobs(DataInput in, AnalysisManager analysisManager) throws IOException {
Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
Collection<AnalysisInfo> autoJobs = GsonUtils.GSON.fromJson(Text.readString(in), type);
analysisManager.autoJobs = analysisManager.createSimpleQueue(autoJobs, analysisManager);
}
@Override
public void write(DataOutput out) throws IOException {
writeJobInfo(out, analysisJobInfoMap);
writeJobInfo(out, analysisTaskInfoMap);
writeTableStats(out);
writeAutoJobsStatus(out);
}
private void writeJobInfo(DataOutput out, Map<Long, AnalysisInfo> infoMap) throws IOException {
@ -919,6 +963,12 @@ public class AnalysisManager extends Daemon implements Writable {
}
}
private void writeAutoJobsStatus(DataOutput output) throws IOException {
Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
String autoJobs = GsonUtils.GSON.toJson(this.autoJobs, type);
Text.writeString(output, autoJobs);
}
// For unit test use only.
public void addToJobIdTasksMap(long jobId, Map<Long, BaseAnalysisTask> tasks) {
analysisJobIdToTaskMap.put(jobId, tasks);
@ -954,4 +1004,45 @@ public class AnalysisManager extends Daemon implements Writable {
systemJobInfoMap.put(jobInfo.jobId, jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}
@VisibleForTesting
protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
TableStats tableStats = findTableStatsStatus(table.getId());
if (tableStats == null) {
return table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
}
return table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName)
.collect(Collectors.toSet());
}
protected void logAutoJob(AnalysisInfo autoJob) {
Env.getCurrentEnv().getEditLog().logAutoJob(autoJob);
}
public void replayPersistSysJob(AnalysisInfo analysisInfo) {
autoJobs.offer(analysisInfo);
}
protected SimpleQueue<AnalysisInfo> createSimpleQueue(Collection<AnalysisInfo> collection,
AnalysisManager analysisManager) {
return new SimpleQueue<>(Config.auto_analyze_job_record_count,
a -> {
// FE is not ready when replaying log and operations triggered by replaying
// shouldn't be logged again.
if (Env.getCurrentEnv().isReady() && !Env.isCheckpointThread()) {
analysisManager.logAutoJob(a);
}
return null;
},
a -> {
// DO NOTHING
return null;
}, null);
}
}
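A minimal sketch of the image round-trip behind `writeAutoJobsStatus`/`readAutoJobs`, assuming `GsonUtils.GSON` as used in the patch (the queue is serialized as one JSON `LinkedList<AnalysisInfo>` string; `AutoJobsImageDemo` is a hypothetical name):

```java
import java.lang.reflect.Type;
import java.util.LinkedList;
import com.google.common.reflect.TypeToken;

class AutoJobsImageDemo {
    // Write side turns the queue into one JSON string for the image;
    // read side parses it back, to be re-offered into a fresh SimpleQueue.
    static LinkedList<AnalysisInfo> roundTrip(LinkedList<AnalysisInfo> autoJobs) {
        Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
        String json = GsonUtils.GSON.toJson(autoJobs, type);
        return GsonUtils.GSON.fromJson(json, type);
    }
}
```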

View File

@ -99,4 +99,9 @@ public class AnalysisTaskExecutor extends Thread {
public boolean idle() {
return executors.getQueue().isEmpty();
}
public void clear() {
executors.getQueue().clear();
taskQueue.clear();
}
}

View File

@ -18,11 +18,15 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.LocalTime;
import java.util.concurrent.FutureTask;
public class AnalysisTaskWrapper extends FutureTask<Void> {
@ -52,6 +56,14 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
if (task.killed) {
return;
}
if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && !StatisticsUtil.checkAnalyzeTime(
LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
// TODO: Do we need a separate AnalysisState here?
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info, AnalysisState.FAILED, "Auto task "
+ "doesn't get executed within the specified time range", System.currentTimeMillis());
return;
}
executor.putJob(this);
super.run();
Object result = get();

View File

@ -115,7 +115,7 @@ public abstract class BaseAnalysisTask {
init(info);
}
private void init(AnalysisInfo info) {
protected void init(AnalysisInfo info) {
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
if (catalog == null) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,

View File

@ -163,7 +163,7 @@ public class ColumnStatistic {
String colName = row.get(5);
Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName);
if (col == null) {
LOG.warn("Failed to deserialize column statistics, ctlId: {} dbId: {}"
LOG.debug("Failed to deserialize column statistics, ctlId: {} dbId: {}"
+ "tblId: {} column: {} not exists",
catalogId, dbID, tblId, colName);
return ColumnStatistic.UNKNOWN;

View File

@ -17,9 +17,7 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.VariableExpr;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@ -27,23 +25,19 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -64,7 +58,8 @@ public class StatisticsAutoCollector extends StatisticsCollector {
@Override
protected void collect() {
if (!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
if (!StatisticsUtil.checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
analysisTaskExecutor.clear();
return;
}
if (Config.enable_full_auto_analyze) {
@ -147,7 +142,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
.setSamplePercent(Config.huge_table_default_sample_rows)
.setScheduleType(AnalysisInfo.ScheduleType.ONCE)
.setScheduleType(ScheduleType.AUTOMATIC)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())
.setLastExecTimeInMs(System.currentTimeMillis())
@ -175,51 +170,27 @@ public class StatisticsAutoCollector extends StatisticsCollector {
return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build();
}
private boolean checkAnalyzeTime(LocalTime now) {
try {
Pair<LocalTime, LocalTime> range = findRangeFromGlobalSessionVar();
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
LocalTime start = range == null
? LocalTime.parse(Config.full_auto_analyze_start_time, timeFormatter) : range.first;
LocalTime end = range == null
? LocalTime.parse(Config.full_auto_analyze_end_time, timeFormatter) : range.second;
if (start.isAfter(end) && (now.isAfter(start) || now.isBefore(end))) {
return true;
} else {
return now.isAfter(start) && now.isBefore(end);
@VisibleForTesting
protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
Set<String> needRunPartitions) {
Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
if (colToPartitions == null) {
for (Column c : table.getColumns()) {
if (StatisticsUtil.isUnsupportedType(c.getType())) {
continue;
}
newColToPartitions.put(c.getName(), needRunPartitions);
}
} catch (DateTimeParseException e) {
LOG.warn("Parse analyze start/end time format fail", e);
return true;
} else {
colToPartitions.keySet().forEach(colName -> {
Column column = table.getColumn(colName);
if (column != null) {
newColToPartitions.put(colName, needRunPartitions);
}
});
}
}
private Pair<LocalTime, LocalTime> findRangeFromGlobalSessionVar() {
try {
String startTime =
findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
.fullAutoAnalyzeStartTime;
if (StringUtils.isEmpty(startTime)) {
return null;
}
String endTime = findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
.fullAutoAnalyzeEndTime;
if (StringUtils.isEmpty(startTime)) {
return null;
}
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
return Pair.of(LocalTime.parse(startTime, timeFormatter), LocalTime.parse(endTime, timeFormatter));
} catch (Exception e) {
return null;
}
}
private SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception {
SessionVariable sessionVariable = VariableMgr.newSessionVariable();
VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL);
VariableMgr.getValue(sessionVariable, variableExpr);
return sessionVariable;
return new AnalysisInfoBuilder(jobInfo)
.setColToPartitions(newColToPartitions).build();
}
}

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics.util;
import java.util.Collection;
import java.util.LinkedList;
import java.util.function.Function;
// Any operation on this structure should be thread-safe
public class SimpleQueue<T> extends LinkedList<T> {
private final long limit;
private final Function<T, Void> offerFunc;
private final Function<T, Void> evictFunc;
public SimpleQueue(long limit, Function<T, Void> offerFunc, Function<T, Void> evictFunc) {
this.limit = limit;
this.offerFunc = offerFunc;
this.evictFunc = evictFunc;
}
@Override
public synchronized boolean offer(T analysisInfo) {
while (size() >= limit) {
remove();
}
super.offer(analysisInfo);
offerFunc.apply(analysisInfo);
return true;
}
@Override
public synchronized T remove() {
T analysisInfo = super.remove();
evictFunc.apply(analysisInfo);
return analysisInfo;
}
public SimpleQueue(long limit, Function<T, Void> offerFunc, Function<T, Void> evictFunc, Collection<T> collection) {
this(limit, offerFunc, evictFunc);
if (collection != null) {
for (T e : collection) {
offer(e);
}
}
}
}
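A usage sketch mirroring `createSimpleQueue` in AnalysisManager (the callbacks here are illustrative no-ops; the real offer callback writes the job to the edit log when FE is ready and not a checkpoint thread; `someJob` is a placeholder):

```java
SimpleQueue<AnalysisInfo> queue = new SimpleQueue<>(
        Config.auto_analyze_job_record_count, // capacity; oldest entries evicted past this
        job -> null,                          // invoked on every offer()
        evicted -> null,                      // invoked for each entry evicted by remove()
        null);                                // optional seed collection (e.g. restored from image)
queue.offer(someJob); // retains at most auto_analyze_job_record_count records
```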

View File

@ -25,10 +25,12 @@ import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LargeIntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.analysis.VariableExpr;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
@ -49,6 +51,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
@ -62,6 +65,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Histogram;
@ -87,6 +91,9 @@ import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -711,4 +718,51 @@ public class StatisticsUtil {
}
return table instanceof ExternalTable;
}
public static boolean checkAnalyzeTime(LocalTime now) {
try {
Pair<LocalTime, LocalTime> range = findRangeFromGlobalSessionVar();
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
LocalTime start = range == null
? LocalTime.parse(Config.full_auto_analyze_start_time, timeFormatter) : range.first;
LocalTime end = range == null
? LocalTime.parse(Config.full_auto_analyze_end_time, timeFormatter) : range.second;
if (start.isAfter(end) && (now.isAfter(start) || now.isBefore(end))) {
return true;
} else {
return now.isAfter(start) && now.isBefore(end);
}
} catch (DateTimeParseException e) {
LOG.warn("Parse analyze start/end time format fail", e);
return true;
}
}
private static Pair<LocalTime, LocalTime> findRangeFromGlobalSessionVar() {
try {
String startTime =
findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
.fullAutoAnalyzeStartTime;
if (StringUtils.isEmpty(startTime)) {
return null;
}
String endTime = findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
.fullAutoAnalyzeEndTime;
if (StringUtils.isEmpty(endTime)) {
return null;
}
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
return Pair.of(LocalTime.parse(startTime, timeFormatter), LocalTime.parse(endTime, timeFormatter));
} catch (Exception e) {
return null;
}
}
private static SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception {
SessionVariable sessionVariable = VariableMgr.newSessionVariable();
VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL);
VariableMgr.getValue(sessionVariable, variableExpr);
return sessionVariable;
}
}

View File

@ -21,6 +21,9 @@ import org.apache.doris.analysis.AnalyzeProperties;
import org.apache.doris.analysis.AnalyzeTblStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.DdlException;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
@ -33,6 +36,7 @@ import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.hadoop.util.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -40,6 +44,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
// CHECKSTYLE OFF
@ -262,4 +267,83 @@ public class AnalysisManagerTest {
};
}
@Test
public void testSystemJobStatusUpdater() {
new MockUp<BaseAnalysisTask>() {
@Mock
protected void init(AnalysisInfo info) {
}
};
new MockUp<AnalysisManager>() {
@Mock
public void updateTableStats(AnalysisInfo jobInfo) {}
@Mock
protected void logAutoJob(AnalysisInfo autoJob) {
}
};
AnalysisManager analysisManager = new AnalysisManager();
AnalysisInfo job = new AnalysisInfoBuilder()
.setJobId(0)
.setColName("col1, col2").build();
analysisManager.systemJobInfoMap.put(job.jobId, job);
AnalysisInfo task1 = new AnalysisInfoBuilder()
.setJobId(0)
.setTaskId(1)
.setState(AnalysisState.RUNNING)
.setColName("col1").build();
AnalysisInfo task2 = new AnalysisInfoBuilder()
.setJobId(0)
.setTaskId(1)
.setState(AnalysisState.FINISHED)
.setColName("col2").build();
OlapAnalysisTask ot1 = new OlapAnalysisTask(task1);
OlapAnalysisTask ot2 = new OlapAnalysisTask(task2);
Map<Long, BaseAnalysisTask> taskMap = new HashMap<>();
taskMap.put(ot1.info.taskId, ot1);
taskMap.put(ot2.info.taskId, ot2);
analysisManager.analysisJobIdToTaskMap.put(job.jobId, taskMap);
// test invalid job
AnalysisInfo invalidJob = new AnalysisInfoBuilder().setJobId(-1).build();
analysisManager.systemJobStatusUpdater.apply(new TaskStatusWrapper(invalidJob,
AnalysisState.FAILED, "", 0));
// test finished
analysisManager.systemJobStatusUpdater.apply(new TaskStatusWrapper(task1, AnalysisState.FAILED, "", 0));
analysisManager.systemJobStatusUpdater.apply(new TaskStatusWrapper(task1, AnalysisState.FINISHED, "", 0));
Assertions.assertEquals(1, analysisManager.autoJobs.size());
Assertions.assertTrue(analysisManager.systemJobInfoMap.isEmpty());
}
@Test
public void testReAnalyze() {
new MockUp<OlapTable>() {
int count = 0;
int[] rowCount = new int[]{100, 200};
@Mock
public long getRowCount() {
return rowCount[count++];
}
@Mock
public List<Column> getBaseSchema() {
return Lists.newArrayList(new Column("col1", PrimitiveType.INT));
}
};
OlapTable olapTable = new OlapTable();
TableStats stats1 = new TableStats(0, 50, new AnalysisInfoBuilder().setColName("col1").build());
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
TableStats stats2 = new TableStats(0, 190, new AnalysisInfoBuilder().setColName("col1").build());
Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2));
}
}

View File

@ -209,7 +209,7 @@ public class StatisticsAutoCollectorTest {
.setDbName("db")
.setTblName("tbl").build();
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
// uncomment it when updatedRows get ready
// uncomment it when updatedRows gets ready
// Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2));
}