[enhancement](statistics) support periodic collection of statistics (#19247)
This PR enables periodic collection of statistics and is a precursor to automatic statistics collection. It mainly includes the following contents: support periodic collection of statistics. Change the type of Date in statistics p0 to DateV2 (see [Enhancement](data-type) add FE config to prohibit create date and decimalv2 type #19077) for local testing. Complement test cases (remove Chinese characters, optimize code, etc.) and improve stability. Supports setting whether to keep records of statistics synchronization job info, which is convenient for use in p0 testing. The statistics job table was modified, and some auxiliary judgments were added to avoid the user perceiving the modification. This logic will be removed once the table schema is stable.
This commit is contained in:
@ -35,7 +35,9 @@ import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMode;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
@ -45,6 +47,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -85,6 +88,7 @@ public class AnalyzeStmt extends DdlStmt {
|
||||
public static final String PROPERTY_SAMPLE_ROWS = "sample.rows";
|
||||
public static final String PROPERTY_NUM_BUCKETS = "num.buckets";
|
||||
public static final String PROPERTY_ANALYSIS_TYPE = "analysis.type";
|
||||
public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";
|
||||
|
||||
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
|
||||
.add(PROPERTY_SYNC)
|
||||
@ -93,6 +97,7 @@ public class AnalyzeStmt extends DdlStmt {
|
||||
.add(PROPERTY_SAMPLE_ROWS)
|
||||
.add(PROPERTY_NUM_BUCKETS)
|
||||
.add(PROPERTY_ANALYSIS_TYPE)
|
||||
.add(PROPERTY_PERIOD_SECONDS)
|
||||
.build();
|
||||
|
||||
private final TableName tableName;
|
||||
@ -232,6 +237,11 @@ public class AnalyzeStmt extends DdlStmt {
|
||||
1, Integer.MAX_VALUE, true, "needs at least 1 buckets");
|
||||
}
|
||||
|
||||
if (properties.containsKey(PROPERTY_PERIOD_SECONDS)) {
|
||||
checkNumericProperty(PROPERTY_PERIOD_SECONDS, properties.get(PROPERTY_PERIOD_SECONDS),
|
||||
1, Integer.MAX_VALUE, true, "needs at least 1 seconds");
|
||||
}
|
||||
|
||||
if (properties.containsKey(PROPERTY_ANALYSIS_TYPE)) {
|
||||
try {
|
||||
AnalysisType.valueOf(properties.get(PROPERTY_ANALYSIS_TYPE));
|
||||
@ -328,15 +338,30 @@ public class AnalyzeStmt extends DdlStmt {
|
||||
return Integer.parseInt(properties.get(PROPERTY_NUM_BUCKETS));
|
||||
}
|
||||
|
||||
public long getPeriodTimeInMs() {
|
||||
if (!properties.containsKey(PROPERTY_PERIOD_SECONDS)) {
|
||||
return 0;
|
||||
}
|
||||
int minutes = Integer.parseInt(properties.get(PROPERTY_PERIOD_SECONDS));
|
||||
return TimeUnit.SECONDS.toMillis(minutes);
|
||||
}
|
||||
|
||||
public AnalysisMode getAnalysisMode() {
|
||||
return isIncremental() ? AnalysisMode.INCREMENTAL : AnalysisMode.FULL;
|
||||
}
|
||||
|
||||
public AnalysisType getAnalysisType() {
|
||||
return AnalysisType.valueOf(properties.get(PROPERTY_ANALYSIS_TYPE));
|
||||
}
|
||||
|
||||
public AnalysisMethod getAnalysisMethod() {
|
||||
if (getSamplePercent() > 0 || getSampleRows() > 0) {
|
||||
return AnalysisMethod.SAMPLE;
|
||||
}
|
||||
return AnalysisMethod.FULL;
|
||||
double samplePercent = getSamplePercent();
|
||||
int sampleRows = getSampleRows();
|
||||
return (samplePercent > 0 || sampleRows > 0) ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
|
||||
}
|
||||
|
||||
public ScheduleType getScheduleType() {
|
||||
return getPeriodTimeInMs() > 0 ? ScheduleType.PERIOD : ScheduleType.ONCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -214,6 +214,7 @@ import org.apache.doris.resource.resourcegroup.ResourceGroupMgr;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.statistics.AnalysisManager;
|
||||
import org.apache.doris.statistics.AnalysisTaskScheduler;
|
||||
import org.apache.doris.statistics.StatisticsAutoAnalyzer;
|
||||
import org.apache.doris.statistics.StatisticsCache;
|
||||
import org.apache.doris.statistics.StatisticsCleaner;
|
||||
import org.apache.doris.system.Backend;
|
||||
@ -452,6 +453,8 @@ public class Env {
|
||||
|
||||
private StatisticsCleaner statisticsCleaner;
|
||||
|
||||
private StatisticsAutoAnalyzer statisticsAutoAnalyzer;
|
||||
|
||||
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
|
||||
if (nodeType == null) {
|
||||
// get all
|
||||
@ -653,6 +656,7 @@ public class Env {
|
||||
if (Config.enable_stats && !isCheckpointCatalog) {
|
||||
this.analysisManager = new AnalysisManager();
|
||||
this.statisticsCleaner = new StatisticsCleaner();
|
||||
this.statisticsAutoAnalyzer = new StatisticsAutoAnalyzer();
|
||||
}
|
||||
this.globalFunctionMgr = new GlobalFunctionMgr();
|
||||
this.resourceGroupMgr = new ResourceGroupMgr();
|
||||
@ -880,6 +884,9 @@ public class Env {
|
||||
if (statisticsCleaner != null) {
|
||||
statisticsCleaner.start();
|
||||
}
|
||||
if (statisticsAutoAnalyzer != null) {
|
||||
statisticsAutoAnalyzer.start();
|
||||
}
|
||||
}
|
||||
|
||||
// wait until FE is ready.
|
||||
@ -5379,4 +5386,8 @@ public class Env {
|
||||
public StatisticsCleaner getStatisticsCleaner() {
|
||||
return statisticsCleaner;
|
||||
}
|
||||
|
||||
public StatisticsAutoAnalyzer getStatisticsAutoAnalyzer() {
|
||||
return statisticsAutoAnalyzer;
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.analysis.ColumnDef;
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.analysis.DistributionDesc;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.analysis.HashDistributionDesc;
|
||||
import org.apache.doris.analysis.KeysDesc;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
@ -41,10 +42,12 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class InternalSchemaInitializer extends Thread {
|
||||
|
||||
@ -55,6 +58,12 @@ public class InternalSchemaInitializer extends Thread {
|
||||
*/
|
||||
public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 5;
|
||||
|
||||
/**
|
||||
* Used when an internal table schema changes.
|
||||
* TODO remove this code after the table structure is stable
|
||||
*/
|
||||
private boolean isSchemaChanged = false;
|
||||
|
||||
public void run() {
|
||||
if (FeConstants.disableInternalSchemaDb) {
|
||||
return;
|
||||
@ -183,12 +192,26 @@ public class InternalSchemaInitializer extends Thread {
|
||||
columnDefs.add(new ColumnDef("tbl_name", TypeDef.createVarchar(1024)));
|
||||
columnDefs.add(new ColumnDef("col_name", TypeDef.createVarchar(1024)));
|
||||
columnDefs.add(new ColumnDef("index_id", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("col_partitions", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH)));
|
||||
columnDefs.add(new ColumnDef("job_type", TypeDef.createVarchar(32)));
|
||||
columnDefs.add(new ColumnDef("analysis_type", TypeDef.createVarchar(32)));
|
||||
columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024)));
|
||||
columnDefs.add(new ColumnDef("last_exec_time_in_ms", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("state", TypeDef.createVarchar(32)));
|
||||
columnDefs.add(new ColumnDef("analysis_mode", TypeDef.createVarchar(32)));
|
||||
columnDefs.add(new ColumnDef("analysis_method", TypeDef.createVarchar(32)));
|
||||
columnDefs.add(new ColumnDef("schedule_type", TypeDef.createVarchar(32)));
|
||||
columnDefs.add(new ColumnDef("state", TypeDef.createVarchar(32)));
|
||||
columnDefs.add(new ColumnDef("sample_percent", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("sample_rows", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("max_bucket_num", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("period_time_in_ms", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("last_exec_time_in_ms", TypeDef.create(PrimitiveType.BIGINT)));
|
||||
columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024)));
|
||||
// TODO remove this code after the table structure is stable
|
||||
if (!isSchemaChanged && isTableChanged(tableName, columnDefs)) {
|
||||
isSchemaChanged = true;
|
||||
DropTableStmt dropTableStmt = new DropTableStmt(true, tableName, true);
|
||||
StatisticsUtil.analyze(dropTableStmt);
|
||||
Env.getCurrentEnv().getInternalCatalog().dropTable(dropTableStmt);
|
||||
}
|
||||
String engineName = "olap";
|
||||
ArrayList<String> uniqueKeys = Lists.newArrayList("job_id", "task_id",
|
||||
"catalog_name", "db_name", "tbl_name", "col_name", "index_id");
|
||||
@ -218,9 +241,51 @@ public class InternalSchemaInitializer extends Thread {
|
||||
return false;
|
||||
}
|
||||
Database db = optionalDatabase.get();
|
||||
return db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent()
|
||||
// TODO remove this code after the table structure is stable
|
||||
try {
|
||||
buildAnalysisJobTblStmt();
|
||||
} catch (UserException ignored) {
|
||||
// CHECKSTYLE IGNORE THIS LINE
|
||||
}
|
||||
return !isSchemaChanged
|
||||
&& db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent()
|
||||
&& db.getTable(StatisticConstants.HISTOGRAM_TBL_NAME).isPresent()
|
||||
&& db.getTable(StatisticConstants.ANALYSIS_JOB_TABLE).isPresent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare whether the current internal table schema meets expectations,
|
||||
* delete and rebuild if it does not meet the table schema.
|
||||
* TODO remove this code after the table structure is stable
|
||||
*/
|
||||
private boolean isTableChanged(TableName tableName, List<ColumnDef> columnDefs) {
|
||||
try {
|
||||
String catalogName = Env.getCurrentEnv().getInternalCatalog().getName();
|
||||
String dbName = SystemInfoService.DEFAULT_CLUSTER + ":" + tableName.getDb();
|
||||
TableIf table = StatisticsUtil.findTable(catalogName, dbName, tableName.getTbl());
|
||||
List<Column> existColumns = table.getBaseSchema(false);
|
||||
existColumns.sort(Comparator.comparing(Column::getName));
|
||||
List<Column> columns = columnDefs.stream()
|
||||
.map(ColumnDef::toColumn)
|
||||
.sorted(Comparator.comparing(Column::getName))
|
||||
.collect(Collectors.toList());
|
||||
if (columns.size() != existColumns.size()) {
|
||||
return true;
|
||||
}
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
Column c1 = columns.get(i);
|
||||
Column c2 = existColumns.get(i);
|
||||
if (!c1.getName().equals(c2.getName())
|
||||
|| c1.getDataType() != c2.getDataType()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to check table schema", t);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@ -257,6 +257,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
public static final String ENABLE_CBO_STATISTICS = "enable_cbo_statistics";
|
||||
|
||||
public static final String ENABLE_SAVE_STATISTICS_SYNC_JOB = "enable_save_statistics_sync_job";
|
||||
|
||||
public static final String ENABLE_ELIMINATE_SORT_NODE = "enable_eliminate_sort_node";
|
||||
|
||||
public static final String NEREIDS_TRACE_EVENT_MODE = "nereids_trace_event_mode";
|
||||
@ -731,6 +733,13 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = ENABLE_CBO_STATISTICS)
|
||||
public boolean enableCboStatistics = false;
|
||||
|
||||
/**
|
||||
* If true, when synchronously collecting statistics, the information of
|
||||
* the statistics job will be saved, currently mainly used for p0 test
|
||||
*/
|
||||
@VariableMgr.VarAttr(name = ENABLE_SAVE_STATISTICS_SYNC_JOB)
|
||||
public boolean enableSaveStatisticsSyncJob = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_ELIMINATE_SORT_NODE)
|
||||
public boolean enableEliminateSortNode = true;
|
||||
|
||||
@ -1411,6 +1420,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return enableCboStatistics;
|
||||
}
|
||||
|
||||
public boolean isEnableSaveStatisticsSyncJob() {
|
||||
return enableSaveStatisticsSyncJob;
|
||||
}
|
||||
|
||||
public long getFileSplitSize() {
|
||||
return fileSplitSize;
|
||||
}
|
||||
|
||||
@ -36,6 +36,7 @@ import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ShowResultSet;
|
||||
import org.apache.doris.qe.ShowResultSetMetaData;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMode;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
|
||||
@ -52,6 +53,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@ -108,31 +110,47 @@ public class AnalysisManager {
|
||||
throw new DdlException("Stats table not available, please make sure your cluster status is normal");
|
||||
}
|
||||
|
||||
Map<String, Set<String>> columnToPartitions = validateAndGetPartitions(stmt);
|
||||
if (columnToPartitions.isEmpty()) {
|
||||
AnalysisTaskInfo jobInfo = buildAnalysisJobInfo(stmt);
|
||||
if (jobInfo.colToPartitions.isEmpty()) {
|
||||
// No statistics need to be collected or updated
|
||||
return;
|
||||
}
|
||||
|
||||
long jobId = Env.getCurrentEnv().getNextId();
|
||||
TableIf table = stmt.getTable();
|
||||
AnalysisType analysisType = stmt.getAnalysisType();
|
||||
boolean isSync = stmt.isSync();
|
||||
|
||||
AnalysisTaskInfoBuilder taskInfoBuilder = buildCommonTaskInfo(stmt, jobId);
|
||||
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
|
||||
createTaskForEachColumns(columnToPartitions, taskInfoBuilder, analysisTaskInfos, analysisType, isSync);
|
||||
createTaskForMVIdx(table, taskInfoBuilder, analysisTaskInfos, analysisType, isSync);
|
||||
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
|
||||
createTaskForMVIdx(jobInfo, analysisTaskInfos, isSync);
|
||||
|
||||
if (stmt.isSync()) {
|
||||
ConnectContext ctx = ConnectContext.get();
|
||||
if (!isSync || ctx.getSessionVariable().enableSaveStatisticsSyncJob) {
|
||||
persistAnalysisJob(jobInfo);
|
||||
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
|
||||
}
|
||||
|
||||
if (isSync) {
|
||||
syncExecute(analysisTaskInfos.values());
|
||||
return;
|
||||
}
|
||||
|
||||
persistAnalysisJob(taskInfoBuilder);
|
||||
analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
|
||||
analysisTaskInfos.values().forEach(taskScheduler::schedule);
|
||||
sendJobId(jobId);
|
||||
sendJobId(jobInfo.jobId);
|
||||
}
|
||||
|
||||
// Analysis job created by the system
|
||||
public void createAnalysisJob(AnalysisTaskInfo info) throws DdlException {
|
||||
AnalysisTaskInfo jobInfo = buildAnalysisJobInfo(info);
|
||||
if (jobInfo.colToPartitions.isEmpty()) {
|
||||
// No statistics need to be collected or updated
|
||||
return;
|
||||
}
|
||||
|
||||
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
|
||||
createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
|
||||
createTaskForMVIdx(jobInfo, analysisTaskInfos, false);
|
||||
|
||||
persistAnalysisJob(jobInfo);
|
||||
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
|
||||
analysisTaskInfos.values().forEach(taskScheduler::schedule);
|
||||
}
|
||||
|
||||
private void sendJobId(long jobId) {
|
||||
@ -165,10 +183,9 @@ public class AnalysisManager {
|
||||
* <p>
|
||||
* TODO Supports incremental collection of statistics from materialized views
|
||||
*/
|
||||
private Map<String, Set<String>> validateAndGetPartitions(AnalyzeStmt stmt) throws DdlException {
|
||||
TableIf table = stmt.getTable();
|
||||
private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
|
||||
AnalysisType analysisType, AnalysisMode analysisMode) throws DdlException {
|
||||
long tableId = table.getId();
|
||||
Set<String> columnNames = stmt.getColumnNames();
|
||||
Set<String> partitionNames = table.getPartitionNames();
|
||||
|
||||
Map<String, Set<String>> columnToPartitions = columnNames.stream()
|
||||
@ -177,7 +194,7 @@ public class AnalysisManager {
|
||||
columnName -> new HashSet<>(partitionNames)
|
||||
));
|
||||
|
||||
if (stmt.getAnalysisType() == AnalysisType.HISTOGRAM) {
|
||||
if (analysisType == AnalysisType.HISTOGRAM) {
|
||||
// Collecting histograms does not need to support incremental collection,
|
||||
// and will automatically cover historical statistics
|
||||
return columnToPartitions;
|
||||
@ -204,7 +221,7 @@ public class AnalysisManager {
|
||||
StatisticsRepository.dropStatistics(invalidPartIds);
|
||||
}
|
||||
|
||||
if (stmt.isIncremental() && stmt.getAnalysisType() == AnalysisType.COLUMN) {
|
||||
if (analysisMode == AnalysisMode.INCREMENTAL && analysisType == AnalysisType.COLUMN) {
|
||||
existColAndPartsForStats.values().forEach(partIds -> partIds.removeAll(invalidPartIds));
|
||||
// In incremental collection mode, just collect the uncollected partition statistics
|
||||
existColAndPartsForStats.forEach((columnName, partitionIds) -> {
|
||||
@ -226,17 +243,22 @@ public class AnalysisManager {
|
||||
return columnToPartitions;
|
||||
}
|
||||
|
||||
private AnalysisTaskInfoBuilder buildCommonTaskInfo(AnalyzeStmt stmt, long jobId) {
|
||||
private AnalysisTaskInfo buildAnalysisJobInfo(AnalyzeStmt stmt) throws DdlException {
|
||||
AnalysisTaskInfoBuilder taskInfoBuilder = new AnalysisTaskInfoBuilder();
|
||||
long jobId = Env.getCurrentEnv().getNextId();
|
||||
String catalogName = stmt.getCatalogName();
|
||||
String db = stmt.getDBName();
|
||||
TableName tbl = stmt.getTblName();
|
||||
StatisticsUtil.convertTableNameToObjects(tbl);
|
||||
String tblName = tbl.getTbl();
|
||||
TableIf table = stmt.getTable();
|
||||
Set<String> columnNames = stmt.getColumnNames();
|
||||
int samplePercent = stmt.getSamplePercent();
|
||||
int sampleRows = stmt.getSampleRows();
|
||||
AnalysisType analysisType = stmt.getAnalysisType();
|
||||
AnalysisMode analysisMode = stmt.getAnalysisMode();
|
||||
AnalysisMethod analysisMethod = stmt.getAnalysisMethod();
|
||||
ScheduleType scheduleType = stmt.getScheduleType();
|
||||
|
||||
taskInfoBuilder.setJobId(jobId);
|
||||
taskInfoBuilder.setCatalogName(catalogName);
|
||||
@ -244,32 +266,71 @@ public class AnalysisManager {
|
||||
taskInfoBuilder.setTblName(tblName);
|
||||
taskInfoBuilder.setJobType(JobType.MANUAL);
|
||||
taskInfoBuilder.setState(AnalysisState.PENDING);
|
||||
taskInfoBuilder.setScheduleType(ScheduleType.ONCE);
|
||||
taskInfoBuilder.setAnalysisType(analysisType);
|
||||
taskInfoBuilder.setAnalysisMode(analysisMode);
|
||||
taskInfoBuilder.setAnalysisMethod(analysisMethod);
|
||||
taskInfoBuilder.setScheduleType(scheduleType);
|
||||
taskInfoBuilder.setLastExecTimeInMs(0);
|
||||
|
||||
if (analysisMethod == AnalysisMethod.SAMPLE) {
|
||||
taskInfoBuilder.setAnalysisMethod(AnalysisMethod.SAMPLE);
|
||||
taskInfoBuilder.setSamplePercent(samplePercent);
|
||||
taskInfoBuilder.setSampleRows(sampleRows);
|
||||
} else {
|
||||
taskInfoBuilder.setAnalysisMethod(AnalysisMethod.FULL);
|
||||
}
|
||||
|
||||
if (analysisType == AnalysisType.HISTOGRAM) {
|
||||
taskInfoBuilder.setAnalysisType(AnalysisType.HISTOGRAM);
|
||||
int numBuckets = stmt.getNumBuckets();
|
||||
int maxBucketNum = numBuckets > 0 ? numBuckets
|
||||
: StatisticConstants.HISTOGRAM_MAX_BUCKET_NUM;
|
||||
taskInfoBuilder.setMaxBucketNum(maxBucketNum);
|
||||
} else {
|
||||
taskInfoBuilder.setAnalysisType(AnalysisType.COLUMN);
|
||||
}
|
||||
|
||||
return taskInfoBuilder;
|
||||
if (scheduleType == ScheduleType.PERIOD) {
|
||||
long periodTimeInMs = stmt.getPeriodTimeInMs();
|
||||
taskInfoBuilder.setPeriodTimeInMs(periodTimeInMs);
|
||||
}
|
||||
|
||||
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table,
|
||||
columnNames, analysisType, analysisMode);
|
||||
taskInfoBuilder.setColToPartitions(colToPartitions);
|
||||
|
||||
return taskInfoBuilder.build();
|
||||
}
|
||||
|
||||
private void persistAnalysisJob(AnalysisTaskInfoBuilder taskInfoBuilder) throws DdlException {
|
||||
private AnalysisTaskInfo buildAnalysisJobInfo(AnalysisTaskInfo jobInfo) {
|
||||
AnalysisTaskInfoBuilder taskInfoBuilder = new AnalysisTaskInfoBuilder();
|
||||
taskInfoBuilder.setJobId(jobInfo.jobId);
|
||||
taskInfoBuilder.setCatalogName(jobInfo.catalogName);
|
||||
taskInfoBuilder.setDbName(jobInfo.dbName);
|
||||
taskInfoBuilder.setTblName(jobInfo.tblName);
|
||||
taskInfoBuilder.setJobType(JobType.SYSTEM);
|
||||
taskInfoBuilder.setState(AnalysisState.PENDING);
|
||||
taskInfoBuilder.setAnalysisType(jobInfo.analysisType);
|
||||
taskInfoBuilder.setAnalysisMode(jobInfo.analysisMode);
|
||||
taskInfoBuilder.setAnalysisMethod(jobInfo.analysisMethod);
|
||||
taskInfoBuilder.setScheduleType(jobInfo.scheduleType);
|
||||
taskInfoBuilder.setSamplePercent(jobInfo.samplePercent);
|
||||
taskInfoBuilder.setSampleRows(jobInfo.sampleRows);
|
||||
taskInfoBuilder.setMaxBucketNum(jobInfo.maxBucketNum);
|
||||
taskInfoBuilder.setPeriodTimeInMs(jobInfo.periodTimeInMs);
|
||||
taskInfoBuilder.setLastExecTimeInMs(jobInfo.lastExecTimeInMs);
|
||||
try {
|
||||
AnalysisTaskInfoBuilder jobInfoBuilder = taskInfoBuilder.copy();
|
||||
TableIf table = StatisticsUtil
|
||||
.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
|
||||
Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table,
|
||||
jobInfo.colToPartitions.keySet(), jobInfo.analysisType, jobInfo.analysisMode);
|
||||
taskInfoBuilder.setColToPartitions(colToPartitions);
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return taskInfoBuilder.build();
|
||||
}
|
||||
|
||||
private void persistAnalysisJob(AnalysisTaskInfo jobInfo) throws DdlException {
|
||||
if (jobInfo.scheduleType == ScheduleType.PERIOD && jobInfo.lastExecTimeInMs > 0) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
AnalysisTaskInfoBuilder jobInfoBuilder = new AnalysisTaskInfoBuilder(jobInfo);
|
||||
AnalysisTaskInfo analysisTaskInfo = jobInfoBuilder.setTaskId(-1).build();
|
||||
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
|
||||
} catch (Throwable t) {
|
||||
@ -277,16 +338,22 @@ public class AnalysisManager {
|
||||
}
|
||||
}
|
||||
|
||||
private void createTaskForMVIdx(TableIf table, AnalysisTaskInfoBuilder taskInfoBuilder,
|
||||
Map<Long, BaseAnalysisTask> analysisTasks, AnalysisType analysisType,
|
||||
private void createTaskForMVIdx(AnalysisTaskInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
|
||||
boolean isSync) throws DdlException {
|
||||
TableIf table;
|
||||
try {
|
||||
table = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn(e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
TableType type = table.getType();
|
||||
if (analysisType != AnalysisType.INDEX || !type.equals(TableType.OLAP)) {
|
||||
if (jobInfo.analysisType != AnalysisType.INDEX || !type.equals(TableType.OLAP)) {
|
||||
// not need to collect statistics for materialized view
|
||||
return;
|
||||
}
|
||||
|
||||
taskInfoBuilder.setAnalysisType(analysisType);
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
|
||||
try {
|
||||
@ -295,17 +362,15 @@ public class AnalysisManager {
|
||||
if (meta.getDefineStmt() == null) {
|
||||
continue;
|
||||
}
|
||||
AnalysisTaskInfoBuilder indexTaskInfoBuilder = taskInfoBuilder.copy();
|
||||
long indexId = meta.getIndexId();
|
||||
long taskId = Env.getCurrentEnv().getNextId();
|
||||
AnalysisTaskInfoBuilder indexTaskInfoBuilder = new AnalysisTaskInfoBuilder(jobInfo);
|
||||
AnalysisTaskInfo analysisTaskInfo = indexTaskInfoBuilder.setIndexId(indexId)
|
||||
.setTaskId(taskId).build();
|
||||
analysisTasks.put(taskId, createTask(analysisTaskInfo));
|
||||
// TODO Temporarily save the statistics synchronous task,
|
||||
// which is mainly used to test the incremental collection of statistics.
|
||||
// if (isSync) {
|
||||
// return;
|
||||
// }
|
||||
if (isSync && !ConnectContext.get().getSessionVariable().enableSaveStatisticsSyncJob) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
|
||||
} catch (Exception e) {
|
||||
@ -317,27 +382,24 @@ public class AnalysisManager {
|
||||
}
|
||||
}
|
||||
|
||||
private void createTaskForEachColumns(Map<String, Set<String>> columnToPartitions,
|
||||
AnalysisTaskInfoBuilder taskInfoBuilder, Map<Long, BaseAnalysisTask> analysisTasks,
|
||||
AnalysisType analysisType, boolean isSync) throws DdlException {
|
||||
private void createTaskForEachColumns(AnalysisTaskInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
|
||||
boolean isSync) throws DdlException {
|
||||
Map<String, Set<String>> columnToPartitions = jobInfo.colToPartitions;
|
||||
for (Entry<String, Set<String>> entry : columnToPartitions.entrySet()) {
|
||||
Set<String> partitionNames = entry.getValue();
|
||||
AnalysisTaskInfoBuilder colTaskInfoBuilder = taskInfoBuilder.copy();
|
||||
if (analysisType != AnalysisType.HISTOGRAM) {
|
||||
// Histograms do not need to specify partitions
|
||||
colTaskInfoBuilder.setPartitionNames(partitionNames);
|
||||
}
|
||||
long indexId = -1;
|
||||
String colName = entry.getKey();
|
||||
long taskId = Env.getCurrentEnv().getNextId();
|
||||
AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName)
|
||||
.setIndexId(indexId).setTaskId(taskId).build();
|
||||
String colName = entry.getKey();
|
||||
AnalysisTaskInfoBuilder colTaskInfoBuilder = new AnalysisTaskInfoBuilder(jobInfo);
|
||||
if (jobInfo.analysisType != AnalysisType.HISTOGRAM) {
|
||||
colTaskInfoBuilder.setAnalysisType(AnalysisType.COLUMN);
|
||||
colTaskInfoBuilder.setColToPartitions(Collections.singletonMap(colName, entry.getValue()));
|
||||
}
|
||||
AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName).setIndexId(indexId)
|
||||
.setTaskId(taskId).build();
|
||||
analysisTasks.put(taskId, createTask(analysisTaskInfo));
|
||||
// TODO Temporarily save the statistics synchronous task,
|
||||
// which is mainly used to test the incremental collection of statistics.
|
||||
// if (isSync) {
|
||||
// continue;
|
||||
// }
|
||||
if (isSync && !ConnectContext.get().getSessionVariable().enableSaveStatisticsSyncJob) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
|
||||
} catch (Exception e) {
|
||||
@ -446,7 +508,7 @@ public class AnalysisManager {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("jobState", AnalysisState.FAILED.toString());
|
||||
params.put("message", ", message = 'Killed by user : " + ConnectContext.get().getQualifiedUser() + "'");
|
||||
params.put("updateExecTime", ", last_exec_time_in_ms=" + String.valueOf(System.currentTimeMillis()));
|
||||
params.put("updateExecTime", ", last_exec_time_in_ms=" + System.currentTimeMillis());
|
||||
params.put("jobId", String.valueOf(killAnalysisJobStmt.jobId));
|
||||
params.put("taskId", "'-1'");
|
||||
params.put("isAllTask", "true");
|
||||
@ -507,8 +569,10 @@ public class AnalysisManager {
|
||||
}
|
||||
try {
|
||||
task.execute();
|
||||
updateSyncTaskStatus(task, AnalysisState.FINISHED);
|
||||
} catch (Throwable t) {
|
||||
colNames.add(task.info.colName);
|
||||
updateSyncTaskStatus(task, AnalysisState.FAILED);
|
||||
LOG.info("Failed to analyze, info: {}", task);
|
||||
}
|
||||
}
|
||||
@ -516,5 +580,12 @@ public class AnalysisManager {
|
||||
throw new RuntimeException("Failed to analyze following columns: " + String.join(",", colNames));
|
||||
}
|
||||
}
|
||||
|
||||
private void updateSyncTaskStatus(BaseAnalysisTask task, AnalysisState state) {
|
||||
if (ConnectContext.get().getSessionVariable().enableSaveStatisticsSyncJob) {
|
||||
Env.getCurrentEnv().getAnalysisManager()
|
||||
.updateTaskStatus(task.info, state, "", System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,11 +17,16 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
@ -29,6 +34,10 @@ public class AnalysisTaskInfo {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(AnalysisTaskInfo.class);
|
||||
|
||||
public enum AnalysisMode {
|
||||
INCREMENTAL,
|
||||
FULL
|
||||
}
|
||||
|
||||
public enum AnalysisMethod {
|
||||
SAMPLE,
|
||||
@ -63,7 +72,7 @@ public class AnalysisTaskInfo {
|
||||
|
||||
public final String tblName;
|
||||
|
||||
public final Set<String> partitionNames;
|
||||
public final Map<String, Set<String>> colToPartitions;
|
||||
|
||||
public final String colName;
|
||||
|
||||
@ -71,6 +80,8 @@ public class AnalysisTaskInfo {
|
||||
|
||||
public final JobType jobType;
|
||||
|
||||
public final AnalysisMode analysisMode;
|
||||
|
||||
public final AnalysisMethod analysisMethod;
|
||||
|
||||
public final AnalysisType analysisType;
|
||||
@ -81,33 +92,38 @@ public class AnalysisTaskInfo {
|
||||
|
||||
public final int maxBucketNum;
|
||||
|
||||
public String message;
|
||||
public final long periodTimeInMs;
|
||||
|
||||
// finished or failed
|
||||
public int lastExecTimeInMs = 0;
|
||||
public long lastExecTimeInMs;
|
||||
|
||||
public AnalysisState state;
|
||||
|
||||
public final ScheduleType scheduleType;
|
||||
|
||||
public String message;
|
||||
|
||||
public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
|
||||
Set<String> partitionNames, String colName, Long indexId, JobType jobType, AnalysisMethod analysisMethod,
|
||||
AnalysisType analysisType, int samplePercent, int sampleRows, int maxBucketNum,
|
||||
String message, int lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) {
|
||||
Map<String, Set<String>> colToPartitions, String colName, Long indexId, JobType jobType,
|
||||
AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
|
||||
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
|
||||
long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) {
|
||||
this.jobId = jobId;
|
||||
this.taskId = taskId;
|
||||
this.catalogName = catalogName;
|
||||
this.dbName = dbName;
|
||||
this.tblName = tblName;
|
||||
this.partitionNames = partitionNames;
|
||||
this.colToPartitions = colToPartitions;
|
||||
this.colName = colName;
|
||||
this.indexId = indexId;
|
||||
this.jobType = jobType;
|
||||
this.analysisMode = analysisMode;
|
||||
this.analysisMethod = analysisMethod;
|
||||
this.analysisType = analysisType;
|
||||
this.samplePercent = samplePercent;
|
||||
this.sampleRows = sampleRows;
|
||||
this.maxBucketNum = maxBucketNum;
|
||||
this.periodTimeInMs = periodTimeInMs;
|
||||
this.message = message;
|
||||
this.lastExecTimeInMs = lastExecTimeInMs;
|
||||
this.state = state;
|
||||
@ -121,13 +137,30 @@ public class AnalysisTaskInfo {
|
||||
sj.add("CatalogName: " + catalogName);
|
||||
sj.add("DBName: " + dbName);
|
||||
sj.add("TableName: " + tblName);
|
||||
sj.add("PartitionNames: " + StatisticsUtil.joinElementsToString(partitionNames, ","));
|
||||
sj.add("ColumnName: " + colName);
|
||||
sj.add("TaskType: " + analysisType.toString());
|
||||
sj.add("TaskMode: " + analysisMode.toString());
|
||||
sj.add("TaskMethod: " + analysisMethod.toString());
|
||||
sj.add("Message: " + message);
|
||||
sj.add("LastExecTime: " + lastExecTimeInMs);
|
||||
sj.add("CurrentState: " + state.toString());
|
||||
if (samplePercent > 0) {
|
||||
sj.add("SamplePercent: " + samplePercent);
|
||||
}
|
||||
if (sampleRows > 0) {
|
||||
sj.add("SampleRows: " + sampleRows);
|
||||
}
|
||||
if (maxBucketNum > 0) {
|
||||
sj.add("MaxBucketNum: " + maxBucketNum);
|
||||
}
|
||||
if (colToPartitions != null) {
|
||||
sj.add("colToPartitions: " + getColToPartitionStr());
|
||||
}
|
||||
if (lastExecTimeInMs > 0) {
|
||||
sj.add("LastExecTime: " + StatisticsUtil.getReadableTime(lastExecTimeInMs));
|
||||
}
|
||||
if (periodTimeInMs > 0) {
|
||||
sj.add("periodTimeInMs: " + StatisticsUtil.getReadableTime(periodTimeInMs));
|
||||
}
|
||||
return sj.toString();
|
||||
}
|
||||
|
||||
@ -138,4 +171,73 @@ public class AnalysisTaskInfo {
|
||||
    // A "job" row aggregates its per-column tasks and is marked with the sentinel
    // taskId == -1 (see the periodic-job fetch SQL, which selects WHERE task_id = -1);
    // concrete tasks carry a real task id.
    public boolean isJob() {
        return taskId == -1;
    }
|
||||
|
||||
// TODO: use thrift
|
||||
public static AnalysisTaskInfo fromResultRow(ResultRow resultRow) {
|
||||
try {
|
||||
AnalysisTaskInfoBuilder analysisTaskInfoBuilder = new AnalysisTaskInfoBuilder();
|
||||
long jobId = Long.parseLong(resultRow.getColumnValue("job_id"));
|
||||
analysisTaskInfoBuilder.setJobId(jobId);
|
||||
long taskId = Long.parseLong(resultRow.getColumnValue("task_id"));
|
||||
analysisTaskInfoBuilder.setTaskId(taskId);
|
||||
String catalogName = resultRow.getColumnValue("catalog_name");
|
||||
analysisTaskInfoBuilder.setCatalogName(catalogName);
|
||||
String dbName = resultRow.getColumnValue("db_name");
|
||||
analysisTaskInfoBuilder.setDbName(dbName);
|
||||
String tblName = resultRow.getColumnValue("tbl_name");
|
||||
analysisTaskInfoBuilder.setTblName(tblName);
|
||||
String colName = resultRow.getColumnValue("col_name");
|
||||
analysisTaskInfoBuilder.setColName(colName);
|
||||
long indexId = Long.parseLong(resultRow.getColumnValue("index_id"));
|
||||
analysisTaskInfoBuilder.setIndexId(indexId);
|
||||
String partitionNames = resultRow.getColumnValue("col_partitions");
|
||||
Map<String, Set<String>> colToPartitions = getColToPartition(partitionNames);
|
||||
analysisTaskInfoBuilder.setColToPartitions(colToPartitions);
|
||||
String jobType = resultRow.getColumnValue("job_type");
|
||||
analysisTaskInfoBuilder.setJobType(JobType.valueOf(jobType));
|
||||
String analysisType = resultRow.getColumnValue("analysis_type");
|
||||
analysisTaskInfoBuilder.setAnalysisType(AnalysisType.valueOf(analysisType));
|
||||
String analysisMode = resultRow.getColumnValue("analysis_mode");
|
||||
analysisTaskInfoBuilder.setAnalysisMode(AnalysisMode.valueOf(analysisMode));
|
||||
String analysisMethod = resultRow.getColumnValue("analysis_method");
|
||||
analysisTaskInfoBuilder.setAnalysisMethod(AnalysisMethod.valueOf(analysisMethod));
|
||||
String scheduleType = resultRow.getColumnValue("schedule_type");
|
||||
analysisTaskInfoBuilder.setScheduleType(ScheduleType.valueOf(scheduleType));
|
||||
String state = resultRow.getColumnValue("state");
|
||||
analysisTaskInfoBuilder.setState(AnalysisState.valueOf(state));
|
||||
String samplePercent = resultRow.getColumnValue("sample_percent");
|
||||
analysisTaskInfoBuilder.setSamplePercent(StatisticsUtil.convertStrToInt(samplePercent));
|
||||
String sampleRows = resultRow.getColumnValue("sample_rows");
|
||||
analysisTaskInfoBuilder.setSampleRows(StatisticsUtil.convertStrToInt(sampleRows));
|
||||
String maxBucketNum = resultRow.getColumnValue("max_bucket_num");
|
||||
analysisTaskInfoBuilder.setMaxBucketNum(StatisticsUtil.convertStrToInt(maxBucketNum));
|
||||
String periodTimeInMs = resultRow.getColumnValue("period_time_in_ms");
|
||||
analysisTaskInfoBuilder.setPeriodTimeInMs(StatisticsUtil.convertStrToInt(periodTimeInMs));
|
||||
String lastExecTimeInMs = resultRow.getColumnValue("last_exec_time_in_ms");
|
||||
analysisTaskInfoBuilder.setLastExecTimeInMs(StatisticsUtil.convertStrToLong(lastExecTimeInMs));
|
||||
String message = resultRow.getColumnValue("message");
|
||||
analysisTaskInfoBuilder.setMessage(message);
|
||||
return analysisTaskInfoBuilder.build();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to deserialize analysis task info.", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public String getColToPartitionStr() {
|
||||
if (colToPartitions == null || colToPartitions.isEmpty()) {
|
||||
return "";
|
||||
}
|
||||
Gson gson = new Gson();
|
||||
return gson.toJson(colToPartitions);
|
||||
}
|
||||
|
||||
private static Map<String, Set<String>> getColToPartition(String colToPartitionStr) {
|
||||
if (colToPartitionStr == null || colToPartitionStr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
Gson gson = new Gson();
|
||||
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
|
||||
return gson.fromJson(colToPartitionStr, type);
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,10 +18,12 @@
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMode;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class AnalysisTaskInfoBuilder {
|
||||
@ -30,19 +32,47 @@ public class AnalysisTaskInfoBuilder {
|
||||
private String catalogName;
|
||||
private String dbName;
|
||||
private String tblName;
|
||||
private Set<String> partitionNames;
|
||||
private Map<String, Set<String>> colToPartitions;
|
||||
private String colName;
|
||||
private Long indexId;
|
||||
private JobType jobType;
|
||||
private AnalysisMode analysisMode;
|
||||
private AnalysisMethod analysisMethod;
|
||||
private AnalysisType analysisType;
|
||||
private int maxBucketNum;
|
||||
private int samplePercent;
|
||||
private int sampleRows;
|
||||
private String message;
|
||||
private int lastExecTimeInMs;
|
||||
private long periodTimeInMs;
|
||||
private long lastExecTimeInMs;
|
||||
private AnalysisState state;
|
||||
private ScheduleType scheduleType;
|
||||
private String message;
|
||||
|
||||
public AnalysisTaskInfoBuilder() {
|
||||
}
|
||||
|
||||
public AnalysisTaskInfoBuilder(AnalysisTaskInfo info) {
|
||||
jobId = info.jobId;
|
||||
taskId = info.taskId;
|
||||
catalogName = info.catalogName;
|
||||
dbName = info.dbName;
|
||||
tblName = info.tblName;
|
||||
colToPartitions = info.colToPartitions;
|
||||
colName = info.colName;
|
||||
indexId = info.indexId;
|
||||
jobType = info.jobType;
|
||||
analysisMode = info.analysisMode;
|
||||
analysisMethod = info.analysisMethod;
|
||||
analysisType = info.analysisType;
|
||||
samplePercent = info.samplePercent;
|
||||
sampleRows = info.sampleRows;
|
||||
periodTimeInMs = info.periodTimeInMs;
|
||||
maxBucketNum = info.maxBucketNum;
|
||||
message = info.message;
|
||||
lastExecTimeInMs = info.lastExecTimeInMs;
|
||||
state = info.state;
|
||||
scheduleType = info.scheduleType;
|
||||
}
|
||||
|
||||
public AnalysisTaskInfoBuilder setJobId(long jobId) {
|
||||
this.jobId = jobId;
|
||||
@ -69,8 +99,8 @@ public class AnalysisTaskInfoBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisTaskInfoBuilder setPartitionNames(Set<String> partitionNames) {
|
||||
this.partitionNames = partitionNames;
|
||||
public AnalysisTaskInfoBuilder setColToPartitions(Map<String, Set<String>> colToPartitions) {
|
||||
this.colToPartitions = colToPartitions;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -89,6 +119,11 @@ public class AnalysisTaskInfoBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
    /** Sets the analysis mode (FULL or INCREMENTAL). */
    public AnalysisTaskInfoBuilder setAnalysisMode(AnalysisMode analysisMode) {
        this.analysisMode = analysisMode;
        return this;
    }
|
||||
|
||||
public AnalysisTaskInfoBuilder setAnalysisMethod(AnalysisMethod analysisMethod) {
|
||||
this.analysisMethod = analysisMethod;
|
||||
return this;
|
||||
@ -114,12 +149,17 @@ public class AnalysisTaskInfoBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
    /**
     * Sets the re-collection period in milliseconds.
     * Presumably a non-positive value means "not periodic" (toString only prints
     * it when > 0) — confirm against the scheduler before relying on this.
     */
    public AnalysisTaskInfoBuilder setPeriodTimeInMs(long periodTimeInMs) {
        this.periodTimeInMs = periodTimeInMs;
        return this;
    }
|
||||
|
||||
    /** Sets the human-readable status/error message attached to the task. */
    public AnalysisTaskInfoBuilder setMessage(String message) {
        this.message = message;
        return this;
    }
|
||||
|
||||
public AnalysisTaskInfoBuilder setLastExecTimeInMs(int lastExecTimeInMs) {
|
||||
public AnalysisTaskInfoBuilder setLastExecTimeInMs(long lastExecTimeInMs) {
|
||||
this.lastExecTimeInMs = lastExecTimeInMs;
|
||||
return this;
|
||||
}
|
||||
@ -135,9 +175,9 @@ public class AnalysisTaskInfoBuilder {
|
||||
}
|
||||
|
||||
public AnalysisTaskInfo build() {
|
||||
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, partitionNames,
|
||||
colName, indexId, jobType, analysisMethod, analysisType, samplePercent,
|
||||
sampleRows, maxBucketNum, message, lastExecTimeInMs, state, scheduleType);
|
||||
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions,
|
||||
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
|
||||
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType);
|
||||
}
|
||||
|
||||
public AnalysisTaskInfoBuilder copy() {
|
||||
@ -147,14 +187,16 @@ public class AnalysisTaskInfoBuilder {
|
||||
.setCatalogName(catalogName)
|
||||
.setDbName(dbName)
|
||||
.setTblName(tblName)
|
||||
.setPartitionNames(partitionNames)
|
||||
.setColToPartitions(colToPartitions)
|
||||
.setColName(colName)
|
||||
.setIndexId(indexId)
|
||||
.setJobType(jobType)
|
||||
.setAnalysisMode(analysisMode)
|
||||
.setAnalysisMethod(analysisMethod)
|
||||
.setAnalysisType(analysisType)
|
||||
.setSamplePercent(samplePercent)
|
||||
.setSampleRows(sampleRows)
|
||||
.setPeriodTimeInMs(periodTimeInMs)
|
||||
.setMaxBucketNum(maxBucketNum)
|
||||
.setMessage(message)
|
||||
.setLastExecTimeInMs(lastExecTimeInMs)
|
||||
|
||||
@ -34,7 +34,7 @@ public class AnalysisTaskScheduler {
|
||||
private static final Logger LOG = LogManager.getLogger(AnalysisTaskScheduler.class);
|
||||
|
||||
private final PriorityQueue<BaseAnalysisTask> systemJobQueue =
|
||||
new PriorityQueue<>(Comparator.comparingInt(BaseAnalysisTask::getLastExecTime));
|
||||
new PriorityQueue<>(Comparator.comparingLong(BaseAnalysisTask::getLastExecTime));
|
||||
|
||||
private final Queue<BaseAnalysisTask> manualJobQueue = new LinkedList<>();
|
||||
|
||||
|
||||
@ -170,7 +170,7 @@ public abstract class BaseAnalysisTask {
|
||||
String.format("Job has been cancelled: %s", info.toString()), -1);
|
||||
}
|
||||
|
||||
public int getLastExecTime() {
|
||||
public long getLastExecTime() {
|
||||
return info.lastExecTimeInMs;
|
||||
}
|
||||
|
||||
|
||||
@ -74,7 +74,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
List<String> partitionAnalysisSQLs = new ArrayList<>();
|
||||
try {
|
||||
tbl.readLock();
|
||||
Set<String> partNames = info.partitionNames;
|
||||
Set<String> partNames = info.colToPartitions.get(info.colName);
|
||||
for (String partName : partNames) {
|
||||
Partition part = tbl.getPartition(partName);
|
||||
if (part == null) {
|
||||
|
||||
@ -0,0 +1,73 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
public class StatisticsAutoAnalyzer extends MasterDaemon {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class);
|
||||
|
||||
public StatisticsAutoAnalyzer() {
|
||||
super("Automatic Analyzer", TimeUnit.SECONDS.toMillis(Config.auto_check_statistics_in_sec));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
return;
|
||||
}
|
||||
if (!StatisticsUtil.statsTblAvailable()) {
|
||||
return;
|
||||
}
|
||||
if (Config.enable_auto_collect_statistics) {
|
||||
// periodic analyze
|
||||
periodicAnalyze();
|
||||
// TODO auto analyze
|
||||
}
|
||||
}
|
||||
|
||||
private void periodicAnalyze() {
|
||||
List<ResultRow> resultRows = StatisticsRepository.fetchPeriodicAnalysisJobs();
|
||||
if (resultRows.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
|
||||
List<AnalysisTaskInfo> jobInfos = StatisticsUtil.deserializeToAnalysisJob(resultRows);
|
||||
for (AnalysisTaskInfo jobInfo : jobInfos) {
|
||||
analysisManager.createAnalysisJob(jobInfo);
|
||||
}
|
||||
} catch (TException | DdlException e) {
|
||||
LOG.warn("Failed to periodically analyze the statistics." + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -30,6 +30,7 @@ import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -76,11 +77,12 @@ public class StatisticsRepository {
|
||||
+ FULL_QUALIFIED_COLUMN_HISTOGRAM_NAME
|
||||
+ " WHERE `id` = '${id}'";
|
||||
|
||||
private static final String PERSIST_ANALYSIS_TASK_SQL_TEMPLATE = "INSERT INTO "
|
||||
+ FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, ${taskId}, '${catalogName}', '${dbName}',"
|
||||
+ "'${tblName}','${colName}', '${indexId}','${jobType}', '${analysisType}', "
|
||||
+ "'${message}', '${lastExecTimeInMs}',"
|
||||
+ "'${state}', '${scheduleType}')";
|
||||
private static final String PERSIST_ANALYSIS_TASK_SQL_TEMPLATE =
|
||||
"INSERT INTO " + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME
|
||||
+ " VALUES(${jobId}, ${taskId}, '${catalogName}', '${dbName}', '${tblName}', "
|
||||
+ "'${colName}', '${indexId}', '${colPartitions}', '${jobType}', '${analysisType}', "
|
||||
+ "'${analysisMode}', '${analysisMethod}', '${scheduleType}', '${state}', ${samplePercent}, "
|
||||
+ "${sampleRows}, ${maxBucketNum}, ${periodTimeInMs}, ${lastExecTimeInMs}, '${message}')";
|
||||
|
||||
private static final String INSERT_INTO_COLUMN_STATISTICS = "INSERT INTO "
|
||||
+ FULL_QUALIFIED_COLUMN_STATISTICS_NAME + " VALUES('${id}', ${catalogId}, ${dbId}, ${tblId}, '${idxId}',"
|
||||
@ -114,6 +116,14 @@ public class StatisticsRepository {
|
||||
+ " WHERE tbl_id = ${tblId}"
|
||||
+ " AND part_id IS NOT NULL";
|
||||
|
||||
private static final String FETCH_PERIODIC_ANALYSIS_JOB_SQL = "SELECT * FROM "
|
||||
+ FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME
|
||||
+ " WHERE task_id = -1 "
|
||||
+ " AND schedule_type = 'PERIOD' "
|
||||
+ " AND state = 'FINISHED' "
|
||||
+ " AND last_exec_time_in_ms > 0 "
|
||||
+ " AND (${currentTimeStamp} - last_exec_time_in_ms >= period_time_in_ms)";
|
||||
|
||||
public static ColumnStatistic queryColumnStatisticsByName(long tableId, String colName) {
|
||||
ResultRow resultRow = queryColumnStatisticById(tableId, colName);
|
||||
if (resultRow == null) {
|
||||
@ -231,12 +241,19 @@ public class StatisticsRepository {
|
||||
params.put("tblName", analysisTaskInfo.tblName);
|
||||
params.put("colName", analysisTaskInfo.colName == null ? "" : analysisTaskInfo.colName);
|
||||
params.put("indexId", analysisTaskInfo.indexId == null ? "-1" : String.valueOf(analysisTaskInfo.indexId));
|
||||
params.put("colPartitions", analysisTaskInfo.getColToPartitionStr());
|
||||
params.put("jobType", analysisTaskInfo.jobType.toString());
|
||||
params.put("analysisType", analysisTaskInfo.analysisMethod.toString());
|
||||
params.put("message", "");
|
||||
params.put("lastExecTimeInMs", "0");
|
||||
params.put("state", AnalysisState.PENDING.toString());
|
||||
params.put("analysisType", analysisTaskInfo.analysisType.toString());
|
||||
params.put("analysisMode", analysisTaskInfo.analysisMode.toString());
|
||||
params.put("analysisMethod", analysisTaskInfo.analysisMethod.toString());
|
||||
params.put("scheduleType", analysisTaskInfo.scheduleType.toString());
|
||||
params.put("state", analysisTaskInfo.state.toString());
|
||||
params.put("samplePercent", String.valueOf(analysisTaskInfo.samplePercent));
|
||||
params.put("sampleRows", String.valueOf(analysisTaskInfo.sampleRows));
|
||||
params.put("maxBucketNum", String.valueOf(analysisTaskInfo.maxBucketNum));
|
||||
params.put("periodTimeInMs", String.valueOf(analysisTaskInfo.periodTimeInMs));
|
||||
params.put("lastExecTimeInMs", String.valueOf(analysisTaskInfo.lastExecTimeInMs));
|
||||
params.put("message", "");
|
||||
StatisticsUtil.execUpdate(
|
||||
new StringSubstitutor(params).replace(PERSIST_ANALYSIS_TASK_SQL_TEMPLATE));
|
||||
}
|
||||
@ -338,4 +355,17 @@ public class StatisticsRepository {
|
||||
|
||||
return columnToPartitions;
|
||||
}
|
||||
|
||||
public static List<ResultRow> fetchPeriodicAnalysisJobs() {
|
||||
ImmutableMap<String, String> params = ImmutableMap
|
||||
.of("currentTimeStamp", String.valueOf(System.currentTimeMillis()));
|
||||
try {
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(FETCH_PERIODIC_ANALYSIS_JOB_SQL);
|
||||
return StatisticsUtil.execStatisticQuery(sql);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to update status", e);
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,12 +59,15 @@ import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -74,6 +77,8 @@ import java.util.stream.Collectors;
|
||||
|
||||
public class StatisticsUtil {
|
||||
|
||||
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
||||
|
||||
public static List<ResultRow> executeQuery(String template, Map<String, String> params) {
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(template);
|
||||
@ -103,9 +108,14 @@ public class StatisticsUtil {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: finish this.
|
||||
public static List<AnalysisTaskInfo> deserializeToAnalysisJob(List<ResultRow> resultBatches) throws TException {
|
||||
return new ArrayList<>();
|
||||
public static List<AnalysisTaskInfo> deserializeToAnalysisJob(List<ResultRow> resultBatches)
|
||||
throws TException {
|
||||
if (CollectionUtils.isEmpty(resultBatches)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return resultBatches.stream()
|
||||
.map(AnalysisTaskInfo::fromResultRow)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static List<ColumnStatistic> deserializeToColumnStatistics(List<ResultRow> resultBatches)
|
||||
@ -362,4 +372,20 @@ public class StatisticsUtil {
|
||||
values.forEach(v -> builder.add(String.valueOf(v)));
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
public static int convertStrToInt(String str) {
|
||||
return StringUtils.isNumeric(str) ? Integer.parseInt(str) : 0;
|
||||
}
|
||||
|
||||
public static long convertStrToLong(String str) {
|
||||
return StringUtils.isNumeric(str) ? Long.parseLong(str) : 0;
|
||||
}
|
||||
|
||||
public static String getReadableTime(long timeInMs) {
|
||||
if (timeInMs <= 0) {
|
||||
return "";
|
||||
}
|
||||
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
|
||||
return format.format(new Date(timeInMs));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user