[feature](stats) Enable period time with cron expr (#22095)
Support such grammar ANALYZE TABLE test WITH CRON "* * * * * ?" Such job would be scheduled as the cron expr specifie, but natively support minute-level schedule only
This commit is contained in:
@ -654,6 +654,7 @@ terminal BigDecimal DECIMAL_LITERAL;
|
||||
terminal String STRING_LITERAL;
|
||||
terminal String UNMATCHED_STRING_LITERAL;
|
||||
terminal String COMMENTED_PLAN_HINTS;
|
||||
terminal String KW_CRON;
|
||||
|
||||
// Statement that the result of this parser.
|
||||
nonterminal List<StatementBase> stmts;
|
||||
@ -5934,6 +5935,12 @@ with_analysis_properties ::=
|
||||
put("analysis.type", "HISTOGRAM");
|
||||
}};
|
||||
:}
|
||||
| KW_CRON STRING_LITERAL:cron_expr
|
||||
{:
|
||||
RESULT = new HashMap<String, String>() {{
|
||||
put("period.cron", cron_expr);
|
||||
}};
|
||||
:}
|
||||
;
|
||||
|
||||
opt_with_analysis_properties ::=
|
||||
@ -7664,6 +7671,8 @@ keyword ::=
|
||||
{: RESULT = id; :}
|
||||
| KW_PERCENT:id
|
||||
{: RESULT = id; :}
|
||||
| KW_CRON:id
|
||||
{: RESULT = id; :}
|
||||
;
|
||||
|
||||
// Identifier that contain keyword
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.quartz.CronExpression;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
@ -50,6 +51,10 @@ public class AnalyzeProperties {
|
||||
}
|
||||
});
|
||||
|
||||
public static final String PROPERTY_PERIOD_CRON = "period.cron";
|
||||
|
||||
private CronExpression cronExpression;
|
||||
|
||||
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
|
||||
.add(PROPERTY_SYNC)
|
||||
.add(PROPERTY_INCREMENTAL)
|
||||
@ -59,6 +64,7 @@ public class AnalyzeProperties {
|
||||
.add(PROPERTY_NUM_BUCKETS)
|
||||
.add(PROPERTY_ANALYSIS_TYPE)
|
||||
.add(PROPERTY_PERIOD_SECONDS)
|
||||
.add(PROPERTY_PERIOD_CRON)
|
||||
.build();
|
||||
|
||||
public AnalyzeProperties(Map<String, String> properties) {
|
||||
@ -81,6 +87,7 @@ public class AnalyzeProperties {
|
||||
checkAnalysisMode(msgTemplate);
|
||||
checkAnalysisType(msgTemplate);
|
||||
checkScheduleType(msgTemplate);
|
||||
checkPeriod();
|
||||
}
|
||||
|
||||
public boolean isSync() {
|
||||
@ -124,6 +131,10 @@ public class AnalyzeProperties {
|
||||
return TimeUnit.SECONDS.toMillis(minutes);
|
||||
}
|
||||
|
||||
public CronExpression getCron() {
|
||||
return cronExpression;
|
||||
}
|
||||
|
||||
private void checkPeriodSeconds() throws AnalysisException {
|
||||
if (properties.containsKey(PROPERTY_PERIOD_SECONDS)) {
|
||||
checkNumericProperty(PROPERTY_PERIOD_SECONDS, properties.get(PROPERTY_PERIOD_SECONDS),
|
||||
@ -216,6 +227,22 @@ public class AnalyzeProperties {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkPeriod() throws AnalysisException {
|
||||
if (properties.containsKey(PROPERTY_PERIOD_SECONDS)
|
||||
&& properties.containsKey(PROPERTY_PERIOD_CRON)) {
|
||||
throw new AnalysisException(PROPERTY_PERIOD_SECONDS + " and " + PROPERTY_PERIOD_CRON
|
||||
+ " couldn't be set simultaneously");
|
||||
}
|
||||
String cronExprStr = properties.get(PROPERTY_PERIOD_CRON);
|
||||
if (cronExprStr != null) {
|
||||
try {
|
||||
cronExpression = new CronExpression(cronExprStr);
|
||||
} catch (java.text.ParseException e) {
|
||||
throw new AnalysisException("Invalid cron expression: " + cronExprStr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkNumericProperty(String key, String value, int lowerBound, int upperBound,
|
||||
boolean includeBoundary, String errorMsg) throws AnalysisException {
|
||||
if (!StringUtils.isNumeric(value)) {
|
||||
|
||||
@ -23,6 +23,8 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
|
||||
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
|
||||
|
||||
import org.quartz.CronExpression;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class AnalyzeStmt extends StatementBase {
|
||||
@ -55,7 +57,8 @@ public class AnalyzeStmt extends StatementBase {
|
||||
if (analyzeProperties.isAutomatic()) {
|
||||
return ScheduleType.AUTOMATIC;
|
||||
}
|
||||
return analyzeProperties.getPeriodTimeInMs() > 0 ? ScheduleType.PERIOD : ScheduleType.ONCE;
|
||||
return analyzeProperties.getPeriodTimeInMs() > 0 || analyzeProperties.getCron() != null
|
||||
? ScheduleType.PERIOD : ScheduleType.ONCE;
|
||||
}
|
||||
|
||||
public boolean isSync() {
|
||||
@ -86,4 +89,8 @@ public class AnalyzeStmt extends StatementBase {
|
||||
public RedirectStatus getRedirectStatus() {
|
||||
return RedirectStatus.FORWARD_WITH_SYNC;
|
||||
}
|
||||
|
||||
public CronExpression getCron() {
|
||||
return analyzeProperties.getCron();
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,11 +30,13 @@ import com.google.gson.annotations.SerializedName;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.quartz.CronExpression;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.text.ParseException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
@ -149,12 +151,18 @@ public class AnalysisInfo implements Writable {
|
||||
@SerializedName("samplingPartition")
|
||||
public boolean samplingPartition;
|
||||
|
||||
// For serialize
|
||||
@SerializedName("cronExpr")
|
||||
public String cronExprStr;
|
||||
|
||||
public CronExpression cronExpression;
|
||||
|
||||
public AnalysisInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
|
||||
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
|
||||
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
|
||||
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
|
||||
long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask,
|
||||
boolean partitionOnly, boolean samplingPartition) {
|
||||
boolean partitionOnly, boolean samplingPartition, CronExpression cronExpression) {
|
||||
this.jobId = jobId;
|
||||
this.taskId = taskId;
|
||||
this.catalogName = catalogName;
|
||||
@ -179,6 +187,7 @@ public class AnalysisInfo implements Writable {
|
||||
this.externalTableLevelTask = isExternalTableLevelTask;
|
||||
this.partitionOnly = partitionOnly;
|
||||
this.samplingPartition = samplingPartition;
|
||||
this.cronExpression = cronExpression;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -337,7 +346,15 @@ public class AnalysisInfo implements Writable {
|
||||
return analysisInfoBuilder.build();
|
||||
} else {
|
||||
String json = Text.readString(dataInput);
|
||||
return GsonUtils.GSON.fromJson(json, AnalysisInfo.class);
|
||||
AnalysisInfo analysisInfo = GsonUtils.GSON.fromJson(json, AnalysisInfo.class);
|
||||
if (analysisInfo.cronExprStr != null) {
|
||||
try {
|
||||
analysisInfo.cronExpression = new CronExpression(analysisInfo.cronExprStr);
|
||||
} catch (ParseException e) {
|
||||
LOG.warn("Cron expression of job is invalid, there is a bug", e);
|
||||
}
|
||||
}
|
||||
return analysisInfo;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,6 +23,8 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
|
||||
|
||||
import org.quartz.CronExpression;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@ -52,6 +54,8 @@ public class AnalysisInfoBuilder {
|
||||
private boolean partitionOnly;
|
||||
private boolean samplingPartition;
|
||||
|
||||
private CronExpression cronExpression;
|
||||
|
||||
public AnalysisInfoBuilder() {
|
||||
}
|
||||
|
||||
@ -202,11 +206,15 @@ public class AnalysisInfoBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setCronExpression(CronExpression cronExpression) {
|
||||
this.cronExpression = cronExpression;
|
||||
}
|
||||
|
||||
public AnalysisInfo build() {
|
||||
return new AnalysisInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, partitionNames,
|
||||
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
|
||||
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType,
|
||||
externalTableLevelTask, partitionOnly, samplingPartition);
|
||||
externalTableLevelTask, partitionOnly, samplingPartition, cronExpression);
|
||||
}
|
||||
|
||||
public AnalysisInfoBuilder copy() {
|
||||
|
||||
@ -61,6 +61,7 @@ import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.quartz.CronExpression;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
@ -69,6 +70,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -398,6 +400,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
AnalysisMode analysisMode = stmt.getAnalysisMode();
|
||||
AnalysisMethod analysisMethod = stmt.getAnalysisMethod();
|
||||
ScheduleType scheduleType = stmt.getScheduleType();
|
||||
CronExpression cronExpression = stmt.getCron();
|
||||
|
||||
taskInfoBuilder.setJobId(jobId);
|
||||
taskInfoBuilder.setCatalogName(catalogName);
|
||||
@ -419,6 +422,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
taskInfoBuilder.setAnalysisMethod(analysisMethod);
|
||||
taskInfoBuilder.setScheduleType(scheduleType);
|
||||
taskInfoBuilder.setLastExecTimeInMs(0);
|
||||
taskInfoBuilder.setCronExpression(cronExpression);
|
||||
|
||||
if (analysisMethod == AnalysisMethod.SAMPLE) {
|
||||
taskInfoBuilder.setSamplePercent(samplePercent);
|
||||
@ -432,10 +436,8 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
taskInfoBuilder.setMaxBucketNum(maxBucketNum);
|
||||
}
|
||||
|
||||
if (scheduleType == ScheduleType.PERIOD) {
|
||||
long periodTimeInMs = stmt.getPeriodTimeInMs();
|
||||
taskInfoBuilder.setPeriodTimeInMs(periodTimeInMs);
|
||||
}
|
||||
long periodTimeInMs = stmt.getPeriodTimeInMs();
|
||||
taskInfoBuilder.setPeriodTimeInMs(periodTimeInMs);
|
||||
|
||||
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, columnNames,
|
||||
partitionNames, analysisType, analysisMode);
|
||||
@ -850,10 +852,18 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
|
||||
public List<AnalysisInfo> findPeriodicJobs() {
|
||||
synchronized (analysisJobInfoMap) {
|
||||
Predicate<AnalysisInfo> p = a -> {
|
||||
if (a.state.equals(AnalysisState.RUNNING) || a.state.equals(AnalysisState.PENDING)) {
|
||||
return false;
|
||||
}
|
||||
if (a.cronExpression == null) {
|
||||
return a.scheduleType.equals(ScheduleType.PERIOD)
|
||||
&& System.currentTimeMillis() - a.lastExecTimeInMs > a.periodTimeInMs;
|
||||
}
|
||||
return a.cronExpression.getTimeAfter(new Date(a.lastExecTimeInMs)).before(new Date());
|
||||
};
|
||||
return analysisJobInfoMap.values().stream()
|
||||
.filter(a -> a.scheduleType.equals(ScheduleType.PERIOD)
|
||||
&& (a.state.equals(AnalysisState.FINISHED))
|
||||
&& System.currentTimeMillis() - a.lastExecTimeInMs > a.periodTimeInMs)
|
||||
.filter(p)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,7 +62,6 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
|
||||
if (!StatisticsUtil.statsTblAvailable()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
|
||||
return;
|
||||
}
|
||||
@ -100,6 +99,8 @@ public class StatisticsAutoAnalyzer extends MasterDaemon {
|
||||
LOG.warn("Failed to analyze all statistics.", t);
|
||||
}
|
||||
}
|
||||
|
||||
analyzePeriodically();
|
||||
}
|
||||
|
||||
private void analyzePeriodically() {
|
||||
|
||||
@ -500,6 +500,7 @@ import org.apache.doris.qe.SqlModeHelper;
|
||||
keywordMap.put("lines", new Integer(SqlParserSymbols.KW_LINES));
|
||||
keywordMap.put("ignore", new Integer(SqlParserSymbols.KW_IGNORE));
|
||||
keywordMap.put("expired", new Integer(SqlParserSymbols.KW_EXPIRED));
|
||||
keywordMap.put("cron", new Integer(SqlParserSymbols.KW_CRON));
|
||||
}
|
||||
|
||||
// map from token id to token description
|
||||
|
||||
Reference in New Issue
Block a user