[Feature](statistics) Support external table analyze partition (#24154)
Enable collecting partition-level statistics for Hive external tables.
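A rough usage sketch of what the grammar change below appears to enable (catalog, database, and table names here are hypothetical; the exact clause order should be read off the grammar rules rather than this sketch):

    ANALYZE TABLE hive_catalog.tpch.sales PARTITIONS (*);
    ANALYZE TABLE hive_catalog.tpch.sales PARTITIONS WITH RECENT 30;

The first form builds new PartitionNames(true) (analyze all partitions); the second builds new PartitionNames(30) (analyze a limited number of partitions). The HMS analysis task changes further down resolve these into a concrete partition list before generating per-partition statistics queries.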
@@ -528,6 +528,7 @@ terminal String
    KW_QUOTA,
    KW_RANDOM,
    KW_RANGE,
    KW_RECENT,
    KW_READ,
    KW_REBALANCE,
    KW_RECOVER,
@@ -5900,6 +5901,14 @@ partition_names ::=
    {:
        RESULT = new PartitionNames(true, Lists.newArrayList(partName));
    :}
    | KW_PARTITIONS LPAREN STAR RPAREN
    {:
        RESULT = new PartitionNames(true);
    :}
    | KW_PARTITIONS KW_WITH KW_RECENT INTEGER_LITERAL:count
    {:
        RESULT = new PartitionNames(count);
    :}
    ;

opt_table_sample ::=

@@ -84,7 +84,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {

    private final TableName tableName;
    private List<String> columnNames;
    private List<String> partitionNames;
    private PartitionNames partitionNames;
    private boolean isAllColumns;

    // after analyzed
@@ -97,7 +97,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
            AnalyzeProperties properties) {
        super(properties);
        this.tableName = tableName;
        this.partitionNames = partitionNames == null ? null : partitionNames.getPartitionNames();
        this.partitionNames = partitionNames;
        this.columnNames = columnNames;
        this.analyzeProperties = properties;
        this.isAllColumns = columnNames == null;
@@ -240,14 +240,34 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
    }

    public Set<String> getPartitionNames() {
        Set<String> partitions = partitionNames == null ? table.getPartitionNames() : Sets.newHashSet(partitionNames);
        if (isSamplingPartition()) {
            int partNum = ConnectContext.get().getSessionVariable().getExternalTableAnalyzePartNum();
            partitions = partitions.stream().limit(partNum).collect(Collectors.toSet());
        if (partitionNames == null || partitionNames.getPartitionNames() == null) {
            return null;
        }
        Set<String> partitions = Sets.newHashSet();
        partitions.addAll(partitionNames.getPartitionNames());
        /*
        if (isSamplingPartition()) {
            int partNum = ConnectContext.get().getSessionVariable().getExternalTableAnalyzePartNum();
            partitions = partitions.stream().limit(partNum).collect(Collectors.toSet());
        }
        */
        return partitions;
    }

    public boolean isAllPartitions() {
        if (partitionNames == null) {
            return false;
        }
        return partitionNames.isAllPartitions();
    }

    public long getPartitionCount() {
        if (partitionNames == null) {
            return 0;
        }
        return partitionNames.getCount();
    }

    public boolean isPartitionOnly() {
        return partitionNames != null;
    }

@@ -48,15 +48,37 @@ public class PartitionNames implements ParseNode, Writable {
    // true if these partitions are temp partitions
    @SerializedName(value = "isTemp")
    private final boolean isTemp;
    private final boolean allPartitions;
    private final long count;
    // Default partition count to collect statistic for external table.
    private static final long DEFAULT_PARTITION_COUNT = 100;

    public PartitionNames(boolean isTemp, List<String> partitionNames) {
        this.partitionNames = partitionNames;
        this.isTemp = isTemp;
        this.allPartitions = false;
        this.count = 0;
    }

    public PartitionNames(PartitionNames other) {
        this.partitionNames = Lists.newArrayList(other.partitionNames);
        this.isTemp = other.isTemp;
        this.allPartitions = other.allPartitions;
        this.count = 0;
    }

    public PartitionNames(boolean allPartitions) {
        this.partitionNames = null;
        this.isTemp = false;
        this.allPartitions = allPartitions;
        this.count = 0;
    }

    public PartitionNames(long partitionCount) {
        this.partitionNames = null;
        this.isTemp = false;
        this.allPartitions = false;
        this.count = partitionCount;
    }

    public List<String> getPartitionNames() {
@@ -67,9 +89,23 @@ public class PartitionNames implements ParseNode, Writable {
        return isTemp;
    }

    public boolean isAllPartitions() {
        return allPartitions;
    }

    public long getCount() {
        return count;
    }

    @Override
    public void analyze(Analyzer analyzer) throws AnalysisException {
        if (partitionNames.isEmpty()) {
        if (allPartitions && count > 0) {
            throw new AnalysisException("All partition and partition count couldn't be set at the same time.");
        }
        if (allPartitions || count > 0) {
            return;
        }
        if (partitionNames == null || partitionNames.isEmpty()) {
            throw new AnalysisException("No partition specified in partition lists");
        }
        // check if partition name is not empty string

@@ -57,6 +57,7 @@ import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.LocalDate;
@@ -628,6 +629,12 @@ public class HMSExternalTable extends ExternalTable {
            builder.setMaxValue(Double.MAX_VALUE);
        }
    }

    @Override
    public void gsonPostProcess() throws IOException {
        super.gsonPostProcess();
        estimatedRowCount = -1;
    }
}

@@ -155,13 +155,19 @@ public class AnalysisInfo implements Writable {
    // True means this task is a table level task for external table.
    // This kind of task is mainly to collect the number of rows of a table.
    @SerializedName("externalTableLevelTask")
    public boolean externalTableLevelTask;
    public final boolean externalTableLevelTask;

    @SerializedName("partitionOnly")
    public boolean partitionOnly;
    public final boolean partitionOnly;

    @SerializedName("samplingPartition")
    public boolean samplingPartition;
    public final boolean samplingPartition;

    @SerializedName("isAllPartition")
    public final boolean isAllPartition;

    @SerializedName("partitionCount")
    public final long partitionCount;

    // For serialize
    @SerializedName("cronExpr")
@@ -181,7 +187,7 @@ public class AnalysisInfo implements Writable {
            int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
            long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
            boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
            CronExpression cronExpression, boolean forceFull) {
            boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull) {
        this.jobId = jobId;
        this.taskId = taskId;
        this.taskIds = taskIds;
@@ -208,6 +214,8 @@ public class AnalysisInfo implements Writable {
        this.externalTableLevelTask = isExternalTableLevelTask;
        this.partitionOnly = partitionOnly;
        this.samplingPartition = samplingPartition;
        this.isAllPartition = isAllPartition;
        this.partitionCount = partitionCount;
        this.cronExpression = cronExpression;
        if (cronExpression != null) {
            this.cronExprStr = cronExpression.getCronExpression();

@@ -56,6 +56,8 @@ public class AnalysisInfoBuilder {
    private boolean externalTableLevelTask;
    private boolean partitionOnly;
    private boolean samplingPartition;
    private boolean isAllPartition;
    private long partitionCount;

    private CronExpression cronExpression;

@@ -91,6 +93,8 @@ public class AnalysisInfoBuilder {
        externalTableLevelTask = info.externalTableLevelTask;
        partitionOnly = info.partitionOnly;
        samplingPartition = info.samplingPartition;
        isAllPartition = info.isAllPartition;
        partitionCount = info.partitionCount;
        cronExpression = info.cronExpression;
        forceFull = info.forceFull;
    }
@@ -225,6 +229,16 @@ public class AnalysisInfoBuilder {
        return this;
    }

    public AnalysisInfoBuilder setAllPartition(boolean isAllPartition) {
        this.isAllPartition = isAllPartition;
        return this;
    }

    public AnalysisInfoBuilder setPartitionCount(long partitionCount) {
        this.partitionCount = partitionCount;
        return this;
    }

    public void setCronExpression(CronExpression cronExpression) {
        this.cronExpression = cronExpression;
    }
@@ -237,6 +251,38 @@ public class AnalysisInfoBuilder {
        return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames,
                colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
                sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
                externalTableLevelTask, partitionOnly, samplingPartition, cronExpression, forceFull);
                externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
                cronExpression, forceFull);
    }

    public AnalysisInfoBuilder copy() {
        return new AnalysisInfoBuilder()
                .setJobId(jobId)
                .setTaskId(taskId)
                .setTaskIds(taskIds)
                .setCatalogName(catalogName)
                .setDbName(dbName)
                .setTblName(tblName)
                .setColToPartitions(colToPartitions)
                .setColName(colName)
                .setIndexId(indexId)
                .setJobType(jobType)
                .setAnalysisMode(analysisMode)
                .setAnalysisMethod(analysisMethod)
                .setAnalysisType(analysisType)
                .setSamplePercent(samplePercent)
                .setSampleRows(sampleRows)
                .setPeriodTimeInMs(periodTimeInMs)
                .setMaxBucketNum(maxBucketNum)
                .setMessage(message)
                .setLastExecTimeInMs(lastExecTimeInMs)
                .setTimeCostInMs(timeCostInMs)
                .setState(state)
                .setScheduleType(scheduleType)
                .setExternalTableLevelTask(externalTableLevelTask)
                .setSamplingPartition(samplingPartition)
                .setPartitionOnly(partitionOnly)
                .setAllPartition(isAllPartition)
                .setPartitionCount(partitionCount);
    }
}

@@ -363,7 +363,7 @@ public class AnalysisManager extends Daemon implements Writable {
        boolean isSync = stmt.isSync();
        Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
        createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
        if (stmt.isAllColumns()
        if (!jobInfo.partitionOnly && stmt.isAllColumns()
                && StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) {
            createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
        }
@@ -453,7 +453,7 @@ public class AnalysisManager extends Daemon implements Writable {
        }

        // Get the partition granularity statistics that have been collected
        Map<String, Set<Long>> existColAndPartsForStats = StatisticsRepository
        Map<String, Set<String>> existColAndPartsForStats = StatisticsRepository
                .fetchColAndPartsForStats(tableId);

        if (existColAndPartsForStats.isEmpty()) {
@@ -461,12 +461,12 @@ public class AnalysisManager extends Daemon implements Writable {
            return columnToPartitions;
        }

        Set<Long> existPartIdsForStats = new HashSet<>();
        Set<String> existPartIdsForStats = new HashSet<>();
        existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll);
        Map<Long, String> idToPartition = StatisticsUtil.getPartitionIdToName(table);
        Set<String> idToPartition = StatisticsUtil.getPartitionIds(table);
        // Get an invalid set of partitions (those partitions were deleted)
        Set<Long> invalidPartIds = existPartIdsForStats.stream()
                .filter(id -> !idToPartition.containsKey(id)).collect(Collectors.toSet());
        Set<String> invalidPartIds = existPartIdsForStats.stream()
                .filter(id -> !idToPartition.contains(id)).collect(Collectors.toSet());

        if (!invalidPartIds.isEmpty()) {
            // Delete invalid partition statistics to avoid affecting table statistics
@@ -496,6 +496,8 @@ public class AnalysisManager extends Daemon implements Writable {
        Set<String> partitionNames = stmt.getPartitionNames();
        boolean partitionOnly = stmt.isPartitionOnly();
        boolean isSamplingPartition = stmt.isSamplingPartition();
        boolean isAllPartition = stmt.isAllPartitions();
        long partitionCount = stmt.getPartitionCount();
        int samplePercent = stmt.getSamplePercent();
        int sampleRows = stmt.getSampleRows();
        AnalysisType analysisType = stmt.getAnalysisType();
@@ -516,6 +518,8 @@ public class AnalysisManager extends Daemon implements Writable {
        infoBuilder.setPartitionNames(partitionNames);
        infoBuilder.setPartitionOnly(partitionOnly);
        infoBuilder.setSamplingPartition(isSamplingPartition);
        infoBuilder.setAllPartition(isAllPartition);
        infoBuilder.setPartitionCount(partitionCount);
        infoBuilder.setJobType(JobType.MANUAL);
        infoBuilder.setState(AnalysisState.PENDING);
        infoBuilder.setLastExecTimeInMs(System.currentTimeMillis());

@@ -93,7 +93,7 @@ public class ColumnStatistic {
    public final Histogram histogram;

    @SerializedName("partitionIdToColStats")
    public final Map<Long, ColumnStatistic> partitionIdToColStats = new HashMap<>();
    public final Map<String, ColumnStatistic> partitionIdToColStats = new HashMap<>();

    public final String updatedTime;

@@ -120,7 +120,7 @@ public class ColumnStatistic {
    }

    public static ColumnStatistic fromResultRow(List<ResultRow> resultRows) {
        Map<Long, ColumnStatistic> partitionIdToColStats = new HashMap<>();
        Map<String, ColumnStatistic> partitionIdToColStats = new HashMap<>();
        ColumnStatistic columnStatistic = null;
        try {
            for (ResultRow resultRow : resultRows) {
@@ -128,7 +128,7 @@ public class ColumnStatistic {
                if (partId == null) {
                    columnStatistic = fromResultRow(resultRow);
                } else {
                    partitionIdToColStats.put(Long.parseLong(partId), fromResultRow(resultRow));
                    partitionIdToColStats.put(partId, fromResultRow(resultRow));
                }
            }
        } catch (Throwable t) {
@@ -392,7 +392,7 @@ public class ColumnStatistic {
        return isUnKnown;
    }

    public void putPartStats(long partId, ColumnStatistic columnStatistic) {
    public void putPartStats(String partId, ColumnStatistic columnStatistic) {
        this.partitionIdToColStats.put(partId, columnStatistic);
    }
}

@@ -40,7 +40,7 @@ public class ColumnStatisticBuilder {

    private ColumnStatistic original;

    private Map<Long, ColumnStatistic> partitionIdToColStats = new HashMap<>();
    private Map<String, ColumnStatistic> partitionIdToColStats = new HashMap<>();

    private String updatedTime;

@@ -26,7 +26,7 @@ import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Lists;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -35,10 +35,13 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;

public class HMSAnalysisTask extends BaseAnalysisTask {
    private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);
@@ -48,7 +51,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
    public static final String NUM_FILES = "numFiles";
    public static final String TIMESTAMP = "transient_lastDdlTime";

    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
    private static final String ANALYZE_TABLE_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " SELECT "
            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
@@ -67,10 +70,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
            + "NOW() "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    private static final String ANALYZE_SQL_PARTITION_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " SELECT "
            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
    private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
            + "${catalogId} AS catalog_id, "
            + "${dbId} AS db_id, "
            + "${tblId} AS tbl_id, "
@@ -83,22 +84,22 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
            + "MIN(`${colName}`) AS min, "
            + "MAX(`${colName}`) AS max, "
            + "${dataSizeFunction} AS data_size, "
            + "NOW() "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
            + "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";

    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    // cache stats for each partition, it would be inserted into column_statistics in a batch.
    private final List<List<ColStatsData>> buf = new ArrayList<>();

    private final boolean isTableLevelTask;
    private final boolean isSamplingPartition;
    private final boolean isPartitionOnly;
    private final Set<String> partitionNames;
    private Set<String> partitionNames;
    private HMSExternalTable table;

    public HMSAnalysisTask(AnalysisInfo info) {
        super(info);
        isTableLevelTask = info.externalTableLevelTask;
        isSamplingPartition = info.samplingPartition;
        isPartitionOnly = info.partitionOnly;
        partitionNames = info.partitionNames;
        table = (HMSExternalTable) tbl;
@@ -113,7 +114,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
    }

    /**
     * Get table row count and insert the result to __internal_schema.table_statistics
     * Get table row count
     */
    private void getTableStats() throws Exception {
        Map<String, String> params = buildTableStatsParams(null);
@@ -147,55 +148,15 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
        // 0 AS data_size,
        // NOW() FROM `hive`.`tpch100`.`region`
        if (isPartitionOnly) {
            for (String partId : partitionNames) {
                StringBuilder sb = new StringBuilder();
                sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
                sb.append(" where ");
                String[] splits = partId.split("/");
                for (int i = 0; i < splits.length; i++) {
                    String value = splits[i].split("=")[1];
                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
                }
                sb.append(StringUtils.join(splits, " and "));
                Map<String, String> params = buildTableStatsParams(partId);
                params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
                params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
                params.put("colName", col.getName());
                params.put("colId", info.colName);
                params.put("dataSizeFunction", getDataSizeFunction(col));
                StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
                String sql = stringSubstitutor.replace(sb.toString());
                executeInsertSql(sql);
            getPartitionNames();
            List<String> partitionAnalysisSQLs = new ArrayList<>();
            for (String partId : this.partitionNames) {
                partitionAnalysisSQLs.add(generateSqlForPartition(partId));
            }
            execSQLs(partitionAnalysisSQLs);
        } else {
            StringBuilder sb = new StringBuilder();
            sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
            if (isSamplingPartition) {
                sb.append(" where 1=1 ");
                String[] splitExample = partitionNames.stream().findFirst().get().split("/");
                int parts = splitExample.length;
                List<String> partNames = new ArrayList<>();
                for (String split : splitExample) {
                    partNames.add(split.split("=")[0]);
                }
                List<List<String>> valueLists = new ArrayList<>();
                for (int i = 0; i < parts; i++) {
                    valueLists.add(new ArrayList<>());
                }
                for (String partId : partitionNames) {
                    String[] partIds = partId.split("/");
                    for (int i = 0; i < partIds.length; i++) {
                        valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'");
                    }
                }
                for (int i = 0; i < parts; i++) {
                    sb.append(" and ");
                    sb.append(partNames.get(i));
                    sb.append(" in (");
                    sb.append(StringUtils.join(valueLists.get(i), ","));
                    sb.append(") ");
                }
            }
            sb.append(ANALYZE_TABLE_TEMPLATE);
            Map<String, String> params = buildTableStatsParams("NULL");
            params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
            params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
@@ -208,6 +169,80 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
        }
    }

    private void getPartitionNames() {
        if (partitionNames == null) {
            if (info.isAllPartition) {
                partitionNames = table.getPartitionNames();
            } else if (info.partitionCount > 0) {
                partitionNames = table.getPartitionNames().stream()
                        .limit(info.partitionCount).collect(Collectors.toSet());
            }
            if (partitionNames == null || partitionNames.isEmpty()) {
                throw new RuntimeException("Not a partition table or no partition specified.");
            }
        }
    }

    private String generateSqlForPartition(String partId) {
        StringBuilder sb = new StringBuilder();
        sb.append(ANALYZE_PARTITION_TEMPLATE);
        String[] splits = partId.split("/");
        for (int i = 0; i < splits.length; i++) {
            String[] kv = splits[i].split("=");
            sb.append(kv[0]);
            sb.append("='");
            sb.append(kv[1]);
            sb.append("'");
            if (i < splits.length - 1) {
                sb.append(" and ");
            }
        }
        Map<String, String> params = buildTableStatsParams(partId);
        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
        params.put("colName", col.getName());
        params.put("colId", info.colName);
        params.put("dataSizeFunction", getDataSizeFunction(col));
        return new StringSubstitutor(params).replace(sb.toString());
    }

    public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
        long startTime = System.currentTimeMillis();
        LOG.debug("analyze task {} start at {}", info.toString(), new Date());
        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
            List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
            for (List<String> group : sqlGroups) {
                if (killed) {
                    return;
                }
                StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL");
                group.forEach(partitionCollectSQL::add);
                stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString());
                buf.add(stmtExecutor.executeInternalQuery()
                        .stream().map(ColStatsData::new).collect(Collectors.toList()));
                QueryState queryState = r.connectContext.getState();
                if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
                    throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
                            info.catalogName, info.dbName, info.colName, partitionCollectSQL,
                            queryState.getErrorMessage()));
                }
            }
            for (List<ColStatsData> colStatsDataList : buf) {
                StringBuilder batchInsertSQL =
                        new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
                                + " VALUES ");
                StringJoiner sj = new StringJoiner(",");
                colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
                batchInsertSQL.append(sj);
                stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString());
                executeWithExceptionOnFail(stmtExecutor);
            }
        } finally {
            LOG.debug("analyze task {} end. cost {}ms", info, System.currentTimeMillis() - startTime);
        }

    }

    private void executeInsertSql(String sql) throws Exception {
        long startTime = System.currentTimeMillis();
        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
@@ -270,7 +305,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
    @Override
    protected void afterExecution() {
        // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
        if (isTableLevelTask) {
        // Partition only task doesn't need to refresh cached.
        if (isTableLevelTask || isPartitionOnly) {
            return;
        }
        Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());

@@ -288,7 +288,7 @@ public class StatisticsCache {
        StatsId statsId = new StatsId(r);
        long tblId = statsId.tblId;
        long idxId = statsId.idxId;
        long partId = statsId.partId;
        String partId = statsId.partId;
        String colId = statsId.colId;
        ColumnStatistic partStats = ColumnStatistic.fromResultRow(r);
        keyToColStats.get(new StatisticsCacheKey(tblId, idxId, colId)).putPartStats(partId, partStats);

@@ -228,11 +228,11 @@ public class StatisticsCleaner extends MasterDaemon {
                    continue;
                }
                OlapTable olapTable = (OlapTable) t;
                Long partId = statsId.partId;
                String partId = statsId.partId;
                if (partId == null) {
                    continue;
                }
                if (!olapTable.getPartitionIds().contains(partId)) {
                if (!olapTable.getPartitionIds().contains(Long.parseLong(partId))) {
                    expiredStats.ids.add(id);
                }
            } catch (Exception e) {

@@ -179,7 +179,7 @@ public class StatisticsRepository {
        return stringJoiner.toString();
    }

    public static void dropStatistics(Set<Long> partIds) throws DdlException {
    public static void dropStatistics(Set<String> partIds) throws DdlException {
        dropStatisticsByPartId(partIds, StatisticConstants.STATISTIC_TBL_NAME);
    }

@@ -202,7 +202,7 @@ public class StatisticsRepository {
        }
    }

    public static void dropStatisticsByPartId(Set<Long> partIds, String statsTblName) throws DdlException {
    public static void dropStatisticsByPartId(Set<String> partIds, String statsTblName) throws DdlException {
        Map<String, String> params = new HashMap<>();
        String right = StatisticsUtil.joinElementsToString(partIds, ",");
        String inPredicate = String.format(" part_id IN (%s)", right);
@@ -296,14 +296,14 @@ public class StatisticsRepository {
        return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME));
    }

    public static Map<String, Set<Long>> fetchColAndPartsForStats(long tblId) {
    public static Map<String, Set<String>> fetchColAndPartsForStats(long tblId) {
        Map<String, String> params = Maps.newHashMap();
        params.put("tblId", String.valueOf(tblId));
        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
        String partSql = stringSubstitutor.replace(FETCH_STATS_PART_ID);
        List<ResultRow> resultRows = StatisticsUtil.execStatisticQuery(partSql);

        Map<String, Set<Long>> columnToPartitions = Maps.newHashMap();
        Map<String, Set<String>> columnToPartitions = Maps.newHashMap();

        resultRows.forEach(row -> {
            try {

@@ -32,7 +32,7 @@ public class StatsId {
    public final String colId;

    // nullable
    public final Long partId;
    public final String partId;

    public StatsId(ResultRow row) {
        this.id = row.get(0);
@@ -41,7 +41,7 @@ public class StatsId {
        this.tblId = Long.parseLong(row.get(3));
        this.idxId = Long.parseLong(row.get(4));
        this.colId = row.get(5);
        this.partId = row.get(6) == null ? null : Long.parseLong(row.get(6));
        this.partId = row.get(6);
    }

    public String toSQL() {
@@ -51,8 +51,8 @@ public class StatsId {
        sj.add(String.valueOf(dbId));
        sj.add(String.valueOf(tblId));
        sj.add(String.valueOf(idxId));
        sj.add(StatisticsUtil.quote(String.valueOf(colId)));
        sj.add(String.valueOf(partId));
        sj.add(StatisticsUtil.quote(colId));
        sj.add(StatisticsUtil.quote(partId));
        return sj.toString();
    }
}

@@ -103,6 +103,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.function.Function;
@@ -439,6 +440,15 @@ public class StatisticsUtil {
        ));
    }

    public static Set<String> getPartitionIds(TableIf table) {
        if (table instanceof OlapTable) {
            return ((OlapTable) table).getPartitionIds().stream().map(String::valueOf).collect(Collectors.toSet());
        } else if (table instanceof ExternalTable) {
            return table.getPartitionNames();
        }
        throw new RuntimeException(String.format("Not supported Table %s", table.getClass().getName()));
    }

    public static <T> String joinElementsToString(Collection<T> values, String delimiter) {
        StringJoiner builder = new StringJoiner(delimiter);
        values.forEach(v -> builder.add(String.valueOf(v)));
@@ -512,7 +522,11 @@ public class StatisticsUtil {
        }
        // Table parameters contains row count, simply get and return it.
        if (parameters.containsKey(NUM_ROWS)) {
            return Long.parseLong(parameters.get(NUM_ROWS));
            long rows = Long.parseLong(parameters.get(NUM_ROWS));
            // Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0.
            if (rows != 0) {
                return rows;
            }
        }
        if (!parameters.containsKey(TOTAL_SIZE) || isInit) {
            return -1;

@@ -381,6 +381,7 @@ import org.apache.doris.qe.SqlModeHelper;
        keywordMap.put("read", new Integer(SqlParserSymbols.KW_READ));
        keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE));
        keywordMap.put("rebalance", new Integer(SqlParserSymbols.KW_REBALANCE));
        keywordMap.put("recent", new Integer(SqlParserSymbols.KW_RECENT));
        keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER));
        keywordMap.put("recycle", new Integer(SqlParserSymbols.KW_RECYCLE));
        keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH));