From 3fa820ec50003e681b2ef810d156c38b00f44989 Mon Sep 17 00:00:00 2001 From: ElvinWei Date: Thu, 22 Sep 2022 11:13:35 +0800 Subject: [PATCH] [feature-wip](statistics) collect statistics by sql task (#12765) This pull request includes some implementations of the statistics (#6370). It implements sql-task to collect statistics based on internal-query (#9983). After the ANALYZE statement is parsed, statistical tasks will be generated. The statistical tasks include meta-task (get statistics from metadata) and sql-task (get statistics by sql query). For sql-task, it will get statistics such as the row_count, the number of null values, and the maximum value by SQL query. Statistical tasks also include a sampling sql-task, which will be implemented in the next PR. --- .../apache/doris/analysis/AnalyzeStmt.java | 11 +- .../doris/statistics/SQLStatisticsTask.java | 115 ++++++++-- .../statistics/SampleSQLStatisticsTask.java | 8 +- .../apache/doris/statistics/StatsType.java | 5 +- .../statistics/util/InternalSqlTemplate.java | 5 + .../statistics/SQLStatisticsTaskTest.java | 207 ++++++++++++++++++ 6 files changed, 315 insertions(+), 36 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/statistics/SQLStatisticsTaskTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java index 9684cc86d8..f25c2381a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java @@ -51,16 +51,13 @@ import java.util.function.Predicate; import java.util.stream.Collectors; /** - * Collect statistics about a database + * Collect statistics. * * syntax: * ANALYZE [[ db_name.tb_name ] [( column_name [, ...] )], ...] [ PROPERTIES(...) 
] - * - * db_name.tb_name: collect table and column statistics from tb_name - * - * column_name: collect column statistics from column_name - * - * properties: properties of statistics jobs + * db_name.tb_name: collect table and column statistics from tb_name + * column_name: collect column statistics from column_name + * properties: properties of statistics jobs */ public class AnalyzeStmt extends DdlStmt { // time to wait for collect statistics diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java index ef25c89262..741803ca88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/SQLStatisticsTask.java @@ -17,16 +17,30 @@ package org.apache.doris.statistics; -import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.InvalidFormatException; +import org.apache.doris.statistics.StatisticsTaskResult.TaskResult; +import org.apache.doris.statistics.StatsGranularity.Granularity; +import org.apache.doris.statistics.util.InternalQuery; +import org.apache.doris.statistics.util.InternalQueryResult; +import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; +import org.apache.doris.statistics.util.InternalSqlTemplate; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.util.List; +import java.util.Map; /** * A statistics task that collects statistics by executing query. * The results of the query will be returned as @StatisticsTaskResult. 
*/ public class SQLStatisticsTask extends StatisticsTask { - private SelectStmt query; + private String statement; public SQLStatisticsTask(long jobId, List statsDescs) { super(jobId, statsDescs); @@ -34,30 +48,89 @@ public class SQLStatisticsTask extends StatisticsTask { @Override public StatisticsTaskResult call() throws Exception { - // TODO - // step1: construct query by statsDescList - constructQuery(); - // step2: execute query - // the result should be sequence by @statsTypeList - List queryResultList = executeQuery(query); - // step3: construct StatisticsTaskResult by query result - constructTaskResult(queryResultList); - return null; + checkStatisticsDesc(); + List taskResults = Lists.newArrayList(); + + for (StatisticsDesc statsDesc : statsDescs) { + statement = constructQuery(statsDesc); + TaskResult taskResult = executeQuery(statsDesc); + taskResults.add(taskResult); + } + + return new StatisticsTaskResult(taskResults); } - protected void constructQuery() { - // TODO - // step1: construct FROM by @granularityDesc - // step2: construct SELECT LIST by @statsTypeList + protected String constructQuery(StatisticsDesc statsDesc) throws DdlException, + InvalidFormatException { + Map params = getQueryParams(statsDesc); + List statsTypes = statsDesc.getStatsTypes(); + StatsType type = statsTypes.get(0); + + StatsGranularity statsGranularity = statsDesc.getStatsGranularity(); + Granularity granularity = statsGranularity.getGranularity(); + boolean nonPartitioned = granularity != Granularity.PARTITION; + + switch (type) { + case ROW_COUNT: + return nonPartitioned ? InternalSqlTemplate.buildStatsRowCountSql(params) + : InternalSqlTemplate.buildStatsPartitionRowCountSql(params); + case NUM_NULLS: + return nonPartitioned ? InternalSqlTemplate.buildStatsNumNullsSql(params) + : InternalSqlTemplate.buildStatsPartitionNumNullsSql(params); + case MAX_SIZE: + case AVG_SIZE: + return nonPartitioned ? 
InternalSqlTemplate.buildStatsMaxAvgSizeSql(params) + : InternalSqlTemplate.buildStatsPartitionMaxAvgSizeSql(params); + case NDV: + case MAX_VALUE: + case MIN_VALUE: + return nonPartitioned ? InternalSqlTemplate.buildStatsMinMaxNdvValueSql(params) + : InternalSqlTemplate.buildStatsPartitionMinMaxNdvValueSql(params); + case DATA_SIZE: + default: + throw new DdlException("Unsupported statistics type: " + type); + } } - protected List executeQuery(SelectStmt query) { - // TODO (ML) - return null; + protected TaskResult executeQuery(StatisticsDesc statsDesc) throws Exception { + StatsGranularity granularity = statsDesc.getStatsGranularity(); + List statsTypes = statsDesc.getStatsTypes(); + StatsCategory category = statsDesc.getStatsCategory(); + + String dbName = Env.getCurrentInternalCatalog() + .getDbOrDdlException(category.getDbId()).getFullName(); + InternalQuery query = new InternalQuery(dbName, statement); + InternalQueryResult queryResult = query.query(); + List resultRows = queryResult.getResultRows(); + + if (resultRows != null && resultRows.size() == 1) { + ResultRow resultRow = resultRows.get(0); + List columns = resultRow.getColumns(); + TaskResult result = createNewTaskResult(category, granularity); + + if (columns.size() == statsTypes.size()) { + for (int i = 0; i < columns.size(); i++) { + StatsType statsType = StatsType.fromString(columns.get(i)); + result.getStatsTypeToValue().put(statsType, resultRow.getString(i)); + } + return result; + } + } + + // Statistics statements are executed singly and return only one row data + throw new DdlException("Statistics query result is incorrect, statement: " + + statement + " queryResult: " + queryResult); } - protected StatisticsTaskResult constructTaskResult(List queryResultList) { - // TODO - return null; + private Map getQueryParams(StatisticsDesc statsDesc) throws DdlException { + StatsCategory category = statsDesc.getStatsCategory(); + Database db = 
Env.getCurrentInternalCatalog().getDbOrDdlException(category.getDbId()); + Table table = db.getTableOrDdlException(category.getTableId()); + + Map params = Maps.newHashMap(); + params.put(InternalSqlTemplate.TABLE, table.getName()); + params.put(InternalSqlTemplate.PARTITION, category.getPartitionName()); + params.put(InternalSqlTemplate.COLUMN, category.getColumnName()); + return params; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java index f9930fc8cb..89ac522927 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/SampleSQLStatisticsTask.java @@ -30,13 +30,7 @@ public class SampleSQLStatisticsTask extends SQLStatisticsTask { private float samplePercentage = Config.cbo_default_sample_percentage; public SampleSQLStatisticsTask(long jobId, List statsDescs) { + // TODO(wzt): implement sql sampling to collect statistics super(jobId, statsDescs); } - - @Override - protected void constructQuery() { - // TODO - super.constructQuery(); - // step1: construct table sample - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java index cb501ad819..a7fd1d6cf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsType.java @@ -25,7 +25,10 @@ public enum StatsType { MAX_SIZE("max_size"), NUM_NULLS("num_nulls"), MIN_VALUE("min_value"), - MAX_VALUE("max_value"); + MAX_VALUE("max_value"), + // only for test + UNKNOWN("unknown"); + private final String value; StatsType(String value) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/InternalSqlTemplate.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/InternalSqlTemplate.java 
index 9350dd3c27..33a26c4ce1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/InternalSqlTemplate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/InternalSqlTemplate.java @@ -35,6 +35,11 @@ import java.util.regex.Pattern; * - ${column} and ${table} will be replaced with the actual executed table and column. */ public class InternalSqlTemplate { + /** common parameters: tableName, columnName, partitionName */ + public static final String TABLE = "table"; + public static final String PARTITION = "partition"; + public static final String COLUMN = "column"; + /** -------------------------- for statistics begin -------------------------- */ public static final String MIN_VALUE_SQL = "SELECT MIN(${column}) AS min_value FROM ${table};"; public static final String PARTITION_MIN_VALUE_SQL = "SELECT MIN(${column}) AS min_value" diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/SQLStatisticsTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/SQLStatisticsTaskTest.java new file mode 100644 index 0000000000..b813e2a7a3 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/SQLStatisticsTaskTest.java @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.statistics.util.InternalQuery; +import org.apache.doris.statistics.util.InternalQueryResult; + +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + + +public class SQLStatisticsTaskTest { + private SQLStatisticsTask sqlStatisticsTaskUnderTest; + + @Before + public void setUp() throws Exception { + StatsCategory statsCategory = new StatsCategory(); + StatsGranularity statsGranularity = new StatsGranularity(); + List statsTypes = Collections.singletonList(StatsType.ROW_COUNT); + sqlStatisticsTaskUnderTest = new SQLStatisticsTask(0L, + Collections.singletonList(new StatisticsDesc(statsCategory, statsGranularity, statsTypes))); + + InternalCatalog catalog = Env.getCurrentInternalCatalog(); + Column column = new Column("columnName", PrimitiveType.STRING); + OlapTable tableName = new OlapTable(0L, "tableName", + Collections.singletonList(column), KeysType.AGG_KEYS, + new PartitionInfo(), new HashDistributionInfo()); + Database database = new Database(0L, "db"); + database.createTable(tableName); + + ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); + 
fullNameToDb.put("cluster:db", database); + Deencapsulation.setField(catalog, "fullNameToDb", fullNameToDb); + + ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); + idToDb.put(0L, database); + Deencapsulation.setField(catalog, "idToDb", idToDb); + + List columns = Collections.singletonList("row_count"); + List types = Arrays.asList(PrimitiveType.STRING, + PrimitiveType.INT, PrimitiveType.FLOAT, + PrimitiveType.DOUBLE, PrimitiveType.BIGINT); + InternalQueryResult queryResult = new InternalQueryResult(columns, types); + InternalQueryResult.ResultRow resultRow = + new InternalQueryResult.ResultRow(Collections.singletonList("1000")); + queryResult.getResultRows().add(resultRow); + + new MockUp(InternalQuery.class) { + @Mock + public InternalQueryResult query() { + return queryResult; + } + }; + } + + @Test + public void testConstructQuery() throws Exception { + // Setup + String expectedSQL = "SELECT COUNT(1) AS row_count FROM tableName;"; + + StatsCategory statsCategory = new StatsCategory(); + statsCategory.setCategory(StatsCategory.Category.TABLE); + statsCategory.setDbId(0L); + statsCategory.setTableId(0L); + statsCategory.setPartitionName("partitionName"); + statsCategory.setColumnName("columnName"); + statsCategory.setStatsValue("statsValue"); + + StatsGranularity statsGranularity = new StatsGranularity(); + statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE); + statsGranularity.setTableId(0L); + statsGranularity.setPartitionId(0L); + statsGranularity.setTabletId(0L); + + StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity, + Collections.singletonList(StatsType.ROW_COUNT)); + + // Run the test + String result = sqlStatisticsTaskUnderTest.constructQuery(statsDesc); + + // Verify the results + Assert.assertEquals(expectedSQL, result); + } + + @Test + public void testConstructQuery_ThrowsDdlException() { + // Setup + StatsCategory statsCategory = new StatsCategory(); + 
statsCategory.setCategory(StatsCategory.Category.TABLE); + statsCategory.setDbId(0L); + statsCategory.setTableId(0L); + statsCategory.setPartitionName("partitionName"); + statsCategory.setColumnName("columnName"); + statsCategory.setStatsValue("statsValue"); + + StatsGranularity statsGranularity = new StatsGranularity(); + statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE); + statsGranularity.setTableId(0L); + statsGranularity.setPartitionId(0L); + statsGranularity.setTabletId(0L); + + StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity, + Collections.singletonList(StatsType.UNKNOWN)); + + // Run the test + Assert.assertThrows(DdlException.class, + () -> sqlStatisticsTaskUnderTest.constructQuery(statsDesc)); + } + + @Test + public void testExecuteQuery() throws Exception { + // Setup + StatsCategory statsCategory = new StatsCategory(); + statsCategory.setCategory(StatsCategory.Category.TABLE); + statsCategory.setDbId(0L); + statsCategory.setTableId(0L); + statsCategory.setPartitionName("partitionName"); + statsCategory.setColumnName("columnName"); + statsCategory.setStatsValue("statsValue"); + + StatsGranularity statsGranularity = new StatsGranularity(); + statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE); + statsGranularity.setTableId(0L); + statsGranularity.setPartitionId(0L); + statsGranularity.setTabletId(0L); + + StatisticsTaskResult.TaskResult expectedResult = new StatisticsTaskResult.TaskResult(); + expectedResult.setDbId(0L); + expectedResult.setTableId(0L); + expectedResult.setPartitionName("partitionName"); + expectedResult.setColumnName("columnName"); + expectedResult.setCategory(StatsCategory.Category.TABLE); + expectedResult.setGranularity(StatsGranularity.Granularity.TABLE); + HashMap hashMap = new HashMap<>(); + hashMap.put(StatsType.ROW_COUNT, "1000"); + expectedResult.setStatsTypeToValue(hashMap); + + StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity, + 
Collections.singletonList(StatsType.ROW_COUNT)); + + // Run the test + StatisticsTaskResult.TaskResult result = sqlStatisticsTaskUnderTest.executeQuery(statsDesc); + + // Verify the results + Assert.assertEquals(expectedResult, result); + } + + @Test + public void testExecuteQuery_ThrowsException() { + // Setup + StatsCategory statsCategory = new StatsCategory(); + statsCategory.setCategory(StatsCategory.Category.TABLE); + statsCategory.setDbId(0L); + statsCategory.setTableId(0L); + statsCategory.setPartitionName("partitionName"); + statsCategory.setColumnName("columnName"); + statsCategory.setStatsValue("statsValue"); + + StatsGranularity statsGranularity = new StatsGranularity(); + statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE); + statsGranularity.setTableId(0L); + statsGranularity.setPartitionId(0L); + statsGranularity.setTabletId(0L); + + StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity, + Arrays.asList(StatsType.NDV, StatsType.MAX_VALUE, StatsType.MIN_VALUE)); + + // Run the test + Assert.assertThrows(Exception.class, + () -> sqlStatisticsTaskUnderTest.executeQuery(statsDesc)); + } +}