diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java index 7787b3d876..17d631a1a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterColumnStatsStmt.java @@ -25,15 +25,17 @@ import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.ColumnStats; +import org.apache.doris.statistics.StatsType; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import java.util.Map; import java.util.Optional; public class AlterColumnStatsStmt extends DdlStmt { - private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() .add(ColumnStats.NDV) .add(ColumnStats.AVG_SIZE) .add(ColumnStats.MAX_SIZE) @@ -45,6 +47,7 @@ public class AlterColumnStatsStmt extends DdlStmt { private TableName tableName; private String columnName; private Map properties; + public final Map statsTypeToValue = Maps.newHashMap(); public AlterColumnStatsStmt(TableName tableName, String columnName, Map properties) { this.tableName = tableName; @@ -58,8 +61,8 @@ public class AlterColumnStatsStmt extends DdlStmt { // check table name tableName.analyze(analyzer); // check properties - Optional optional = properties.keySet().stream().filter( - entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity.toLowerCase())).findFirst(); + Optional optional = properties.keySet().stream().map(StatsType::fromString) + .filter(statsType -> !CONFIGURABLE_PROPERTIES_SET.contains(statsType)).findFirst(); if (optional.isPresent()) { throw new AnalysisException(optional.get() + " is invalid statistic"); } @@ -71,6 +74,11 @@ public class AlterColumnStatsStmt extends DdlStmt { ConnectContext.get().getRemoteIP(), tableName.getDb() + ": " + tableName.getTbl()); } + // get statsTypeToValue + properties.forEach((key, value) -> { + StatsType statsType = StatsType.fromString(key); + statsTypeToValue.put(statsType, value); + }); } public TableName getTableName() { @@ -81,7 +89,7 @@ public class AlterColumnStatsStmt extends DdlStmt { return columnName; } - public Map getProperties() { - return properties; + public Map getStatsTypeToValue() { + return statsTypeToValue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java index 97512a4160..6d8ea6027e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableStatsStmt.java @@ -24,22 +24,25 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.StatsType; import org.apache.doris.statistics.TableStats; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import java.util.Map; import java.util.Optional; public class AlterTableStatsStmt extends DdlStmt { - private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() + private static final ImmutableSet CONFIGURABLE_PROPERTIES_SET = new ImmutableSet.Builder() .add(TableStats.DATA_SIZE) .add(TableStats.ROW_COUNT) .build(); private TableName tableName; private Map properties; + public final Map statsTypeToValue = Maps.newHashMap(); public AlterTableStatsStmt(TableName tableName, Map properties) { this.tableName = tableName; @@ -52,8 +55,8 @@ public class AlterTableStatsStmt extends DdlStmt { // check table name tableName.analyze(analyzer); // check properties - Optional optional = properties.keySet().stream().filter( - entity -> !CONFIGURABLE_PROPERTIES_SET.contains(entity.toLowerCase())).findFirst(); + Optional optional = properties.keySet().stream().map(StatsType::fromString) + .filter(statsType -> !CONFIGURABLE_PROPERTIES_SET.contains(statsType)).findFirst(); if (optional.isPresent()) { throw new AnalysisException(optional.get() + " is invalid statistic"); } @@ -65,13 +68,18 @@ public class AlterTableStatsStmt extends DdlStmt { ConnectContext.get().getRemoteIP(), tableName.getDb() + ": " + tableName.getTbl()); } + // get statsTypeToValue + properties.forEach((key, value) -> { + StatsType statsType = StatsType.fromString(key); + statsTypeToValue.put(statsType, value); + }); } public TableName getTableName() { return tableName; } - public Map getProperties() { - return properties; + public Map getStatsTypeToValue() { + return statsTypeToValue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index af41121301..784e719188 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -31,12 +31,12 @@ public class ShowColumnStatsStmt extends ShowStmt { private static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("column_name") - .add(ColumnStats.NDV) - .add(ColumnStats.AVG_SIZE) - .add(ColumnStats.MAX_SIZE) - .add(ColumnStats.NUM_NULLS) - .add(ColumnStats.MIN_VALUE) - .add(ColumnStats.MAX_VALUE) + .add(ColumnStats.NDV.getValue()) + .add(ColumnStats.AVG_SIZE.getValue()) + .add(ColumnStats.MAX_SIZE.getValue()) + .add(ColumnStats.NUM_NULLS.getValue()) + .add(ColumnStats.MIN_VALUE.getValue()) + .add(ColumnStats.MAX_VALUE.getValue()) .build(); private TableName tableName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java index a35eb1f0c1..90541cb7dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowTableStatsStmt.java @@ -35,8 +35,8 @@ public class ShowTableStatsStmt extends ShowStmt { private static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("table_name") - .add(TableStats.ROW_COUNT) - .add(TableStats.DATA_SIZE) + .add(TableStats.ROW_COUNT.getValue()) + .add(TableStats.DATA_SIZE.getValue()) .build(); private TableName tableName; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java index 5982982f21..df3b17899b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStats.java @@ -32,12 +32,12 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.Util; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import java.util.function.Predicate; -import com.clearspring.analytics.util.Lists; /** * There are the statistics of column. @@ -57,12 +57,12 @@ import com.clearspring.analytics.util.Lists; */ public class ColumnStats { - public static final String NDV = "ndv"; - public static final String AVG_SIZE = "avg_size"; - public static final String MAX_SIZE = "max_size"; - public static final String NUM_NULLS = "num_nulls"; - public static final String MIN_VALUE = "min_value"; - public static final String MAX_VALUE = "max_value"; + public static final StatsType NDV = StatsType.NDV; + public static final StatsType AVG_SIZE = StatsType.AVG_SIZE; + public static final StatsType MAX_SIZE = StatsType.MAX_SIZE; + public static final StatsType NUM_NULLS = StatsType.NUM_NULLS; + public static final StatsType MIN_VALUE = StatsType.MIN_VALUE; + public static final StatsType MAX_VALUE = StatsType.MAX_VALUE; private static final Predicate DESIRED_NDV_PRED = (v) -> v >= -1L; private static final Predicate DESIRED_AVG_SIZE_PRED = (v) -> (v == -1) || (v >= 0); @@ -76,25 +76,34 @@ public class ColumnStats { private LiteralExpr minValue; private LiteralExpr maxValue; - public void updateStats(Type columnType, Map statsNameToValue) throws AnalysisException { - for (Map.Entry entry : statsNameToValue.entrySet()) { - String statsName = entry.getKey(); - if (statsName.equalsIgnoreCase(NDV)) { - ndv = Util.getLongPropertyOrDefault(entry.getValue(), ndv, + public void updateStats(Type columnType, Map statsNameToValue) throws AnalysisException { + for (Map.Entry entry : statsNameToValue.entrySet()) { + StatsType statsType = entry.getKey(); + switch (statsType) { + case NDV: + ndv = Util.getLongPropertyOrDefault(entry.getValue(), ndv, DESIRED_NDV_PRED, NDV + " should >= -1"); - } else if (statsName.equalsIgnoreCase(AVG_SIZE)) { - avgSize = Util.getFloatPropertyOrDefault(entry.getValue(), avgSize, + break; + case AVG_SIZE: + avgSize = Util.getFloatPropertyOrDefault(entry.getValue(), avgSize, DESIRED_AVG_SIZE_PRED, AVG_SIZE + " should (>=0) or (=-1)"); - } else if (statsName.equalsIgnoreCase(MAX_SIZE)) { - maxSize = Util.getLongPropertyOrDefault(entry.getValue(), maxSize, + break; + case MAX_SIZE: + maxSize = Util.getLongPropertyOrDefault(entry.getValue(), maxSize, DESIRED_MAX_SIZE_PRED, MAX_SIZE + " should >=-1"); - } else if (statsName.equalsIgnoreCase(NUM_NULLS)) { - numNulls = Util.getLongPropertyOrDefault(entry.getValue(), numNulls, + break; + case NUM_NULLS: + numNulls = Util.getLongPropertyOrDefault(entry.getValue(), numNulls, DESIRED_NUM_NULLS_PRED, NUM_NULLS + " should >=-1"); - } else if (statsName.equalsIgnoreCase(MIN_VALUE)) { - minValue = validateColumnValue(columnType, entry.getValue()); - } else if (statsName.equalsIgnoreCase(MAX_VALUE)) { - maxValue = validateColumnValue(columnType, entry.getValue()); + break; + case MIN_VALUE: + minValue = validateColumnValue(columnType, entry.getValue()); + break; + case MAX_VALUE: + maxValue = validateColumnValue(columnType, entry.getValue()); + break; + default: + throw new AnalysisException("Unknown stats type: " + statsType); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java index 58003de90c..77aed0c5f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java @@ -38,25 +38,25 @@ public class Statistics { private Map idToTableStats = Maps.newConcurrentMap(); - public void updateTableStats(long tableId, Map statsNameToValue) + public void updateTableStats(long tableId, Map statsTypeToValue) throws AnalysisException { TableStats tableStats = idToTableStats.get(tableId); if (tableStats == null) { tableStats = new TableStats(); idToTableStats.put(tableId, tableStats); } - tableStats.updateTableStats(statsNameToValue); + tableStats.updateTableStats(statsTypeToValue); } public void updateColumnStats(long tableId, String columnName, Type columnType, - Map statsNameToValue) + Map statsTypeToValue) throws AnalysisException { TableStats tableStats = idToTableStats.get(tableId); if (tableStats == null) { tableStats = new TableStats(); idToTableStats.put(tableId, tableStats); } - tableStats.updateColumnStats(columnName, columnType, statsNameToValue); + tableStats.updateColumnStats(columnName, columnType, statsTypeToValue); } public TableStats getTableStats(long tableId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java index b1c88e36a5..0fea564740 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java @@ -24,18 +24,25 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; -import com.clearspring.analytics.util.Lists; - public class StatisticsManager { + private final static Logger LOG = LogManager.getLogger(StatisticsManager.class); + private Statistics statistics; public StatisticsManager() { @@ -45,7 +52,7 @@ public class StatisticsManager { public void alterTableStatistics(AlterTableStatsStmt stmt) throws AnalysisException { Table table = validateTableName(stmt.getTableName()); - statistics.updateTableStats(table.getId(), stmt.getProperties()); + statistics.updateTableStats(table.getId(), stmt.getStatsTypeToValue()); } public void alterColumnStatistics(AlterColumnStatsStmt stmt) throws AnalysisException { @@ -56,7 +63,7 @@ public class StatisticsManager { ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_FIELD_ERROR, columnName, table.getName()); } // match type and column value - statistics.updateColumnStats(table.getId(), columnName, column.getType(), stmt.getProperties()); + statistics.updateColumnStats(table.getId(), columnName, column.getType(), stmt.getStatsTypeToValue()); } public List> showTableStatsList(String dbName, String tableName) @@ -128,13 +135,48 @@ public class StatisticsManager { return row; } + public void alterTableStatistics(StatisticsTaskResult taskResult) throws AnalysisException { + StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc(); + validateTableAndColumn(categoryDesc); + long tblId = categoryDesc.getTableId(); + Map statsTypeToValue = taskResult.getStatsTypeToValue(); + statistics.updateTableStats(tblId, statsTypeToValue); + } + + public void alterColumnStatistics(StatisticsTaskResult taskResult) throws AnalysisException { + StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc(); + validateTableAndColumn(categoryDesc); + long dbId = categoryDesc.getDbId(); + long tblId = categoryDesc.getTableId(); + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId); + Table table = db.getTableOrAnalysisException(tblId); + String columnName = categoryDesc.getColumnName(); + Type columnType = table.getColumn(columnName).getType(); + Map statsTypeToValue = taskResult.getStatsTypeToValue(); + statistics.updateColumnStats(tblId, columnName, columnType, statsTypeToValue); + } + private Table validateTableName(TableName dbTableName) throws AnalysisException { String dbName = dbTableName.getDb(); String tableName = dbTableName.getTbl(); Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbName); - Table table = db.getTableOrAnalysisException(tableName); - return table; + return db.getTableOrAnalysisException(tableName); + } + + private void validateTableAndColumn(StatsCategoryDesc categoryDesc) throws AnalysisException { + long dbId = categoryDesc.getDbId(); + long tblId = categoryDesc.getTableId(); + String columnName = categoryDesc.getColumnName(); + + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId); + Table table = db.getTableOrAnalysisException(tblId); + if (!Strings.isNullOrEmpty(columnName)) { + Column column = table.getColumn(columnName); + if (column == null) { + throw new AnalysisException("Column " + columnName + " does not exist in table " + table.getName()); + } + } } public Statistics getStatistics() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java index fa236584c3..76af28c48f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java @@ -25,7 +25,6 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * The StatisticsTask belongs to one StatisticsJob. @@ -48,8 +47,6 @@ public abstract class StatisticsTask implements Callable { FAILED } - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); - protected long id = Catalog.getCurrentCatalog().getNextId(); protected long jobId; protected StatsGranularityDesc granularityDesc; @@ -71,24 +68,8 @@ public abstract class StatisticsTask implements Callable { this.statsTypeList = statsTypeList; } - public void readLock() { - lock.readLock().lock(); - } - - public void readUnlock() { - lock.readLock().unlock(); - } - - protected void writeLock() { - lock.writeLock().lock(); - } - - protected void writeUnlock() { - lock.writeLock().unlock(); - } - public long getId() { - return this.id; + return id; } public void setId(long id) { @@ -96,35 +77,35 @@ public abstract class StatisticsTask implements Callable { } public long getJobId() { - return this.jobId; + return jobId; } public StatsGranularityDesc getGranularityDesc() { - return this.granularityDesc; + return granularityDesc; } public StatsCategoryDesc getCategoryDesc() { - return this.categoryDesc; + return categoryDesc; } public List getStatsTypeList() { - return this.statsTypeList; + return statsTypeList; } public TaskState getTaskState() { - return this.taskState; + return taskState; } public long getCreateTime() { - return this.createTime; + return createTime; } public long getStartTime() { - return this.startTime; + return startTime; } public long getFinishTime() { - return this.finishTime; + return finishTime; } /** @@ -139,44 +120,38 @@ public abstract class StatisticsTask implements Callable { // please retain job lock firstly public void updateTaskState(TaskState newState) throws DdlException { LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState); - try { - // PENDING -> RUNNING/FAILED - if (taskState == TaskState.PENDING) { - if (newState == TaskState.RUNNING) { - taskState = newState; - // task start running, set start time + String errorMsg = "Invalid statistics task state transition from "; + + // PENDING -> RUNNING/FAILED + if (taskState == TaskState.PENDING) { + switch (newState) { + case RUNNING: startTime = System.currentTimeMillis(); - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else if (newState == TaskState.FAILED) { - taskState = newState; - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else { - LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new DdlException("Invalid task state transition from PENDING to " + newState); - } - return; - } - - // RUNNING -> FINISHED/FAILED - if (taskState == TaskState.RUNNING) { - if (newState == TaskState.FINISHED) { - // set finish time + break; + case FAILED: finishTime = System.currentTimeMillis(); - taskState = newState; - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else if (newState == TaskState.FAILED) { - taskState = newState; - LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState); - } else { - LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new DdlException("Invalid task state transition from RUNNING to " + newState); - } + break; + default: + throw new DdlException(errorMsg + taskState + " to " + newState); } - - LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState); - throw new DdlException("Invalid task state transition from " + taskState + " to " + newState); - } finally { - LOG.info("Statistics task(id={}) current state is {}", id, taskState); } + // RUNNING -> FINISHED/FAILED + else if (taskState == TaskState.RUNNING) { + switch (newState) { + case FINISHED: + case FAILED: + finishTime = System.currentTimeMillis(); + break; + default: + throw new DdlException(errorMsg + taskState + " to " + newState); + } + } + // unsupported state transition + else { + throw new DdlException(errorMsg + taskState + " to " + newState); + } + + LOG.info("Statistics job(id={}) state changed from {} to {}", id, taskState, newState); + taskState = newState; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java index 700e2fa93a..94f4c50934 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java @@ -20,9 +20,9 @@ package org.apache.doris.statistics; import java.util.Map; public class StatisticsTaskResult { - private StatsGranularityDesc granularityDesc; - private StatsCategoryDesc categoryDesc; - private Map statsTypeToValue; + private final StatsGranularityDesc granularityDesc; + private final StatsCategoryDesc categoryDesc; + private final Map statsTypeToValue; public StatisticsTaskResult(StatsGranularityDesc granularityDesc, StatsCategoryDesc categoryDesc, Map statsTypeToValue) { @@ -30,4 +30,16 @@ public class StatisticsTaskResult { this.categoryDesc = categoryDesc; this.statsTypeToValue = statsTypeToValue; } + + public StatsGranularityDesc getGranularityDesc() { + return granularityDesc; + } + + public StatsCategoryDesc getCategoryDesc() { + return categoryDesc; + } + + public Map getStatsTypeToValue() { + return statsTypeToValue; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java index 46450417b6..0edfa82047 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java @@ -17,20 +17,33 @@ package org.apache.doris.statistics; +import org.apache.doris.analysis.AnalyzeStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.statistics.StatisticsJob.JobState; +import org.apache.doris.statistics.StatisticsTask.TaskState; +import org.apache.doris.statistics.StatsCategoryDesc.StatsCategory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Map; import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /* Schedule statistics task @@ -46,25 +59,41 @@ public class StatisticsTaskScheduler extends MasterDaemon { @Override protected void runAfterCatalogReady() { - // TODO // step1: task n concurrent tasks from the queue List tasks = peek(); - // step2: execute tasks - ExecutorService executor = Executors.newFixedThreadPool(tasks.size()); - List> taskResultList = null; - try { - taskResultList = executor.invokeAll(tasks); - } catch (InterruptedException e) { - LOG.warn("Failed to execute this turn of statistics tasks", e); - } - // step3: update job and statistics - handleTaskResult(taskResultList); - // step4: remove task from queue - remove(tasks.size()); + if (!tasks.isEmpty()) { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonCacheThreadPool(tasks.size(), + "statistic-pool", false); + StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager(); + Map statisticsJobs = jobManager.getIdToStatisticsJob(); + Map>>> resultMap = Maps.newLinkedHashMap(); + + for (StatisticsTask task : tasks) { + queue.remove(); + long jobId = task.getJobId(); + StatisticsJob statisticsJob = statisticsJobs.get(jobId); + + if (checkJobIsValid(jobId)) { + // step2: execute task and save task result + Future future = executor.submit(task); + if (updateTaskAndJobState(task, statisticsJob)) { + Map> taskInfo = Maps.newHashMap(); + taskInfo.put(task.getId(), future); + List>> jobInfo = resultMap + .getOrDefault(jobId, Lists.newArrayList()); + jobInfo.add(taskInfo); + resultMap.put(jobId, jobInfo); + } + } + } + + // step3: handle task results + handleTaskResult(resultMap); + } } - public void addTasks(List statisticsTaskList) { + public void addTasks(List statisticsTaskList) throws IllegalStateException { queue.addAll(statisticsTaskList); } @@ -82,11 +111,89 @@ public class StatisticsTaskScheduler extends MasterDaemon { return tasks; } - private void remove(int size) { - // TODO + /** + * Update task and job state + * + * @param task statistics task + * @param job statistics job + * @return true if update task and job state successfully. + */ + private boolean updateTaskAndJobState(StatisticsTask task, StatisticsJob job) { + try { + // update task state + task.updateTaskState(TaskState.RUNNING); + } catch (DdlException e) { + LOG.info("Update statistics task state failed, taskId: " + task.getId(), e); + } + + try { + // update job state + if (task.getTaskState() != TaskState.RUNNING) { + job.updateJobState(JobState.FAILED); + } else { + if (job.getJobState() == JobState.SCHEDULING) { + job.updateJobState(JobState.RUNNING); + } + } + } catch (DdlException e) { + LOG.info("Update statistics job state failed, jobId: " + job.getId(), e); + return false; + } + return true; } - private void handleTaskResult(List> taskResultLists) { - // TODO + private void handleTaskResult(Map>>> resultMap) { + StatisticsManager statsManager = Catalog.getCurrentCatalog().getStatisticsManager(); + StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager(); + + resultMap.forEach((jobId, taskMapList) -> { + if (checkJobIsValid(jobId)) { + String errorMsg = ""; + StatisticsJob statisticsJob = jobManager.getIdToStatisticsJob().get(jobId); + Map properties = statisticsJob.getProperties(); + long timeout = Long.parseLong(properties.get(AnalyzeStmt.CBO_STATISTICS_TASK_TIMEOUT_SEC)); + + for (Map> taskInfos : taskMapList) { + for (Map.Entry> taskInfo : taskInfos.entrySet()) { + Long taskId = taskInfo.getKey(); + Future future = taskInfo.getValue(); + + try { + StatisticsTaskResult taskResult = future.get(timeout, TimeUnit.SECONDS); + StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc(); + StatsCategory category = categoryDesc.getCategory(); + if (category == StatsCategory.TABLE) { + // update table statistics + statsManager.alterTableStatistics(taskResult); + } else if (category == StatsCategory.COLUMN) { + // update column statistics + statsManager.alterColumnStatistics(taskResult); + } + } catch (AnalysisException | TimeoutException | ExecutionException + | InterruptedException | CancellationException e) { + errorMsg = e.getMessage(); + LOG.info("Failed to update statistics. jobId: {}, taskId: {}, e: {}", jobId, taskId, e); + } + + try { + // update the task and job info + statisticsJob.updateJobInfoByTaskId(taskId, errorMsg); + } catch (DdlException e) { + LOG.info("Failed to update statistics job info. jobId: {}, taskId: {}, e: {}", jobId, taskId, e); + } + } + } + } + }); + } + + public boolean checkJobIsValid(Long jobId) { + StatisticsJobManager jobManager = Catalog.getCurrentCatalog().getStatisticsJobManager(); + StatisticsJob statisticsJob = jobManager.getIdToStatisticsJob().get(jobId); + if (statisticsJob == null) { + return false; + } + JobState jobState = statisticsJob.getJobState(); + return jobState != JobState.CANCELLED && jobState != JobState.FAILED; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java index 50f916dadf..e805bff1a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java @@ -18,12 +18,30 @@ package org.apache.doris.statistics; public enum StatsType { - ROW_COUNT, - DATA_SIZE, - NDV, - AVG_SIZE, - MAX_SIZE, - NUM_NULLS, - MIN_VALUE, - MAX_VALUE + ROW_COUNT("row_count"), + DATA_SIZE("data_size"), + NDV("ndv"), + AVG_SIZE("avg_size"), + MAX_SIZE("max_size"), + NUM_NULLS("num_nulls"), + MIN_VALUE("min_value"), + MAX_VALUE("max_value"), + MAX_COL_LENS("max_col_lens"), + AVG_COL_LENS("avg_col_lens"); + private final String value; + StatsType(String value) { + this.value = value; + } + public String getValue() { + return value; + } + + public static StatsType fromString(String value) { + for (StatsType type : StatsType.values()) { + if (type.value.equalsIgnoreCase(value)) { + return type; + } + } + throw new IllegalArgumentException("Invalid StatsType: " + value); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java index ef494bd9f6..08ef2cc6be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStats.java @@ -21,13 +21,13 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.Util; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.List; import java.util.Map; import java.util.function.Predicate; -import com.clearspring.analytics.util.Lists; /** * There are the statistics of table. @@ -51,8 +51,8 @@ import com.clearspring.analytics.util.Lists; */ public class TableStats { - public static final String ROW_COUNT = "row_count"; - public static final String DATA_SIZE = "data_size"; + public static final StatsType DATA_SIZE = StatsType.DATA_SIZE; + public static final StatsType ROW_COUNT = StatsType.ROW_COUNT; private static final Predicate DESIRED_ROW_COUNT_PRED = (v) -> v >= -1L; private static final Predicate DESIRED_DATA_SIZE_PRED = (v) -> v >= -1L; @@ -61,27 +61,27 @@ public class TableStats { private long dataSize = -1; private Map nameToColumnStats = Maps.newConcurrentMap(); - public void updateTableStats(Map statsNameToValue) throws AnalysisException { - for (Map.Entry entry : statsNameToValue.entrySet()) { - String statsName = entry.getKey(); - if (statsName.equalsIgnoreCase(ROW_COUNT)) { + public void updateTableStats(Map statsTypeToValue) throws AnalysisException { + for (Map.Entry entry : statsTypeToValue.entrySet()) { + StatsType statsType = entry.getKey(); + if (statsType == ROW_COUNT) { rowCount = Util.getLongPropertyOrDefault(entry.getValue(), rowCount, DESIRED_ROW_COUNT_PRED, ROW_COUNT + " should >= -1"); - } else if (statsName.equalsIgnoreCase(DATA_SIZE)) { + } else if (statsType == DATA_SIZE) { dataSize = Util.getLongPropertyOrDefault(entry.getValue(), dataSize, DESIRED_DATA_SIZE_PRED, DATA_SIZE + " should >= -1"); } } } - public void updateColumnStats(String columnName, Type columnType, Map statsNameToValue) + public void updateColumnStats(String columnName, Type columnType, Map statsTypeToValue) throws AnalysisException { ColumnStats columnStats = nameToColumnStats.get(columnName); if (columnStats == null) { columnStats = new ColumnStats(); nameToColumnStats.put(columnName, columnStats); } - columnStats.updateStats(columnType, statsNameToValue); + columnStats.updateStats(columnType, statsTypeToValue); } public List getShowInfo() {