From 0be0b8ff586201f7941d13949a38e7512d78ee11 Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Thu, 14 Sep 2023 18:10:04 +0900 Subject: [PATCH] [opt](stats) Support display of auto analyze jobs (#24135) ### Support dispaly of auto analyze jobs After this PR, users and DBA could use such grammar to check the execution status of auto analyze jobs: ```sql SHOW AUTO ANALYZE [tbl_name] [WHERE STATE='SOME STATE'] ``` Record count of history auto analyze job could be configured by setting FE option: auto_analyze_job_record_count, default value is 2000 ### Enhance auto analyze After this PR, auto jobs those created automatically will no longer execute beyond a specific time frame. --- .../java/org/apache/doris/common/Config.java | 7 ++ fe/fe-core/src/main/cup/sql_parser.cup | 12 +- .../doris/analysis/ShowAnalyzeStmt.java | 86 +++---------- .../apache/doris/journal/JournalEntity.java | 5 + .../org/apache/doris/persist/EditLog.java | 9 ++ .../apache/doris/persist/OperationType.java | 3 + .../org/apache/doris/qe/ShowExecutor.java | 4 +- .../org/apache/doris/qe/StmtExecutor.java | 1 + .../apache/doris/statistics/AnalysisInfo.java | 4 + .../doris/statistics/AnalysisManager.java | 113 ++++++++++++++++-- .../statistics/AnalysisTaskExecutor.java | 5 + .../doris/statistics/AnalysisTaskWrapper.java | 12 ++ .../doris/statistics/BaseAnalysisTask.java | 2 +- .../doris/statistics/ColumnStatistic.java | 2 +- .../statistics/StatisticsAutoCollector.java | 79 ++++-------- .../doris/statistics/util/SimpleQueue.java | 65 ++++++++++ .../doris/statistics/util/StatisticsUtil.java | 54 +++++++++ .../doris/statistics/AnalysisManagerTest.java | 84 +++++++++++++ .../StatisticsAutoCollectorTest.java | 2 +- 19 files changed, 406 insertions(+), 143 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/util/SimpleQueue.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 7217acebcc..1d4f35fef7 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2190,4 +2190,11 @@ public class Config extends ConfigBase { + "统计信息", "Whether to enable automatic sampling for large tables, which, when enabled, automatically" + "collects statistics through sampling for tables larger than 'huge_table_lower_bound_size_in_bytes'"}) public static boolean enable_auto_sample = false; + + @ConfField(description = { + "控制统计信息的自动触发作业执行记录的持久化行数", + "Determine the persist number of automatic triggered analyze job execution status" + }) + public static long auto_analyze_job_record_count = 20000; + } diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index ecf4708d0b..8f9119308a 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4237,13 +4237,17 @@ show_param ::= RESULT = new ShowCreateMaterializedViewStmt(mvName, tableName); :} /* show analyze job */ - | KW_ANALYZE opt_table_name:tbl opt_wild_where order_by_clause:orderByClause limit_clause:limitClause + | KW_ANALYZE opt_table_name:tbl opt_wild_where {: - RESULT = new ShowAnalyzeStmt(tbl, parser.where, orderByClause, limitClause); + RESULT = new ShowAnalyzeStmt(tbl, parser.where, false); :} - | KW_ANALYZE INTEGER_LITERAL:jobId opt_wild_where order_by_clause:orderByClause limit_clause:limitClause + | KW_ANALYZE INTEGER_LITERAL:jobId opt_wild_where {: - RESULT = new ShowAnalyzeStmt(jobId, parser.where, orderByClause, limitClause); + RESULT = new ShowAnalyzeStmt(jobId, parser.where); + :} + | KW_AUTO KW_ANALYZE opt_table_name:tbl opt_wild_where + {: + RESULT = new ShowAnalyzeStmt(tbl, parser.where, true); :} | KW_ANALYZE KW_TASK KW_STATUS INTEGER_LITERAL:jobId {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java index 95035641a7..07c3029ee0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java @@ -25,7 +25,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.OrderByPair; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSetMetaData; @@ -35,10 +34,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.IntStream; - /** * ShowAnalyzeStmt is used to show statistics job info. * syntax: @@ -69,36 +64,30 @@ public class ShowAnalyzeStmt extends ShowStmt { .build(); private long jobId; - private TableName dbTableName; - private Expr whereClause; - private LimitElement limitElement; - private List orderByElements; - private String stateValue; - private ArrayList orderByPairs; + private final TableName dbTableName; + private final Expr whereClause; + + // extract from predicate + private String stateValue; + + private final boolean auto; - public ShowAnalyzeStmt() { - } public ShowAnalyzeStmt(TableName dbTableName, - Expr whereClause, - List orderByElements, - LimitElement limitElement) { + Expr whereClause, boolean auto) { this.dbTableName = dbTableName; this.whereClause = whereClause; - this.orderByElements = orderByElements; - this.limitElement = limitElement; + this.auto = auto; + } public ShowAnalyzeStmt(long jobId, - Expr whereClause, - List orderByElements, - LimitElement limitElement) { + Expr whereClause) { Preconditions.checkArgument(jobId > 0, "JobId must greater than 0."); this.jobId = jobId; this.dbTableName = null; this.whereClause = whereClause; - this.orderByElements = orderByElements; - this.limitElement = limitElement; + this.auto = false; } public long getJobId() { @@ -111,12 +100,6 @@ public class ShowAnalyzeStmt extends ShowStmt { return stateValue; } - public ArrayList getOrderByPairs() { - Preconditions.checkArgument(isAnalyzed(), - "The orderByPairs must be obtained after the parsing is complete"); - return orderByPairs; - } - public Expr getWhereClause() { Preconditions.checkArgument(isAnalyzed(), "The whereClause must be obtained after the parsing is complete"); @@ -124,13 +107,6 @@ public class ShowAnalyzeStmt extends ShowStmt { return whereClause; } - public long getLimit() { - if (limitElement != null && limitElement.hasLimit()) { - return limitElement.getLimit(); - } - return -1L; - } - @Override public void analyze(Analyzer analyzer) throws UserException { if (!Config.enable_stats) { @@ -149,21 +125,6 @@ public class ShowAnalyzeStmt extends ShowStmt { if (whereClause != null) { analyzeSubPredicate(whereClause); } - - // analyze order by - if (orderByElements != null && !orderByElements.isEmpty()) { - orderByPairs = new ArrayList<>(); - for (OrderByElement orderByElement : orderByElements) { - if (orderByElement.getExpr() instanceof SlotRef) { - SlotRef slotRef = (SlotRef) orderByElement.getExpr(); - int index = analyzeColumn(slotRef.getColumnName()); - OrderByPair orderByPair = new OrderByPair(index, !orderByElement.getIsAsc()); - orderByPairs.add(orderByPair); - } else { - throw new AnalysisException("Should order by column"); - } - } - } } @Override @@ -279,25 +240,6 @@ public class ShowAnalyzeStmt extends ShowStmt { sb.append(whereClause.toSql()); } - // Order By clause - if (orderByElements != null) { - sb.append(" "); - sb.append("ORDER BY"); - sb.append(" "); - IntStream.range(0, orderByElements.size()).forEach(i -> { - sb.append(orderByElements.get(i).getExpr().toSql()); - sb.append((orderByElements.get(i).getIsAsc()) ? " ASC" : " DESC"); - sb.append((i + 1 != orderByElements.size()) ? ", " : ""); - }); - } - - if (getLimit() != -1L) { - sb.append(" "); - sb.append("LIMIT"); - sb.append(" "); - sb.append(getLimit()); - } - return sb.toString(); } @@ -309,4 +251,8 @@ public class ShowAnalyzeStmt extends ShowStmt { public TableName getDbTableName() { return dbTableName; } + + public boolean isAuto() { + return auto; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 0415e43581..017a535c54 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -885,6 +885,11 @@ public class JournalEntity implements Writable { isRead = true; break; } + case OperationType.OP_PERSIST_AUTO_JOB: { + data = AnalysisInfo.read(in); + isRead = true; + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 76c10e8567..9695df482b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1108,6 +1108,10 @@ public class EditLog { env.getAnalysisManager().replayUpdateTableStatsStatus((TableStats) journal.getData()); break; } + case OperationType.OP_PERSIST_AUTO_JOB: { + env.getAnalysisManager().replayPersistSysJob((AnalysisInfo) journal.getData()); + break; + } default: { IOException e = new IOException(); LOG.error("UNKNOWN Operation Type {}", opCode, e); @@ -1945,4 +1949,9 @@ public class EditLog { public void logCreateTableStats(TableStats tableStats) { logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats); } + + public void logAutoJob(AnalysisInfo analysisInfo) { + logEdit(OperationType.OP_PERSIST_AUTO_JOB, analysisInfo); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index ccfa283177..a1af8da41b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -333,6 +333,8 @@ public class OperationType { public static final short OP_UPDATE_TABLE_STATS = 455; + public static final short OP_PERSIST_AUTO_JOB = 456; + /** * Get opcode name by op code. **/ @@ -354,4 +356,5 @@ public class OperationType { } return "Not Found"; } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 465f6a3bd5..bb435c443d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -2675,7 +2675,9 @@ public class ShowExecutor { ZoneId.systemDefault()))); row.add(analysisInfo.state.toString()); try { - row.add(Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId)); + row.add(showStmt.isAuto() + ? analysisInfo.progress + : Env.getCurrentEnv().getAnalysisManager().getJobProgress(analysisInfo.jobId)); } catch (Exception e) { row.add("N/A"); LOG.warn("Failed to get progress for job: {}", analysisInfo, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index f8d9b16be8..5100e2d436 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2582,6 +2582,7 @@ public class StmtExecutor { analyze(context.getSessionVariable().toThrift()); } } catch (Exception e) { + LOG.warn("Failed to run internal SQL: {}", originStmt, e); throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e); } planner.getFragments(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index f23707b799..00b8c7cdaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -48,6 +48,7 @@ public class AnalysisInfo implements Writable { private static final Logger LOG = LogManager.getLogger(AnalysisInfo.class); + // TODO: useless, remove it later public enum AnalysisMode { INCREMENTAL, FULL @@ -166,6 +167,9 @@ public class AnalysisInfo implements Writable { @SerializedName("cronExpr") public String cronExprStr; + @SerializedName("progress") + public String progress; + public CronExpression cronExpression; @SerializedName("forceFull") diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index d78434df73..f25cbe8a2b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -29,6 +29,7 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; @@ -39,11 +40,13 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; +import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.Util; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.AnalyzeDeletionLog; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSet; import org.apache.doris.qe.ShowResultSetMetaData; @@ -52,11 +55,13 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMode; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; +import org.apache.doris.statistics.util.SimpleQueue; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.reflect.TypeToken; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -66,6 +71,7 @@ import org.quartz.CronExpression; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -73,6 +79,7 @@ import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -94,7 +101,7 @@ public class AnalysisManager extends Daemon implements Writable { private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); // Tracking running manually submitted async tasks, keep in mem only - private final ConcurrentMap> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); + protected final ConcurrentMap> analysisJobIdToTaskMap = new ConcurrentHashMap<>(); private StatisticsCache statisticsCache; @@ -107,13 +114,15 @@ public class AnalysisManager extends Daemon implements Writable { private final Map analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>()); // Tracking system submitted job, keep in mem only - private final Map systemJobInfoMap = new ConcurrentHashMap<>(); + protected final Map systemJobInfoMap = new ConcurrentHashMap<>(); // Tracking and control sync analyze tasks, keep in mem only private final ConcurrentMap ctxToSyncTask = new ConcurrentHashMap<>(); private final Map idToTblStats = new ConcurrentHashMap<>(); + protected SimpleQueue autoJobs = createSimpleQueue(null, this); + private final Function userJobStatusUpdater = w -> { AnalysisInfo info = w.info; AnalysisState taskState = w.taskState; @@ -174,26 +183,44 @@ public class AnalysisManager extends Daemon implements Writable { return null; }; - private final Function systemJobStatusUpdater = w -> { + private final String progressDisplayTemplate = "%d Finished | %d Failed | %d In Progress | %d Total"; + + protected final Function systemJobStatusUpdater = w -> { AnalysisInfo info = w.info; info.state = w.taskState; + info.message = w.message; AnalysisInfo job = systemJobInfoMap.get(info.jobId); if (job == null) { return null; } - for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) { - if (!task.info.state.equals(AnalysisState.FINISHED)) { - if (task.info.state.equals(AnalysisState.FAILED)) { - systemJobInfoMap.remove(info.jobId); - } + int failedCount = 0; + StringJoiner reason = new StringJoiner(", "); + Map taskMap = analysisJobIdToTaskMap.get(info.jobId); + for (BaseAnalysisTask task : taskMap.values()) { + if (task.info.state.equals(AnalysisState.RUNNING) || task.info.state.equals(AnalysisState.PENDING)) { return null; } + if (task.info.state.equals(AnalysisState.FAILED)) { + failedCount++; + reason.add(task.info.message); + } } try { updateTableStats(job); } catch (Throwable e) { LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e); } finally { + job.lastExecTimeInMs = System.currentTimeMillis(); + job.message = reason.toString(); + job.progress = String.format(progressDisplayTemplate, + taskMap.size() - failedCount, failedCount, 0, taskMap.size()); + if (failedCount > 0) { + job.message = reason.toString(); + job.state = AnalysisState.FAILED; + } else { + job.state = AnalysisState.FINISHED; + } + autoJobs.offer(job); systemJobInfoMap.remove(info.jobId); } return null; @@ -202,7 +229,6 @@ public class AnalysisManager extends Daemon implements Writable { private final Function[] updaters = new Function[] {userJobStatusUpdater, systemJobStatusUpdater}; - public AnalysisManager() { super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS)); if (!Env.isCheckpointThread()) { @@ -616,9 +642,19 @@ public class AnalysisManager extends Daemon implements Writable { } public List showAnalysisJob(ShowAnalyzeStmt stmt) { + if (stmt.isAuto()) { + // It's ok to sync on this field, it would only be assigned when instance init or do checkpoint + synchronized (autoJobs) { + return findShowAnalyzeResult(autoJobs, stmt); + } + } + return findShowAnalyzeResult(analysisJobInfoMap.values(), stmt); + } + + protected List findShowAnalyzeResult(Collection analysisInfos, ShowAnalyzeStmt stmt) { String state = stmt.getStateValue(); TableName tblName = stmt.getDbTableName(); - return analysisJobInfoMap.values().stream() + return analysisInfos.stream() .filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId()) .filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state))) .filter(a -> tblName == null || a.catalogName.equals(tblName.getCtl()) @@ -649,7 +685,7 @@ public class AnalysisManager extends Daemon implements Writable { break; } } - return String.format("%d Finished/%d Failed/%d In Progress/%d Total", finished, failed, inProgress, total); + return String.format(progressDisplayTemplate, finished, failed, inProgress, total); } @VisibleForTesting @@ -879,6 +915,7 @@ public class AnalysisManager extends Daemon implements Writable { readAnalysisInfo(in, analysisManager.analysisJobInfoMap, true); readAnalysisInfo(in, analysisManager.analysisTaskInfoMap, false); readIdToTblStats(in, analysisManager.idToTblStats); + readAutoJobs(in, analysisManager); return analysisManager; } @@ -898,11 +935,18 @@ public class AnalysisManager extends Daemon implements Writable { } } + private static void readAutoJobs(DataInput in, AnalysisManager analysisManager) throws IOException { + Type type = new TypeToken>() {}.getType(); + Collection autoJobs = GsonUtils.GSON.fromJson(Text.readString(in), type); + analysisManager.autoJobs = analysisManager.createSimpleQueue(autoJobs, analysisManager); + } + @Override public void write(DataOutput out) throws IOException { writeJobInfo(out, analysisJobInfoMap); writeJobInfo(out, analysisTaskInfoMap); writeTableStats(out); + writeAutoJobsStatus(out); } private void writeJobInfo(DataOutput out, Map infoMap) throws IOException { @@ -919,6 +963,12 @@ public class AnalysisManager extends Daemon implements Writable { } } + private void writeAutoJobsStatus(DataOutput output) throws IOException { + Type type = new TypeToken>() {}.getType(); + String autoJobs = GsonUtils.GSON.toJson(this.autoJobs, type); + Text.writeString(output, autoJobs); + } + // For unit test use only. public void addToJobIdTasksMap(long jobId, Map tasks) { analysisJobIdToTaskMap.put(jobId, tasks); @@ -954,4 +1004,45 @@ public class AnalysisManager extends Daemon implements Writable { systemJobInfoMap.put(jobInfo.jobId, jobInfo); analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos); } + + @VisibleForTesting + protected Set findReAnalyzeNeededPartitions(TableIf table) { + TableStats tableStats = findTableStatsStatus(table.getId()); + if (tableStats == null) { + return table.getPartitionNames().stream().map(table::getPartition) + .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet()); + } + return table.getPartitionNames().stream() + .map(table::getPartition) + .filter(Partition::hasData) + .filter(partition -> + partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName) + .collect(Collectors.toSet()); + } + + protected void logAutoJob(AnalysisInfo autoJob) { + Env.getCurrentEnv().getEditLog().logAutoJob(autoJob); + } + + public void replayPersistSysJob(AnalysisInfo analysisInfo) { + autoJobs.offer(analysisInfo); + } + + protected SimpleQueue createSimpleQueue(Collection collection, + AnalysisManager analysisManager) { + return new SimpleQueue<>(Config.auto_analyze_job_record_count, + a -> { + // FE is not ready when replaying log and operations triggered by replaying + // shouldn't be logged again. + if (Env.getCurrentEnv().isReady() && !Env.isCheckpointThread()) { + analysisManager.logAutoJob(a); + } + return null; + }, + a -> { + // DO NOTHING + return null; + }, null); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index a7b0073bb4..4b133ce0eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -99,4 +99,9 @@ public class AnalysisTaskExecutor extends Thread { public boolean idle() { return executors.getQueue().isEmpty(); } + + public void clear() { + executors.getQueue().clear(); + taskQueue.clear(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java index 7f55469f53..800c446576 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java @@ -18,11 +18,15 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.statistics.AnalysisInfo.ScheduleType; +import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.time.LocalTime; import java.util.concurrent.FutureTask; public class AnalysisTaskWrapper extends FutureTask { @@ -52,6 +56,14 @@ public class AnalysisTaskWrapper extends FutureTask { if (task.killed) { return; } + if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && !StatisticsUtil.checkAnalyzeTime( + LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) { + // TODO: Do we need a separate AnalysisState here? + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, AnalysisState.FAILED, "Auto task" + + "doesn't get executed within specified time range", System.currentTimeMillis()); + return; + } executor.putJob(this); super.run(); Object result = get(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 8940b7182d..92fcc870c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -115,7 +115,7 @@ public abstract class BaseAnalysisTask { init(info); } - private void init(AnalysisInfo info) { + protected void init(AnalysisInfo info) { catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName); if (catalog == null) { Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED, diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index b4a261847d..11f924b5d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -163,7 +163,7 @@ public class ColumnStatistic { String colName = row.get(5); Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName); if (col == null) { - LOG.warn("Failed to deserialize column statistics, ctlId: {} dbId: {}" + LOG.debug("Failed to deserialize column statistics, ctlId: {} dbId: {}" + "tblId: {} column: {} not exists", catalogId, dbID, tblId, colName); return ColumnStatistic.UNKNOWN; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 53bebf53e8..64ea89ff63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -17,9 +17,7 @@ package org.apache.doris.statistics; -import org.apache.doris.analysis.SetType; import org.apache.doris.analysis.TableName; -import org.apache.doris.analysis.VariableExpr; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -27,23 +25,19 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.Config; -import org.apache.doris.common.Pair; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.qe.SessionVariable; -import org.apache.doris.qe.VariableMgr; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.AnalysisInfo.ScheduleType; import org.apache.doris.statistics.util.StatisticsUtil; -import org.apache.commons.lang3.StringUtils; +import com.google.common.collect.Maps; import org.apache.hudi.common.util.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.time.LocalTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -64,7 +58,8 @@ public class StatisticsAutoCollector extends StatisticsCollector { @Override protected void collect() { - if (!checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) { + if (!StatisticsUtil.checkAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) { + analysisTaskExecutor.clear(); return; } if (Config.enable_full_auto_analyze) { @@ -147,7 +142,7 @@ public class StatisticsAutoCollector extends StatisticsCollector { .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL) .setAnalysisMethod(analysisMethod) .setSamplePercent(Config.huge_table_default_sample_rows) - .setScheduleType(AnalysisInfo.ScheduleType.ONCE) + .setScheduleType(ScheduleType.AUTOMATIC) .setState(AnalysisState.PENDING) .setTaskIds(new ArrayList<>()) .setLastExecTimeInMs(System.currentTimeMillis()) @@ -175,51 +170,27 @@ public class StatisticsAutoCollector extends StatisticsCollector { return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build(); } - - private boolean checkAnalyzeTime(LocalTime now) { - try { - Pair range = findRangeFromGlobalSessionVar(); - DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); - LocalTime start = range == null - ? LocalTime.parse(Config.full_auto_analyze_start_time, timeFormatter) : range.first; - LocalTime end = range == null - ? LocalTime.parse(Config.full_auto_analyze_end_time, timeFormatter) : range.second; - - if (start.isAfter(end) && (now.isAfter(start) || now.isBefore(end))) { - return true; - } else { - return now.isAfter(start) && now.isBefore(end); + @VisibleForTesting + protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table, + Set needRunPartitions) { + Map> newColToPartitions = Maps.newHashMap(); + Map> colToPartitions = jobInfo.colToPartitions; + if (colToPartitions == null) { + for (Column c : table.getColumns()) { + if (StatisticsUtil.isUnsupportedType(c.getType())) { + continue; + } + newColToPartitions.put(c.getName(), needRunPartitions); } - } catch (DateTimeParseException e) { - LOG.warn("Parse analyze start/end time format fail", e); - return true; + } else { + colToPartitions.keySet().forEach(colName -> { + Column column = table.getColumn(colName); + if (column != null) { + newColToPartitions.put(colName, needRunPartitions); + } + }); } - } - - private Pair findRangeFromGlobalSessionVar() { - try { - String startTime = - findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME) - .fullAutoAnalyzeStartTime; - if (StringUtils.isEmpty(startTime)) { - return null; - } - String endTime = findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME) - .fullAutoAnalyzeEndTime; - if (StringUtils.isEmpty(startTime)) { - return null; - } - DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); - return Pair.of(LocalTime.parse(startTime, timeFormatter), LocalTime.parse(endTime, timeFormatter)); - } catch (Exception e) { - return null; - } - } - - private SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception { - SessionVariable sessionVariable = VariableMgr.newSessionVariable(); - VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL); - VariableMgr.getValue(sessionVariable, variableExpr); - return sessionVariable; + return new AnalysisInfoBuilder(jobInfo) + .setColToPartitions(newColToPartitions).build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/SimpleQueue.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/SimpleQueue.java new file mode 100644 index 0000000000..5740c4e308 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/SimpleQueue.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics.util; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.function.Function; + +// Any operation on this structure should be thread-safe +public class SimpleQueue extends LinkedList { + + private final long limit; + + private final Function offerFunc; + + private final Function evictFunc; + + + public SimpleQueue(long limit, Function offerFunc, Function evictFunc) { + this.limit = limit; + this.offerFunc = offerFunc; + this.evictFunc = evictFunc; + } + + @Override + public synchronized boolean offer(T analysisInfo) { + while (size() >= limit) { + remove(); + } + super.offer(analysisInfo); + offerFunc.apply(analysisInfo); + return true; + } + + @Override + public synchronized T remove() { + T analysisInfo = super.remove(); + evictFunc.apply(analysisInfo); + return analysisInfo; + } + + public SimpleQueue(long limit, Function offerFunc, Function evictFunc, Collection collection) { + this(limit, offerFunc, evictFunc); + if (collection != null) { + for (T e : collection) { + offer(e); + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 7a0d700fbb..d7c635bf01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -25,10 +25,12 @@ import org.apache.doris.analysis.FloatLiteral; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.LargeIntLiteral; import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.SetType; import org.apache.doris.analysis.StatementBase; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.analysis.VariableExpr; import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; @@ -49,6 +51,7 @@ import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; @@ -62,6 +65,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.qe.VariableMgr; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.ColumnStatisticBuilder; import org.apache.doris.statistics.Histogram; @@ -87,6 +91,9 @@ import org.apache.logging.log4j.Logger; import java.net.InetSocketAddress; import java.text.SimpleDateFormat; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -711,4 +718,51 @@ public class StatisticsUtil { } return table instanceof ExternalTable; } + + public static boolean checkAnalyzeTime(LocalTime now) { + try { + Pair range = findRangeFromGlobalSessionVar(); + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); + LocalTime start = range == null + ? LocalTime.parse(Config.full_auto_analyze_start_time, timeFormatter) : range.first; + LocalTime end = range == null + ? LocalTime.parse(Config.full_auto_analyze_end_time, timeFormatter) : range.second; + + if (start.isAfter(end) && (now.isAfter(start) || now.isBefore(end))) { + return true; + } else { + return now.isAfter(start) && now.isBefore(end); + } + } catch (DateTimeParseException e) { + LOG.warn("Parse analyze start/end time format fail", e); + return true; + } + } + + private static Pair findRangeFromGlobalSessionVar() { + try { + String startTime = + findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME) + .fullAutoAnalyzeStartTime; + if (StringUtils.isEmpty(startTime)) { + return null; + } + String endTime = findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME) + .fullAutoAnalyzeEndTime; + if (StringUtils.isEmpty(startTime)) { + return null; + } + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); + return Pair.of(LocalTime.parse(startTime, timeFormatter), LocalTime.parse(endTime, timeFormatter)); + } catch (Exception e) { + return null; + } + } + + private static SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception { + SessionVariable sessionVariable = VariableMgr.newSessionVariable(); + VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL); + VariableMgr.getValue(sessionVariable, variableExpr); + return sessionVariable; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index 16ef1705d8..2146722db9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -21,6 +21,9 @@ import org.apache.doris.analysis.AnalyzeProperties; import org.apache.doris.analysis.AnalyzeTblStmt; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.DdlException; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; @@ -33,6 +36,7 @@ import mockit.Injectable; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; +import org.apache.hadoop.util.Lists; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -40,6 +44,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; // CHECKSTYLE OFF @@ -262,4 +267,83 @@ public class AnalysisManagerTest { }; } + @Test + public void testSystemJobStatusUpdater() { + new MockUp() { + + @Mock + protected void init(AnalysisInfo info) { + + } + }; + + new MockUp() { + @Mock + public void updateTableStats(AnalysisInfo jobInfo) {} + + @Mock + protected void logAutoJob(AnalysisInfo autoJob) { + + } + }; + + AnalysisManager analysisManager = new AnalysisManager(); + AnalysisInfo job = new AnalysisInfoBuilder() + .setJobId(0) + .setColName("col1, col2").build(); + analysisManager.systemJobInfoMap.put(job.jobId, job); + AnalysisInfo task1 = new AnalysisInfoBuilder() + .setJobId(0) + .setTaskId(1) + .setState(AnalysisState.RUNNING) + .setColName("col1").build(); + AnalysisInfo task2 = new AnalysisInfoBuilder() + .setJobId(0) + .setTaskId(1) + .setState(AnalysisState.FINISHED) + .setColName("col2").build(); + OlapAnalysisTask ot1 = new OlapAnalysisTask(task1); + OlapAnalysisTask ot2 = new OlapAnalysisTask(task2); + Map taskMap = new HashMap<>(); + taskMap.put(ot1.info.taskId, ot1); + taskMap.put(ot2.info.taskId, ot2); + analysisManager.analysisJobIdToTaskMap.put(job.jobId, taskMap); + + // test invalid job + AnalysisInfo invalidJob = new AnalysisInfoBuilder().setJobId(-1).build(); + analysisManager.systemJobStatusUpdater.apply(new TaskStatusWrapper(invalidJob, + AnalysisState.FAILED, "", 0)); + + // test finished + analysisManager.systemJobStatusUpdater.apply(new TaskStatusWrapper(task1, AnalysisState.FAILED, "", 0)); + analysisManager.systemJobStatusUpdater.apply(new TaskStatusWrapper(task1, AnalysisState.FINISHED, "", 0)); + Assertions.assertEquals(1, analysisManager.autoJobs.size()); + Assertions.assertTrue(analysisManager.systemJobInfoMap.isEmpty()); + } + + @Test + public void testReAnalyze() { + new MockUp() { + + int count = 0; + int[] rowCount = new int[]{100, 200}; + @Mock + public long getRowCount() { + return rowCount[count++]; + } + + @Mock + public List getBaseSchema() { + return Lists.newArrayList(new Column("col1", PrimitiveType.INT)); + } + + }; + OlapTable olapTable = new OlapTable(); + TableStats stats1 = new TableStats(0, 50, new AnalysisInfoBuilder().setColName("col1").build()); + Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1)); + TableStats stats2 = new TableStats(0, 190, new AnalysisInfoBuilder().setColName("col1").build()); + Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2)); + + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java index 152e5cf948..2ceda0a888 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/StatisticsAutoCollectorTest.java @@ -209,7 +209,7 @@ public class StatisticsAutoCollectorTest { .setDbName("db") .setTblName("tbl").build(); Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); - // uncomment it when updatedRows get ready + // uncomment it when updatedRows gets ready // Assertions.assertNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); Assertions.assertNotNull(statisticsAutoCollector.getReAnalyzeRequiredPart(analysisInfo2)); }