From 9f221a703be3dace426b21cdc2a5e502fc104fc8 Mon Sep 17 00:00:00 2001
From: ElvinWei
Date: Thu, 4 Aug 2022 16:10:49 +0800
Subject: [PATCH] [feature-wip](statistics) step5: show statistics job information (#8862)

This pull request includes some implementations of the statistics feature
(https://github.com/apache/incubator-doris/issues/6370). It does not affect any
existing code, and users are not yet able to create statistics jobs.

It implements the display of statistics job information. With the following
syntax, users will be able to view the corresponding job information.

syntax:

```
SHOW ANALYZE
    [TABLE | ID]
    [
        WHERE
        [STATE = ["PENDING"|"SCHEDULING"|"RUNNING"|"FINISHED"|"FAILED"|"CANCELLED"]]
    ]
    [ORDER BY ...]
    [LIMIT limit][OFFSET offset];
```

e.g.

| id    | create_time             | start_time              | finish_time             | error_msg | scope               | progress | state    |
| ----- | ----------------------- | ----------------------- | ----------------------- | --------- | ------------------- | -------- | -------- |
| 60051 | 2022-07-21 01:26:26.173 | 2022-07-21 01:26:26.186 | 2022-07-21 01:26:27.104 |           | table1(citycode,pv) | 5/5      | FINISHED |
---
 .../doris/analysis/ShowAnalyzeStmt.java       | 346 ++++++++++++++++++
 .../org/apache/doris/qe/ShowExecutor.java     |  12 +
 .../doris/statistics/StatisticsJob.java       |  87 ++++-
 .../statistics/StatisticsJobManager.java      |  99 ++++-
 4 files changed, 538 insertions(+), 6 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
new file mode 100644
index 0000000000..29e17e8ef0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowAnalyzeStmt.java
@@ -0,0 +1,346 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.OrderByPair;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.statistics.StatisticsJob;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+/**
+ * ShowAnalyzeStmt is used to show statistics job info.
+ * syntax:
+ *   SHOW ANALYZE
+ *       [TABLE | ID]
+ *       [
+ *           WHERE
+ *           [STATE = ["PENDING"|"SCHEDULING"|"RUNNING"|"FINISHED"|"FAILED"|"CANCELLED"]]
+ *       ]
+ *       [ORDER BY ...]
+ *       [LIMIT limit][OFFSET offset];
+ */
+public class ShowAnalyzeStmt extends ShowStmt {
+    private static final String STATE_NAME = "state";
+    // column titles of the result set; ORDER BY columns are resolved to indexes in this list
+    private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("id")
+            .add("create_time")
+            .add("start_time")
+            .add("finish_time")
+            .add("error_msg")
+            .add("scope")
+            .add("progress")
+            .add("state")
+            .build();
+
+    private List<Long> jobIds;
+    private TableName dbTableName;
+    private Expr whereClause;
+    private LimitElement limitElement;
+    private List<OrderByElement> orderByElements;
+
+    // after analyzed
+    private long dbId;
+    private final Set<Long> tblIds = Sets.newHashSet();
+
+    private String stateValue;
+    private ArrayList<OrderByPair> orderByPairs;
+
+    /** Creates a statement without job ids, table name, or any clause. */
+    public ShowAnalyzeStmt() {
+    }
+
+    /** Creates a statement that shows the jobs with the given ids. */
+    public ShowAnalyzeStmt(List<Long> jobIds) {
+        this.jobIds = jobIds;
+    }
+
+    /** Creates a statement that shows the jobs of one table, or of the default db if the name is null. */
+    public ShowAnalyzeStmt(TableName dbTableName,
+                           Expr whereClause,
+                           List<OrderByElement> orderByElements,
+                           LimitElement limitElement) {
+        this.dbTableName = dbTableName;
+        this.whereClause = whereClause;
+        this.orderByElements = orderByElements;
+        this.limitElement = limitElement;
+    }
+
+    /** Returns the job ids given to the constructor; may be null. */
+    public List<Long> getJobIds() {
+        return jobIds;
+    }
+
+    /** Returns the id of the resolved database; only valid after {@link #analyze}. */
+    public long getDbId() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The dbId must be obtained after the parsing is complete");
+        return dbId;
+    }
+
+    /** Returns the ids of the tables whose jobs are shown; only valid after {@link #analyze}. */
+    public Set<Long> getTblIds() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The tblIds must be obtained after the parsing is complete");
+        return tblIds;
+    }
+
+    /** Returns the upper-cased state of the WHERE clause, or null if there is none. */
+    public String getStateValue() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The stateValue must be obtained after the parsing is complete");
+        return stateValue;
+    }
+
+    /** Returns the resolved ORDER BY columns, or null if there is no ORDER BY clause. */
+    public ArrayList<OrderByPair> getOrderByPairs() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The orderByPairs must be obtained after the parsing is complete");
+        return orderByPairs;
+    }
+
+    /** Returns the LIMIT value, or -1 if no limit was given. */
+    public long getLimit() {
+        if (limitElement != null && limitElement.hasLimit()) {
+            return limitElement.getLimit();
+        }
+        return -1L;
+    }
+
+    /** Returns the OFFSET value, or -1 if no offset was given. */
+    public long getOffset() {
+        if (limitElement != null && limitElement.hasOffset()) {
+            return limitElement.getOffset();
+        }
+        return -1L;
+    }
+
+    /**
+     * Resolves the database and table ids to show, checks the SHOW privilege on every
+     * resolved table, and validates the WHERE and ORDER BY clauses.
+     *
+     * @throws UserException if the db or table cannot be resolved, access is denied, or a clause is invalid
+     */
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+
+        if (dbTableName != null) {
+            dbTableName.analyze(analyzer);
+            String dbName = dbTableName.getDb();
+            String tblName = dbTableName.getTbl();
+            checkShowAnalyzePriv(dbName, tblName);
+
+            Database db = analyzer.getEnv()
+                    .getInternalDataSource().getDbOrAnalysisException(dbName);
+            Table table = db.getTableOrAnalysisException(tblName);
+            dbId = db.getId();
+            tblIds.add(table.getId());
+        } else {
+            // analyze the current default db
+            String dbName = analyzer.getDefaultDb();
+            if (Strings.isNullOrEmpty(dbName)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+
+            Database db = analyzer.getEnv()
+                    .getInternalDataSource().getDbOrAnalysisException(dbName);
+
+            db.readLock();
+            try {
+                List<Table> tables = db.getTables();
+                for (Table table : tables) {
+                    checkShowAnalyzePriv(dbName, table.getName());
+                }
+
+                dbId = db.getId();
+                for (Table table : tables) {
+                    tblIds.add(table.getId());
+                }
+            } finally {
+                db.readUnlock();
+            }
+        }
+
+        // analyze where clause if not null
+        if (whereClause != null) {
+            analyzeSubPredicate(whereClause);
+        }
+
+        // analyze order by
+        if (orderByElements != null && !orderByElements.isEmpty()) {
+            orderByPairs = new ArrayList<>();
+            for (OrderByElement orderByElement : orderByElements) {
+                if (orderByElement.getExpr() instanceof SlotRef) {
+                    SlotRef slotRef = (SlotRef) orderByElement.getExpr();
+                    int index = analyzeColumn(slotRef.getColumnName());
+                    OrderByPair orderByPair = new OrderByPair(index, !orderByElement.getIsAsc());
+                    orderByPairs.add(orderByPair);
+                } else {
+                    throw new AnalysisException("Should order by column");
+                }
+            }
+        }
+    }
+
+    /** Describes the result set: one VARCHAR(128) column per title in TITLE_NAMES. */
+    @Override
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(128)));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
+
+    // Reports ERR_TABLEACCESS_DENIED_ERROR unless the current user has SHOW privilege on the table.
+    private void checkShowAnalyzePriv(String dbName, String tblName) throws AnalysisException {
+        PaloAuth auth = Env.getCurrentEnv().getAuth();
+        if (!auth.checkTblPriv(ConnectContext.get(), dbName, tblName, PrivPredicate.SHOW)) {
+            ErrorReport.reportAnalysisException(
+                    ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
+                    "SHOW ANALYZE",
+                    ConnectContext.get().getQualifiedUser(),
+                    ConnectContext.get().getRemoteIP(),
+                    dbName + ": " + tblName);
+        }
+    }
+
+    /**
+     * Validates the WHERE clause, which must look like {@code STATE = "<job state>"},
+     * and stores the upper-cased state name in {@code stateValue}.
+     *
+     * @param subExpr the WHERE predicate; null is ignored
+     * @throws AnalysisException if the predicate has another shape or names an unknown state
+     */
+    private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {
+        if (subExpr == null) {
+            return;
+        }
+        String state = extractStateValue(subExpr);
+        if (state == null) {
+            throw new AnalysisException("Where clause should looks like: "
+                    + "STATE = \"PENDING|SCHEDULING|RUNNING|FINISHED|FAILED|CANCELLED\"");
+        }
+        stateValue = state;
+    }
+
+    // Returns the upper-cased state of a valid STATE = "..." predicate, or null if it is invalid.
+    private static String extractStateValue(Expr subExpr) {
+        if (!(subExpr instanceof BinaryPredicate)
+                || ((BinaryPredicate) subExpr).getOp() != BinaryPredicate.Operator.EQ) {
+            return null;
+        }
+        // left child must be the "state" column
+        if (!(subExpr.getChild(0) instanceof SlotRef)
+                || !STATE_NAME.equalsIgnoreCase(((SlotRef) subExpr.getChild(0)).getColumnName())) {
+            return null;
+        }
+        // right child must be a non-empty string literal
+        if (!(subExpr.getChild(1) instanceof StringLiteral)) {
+            return null;
+        }
+        String value = subExpr.getChild(1).getStringValue();
+        if (Strings.isNullOrEmpty(value)) {
+            return null;
+        }
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale
+        String state = value.toUpperCase(java.util.Locale.ROOT);
+        try {
+            StatisticsJob.JobState.valueOf(state);
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+        return state;
+    }
+
+    // Maps a (case-insensitive) title name to its column index in the result set.
+    private int analyzeColumn(String columnName) throws AnalysisException {
+        for (int i = 0; i < TITLE_NAMES.size(); i++) {
+            if (TITLE_NAMES.get(i).equalsIgnoreCase(columnName)) {
+                return i;
+            }
+        }
+        throw new AnalysisException("Title name[" + columnName + "] does not exist");
+    }
+
+    /** Rebuilds the statement text from the table name and the WHERE, ORDER BY, LIMIT and OFFSET clauses. */
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("SHOW ANALYZE");
+        if (dbTableName != null) {
+            sb.append(" ").append(dbTableName.toSql());
+        }
+
+        if (whereClause != null) {
+            sb.append(" WHERE ").append(whereClause.toSql());
+        }
+
+        // Order By clause; skipped for an empty list so no dangling "ORDER BY" is emitted
+        if (orderByElements != null && !orderByElements.isEmpty()) {
+            sb.append(" ORDER BY ");
+            IntStream.range(0, orderByElements.size()).forEach(i -> {
+                sb.append(orderByElements.get(i).getExpr().toSql());
+                sb.append((orderByElements.get(i).getIsAsc()) ? " ASC" : " DESC");
+                sb.append((i + 1 != orderByElements.size()) ? ", " : "");
+            });
+        }
+
+        if (getLimit() != -1L) {
+            sb.append(" LIMIT ").append(getLimit());
+        }
+
+        if (getOffset() != -1L) {
+            sb.append(" OFFSET ").append(getOffset());
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 33268b0781..4c0a09d0b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.DescribeStmt;
 import org.apache.doris.analysis.HelpStmt;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.ShowAlterStmt;
+import org.apache.doris.analysis.ShowAnalyzeStmt;
 import org.apache.doris.analysis.ShowAuthorStmt;
 import org.apache.doris.analysis.ShowBackendsStmt;
 import org.apache.doris.analysis.ShowBackupStmt;
@@ -166,6 +167,7 @@ import org.apache.doris.load.LoadJob;
 import 
org.apache.doris.load.LoadJob.JobState;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.statistics.StatisticsJobManager;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.Diagnoser;
 import org.apache.doris.system.SystemInfoService;
@@ -356,6 +358,8 @@ public class ShowExecutor {
             handleShowPolicy();
         } else if (stmt instanceof ShowCatalogStmt) {
             handleShowCatalogs();
+        } else if (stmt instanceof ShowAnalyzeStmt) {
+            handleShowAnalyze();
         } else {
             handleEmtpy();
         }
@@ -2242,4 +2246,13 @@ public class ShowExecutor {
         ShowCatalogStmt showStmt = (ShowCatalogStmt) stmt;
         resultSet = Env.getCurrentEnv().getDataSourceMgr().showCatalogs(showStmt);
     }
+
+    // Handle show analyze: fetch the job rows from the statistics job manager and expose them as the result set
+    private void handleShowAnalyze() throws AnalysisException {
+        ShowAnalyzeStmt showStmt = (ShowAnalyzeStmt) stmt;
+        StatisticsJobManager jobManager = Env.getCurrentEnv()
+                .getStatisticsJobManager();
+        List<List<String>> results = jobManager.getAnalyzeJobInfos(showStmt);
+        resultSet = new ShowResultSet(showStmt.getMetaData(), results);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
index 7d214fca29..b89de60324 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java
@@ -18,20 +18,28 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.TimeUtils;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import 
com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.annotation.Nullable;
 
 /***
  * Used to store statistics job info,
@@ -255,7 +263,7 @@ public class StatisticsJob {
      * get statisticsJob from analyzeStmt.
      * AnalyzeStmt: analyze t1(c1,c2,c3)
      * tableId: [t1]
-     * tableIdToColumnName
+     * tableIdToColumnName: {t1: [c1,c2,c3]}
      */
     public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt stmt) throws AnalysisException {
         long dbId = stmt.getDbId();
@@ -265,4 +273,90 @@ public class StatisticsJob {
         Map properties = stmt.getProperties();
         return new StatisticsJob(dbId, tblIds, tableIdToPartitionName, tableIdToColumnName, properties);
     }
+
+    /**
+     * Builds one row of SHOW ANALYZE output for this job, in the column order
+     * id, create_time, start_time, finish_time, error_msg, scope, progress, state.
+     *
+     * @param tableId if non-null, only tasks of this table count towards scope and progress;
+     *                if null, all tasks of the job are counted
+     * @return the row values; a start or finish time of -1 is rendered as "N/A"
+     * @throws AnalysisException if the job's database cannot be resolved
+     */
+    public List<Comparable> getShowInfo(@Nullable Long tableId) throws AnalysisException {
+        List<Comparable> result = Lists.newArrayList();
+
+        result.add(Long.toString(id));
+
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+        result.add(TimeUtils.longToTimeString(createTime, dateFormat));
+        result.add(startTime != -1L ? TimeUtils.longToTimeString(startTime, dateFormat) : "N/A");
+        result.add(finishTime != -1L ? TimeUtils.longToTimeString(finishTime, dateFormat) : "N/A");
+
+        StringBuilder sb = new StringBuilder();
+        for (String errorMsg : errorMsgs) {
+            sb.append(errorMsg).append("\n");
+        }
+        result.add(sb.toString());
+
+        int totalTaskNum = 0;
+        int finishedTaskNum = 0;
+        Map<Long, Set<String>> tblIdToCols = Maps.newHashMap();
+
+        for (StatisticsTask task : tasks) {
+            if (task.getStatsDescs().isEmpty()) {
+                continue;
+            }
+            // The same task has the same stats properties
+            StatsCategory statsCategory = task.getStatsDescs().get(0).getStatsCategory();
+            long tblId = statsCategory.getTableId();
+            if (tableId != null && tableId != tblId) {
+                continue;
+            }
+
+            totalTaskNum++;
+            if (task.getTaskState() == StatisticsTask.TaskState.FINISHED) {
+                finishedTaskNum++;
+            }
+
+            String col = statsCategory.getColumnName();
+            if (!Strings.isNullOrEmpty(col)) {
+                tblIdToCols.computeIfAbsent(tblId, key -> Sets.newHashSet()).add(col);
+            }
+        }
+
+        List<String> scope = Lists.newArrayList();
+        Database db = Env.getCurrentEnv()
+                .getInternalDataSource().getDbOrAnalysisException(dbId);
+        for (Long tblId : tblIds) {
+            try {
+                Table table = db.getTableOrAnalysisException(tblId);
+                List<Column> baseSchema = table.getBaseSchema();
+                Set<String> cols = tblIdToCols.get(tblId);
+                if (cols != null) {
+                    if (baseSchema.size() == cols.size()) {
+                        scope.add(table.getName() + "(*)");
+                    } else {
+                        scope.add(table.getName() + "(" + StringUtils.join(cols.toArray(), ", ") + ")");
+                    }
+                }
+            } catch (AnalysisException e) {
+                // catch this exception when table is dropped
+                LOG.info("get table failed, tableId: " + tblId, e);
+            }
+        }
+
+        result.add(StringUtils.join(scope.toArray(), ","));
+        result.add(finishedTaskNum + "/" + totalTaskNum);
+
+        // Every counted task finished. With no counted task at all (0/0) fall back to the job state,
+        // otherwise a job that has no tasks yet would be reported as FINISHED.
+        if (totalTaskNum > 0 && totalTaskNum == finishedTaskNum) {
+            result.add("FINISHED");
+        } else {
+            result.add(jobState.toString());
+        }
+
+        return result;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java index 1513cbd05b..a8afd9850f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeStmt; +import org.apache.doris.analysis.ShowAnalyzeStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; @@ -27,14 +28,22 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.ListComparator; +import org.apache.doris.common.util.OrderByPair; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * For unified management of statistics job, @@ -67,7 +76,7 @@ public class StatisticsJobManager { } public Map getIdToStatisticsJob() { - return this.idToStatisticsJob; + return idToStatisticsJob; } public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException { @@ -76,16 +85,16 @@ public class StatisticsJobManager { writeLock(); try { // step2: check restrict - this.checkRestrict(analyzeStmt.getDbId(), statisticsJob.getTblIds()); + checkRestrict(analyzeStmt.getDbId(), statisticsJob.getTblIds()); // step3: create it - this.createStatisticsJob(statisticsJob); + createStatisticsJob(statisticsJob); } finally { writeUnlock(); } } public void createStatisticsJob(StatisticsJob 
statisticsJob) throws DdlException {
-        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             Env.getCurrentEnv().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
@@ -119,7 +128,7 @@ public class StatisticsJobManager {
         int unfinishedJobs = 0;
 
         // check table unfinished job
-        for (StatisticsJob statisticsJob : this.idToStatisticsJob.values()) {
+        for (StatisticsJob statisticsJob : idToStatisticsJob.values()) {
             StatisticsJob.JobState jobState = statisticsJob.getJobState();
             Set tblIds = statisticsJob.getTblIds();
             if (jobState == StatisticsJob.JobState.PENDING
@@ -140,4 +149,88 @@ public class StatisticsJobManager {
                     + Config.cbo_max_statistics_job_num);
         }
     }
+
+    /**
+     * Collects the rows shown by SHOW ANALYZE: one row per requested job id, or one row per
+     * (job, table) pair for jobs of the statement's database that touch the requested tables.
+     * Rows are filtered by state, sorted, and cut down by LIMIT/OFFSET.
+     *
+     * @param showStmt an analyzed SHOW ANALYZE statement
+     * @return the result rows, each cell rendered with {@code toString()}
+     * @throws AnalysisException if a requested job id does not exist
+     */
+    public List<List<String>> getAnalyzeJobInfos(ShowAnalyzeStmt showStmt) throws AnalysisException {
+        List<List<Comparable>> results = Lists.newArrayList();
+
+        String stateValue = showStmt.getStateValue();
+        StatisticsJob.JobState jobState = null;
+        if (!Strings.isNullOrEmpty(stateValue)) {
+            jobState = StatisticsJob.JobState.valueOf(stateValue);
+        }
+
+        // step 1: get job infos
+        // NOTE(review): idToStatisticsJob is read without the lock that createStatisticsJob
+        // takes when writing it - confirm whether a read lock is required here.
+        List<Long> jobIds = showStmt.getJobIds();
+        if (jobIds != null && !jobIds.isEmpty()) {
+            for (Long jobId : jobIds) {
+                StatisticsJob statisticsJob = idToStatisticsJob.get(jobId);
+                if (statisticsJob == null) {
+                    throw new AnalysisException("No such job id: " + jobId);
+                }
+                if (jobState == null || jobState == statisticsJob.getJobState()) {
+                    results.add(statisticsJob.getShowInfo(null));
+                }
+            }
+        } else {
+            long dbId = showStmt.getDbId();
+            Set<Long> tblIds = showStmt.getTblIds();
+            for (StatisticsJob statisticsJob : idToStatisticsJob.values()) {
+                if (statisticsJob.getDbId() != dbId) {
+                    continue;
+                }
+                // check the state
+                if (jobState != null && jobState != statisticsJob.getJobState()) {
+                    continue;
+                }
+                // one row per table that is both part of the job and requested by the statement
+                Set<Long> sharedTblIds = Sets.newHashSet(statisticsJob.getTblIds());
+                sharedTblIds.retainAll(tblIds);
+                for (long tblId : sharedTblIds) {
+                    results.add(statisticsJob.getShowInfo(tblId));
+                }
+            }
+        }
+
+        // step2: order the result
+        ListComparator<List<Comparable>> comparator;
+        List<OrderByPair> orderByPairs = showStmt.getOrderByPairs();
+        if (orderByPairs == null) {
+            // sort by id asc
+            comparator = new ListComparator<>(0);
+        } else {
+            comparator = new ListComparator<>(orderByPairs.toArray(new OrderByPair[0]));
+        }
+        results.sort(comparator);
+
+        // step3: filter by limit; an OFFSET is applied even when no LIMIT was given
+        long limit = showStmt.getLimit();
+        long offset = showStmt.getOffset() == -1L ? 0 : showStmt.getOffset();
+        if (offset >= results.size()) {
+            results = Collections.emptyList();
+        } else {
+            // offset < size here, so "size - offset" cannot overflow the way "limit + offset" can
+            long remaining = results.size() - offset;
+            long end = (limit == -1L || limit >= remaining) ? results.size() : offset + limit;
+            results = results.subList((int) offset, (int) end);
+        }
+
+        // step4: convert to result and return it
+        List<List<String>> rows = Lists.newArrayList();
+        for (List<Comparable> result : results) {
+            rows.add(result.stream().map(Object::toString).collect(Collectors.toList()));
+        }
+
+        return rows;
+    }
 }