[feature-wip](statistics) collect statistics by sql task (#12765)
This pull request implements part of the statistics feature (#6370): it adds the sql-task, which collects statistics through internal queries (#9983). After an ANALYZE statement is parsed, statistics tasks are generated. These tasks include the meta-task (gets statistics from metadata) and the sql-task (gets statistics by SQL query). A sql-task obtains statistics such as the row count, the number of null values, and the maximum value by running SQL queries. Statistics tasks also include a sampling sql-task, which will be implemented in the next PR.
This commit is contained in:
@ -51,16 +51,13 @@ import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Collect statistics about a database
|
||||
* Collect statistics.
|
||||
*
|
||||
* syntax:
|
||||
* ANALYZE [[ db_name.tb_name ] [( column_name [, ...] )], ...] [ PROPERTIES(...) ]
|
||||
*
|
||||
* db_name.tb_name: collect table and column statistics from tb_name
|
||||
*
|
||||
* column_name: collect column statistics from column_name
|
||||
*
|
||||
* properties: properties of statistics jobs
|
||||
* db_name.tb_name: collect table and column statistics from tb_name
|
||||
* column_name: collect column statistics from column_name
|
||||
* properties: properties of statistics jobs
|
||||
*/
|
||||
public class AnalyzeStmt extends DdlStmt {
|
||||
// time to wait for collect statistics
|
||||
|
||||
@ -17,16 +17,30 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.analysis.SelectStmt;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.InvalidFormatException;
|
||||
import org.apache.doris.statistics.StatisticsTaskResult.TaskResult;
|
||||
import org.apache.doris.statistics.StatsGranularity.Granularity;
|
||||
import org.apache.doris.statistics.util.InternalQuery;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.InternalSqlTemplate;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A statistics task that collects statistics by executing query.
|
||||
* The results of the query will be returned as @StatisticsTaskResult.
|
||||
*/
|
||||
public class SQLStatisticsTask extends StatisticsTask {
|
||||
private SelectStmt query;
|
||||
private String statement;
|
||||
|
||||
public SQLStatisticsTask(long jobId, List<StatisticsDesc> statsDescs) {
|
||||
super(jobId, statsDescs);
|
||||
@ -34,30 +48,89 @@ public class SQLStatisticsTask extends StatisticsTask {
|
||||
|
||||
@Override
|
||||
public StatisticsTaskResult call() throws Exception {
|
||||
// TODO
|
||||
// step1: construct query by statsDescList
|
||||
constructQuery();
|
||||
// step2: execute query
|
||||
// the result should be sequence by @statsTypeList
|
||||
List<String> queryResultList = executeQuery(query);
|
||||
// step3: construct StatisticsTaskResult by query result
|
||||
constructTaskResult(queryResultList);
|
||||
return null;
|
||||
checkStatisticsDesc();
|
||||
List<TaskResult> taskResults = Lists.newArrayList();
|
||||
|
||||
for (StatisticsDesc statsDesc : statsDescs) {
|
||||
statement = constructQuery(statsDesc);
|
||||
TaskResult taskResult = executeQuery(statsDesc);
|
||||
taskResults.add(taskResult);
|
||||
}
|
||||
|
||||
return new StatisticsTaskResult(taskResults);
|
||||
}
|
||||
|
||||
protected void constructQuery() {
|
||||
// TODO
|
||||
// step1: construct FROM by @granularityDesc
|
||||
// step2: construct SELECT LIST by @statsTypeList
|
||||
/**
 * Builds the SQL statement that collects the statistic described by {@code statsDesc}.
 *
 * <p>Only the first entry of {@code statsDesc.getStatsTypes()} selects the SQL template.
 * The partition-level template is used when the granularity is {@code PARTITION};
 * every other granularity uses the non-partition template.
 *
 * @param statsDesc descriptor supplying the category, granularity and statistics types
 * @return the SQL text produced by the matching {@code InternalSqlTemplate} builder
 * @throws DdlException if the type is {@code DATA_SIZE} or otherwise unhandled, or when
 *         propagated from {@code getQueryParams}
 * @throws InvalidFormatException declared by this method; presumably raised by the
 *         {@code InternalSqlTemplate} builders - TODO confirm
 */
protected String constructQuery(StatisticsDesc statsDesc) throws DdlException,
|
||||
InvalidFormatException {
|
||||
Map<String, String> params = getQueryParams(statsDesc);
|
||||
List<StatsType> statsTypes = statsDesc.getStatsTypes();
|
||||
// Only the first requested type picks the template.
// NOTE(review): presumably all types in one descriptor share a template - confirm.
StatsType type = statsTypes.get(0);
|
||||
|
||||
StatsGranularity statsGranularity = statsDesc.getStatsGranularity();
|
||||
Granularity granularity = statsGranularity.getGranularity();
|
||||
// Anything other than PARTITION granularity goes through the non-partition templates.
boolean nonPartitioned = granularity != Granularity.PARTITION;
|
||||
|
||||
switch (type) {
|
||||
case ROW_COUNT:
|
||||
return nonPartitioned ? InternalSqlTemplate.buildStatsRowCountSql(params)
|
||||
: InternalSqlTemplate.buildStatsPartitionRowCountSql(params);
|
||||
case NUM_NULLS:
|
||||
return nonPartitioned ? InternalSqlTemplate.buildStatsNumNullsSql(params)
|
||||
: InternalSqlTemplate.buildStatsPartitionNumNullsSql(params);
|
||||
// MAX_SIZE and AVG_SIZE share one template.
case MAX_SIZE:
|
||||
case AVG_SIZE:
|
||||
return nonPartitioned ? InternalSqlTemplate.buildStatsMaxAvgSizeSql(params)
|
||||
: InternalSqlTemplate.buildStatsPartitionMaxAvgSizeSql(params);
|
||||
// NDV, MAX_VALUE and MIN_VALUE share one template.
case NDV:
|
||||
case MAX_VALUE:
|
||||
case MIN_VALUE:
|
||||
return nonPartitioned ? InternalSqlTemplate.buildStatsMinMaxNdvValueSql(params)
|
||||
: InternalSqlTemplate.buildStatsPartitionMinMaxNdvValueSql(params);
|
||||
// DATA_SIZE deliberately falls through to default and is rejected.
case DATA_SIZE:
|
||||
default:
|
||||
throw new DdlException("Unsupported statistics type: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
protected List<String> executeQuery(SelectStmt query) {
|
||||
// TODO (ML)
|
||||
return null;
|
||||
/**
 * Executes the SQL held in the {@code statement} field and converts its result
 * into a {@code TaskResult}.
 *
 * <p>The statement is not built here; this method reads whatever the field currently
 * holds. The query runs against the database resolved from the descriptor's category.
 * A result is accepted only when it has exactly one row and that row's column count
 * equals the number of requested statistics types.
 *
 * @param statsDesc descriptor supplying the category, granularity and statistics types
 * @return a task result whose statistics map holds one value per result column
 * @throws Exception a {@code DdlException} when the result does not have the expected
 *         shape; other failures are propagated from the database lookup or the query
 */
protected TaskResult executeQuery(StatisticsDesc statsDesc) throws Exception {
|
||||
StatsGranularity granularity = statsDesc.getStatsGranularity();
|
||||
List<StatsType> statsTypes = statsDesc.getStatsTypes();
|
||||
StatsCategory category = statsDesc.getStatsCategory();
|
||||
|
||||
String dbName = Env.getCurrentInternalCatalog()
|
||||
.getDbOrDdlException(category.getDbId()).getFullName();
|
||||
InternalQuery query = new InternalQuery(dbName, statement);
|
||||
InternalQueryResult queryResult = query.query();
|
||||
List<ResultRow> resultRows = queryResult.getResultRows();
|
||||
|
||||
if (resultRows != null && resultRows.size() == 1) {
|
||||
ResultRow resultRow = resultRows.get(0);
|
||||
// NOTE(review): presumably the column labels of the row (e.g. "row_count") - confirm.
List<String> columns = resultRow.getColumns();
|
||||
// createNewTaskResult is defined outside this view.
TaskResult result = createNewTaskResult(category, granularity);
|
||||
|
||||
if (columns.size() == statsTypes.size()) {
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
// Each column entry is parsed into a StatsType and paired with the value at the same index.
StatsType statsType = StatsType.fromString(columns.get(i));
|
||||
result.getStatsTypeToValue().put(statsType, resultRow.getString(i));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
// Statistics statements are executed singly and return only one row data
|
||||
throw new DdlException("Statistics query result is incorrect, statement: "
|
||||
+ statement + " queryResult: " + queryResult);
|
||||
}
|
||||
|
||||
protected StatisticsTaskResult constructTaskResult(List<String> queryResultList) {
|
||||
// TODO
|
||||
return null;
|
||||
/**
 * Collects the placeholder values that are substituted into the statistics SQL templates.
 *
 * <p>Looks up the database and table by the ids stored in the descriptor's category,
 * then maps the table name, partition name and column name to the
 * {@code InternalSqlTemplate.TABLE}, {@code PARTITION} and {@code COLUMN} keys.
 *
 * @param statsDesc descriptor whose category identifies the database, table, partition and column
 * @return a new map from template parameter name to its value
 * @throws DdlException propagated from the database or table lookup
 */
private Map<String, String> getQueryParams(StatisticsDesc statsDesc) throws DdlException {
|
||||
StatsCategory category = statsDesc.getStatsCategory();
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(category.getDbId());
|
||||
Table table = db.getTableOrDdlException(category.getTableId());
|
||||
|
||||
Map<String, String> params = Maps.newHashMap();
|
||||
params.put(InternalSqlTemplate.TABLE, table.getName());
|
||||
// The partition and column names are stored as provided by the category, without checks.
params.put(InternalSqlTemplate.PARTITION, category.getPartitionName());
|
||||
params.put(InternalSqlTemplate.COLUMN, category.getColumnName());
|
||||
return params;
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,13 +30,7 @@ public class SampleSQLStatisticsTask extends SQLStatisticsTask {
|
||||
private float samplePercentage = Config.cbo_default_sample_percentage;
|
||||
|
||||
public SampleSQLStatisticsTask(long jobId, List<StatisticsDesc> statsDescs) {
|
||||
// TODO(wzt): implement sql sampling to collect statistics
|
||||
super(jobId, statsDescs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void constructQuery() {
|
||||
// TODO
|
||||
super.constructQuery();
|
||||
// step1: construct table sample
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,7 +25,10 @@ public enum StatsType {
|
||||
MAX_SIZE("max_size"),
|
||||
NUM_NULLS("num_nulls"),
|
||||
MIN_VALUE("min_value"),
|
||||
MAX_VALUE("max_value");
|
||||
MAX_VALUE("max_value"),
|
||||
// only for test
|
||||
UNKNOWN("unknown");
|
||||
|
||||
private final String value;
|
||||
|
||||
StatsType(String value) {
|
||||
|
||||
@ -35,6 +35,11 @@ import java.util.regex.Pattern;
|
||||
* - ${column} and ${table} will be replaced with the actual executed table and column.
|
||||
*/
|
||||
public class InternalSqlTemplate {
|
||||
/** common parameters: tableName, columnName, partitionName */
|
||||
public static final String TABLE = "table";
|
||||
public static final String PARTITION = "partition";
|
||||
public static final String COLUMN = "column";
|
||||
|
||||
/** -------------------------- for statistics begin -------------------------- */
|
||||
public static final String MIN_VALUE_SQL = "SELECT MIN(${column}) AS min_value FROM ${table};";
|
||||
public static final String PARTITION_MIN_VALUE_SQL = "SELECT MIN(${column}) AS min_value"
|
||||
|
||||
@ -0,0 +1,207 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.HashDistributionInfo;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.statistics.util.InternalQuery;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult;
|
||||
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
/**
 * Unit tests for {@code SQLStatisticsTask#constructQuery} and
 * {@code SQLStatisticsTask#executeQuery}.
 *
 * <p>The catalog's database maps are replaced through {@code Deencapsulation.setField},
 * and {@code InternalQuery#query} is replaced with a JMockit {@code MockUp} that returns
 * a fixed single-row result.
 */
public class SQLStatisticsTaskTest {
|
||||
private SQLStatisticsTask sqlStatisticsTaskUnderTest;
|
||||
|
||||
/**
 * Creates the task under test, registers database "db" (id 0) containing table
 * "tableName" (id 0) in the internal catalog, and mocks the internal query.
 */
@Before
|
||||
public void setUp() throws Exception {
|
||||
StatsCategory statsCategory = new StatsCategory();
|
||||
StatsGranularity statsGranularity = new StatsGranularity();
|
||||
List<StatsType> statsTypes = Collections.singletonList(StatsType.ROW_COUNT);
|
||||
sqlStatisticsTaskUnderTest = new SQLStatisticsTask(0L,
|
||||
Collections.singletonList(new StatisticsDesc(statsCategory, statsGranularity, statsTypes)));
|
||||
|
||||
InternalCatalog catalog = Env.getCurrentInternalCatalog();
|
||||
Column column = new Column("columnName", PrimitiveType.STRING);
|
||||
OlapTable tableName = new OlapTable(0L, "tableName",
|
||||
Collections.singletonList(column), KeysType.AGG_KEYS,
|
||||
new PartitionInfo(), new HashDistributionInfo());
|
||||
Database database = new Database(0L, "db");
|
||||
database.createTable(tableName);
|
||||
|
||||
// Overwrite the catalog's name-to-database map so that only the test database is present.
ConcurrentHashMap<String, Database> fullNameToDb = new ConcurrentHashMap<>();
|
||||
fullNameToDb.put("cluster:db", database);
|
||||
Deencapsulation.setField(catalog, "fullNameToDb", fullNameToDb);
|
||||
|
||||
// Overwrite the id-to-database map so that lookups by id 0 resolve to the test database.
ConcurrentHashMap<Long, Database> idToDb = new ConcurrentHashMap<>();
|
||||
idToDb.put(0L, database);
|
||||
Deencapsulation.setField(catalog, "idToDb", idToDb);
|
||||
|
||||
// Canned query result: one column "row_count" with a single row holding "1000".
// NOTE(review): five types are supplied for a single column - confirm that
// InternalQueryResult tolerates the size mismatch.
List<String> columns = Collections.singletonList("row_count");
|
||||
List<PrimitiveType> types = Arrays.asList(PrimitiveType.STRING,
|
||||
PrimitiveType.INT, PrimitiveType.FLOAT,
|
||||
PrimitiveType.DOUBLE, PrimitiveType.BIGINT);
|
||||
InternalQueryResult queryResult = new InternalQueryResult(columns, types);
|
||||
InternalQueryResult.ResultRow resultRow =
|
||||
new InternalQueryResult.ResultRow(Collections.singletonList("1000"));
|
||||
queryResult.getResultRows().add(resultRow);
|
||||
|
||||
// Every InternalQuery#query call returns the canned result, whatever statement it was built with.
new MockUp<InternalQuery>(InternalQuery.class) {
|
||||
@Mock
|
||||
public InternalQueryResult query() {
|
||||
return queryResult;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/** ROW_COUNT at TABLE granularity must produce the table-level row-count SQL. */
@Test
|
||||
public void testConstructQuery() throws Exception {
|
||||
// Setup
|
||||
String expectedSQL = "SELECT COUNT(1) AS row_count FROM tableName;";
|
||||
|
||||
StatsCategory statsCategory = new StatsCategory();
|
||||
statsCategory.setCategory(StatsCategory.Category.TABLE);
|
||||
statsCategory.setDbId(0L);
|
||||
statsCategory.setTableId(0L);
|
||||
statsCategory.setPartitionName("partitionName");
|
||||
statsCategory.setColumnName("columnName");
|
||||
statsCategory.setStatsValue("statsValue");
|
||||
|
||||
StatsGranularity statsGranularity = new StatsGranularity();
|
||||
statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE);
|
||||
statsGranularity.setTableId(0L);
|
||||
statsGranularity.setPartitionId(0L);
|
||||
statsGranularity.setTabletId(0L);
|
||||
|
||||
StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity,
|
||||
Collections.singletonList(StatsType.ROW_COUNT));
|
||||
|
||||
// Run the test
|
||||
String result = sqlStatisticsTaskUnderTest.constructQuery(statsDesc);
|
||||
|
||||
// Verify the results
|
||||
Assert.assertEquals(expectedSQL, result);
|
||||
}
|
||||
|
||||
/** A statistics type with no SQL template (UNKNOWN) must be rejected with a DdlException. */
@Test
|
||||
public void testConstructQuery_ThrowsDdlException() {
|
||||
// Setup
|
||||
StatsCategory statsCategory = new StatsCategory();
|
||||
statsCategory.setCategory(StatsCategory.Category.TABLE);
|
||||
statsCategory.setDbId(0L);
|
||||
statsCategory.setTableId(0L);
|
||||
statsCategory.setPartitionName("partitionName");
|
||||
statsCategory.setColumnName("columnName");
|
||||
statsCategory.setStatsValue("statsValue");
|
||||
|
||||
StatsGranularity statsGranularity = new StatsGranularity();
|
||||
statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE);
|
||||
statsGranularity.setTableId(0L);
|
||||
statsGranularity.setPartitionId(0L);
|
||||
statsGranularity.setTabletId(0L);
|
||||
|
||||
StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity,
|
||||
Collections.singletonList(StatsType.UNKNOWN));
|
||||
|
||||
// Run the test
|
||||
Assert.assertThrows(DdlException.class,
|
||||
() -> sqlStatisticsTaskUnderTest.constructQuery(statsDesc));
|
||||
}
|
||||
|
||||
/**
 * With the mocked single-row result, executeQuery must return a TaskResult that carries
 * the category and granularity fields and maps ROW_COUNT to "1000".
 */
@Test
|
||||
public void testExecuteQuery() throws Exception {
|
||||
// Setup
|
||||
StatsCategory statsCategory = new StatsCategory();
|
||||
statsCategory.setCategory(StatsCategory.Category.TABLE);
|
||||
statsCategory.setDbId(0L);
|
||||
statsCategory.setTableId(0L);
|
||||
statsCategory.setPartitionName("partitionName");
|
||||
statsCategory.setColumnName("columnName");
|
||||
statsCategory.setStatsValue("statsValue");
|
||||
|
||||
StatsGranularity statsGranularity = new StatsGranularity();
|
||||
statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE);
|
||||
statsGranularity.setTableId(0L);
|
||||
statsGranularity.setPartitionId(0L);
|
||||
statsGranularity.setTabletId(0L);
|
||||
|
||||
StatisticsTaskResult.TaskResult expectedResult = new StatisticsTaskResult.TaskResult();
|
||||
expectedResult.setDbId(0L);
|
||||
expectedResult.setTableId(0L);
|
||||
expectedResult.setPartitionName("partitionName");
|
||||
expectedResult.setColumnName("columnName");
|
||||
expectedResult.setCategory(StatsCategory.Category.TABLE);
|
||||
expectedResult.setGranularity(StatsGranularity.Granularity.TABLE);
|
||||
HashMap<StatsType, String> hashMap = new HashMap<>();
|
||||
hashMap.put(StatsType.ROW_COUNT, "1000");
|
||||
expectedResult.setStatsTypeToValue(hashMap);
|
||||
|
||||
StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity,
|
||||
Collections.singletonList(StatsType.ROW_COUNT));
|
||||
|
||||
// Run the test
|
||||
StatisticsTaskResult.TaskResult result = sqlStatisticsTaskUnderTest.executeQuery(statsDesc);
|
||||
|
||||
// Verify the results
|
||||
Assert.assertEquals(expectedResult, result);
|
||||
}
|
||||
|
||||
/**
 * Requests three statistics types while the mocked result holds a single value.
 * NOTE(review): presumably this trips the column-count check in executeQuery - confirm.
 */
@Test
|
||||
public void testExecuteQuery_ThrowsException() {
|
||||
// Setup
|
||||
StatsCategory statsCategory = new StatsCategory();
|
||||
statsCategory.setCategory(StatsCategory.Category.TABLE);
|
||||
statsCategory.setDbId(0L);
|
||||
statsCategory.setTableId(0L);
|
||||
statsCategory.setPartitionName("partitionName");
|
||||
statsCategory.setColumnName("columnName");
|
||||
statsCategory.setStatsValue("statsValue");
|
||||
|
||||
StatsGranularity statsGranularity = new StatsGranularity();
|
||||
statsGranularity.setGranularity(StatsGranularity.Granularity.TABLE);
|
||||
statsGranularity.setTableId(0L);
|
||||
statsGranularity.setPartitionId(0L);
|
||||
statsGranularity.setTabletId(0L);
|
||||
|
||||
StatisticsDesc statsDesc = new StatisticsDesc(statsCategory, statsGranularity,
|
||||
Arrays.asList(StatsType.NDV, StatsType.MAX_VALUE, StatsType.MIN_VALUE));
|
||||
|
||||
// Run the test
|
||||
Assert.assertThrows(Exception.class,
|
||||
() -> sqlStatisticsTaskUnderTest.executeQuery(statsDesc));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user