[feat](stats) Return job id for async analyze stmt (#18800)

1. Return job id from async analysis
2. Sync analysis jobs don't save to analysis_jobs anymore
This commit is contained in:
AKIRA
2023-04-25 15:43:54 +09:00
committed by GitHub
parent 339d804ec4
commit a4a85f2476
9 changed files with 240 additions and 72 deletions

View File

@ -21,7 +21,10 @@ public class AutoCloseConnectContext implements AutoCloseable {
public final ConnectContext connectContext;
private final ConnectContext previousContext;
/**
 * Installs {@code connectContext} as the current thread's context, remembering
 * whichever context was active before so {@code close()} can restore it.
 * This makes nested scoped-context usage safe.
 *
 * @param connectContext the context to activate for the current thread
 */
public AutoCloseConnectContext(ConnectContext connectContext) {
// Capture the previously installed thread-local context (may be null).
this.previousContext = ConnectContext.get();
this.connectContext = connectContext;
// Activate the new context for the current thread.
connectContext.setThreadLocalInfo();
}
@ -29,5 +32,8 @@ public class AutoCloseConnectContext implements AutoCloseable {
@Override
public void close() {
// Detach the current context from this thread...
ConnectContext.remove();
// ...and re-install the context that was active before construction,
// so nested AutoCloseConnectContext scopes unwind correctly.
if (previousContext != null) {
previousContext.setThreadLocalInfo();
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ArrayLiteral;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
@ -2067,7 +2068,9 @@ public class StmtExecutor implements ProfileWriter {
private void handleDdlStmt() {
try {
DdlExecutor.execute(context.getEnv(), (DdlStmt) parsedStmt);
context.getState().setOk();
if (!(parsedStmt instanceof AnalyzeStmt)) {
context.getState().setOk();
}
} catch (QueryStateException e) {
LOG.warn("", e);
context.setState(e.getQueryState());

View File

@ -21,13 +21,18 @@ import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.DropStatsStmt;
import org.apache.doris.analysis.ShowAnalyzeStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
@ -94,19 +99,35 @@ public class AnalysisManager {
long jobId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfoBuilder taskInfoBuilder = buildCommonTaskInfo(stmt, jobId);
Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
// start build analysis tasks
createTaskForEachColumns(stmt.getColumnNames(), taskInfoBuilder, analysisTaskInfos);
createTaskForMVIdx(stmt.getTable(), taskInfoBuilder, analysisTaskInfos, stmt.getAnalysisType());
persistAnalysisJob(taskInfoBuilder);
createTaskForEachColumns(stmt.getColumnNames(), taskInfoBuilder, analysisTaskInfos, stmt.isSync());
createTaskForMVIdx(stmt.getTable(), taskInfoBuilder, analysisTaskInfos, stmt.getAnalysisType(), stmt.isSync());
if (stmt.isSync()) {
syncExecute(analysisTaskInfos.values());
return;
}
persistAnalysisJob(taskInfoBuilder);
analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
analysisTaskInfos.values().forEach(taskScheduler::schedule);
sendJobId(jobId);
}
/**
 * Returns the async analyze job id to the client as a one-column,
 * one-row result set, so the user can later look up job progress by id.
 * Best-effort: failures are logged and swallowed, never propagated.
 *
 * @param jobId id of the analysis job that was just scheduled
 */
private void sendJobId(long jobId) {
List<Column> columns = new ArrayList<>();
// VARCHAR(19) is wide enough for any positive long (at most 19 digits).
columns.add(new Column("Job_Id", ScalarType.createVarchar(19)));
ShowResultSetMetaData commonResultSetMetaData = new ShowResultSetMetaData(columns);
List<List<String>> resultRows = new ArrayList<>();
List<String> row = new ArrayList<>();
row.add(String.valueOf(jobId));
resultRows.add(row);
ShowResultSet commonResultSet = new ShowResultSet(commonResultSetMetaData, resultRows);
try {
// NOTE(review): ConnectContext.get() or getExecutor() could plausibly be
// null when not invoked from a user session — confirm; Throwable is caught
// so any such NPE is only logged.
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
} catch (Throwable t) {
LOG.warn("Failed to send job id to user", t);
}
}
private AnalysisTaskInfoBuilder buildCommonTaskInfo(AnalyzeStmt stmt, long jobId) {
@ -161,7 +182,8 @@ public class AnalysisManager {
}
private void createTaskForMVIdx(TableIf table, AnalysisTaskInfoBuilder taskInfoBuilder,
Map<Long, AnalysisTaskInfo> analysisTaskInfos, AnalysisType analysisType) throws DdlException {
Map<Long, AnalysisTaskInfo> analysisTaskInfos, AnalysisType analysisType,
boolean isSync) throws DdlException {
TableType type = table.getType();
if (analysisType != AnalysisType.INDEX || !type.equals(TableType.OLAP)) {
// no need to collect statistics for a materialized view
@ -182,12 +204,14 @@ public class AnalysisManager {
long taskId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfo analysisTaskInfo = indexTaskInfoBuilder.setIndexId(indexId)
.setTaskId(taskId).build();
if (isSync) {
return;
}
try {
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
} catch (Exception e) {
throw new DdlException("Failed to create analysis task", e);
}
analysisTaskInfos.put(taskId, analysisTaskInfo);
}
} finally {
olapTable.readUnlock();
@ -195,23 +219,30 @@ public class AnalysisManager {
}
private void createTaskForEachColumns(Set<String> colNames, AnalysisTaskInfoBuilder taskInfoBuilder,
Map<Long, AnalysisTaskInfo> analysisTaskInfos) throws DdlException {
Map<Long, AnalysisTaskInfo> analysisTaskInfos, boolean isSync) throws DdlException {
for (String colName : colNames) {
AnalysisTaskInfoBuilder colTaskInfoBuilder = taskInfoBuilder.copy();
long indexId = -1;
long taskId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName)
.setIndexId(indexId).setTaskId(taskId).build();
analysisTaskInfos.put(taskId, analysisTaskInfo);
if (isSync) {
continue;
}
try {
StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
} catch (Exception e) {
throw new DdlException("Failed to create analysis task", e);
}
analysisTaskInfos.put(taskId, analysisTaskInfo);
}
}
public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
return;
}
Map<String, String> params = new HashMap<>();
params.put("jobState", jobState.toString());
params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
@ -235,7 +266,6 @@ public class AnalysisManager {
LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
}
}
}
}

View File

@ -48,7 +48,6 @@ public class StatisticConstants {
*/
public static final int STATISTIC_CLEAN_INTERVAL_IN_HOURS = 24 * 2;
/**
* The maximum number of cached items in `StatisticsCache`.
*/

View File

@ -36,6 +36,7 @@ import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -55,6 +56,8 @@ public class StatisticsCleaner extends MasterDaemon {
private OlapTable colStatsTbl;
private OlapTable histStatsTbl;
private OlapTable jobTbl;
private Map<Long, CatalogIf> idToCatalog;
/* Internal DB only */
@ -82,30 +85,48 @@ public class StatisticsCleaner extends MasterDaemon {
if (!init()) {
return;
}
clear(colStatsTbl);
clear(histStatsTbl);
clearStats(colStatsTbl);
clearStats(histStatsTbl);
clearJobTbl();
}
private void clear(OlapTable statsTbl) {
private void clearStats(OlapTable statsTbl) {
ExpiredStats expiredStats = null;
long offset = 0;
do {
expiredStats = findExpiredStats(statsTbl);
deleteExpiredStats(expiredStats);
expiredStats = new ExpiredStats();
offset = findExpiredStats(statsTbl, expiredStats, offset);
deleteExpiredStats(expiredStats, statsTbl.getName());
} while (!expiredStats.isEmpty());
}
// Deletes expired rows from the internal analysis-job table in batches:
// each round fetches a page of expired job ids starting at `offset`,
// deletes them, and repeats until a round collects no ids.
private void clearJobTbl() {
List<String> jobIds = null;
long offset = 0;
do {
jobIds = new ArrayList<>();
// Collect the next batch of expired job ids; returns the next scan offset.
offset = findExpiredJobs(jobIds, offset);
doDelete("job_id", jobIds, FeConstants.INTERNAL_DB_NAME + "."
+ StatisticConstants.ANALYSIS_JOB_TABLE);
} while (!jobIds.isEmpty());
}
private boolean init() {
try {
String dbName = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME;
colStatsTbl =
(OlapTable) StatisticsUtil
.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME,
dbName,
StatisticConstants.STATISTIC_TBL_NAME);
histStatsTbl =
(OlapTable) StatisticsUtil
.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME,
dbName,
StatisticConstants.HISTOGRAM_TBL_NAME);
jobTbl = (OlapTable) StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
dbName,
StatisticConstants.ANALYSIS_JOB_TABLE);
} catch (Throwable t) {
LOG.warn("Failed to init stats cleaner", t);
return false;
@ -141,22 +162,26 @@ public class StatisticsCleaner extends MasterDaemon {
return idToMVIdx;
}
private void deleteExpiredStats(ExpiredStats expiredStats) {
private void deleteExpiredStats(ExpiredStats expiredStats, String tblName) {
doDelete("catalog_id", expiredStats.expiredCatalog.stream()
.map(String::valueOf).collect(Collectors.toList()));
.map(String::valueOf).collect(Collectors.toList()),
FeConstants.INTERNAL_DB_NAME + "." + tblName);
doDelete("db_id", expiredStats.expiredDatabase.stream()
.map(String::valueOf).collect(Collectors.toList()));
.map(String::valueOf).collect(Collectors.toList()),
FeConstants.INTERNAL_DB_NAME + "." + tblName);
doDelete("tbl_id", expiredStats.expiredTable.stream()
.map(String::valueOf).collect(Collectors.toList()));
.map(String::valueOf).collect(Collectors.toList()),
FeConstants.INTERNAL_DB_NAME + "." + tblName);
doDelete("idx_id", expiredStats.expiredIdxId.stream()
.map(String::valueOf).collect(Collectors.toList()));
.map(String::valueOf).collect(Collectors.toList()),
FeConstants.INTERNAL_DB_NAME + "." + tblName);
doDelete("id", expiredStats.ids.stream()
.map(String::valueOf).collect(Collectors.toList()));
.map(String::valueOf).collect(Collectors.toList()),
FeConstants.INTERNAL_DB_NAME + "." + tblName);
}
private void doDelete(String/*col name*/ colName, List<String> pred) {
String deleteTemplate = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE ${left} IN (${right})";
private void doDelete(String/*col name*/ colName, List<String> pred, String tblName) {
String deleteTemplate = "DELETE FROM " + tblName + " WHERE ${left} IN (${right})";
if (CollectionUtils.isEmpty(pred)) {
return;
}
@ -172,9 +197,8 @@ public class StatisticsCleaner extends MasterDaemon {
}
}
public ExpiredStats findExpiredStats(OlapTable statsTbl) {
ExpiredStats expiredStats = new ExpiredStats();
long pos = 0;
private long findExpiredStats(OlapTable statsTbl, ExpiredStats expiredStats, long offset) {
long pos = offset;
while (pos < statsTbl.getRowCount()
&& !expiredStats.isFull()) {
List<ResultRow> rows = StatisticsRepository.fetchStatsFullName(StatisticConstants.FETCH_LIMIT, pos);
@ -226,13 +250,26 @@ public class StatisticsCleaner extends MasterDaemon {
LOG.warn("Error occurred when retrieving expired stats", e);
}
}
try {
Thread.sleep(StatisticConstants.FETCH_INTERVAL_IN_MS);
} catch (InterruptedException t) {
// IGNORE
}
this.yieldForOtherTask();
}
return expiredStats;
return pos;
}
/**
 * Scans the job table from {@code offset}, appending expired job ids to
 * {@code jobIds} until the table is exhausted or the batch reaches the
 * maximum IN-list size allowed for a single DELETE.
 *
 * @param jobIds output list the expired job ids are appended to
 * @param offset row offset at which to resume scanning
 * @return the next scan offset (advances by FETCH_LIMIT per page)
 */
private long findExpiredJobs(List<String> jobIds, long offset) {
long pos = offset;
while (pos < jobTbl.getRowCount() && jobIds.size() < Config.max_allowed_in_element_num_of_delete) {
List<ResultRow> rows = StatisticsRepository.fetchExpiredJobs(StatisticConstants.FETCH_LIMIT, pos);
for (ResultRow r : rows) {
try {
jobIds.add(r.getColumnValue("job_id"));
} catch (Exception e) {
// A single malformed row should not abort the whole sweep; log and skip.
LOG.warn("Error when get job_id from ResultRow", e);
}
}
pos += StatisticConstants.FETCH_LIMIT;
// Pause between pages so this daemon does not monopolize IO.
this.yieldForOtherTask();
}
return pos;
}
private static class ExpiredStats {
@ -245,11 +282,11 @@ public class StatisticsCleaner extends MasterDaemon {
Set<String> ids = new HashSet<>();
public boolean isFull() {
return expiredCatalog.size() >= Config.expr_children_limit
|| expiredDatabase.size() >= Config.expr_children_limit
|| expiredTable.size() >= Config.expr_children_limit
|| expiredIdxId.size() >= Config.expr_children_limit
|| ids.size() >= Config.expr_children_limit;
return expiredCatalog.size() >= Config.max_allowed_in_element_num_of_delete
|| expiredDatabase.size() >= Config.max_allowed_in_element_num_of_delete
|| expiredTable.size() >= Config.max_allowed_in_element_num_of_delete
|| expiredIdxId.size() >= Config.max_allowed_in_element_num_of_delete
|| ids.size() >= Config.max_allowed_in_element_num_of_delete;
}
public boolean isEmpty() {
@ -257,7 +294,16 @@ public class StatisticsCleaner extends MasterDaemon {
&& expiredDatabase.isEmpty()
&& expiredTable.isEmpty()
&& expiredIdxId.isEmpty()
&& ids.size() < Config.expr_children_limit / 100;
&& ids.size() < Config.max_allowed_in_element_num_of_delete / 10;
}
}
// Sleeps briefly between fetch batches so this cleanup daemon does not
// consume too much IO at the expense of other tasks.
private void yieldForOtherTask() {
try {
Thread.sleep(StatisticConstants.FETCH_INTERVAL_IN_MS);
} catch (InterruptedException t) {
// Deliberately ignored: the daemon's own loop controls shutdown.
// NOTE(review): convention is to re-set the flag via
// Thread.currentThread().interrupt() — confirm intent.
}
}

View File

@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -87,6 +88,13 @@ public class StatisticsRepository {
private static final String DROP_TABLE_STATISTICS_TEMPLATE = "DELETE FROM " + FeConstants.INTERNAL_DB_NAME
+ "." + "${tblName}" + " WHERE ${condition}";
private static final String FIND_EXPIRED_JOBS = "SELECT job_id FROM "
+ FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME
+ " WHERE task_id = -1 AND ${now} - last_exec_time_in_ms > "
+ TimeUnit.HOURS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS)
+ " ORDER BY last_exec_time_in_ms"
+ " LIMIT ${limit} OFFSET ${offset}";
private static final String FETCH_RECENT_STATS_UPDATED_COL =
"SELECT * FROM "
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.STATISTIC_TBL_NAME
@ -282,4 +290,12 @@ public class StatisticsRepository {
params.put("offset", String.valueOf(offset));
return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FETCH_STATS_FULL_NAME));
}
/**
 * Fetches one page of expired analysis jobs (job_id column only) by
 * substituting limit/offset and the current wall-clock time in ms into
 * the FIND_EXPIRED_JOBS query template.
 *
 * @param limit  maximum rows to return in this page
 * @param offset row offset of the page
 * @return result rows each carrying a "job_id" column
 */
public static List<ResultRow> fetchExpiredJobs(long limit, long offset) {
Map<String, String> params = new HashMap<>();
params.put("limit", String.valueOf(limit));
params.put("offset", String.valueOf(offset));
// "now" feeds the `${now} - last_exec_time_in_ms` expiration predicate.
params.put("now", String.valueOf(System.currentTimeMillis()));
return StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(FIND_EXPIRED_JOBS));
}
}

View File

@ -291,6 +291,9 @@ public class StatisticsUtil {
return tableIf.getColumn(columnName);
}
/**
* Throws a RuntimeException if the table does not exist.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static TableIf findTable(String catalogName, String dbName, String tblName) throws Throwable {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()