[feature-wip](statistics) step2: schedule the statistics job and generate executable tasks (#8859)
This pull request implements part of the statistics collection feature (https://github.com/apache/incubator-doris/issues/6370). It does not affect any existing code, and users are not yet able to create statistics jobs. After receiving a statistics collection statement, Doris generates a job. This step implements the division of a statistics collection job according to the following statistics categories:

table:
- `row_count`: the table row count, which is critical in estimating the cardinality and memory usage of scan nodes.
- `data_size`: the table size; not used by the CBO, mainly used to monitor and manage table size.

column:
- `num_distinct_value`: used to determine the selectivity of an equality predicate.
- `min`: the minimum value.
- `max`: the maximum value.
- `num_nulls`: the number of nulls.
- `avg_col_len`: the average length of the column, in bytes, used for memory and network IO estimation.
- `max_col_len`: the maximum length of the column, in bytes, used for memory and network IO estimation.

After the job is divided, executable statistics tasks are obtained.
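The division can be pictured with a short, self-contained sketch. The `StatsType` values mirror the categories above, but `DivisionSketch` and `buildTasks` are simplified illustrations rather than the actual classes in this PR:

```java
import java.util.ArrayList;
import java.util.List;

public class DivisionSketch {
    enum StatsType { ROW_COUNT, DATA_SIZE, NDV, MIN_VALUE, MAX_VALUE, NUM_NULLS, AVG_SIZE, MAX_SIZE }

    // One entry per generated task: table-level statistics first, then the
    // per-column groups that are collected together.
    static List<List<StatsType>> buildTasks(List<String> columns) {
        List<List<StatsType>> tasks = new ArrayList<>();
        tasks.add(List.of(StatsType.ROW_COUNT));
        tasks.add(List.of(StatsType.DATA_SIZE));
        for (String column : columns) {
            tasks.add(List.of(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV)); // one full-scan task
            tasks.add(List.of(StatsType.NUM_NULLS));
            tasks.add(List.of(StatsType.MAX_SIZE, StatsType.AVG_SIZE)); // one sampling task
        }
        return tasks;
    }

    public static void main(String[] args) {
        // "analyze t1(c1,c2,c3)" would be divided along these lines:
        buildTasks(List.of("c1", "c2", "c3")).forEach(System.out::println);
    }
}
```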
@@ -21,15 +21,17 @@ import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Used to store statistics job info,
@@ -47,6 +49,8 @@ public class StatisticsJob {
        CANCELLED
    }

    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    private final long id = Catalog.getCurrentCatalog().getNextId();

    /**
@@ -89,6 +93,22 @@ public class StatisticsJob {
        this.properties = properties == null ? Maps.newHashMap() : properties;
    }

    public void readLock() {
        lock.readLock().lock();
    }

    public void readUnlock() {
        lock.readLock().unlock();
    }

    private void writeLock() {
        lock.writeLock().lock();
    }

    private void writeUnlock() {
        lock.writeLock().unlock();
    }

    public long getId() {
        return this.id;
    }
@@ -137,6 +157,103 @@ public class StatisticsJob {
        return this.progress;
    }

    public void updateJobState(JobState newState) throws IllegalStateException {
        LOG.info("To change statistics job(id={}) state from {} to {}", id, jobState, newState);
        writeLock();

        try {
            // PENDING -> SCHEDULING/FAILED/CANCELLED
            if (jobState == JobState.PENDING) {
                if (newState != JobState.SCHEDULING && newState != JobState.FAILED && newState != JobState.CANCELLED) {
                    LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
                    throw new IllegalStateException("Invalid job state transition from PENDING to " + newState);
                }
                LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
                this.jobState = newState;
                return;
            }

            // SCHEDULING -> RUNNING/FAILED/CANCELLED
            if (jobState == JobState.SCHEDULING) {
                if (newState == JobState.RUNNING) {
                    // the job starts running, set the start time
                    this.startTime = System.currentTimeMillis();
                } else if (newState != JobState.FAILED && newState != JobState.CANCELLED) {
                    LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
                    throw new IllegalStateException("Invalid job state transition from SCHEDULING to " + newState);
                }
                LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
                this.jobState = newState;
                return;
            }

            // RUNNING -> FINISHED/FAILED/CANCELLED
            if (jobState == JobState.RUNNING) {
                if (newState == JobState.FINISHED) {
                    // set the finish time
                    this.finishTime = System.currentTimeMillis();
                } else if (newState != JobState.FAILED && newState != JobState.CANCELLED) {
                    LOG.info("Invalid statistics job(id={}) state transition from {} to {}", id, jobState, newState);
                    throw new IllegalStateException("Invalid job state transition from RUNNING to " + newState);
                }
                LOG.info("Statistics job(id={}) state changed from {} to {}", id, jobState, newState);
                this.jobState = newState;
                return;
            }

            // unsupported transition
            LOG.info("Invalid job(id={}) state transition from {} to {}", id, jobState, newState);
            throw new IllegalStateException("Invalid job state transition from " + jobState + " to " + newState);
        } finally {
            writeUnlock();
            LOG.info("Statistics job(id={}) current state is {}", id, jobState);
        }
    }

    public void updateJobInfoByTaskId(Long taskId, String errorMsg) {
        writeLock();

        try {
            for (StatisticsTask task : this.tasks) {
                if (taskId == task.getId()) {
                    if (Strings.isNullOrEmpty(errorMsg)) {
                        this.progress += 1;
                        if (this.progress == this.tasks.size()) {
                            updateJobState(StatisticsJob.JobState.FINISHED);
                        }
                        task.updateTaskState(StatisticsTask.TaskState.FINISHED);
                    } else {
                        this.errorMsgs.add(errorMsg);
                        task.updateTaskState(StatisticsTask.TaskState.FAILED);
                        updateJobState(StatisticsJob.JobState.FAILED);
                    }
                    return;
                }
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * get statisticsJob from analyzeStmt.
     * AnalyzeStmt: analyze t1(c1,c2,c3)
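The progress bookkeeping in `updateJobInfoByTaskId` above boils down to: a task that reports no error bumps `progress`, the job finishes when `progress` reaches the task count, and any error message fails the whole job. A minimal self-contained sketch of that rule (simplified types, not the Doris classes):

```java
import java.util.List;

public class JobProgressSketch {
    enum JobState { RUNNING, FINISHED, FAILED }

    private JobState state = JobState.RUNNING;
    private int progress = 0;

    // Mirrors updateJobInfoByTaskId: a null or empty error message means success.
    synchronized void onTaskDone(int totalTasks, String errorMsg) {
        if (errorMsg == null || errorMsg.isEmpty()) {
            progress += 1;
            if (progress == totalTasks) {
                state = JobState.FINISHED; // all tasks done -> job finished
            }
        } else {
            state = JobState.FAILED; // one failed task fails the whole job
        }
    }

    public static void main(String[] args) {
        JobProgressSketch job = new JobProgressSketch();
        for (String errorMsg : List.of("", "", "")) { // three successful tasks
            job.onTaskDone(3, errorMsg);
        }
        System.out.println(job.state + ", progress=" + job.progress); // FINISHED, progress=3
    }
}
```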
@@ -18,23 +18,52 @@
package org.apache.doris.statistics;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.MasterDaemon;

import com.google.common.collect.Queues;

-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

-/*
-Schedule statistics job.
-1. divide job to multi task
-2. submit all task to StatisticsTaskScheduler
-Switch job state from pending to scheduling.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
 * Schedule statistics job.
 * 1. divide the job into multiple tasks
 * 2. submit all tasks to StatisticsTaskScheduler
 * Switch the job state from pending to scheduling.
 */
public class StatisticsJobScheduler extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(StatisticsJobScheduler.class);

-    public Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue();
    /**
     * If the table row count exceeds the per-task scan limit multiplied by the number of BEs,
     * we'll divide subtasks by partition. The relevant values (3700000000L and 600000000L) are derived from tests.
     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is for min(c1)/max(c1)/ndv(c1).
     */
    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;

    /**
     * Different statistics need to be collected for the jobs submitted by users.
     * If all statistics are collected at the same time, the cluster may be overburdened
     * and normal query services may be affected. Therefore, we put the jobs into a queue
     * and schedule them one by one, and finally divide each job into several subtasks and execute them.
     */
    public final Queue<StatisticsJob> pendingJobQueue = Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);

    public StatisticsJobScheduler() {
        super("Statistics job scheduler", 0);
@@ -42,22 +71,193 @@ public class StatisticsJobScheduler extends MasterDaemon {

    @Override
    protected void runAfterCatalogReady() {
-        // TODO
        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
        if (pendingJob != null) {
            try {
                if (pendingJob.getTasks().isEmpty()) {
                    divide(pendingJob);
                }
                List<StatisticsTask> tasks = pendingJob.getTasks();
                Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
                pendingJob.updateJobState(StatisticsJob.JobState.SCHEDULING);
                pendingJobQueue.remove();
            } catch (IllegalStateException e) {
                // addTasks throws IllegalStateException if the task queue is full; re-add the tasks next time
                LOG.info("The statistics task queue is full, schedule the job(id={}) later", pendingJob.getId());
            } catch (DdlException e) {
                pendingJobQueue.remove();
                pendingJob.updateJobState(StatisticsJob.JobState.FAILED);
                LOG.info("Failed to schedule the statistics job(id={})", pendingJob.getId(), e);
            }
        }
    }

    public void addPendingJob(StatisticsJob statisticsJob) throws IllegalStateException {
        pendingJobQueue.add(statisticsJob);
    }

    /**
     * Statistics tasks are of the following types:
     * table:
     *   - row_count: table row count is critical in estimating cardinality and memory usage of scan nodes.
     *   - data_size: table size, not applicable to CBO, mainly used to monitor and manage table size.
     * column:
     *   - num_distinct_value: used to determine the selectivity of an equality predicate.
     *   - min: the minimum value.
     *   - max: the maximum value.
     *   - num_nulls: the number of nulls.
     *   - avg_col_len: the average length of the column, in bytes, used for memory and network IO estimation.
     *   - max_col_len: the maximum length of the column, in bytes, used for memory and network IO estimation.
     * <p>
     * Divide:
     *   - min, max, ndv: these three full-scan metrics are collected by one subtask.
     *   - max_col_lens, avg_col_lens: these two sampled metrics are collected by one subtask.
     * <p>
     * If the table row count exceeds the per-task scan limit multiplied by the number of BEs,
     * we'll divide subtasks by partition. The relevant values (3700000000L and 600000000L) are derived from tests.
     * <p>
     * Eventually, we will get several subtasks of the following types:
     *
     * @throws DdlException DdlException
     * @see MetaStatisticsTask
     * @see SampleSQLStatisticsTask
     * @see SQLStatisticsTask
     */
-    private List<StatisticsTask> divide(StatisticsJob statisticsJob) {
-        // TODO
-        return new ArrayList<>();
    private void divide(StatisticsJob statisticsJob) throws DdlException {
        long jobId = statisticsJob.getId();
        long dbId = statisticsJob.getDbId();
        Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbId);
        Set<Long> tblIds = statisticsJob.getTblIds();
        Map<Long, List<String>> tableIdToColumnName = statisticsJob.getTableIdToColumnName();
        List<StatisticsTask> tasks = statisticsJob.getTasks();
        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);

        for (Long tblId : tblIds) {
            Table tbl = db.getTableOrDdlException(tblId);
            long rowCount = tbl.getRowCount();
            List<Long> partitionIds = ((OlapTable) tbl).getPartitionIds();
            List<String> columnNameList = tableIdToColumnName.get(tblId);

            // step 1: generate data_size task
            StatsCategoryDesc dataSizeCategory = getTblStatsCategoryDesc(dbId, tblId);
            StatsGranularityDesc dataSizeGranularity = getTblStatsGranularityDesc(tblId);
            MetaStatisticsTask dataSizeTask = new MetaStatisticsTask(jobId,
                    dataSizeGranularity, dataSizeCategory, Collections.singletonList(StatsType.DATA_SIZE));
            tasks.add(dataSizeTask);

            // step 2: generate row_count task
            KeysType keysType = ((OlapTable) tbl).getKeysType();
            if (keysType == KeysType.DUP_KEYS) {
                StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
                StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
                MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId,
                        rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
                tasks.add(metaTask);
            } else {
                if (rowCount > backendIds.size() * COUNT_MAX_SCAN_PER_TASK) {
                    // divide subtasks by partition
                    for (Long partitionId : partitionIds) {
                        StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
                        StatsGranularityDesc rowCountGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
                                rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
                        tasks.add(sqlTask);
                    }
                } else {
                    StatsCategoryDesc rowCountCategory = getTblStatsCategoryDesc(dbId, tblId);
                    StatsGranularityDesc rowCountGranularity = getTblStatsGranularityDesc(tblId);
                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
                            rowCountGranularity, rowCountCategory, Collections.singletonList(StatsType.ROW_COUNT));
                    tasks.add(sqlTask);
                }
            }

            // step 3: generate [min,max,ndv] task
            if (rowCount > backendIds.size() * NDV_MAX_SCAN_PER_TASK) {
                // divide subtasks by partition
                columnNameList.forEach(columnName -> {
                    for (Long partitionId : partitionIds) {
                        StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
                        StatsGranularityDesc columnGranularity = getPartitionStatsGranularityDesc(tblId, partitionId);
                        List<StatsType> statsTypes = Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
                        SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
                        tasks.add(sqlTask);
                    }
                });
            } else {
                for (String columnName : columnNameList) {
                    StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
                    StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
                    List<StatsType> statsTypes = Arrays.asList(StatsType.MIN_VALUE, StatsType.MAX_VALUE, StatsType.NDV);
                    SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
                    tasks.add(sqlTask);
                }
            }

            // step 4: generate num_nulls task
            for (String columnName : columnNameList) {
                StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
                StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
                SQLStatisticsTask sqlTask = new SQLStatisticsTask(jobId,
                        columnGranularity, columnCategory, Collections.singletonList(StatsType.NUM_NULLS));
                tasks.add(sqlTask);
            }

            // step 5: generate [max_col_lens, avg_col_lens] task
            for (String columnName : columnNameList) {
                StatsCategoryDesc columnCategory = getColStatsCategoryDesc(dbId, tblId, columnName);
                StatsGranularityDesc columnGranularity = getTblStatsGranularityDesc(tblId);
                List<StatsType> statsTypes = Arrays.asList(StatsType.MAX_SIZE, StatsType.AVG_SIZE);
                Column column = tbl.getColumn(columnName);
                Type colType = column.getType();
                if (colType.isStringType()) {
                    SQLStatisticsTask sampleSqlTask = new SampleSQLStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
                    tasks.add(sampleSqlTask);
                } else {
                    MetaStatisticsTask metaTask = new MetaStatisticsTask(jobId, columnGranularity, columnCategory, statsTypes);
                    tasks.add(metaTask);
                }
            }
        }
    }

    private StatsCategoryDesc getTblStatsCategoryDesc(long dbId, long tableId) {
        StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
        statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.TABLE);
        statsCategoryDesc.setDbId(dbId);
        statsCategoryDesc.setTableId(tableId);
        return statsCategoryDesc;
    }

    private StatsCategoryDesc getColStatsCategoryDesc(long dbId, long tableId, String columnName) {
        StatsCategoryDesc statsCategoryDesc = new StatsCategoryDesc();
        statsCategoryDesc.setDbId(dbId);
        statsCategoryDesc.setTableId(tableId);
        statsCategoryDesc.setCategory(StatsCategoryDesc.StatsCategory.COLUMN);
        statsCategoryDesc.setColumnName(columnName);
        return statsCategoryDesc;
    }

    private StatsGranularityDesc getTblStatsGranularityDesc(long tableId) {
        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
        statsGranularityDesc.setTableId(tableId);
        statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.TABLE);
        return statsGranularityDesc;
    }

    private StatsGranularityDesc getPartitionStatsGranularityDesc(long tableId, long partitionId) {
        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
        statsGranularityDesc.setTableId(tableId);
        statsGranularityDesc.setPartitionId(partitionId);
        statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.PARTITION);
        return statsGranularityDesc;
    }

    private StatsGranularityDesc getTabletStatsGranularityDesc(long tableId) {
        StatsGranularityDesc statsGranularityDesc = new StatsGranularityDesc();
        statsGranularityDesc.setTableId(tableId);
        statsGranularityDesc.setGranularity(StatsGranularityDesc.StatsGranularity.TABLET);
        return statsGranularityDesc;
    }
}

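To make the thresholds above concrete: with 3 backends, a table of 2 billion rows stays a single table-level task for `row_count` (2e9 is below 3 x 3.7e9) but is split per partition for `min`/`max`/`ndv` (2e9 exceeds 3 x 6e8). A self-contained sketch of the check, where the backend and row counts are made-up example values:

```java
public class SplitThresholdSketch {
    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L; // for count(expr)
    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;   // for min/max/ndv

    // Mirrors the checks in divide(): split per partition only when the table
    // holds more rows than one task per BE is allowed to scan.
    static boolean splitByPartition(long rowCount, int backendCount, long maxScanPerTask) {
        return rowCount > backendCount * maxScanPerTask;
    }

    public static void main(String[] args) {
        long rowCount = 2_000_000_000L;
        int backendCount = 3;
        System.out.println(splitByPartition(rowCount, backendCount, COUNT_MAX_SCAN_PER_TASK)); // false
        System.out.println(splitByPartition(rowCount, backendCount, NDV_MAX_SCAN_PER_TASK));   // true
    }
}
```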
@@ -19,12 +19,17 @@ package org.apache.doris.statistics;

import org.apache.doris.catalog.Catalog;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * The StatisticsTask belongs to one StatisticsJob.
 * A job may be split into multiple tasks but a task can only belong to one job.
 *
 * @granularityDesc, @categoryDesc, @statsTypeList
 * These three attributes indicate which statistics this task is responsible for collecting.
 * In general, a task will collect more than one @StatsType at the same time
@@ -32,24 +37,147 @@ import java.util.concurrent.Callable;
 * For example: the task is responsible for collecting min, max, ndv of t1.c1 in partition p1.
 * @granularityDesc: StatsGranularity=partition
 */
-public class StatisticsTask implements Callable<StatisticsTaskResult> {
-    protected long id = Catalog.getCurrentCatalog().getNextId();;
public abstract class StatisticsTask implements Callable<StatisticsTaskResult> {
    protected static final Logger LOG = LogManager.getLogger(StatisticsTask.class);

    public enum TaskState {
        PENDING,
        RUNNING,
        FINISHED,
        FAILED
    }

    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    protected long id = Catalog.getCurrentCatalog().getNextId();
    protected long jobId;
    protected StatsGranularityDesc granularityDesc;
    protected StatsCategoryDesc categoryDesc;
    protected List<StatsType> statsTypeList;
    protected TaskState taskState = TaskState.PENDING;

-    public StatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                          StatsCategoryDesc categoryDesc, List<StatsType> statsTypeList) {
    protected final long createTime = System.currentTimeMillis();
    protected long startTime = -1L;
    protected long finishTime = -1L;

    public StatisticsTask(long jobId,
                          StatsGranularityDesc granularityDesc,
                          StatsCategoryDesc categoryDesc,
                          List<StatsType> statsTypeList) {
        this.jobId = jobId;
        this.granularityDesc = granularityDesc;
        this.categoryDesc = categoryDesc;
        this.statsTypeList = statsTypeList;
    }

    public void readLock() {
        lock.readLock().lock();
    }

    public void readUnlock() {
        lock.readLock().unlock();
    }

    protected void writeLock() {
        lock.writeLock().lock();
    }

    protected void writeUnlock() {
        lock.writeLock().unlock();
    }

    public long getId() {
        return this.id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public long getJobId() {
        return this.jobId;
    }

    public StatsGranularityDesc getGranularityDesc() {
        return this.granularityDesc;
    }

    public StatsCategoryDesc getCategoryDesc() {
        return this.categoryDesc;
    }

    public List<StatsType> getStatsTypeList() {
        return this.statsTypeList;
    }

    public TaskState getTaskState() {
        return this.taskState;
    }

    public long getCreateTime() {
        return this.createTime;
    }

    public long getStartTime() {
        return this.startTime;
    }

    public long getFinishTime() {
        return this.finishTime;
    }

    /**
     * Different statistics implement different collection methods.
     *
     * @return the result of this statistics task
     * @throws Exception if the statistics collection fails
     */
-    @Override
-    public StatisticsTaskResult call() throws Exception {
-        // TODO
-        return null;
    public abstract StatisticsTaskResult call() throws Exception;

    public void updateTaskState(TaskState newState) throws IllegalStateException {
        LOG.info("To change statistics task(id={}) state from {} to {}", id, taskState, newState);
        writeLock();

        try {
            // PENDING -> RUNNING/FAILED
            if (taskState == TaskState.PENDING) {
                if (newState == TaskState.RUNNING) {
                    // the task starts running, set the start time
                    this.startTime = System.currentTimeMillis();
                } else if (newState != TaskState.FAILED) {
                    LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
                    throw new IllegalStateException("Invalid task state transition from PENDING to " + newState);
                }
                LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
                this.taskState = newState;
                return;
            }

            // RUNNING -> FINISHED/FAILED
            if (taskState == TaskState.RUNNING) {
                if (newState == TaskState.FINISHED) {
                    // set the finish time
                    this.finishTime = System.currentTimeMillis();
                } else if (newState != TaskState.FAILED) {
                    LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
                    throw new IllegalStateException("Invalid task state transition from RUNNING to " + newState);
                }
                LOG.info("Statistics task(id={}) state changed from {} to {}", id, taskState, newState);
                this.taskState = newState;
                return;
            }

            // unsupported transition
            LOG.info("Invalid task(id={}) state transition from {} to {}", id, taskState, newState);
            throw new IllegalStateException("Invalid task state transition from " + taskState + " to " + newState);
        } finally {
            writeUnlock();
            LOG.info("Statistics task(id={}) current state is {}", id, taskState);
        }
    }
}

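With `call()` now abstract, each concrete task supplies its own collection logic. The subclass below is a hypothetical illustration only; the real implementations in this feature are `MetaStatisticsTask`, `SQLStatisticsTask`, and `SampleSQLStatisticsTask`:

```java
import java.util.List;

// Hypothetical subclass for illustration; not part of the PR.
public class RowCountFromMetaTask extends StatisticsTask {
    public RowCountFromMetaTask(long jobId, StatsGranularityDesc granularityDesc,
                                StatsCategoryDesc categoryDesc, List<StatsType> statsTypeList) {
        super(jobId, granularityDesc, categoryDesc, statsTypeList);
    }

    @Override
    public StatisticsTaskResult call() throws Exception {
        updateTaskState(TaskState.RUNNING);
        // A real task would read the row count from table metadata here and
        // wrap it in a StatisticsTaskResult; the job is then updated through
        // updateJobInfoByTaskId based on the outcome.
        return null; // placeholder for a populated StatisticsTaskResult
    }
}
```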
@@ -20,6 +20,7 @@ package org.apache.doris.statistics;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;

import com.google.common.collect.Lists;
import com.google.common.collect.Queues;

import org.apache.logging.log4j.LogManager;
@@ -31,15 +32,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

-import com.clearspring.analytics.util.Lists;

/*
 * Schedule statistics task
 */
public class StatisticsTaskScheduler extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(StatisticsTaskScheduler.class);

-    private Queue<StatisticsTask> queue = Queues.newLinkedBlockingQueue();
    private final Queue<StatisticsTask> queue = Queues.newLinkedBlockingQueue();

    public StatisticsTaskScheduler() {
        super("Statistics task scheduler", 0);
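The `ExecutorService`/`Executors`/`Future` imports above suggest the task scheduler drains its queue into a thread pool and collects results through futures. A self-contained sketch of that pattern, with made-up task bodies and pool size:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class TaskDrainSketch {
    public static void main(String[] args) throws Exception {
        Queue<Callable<String>> queue = new LinkedBlockingQueue<>();
        queue.add(() -> "row_count collected");
        queue.add(() -> "min/max/ndv collected");

        ExecutorService pool = Executors.newFixedThreadPool(2); // example pool size
        List<Future<String>> futures = new ArrayList<>();
        Callable<String> task;
        while ((task = queue.poll()) != null) {
            futures.add(pool.submit(task)); // each task runs through Callable.call()
        }
        for (Future<String> future : futures) {
            System.out.println(future.get()); // block until each task's result is ready
        }
        pool.shutdown();
    }
}
```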
@@ -24,8 +24,40 @@ public class StatsCategoryDesc {
    }

    private StatsCategory category;
    private long dbId;
    private long tableId;
    private String columnName;

    public StatsCategory getCategory() {
        return this.category;
    }

    public void setCategory(StatsCategory category) {
        this.category = category;
    }

    public long getDbId() {
        return this.dbId;
    }

    public void setDbId(long dbId) {
        this.dbId = dbId;
    }

    public long getTableId() {
        return this.tableId;
    }

    public void setTableId(long tableId) {
        this.tableId = tableId;
    }

    public String getColumnName() {
        return this.columnName;
    }

    public void setColumnName(String columnName) {
        this.columnName = columnName;
    }
}

@@ -29,4 +29,36 @@ public class StatsGranularityDesc {
    private long partitionId;
    private long tabletId;

    public StatsGranularity getGranularity() {
        return this.granularity;
    }

    public void setGranularity(StatsGranularity granularity) {
        this.granularity = granularity;
    }

    public long getTableId() {
        return this.tableId;
    }

    public void setTableId(long tableId) {
        this.tableId = tableId;
    }

    public long getPartitionId() {
        return this.partitionId;
    }

    public void setPartitionId(long partitionId) {
        this.partitionId = partitionId;
    }

    public long getTabletId() {
        return this.tabletId;
    }

    public void setTabletId(long tabletId) {
        this.tabletId = tabletId;
    }
}

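A task's scope is pinned down by pairing a `StatsCategoryDesc` with a `StatsGranularityDesc`, as the helper methods in `StatisticsJobScheduler` do. For the earlier javadoc example (min/max/ndv of t1.c1 in partition p1), the pair would be built roughly as follows, with placeholder IDs standing in for real catalog IDs:

```java
public class DescriptorPairSketch {
    static void buildPair() {
        StatsCategoryDesc category = new StatsCategoryDesc();
        category.setCategory(StatsCategoryDesc.StatsCategory.COLUMN);
        category.setDbId(10001L);    // placeholder id of the database holding t1
        category.setTableId(10002L); // placeholder id of t1
        category.setColumnName("c1");

        StatsGranularityDesc granularity = new StatsGranularityDesc();
        granularity.setTableId(10002L);
        granularity.setPartitionId(10003L); // placeholder id of partition p1
        granularity.setGranularity(StatsGranularityDesc.StatsGranularity.PARTITION);
    }
}
```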