From 4bd5d4f1631db8e11ff4ead44206aa2e790b4482 Mon Sep 17 00:00:00 2001 From: ElvinWei Date: Sun, 1 May 2022 11:34:08 +0800 Subject: [PATCH] [feature-wip](statistics) step3: schedule the statistics tasks and update relevant info (#8860) This pull request includes some implementations of the statistics(https://github.com/apache/incubator-doris/issues/6370), it will not affect any existing code and users will not be able to create statistics job. After receiving the statistics statement and dividing the collection task, here we will start implementing the scheduling statistics task and updating the job information. Mainly include the following: - Create a thread pool to schedule a certain number of tasks, and the number of concurrency is related to the configuration `cbo_concurrency_statistics_task_num`. - After the task is completed, update the information of of the statistics Job. --- .../doris/analysis/AlterColumnStatsStmt.java | 18 ++- .../doris/analysis/AlterTableStatsStmt.java | 18 ++- .../doris/analysis/ShowColumnStatsStmt.java | 12 +- .../doris/analysis/ShowTableStatsStmt.java | 4 +- .../apache/doris/statistics/ColumnStats.java | 53 ++++--- .../apache/doris/statistics/Statistics.java | 8 +- .../doris/statistics/StatisticsManager.java | 54 ++++++- .../doris/statistics/StatisticsTask.java | 101 +++++------- .../statistics/StatisticsTaskResult.java | 18 ++- .../statistics/StatisticsTaskScheduler.java | 147 +++++++++++++++--- .../apache/doris/statistics/StatsType.java | 34 +++- .../apache/doris/statistics/TableStats.java | 20 +-- 12 files changed, 333 insertions(+), 154 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java index 7787b3d876..17d631a1a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java @@ -25,15 +25,17 @@ import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.ColumnStats; +import org.apache.doris.statistics.StatsType; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import java.util.Map; import java.util.Optional; public class AlterColumnStatsStmt extends DdlStmt { - private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() .add(ColumnStats.NDV) .add(ColumnStats.AVG_SIZE) .add(ColumnStats.MAX_SIZE) @@ -45,6 +47,7 @@ public class AlterColumnStatsStmt extends DdlStmt { private TableName tableName; private String columnName; private Map properties; + public final Map statsTypeToValue = Maps.newHashMap(); public AlterColumnStatsStmt(TableName tableName, String columnName, Map properties) { this.tableName = tableName; @@ -58,8 +61,8 @@ public class AlterColumnStatsStmt extends DdlStmt { // check table name tableName.analyze(analyzer); // check properties - Optional optional = properties.keySet().stream().filter( - entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity.toLowerCase())).findFirst(); + Optional optional = properties.keySet().stream().map(StatsType::fromString) + .filter(statsType -> !CONFIGURABLE_PROPERTIES_SET.contains(statsType)).findFirst(); if (optional.isPresent()) { throw new AnalysisException(optional.get() + " is invalid statistic"); } @@ -71,6 +74,11 @@ public class AlterColumnStatsStmt extends DdlStmt { ConnectContext.get().getRemoteIP(), tableName.getDb() + ": " + tableName.getTbl()); } + // get statsTypeToValue + properties.forEach((key, value) -> { + StatsType statsType = StatsType.fromString(key); + statsTypeToValue.put(statsType, value); + }); } public TableName getTableName() { @@ -81,7 +89,7 @@ public class AlterColumnStatsStmt extends DdlStmt { return columnName; } - public Map getProperties() { - return properties; + public Map getStatsTypeToValue() { + return statsTypeToValue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java index 97512a4160..6d8ea6027e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java @@ -24,22 +24,25 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.StatsType; import org.apache.doris.statistics.TableStats; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import java.util.Map; import java.util.Optional; public class AlterTableStatsStmt extends DdlStmt { - private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() .add(TableStats.DATA_SIZE) .add(TableStats.ROW_COUNT) .build(); private TableName tableName; private Map properties; + public final Map statsTypeToValue = Maps.newHashMap(); public AlterTableStatsStmt(TableName tableName, Map properties) { this.tableName = tableName; @@ -52,8 +55,8 @@ public class AlterTableStatsStmt extends DdlStmt { // check table name tableName.analyze(analyzer); // check properties - Optional optional = properties.keySet().stream().filter( - entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity.toLowerCase())).findFirst(); + Optional optional = properties.keySet().stream().map(StatsType::fromString) + .filter(statsType -> !CONFIGURABLE_PROPERTIES_SET.contains(statsType)).findFirst(); if (optional.isPresent()) { throw new AnalysisException(optional.get() + " is invalid statistic"); } @@ -65,13 +68,18 @@ public class AlterTableStatsStmt extends DdlStmt { ConnectContext.get().getRemoteIP(), tableName.getDb() + ": " + tableName.getTbl()); } + // get statsTypeToValue + properties.forEach((key, value) -> { + StatsType statsType = StatsType.fromString(key); + statsTypeToValue.put(statsType, value); + }); } public TableName getTableName() { return tableName; } - public Map getProperties() { - return properties; + public Map getStatsTypeToValue() { + return statsTypeToValue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index af41121301..784e719188 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -31,12 +31,12 @@ public class ShowColumnStatsStmt extends ShowStmt { private static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("column_name") - .add(ColumnStats.NDV) - .add(ColumnStats.AVG_SIZE) - .add(ColumnStats.MAX_SIZE) - .add(ColumnStats.NUM_NULLS) - .add(ColumnStats.MIN_VALUE) - .add(ColumnStats.MAX_VALUE) + .add(ColumnStats.NDV.getValue()) + .add(ColumnStats.AVG_SIZE.getValue()) + .add(ColumnStats.MAX_SIZE.getValue()) + .add(ColumnStats.NUM_NULLS.getValue()) + .add(ColumnStats.MIN_VALUE.getValue()) + .add(ColumnStats.MAX_VALUE.getValue()) .build(); private TableName tableName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java index a35eb1f0c1..90541cb7dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java @@ -35,8 +35,8 @@ public class ShowTableStatsStmt extends ShowStmt { private static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("table_name") - .add(TableStats.ROW_COUNT) - .add(TableStats.DATA_SIZE) + .add(TableStats.ROW_COUNT.getValue()) + .add(TableStats.DATA_SIZE.getValue()) .build(); private TableName tableName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java index 5982982f21..df3b17899b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java @@ -32,12 +32,12 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.Util; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import java.util.function.Predicate; -import com.clearspring.analytics.util.Lists; /** * There are the statistics of column. @@ -57,12 +57,12 @@ import com.clearspring.analytics.util.Lists; */ public class ColumnStats { - public static final String NDV = "ndv"; - public static final String AVG_SIZE = "avg_size"; - public static final String MAX_SIZE = "max_size"; - public static final String NUM_NULLS = "num_nulls"; - public static final String MIN_VALUE = "min_value"; - public static final String MAX_VALUE = "max_value"; + public static final StatsType NDV = StatsType.NDV; + public static final StatsType AVG_SIZE = StatsType.AVG_SIZE; + public static final StatsType MAX_SIZE = StatsType.MAX_SIZE; + public static final StatsType NUM_NULLS = StatsType.NUM_NULLS; + public static final StatsType MIN_VALUE = StatsType.MIN_VALUE; + public static final StatsType MAX_VALUE = StatsType.MAX_VALUE; private static final Predicate DESIRED_NDV_PRED = (v) -> v >= -1L; private static final Predicate DESIRED_AVG_SIZE_PRED = (v) -> (v == -1) || (v >= 0); @@ -76,25 +76,34 @@ public class ColumnStats { private LiteralExpr minValue; private LiteralExpr maxValue; - public void updateStats(Type columnType, Map statsNameToValue) throws AnalysisException { - for (Map.Entry entry : statsNameToValue.entrySet()) { - String statsName = entry.getKey(); - if (statsName.equalsIgnoreCase(NDV)) { - ndv = Util.getLongPropertyOrDefault(entry.getValue(), ndv, + public void updateStats(Type columnType, Map statsNameToValue) throws AnalysisException { + for (Map.Entry entry : statsNameToValue.entrySet()) { + StatsType statsType = entry.getKey(); + switch (statsType) { + case NDV: + ndv = Util.getLongPropertyOrDefault(entry.getValue(), ndv, DESIRED_NDV_PRED, NDV + " should >= -1"); - } else if (statsName.equalsIgnoreCase(AVG_SIZE)) { - avgSize = Util.getFloatPropertyOrDefault(entry.getValue(), avgSize, + break; + case AVG_SIZE: + avgSize = Util.getFloatPropertyOrDefault(entry.getValue(), avgSize, DESIRED_AVG_SIZE_PRED, AVG_SIZE + " should (>=0) or (=-1)"); - } else if (statsName.equalsIgnoreCase(MAX_SIZE)) { - maxSize = Util.getLongPropertyOrDefault(entry.getValue(), maxSize, + break; + case MAX_SIZE: + maxSize = Util.getLongPropertyOrDefault(entry.getValue(), maxSize, DESIRED_MAX_SIZE_PRED, MAX_SIZE + " should >=-1"); - } else if (statsName.equalsIgnoreCase(NUM_NULLS)) { - numNulls = Util.getLongPropertyOrDefault(entry.getValue(), numNulls, + break; + case NUM_NULLS: + numNulls = Util.getLongPropertyOrDefault(entry.getValue(), numNulls, DESIRED_NUM_NULLS_PRED, NUM_NULLS + " should >=-1"); - } else if (statsName.equalsIgnoreCase(MIN_VALUE)) { - minValue = validateColumnValue(columnType, entry.getValue()); - } else if (statsName.equalsIgnoreCase(MAX_VALUE)) { - maxValue = validateColumnValue(columnType, entry.getValue()); + break; + case MIN_VALUE: + minValue = validateColumnValue(columnType, entry.getValue()); + break; + case MAX_VALUE: + maxValue = validateColumnValue(columnType, entry.getValue()); + break; + default: + throw new AnalysisException("Unknown stats type: " + statsType); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java index 58003de90c..77aed0c5f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java @@ -38,25 +38,25 @@ public class Statistics { private Map idToTableStats = Maps.newConcurrentMap(); - public void updateTableStats(long tableId, Map statsNameToValue) + public void updateTableStats(long tableId, Map statsTypeToValue) throws AnalysisException { TableStats tableStats = idToTableStats.get(tableId); if (tableStats == null) { tableStats = new TableStats(); idToTableStats.put(tableId, tableStats); } - tableStats.updateTableStats(statsNameToValue); + tableStats.updateTableStats(statsTypeToValue); } public void updateColumnStats(long tableId, String columnName, Type columnType, - Map statsNameToValue) + Map statsTypeToValue) throws AnalysisException { TableStats tableStats = idToTableStats.get(tableId); if (tableStats == null) { tableStats = new TableStats(); idToTableStats.put(tableId, tableStats); } - tableStats.updateColumnStats(columnName, columnType, statsNameToValue); + tableStats.updateColumnStats(columnName, columnType, statsTypeToValue); } public TableStats getTableStats(long tableId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java index b1c88e36a5..0fea564740 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java @@ -24,18 +24,25 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; -import com.clearspring.analytics.util.Lists; - public class StatisticsManager { + private final static Logger LOG = LogManager.getLogger(StatisticsManager.class); + private Statistics statistics; public StatisticsManager() { @@ -45,7 +52,7 @@ public class StatisticsManager { public void alterTableStatistics(AlterTableStatsStmt stmt) throws AnalysisException { Table table = validateTableName(stmt.getTableName()); - statistics.updateTableStats(table.getId(), stmt.getProperties()); + statistics.updateTableStats(table.getId(), stmt.getStatsTypeToValue()); } public void alterColumnStatistics(AlterColumnStatsStmt stmt) throws AnalysisException { @@ -56,7 +63,7 @@ public class StatisticsManager { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName()); } // match type and column value - statistics.updateColumnStats(table.getId(), columnName, column.getType(), stmt.getProperties()); + statistics.updateColumnStats(table.getId(), columnName, column.getType(), stmt.getStatsTypeToValue()); } public List> showTableStatsList(String dbName, String tableName) @@ -128,13 +135,48 @@ public class StatisticsManager { return row; } + public void alterTableStatistics(StatisticsTaskResult taskResult) throws AnalysisException { + StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc(); + validateTableAndColumn(categoryDesc); + long tblId = categoryDesc.getTableId(); + Map statsTypeToValue = taskResult.getStatsTypeToValue(); + statistics.updateTableStats(tblId, statsTypeToValue); + } + + public void alterColumnStatistics(StatisticsTaskResult taskResult) throws AnalysisException { + StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc(); + validateTableAndColumn(categoryDesc); + long dbId = categoryDesc.getDbId(); + long tblId = categoryDesc.getTableId(); + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId); + Table table = db.getTableOrAnalysisException(tblId); + String columnName = categoryDesc.getColumnName(); + Type columnType = table.getColumn(columnName).getType(); + Map statsTypeToValue = taskResult.getStatsTypeToValue(); + statistics.updateColumnStats(tblId, columnName, columnType, statsTypeToValue); + } + private Table validateTableName(TableName dbTableName) throws AnalysisException { String dbName = dbTableName.getDb(); String tableName = dbTableName.getTbl(); Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbName); - Table table = db.getTableOrAnalysisException(tableName); - return table; + return db.getTableOrAnalysisException(tableName); + } + + private void validateTableAndColumn(StatsCategoryDesc categoryDesc) throws AnalysisException { + long dbId = categoryDesc.getDbId(); + long tblId = categoryDesc.getTableId(); + String columnName = categoryDesc.getColumnName(); + + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId); + Table table = db.getTableOrAnalysisException(tblId); + if (!Strings.isNullOrEmpty(columnName)) { + Column column = table.getColumn(columnName); + if (column == null) { + throw new AnalysisException("Column " + columnName + " does not exist in table " + table.getName()); + } + } } public Statistics getStatistics() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java index fa236584c3..76af28c48f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java @@ -25,7 +25,6 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * The StatisticsTask belongs to one StatisticsJob. @@ -48,8 +47,6 @@ public abstract class StatisticsTask implements Callable { FAILED } - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - protected long id = Catalog.getCurrentCatalog().getNextId(); protected long jobId; protected StatsGranularityDesc granularityDesc; @@ -71,24 +68,8 @@ public abstract class StatisticsTask implements Callable { this.statsTypeList = statsTypeList; } - public void readLock() { - lock.readLock().lock(); - } - - public void readUnlock() { - lock.readLock().unlock(); - } - - protected void writeLock() { - lock.writeLock().lock(); - } - - protected void writeUnlock() { - lock.writeLock().unlock(); - } - public long getId() { - return this.id; + return id; } public void setId(long id) { @@ -96,35 +77,35 @@ public abstract class StatisticsTask implements Callable { } public long getJobId() { - return this.jobId; + return jobId; } public StatsGranularityDesc getGranularityDesc() { - return this.granularityDesc; + return granularityDesc; } public StatsCategoryDesc getCategoryDesc() { - return this.categoryDesc; + return categoryDesc; } public List getStatsTypeList() { - return this.statsTypeList; + return statsTypeList; } public TaskState getTaskState() { - return this.taskState; + return taskState; } public long getCreateTime() { - return this.createTime; + return createTime; } public long getStartTime() { - return this.startTime; + return startTime; } public long getFinishTime() { - return this.finishTime; + return finishTime; } /** @@ -139,44 +120,38 @@ public abstract class StatisticsTask implements Callable { // please retain job lock firstly public void updateTaskState(TaskState newState) throws DdlException { LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState); - try { - // PENDING -> RUNNING/FAILED - if (taskState == TaskState.PENDING) { - if (newState == TaskState.RUNNING) { - taskState = newState; - // task start running, set start time + String errorMsg = "Invalid statistics task state transition from "; + + // PENDING -> RUNNING/FAILED + if (taskState == TaskState.PENDING) { + switch (newState) { + case RUNNING: startTime = System.currentTimeMillis(); - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else if (newState == TaskState.FAILED) { - taskState = newState; - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else { - LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new DdlException("Invalid task state transition from PENDING to " + newState); - } - return; - } - - // RUNNING -> FINISHED/FAILED - if (taskState == TaskState.RUNNING) { - if (newState == TaskState.FINISHED) { - // set finish time + break; + case FAILED: finishTime = System.currentTimeMillis(); - taskState = newState; - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else if (newState == TaskState.FAILED) { - taskState = newState; - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else { - LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new DdlException("Invalid task state transition from RUNNING to " + newState); - } + break; + default: + throw new DdlException(errorMsg + taskState + " to " + newState); } - - LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new DdlException("Invalid task state transition from " + taskState + " to " + newState); - } finally { - LOG.info("Statistics task(id={}) current state is {}", id, taskState); } + // RUNNING -> FINISHED/FAILED + else if (taskState == TaskState.RUNNING) { + switch (newState) { + case FINISHED: + case FAILED: + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException(errorMsg + taskState + " to " + newState); + } + } + // unsupported state transition + else { + throw new DdlException(errorMsg + taskState + " to " + newState); + } + + LOG.info("Statistics job(id={}) state changed from {} to {}", id, taskState, newState); + taskState = newState; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java index 700e2fa93a..94f4c50934 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java @@ -20,9 +20,9 @@ package org.apache.doris.statistics; import java.util.Map; public class StatisticsTaskResult { - private StatsGranularityDesc granularityDesc; - private StatsCategoryDesc categoryDesc; - private Map statsTypeToValue; + private final StatsGranularityDesc granularityDesc; + private final StatsCategoryDesc categoryDesc; + private final Map statsTypeToValue; public StatisticsTaskResult(StatsGranularityDesc granularityDesc, StatsCategoryDesc categoryDesc, Map statsTypeToValue) { @@ -30,4 +30,16 @@ public class StatisticsTaskResult { this.categoryDesc = categoryDesc; this.statsTypeToValue = statsTypeToValue; } + + public StatsGranularityDesc getGranularityDesc() { + return granularityDesc; + } + + public StatsCategoryDesc getCategoryDesc() { + return categoryDesc; + } + + public Map getStatsTypeToValue() { + return statsTypeToValue; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java index 46450417b6..0edfa82047 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java @@ -17,20 +17,33 @@ package org.apache.doris.statistics; +import org.apache.doris.analysis.AnalyzeStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.statistics.StatisticsJob.JobState; +import org.apache.doris.statistics.StatisticsTask.TaskState; +import org.apache.doris.statistics.StatsCategoryDesc.StatsCategory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /* Schedule statistics task @@ -46,25 +59,41 @@ public class StatisticsTaskScheduler extends MasterDaemon { @Override protected void runAfterCatalogReady() { - // TODO // step1: task n concurrent tasks from the queue List tasks = peek(); - // step2: execute tasks - ExecutorService executor = Executors.newFixedThreadPool(tasks.size()); - List> taskResultList = null; - try { - taskResultList = executor.invokeAll(tasks); - } catch (InterruptedException e) { - LOG.warn("Failed to execute this turn of statistics tasks", e); - } - // step3: update job and statistics - handleTaskResult(taskResultList); - // step4: remove task from queue - remove(tasks.size()); + if (!tasks.isEmpty()) { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(tasks.size(), + "statistic-pool", false); + StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager(); + Map statisticsJobs = jobManager.getIdToStatisticsJob(); + Map>>> resultMap = Maps.newLinkedHashMap(); + + for (StatisticsTask task : tasks) { + queue.remove(); + long jobId = task.getJobId(); + StatisticsJob statisticsJob = statisticsJobs.get(jobId); + + if (checkJobIsValid(jobId)) { + // step2: execute task and save task result + Future future = executor.submit(task); + if (updateTaskAndJobState(task, statisticsJob)) { + Map> taskInfo = Maps.newHashMap(); + taskInfo.put(task.getId(), future); + List>> jobInfo = resultMap + .getOrDefault(jobId, Lists.newArrayList()); + jobInfo.add(taskInfo); + resultMap.put(jobId, jobInfo); + } + } + } + + // step3: handle task results + handleTaskResult(resultMap); + } } - public void addTasks(List statisticsTaskList) { + public void addTasks(List statisticsTaskList) throws IllegalStateException { queue.addAll(statisticsTaskList); } @@ -82,11 +111,89 @@ public class StatisticsTaskScheduler extends MasterDaemon { return tasks; } - private void remove(int size) { - // TODO + /** + * Update task and job state + * + * @param task statistics task + * @param job statistics job + * @return true if update task and job state successfully. + */ + private boolean updateTaskAndJobState(StatisticsTask task, StatisticsJob job) { + try { + // update task state + task.updateTaskState(TaskState.RUNNING); + } catch (DdlException e) { + LOG.info("Update statistics task state failed, taskId: " + task.getId(), e); + } + + try { + // update job state + if (task.getTaskState() != TaskState.RUNNING) { + job.updateJobState(JobState.FAILED); + } else { + if (job.getJobState() == JobState.SCHEDULING) { + job.updateJobState(JobState.RUNNING); + } + } + } catch (DdlException e) { + LOG.info("Update statistics job state failed, jobId: " + job.getId(), e); + return false; + } + return true; } - private void handleTaskResult(List> taskResultLists) { - // TODO + private void handleTaskResult(Map>>> resultMap) { + StatisticsManager statsManager = Catalog.getCurrentCatalog().getStatisticsManager(); + StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager(); + + resultMap.forEach((jobId, taskMapList) -> { + if (checkJobIsValid(jobId)) { + String errorMsg = ""; + StatisticsJob statisticsJob = jobManager.getIdToStatisticsJob().get(jobId); + Map properties = statisticsJob.getProperties(); + long timeout = Long.parseLong(properties.get(AnalyzeStmt.CBO_STATISTICS_TASK_TIMEOUT_SEC)); + + for (Map> taskInfos : taskMapList) { + for (Map.Entry> taskInfo : taskInfos.entrySet()) { + Long taskId = taskInfo.getKey(); + Future future = taskInfo.getValue(); + + try { + StatisticsTaskResult taskResult = future.get(timeout, TimeUnit.SECONDS); + StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc(); + StatsCategory category = categoryDesc.getCategory(); + if (category == StatsCategory.TABLE) { + // update table statistics + statsManager.alterTableStatistics(taskResult); + } else if (category == StatsCategory.COLUMN) { + // update column statistics + statsManager.alterColumnStatistics(taskResult); + } + } catch (AnalysisException | TimeoutException | ExecutionException + | InterruptedException | CancellationException e) { + errorMsg = e.getMessage(); + LOG.info("Failed to update statistics. jobId: {}, taskId: {}, e: {}", jobId, taskId, e); + } + + try { + // update the task and job info + statisticsJob.updateJobInfoByTaskId(taskId, errorMsg); + } catch (DdlException e) { + LOG.info("Failed to update statistics job info. jobId: {}, taskId: {}, e: {}", jobId, taskId, e); + } + } + } + } + }); + } + + public boolean checkJobIsValid(Long jobId) { + StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager(); + StatisticsJob statisticsJob = jobManager.getIdToStatisticsJob().get(jobId); + if (statisticsJob == null) { + return false; + } + JobState jobState = statisticsJob.getJobState(); + return jobState != JobState.CANCELLED && jobState != JobState.FAILED; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java index 50f916dadf..e805bff1a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java @@ -18,12 +18,30 @@ package org.apache.doris.statistics; public enum StatsType { - ROW_COUNT, - DATA_SIZE, - NDV, - AVG_SIZE, - MAX_SIZE, - NUM_NULLS, - MIN_VALUE, - MAX_VALUE + ROW_COUNT("row_count"), + DATA_SIZE("data_size"), + NDV("ndv"), + AVG_SIZE("avg_size"), + MAX_SIZE("max_size"), + NUM_NULLS("num_nulls"), + MIN_VALUE("min_value"), + MAX_VALUE("max_value"), + MAX_COL_LENS("max_col_lens"), + AVG_COL_LENS("avg_col_lens"); + private final String value; + StatsType(String value) { + this.value = value; + } + public String getValue() { + return value; + } + + public static StatsType fromString(String value) { + for (StatsType type : StatsType.values()) { + if (type.value.equalsIgnoreCase(value)) { + return type; + } + } + throw new IllegalArgumentException("Invalid StatsType: " + value); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java index ef494bd9f6..08ef2cc6be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java @@ -21,13 +21,13 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.Util; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; import java.util.Map; import java.util.function.Predicate; -import com.clearspring.analytics.util.Lists; /** * There are the statistics of table. @@ -51,8 +51,8 @@ import com.clearspring.analytics.util.Lists; */ public class TableStats { - public static final String ROW_COUNT = "row_count"; - public static final String DATA_SIZE = "data_size"; + public static final StatsType DATA_SIZE = StatsType.DATA_SIZE; + public static final StatsType ROW_COUNT = StatsType.ROW_COUNT; private static final Predicate DESIRED_ROW_COUNT_PRED = (v) -> v >= -1L; private static final Predicate DESIRED_DATA_SIZE_PRED = (v) -> v >= -1L; @@ -61,27 +61,27 @@ public class TableStats { private long dataSize = -1; private Map nameToColumnStats = Maps.newConcurrentMap(); - public void updateTableStats(Map statsNameToValue) throws AnalysisException { - for (Map.Entry entry : statsNameToValue.entrySet()) { - String statsName = entry.getKey(); - if (statsName.equalsIgnoreCase(ROW_COUNT)) { + public void updateTableStats(Map statsTypeToValue) throws AnalysisException { + for (Map.Entry entry : statsTypeToValue.entrySet()) { + StatsType statsType = entry.getKey(); + if (statsType == ROW_COUNT) { rowCount = Util.getLongPropertyOrDefault(entry.getValue(), rowCount, DESIRED_ROW_COUNT_PRED, ROW_COUNT + " should >= -1"); - } else if (statsName.equalsIgnoreCase(DATA_SIZE)) { + } else if (statsType == DATA_SIZE) { dataSize = Util.getLongPropertyOrDefault(entry.getValue(), dataSize, DESIRED_DATA_SIZE_PRED, DATA_SIZE + " should >= -1"); } } } - public void updateColumnStats(String columnName, Type columnType, Map statsNameToValue) + public void updateColumnStats(String columnName, Type columnType, Map statsTypeToValue) throws AnalysisException { ColumnStats columnStats = nameToColumnStats.get(columnName); if (columnStats == null) { columnStats = new ColumnStats(); nameToColumnStats.put(columnName, columnStats); } - columnStats.updateStats(columnType, statsNameToValue); + columnStats.updateStats(columnType, statsTypeToValue); } public List getShowInfo() {