[Improvement](statistics) Improve stats sample strategy (#26435)

Improve the accuracy of sampled statistics collection. For non-distribution columns, estimate NDV with the DUJ1 estimator
`n*d / (n - f1 + f1*n/N)`

where `n` is the number of rows in the sample (out of `N` total rows), `d` is the number of distinct values in the sample,
and `f1` is the number of distinct values that occur exactly once in the sample.

For distribution columns, the DUJ1 estimator does not fit, so a linear estimator is used for NDV instead: the NDV observed in the sample is multiplied by the sample scale factor derived from the fraction of tablets sampled.

When the tablets to sample are still very large, use LIMIT to bound the number of rows scanned (for non-key columns only, because key columns are sorted and their stats would be inaccurate under LIMIT).
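
A minimal sketch of the arithmetic behind the two estimators (not part of the commit; the class name and all numbers below are hypothetical):

```java
// Illustrative sketch only; the inputs are made up for the example.
public class NdvEstimatorSketch {

    // DUJ1 estimator for non-distribution columns: n*d / (n - f1 + f1*n/N)
    //   n  = rows actually sampled, N = total rows in the table,
    //   d  = distinct values seen in the sample,
    //   f1 = distinct values that appear exactly once in the sample.
    static double duj1(double n, double totalN, double d, double f1) {
        return n * d / (n - f1 + f1 * n / totalN);
    }

    // Linear estimator for distribution columns: the NDV observed in the sample is
    // multiplied by the sample scale factor, as LINEAR_ANALYZE_TEMPLATE does with ${scaleFactor}.
    static double linear(double sampleNdv, double scaleFactor) {
        return sampleNdv * scaleFactor;
    }

    public static void main(String[] args) {
        // Sample 100,000 of 1,000,000 rows; 20,000 distinct values, 5,000 of them singletons.
        System.out.println(duj1(100_000, 1_000_000, 20_000, 5_000)); // ~20942
        // Distribution column: 1,000 distinct values observed with scale factor 10 (a 10% sample).
        System.out.println(linear(1_000, 10));                       // 10000
    }
}
```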
Author: Jibing-Li
Date: 2023-11-13 15:52:21 +08:00
Committed by: GitHub
Parent: c6b97c4daa
Commit: 9c6c2f736e
18 changed files with 826 additions and 200 deletions


@ -54,8 +54,7 @@ Syntax:
```SQL
ANALYZE < TABLE | DATABASE table_name | db_name >
[ (column_name [, ...]) ]
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] [ WITH SQL ] ]
[ PROPERTIES ("key" = "value", ...) ];
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] ];
```
Where:
@ -64,7 +63,6 @@ Where:
- `column_name`: The specified target column. It must be an existing column in `table_name`. You can specify multiple column names separated by commas.
- `sync`: Collect statistics synchronously. Returns after collection. If not specified, it executes asynchronously and returns a JOB ID.
- `sample percent | rows`: Collect statistics with sampling. You can specify a sampling percentage or a number of sampling rows.
- `sql`: Execute SQL to collect statistics for partitioned columns in external tables. By default, partitioned column information is collected from metadata, which is efficient but may not be accurate in terms of row count and data size. Users can specify using SQL to collect accurate partitioned column information.
Here are some examples:
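A couple of hedged examples based on the syntax above (the table and column names are placeholders):

```sql
-- Sample 10 percent of a hypothetical table and collect stats for all its columns.
ANALYZE TABLE example_tbl WITH SAMPLE PERCENT 10;
-- Collect stats for two columns synchronously, sampling 100000 rows.
ANALYZE TABLE example_tbl (col1, col2) WITH SYNC WITH SAMPLE ROWS 100000;
```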
@ -90,6 +88,13 @@ The collection jobs for statistics themselves consume a certain amount of system
If you are concerned about automatic collection jobs interfering with your business, you can specify a time frame for the automatic collection jobs to run during low business loads by setting the `full_auto_analyze_start_time` and `full_auto_analyze_end_time` parameters according to your needs. You can also completely disable this feature by setting the `enable_full_auto_analyze` parameter to `false`.
External catalogs do not participate in automatic collection by default. Because external catalogs often contain massive amounts of historical data, including them in automatic collection could consume too many resources. You can turn automatic collection for an external catalog on or off by setting the catalog's properties.
```sql
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='true'); // Turn on
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='false'); // Turn off
```
<br />


@ -56,8 +56,7 @@ Doris supports users manually triggering statistics collection by submitting an ANALYZE statement
```SQL
ANALYZE < TABLE | DATABASE table_name | db_name >
[ (column_name [, ...]) ]
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] [ WITH SQL ] ]
[ PROPERTIES ("key" = "value", ...) ];
[ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] ];
```
Where:
@ -66,7 +65,6 @@ ANALYZE < TABLE | DATABASE table_name | db_name >
- column_name: The specified target column. It must be a column that exists in `table_name`; separate multiple column names with commas.
- sync: Collect statistics synchronously and return when collection finishes. If not specified, the job runs asynchronously and returns a JOB ID.
- sample percent | rows: Collect statistics with sampling. You can specify a sampling percentage or a number of sampling rows.
- sql: Execute SQL to collect partition column statistics for external tables. By default, partition column information is collected from metadata, which is efficient but may be inaccurate in terms of row count and data size. Users can specify SQL-based collection to obtain accurate partition column information.
Here are some examples:
@ -93,6 +91,13 @@ ANALYZE TABLE lineitem WITH SAMPLE ROWS 100000;
If you are concerned that automatic collection jobs will interfere with your business, you can set the `full_auto_analyze_start_time` and `full_auto_analyze_end_time` parameters as needed so that automatic collection jobs run during periods of low business load. You can also disable this feature entirely by setting the `enable_full_auto_analyze` parameter to `false`.
External catalogs do not participate in automatic collection by default. Because external catalogs often contain massive amounts of historical data, including them in automatic collection could consume too many resources. You can turn automatic collection for an external catalog on or off by setting the catalog's properties.
```sql
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='true'); // Turn on automatic collection
ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='false'); // Turn off automatic collection
```
<br />
## 2. Job Management


@ -90,6 +90,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -2371,4 +2372,17 @@ public class OlapTable extends Table {
}
}
}
@Override
public boolean isDistributionColumn(String columnName) {
Set<String> distributeColumns = getDistributionColumnNames()
.stream().map(String::toLowerCase).collect(Collectors.toSet());
return distributeColumns.contains(columnName.toLowerCase(Locale.ROOT));
}
@Override
public boolean isPartitionColumn(String columnName) {
return getPartitionInfo().getPartitionColumns().stream()
.anyMatch(c -> c.getName().equalsIgnoreCase(columnName));
}
}


@ -254,5 +254,13 @@ public interface TableIf {
// TODO: Each tableIf should impl it by itself.
return 0;
}
default boolean isDistributionColumn(String columnName) {
return false;
}
default boolean isPartitionColumn(String columnName) {
return false;
}
}


@ -699,6 +699,12 @@ public class HMSExternalTable extends ExternalTable {
}
return total;
}
@Override
public boolean isDistributionColumn(String columnName) {
return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
.collect(Collectors.toSet()).contains(columnName.toLowerCase(Locale.ROOT));
}
}


@ -76,6 +76,16 @@ public class AnalysisJob {
queryingTask.remove(task);
buf.addAll(statsData);
queryFinished.add(task);
markOneTaskDone();
}
public synchronized void rowCountDone(BaseAnalysisTask task) {
queryingTask.remove(task);
queryFinished.add(task);
markOneTaskDone();
}
protected void markOneTaskDone() {
queryFinishedTaskCount += 1;
if (queryFinishedTaskCount == totalTaskCount) {
writeBuf();
@ -183,6 +193,9 @@ public class AnalysisJob {
protected void syncLoadStats() {
long tblId = jobInfo.tblId;
for (BaseAnalysisTask task : queryFinished) {
if (task.info.externalTableLevelTask) {
continue;
}
String colName = task.col.getName();
if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
analysisManager.removeColStatsStatus(tblId, colName);


@ -333,11 +333,11 @@ public class AnalysisManager implements Writable {
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
constructJob(jobInfo, analysisTaskInfos.values());
if (!jobInfo.partitionOnly && stmt.isAllColumns()
&& StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
}
constructJob(jobInfo, analysisTaskInfos.values());
if (isSync) {
syncExecute(analysisTaskInfos.values());
updateTableStats(jobInfo);


@ -36,6 +36,7 @@ import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -43,46 +44,80 @@ public abstract class BaseAnalysisTask {
public static final Logger LOG = LogManager.getLogger(BaseAnalysisTask.class);
protected static final String NDV_MULTIPLY_THRESHOLD = "0.3";
protected static final String NDV_SAMPLE_TEMPLATE = "case when NDV(`${colName}`)/count('${colName}') < "
+ NDV_MULTIPLY_THRESHOLD
+ " then NDV(`${colName}`) "
+ "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, "
;
public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;
protected static final String COLLECT_COL_STATISTICS =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ " ${catalogId} AS catalog_id, "
+ " ${dbId} AS db_id, "
+ " ${tblId} AS tbl_id, "
+ " ${idxId} AS idx_id, "
+ " '${colId}' AS col_id, "
+ " NULL AS part_id, "
+ " COUNT(1) AS row_count, "
+ " NDV(`${colName}`) AS ndv, "
+ " COUNT(1) - COUNT(${colName}) AS null_count, "
+ " CAST(MIN(${colName}) AS STRING) AS min, "
+ " CAST(MAX(${colName}) AS STRING) AS max, "
+ " ${dataSizeFunction} AS data_size, "
+ " NOW() AS update_time "
+ " FROM `${dbName}`.`${tblName}`";
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ " ${catalogId} AS `catalog_id`, "
+ " ${dbId} AS `db_id`, "
+ " ${tblId} AS `tbl_id`, "
+ " ${idxId} AS `idx_id`, "
+ " '${colId}' AS `col_id`, "
+ " NULL AS `part_id`, "
+ " COUNT(1) AS `row_count`, "
+ " NDV(`${colName}`) AS `ndv`, "
+ " COUNT(1) - COUNT(${colName}) AS `null_count`, "
+ " CAST(MIN(${colName}) AS STRING) AS `min`, "
+ " CAST(MAX(${colName}) AS STRING) AS `max`, "
+ " ${dataSizeFunction} AS `data_size`, "
+ " NOW() AS `update_time` "
+ " FROM `${catalogName}`.`${dbName}`.`${tblName}`";
protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE =
" SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "${idxId} AS idx_id, "
+ "'${colId}' AS col_id, "
+ "NULL AS part_id, "
+ "${row_count} AS row_count, "
+ "${ndv} AS ndv, "
+ "${null_count} AS null_count, "
+ "'${min}' AS min, "
+ "'${max}' AS max, "
+ "${data_size} AS data_size, "
protected static final String LINEAR_ANALYZE_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "ROUND(COUNT(1) * ${scaleFactor}) AS `row_count`, "
+ "ROUND(NDV(`${colName}`) * ${scaleFactor}) as `ndv`, "
+ "ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor}) AS `null_count`, "
+ "${min} AS `min`, "
+ "${max} AS `max`, "
+ "${dataSizeFunction} * ${scaleFactor} AS `data_size`, "
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints} ${limit}";
protected static final String DUJ1_ANALYZE_TEMPLATE = "SELECT "
+ "CONCAT('${tblId}', '-', '${idxId}', '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "${rowCount} AS `row_count`, "
+ "${ndvFunction} as `ndv`, "
+ "IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.count, 0)), 0) * ${scaleFactor} as `null_count`, "
+ "'${min}' AS `min`, "
+ "'${max}' AS `max`, "
+ "${dataSizeFunction} * ${scaleFactor} AS `data_size`, "
+ "NOW() "
+ "FROM ( "
+ " SELECT t0.`${colName}` as column_key, COUNT(1) as `count` "
+ " FROM "
+ " (SELECT `${colName}` FROM `${catalogName}`.`${dbName}`.`${tblName}` "
+ " ${sampleHints} ${limit}) as `t0` "
+ " GROUP BY `t0`.`${colName}` "
+ ") as `t1` ";
protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "${row_count} AS `row_count`, "
+ "${ndv} AS `ndv`, "
+ "${null_count} AS `null_count`, "
+ "'${min}' AS `min`, "
+ "'${max}' AS `max`, "
+ "${data_size} AS `data_size`, "
+ "NOW() ";
protected AnalysisInfo info;
@ -199,29 +234,51 @@ public abstract class BaseAnalysisTask {
return info.jobId;
}
// TODO : time cost is intolerable when column is string type, return 0 directly for now.
protected String getDataSizeFunction(Column column) {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`${colName}`))";
protected String getDataSizeFunction(Column column, boolean useDuj1) {
if (useDuj1) {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`column_key`) * count)";
} else {
return "SUM(t1.count) * " + column.getType().getSlotSize();
}
} else {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`${colName}`))";
} else {
return "COUNT(1) * " + column.getType().getSlotSize();
}
}
return "COUNT(1) * " + column.getType().getSlotSize();
}
// Min value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan.
protected String getMinFunction() {
if (tableSample == null) {
return "CAST(MIN(`${colName}`) as ${type}) ";
return "to_base64(CAST(MIN(`${colName}`) as ${type})) ";
} else {
return "NULL ";
// Min value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan.
return "NULL";
}
}
protected String getNdvFunction(String totalRows) {
String sampleRows = "SUM(t1.count)";
String onceCount = "SUM(IF(t1.count = 1, 1, 0))";
String countDistinct = "COUNT(1)";
// DUJ1 estimator: n*d / (n - f1 + f1*n/N)
// f1 is the count of element that appears only once in the sample.
// (https://github.com/postgres/postgres/blob/master/src/backend/commands/analyze.c)
// (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.93.8637&rep=rep1&type=pdf)
// sample_row * count_distinct / ( sample_row - once_count + once_count * sample_row / total_row)
String fn = MessageFormat.format("{0} * {1} / ({0} - {2} + {2} * {0} / {3})", sampleRows,
countDistinct, onceCount, totalRows);
return fn;
}
// Max value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan.
protected String getMaxFunction() {
if (tableSample == null) {
return "CAST(MAX(`${colName}`) as ${type}) ";
return "to_base64(CAST(MAX(`${colName}`) as ${type})) ";
} else {
return "NULL ";
return "NULL";
}
}
@ -254,12 +311,11 @@ public abstract class BaseAnalysisTask {
this.job = job;
}
protected void runQuery(String sql) {
protected void runQuery(String sql, boolean needEncode) {
long startTime = System.currentTimeMillis();
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
stmtExecutor = new StmtExecutor(a.connectContext, sql);
stmtExecutor.executeInternalQuery();
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0), needEncode);
job.appendBuf(this, Collections.singletonList(colStatsData));
} finally {
LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000);


@ -56,6 +56,8 @@ public class ColStatsData {
public final String updateTime;
public final boolean needEncode;
@VisibleForTesting
public ColStatsData() {
statsId = new StatsId();
@ -66,9 +68,10 @@ public class ColStatsData {
maxLit = null;
dataSizeInBytes = 0;
updateTime = null;
needEncode = true;
}
public ColStatsData(ResultRow row) {
public ColStatsData(ResultRow row, boolean needEncode) {
this.statsId = new StatsId(row);
this.count = (long) Double.parseDouble(row.get(7));
this.ndv = (long) Double.parseDouble(row.getWithDefault(8, "0"));
@ -77,6 +80,7 @@ public class ColStatsData {
this.maxLit = row.get(11);
this.dataSizeInBytes = (long) Double.parseDouble(row.getWithDefault(12, "0"));
this.updateTime = row.get(13);
this.needEncode = needEncode;
}
public String toSQL(boolean roundByParentheses) {
@ -89,10 +93,12 @@ public class ColStatsData {
sj.add(String.valueOf(count));
sj.add(String.valueOf(ndv));
sj.add(String.valueOf(nullCount));
sj.add(minLit == null ? "NULL" :
"'" + Base64.getEncoder().encodeToString(minLit.getBytes(StandardCharsets.UTF_8)) + "'");
sj.add(maxLit == null ? "NULL" :
"'" + Base64.getEncoder().encodeToString(maxLit.getBytes(StandardCharsets.UTF_8)) + "'");
sj.add(minLit == null ? "NULL" : needEncode
? "'" + Base64.getEncoder().encodeToString(minLit.getBytes(StandardCharsets.UTF_8)) + "'"
: "'" + minLit + "'");
sj.add(maxLit == null ? "NULL" : needEncode
? "'" + Base64.getEncoder().encodeToString(maxLit.getBytes(StandardCharsets.UTF_8)) + "'"
: "'" + maxLit + "'");
sj.add(String.valueOf(dataSizeInBytes));
sj.add(StatisticsUtil.quote(updateTime));
return sj.toString();


@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.Sets;
@ -174,30 +175,35 @@ public class ColumnStatistic {
String min = row.get(10);
String max = row.get(11);
if (min != null && !min.equalsIgnoreCase("NULL")) {
min = new String(Base64.getDecoder().decode(min),
StandardCharsets.UTF_8);
try {
columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min));
columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min));
} catch (AnalysisException e) {
LOG.warn("Failed to deserialize column {} min value {}.", col, min, e);
min = new String(Base64.getDecoder().decode(min), StandardCharsets.UTF_8);
// Internal catalog get the min/max value using a separate SQL,
// and the value is already encoded by base64. Need to handle internal and external catalog separately.
if (catalogId != InternalCatalog.INTERNAL_CATALOG_ID && min.equalsIgnoreCase("NULL")) {
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
} else {
try {
columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min));
columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min));
} catch (AnalysisException e) {
LOG.warn("Failed to deserialize column {} min value {}.", col, min, e);
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
}
}
} else {
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
}
if (max != null && !max.equalsIgnoreCase("NULL")) {
max = new String(Base64.getDecoder().decode(max),
StandardCharsets.UTF_8);
try {
columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max));
columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max));
} catch (AnalysisException e) {
LOG.warn("Failed to deserialize column {} max value {}.", col, max, e);
max = new String(Base64.getDecoder().decode(max), StandardCharsets.UTF_8);
if (catalogId != InternalCatalog.INTERNAL_CATALOG_ID && max.equalsIgnoreCase("NULL")) {
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
} else {
try {
columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max));
columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max));
} catch (AnalysisException e) {
LOG.warn("Failed to deserialize column {} max value {}.", col, max, e);
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
}
}
} else {
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);


@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.statistics.util.StatisticsUtil;
@ -36,44 +37,23 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
public class HMSAnalysisTask extends BaseAnalysisTask {
private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);
// While doing sample analysis, the sampled ndv result will multiply a factor (total size/sample size)
// if ndv(col)/count(col) is greater than this threshold.
private static final String ANALYZE_TABLE_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "${idxId} AS idx_id, "
+ "'${colId}' AS col_id, "
+ "NULL AS part_id, "
+ "ROUND(COUNT(1) * ${scaleFactor}) AS row_count, "
+ NDV_SAMPLE_TEMPLATE
+ "ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor}) AS null_count, "
+ "to_base64(${minFunction}) AS min, "
+ "to_base64(${maxFunction}) AS max, "
+ "${dataSizeFunction} * ${scaleFactor} AS data_size, "
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
private final boolean isTableLevelTask;
private final boolean isPartitionOnly;
private Set<String> partitionNames;
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints}";
private boolean isTableLevelTask;
private boolean isPartitionOnly;
private HMSExternalTable table;
public HMSAnalysisTask() {
}
public HMSAnalysisTask(AnalysisInfo info) {
super(info);
isTableLevelTask = info.externalTableLevelTask;
isPartitionOnly = info.partitionOnly;
partitionNames = info.partitionNames;
table = (HMSExternalTable) tbl;
}
@ -85,6 +65,11 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
}
// For test
protected void setTable(HMSExternalTable table) {
this.table = table;
}
/**
* Get table row count
*/
@ -97,6 +82,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(
new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
job.rowCountDone(this);
}
/**
@ -120,34 +106,62 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
return table.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName()));
}
// Get ordinary column stats. Ordinary column means not partition column.
private void getOrdinaryColumnStats() throws Exception {
// An example sql for a column stats:
// INSERT INTO __internal_schema.column_statistics
// SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
// 13002 AS catalog_id,
// 13038 AS db_id,
// 13055 AS tbl_id,
// -1 AS idx_id,
// 'r_regionkey' AS col_id,
// 'NULL' AS part_id,
// COUNT(1) AS row_count,
// NDV(`r_regionkey`) AS ndv,
// SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
// MIN(`r_regionkey`) AS min,
// MAX(`r_regionkey`) AS max,
// 0 AS data_size,
// NOW() FROM `hive`.`tpch100`.`region`
StringBuilder sb = new StringBuilder();
sb.append(ANALYZE_TABLE_TEMPLATE);
Map<String, String> params = buildStatsParams("NULL");
params.put("dataSizeFunction", getDataSizeFunction(col));
params.put("minFunction", getMinFunction());
params.put("maxFunction", getMaxFunction());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
params.put("min", getMinFunction());
params.put("max", getMaxFunction());
params.put("dataSizeFunction", getDataSizeFunction(col, false));
Pair<Double, Long> sampleInfo = getSampleInfo();
params.put("scaleFactor", String.valueOf(sampleInfo.first));
StringSubstitutor stringSubstitutor;
if (tableSample == null) {
// Do full analyze
LOG.debug("Will do full collection for column {}", col.getName());
sb.append(COLLECT_COL_STATISTICS);
} else {
// Do sample analyze
LOG.debug("Will do sample collection for column {}", col.getName());
boolean limitFlag = false;
boolean bucketFlag = false;
// If sample size is too large, use limit to control the sample size.
if (needLimit(sampleInfo.second, sampleInfo.first)) {
limitFlag = true;
long columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
double targetRows = (double) sampleInfo.second / columnSize;
// Estimate the new scaleFactor based on the schema.
if (targetRows > StatisticsUtil.getHugeTableSampleRows()) {
params.put("limit", "limit " + StatisticsUtil.getHugeTableSampleRows());
params.put("scaleFactor",
String.valueOf(sampleInfo.first * targetRows / StatisticsUtil.getHugeTableSampleRows()));
}
}
// Distribution columns don't fit for DUJ1 estimator, use linear estimator.
if (tbl.isDistributionColumn(col.getName())) {
bucketFlag = true;
sb.append(LINEAR_ANALYZE_TEMPLATE);
params.put("rowCount", "ROUND(count(1) * ${scaleFactor})");
} else {
sb.append(DUJ1_ANALYZE_TEMPLATE);
params.put("dataSizeFunction", getDataSizeFunction(col, true));
params.put("ndvFunction", getNdvFunction("ROUND(SUM(t1.count) * ${scaleFactor})"));
params.put("rowCount", "ROUND(SUM(t1.count) * ${scaleFactor})");
}
LOG.info("Sample for column [{}]. Scale factor [{}], "
+ "limited [{}], is distribute column [{}]",
col.getName(), params.get("scaleFactor"), limitFlag, bucketFlag);
}
stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
runQuery(sql);
runQuery(sql, true);
}
// Collect the partition column stats through HMS metadata.
// Get all the partition values and calculate the stats based on the values.
private void getPartitionColumnStats() throws Exception {
Set<String> partitionNames = table.getPartitionNames();
Set<String> ndvPartValues = Sets.newHashSet();
@ -190,7 +204,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
params.put("data_size", String.valueOf(dataSize));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
runQuery(sql);
runQuery(sql, true);
}
private String updateMinValue(String currentMin, String value) {
@ -235,20 +249,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
return value.compareTo(currentMax) > 0 ? value : currentMax;
}
private void getPartitionNames() {
if (partitionNames == null) {
if (info.isAllPartition) {
partitionNames = table.getPartitionNames();
} else if (info.partitionCount > 0) {
partitionNames = table.getPartitionNames().stream()
.limit(info.partitionCount).collect(Collectors.toSet());
}
if (partitionNames == null || partitionNames.isEmpty()) {
throw new RuntimeException("Not a partition table or no partition specified.");
}
}
}
private Map<String, String> buildStatsParams(String partId) {
Map<String, String> commonParams = new HashMap<>();
String id = StatisticsUtil.constructId(tbl.getId(), -1);
@ -271,8 +271,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
commonParams.put("catalogName", catalog.getName());
commonParams.put("dbName", db.getFullName());
commonParams.put("tblName", tbl.getName());
commonParams.put("sampleExpr", getSampleExpression());
commonParams.put("scaleFactor", getSampleScaleFactor());
commonParams.put("sampleHints", getSampleHint());
commonParams.put("limit", "");
commonParams.put("scaleFactor", "1");
if (col != null) {
commonParams.put("type", col.getType().toString());
}
@ -280,7 +281,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
return commonParams;
}
protected String getSampleExpression() {
protected String getSampleHint() {
if (tableSample == null) {
return "";
}
@ -291,13 +292,17 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
}
// Get the sample scale factor. While analyzing, the result of count, null count and data size need to
// multiply this factor to get more accurate result.
protected String getSampleScaleFactor() {
/**
* Get the pair of sample scale factor and the file size going to sample.
* While analyzing, the result of count, null count and data size need to
* multiply this scale factor to get more accurate result.
* @return Pair of sample scale factor and the file size going to sample.
*/
protected Pair<Double, Long> getSampleInfo() {
if (tableSample == null) {
return "1";
return Pair.of(1.0, 0L);
}
long target = 0;
long target;
// Get list of all files' size in this HMS table.
List<Long> chunkSizes = table.getChunkSizes();
Collections.shuffle(chunkSizes, new Random(tableSample.getSeek()));
@ -324,7 +329,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
break;
}
}
return Double.toString(Math.max(((double) total) / cumulate, 1));
return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate);
}
@Override
@ -336,4 +341,30 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}
/**
* If the size to sample is larger than LIMIT_SIZE (1GB)
* and is much larger (1.2*) than the size user want to sample,
* use limit to control the total sample size.
* @param sizeToRead The file size to sample.
* @param factor sizeToRead * factor = Table total size.
* @return True if need to limit.
*/
protected boolean needLimit(long sizeToRead, double factor) {
long total = (long) (sizeToRead * factor);
long target;
if (tableSample.isPercent()) {
target = total * tableSample.getSampleValue() / 100;
} else {
int columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
target = columnSize * tableSample.getSampleValue();
}
if (sizeToRead > LIMIT_SIZE && sizeToRead > target * LIMIT_FACTOR) {
return true;
}
return false;
}
}


@ -63,7 +63,7 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
if (isTableLevelTask) {
getTableStats();
} else {
getTableColumnStats();
getColumnStats();
}
}
@ -77,12 +77,13 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
job.rowCountDone(this);
}
/**
* Get column statistics and insert the result to __internal_schema.column_statistics
*/
private void getTableColumnStats() throws Exception {
private void getColumnStats() throws Exception {
// An example sql for a column stats:
// INSERT INTO __internal_schema.column_statistics
// SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
@ -106,10 +107,10 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("colName", col.getName());
params.put("colId", info.colName);
params.put("dataSizeFunction", getDataSizeFunction(col));
params.put("dataSizeFunction", getDataSizeFunction(col, false));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
runQuery(sql);
runQuery(sql, true);
}
private Map<String, String> buildTableStatsParams(String partId) {


@ -30,8 +30,10 @@ import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.text.StringSubstitutor;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -44,23 +46,10 @@ import java.util.stream.Collectors;
*/
public class OlapAnalysisTask extends BaseAnalysisTask {
private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "${idxId} AS idx_id, "
+ "'${colId}' AS col_id, "
+ "NULL AS part_id, "
+ "ROUND(COUNT(1) * ${scaleFactor}) AS row_count, "
+ NDV_SAMPLE_TEMPLATE
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor} AS null_count, "
+ "NULL AS min, "
+ "NULL AS max, "
+ "${dataSizeFunction} * ${scaleFactor} AS data_size, "
+ "NOW() "
+ "FROM `${dbName}`.`${tblName}`"
+ "${tablets}";
private static final String BASIC_STATS_TEMPLATE = "SELECT "
+ "MIN(`${colName}`) as min, "
+ "MAX(`${colName}`) as max "
+ "FROM `${dbName}`.`${tblName}`";
@VisibleForTesting
public OlapAnalysisTask() {
@ -85,46 +74,96 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
* 3. insert col stats and partition stats
*/
protected void doSample() throws Exception {
Pair<List<Long>, Long> pair = calcActualSampleTablets();
LOG.debug("Will do sample collection for column {}", col.getName());
Pair<List<Long>, Long> pair = calcActualSampleTablets(tbl.isPartitionColumn(col.getName()));
LOG.info("Number of tablets selected {}, rows in tablets {}", pair.first.size(), pair.second);
List<Long> tabletIds = pair.first;
double scaleFactor = (double) tbl.getRowCount() / (double) pair.second;
// might happen if row count in fe metadata hasn't been updated yet
if (Double.isInfinite(scaleFactor) || Double.isNaN(scaleFactor)) {
LOG.warn("Scale factor is infinite or Nan, will set scale factor to 1.");
scaleFactor = 1;
tabletIds = Collections.emptyList();
pair.second = tbl.getRowCount();
}
String tabletStr = tabletIds.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) {
// Get basic stats, including min and max.
ResultRow basicStats = collectBasicStat(r);
long rowCount = tbl.getRowCount();
String min = Base64.getEncoder().encodeToString(basicStats.get(0).getBytes(StandardCharsets.UTF_8));
String max = Base64.getEncoder().encodeToString(basicStats.get(1).getBytes(StandardCharsets.UTF_8));
boolean limitFlag = false;
long rowsToSample = pair.second;
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("catalogName", catalog.getName());
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(tbl.getId()));
params.put("idxId", String.valueOf(info.indexId));
params.put("colId", String.valueOf(info.colName));
params.put("dataSizeFunction", getDataSizeFunction(col));
params.put("dataSizeFunction", getDataSizeFunction(col, false));
params.put("dbName", db.getFullName());
params.put("colName", info.colName);
params.put("tblName", tbl.getName());
params.put("scaleFactor", String.valueOf(scaleFactor));
params.put("tablets", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr));
params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr));
params.put("ndvFunction", getNdvFunction(String.valueOf(rowCount)));
params.put("min", min);
params.put("max", max);
params.put("rowCount", String.valueOf(rowCount));
params.put("type", col.getType().toString());
params.put("limit", "");
if (needLimit()) {
// If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate
// the scaleFactor.
limitFlag = true;
rowsToSample = Math.min(getSampleRows(), pair.second);
params.put("limit", "limit " + rowsToSample);
params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample));
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE));
// Scalar query only return one row
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
job.appendBuf(this, Collections.singletonList(colStatsData));
String sql;
// Distribution columns don't fit for DUJ1 estimator, use linear estimator.
if (tbl.isDistributionColumn(col.getName())) {
params.put("min", StatisticsUtil.quote(min));
params.put("max", StatisticsUtil.quote(max));
sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE);
} else {
params.put("dataSizeFunction", getDataSizeFunction(col, true));
sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE);
}
LOG.info("Sample for column [{}]. Total rows [{}], rows to sample [{}], scale factor [{}], "
+ "limited [{}], distribute column [{}], partition column [{}], key column [{}]",
col.getName(), params.get("rowCount"), rowsToSample, params.get("scaleFactor"),
limitFlag, tbl.isDistributionColumn(col.getName()),
tbl.isPartitionColumn(col.getName()), col.isKey());
runQuery(sql, false);
}
}
protected ResultRow collectBasicStat(AutoCloseConnectContext context) {
Map<String, String> params = new HashMap<>();
params.put("dbName", db.getFullName());
params.put("colName", info.colName);
params.put("tblName", tbl.getName());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
stmtExecutor = new StmtExecutor(context.connectContext, stringSubstitutor.replace(BASIC_STATS_TEMPLATE));
return stmtExecutor.executeInternalQuery().get(0);
}
/**
* 1. Get stats of each partition
* 2. insert partition in batch
* 3. calculate column stats based on partition stats
*/
protected void doFull() throws Exception {
LOG.debug("Will do full collection for column {}", col.getName());
Set<String> partitionNames = info.colToPartitions.get(info.colName);
if (partitionNames.isEmpty()) {
job.appendBuf(this, Collections.emptyList());
@ -138,37 +177,30 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
params.put("tblId", String.valueOf(tbl.getId()));
params.put("idxId", String.valueOf(info.indexId));
params.put("colId", String.valueOf(info.colName));
params.put("dataSizeFunction", getDataSizeFunction(col));
params.put("dataSizeFunction", getDataSizeFunction(col, false));
params.put("catalogName", catalog.getName());
params.put("dbName", db.getFullName());
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(tbl.getName()));
execSQL(params);
}
@VisibleForTesting
public void execSQL(Map<String, String> params) throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS);
runQuery(collectColStats);
runQuery(collectColStats, true);
}
// Get sample tablets id and scale up scaleFactor
protected Pair<List<Long>, Long> calcActualSampleTablets() {
protected Pair<List<Long>, Long> calcActualSampleTablets(boolean forPartitionColumn) {
// Below code copied from OlapScanNode.java
long sampleRows; // The total number of sample rows
long totalRows = 0; // The total number of partition rows hit
long totalTablet = 0; // The total number of tablets in the hit partition
OlapTable olapTable = (OlapTable) tbl;
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(olapTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = Math.max(tableSample.getSampleValue(), 1);
}
sampleRows = getSampleRows();
// calculate the number of tablets by each partition
long avgRowsPerPartition = sampleRows / Math.max(olapTable.getPartitions().size(), 1);
List<Long> sampleTabletIds = new ArrayList<>();
long actualSampledRowCount = 0;
boolean enough = false;
for (Partition p : olapTable.getPartitions()) {
List<Long> ids = p.getBaseIndex().getTabletIdsInOrder();
@ -194,22 +226,55 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
long tabletId = ids.get(seekTid);
sampleTabletIds.add(tabletId);
actualSampledRowCount += baseIndex.getTablet(tabletId).getRowCount(true);
if (actualSampledRowCount >= sampleRows && !forPartitionColumn) {
enough = true;
break;
}
}
totalRows += p.getBaseIndex().getRowCount();
totalTablet += ids.size();
if (enough) {
break;
}
}
// all hit, direct full
if (totalRows < sampleRows) {
// can't fill full sample rows
sampleTabletIds.clear();
} else if (sampleTabletIds.size() == totalTablet) {
// TODO add limit
} else if (sampleTabletIds.size() == totalTablet && !enough) {
sampleTabletIds.clear();
} else if (!sampleTabletIds.isEmpty()) {
// TODO add limit
}
return Pair.of(sampleTabletIds, actualSampledRowCount);
}
/**
* For ordinary column (neither key column nor partition column), need to limit sample size to user specified value.
* @return Return true when need to limit.
*/
protected boolean needLimit() {
// Key column is sorted, use limit will cause the ndv not accurate enough, so skip key columns.
if (col.isKey()) {
return false;
}
// Partition column need to scan tablets from all partitions.
if (tbl.isPartitionColumn(col.getName())) {
return false;
}
return true;
}
/**
* Calculate rows to sample based on user given sample value.
* @return Rows to sample.
*/
protected long getSampleRows() {
long sampleRows;
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(tbl.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = Math.max(tableSample.getSampleValue(), 1);
}
return sampleRows;
}
}


@ -22,6 +22,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.system.SystemInfoService;
import java.util.ArrayList;
@ -68,7 +69,8 @@ public class StatisticConstants {
public static final String DB_NAME = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME;
public static final String FULL_QUALIFIED_STATS_TBL_NAME = FeConstants.INTERNAL_DB_NAME + "." + STATISTIC_TBL_NAME;
public static final String FULL_QUALIFIED_STATS_TBL_NAME = InternalCatalog.INTERNAL_CATALOG_NAME
+ "." + FeConstants.INTERNAL_DB_NAME + "." + STATISTIC_TBL_NAME;
public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3;


@ -158,7 +158,7 @@ public class AnalyzeTest extends TestWithFeService {
new MockUp<BaseAnalysisTask>() {
@Mock
protected void runQuery(String sql) {}
protected void runQuery(String sql, boolean needEncode) {}
};
HashMap<String, Set<String>> colToPartitions = Maps.newHashMap();
colToPartitions.put("col1", Collections.singleton("t1"));


@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class BaseAnalysisTaskTest {
@Test
public void testGetFunctions() {
OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
Column column = new Column("string_column", PrimitiveType.STRING);
String dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, true);
Assertions.assertEquals("SUM(LENGTH(`column_key`) * count)", dataSizeFunction);
dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, false);
Assertions.assertEquals("SUM(LENGTH(`${colName}`))", dataSizeFunction);
column = new Column("int_column", PrimitiveType.INT);
dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, false);
Assertions.assertEquals("COUNT(1) * 4", dataSizeFunction);
dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, true);
Assertions.assertEquals("SUM(t1.count) * 4", dataSizeFunction);
String minFunction = olapAnalysisTask.getMinFunction();
Assertions.assertEquals("to_base64(CAST(MIN(`${colName}`) as ${type})) ", minFunction);
olapAnalysisTask.tableSample = new TableSample(true, 20L);
minFunction = olapAnalysisTask.getMinFunction();
Assertions.assertEquals("NULL", minFunction);
olapAnalysisTask.tableSample = null;
String maxFunction = olapAnalysisTask.getMaxFunction();
Assertions.assertEquals("to_base64(CAST(MAX(`${colName}`) as ${type})) ", maxFunction);
olapAnalysisTask.tableSample = new TableSample(true, 20L);
maxFunction = olapAnalysisTask.getMaxFunction();
Assertions.assertEquals("NULL", maxFunction);
String ndvFunction = olapAnalysisTask.getNdvFunction(String.valueOf(100));
Assertions.assertEquals("SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) "
+ "+ SUM(IF(t1.count = 1, 1, 0)) * SUM(t1.count) / 100)", ndvFunction);
System.out.println(ndvFunction);
}
}


@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.Lists;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
public class HMSAnalysisTaskTest {
@Test
public void testNeedLimit(@Mocked HMSExternalTable tableIf)
throws Exception {
new MockUp<HMSExternalTable>() {
@Mock
public List<Column> getFullSchema() {
ArrayList<Column> objects = Lists.newArrayList();
objects.add(new Column("int_column", PrimitiveType.INT));
return objects;
}
};
HMSAnalysisTask task = new HMSAnalysisTask();
task.setTable(tableIf);
task.tableSample = new TableSample(true, 10L);
Assertions.assertFalse(task.needLimit(100, 5.0));
task.tableSample = new TableSample(false, 100L);
Assertions.assertFalse(task.needLimit(100, 5.0));
Assertions.assertTrue(task.needLimit(2L * 1024 * 1024 * 1024, 5.0));
task.tableSample = new TableSample(false, 512L * 1024 * 1024);
Assertions.assertFalse(task.needLimit(2L * 1024 * 1024 * 1024, 5.0));
}
@Test
public void testAutoSampleHugeTable(@Mocked HMSExternalTable tableIf)
throws Exception {
new MockUp<HMSExternalTable>() {
@Mock
public long getDataSize(boolean singleReplica) {
return 6L * 1024 * 1024 * 1024;
}
};
HMSAnalysisTask task = new HMSAnalysisTask();
task.tbl = tableIf;
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
analysisInfoBuilder.setJobType(AnalysisInfo.JobType.SYSTEM);
analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL);
task.info = analysisInfoBuilder.build();
TableSample tableSample = task.getTableSample();
Assertions.assertFalse(tableSample.isPercent());
Assertions.assertEquals(StatisticsUtil.getHugeTableSampleRows(), tableSample.getSampleValue());
}
@Test
public void testAutoSampleSmallTable(@Mocked HMSExternalTable tableIf)
throws Exception {
new MockUp<HMSExternalTable>() {
@Mock
public long getDataSize(boolean singleReplica) {
return 1000;
}
};
HMSAnalysisTask task = new HMSAnalysisTask();
task.tbl = tableIf;
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
analysisInfoBuilder.setJobType(AnalysisInfo.JobType.SYSTEM);
analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL);
task.info = analysisInfoBuilder.build();
TableSample tableSample = task.getTableSample();
Assertions.assertNull(tableSample);
}
@Test
public void testManualFull(@Mocked HMSExternalTable tableIf)
throws Exception {
new MockUp<HMSExternalTable>() {
@Mock
public long getDataSize(boolean singleReplica) {
return 1000;
}
};
HMSAnalysisTask task = new HMSAnalysisTask();
task.tbl = tableIf;
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL);
analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL);
task.info = analysisInfoBuilder.build();
TableSample tableSample = task.getTableSample();
Assertions.assertNull(tableSample);
}
@Test
public void testManualSample(@Mocked HMSExternalTable tableIf)
throws Exception {
new MockUp<HMSExternalTable>() {
@Mock
public long getDataSize(boolean singleReplica) {
return 1000;
}
};
HMSAnalysisTask task = new HMSAnalysisTask();
task.tbl = tableIf;
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL);
analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.SAMPLE);
analysisInfoBuilder.setSampleRows(1000);
task.info = analysisInfoBuilder.build();
TableSample tableSample = task.getTableSample();
Assertions.assertNotNull(tableSample);
Assertions.assertEquals(1000, tableSample.getSampleValue());
}
}


@ -18,20 +18,35 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class OlapAnalysisTaskTest {
// test manual
@ -98,7 +113,196 @@ public class OlapAnalysisTaskTest {
olapAnalysisTask.tbl = tbl;
TableSample tableSample = olapAnalysisTask.getTableSample();
Assertions.assertNull(tableSample);
}
@Test
public void testManualSampleNonDistributeKey(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf)
throws Exception {
new Expectations() {
{
tableIf.getRowCount();
result = 500;
tableIf.getId();
result = 30001;
catalogIf.getId();
result = 10001;
catalogIf.getName();
result = "catalogName";
databaseIf.getId();
result = 20001;
}
};
new MockUp<OlapAnalysisTask>() {
@Mock
public Pair<List<Long>, Long> calcActualSampleTablets() {
return Pair.of(Lists.newArrayList(), 100L);
}
@Mock
public ResultRow collectBasicStat(AutoCloseConnectContext context) {
List<String> values = Lists.newArrayList();
values.add("1");
values.add("2");
return new ResultRow(values);
}
@Mock
public void runQuery(String sql, boolean needEncode) {
Assertions.assertFalse(needEncode);
Assertions.assertEquals("SELECT CONCAT('30001', '-', '-1', '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, 500 AS `row_count`, SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * SUM(t1.count) / 500) as `ndv`, IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.count, 0)), 0) * 5.0 as `null_count`, 'MQ==' AS `min`, 'Mg==' AS `max`, SUM(LENGTH(`column_key`) * count) * 5.0 AS `data_size`, NOW() FROM ( SELECT t0.`${colName}` as column_key, COUNT(1) as `count` FROM (SELECT `${colName}` FROM `catalogName`.`${dbName}`.`${tblName}` limit 100) as `t0` GROUP BY `t0`.`${colName}` ) as `t1` ", sql);
return;
}
};
new MockUp<StatisticsUtil>() {
@Mock
public AutoCloseConnectContext buildConnectContext(boolean scanLimit) {
return null;
}
};
new MockUp<OlapTable>() {
@Mock
public Set<String> getDistributionColumnNames() {
return Sets.newHashSet();
}
};
OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
olapAnalysisTask.col = new Column("test", PrimitiveType.STRING);
olapAnalysisTask.tbl = tableIf;
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL);
olapAnalysisTask.info = analysisInfoBuilder.build();
olapAnalysisTask.catalog = catalogIf;
olapAnalysisTask.db = databaseIf;
olapAnalysisTask.tableSample = new TableSample(false, 100L);
olapAnalysisTask.doSample();
}
@Test
public void testManualSampleDistributeKey(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf)
throws Exception {
new Expectations() {
{
tableIf.getRowCount();
result = 500;
tableIf.getId();
result = 30001;
catalogIf.getId();
result = 10001;
catalogIf.getName();
result = "catalogName";
databaseIf.getId();
result = 20001;
}
};
new MockUp<OlapAnalysisTask>() {
@Mock
public Pair<List<Long>, Long> calcActualSampleTablets() {
return Pair.of(Lists.newArrayList(), 100L);
}
@Mock
public ResultRow collectBasicStat(AutoCloseConnectContext context) {
List<String> values = Lists.newArrayList();
values.add("1");
values.add("2");
return new ResultRow(values);
}
@Mock
public void runQuery(String sql, boolean needEncode) {
Assertions.assertFalse(needEncode);
Assertions.assertEquals(" SELECT CONCAT(30001, '-', -1, '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, ROUND(COUNT(1) * 5.0) AS `row_count`, ROUND(NDV(`${colName}`) * 5.0) as `ndv`, ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * 5.0) AS `null_count`, 'MQ==' AS `min`, 'Mg==' AS `max`, SUM(LENGTH(`${colName}`)) * 5.0 AS `data_size`, NOW() FROM `catalogName`.`${dbName}`.`${tblName}` limit 100", sql);
return;
}
};
new MockUp<StatisticsUtil>() {
@Mock
public AutoCloseConnectContext buildConnectContext(boolean scanLimit) {
return null;
}
};
new MockUp<OlapTable>() {
@Mock
public Set<String> getDistributionColumnNames() {
HashSet<String> cols = Sets.newHashSet();
cols.add("test");
return cols;
}
@Mock
public boolean isDistributionColumn(String columnName) {
return true;
}
};
OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
olapAnalysisTask.col = new Column("test", PrimitiveType.STRING);
olapAnalysisTask.tbl = tableIf;
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL);
olapAnalysisTask.info = analysisInfoBuilder.build();
olapAnalysisTask.catalog = catalogIf;
olapAnalysisTask.db = databaseIf;
olapAnalysisTask.tableSample = new TableSample(false, 100L);
olapAnalysisTask.doSample();
}
@Test
public void testNeedLimitFalse(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf)
throws Exception {
new MockUp<OlapTable>() {
@Mock
public PartitionInfo getPartitionInfo() {
ArrayList<Column> columns = Lists.newArrayList();
columns.add(new Column("test", PrimitiveType.STRING));
return new PartitionInfo(PartitionType.RANGE, columns);
}
@Mock
public boolean isPartitionColumn(String columnName) {
return true;
}
};
OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
olapAnalysisTask.col = new Column("test", Type.fromPrimitiveType(PrimitiveType.STRING),
true, null, null, null);
olapAnalysisTask.tbl = tableIf;
Assertions.assertFalse(olapAnalysisTask.needLimit());
olapAnalysisTask.col = new Column("test", Type.fromPrimitiveType(PrimitiveType.STRING),
false, null, null, null);
Assertions.assertFalse(olapAnalysisTask.needLimit());
}
@Test
public void testNeedLimitTrue(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf)
throws Exception {
new MockUp<OlapTable>() {
@Mock
public PartitionInfo getPartitionInfo() {
ArrayList<Column> columns = Lists.newArrayList();
columns.add(new Column("NOFOUND", PrimitiveType.STRING));
return new PartitionInfo(PartitionType.RANGE, columns);
}
};
OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
olapAnalysisTask.tbl = tableIf;
olapAnalysisTask.col = new Column("test", Type.fromPrimitiveType(PrimitiveType.STRING),
false, null, null, null);
Assertions.assertTrue(olapAnalysisTask.needLimit());
}
}