[Improvement](statistics) Improve sample count accuracy (#25175)
While doing a sample analyze, the results for row count, null count, and data size need to be multiplied by a coefficient based on the sample percent/rows. This PR mainly calculates that coefficient as the sampled file size over the total size.
This commit is contained in:
@ -21,7 +21,6 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.View;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
@ -167,11 +166,6 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
|
||||
}
|
||||
analyzeProperties.check();
|
||||
|
||||
// TODO support external table
|
||||
if (analyzeProperties.isSampleRows() && !(table instanceof OlapTable)) {
|
||||
throw new AnalysisException("Sampling statistics "
|
||||
+ "collection of external tables is not supported with rows, use percent instead.");
|
||||
}
|
||||
if (analyzeProperties.isSync()
|
||||
&& (analyzeProperties.isAutomatic() || analyzeProperties.getPeriodTimeInMs() != 0)) {
|
||||
throw new AnalysisException("Automatic/Period statistics collection "
|
||||
|
||||
@ -476,11 +476,6 @@ public class TableRef implements ParseNode, Writable {
|
||||
throw new AnalysisException("Sample table " + desc.getTable().getName()
|
||||
+ " type " + desc.getTable().getType() + " is not supported");
|
||||
}
|
||||
if (tableSample != null && TableIf.TableType.HMS_EXTERNAL_TABLE.equals(desc.getTable().getType())) {
|
||||
if (!tableSample.isPercent()) {
|
||||
throw new AnalysisException("HMS table doesn't support sample rows, use percent instead.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -583,4 +583,9 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
|
||||
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Long> getChunkSizes() {
|
||||
throw new NotImplementedException("getChunkSized not implemented");
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,6 +142,10 @@ public interface TableIf {
|
||||
|
||||
Map<String, Set<String>> findReAnalyzeNeededPartitions();
|
||||
|
||||
// Get all the chunk sizes of this table. Now, only HMS external table implemented this interface.
|
||||
// For HMS external table, the return result is a list of all the files' size.
|
||||
List<Long> getChunkSizes();
|
||||
|
||||
void write(DataOutput out) throws IOException;
|
||||
|
||||
/**
|
||||
|
||||
@ -399,4 +399,9 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
|
||||
partitions.add("Dummy Partition");
|
||||
return getBaseSchema().stream().collect(Collectors.toMap(Column::getName, k -> partitions));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Long> getChunkSizes() {
|
||||
throw new NotImplementedException("getChunkSized not implemented");
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
@ -644,6 +645,36 @@ public class HMSExternalTable extends ExternalTable {
|
||||
super.gsonPostProcess();
|
||||
estimatedRowCount = -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Long> getChunkSizes() {
|
||||
HiveMetaStoreCache.HivePartitionValues partitionValues = StatisticsUtil.getPartitionValuesForTable(this);
|
||||
List<HiveMetaStoreCache.FileCacheValue> filesByPartitions
|
||||
= StatisticsUtil.getFilesForPartitions(this, partitionValues, 0);
|
||||
List<Long> result = Lists.newArrayList();
|
||||
for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) {
|
||||
for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) {
|
||||
result.add(file.getLength());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDataSize(boolean singleReplica) {
|
||||
long totalSize = StatisticsUtil.getTotalSizeFromHMS(this);
|
||||
// Usually, we can get total size from HMS parameter.
|
||||
if (totalSize > 0) {
|
||||
return totalSize;
|
||||
}
|
||||
// If not found the size in HMS, calculate it by sum all files' size in table.
|
||||
List<Long> chunkSizes = getChunkSizes();
|
||||
long total = 0;
|
||||
for (long size : chunkSizes) {
|
||||
total += size;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -480,7 +480,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
break;
|
||||
case HIVE:
|
||||
scanNode = new HiveScanNode(fileScan.translatePlanNodeId(), tupleDescriptor, false);
|
||||
((HiveScanNode) scanNode).setSelectedPartitions(fileScan.getSelectedPartitions());
|
||||
HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
|
||||
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
|
||||
if (fileScan.getTableSample().isPresent()) {
|
||||
hiveScanNode.setTableSample(new TableSample(fileScan.getTableSample().get().isPercent,
|
||||
fileScan.getTableSample().get().sampleValue, fileScan.getTableSample().get().seek));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType());
|
||||
|
||||
@ -230,11 +230,13 @@ public class BindRelation extends OneAnalysisRuleFactory {
|
||||
Plan hiveViewPlan = parseAndAnalyzeHiveView(hiveCatalog, ddlSql, cascadesContext);
|
||||
return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
|
||||
}
|
||||
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier);
|
||||
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier,
|
||||
unboundRelation.getTableSample());
|
||||
case ICEBERG_EXTERNAL_TABLE:
|
||||
case PAIMON_EXTERNAL_TABLE:
|
||||
case MAX_COMPUTE_EXTERNAL_TABLE:
|
||||
return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table, tableQualifier);
|
||||
return new LogicalFileScan(unboundRelation.getRelationId(), (ExternalTable) table, tableQualifier,
|
||||
unboundRelation.getTableSample());
|
||||
case SCHEMA:
|
||||
return new LogicalSchemaScan(unboundRelation.getRelationId(), table, tableQualifier);
|
||||
case JDBC_EXTERNAL_TABLE:
|
||||
|
||||
@ -39,7 +39,8 @@ public class LogicalFileScanToPhysicalFileScan extends OneImplementationRuleFact
|
||||
Optional.empty(),
|
||||
fileScan.getLogicalProperties(),
|
||||
fileScan.getConjuncts(),
|
||||
fileScan.getSelectedPartitions())
|
||||
fileScan.getSelectedPartitions(),
|
||||
fileScan.getTableSample())
|
||||
).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE);
|
||||
}
|
||||
}
|
||||
|
||||
@ -212,7 +212,7 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext
|
||||
return context.getRelationReplaceMap().get(fileScan.getRelationId());
|
||||
}
|
||||
LogicalFileScan newFileScan = new LogicalFileScan(StatementScopeIdGenerator.newRelationId(),
|
||||
fileScan.getTable(), fileScan.getQualifier());
|
||||
fileScan.getTable(), fileScan.getQualifier(), fileScan.getTableSample());
|
||||
updateLeadingRelationIdMap(newFileScan.getRelationId(), fileScan.getTable().getName(), newFileScan);
|
||||
updateReplaceMapWithOutput(fileScan, newFileScan, context.exprIdReplaceMap);
|
||||
context.putRelation(fileScan.getRelationId(), newFileScan);
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.external.ExternalTable;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.trees.TableSample;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
@ -49,22 +50,26 @@ public class LogicalFileScan extends LogicalCatalogRelation {
|
||||
private final Set<Expression> conjuncts;
|
||||
@Getter
|
||||
private final SelectedPartitions selectedPartitions;
|
||||
@Getter
|
||||
private final Optional<TableSample> tableSample;
|
||||
|
||||
/**
|
||||
* Constructor for LogicalFileScan.
|
||||
*/
|
||||
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
|
||||
Set<Expression> conjuncts, SelectedPartitions selectedPartitions) {
|
||||
Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample) {
|
||||
super(id, PlanType.LOGICAL_FILE_SCAN, table, qualifier,
|
||||
groupExpression, logicalProperties);
|
||||
this.conjuncts = conjuncts;
|
||||
this.selectedPartitions = selectedPartitions;
|
||||
this.tableSample = tableSample;
|
||||
}
|
||||
|
||||
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier) {
|
||||
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
Optional<TableSample> tableSample) {
|
||||
this(id, table, qualifier, Optional.empty(), Optional.empty(),
|
||||
Sets.newHashSet(), SelectedPartitions.NOT_PRUNED);
|
||||
Sets.newHashSet(), SelectedPartitions.NOT_PRUNED, tableSample);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -85,24 +90,24 @@ public class LogicalFileScan extends LogicalCatalogRelation {
|
||||
@Override
|
||||
public LogicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, groupExpression,
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions);
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
|
||||
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier,
|
||||
groupExpression, logicalProperties, conjuncts, selectedPartitions);
|
||||
groupExpression, logicalProperties, conjuncts, selectedPartitions, tableSample);
|
||||
}
|
||||
|
||||
public LogicalFileScan withConjuncts(Set<Expression> conjuncts) {
|
||||
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, groupExpression,
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions);
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
|
||||
}
|
||||
|
||||
public LogicalFileScan withSelectedPartitions(SelectedPartitions selectedPartitions) {
|
||||
return new LogicalFileScan(relationId, (ExternalTable) table, qualifier, groupExpression,
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions);
|
||||
Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.DistributionSpec;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.TableSample;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
@ -47,6 +48,8 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
private final Set<Expression> conjuncts;
|
||||
@Getter
|
||||
private final SelectedPartitions selectedPartitions;
|
||||
@Getter
|
||||
private final Optional<TableSample> tableSample;
|
||||
|
||||
/**
|
||||
* Constructor for PhysicalFileScan.
|
||||
@ -54,11 +57,12 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, Set<Expression> conjuncts,
|
||||
SelectedPartitions selectedPartitions) {
|
||||
SelectedPartitions selectedPartitions, Optional<TableSample> tableSample) {
|
||||
super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties);
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.conjuncts = conjuncts;
|
||||
this.selectedPartitions = selectedPartitions;
|
||||
this.tableSample = tableSample;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -67,12 +71,14 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
|
||||
DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, PhysicalProperties physicalProperties,
|
||||
Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions) {
|
||||
Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions,
|
||||
Optional<TableSample> tableSample) {
|
||||
super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties,
|
||||
physicalProperties, statistics);
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.conjuncts = conjuncts;
|
||||
this.selectedPartitions = selectedPartitions;
|
||||
this.tableSample = tableSample;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -95,14 +101,14 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
@Override
|
||||
public PhysicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec,
|
||||
groupExpression, getLogicalProperties(), conjuncts, selectedPartitions);
|
||||
groupExpression, getLogicalProperties(), conjuncts, selectedPartitions, tableSample);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
|
||||
return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec,
|
||||
groupExpression, logicalProperties.get(), conjuncts, selectedPartitions);
|
||||
groupExpression, logicalProperties.get(), conjuncts, selectedPartitions, tableSample);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -115,6 +121,6 @@ public class PhysicalFileScan extends PhysicalCatalogRelation {
|
||||
Statistics statistics) {
|
||||
return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec,
|
||||
groupExpression, getLogicalProperties(), physicalProperties, statistics, conjuncts,
|
||||
selectedPartitions);
|
||||
selectedPartitions, tableSample);
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,6 +74,7 @@ import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.Getter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -95,6 +96,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
protected Map<String, SlotDescriptor> destSlotDescByName;
|
||||
protected TFileScanRangeParams params;
|
||||
|
||||
@Getter
|
||||
protected TableSample tableSample;
|
||||
|
||||
/**
|
||||
|
||||
@ -66,6 +66,7 @@ import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class HiveScanNode extends FileQueryScanNode {
|
||||
@ -263,9 +264,18 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
totalSize += file.getLength();
|
||||
}
|
||||
}
|
||||
long sampleSize = totalSize * tableSample.getSampleValue() / 100;
|
||||
long sampleSize = 0;
|
||||
if (tableSample.isPercent()) {
|
||||
sampleSize = totalSize * tableSample.getSampleValue() / 100;
|
||||
} else {
|
||||
long estimatedRowSize = 0;
|
||||
for (Column column : hmsTable.getFullSchema()) {
|
||||
estimatedRowSize += column.getDataType().getSlotSize();
|
||||
}
|
||||
sampleSize = estimatedRowSize * tableSample.getSampleValue();
|
||||
}
|
||||
long selectedSize = 0;
|
||||
Collections.shuffle(fileList);
|
||||
Collections.shuffle(fileList, new Random(tableSample.getSeek()));
|
||||
int index = 0;
|
||||
for (HiveMetaStoreCache.HiveFileStatus file : fileList) {
|
||||
selectedSize += file.getLength();
|
||||
|
||||
@ -58,9 +58,7 @@ public class AnalysisInfoBuilder {
|
||||
private boolean samplingPartition;
|
||||
private boolean isAllPartition;
|
||||
private long partitionCount;
|
||||
|
||||
private CronExpression cronExpression;
|
||||
|
||||
private boolean forceFull;
|
||||
|
||||
public AnalysisInfoBuilder() {
|
||||
|
||||
@ -137,6 +137,7 @@ public abstract class BaseAnalysisTask {
|
||||
info, AnalysisState.FAILED,
|
||||
String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis());
|
||||
}
|
||||
tableSample = getTableSample();
|
||||
// External Table level task doesn't contain a column. Don't need to do the column related analyze.
|
||||
if (info.externalTableLevelTask) {
|
||||
return;
|
||||
@ -150,8 +151,6 @@ public abstract class BaseAnalysisTask {
|
||||
Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
|
||||
String.format("Column with type %s is not supported", col.getType().toString()));
|
||||
}
|
||||
tableSample = getTableSample();
|
||||
|
||||
}
|
||||
|
||||
public void execute() {
|
||||
@ -230,19 +229,18 @@ public abstract class BaseAnalysisTask {
|
||||
if (info.forceFull) {
|
||||
return null;
|
||||
}
|
||||
long sampleRows = info.sampleRows;
|
||||
if (info.analysisMethod == AnalysisMethod.FULL) {
|
||||
if (Config.enable_auto_sample
|
||||
&& tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes) {
|
||||
sampleRows = Config.huge_table_default_sample_rows;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
// If user specified sample percent or sample rows, use it.
|
||||
if (info.samplePercent > 0) {
|
||||
return new TableSample(true, (long) info.samplePercent);
|
||||
} else if (info.sampleRows > 0) {
|
||||
return new TableSample(false, info.sampleRows);
|
||||
} else if (info.analysisMethod == AnalysisMethod.FULL
|
||||
&& Config.enable_auto_sample
|
||||
&& tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes) {
|
||||
// If user doesn't specify sample percent/rows, use auto sample and update sample rows in analysis info.
|
||||
return new TableSample(false, (long) Config.huge_table_default_sample_rows);
|
||||
} else {
|
||||
return new TableSample(false, sampleRows);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -32,10 +32,12 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.stream.Collectors;
|
||||
@ -43,10 +45,9 @@ import java.util.stream.Collectors;
|
||||
public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);
|
||||
|
||||
public static final String TOTAL_SIZE = "totalSize";
|
||||
public static final String NUM_ROWS = "numRows";
|
||||
public static final String NUM_FILES = "numFiles";
|
||||
public static final String TIMESTAMP = "transient_lastDdlTime";
|
||||
// While doing sample analysis, the sampled ndv result will multiply a factor (total size/sample size)
|
||||
// if ndv(col)/count(col) is greater than this threshold.
|
||||
private static final String NDV_MULTIPLY_THRESHOLD = "0.3";
|
||||
|
||||
private static final String ANALYZE_TABLE_TEMPLATE = "INSERT INTO "
|
||||
+ "${internalDB}.${columnStatTbl}"
|
||||
@ -58,14 +59,17 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
+ "${idxId} AS idx_id, "
|
||||
+ "'${colId}' AS col_id, "
|
||||
+ "NULL AS part_id, "
|
||||
+ "${countExpr} AS row_count, "
|
||||
+ "NDV(`${colName}`) AS ndv, "
|
||||
+ "${nullCountExpr} AS null_count, "
|
||||
+ "ROUND(COUNT(1) * ${scaleFactor}) AS row_count, "
|
||||
+ "case when NDV(`${colName}`)/count('${colName}') < "
|
||||
+ NDV_MULTIPLY_THRESHOLD
|
||||
+ " then NDV(`${colName}`) "
|
||||
+ "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, "
|
||||
+ "ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor}) AS null_count, "
|
||||
+ "MIN(`${colName}`) AS min, "
|
||||
+ "MAX(`${colName}`) AS max, "
|
||||
+ "${dataSizeFunction} AS data_size, "
|
||||
+ "${dataSizeFunction} * ${scaleFactor} AS data_size, "
|
||||
+ "NOW() "
|
||||
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
|
||||
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
|
||||
|
||||
private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
|
||||
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
|
||||
@ -83,7 +87,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
+ "${dataSizeFunction} AS data_size, "
|
||||
+ "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";
|
||||
|
||||
private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ${countExpr} as rowCount "
|
||||
private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
|
||||
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
|
||||
|
||||
// cache stats for each partition, it would be inserted into column_statistics in a batch.
|
||||
@ -160,7 +164,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
params.put("colName", col.getName());
|
||||
params.put("colId", info.colName);
|
||||
params.put("dataSizeFunction", getDataSizeFunction(col));
|
||||
params.put("nullCountExpr", getNullCountExpression());
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(sb.toString());
|
||||
executeInsertSql(sql);
|
||||
@ -277,7 +280,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
commonParams.put("catalogName", catalog.getName());
|
||||
commonParams.put("dbName", db.getFullName());
|
||||
commonParams.put("tblName", tbl.getName());
|
||||
commonParams.put("countExpr", getCountExpression());
|
||||
commonParams.put("sampleExpr", getSampleExpression());
|
||||
commonParams.put("scaleFactor", getSampleScaleFactor());
|
||||
if (col != null) {
|
||||
commonParams.put("type", col.getType().toString());
|
||||
}
|
||||
@ -285,30 +289,51 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
return commonParams;
|
||||
}
|
||||
|
||||
protected String getCountExpression() {
|
||||
if (info.samplePercent > 0) {
|
||||
return String.format("ROUND(COUNT(1) * 100 / %d)", info.samplePercent);
|
||||
protected String getSampleExpression() {
|
||||
if (tableSample == null) {
|
||||
return "";
|
||||
}
|
||||
if (tableSample.isPercent()) {
|
||||
return String.format("TABLESAMPLE(%d PERCENT)", tableSample.getSampleValue());
|
||||
} else {
|
||||
return "COUNT(1)";
|
||||
return String.format("TABLESAMPLE(%d ROWS)", tableSample.getSampleValue());
|
||||
}
|
||||
}
|
||||
|
||||
protected String getNullCountExpression() {
|
||||
if (info.samplePercent > 0) {
|
||||
return String.format("ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * 100 / %d)",
|
||||
info.samplePercent);
|
||||
} else {
|
||||
return "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END)";
|
||||
// Get the sample scale factor. While analyzing, the result of count, null count and data size need to
|
||||
// multiply this factor to get more accurate result.
|
||||
protected String getSampleScaleFactor() {
|
||||
if (tableSample == null) {
|
||||
return "1";
|
||||
}
|
||||
}
|
||||
|
||||
protected String getDataSizeFunction(Column column) {
|
||||
String originFunction = super.getDataSizeFunction(column);
|
||||
if (info.samplePercent > 0 && !isPartitionOnly) {
|
||||
return String.format("ROUND((%s) * 100 / %d)", originFunction, info.samplePercent);
|
||||
} else {
|
||||
return originFunction;
|
||||
long target = 0;
|
||||
// Get list of all files' size in this HMS table.
|
||||
List<Long> chunkSizes = table.getChunkSizes();
|
||||
Collections.shuffle(chunkSizes, new Random(tableSample.getSeek()));
|
||||
long total = 0;
|
||||
// Calculate the total size of this HMS table.
|
||||
for (long size : chunkSizes) {
|
||||
total += size;
|
||||
}
|
||||
// Calculate the sample target size for percent and rows sample.
|
||||
if (tableSample.isPercent()) {
|
||||
target = total * tableSample.getSampleValue() / 100;
|
||||
} else {
|
||||
int columnSize = 0;
|
||||
for (Column column : table.getFullSchema()) {
|
||||
columnSize += column.getDataType().getSlotSize();
|
||||
}
|
||||
target = columnSize * tableSample.getSampleValue();
|
||||
}
|
||||
// Calculate the actual sample size (cumulate).
|
||||
long cumulate = 0;
|
||||
for (long size : chunkSizes) {
|
||||
cumulate += size;
|
||||
if (cumulate >= target) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return Double.toString(Math.max(((double) total) / cumulate, 1));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -539,6 +539,19 @@ public class StatisticsUtil {
|
||||
return totalSize / estimatedRowSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total size parameter from HMS.
|
||||
* @param table Hive HMSExternalTable to get HMS total size parameter.
|
||||
* @return Long value of table total size, return 0 if not found.
|
||||
*/
|
||||
public static long getTotalSizeFromHMS(HMSExternalTable table) {
|
||||
Map<String, String> parameters = table.getRemoteTable().getParameters();
|
||||
if (parameters == null) {
|
||||
return 0;
|
||||
}
|
||||
return parameters.containsKey(TOTAL_SIZE) ? Long.parseLong(parameters.get(TOTAL_SIZE)) : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Estimate iceberg table row count.
|
||||
* Get the row count by adding all task file recordCount.
|
||||
@ -574,50 +587,13 @@ public class StatisticsUtil {
|
||||
if (table.isView()) {
|
||||
return 0;
|
||||
}
|
||||
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
|
||||
List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
|
||||
HiveMetaStoreCache.HivePartitionValues partitionValues = null;
|
||||
List<HivePartition> hivePartitions = Lists.newArrayList();
|
||||
int samplePartitionSize = Config.hive_stats_partition_sample_size;
|
||||
int totalPartitionSize = 1;
|
||||
// Get table partitions from cache.
|
||||
if (!partitionColumnTypes.isEmpty()) {
|
||||
// It is ok to get partition values from cache,
|
||||
// no need to worry that this call will invalid or refresh the cache.
|
||||
// because it has enough space to keep partition info of all tables in cache.
|
||||
partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
|
||||
}
|
||||
if (partitionValues != null) {
|
||||
Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
|
||||
totalPartitionSize = idToPartitionItem.size();
|
||||
Collection<PartitionItem> partitionItems;
|
||||
List<List<String>> partitionValuesList;
|
||||
// If partition number is too large, randomly choose part of them to estimate the whole table.
|
||||
if (samplePartitionSize < totalPartitionSize) {
|
||||
List<PartitionItem> items = new ArrayList<>(idToPartitionItem.values());
|
||||
Collections.shuffle(items);
|
||||
partitionItems = items.subList(0, samplePartitionSize);
|
||||
partitionValuesList = Lists.newArrayListWithCapacity(samplePartitionSize);
|
||||
} else {
|
||||
partitionItems = idToPartitionItem.values();
|
||||
partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize);
|
||||
}
|
||||
for (PartitionItem item : partitionItems) {
|
||||
partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
|
||||
}
|
||||
// get partitions without cache, so that it will not invalid the cache when executing
|
||||
// non query request such as `show table status`
|
||||
hivePartitions = cache.getAllPartitionsWithoutCache(table.getDbName(), table.getName(),
|
||||
partitionValuesList);
|
||||
} else {
|
||||
hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
|
||||
table.getRemoteTable().getSd().getInputFormat(),
|
||||
table.getRemoteTable().getSd().getLocation(), null));
|
||||
}
|
||||
HiveMetaStoreCache.HivePartitionValues partitionValues = getPartitionValuesForTable(table);
|
||||
int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size();
|
||||
|
||||
// Get files for all partitions.
|
||||
List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitionsWithoutCache(
|
||||
hivePartitions, true);
|
||||
int samplePartitionSize = Config.hive_stats_partition_sample_size;
|
||||
List<HiveMetaStoreCache.FileCacheValue> filesByPartitions
|
||||
= getFilesForPartitions(table, partitionValues, samplePartitionSize);
|
||||
long totalSize = 0;
|
||||
// Calculate the total file size.
|
||||
for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) {
|
||||
@ -639,6 +615,63 @@ public class StatisticsUtil {
|
||||
return totalSize / estimatedRowSize;
|
||||
}
|
||||
|
||||
public static HiveMetaStoreCache.HivePartitionValues getPartitionValuesForTable(HMSExternalTable table) {
|
||||
if (table.isView()) {
|
||||
return null;
|
||||
}
|
||||
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
|
||||
List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
|
||||
HiveMetaStoreCache.HivePartitionValues partitionValues = null;
|
||||
// Get table partitions from cache.
|
||||
if (!partitionColumnTypes.isEmpty()) {
|
||||
// It is ok to get partition values from cache,
|
||||
// no need to worry that this call will invalid or refresh the cache.
|
||||
// because it has enough space to keep partition info of all tables in cache.
|
||||
partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
|
||||
}
|
||||
return partitionValues;
|
||||
}
|
||||
|
||||
public static List<HiveMetaStoreCache.FileCacheValue> getFilesForPartitions(
|
||||
HMSExternalTable table, HiveMetaStoreCache.HivePartitionValues partitionValues, int sampleSize) {
|
||||
if (table.isView()) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
|
||||
List<HivePartition> hivePartitions = Lists.newArrayList();
|
||||
if (partitionValues != null) {
|
||||
Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
|
||||
int totalPartitionSize = idToPartitionItem.size();
|
||||
Collection<PartitionItem> partitionItems;
|
||||
List<List<String>> partitionValuesList;
|
||||
// If partition number is too large, randomly choose part of them to estimate the whole table.
|
||||
if (sampleSize > 0 && sampleSize < totalPartitionSize) {
|
||||
List<PartitionItem> items = new ArrayList<>(idToPartitionItem.values());
|
||||
Collections.shuffle(items);
|
||||
partitionItems = items.subList(0, sampleSize);
|
||||
partitionValuesList = Lists.newArrayListWithCapacity(sampleSize);
|
||||
} else {
|
||||
partitionItems = idToPartitionItem.values();
|
||||
partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize);
|
||||
}
|
||||
for (PartitionItem item : partitionItems) {
|
||||
partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
|
||||
}
|
||||
// get partitions without cache, so that it will not invalid the cache when executing
|
||||
// non query request such as `show table status`
|
||||
hivePartitions = cache.getAllPartitionsWithoutCache(table.getDbName(), table.getName(),
|
||||
partitionValuesList);
|
||||
} else {
|
||||
hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
|
||||
table.getRemoteTable().getSd().getInputFormat(),
|
||||
table.getRemoteTable().getSd().getLocation(), null));
|
||||
}
|
||||
// Get files for all partitions.
|
||||
return cache.getFilesByPartitionsWithoutCache(hivePartitions, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Iceberg column statistics.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user