[Feature] Support external table sample stats collection (#24376)
Support Hive table sample stats collection. The grammar is like `analyze table with sample percent 10`.
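For illustration, with placeholder catalog/table names (the error text comes from the checks in this commit):

    -- percent-based sampling, now supported for Hive tables
    ANALYZE TABLE hive_catalog.db.tbl WITH SAMPLE PERCENT 10;

    -- row-based sampling is still rejected for external tables:
    -- "Sampling statistics collection of external tables is not supported with rows, use percent instead."
    ANALYZE TABLE hive_catalog.db.tbl WITH SAMPLE ROWS 1000;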
@@ -272,6 +272,10 @@ public class AnalyzeProperties {
         return properties.containsKey(PROPERTY_FORCE_FULL);
     }
 
+    public boolean isSampleRows() {
+        return properties.containsKey(PROPERTY_SAMPLE_ROWS);
+    }
+
     public String toSQL() {
         StringBuilder sb = new StringBuilder();
         sb.append("PROPERTIES(");
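For context, a sketch of the property plumbing this accessor assumes; `PROPERTY_SAMPLE_ROWS` appears in the hunk above, but its literal key and the percent twin are guesses:

    // Assumed constants in AnalyzeProperties (literal values are illustrative):
    public static final String PROPERTY_SAMPLE_PERCENT = "sample.percent";
    public static final String PROPERTY_SAMPLE_ROWS = "sample.rows";

    // The parser fills `properties` from the WITH SAMPLE clause, so
    // `ANALYZE TABLE t WITH SAMPLE ROWS 1000` makes isSampleRows() return true.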
@@ -166,11 +166,9 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
         analyzeProperties.check();
 
-        // TODO support external table
-        if (analyzeProperties.isSample()) {
-            if (!(table instanceof OlapTable)) {
-                throw new AnalysisException("Sampling statistics "
-                        + "collection of external tables is not supported");
-            }
-        }
+        if (analyzeProperties.isSampleRows() && !(table instanceof OlapTable)) {
+            throw new AnalysisException("Sampling statistics "
+                    + "collection of external tables is not supported with rows, use percent instead.");
+        }
         if (analyzeProperties.isSync()
                 && (analyzeProperties.isAutomatic() || analyzeProperties.getPeriodTimeInMs() != 0)) {
@@ -470,9 +470,16 @@ public class TableRef implements ParseNode, Writable {
     }
 
     protected void analyzeSample() throws AnalysisException {
-        if ((sampleTabletIds != null || tableSample != null) && desc.getTable().getType() != TableIf.TableType.OLAP) {
+        if ((sampleTabletIds != null || tableSample != null)
+                && desc.getTable().getType() != TableIf.TableType.OLAP
+                && desc.getTable().getType() != TableIf.TableType.HMS_EXTERNAL_TABLE) {
             throw new AnalysisException("Sample table " + desc.getTable().getName()
-                    + " type " + desc.getTable().getType() + " is not OLAP");
+                    + " type " + desc.getTable().getType() + " is not supported");
         }
+        if (tableSample != null && TableIf.TableType.HMS_EXTERNAL_TABLE.equals(desc.getTable().getType())) {
+            if (!tableSample.isPercent()) {
+                throw new AnalysisException("HMS table doesn't support sample rows, use percent instead.");
+            }
+        }
     }
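The same split applies to query-time sampling; an illustration with a placeholder HMS table, assuming Doris' TABLESAMPLE syntax:

    -- accepted: percent sampling on an HMS external table
    SELECT * FROM hms_catalog.db.tbl TABLESAMPLE (10 PERCENT);

    -- rejected by analyzeSample(): "HMS table doesn't support sample rows, use percent instead."
    SELECT * FROM hms_catalog.db.tbl TABLESAMPLE (1000 ROWS);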
@@ -1071,6 +1071,9 @@ public class HiveMetaStoreCache {
         long length;
         long blockSize;
         long modificationTime;
+        boolean splittable;
+        List<String> partitionValues;
+        AcidInfo acidInfo;
     }
 
     @Data
@@ -2024,6 +2024,7 @@ public class SingleNodePlanner {
                 break;
             case HIVE:
                 scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                ((HiveScanNode) scanNode).setTableSample(tblRef.getTableSample());
                 break;
             default:
                 throw new UserException("Not supported table type: " + ((HMSExternalTable) table).getDlaType());
@@ -20,6 +20,7 @@ package org.apache.doris.planner.external;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
+import org.apache.doris.analysis.TableSample;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
@@ -92,6 +93,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected Map<String, SlotDescriptor> destSlotDescByName;
     protected TFileScanRangeParams params;
 
+    protected TableSample tableSample;
+
     /**
      * External file scan node for Query hms table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -200,6 +203,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
         setColumnPositionMapping();
     }
 
+    public void setTableSample(TableSample tSample) {
+        this.tableSample = tSample;
+    }
+
     @Override
     public void finalize(Analyzer analyzer) throws UserException {
         doFinalize();
@@ -63,6 +63,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -218,6 +219,11 @@ public class HiveScanNode extends FileQueryScanNode {
         if (ConnectContext.get().getExecutor() != null) {
             ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
         }
+        if (tableSample != null) {
+            List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches);
+            splitAllFiles(allFiles, hiveFileStatuses);
+            return;
+        }
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             // This if branch is to support old splitter, will remove later.
             if (fileCacheValue.getSplits() != null) {
@@ -235,6 +241,42 @@ public class HiveScanNode extends FileQueryScanNode {
         }
     }
 
+    private void splitAllFiles(List<Split> allFiles,
+                               List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses) throws IOException {
+        for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
+            allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
+                    status.getBlockLocations(), status.getLength(), status.getModificationTime(),
+                    status.isSplittable(), status.getPartitionValues(),
+                    new HiveSplitCreator(status.getAcidInfo())));
+        }
+    }
+
+    private List<HiveMetaStoreCache.HiveFileStatus> selectFiles(List<FileCacheValue> inputCacheValue) {
+        List<HiveMetaStoreCache.HiveFileStatus> fileList = Lists.newArrayList();
+        long totalSize = 0;
+        for (FileCacheValue value : inputCacheValue) {
+            for (HiveMetaStoreCache.HiveFileStatus file : value.getFiles()) {
+                file.setSplittable(value.isSplittable());
+                file.setPartitionValues(value.getPartitionValues());
+                file.setAcidInfo(value.getAcidInfo());
+                fileList.add(file);
+                totalSize += file.getLength();
+            }
+        }
+        long sampleSize = totalSize * tableSample.getSampleValue() / 100;
+        long selectedSize = 0;
+        Collections.shuffle(fileList);
+        int index = 0;
+        for (HiveMetaStoreCache.HiveFileStatus file : fileList) {
+            selectedSize += file.getLength();
+            index += 1;
+            if (selectedSize >= sampleSize) {
+                break;
+            }
+        }
+        return fileList.subList(0, index);
+    }
+
     private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache, List<HivePartition> partitions) {
         for (HivePartition partition : partitions) {
             if (partition.getPartitionValues() == null || partition.getPartitionValues().isEmpty()) {
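The selection strategy above is file-granular: shuffle the candidate files, then take a prefix whose cumulative size reaches the requested percentage of total bytes. A self-contained sketch of the same technique over dummy (path, length) pairs, for illustration only (names are made up, not part of the commit):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SampleFilesSketch {
        record FileStat(String path, long length) {}

        // Pick a random prefix of files whose total size covers ~percent of all bytes.
        static List<FileStat> selectFiles(List<FileStat> files, int percent) {
            long totalSize = files.stream().mapToLong(FileStat::length).sum();
            long sampleSize = totalSize * percent / 100;
            List<FileStat> shuffled = new ArrayList<>(files);
            Collections.shuffle(shuffled);
            long selectedSize = 0;
            int index = 0;
            for (FileStat file : shuffled) {
                selectedSize += file.length();
                index += 1;
                if (selectedSize >= sampleSize) {
                    break;
                }
            }
            return shuffled.subList(0, index);
        }

        public static void main(String[] args) {
            List<FileStat> files = List.of(
                    new FileStat("part-0", 128L << 20),
                    new FileStat("part-1", 64L << 20),
                    new FileStat("part-2", 256L << 20),
                    new FileStat("part-3", 32L << 20));
            // With percent=10, selection stops after one file: any single file here
            // already exceeds 10% of the ~480 MB total.
            selectFiles(files, 10).forEach(f -> System.out.println(f.path()));
        }
    }

Because sampling happens before splitting, a table made of a few large files can overshoot the requested percentage considerably; the comment in main illustrates that granularity.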
@@ -17,10 +17,10 @@
 
 package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
@@ -31,9 +31,6 @@ import org.apache.commons.text.StringSubstitutor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -61,14 +58,14 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
             + "${idxId} AS idx_id, "
             + "'${colId}' AS col_id, "
             + "NULL AS part_id, "
-            + "COUNT(1) AS row_count, "
+            + "${countExpr} AS row_count, "
             + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+            + "${nullCountExpr} AS null_count, "
             + "MIN(`${colName}`) AS min, "
             + "MAX(`${colName}`) AS max, "
             + "${dataSizeFunction} AS data_size, "
             + "NOW() "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
     private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
             + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
@@ -86,8 +83,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
             + "${dataSizeFunction} AS data_size, "
             + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";
 
-    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ${countExpr} as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";
 
     // cache stats for each partition, it would be inserted into column_statistics in a batch.
     private final List<List<ColStatsData>> buf = new ArrayList<>();
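With a 10-percent sample, the substituted count query would look roughly as below. The exact ${sampleExpr} text comes from getSampleExpression(), which this diff does not show, so the TABLESAMPLE clause is an assumption:

    SELECT ROUND(COUNT(1) * 100 / 10) as rowCount
    FROM `hive_catalog`.`db`.`tbl` TABLESAMPLE (10 PERCENT);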
@@ -163,6 +160,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         params.put("colName", col.getName());
         params.put("colId", info.colName);
         params.put("dataSizeFunction", getDataSizeFunction(col));
+        params.put("nullCountExpr", getNullCountExpression());
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
         String sql = stringSubstitutor.replace(sb.toString());
         executeInsertSql(sql);
@@ -279,6 +277,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         commonParams.put("catalogName", catalog.getName());
         commonParams.put("dbName", db.getFullName());
         commonParams.put("tblName", tbl.getName());
+        commonParams.put("sampleExpr", getSampleExpression());
+        commonParams.put("countExpr", getCountExpression());
         if (col != null) {
             commonParams.put("type", col.getType().toString());
         }
@@ -286,20 +286,30 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         return commonParams;
     }
 
-    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
-        String numRows = "";
-        String timestamp = "";
-        if (parameters.containsKey(NUM_ROWS)) {
-            numRows = parameters.get(NUM_ROWS);
-        }
-        if (parameters.containsKey(TIMESTAMP)) {
-            timestamp = parameters.get(TIMESTAMP);
-        }
-        params.put("numRows", numRows);
-        params.put("rowCount", numRows);
-        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
-                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
-                        ZoneId.systemDefault())));
-    }
+    protected String getCountExpression() {
+        if (info.samplePercent > 0) {
+            return String.format("ROUND(COUNT(1) * 100 / %d)", info.samplePercent);
+        } else {
+            return "COUNT(1)";
+        }
+    }
+
+    protected String getNullCountExpression() {
+        if (info.samplePercent > 0) {
+            return String.format("ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * 100 / %d)",
+                    info.samplePercent);
+        } else {
+            return "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END)";
+        }
+    }
+
+    protected String getDataSizeFunction(Column column) {
+        String originFunction = super.getDataSizeFunction(column);
+        if (info.samplePercent > 0 && !isPartitionOnly) {
+            return String.format("ROUND((%s) * 100 / %d)", originFunction, info.samplePercent);
+        } else {
+            return originFunction;
+        }
+    }
 
     @Override
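All three expressions apply one extrapolation rule: a value measured on a p-percent sample is scaled by 100/p and rounded. A worked example with info.samplePercent = 10 and made-up measurements:

    ROUND(COUNT(1) * 100 / 10):          12,345 sampled rows  -> ~123,450 estimated rows
    ROUND(SUM(null flags) * 100 / 10):      250 sampled nulls -> ~2,500 estimated nulls
    ROUND((data size expr) * 100 / 10):   80 MB sampled       -> ~800 MB estimated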