[feat](optimizer) Scale sample stats with ratio to make them more precise (#25079)

Since Doris supports querying specific tablets, we don't depend on TABLESAMPLE to do the sampling; instead we use the grammar TABLET(id) to do so. In OlapAnalysisTask, we calculate which tablets would be hit and record their ids, so we can get how many rows were actually queried and, further, derive the scale-up ratio from that.
This commit is contained in:
AKIRA
2023-10-09 21:01:59 +09:00
committed by GitHub
parent 400b9f2f97
commit 08e7a7b932
9 changed files with 209 additions and 32 deletions

View File

@ -129,7 +129,7 @@ public class AnalysisInfo implements Writable {
public final int samplePercent;
@SerializedName("sampleRows")
public final int sampleRows;
public final long sampleRows;
@SerializedName("maxBucketNum")
public final int maxBucketNum;
@ -186,7 +186,7 @@ public class AnalysisInfo implements Writable {
public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogName, String dbName, String tblName,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
int samplePercent, long sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull) {

View File

@ -46,7 +46,7 @@ public class AnalysisInfoBuilder {
private AnalysisType analysisType;
private int maxBucketNum;
private int samplePercent;
private int sampleRows;
private long sampleRows;
private long periodTimeInMs;
private long lastExecTimeInMs;
private long timeCostInMs;
@ -179,7 +179,7 @@ public class AnalysisInfoBuilder {
return this;
}
public AnalysisInfoBuilder setSampleRows(int sampleRows) {
public AnalysisInfoBuilder setSampleRows(long sampleRows) {
this.sampleRows = sampleRows;
return this;
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@ -82,7 +83,7 @@ public abstract class BaseAnalysisTask {
+ " MIN(CAST(min AS ${type})) AS min, "
+ " MAX(CAST(max AS ${type})) AS max, "
+ " SUM(data_size_in_bytes) AS data_size, "
+ " NOW() AS update_time\n"
+ " NOW() AS update_time \n"
+ " FROM ${internalDB}.${columnStatTbl}"
+ " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND "
+ " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
@ -105,6 +106,8 @@ public abstract class BaseAnalysisTask {
protected volatile boolean killed;
protected TableSample tableSample = null;
@VisibleForTesting
public BaseAnalysisTask() {
@ -147,6 +150,7 @@ public abstract class BaseAnalysisTask {
Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
String.format("Column with type %s is not supported", col.getType().toString()));
}
tableSample = getTableSample();
}
@ -222,23 +226,23 @@ public abstract class BaseAnalysisTask {
return "COUNT(1) * " + column.getType().getSlotSize();
}
protected String getSampleExpression() {
protected TableSample getTableSample() {
if (info.forceFull) {
return "";
return null;
}
int sampleRows = info.sampleRows;
long sampleRows = info.sampleRows;
if (info.analysisMethod == AnalysisMethod.FULL) {
if (Config.enable_auto_sample
&& tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes) {
sampleRows = Config.huge_table_default_sample_rows;
} else {
return "";
return null;
}
}
if (info.samplePercent > 0) {
return String.format("TABLESAMPLE(%d PERCENT)", info.samplePercent);
return new TableSample(true, (long) info.samplePercent);
} else {
return String.format("TABLESAMPLE(%d ROWS)", sampleRows);
return new TableSample(false, sampleRows);
}
}

View File

@ -54,12 +54,12 @@ public class ColStatsData {
public ColStatsData(ResultRow row) {
this.statsId = new StatsId(row);
this.count = Long.parseLong(row.get(7));
this.ndv = Long.parseLong(row.getWithDefault(8, "0"));
this.nullCount = Long.parseLong(row.getWithDefault(9, "0"));
this.count = (long) Double.parseDouble(row.get(7));
this.ndv = (long) Double.parseDouble(row.getWithDefault(8, "0"));
this.nullCount = (long) Double.parseDouble(row.getWithDefault(9, "0"));
this.minLit = row.get(10);
this.maxLit = row.get(11);
this.dataSizeInBytes = Long.parseLong(row.getWithDefault(12, "0"));
this.dataSizeInBytes = (long) Double.parseDouble(row.getWithDefault(12, "0"));
this.updateTime = row.get(13);
}

View File

@ -65,7 +65,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
+ "MAX(`${colName}`) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
@ -277,7 +277,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
commonParams.put("catalogName", catalog.getName());
commonParams.put("dbName", db.getFullName());
commonParams.put("tblName", tbl.getName());
commonParams.put("sampleExpr", getSampleExpression());
commonParams.put("countExpr", getCountExpression());
if (col != null) {
commonParams.put("type", col.getType().toString());

View File

@ -46,7 +46,7 @@ public class HistogramTask extends BaseAnalysisTask {
+ " HISTOGRAM(`${colName}`, ${maxBucketNum}) AS buckets, "
+ " NOW() AS create_time "
+ "FROM "
+ " `${dbName}`.`${tblName}` ${sampleExpr}";
+ " `${dbName}`.`${tblName}`";
@VisibleForTesting
public HistogramTask() {
@ -71,7 +71,6 @@ public class HistogramTask extends BaseAnalysisTask {
params.put("tblName", String.valueOf(info.tblName));
params.put("colName", String.valueOf(info.colName));
params.put("sampleRate", getSampleRateFunction());
params.put("sampleExpr", getSampleExpression());
params.put("maxBucketNum", String.valueOf(info.maxBucketNum));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);

View File

@ -118,7 +118,6 @@ public class MVAnalysisTask extends BaseAnalysisTask {
params.put("colName", colName);
params.put("tblName", String.valueOf(info.tblName));
params.put("sql", sql);
params.put("sampleExpr", getSampleExpression());
StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
}
params.remove("partId");

View File

@ -17,8 +17,12 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
@ -30,7 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.text.StringSubstitutor;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -48,9 +54,9 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
// NDV should only be computed for the relevant partition.
private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS
+ " (SELECT NDV(`${colName}`) AS ndv "
+ " FROM `${dbName}`.`${tblName}` ${sampleExpr}) t2\n";
+ " FROM `${dbName}`.`${tblName}`) t2\n";
private static final String collectPartitionStatsSQLTemplate =
private static final String COLLECT_PARTITION_STATS_SQL_TEMPLATE =
" SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
+ "${catalogId} AS catalog_id, "
@ -65,7 +71,25 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
+ "MIN(`${colName}`) AS min, "
+ "MAX(`${colName}`) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName} ${sampleExpr}";
+ "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName}";
private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT \n"
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, \n"
+ "${catalogId} AS catalog_id, \n"
+ "${dbId} AS db_id, \n"
+ "${tblId} AS tbl_id, \n"
+ "${idxId} AS idx_id, \n"
+ "'${colId}' AS col_id, \n"
+ "NULL AS part_id, \n"
+ "COUNT(1) * ${scaleFactor} AS row_count, \n"
+ "NDV(`${colName}`) * ${scaleFactor} AS ndv, \n"
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor} AS null_count, \n"
+ "MIN(`${colName}`) AS min, \n"
+ "MAX(`${colName}`) AS max, \n"
+ "${dataSizeFunction} * ${scaleFactor} AS data_size, \n"
+ "NOW()\n"
+ "FROM `${dbName}`.`${tblName}`\n"
+ "${tablets}";
// cache stats for each partition, it would be inserted into column_statistics in a batch.
private final List<List<ColStatsData>> buf = new ArrayList<>();
@ -79,6 +103,97 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
}
public void doExecute() throws Exception {
if (tableSample != null) {
doSample();
} else {
doFull();
}
}
/**
* 1. Get col stats in sample ways
* 2. estimate partition stats
* 3. insert col stats and partition stats
*/
protected void doSample() throws Exception {
Pair<List<Long>, Long> pair = calcActualSampleTablets();
List<Long> tabletIds = pair.first;
double scaleFactor = (double) tbl.getRowCount() / (double) pair.second;
// might happen if row count in fe metadata hasn't been updated yet
if (Double.isInfinite(scaleFactor)) {
scaleFactor = 1;
}
String tabletStr = tabletIds.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) {
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(tbl.getId()));
params.put("idxId", String.valueOf(info.indexId));
params.put("colId", String.valueOf(info.colName));
params.put("dataSizeFunction", getDataSizeFunction(col));
params.put("dbName", info.dbName);
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(info.tblName));
params.put("scaleFactor", String.valueOf(scaleFactor));
params.put("tablets", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE));
// Scalar query only return one row
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
OlapTable olapTable = (OlapTable) tbl;
Collection<Partition> partitions = olapTable.getPartitions();
int partitionCount = partitions.size();
List<String> values = partitions.stream().map(p -> String.format(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName(), p.getId())),
InternalCatalog.INTERNAL_CATALOG_ID,
db.getId(),
tbl.getId(),
-1,
StatisticsUtil.quote(col.getName()),
p.getId(),
colStatsData.count / partitionCount,
colStatsData.ndv / partitionCount,
colStatsData.nullCount / partitionCount,
StatisticsUtil.quote(colStatsData.minLit),
StatisticsUtil.quote(colStatsData.maxLit),
colStatsData.dataSizeInBytes / partitionCount)).collect(Collectors.toList());
values.add(String.format(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName())),
InternalCatalog.INTERNAL_CATALOG_ID,
db.getId(),
tbl.getId(),
-1,
StatisticsUtil.quote(col.getName()),
"NULL",
colStatsData.count,
colStatsData.ndv,
colStatsData.nullCount,
StatisticsUtil.quote(colStatsData.minLit),
StatisticsUtil.quote(colStatsData.maxLit),
colStatsData.dataSizeInBytes));
String insertSQL = "INSERT INTO "
+ StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
+ " VALUES "
+ String.join(",", values);
stmtExecutor = new StmtExecutor(r.connectContext, insertSQL);
executeWithExceptionOnFail(stmtExecutor);
}
}
/**
* 1. Get stats of each partition
* 2. insert partition in batch
* 3. calculate column stats based on partition stats
*/
protected void doFull() throws Exception {
Set<String> partitionNames = info.colToPartitions.get(info.colName);
if (partitionNames.isEmpty()) {
return;
@ -95,7 +210,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
params.put("dbName", info.dbName);
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(info.tblName));
params.put("sampleExpr", getSampleExpression());
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();
@ -109,7 +223,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
// Avoid error when get the default partition
params.put("partitionName", "`" + partitionName + "`");
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(collectPartitionStatsSQLTemplate));
partitionAnalysisSQLs.add(stringSubstitutor.replace(COLLECT_PARTITION_STATS_SQL_TEMPLATE));
}
} finally {
tbl.readUnlock();
@ -158,6 +272,66 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
LOG.debug("analyze task {} end. cost {}ms", info,
System.currentTimeMillis() - startTime);
}
}
// Get sample tablets id and scale up scaleFactor
protected Pair<List<Long>, Long> calcActualSampleTablets() {
// Below code copied from OlapScanNode.java
long sampleRows; // The total number of sample rows
long totalRows = 0; // The total number of partition rows hit
long totalTablet = 0; // The total number of tablets in the hit partition
OlapTable olapTable = (OlapTable) tbl;
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(olapTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = Math.max(tableSample.getSampleValue(), 1);
}
// calculate the number of tablets by each partition
long avgRowsPerPartition = sampleRows / Math.max(olapTable.getPartitions().size(), 1);
List<Long> sampleTabletIds = new ArrayList<>();
long actualSampledRowCount = 0;
for (Partition p : olapTable.getPartitions()) {
List<Long> ids = p.getBaseIndex().getTabletIdsInOrder();
if (ids.isEmpty()) {
continue;
}
// Skip partitions with row count < row count / 2 expected to be sampled per partition.
// It can be expected to sample a smaller number of partitions to avoid uneven distribution
// of sampling results.
if (p.getBaseIndex().getRowCount() < (avgRowsPerPartition / 2)) {
continue;
}
MaterializedIndex baseIndex = p.getBaseIndex();
long avgRowsPerTablet = Math.max(baseIndex.getRowCount() / ids.size(), 1);
long tabletCounts = Math.max(
avgRowsPerPartition / avgRowsPerTablet + (avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
tabletCounts = Math.min(tabletCounts, ids.size());
long seek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * ids.size());
for (int i = 0; i < tabletCounts; i++) {
int seekTid = (int) ((i + seek) % ids.size());
long tabletId = ids.get(seekTid);
sampleTabletIds.add(tabletId);
actualSampledRowCount += baseIndex.getTablet(tabletId).getRowCount(true);
}
totalRows += p.getBaseIndex().getRowCount();
totalTablet += ids.size();
}
// all hit, direct full
if (totalRows < sampleRows) {
// can't fill full sample rows
sampleTabletIds.clear();
} else if (sampleTabletIds.size() == totalTablet) {
// TODO add limit
sampleTabletIds.clear();
} else if (!sampleTabletIds.isEmpty()) {
// TODO add limit
}
return Pair.of(sampleTabletIds, actualSampledRowCount);
}
}