[feature](stats) Support to kill analyze #18901

1. Report an error when an analyze job is submitted while the stats table is not available
2. Support kill analyze
3. Support cancel sync analyze
This commit is contained in:
AKIRA
2023-04-26 15:23:44 +09:00
committed by GitHub
parent 50d9f35f63
commit d3a0b94602
16 changed files with 239 additions and 74 deletions

View File

@ -342,7 +342,7 @@ public class AnalyzeStmt extends DdlStmt {
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("ANALYZE");
sb.append("ANALYZE TABLE ");
if (tableName != null) {
sb.append(" ");

View File

@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
// DDL statement that kills a running/pending analyze job by its job id.
// Dispatched by DdlExecutor to AnalysisManager.handleKillAnalyzeStmt.
public class KillAnalysisJobStmt extends DdlStmt {
// Id of the analyze job to kill; public + final so the executor can read it directly.
public final long jobId;
public KillAnalysisJobStmt(long jobId) {
this.jobId = jobId;
}
}

View File

@ -92,6 +92,7 @@ import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.DropUserStmt;
import org.apache.doris.analysis.GrantStmt;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
import org.apache.doris.analysis.LinkDbStmt;
import org.apache.doris.analysis.MigrateDbStmt;
import org.apache.doris.analysis.PauseRoutineLoadStmt;
@ -332,6 +333,8 @@ public class DdlExecutor {
ProfileManager.getInstance().cleanProfile();
} else if (ddlStmt instanceof DropStatsStmt) {
env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt);
} else if (ddlStmt instanceof KillAnalysisJobStmt) {
env.getAnalysisManager().handleKillAnalyzeStmt((KillAnalysisJobStmt) ddlStmt);
} else {
throw new DdlException("Unknown statement.");
}

View File

@ -1109,6 +1109,9 @@ public class StmtExecutor implements ProfileWriter {
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
}
if (parsedStmt instanceof AnalyzeStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
}
// Handle kill statement.

View File

@ -19,6 +19,7 @@ package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.DropStatsStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
import org.apache.doris.analysis.ShowAnalyzeStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
@ -30,6 +31,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
@ -66,7 +68,8 @@ public class AnalysisManager {
private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
+ "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId} and task_id=${taskId}";
+ "SET state = '${jobState}' ${message} ${updateExecTime} "
+ "WHERE job_id = ${jobId} and (task_id=${taskId} || ${isAllTask})";
private static final String SHOW_JOB_STATE_SQL_TEMPLATE = "SELECT "
+ "job_id, catalog_name, db_name, tbl_name, col_name, job_type, "
@ -76,12 +79,14 @@ public class AnalysisManager {
// The time field that needs to be displayed
private static final String LAST_EXEC_TIME_IN_MS = "last_exec_time_in_ms";
private final ConcurrentMap<Long, Map<Long, AnalysisTaskInfo>> analysisJobIdToTaskMap;
private final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap;
private StatisticsCache statisticsCache;
private final AnalysisTaskExecutor taskExecutor;
private ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
public AnalysisManager() {
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
this.taskScheduler = new AnalysisTaskScheduler();
@ -96,10 +101,12 @@ public class AnalysisManager {
// Each analyze stmt corresponding to an analysis job.
public void createAnalysisJob(AnalyzeStmt stmt) throws DdlException {
if (!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) {
throw new DdlException("Stats table not available, please make sure your cluster status is normal");
}
long jobId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfoBuilder taskInfoBuilder = buildCommonTaskInfo(stmt, jobId);
Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
// start build analysis tasks
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(stmt.getColumnNames(), taskInfoBuilder, analysisTaskInfos, stmt.isSync());
createTaskForMVIdx(stmt.getTable(), taskInfoBuilder, analysisTaskInfos, stmt.getAnalysisType(), stmt.isSync());
@ -182,7 +189,7 @@ public class AnalysisManager {
}
private void createTaskForMVIdx(TableIf table, AnalysisTaskInfoBuilder taskInfoBuilder,
Map<Long, AnalysisTaskInfo> analysisTaskInfos, AnalysisType analysisType,
Map<Long, BaseAnalysisTask> analysisTasks, AnalysisType analysisType,
boolean isSync) throws DdlException {
TableType type = table.getType();
if (analysisType != AnalysisType.INDEX || !type.equals(TableType.OLAP)) {
@ -204,6 +211,7 @@ public class AnalysisManager {
long taskId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfo analysisTaskInfo = indexTaskInfoBuilder.setIndexId(indexId)
.setTaskId(taskId).build();
analysisTasks.put(taskId, createTask(analysisTaskInfo));
if (isSync) {
return;
}
@ -219,14 +227,14 @@ public class AnalysisManager {
}
private void createTaskForEachColumns(Set<String> colNames, AnalysisTaskInfoBuilder taskInfoBuilder,
Map<Long, AnalysisTaskInfo> analysisTaskInfos, boolean isSync) throws DdlException {
Map<Long, BaseAnalysisTask> analysisTasks, boolean isSync) throws DdlException {
for (String colName : colNames) {
AnalysisTaskInfoBuilder colTaskInfoBuilder = taskInfoBuilder.copy();
long indexId = -1;
long taskId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName)
.setIndexId(indexId).setTaskId(taskId).build();
analysisTaskInfos.put(taskId, analysisTaskInfo);
analysisTasks.put(taskId, createTask(analysisTaskInfo));
if (isSync) {
continue;
}
@ -249,6 +257,7 @@ public class AnalysisManager {
params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time);
params.put("jobId", String.valueOf(info.jobId));
params.put("taskId", String.valueOf(info.taskId));
params.put("isAllTask", "false");
try {
StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
} catch (Exception e) {
@ -256,8 +265,8 @@ public class AnalysisManager {
} finally {
info.state = jobState;
if (analysisJobIdToTaskMap.get(info.jobId).values()
.stream().allMatch(i -> i.state != null
&& i.state != AnalysisState.PENDING && i.state != AnalysisState.RUNNING)) {
.stream().allMatch(t -> t.info.state != null
&& t.info.state != AnalysisState.PENDING && t.info.state != AnalysisState.RUNNING)) {
analysisJobIdToTaskMap.remove(info.jobId);
params.put("taskId", String.valueOf(-1));
try {
@ -298,21 +307,14 @@ public class AnalysisManager {
return results;
}
private void syncExecute(Collection<AnalysisTaskInfo> taskInfos) {
List<String> colNames = new ArrayList<>();
for (AnalysisTaskInfo info : taskInfos) {
try {
TableIf table = StatisticsUtil.findTable(info.catalogName,
info.dbName, info.tblName);
BaseAnalysisTask analysisTask = table.createAnalysisTask(info);
analysisTask.execute();
} catch (Throwable t) {
colNames.add(info.colName);
LOG.info("Failed to analyze, info: {}", info);
}
}
if (!colNames.isEmpty()) {
throw new RuntimeException("Failed to analyze following columns: " + String.join(",", colNames));
/**
 * Execute the given analysis tasks synchronously on the caller's thread,
 * registering them under the current ConnectContext so a later
 * {@code cancelSyncTask(ctx)} can cancel them.
 */
private void syncExecute(Collection<BaseAnalysisTask> tasks) {
    SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks);
    ConnectContext ctx = ConnectContext.get();
    if (ctx == null) {
        // No connection context (e.g. internal caller): ConcurrentHashMap rejects
        // null keys, so skip registration and just run the tasks. They simply
        // won't be cancellable via cancelSyncTask.
        syncTaskCollection.execute();
        return;
    }
    try {
        ctxToSyncTask.put(ctx, syncTaskCollection);
        syncTaskCollection.execute();
    } finally {
        // Always unregister, even when execute() throws, so the map cannot leak contexts.
        ctxToSyncTask.remove(ctx);
    }
}
@ -329,4 +331,91 @@ public class AnalysisManager {
}
}
/**
 * Handle KILL ANALYZE: mark every task of the job as killed and record the
 * job as FAILED in the analysis-job table.
 *
 * @throws DdlException if the job does not exist or has already finished
 */
public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException {
    Map<Long, BaseAnalysisTask> analysisTaskMap = analysisJobIdToTaskMap.get(killAnalysisJobStmt.jobId);
    if (analysisTaskMap == null) {
        throw new DdlException("Job not exists or already finished");
    }
    BaseAnalysisTask anyTask = analysisTaskMap.values().stream().findFirst().orElse(null);
    if (anyTask == null) {
        return;
    }
    // Check privilege BEFORE removing the job from the tracking map, so that an
    // unauthorized KILL does not drop the job's bookkeeping as a side effect.
    checkPriv(anyTask);
    analysisTaskMap = analysisJobIdToTaskMap.remove(killAnalysisJobStmt.jobId);
    if (analysisTaskMap == null) {
        // A concurrent KILL (or job completion) removed it between get() and remove().
        throw new DdlException("Job not exists or already finished");
    }
    for (BaseAnalysisTask taskInfo : analysisTaskMap.values()) {
        taskInfo.markAsKilled();
    }
    Map<String, String> params = new HashMap<>();
    params.put("jobState", AnalysisState.FAILED.toString());
    params.put("message", ", message = 'Killed by user : " + ConnectContext.get().getQualifiedUser() + "'");
    params.put("updateExecTime", ", last_exec_time_in_ms=" + String.valueOf(System.currentTimeMillis()));
    params.put("jobId", String.valueOf(killAnalysisJobStmt.jobId));
    // taskId is irrelevant here: isAllTask=true short-circuits the task_id predicate
    // in UPDATE_JOB_STATE_SQL_TEMPLATE, updating every task row of this job.
    params.put("taskId", "'-1'");
    params.put("isAllTask", "true");
    try {
        StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
    } catch (Exception e) {
        // Best effort: the in-memory state is already updated; only the persisted row may lag.
        LOG.warn("Failed to update status", e);
    }
}
/**
 * Verify the current session may kill an analyze job on the task's target table.
 * Requires at least SELECT privilege; throws RuntimeException otherwise.
 */
private void checkPriv(BaseAnalysisTask analysisTask) {
    String dbName = analysisTask.db.getFullName();
    String tblName = analysisTask.tbl.getName();
    boolean hasPriv = Env.getCurrentEnv().getAccessManager().checkTblPriv(
            ConnectContext.get(), dbName, tblName, PrivPredicate.SELECT);
    if (hasPriv) {
        return;
    }
    throw new RuntimeException("You need at least SELECT PRIV to corresponding table to kill this analyze"
            + " job");
}
/**
 * Cancel the synchronous analyze tasks registered for the given connection,
 * if any are currently running. No-op when the context has no sync tasks.
 */
public void cancelSyncTask(ConnectContext connectContext) {
    SyncTaskCollection tasks = ctxToSyncTask.get(connectContext);
    if (tasks == null) {
        return;
    }
    tasks.cancel();
}
// Build the concrete analysis task for one task-info entry by resolving the target
// table and delegating to its createAnalysisTask factory.
// Throws DdlException (wrapping the original cause) when the table cannot be
// resolved or the table fails to build a task for this info.
private BaseAnalysisTask createTask(AnalysisTaskInfo analysisTaskInfo) throws DdlException {
try {
TableIf table = StatisticsUtil.findTable(analysisTaskInfo.catalogName,
analysisTaskInfo.dbName, analysisTaskInfo.tblName);
return table.createAnalysisTask(analysisTaskInfo);
} catch (Throwable t) {
LOG.warn("Failed to find table", t);
throw new DdlException("Error when trying to find table", t);
}
}
/**
 * A batch of analysis tasks executed synchronously on the caller's thread,
 * cancellable from another thread (KILL / connection kill path) via {@link #cancel()}.
 */
private static class SyncTaskCollection {
    // Set by cancel() on another thread, polled by execute(); volatile for visibility.
    public volatile boolean cancelled;
    public final Collection<BaseAnalysisTask> tasks;

    public SyncTaskCollection(Collection<BaseAnalysisTask> tasks) {
        this.tasks = tasks;
    }

    /** Mark the batch cancelled and kill each task so in-flight statements stop. */
    public void cancel() {
        cancelled = true;
        tasks.forEach(BaseAnalysisTask::markAsKilled);
    }

    /**
     * Run tasks sequentially, skipping the rest once cancelled; collect the column
     * names of failed tasks and report them all in one exception at the end.
     */
    public void execute() {
        List<String> colNames = new ArrayList<>();
        for (BaseAnalysisTask task : tasks) {
            if (cancelled) {
                continue;
            }
            try {
                task.execute();
            } catch (Throwable t) {
                colNames.add(task.info.colName);
                // Pass the throwable to the logger; dropping it loses the failure cause.
                LOG.info("Failed to analyze, info: {}", task, t);
            }
        }
        if (!colNames.isEmpty()) {
            throw new RuntimeException("Failed to analyze following columns: " + String.join(",", colNames));
        }
    }
}
}

View File

@ -103,21 +103,12 @@ public class AnalysisTaskExecutor extends Thread {
private void doFetchAndExecute() {
BaseAnalysisTask task = taskScheduler.getPendingTasks();
AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task);
incr();
executors.submit(taskWrapper);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.RUNNING, "", System.currentTimeMillis());
}
public void decr() {
blockingCounter.decr();
}
public void incr() {
blockingCounter.incr();
}
public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
taskQueue.put(wrapper);
}

View File

@ -18,8 +18,6 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -44,12 +42,10 @@ public class AnalysisTaskScheduler {
private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
public synchronized void schedule(AnalysisTaskInfo analysisTaskInfo) {
public synchronized void schedule(BaseAnalysisTask analysisTask) {
try {
TableIf table = StatisticsUtil.findTable(analysisTaskInfo.catalogName,
analysisTaskInfo.dbName, analysisTaskInfo.tblName);
BaseAnalysisTask analysisTask = table.createAnalysisTask(analysisTaskInfo);
switch (analysisTaskInfo.jobType) {
switch (analysisTask.info.jobType) {
case MANUAL:
addToManualJobQueue(analysisTask);
break;
@ -57,11 +53,11 @@ public class AnalysisTaskScheduler {
addToSystemQueue(analysisTask);
break;
default:
throw new IllegalArgumentException("Unknown job type: " + analysisTaskInfo.jobType);
throw new IllegalArgumentException("Unknown job type: " + analysisTask.info.jobType);
}
} catch (Throwable t) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
analysisTaskInfo, AnalysisState.FAILED, t.getMessage(), System.currentTimeMillis());
analysisTask.info, AnalysisState.FAILED, t.getMessage(), System.currentTimeMillis());
}
}

View File

@ -48,6 +48,9 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
startTime = System.currentTimeMillis();
Throwable except = null;
try {
if (task.killed) {
return;
}
executor.putJob(this);
super.run();
Object result = get();
@ -57,18 +60,19 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
} catch (Exception e) {
except = e;
} finally {
executor.decr();
if (except != null) {
LOG.warn("Failed to execute task", except);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FAILED, except.getMessage(), -1);
} else {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FINISHED, "", System.currentTimeMillis());
if (!task.killed) {
if (except != null) {
LOG.warn("Failed to execute task", except);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FAILED, except.getMessage(), -1);
} else {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FINISHED, "", System.currentTimeMillis());
}
LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime);
}
LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime);
}
}
@ -78,8 +82,6 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
task.cancel();
} catch (Exception e) {
LOG.warn(String.format("Cancel job failed job info : %s", task.toString()));
} finally {
executor.decr();
}
return super.cancel(false);
}

View File

@ -99,10 +99,10 @@ public abstract class BaseAnalysisTask {
protected StmtExecutor stmtExecutor;
protected AnalysisState analysisState;
protected Set<PrimitiveType> unsupportedType = new HashSet<>();
protected volatile boolean killed;
@VisibleForTesting
public BaseAnalysisTask() {
@ -162,6 +162,9 @@ public abstract class BaseAnalysisTask {
if (stmtExecutor != null) {
stmtExecutor.cancel();
}
if (killed) {
return;
}
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.FAILED,
String.format("Job has been cancelled: %s", info.toString()), -1);
@ -175,10 +178,6 @@ public abstract class BaseAnalysisTask {
return info.jobId;
}
public AnalysisState getAnalysisState() {
return analysisState;
}
protected String getDataSizeFunction(Column column) {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`${colName}`))";
@ -201,4 +200,9 @@ public abstract class BaseAnalysisTask {
return String.format("TABLESAMPLE(%d ROWS)", info.sampleRows);
}
}
// Flag this task as killed (the killed flag is checked by the executor wrapper
// before run and before status updates) and cancel any in-flight statement.
public void markAsKilled() {
this.killed = true;
cancel();
}
}

View File

@ -68,4 +68,5 @@ public class StatisticConstants {
public static final int FETCH_INTERVAL_IN_MS = 500;
public static final int HISTOGRAM_MAX_BUCKET_NUM = 128;
}

View File

@ -33,6 +33,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
@ -41,6 +42,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.qe.AutoCloseConnectContext;
@ -229,6 +231,7 @@ public class StatisticsUtil {
return dateTimeLiteral.getDouble();
case CHAR:
case VARCHAR:
case STRING:
VarcharLiteral varchar = new VarcharLiteral(columnValue);
return varchar.getDouble();
case HLL:
@ -311,4 +314,35 @@ public class StatisticsUtil {
.map(s -> "null".equalsIgnoreCase(s) || s.isEmpty())
.orElse(true);
}
/**
 * Whether all internal statistics tables (column stats, histogram, analysis job)
 * exist and every tablet of each has at least one normal replica.
 * Returns false if any table is missing or any tablet has no healthy replica.
 */
public static boolean statsTblAvailable() {
    String dbName = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME;
    String[] requiredTables = {
            StatisticConstants.STATISTIC_TBL_NAME,
            StatisticConstants.HISTOGRAM_TBL_NAME,
            StatisticConstants.ANALYSIS_JOB_TABLE
    };
    List<OlapTable> statsTbls = new ArrayList<>();
    try {
        for (String tblName : requiredTables) {
            statsTbls.add((OlapTable) StatisticsUtil.findTable(
                    InternalCatalog.INTERNAL_CATALOG_NAME, dbName, tblName));
        }
    } catch (Throwable t) {
        // Any lookup failure means the stats infrastructure is not ready.
        return false;
    }
    for (OlapTable table : statsTbls) {
        for (Partition partition : table.getPartitions()) {
            boolean hasEmptyTablet = partition.getBaseIndex().getTablets().stream()
                    .anyMatch(t -> t.getNormalReplicaBackendIds().isEmpty());
            if (hasEmptyTablet) {
                return false;
            }
        }
    }
    return true;
}
}