[fix](stats) fix auto collector always create sample job no matter the table size (#26968)
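Summary: StatisticsAutoCollector previously called setSampleRows(...) unconditionally on every automatic job it built, so every table got a sample job regardless of its size. Sample rows are now set only when the analysis method is SAMPLE (and -1 otherwise). The change also records the table's update time on the job (AnalysisInfo.tblUpdateTime) and persists it through TableStatsMeta: updateByJob is renamed to update and now takes the table, updatedTime comes from the job's tblUpdateTime rather than the wall clock, the per-partition staleness scan findReAnalyzeNeededPartitions is removed, and the updated-rows counter resets when a job covers every column of the table's base schema.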
@@ -187,6 +187,12 @@ public class AnalysisInfo implements Writable {
     @SerializedName("endTime")
     public long endTime;
 
+    /**
+     *
+     * Used to store the newest partition version of tbl when creating this job.
+     * This variables would be saved by table stats meta.
+     */
+    public final long tblUpdateTime;
 
     public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId, long dbId, long tblId,
             Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
@@ -195,7 +201,7 @@ public class AnalysisInfo implements Writable {
             long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
             boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
             boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
-            boolean usingSqlForPartitionColumn) {
+            boolean usingSqlForPartitionColumn, long tblUpdateTime) {
        this.jobId = jobId;
        this.taskId = taskId;
        this.taskIds = taskIds;
@@ -230,6 +236,7 @@ public class AnalysisInfo implements Writable {
         }
         this.forceFull = forceFull;
         this.usingSqlForPartitionColumn = usingSqlForPartitionColumn;
+        this.tblUpdateTime = tblUpdateTime;
     }
 
     @Override
@@ -62,6 +62,8 @@ public class AnalysisInfoBuilder {
     private boolean forceFull;
     private boolean usingSqlForPartitionColumn;
 
+    private long tblUpdateTime;
+
     public AnalysisInfoBuilder() {
     }
@@ -97,6 +99,7 @@ public class AnalysisInfoBuilder {
         cronExpression = info.cronExpression;
         forceFull = info.forceFull;
         usingSqlForPartitionColumn = info.usingSqlForPartitionColumn;
+        tblUpdateTime = info.tblUpdateTime;
     }
 
     public AnalysisInfoBuilder setJobId(long jobId) {
@@ -254,45 +257,17 @@ public class AnalysisInfoBuilder {
         return this;
     }
 
+    public AnalysisInfoBuilder setTblUpdateTime(long tblUpdateTime) {
+        this.tblUpdateTime = tblUpdateTime;
+        return this;
+    }
+
     public AnalysisInfo build() {
         return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames,
                 colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
                 sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
                 externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
-                cronExpression, forceFull, usingSqlForPartitionColumn);
+                cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime);
     }
-
-    public AnalysisInfoBuilder copy() {
-        return new AnalysisInfoBuilder()
-                .setJobId(jobId)
-                .setTaskId(taskId)
-                .setTaskIds(taskIds)
-                .setCatalogId(catalogId)
-                .setDBId(dbId)
-                .setTblId(tblId)
-                .setColToPartitions(colToPartitions)
-                .setColName(colName)
-                .setIndexId(indexId)
-                .setJobType(jobType)
-                .setAnalysisMode(analysisMode)
-                .setAnalysisMethod(analysisMethod)
-                .setAnalysisType(analysisType)
-                .setSamplePercent(samplePercent)
-                .setSampleRows(sampleRows)
-                .setPeriodTimeInMs(periodTimeInMs)
-                .setMaxBucketNum(maxBucketNum)
-                .setMessage(message)
-                .setLastExecTimeInMs(lastExecTimeInMs)
-                .setTimeCostInMs(timeCostInMs)
-                .setState(state)
-                .setScheduleType(scheduleType)
-                .setExternalTableLevelTask(externalTableLevelTask)
-                .setSamplingPartition(samplingPartition)
-                .setPartitionOnly(partitionOnly)
-                .setAllPartition(isAllPartition)
-                .setPartitionCount(partitionCount)
-                .setCronExpression(cronExpression)
-                .setForceFull(forceFull)
-                .setUsingSqlForPartitionColumn(usingSqlForPartitionColumn);
-    }
 }
@@ -29,7 +29,6 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
@@ -505,7 +504,7 @@ public class AnalysisManager implements Writable {
                 partitionNames, analysisType);
         infoBuilder.setColToPartitions(colToPartitions);
         infoBuilder.setTaskIds(Lists.newArrayList());
-
+        infoBuilder.setTblUpdateTime(table.getUpdateTime());
         return infoBuilder.build();
     }
@@ -601,9 +600,9 @@ public class AnalysisManager implements Writable {
         }
         TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
         if (tableStats == null) {
-            updateTableStatsStatus(new TableStatsMeta(tbl.getId(), tbl.estimatedRowCount(), jobInfo));
+            updateTableStatsStatus(new TableStatsMeta(tbl.estimatedRowCount(), jobInfo, tbl));
         } else {
-            tableStats.updateByJob(jobInfo);
+            tableStats.update(jobInfo, tbl);
             logCreateTableStats(tableStats);
         }
@@ -1005,21 +1004,6 @@ public class AnalysisManager implements Writable {
         analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
     }
 
-    @VisibleForTesting
-    protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
-        TableStatsMeta tableStats = findTableStatsStatus(table.getId());
-        if (tableStats == null) {
-            return table.getPartitionNames().stream().map(table::getPartition)
-                    .filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
-        }
-        return table.getPartitionNames().stream()
-                .map(table::getPartition)
-                .filter(Partition::hasData)
-                .filter(partition ->
-                        partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName)
-                .collect(Collectors.toSet());
-    }
-
     protected void logAutoJob(AnalysisInfo autoJob) {
         Env.getCurrentEnv().getEditLog().logAutoJob(autoJob);
     }
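With tblUpdateTime now carried on the job and stored as TableStatsMeta.updatedTime, staleness no longer has to be derived by scanning each partition's visible version time, so findReAnalyzeNeededPartitions above is deleted outright (along with the now-unused Partition import earlier in the file).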
@@ -81,7 +81,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
         String rowCount = columnResult.get(0).get(0);
         Env.getCurrentEnv().getAnalysisManager()
                 .updateTableStatsStatus(
-                        new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
+                        new TableStatsMeta(Long.parseLong(rowCount), info, tbl));
         job.rowCountDone(this);
     }
@@ -76,7 +76,7 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
                 StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE));
         String rowCount = columnResult.get(0).get(0);
         Env.getCurrentEnv().getAnalysisManager()
-                .updateTableStatsStatus(new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
+                .updateTableStatsStatus(new TableStatsMeta(Long.parseLong(rowCount), info, table));
         job.rowCountDone(this);
     }
@@ -142,12 +142,15 @@ public class StatisticsAutoCollector extends StatisticsCollector {
                 .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
                 .setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
                 .setAnalysisMethod(analysisMethod)
-                .setSampleRows(StatisticsUtil.getHugeTableSampleRows())
+                .setSampleRows(analysisMethod.equals(AnalysisMethod.SAMPLE)
+                        ? StatisticsUtil.getHugeTableSampleRows() : -1)
                 .setScheduleType(ScheduleType.AUTOMATIC)
                 .setState(AnalysisState.PENDING)
                 .setTaskIds(new ArrayList<>())
                 .setLastExecTimeInMs(System.currentTimeMillis())
-                .setJobType(JobType.SYSTEM).build();
+                .setJobType(JobType.SYSTEM)
+                .setTblUpdateTime(table.getUpdateTime())
+                .build();
         analysisInfos.add(jobInfo);
     }
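The ternary above is the core of the fix: only jobs whose analysisMethod is SAMPLE carry a sample-row budget. A minimal sketch of the size check assumed to feed analysisMethod before this builder runs; TableIf.getDataSize(true) and StatisticsUtil.getHugeTableLowerBoundSizeInBytes() are assumptions here, not lines from this commit:

    // Sketch under the stated assumptions: sample only tables above the
    // configured huge-table threshold; smaller tables get a FULL analyze.
    AnalysisMethod analysisMethod =
            table.getDataSize(true) > StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
                    ? AnalysisMethod.SAMPLE
                    : AnalysisMethod.FULL;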
@@ -17,6 +17,8 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -64,11 +66,11 @@ public class TableStatsMeta implements Writable {
 
     // It's necessary to store these fields separately from AnalysisInfo, since the lifecycle between AnalysisInfo
     // and TableStats is quite different.
-    public TableStatsMeta(long tblId, long rowCount, AnalysisInfo analyzedJob) {
-        this.tblId = tblId;
+    public TableStatsMeta(long rowCount, AnalysisInfo analyzedJob, TableIf table) {
+        this.tblId = table.getId();
         this.idxId = -1;
         this.rowCount = rowCount;
-        updateByJob(analyzedJob);
+        update(analyzedJob, table);
     }
 
     @Override
@@ -112,8 +114,8 @@ public class TableStatsMeta implements Writable {
         colNameToColStatsMeta.values().forEach(ColStatsMeta::clear);
     }
 
-    public void updateByJob(AnalysisInfo analyzedJob) {
-        updatedTime = System.currentTimeMillis();
+    public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
+        updatedTime = analyzedJob.tblUpdateTime;
         String colNameStr = analyzedJob.colName;
         // colName field AnalyzeJob's format likes: "[col1, col2]", we need to remove brackets here
         // TODO: Refactor this later
@@ -133,5 +135,10 @@ public class TableStatsMeta implements Writable {
             }
         }
         jobType = analyzedJob.jobType;
+        if (tableIf != null && analyzedJob.colToPartitions.keySet()
+                .containsAll(tableIf.getBaseSchema().stream().map(Column::getName).collect(
+                        Collectors.toSet()))) {
+            updatedRows.set(0);
+        }
     }
 }
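The new guard resets updatedRows only when the analyzed column set covers the whole base schema; jobs that analyze a subset keep the counter, so accumulated changes can still trigger a later full analyze. A standalone rendering of that check, using plain collections in place of the catalog types (hypothetical helper, not part of the commit):

    import java.util.List;
    import java.util.Set;

    // Hypothetical mirror of the guard above: reset the updated-rows counter
    // only when the job analyzed every column of the table's base schema.
    final class UpdatedRowsGuard {
        static boolean shouldResetUpdatedRows(Set<String> analyzedColumns, List<String> baseSchemaColumns) {
            return analyzedColumns.containsAll(baseSchemaColumns);
        }
    }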