diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 6bdcd354a0..7787c8fdea 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -653,7 +653,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt, use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt, link_stmt, migrate_stmt, switch_stmt, enter_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt, import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt, - import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, refresh_stmt, clean_stmt, analyze_stmt, show_mtmv_stmt; + import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, refresh_stmt, clean_stmt, analyze_stmt, show_mtmv_stmt, kill_analysis_job_stmt; nonterminal FromClause opt_using_clause; @@ -1083,6 +1083,8 @@ stmt ::= {: RESULT = set; :} | kill_stmt:kill {: RESULT = kill; :} + | kill_analysis_job_stmt: k + {: RESULT = k; :} | describe_stmt:describe {: RESULT = describe; :} | show_stmt:show @@ -4753,6 +4755,13 @@ kill_stmt ::= :} ; +kill_analysis_job_stmt ::= + KW_KILL KW_ANALYZE INTEGER_LITERAL:value + {: + RESULT = new KillAnalysisJobStmt(value.longValue()); + :} + ; + // TODO(zhaochun): stolen from MySQL. 
Why not use value list, maybe avoid shift/reduce conflict // Set statement set_stmt ::= diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java index f4d9319ec7..468f6b25c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java @@ -342,7 +342,7 @@ public class AnalyzeStmt extends DdlStmt { @Override public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append("ANALYZE"); + sb.append("ANALYZE TABLE"); if (tableName != null) { sb.append(" "); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/KillAnalysisJobStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/KillAnalysisJobStmt.java new file mode 100644 index 0000000000..a950c663d3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/KillAnalysisJobStmt.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +public class KillAnalysisJobStmt extends DdlStmt { + + public final long jobId; + + public KillAnalysisJobStmt(long jobId) { + this.jobId = jobId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 1430a65776..cb12c3fdb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -92,6 +92,7 @@ import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.DropUserStmt; import org.apache.doris.analysis.GrantStmt; import org.apache.doris.analysis.InstallPluginStmt; +import org.apache.doris.analysis.KillAnalysisJobStmt; import org.apache.doris.analysis.LinkDbStmt; import org.apache.doris.analysis.MigrateDbStmt; import org.apache.doris.analysis.PauseRoutineLoadStmt; @@ -332,6 +333,8 @@ public class DdlExecutor { ProfileManager.getInstance().cleanProfile(); } else if (ddlStmt instanceof DropStatsStmt) { env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt); + } else if (ddlStmt instanceof KillAnalysisJobStmt) { + env.getAnalysisManager().handleKillAnalyzeStmt((KillAnalysisJobStmt) ddlStmt); } else { throw new DdlException("Unknown statement."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index f592552746..c70554b1c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1109,6 +1109,9 @@ public class StmtExecutor implements ProfileWriter { if (mysqlLoadId != null) { Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId); } + if (parsedStmt instanceof AnalyzeStmt) { + Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); + } } // Handle kill statement. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index f67e330afc..9f63d830b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -19,6 +19,7 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.DropStatsStmt; +import org.apache.doris.analysis.KillAnalysisJobStmt; import org.apache.doris.analysis.ShowAnalyzeStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; @@ -30,6 +31,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ShowResultSet; import org.apache.doris.qe.ShowResultSetMetaData; @@ -66,7 +68,8 @@ public class AnalysisManager { private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE " + FeConstants.INTERNAL_DB_NAME + "." 
+ StatisticConstants.ANALYSIS_JOB_TABLE + " " - + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId} and task_id=${taskId}"; + + "SET state = '${jobState}' ${message} ${updateExecTime} " + + "WHERE job_id = ${jobId} and (task_id=${taskId} || ${isAllTask})"; private static final String SHOW_JOB_STATE_SQL_TEMPLATE = "SELECT " + "job_id, catalog_name, db_name, tbl_name, col_name, job_type, " @@ -76,12 +79,14 @@ public class AnalysisManager { // The time field that needs to be displayed private static final String LAST_EXEC_TIME_IN_MS = "last_exec_time_in_ms"; - private final ConcurrentMap> analysisJobIdToTaskMap; + private final ConcurrentMap> analysisJobIdToTaskMap; private StatisticsCache statisticsCache; private final AnalysisTaskExecutor taskExecutor; + private ConcurrentMap ctxToSyncTask = new ConcurrentHashMap<>(); + public AnalysisManager() { analysisJobIdToTaskMap = new ConcurrentHashMap<>(); this.taskScheduler = new AnalysisTaskScheduler(); @@ -96,10 +101,12 @@ public class AnalysisManager { // Each analyze stmt corresponding to an analysis job. 
public void createAnalysisJob(AnalyzeStmt stmt) throws DdlException { + if (!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) { + throw new DdlException("Stats table not available, please make sure your cluster status is normal"); + } long jobId = Env.getCurrentEnv().getNextId(); AnalysisTaskInfoBuilder taskInfoBuilder = buildCommonTaskInfo(stmt, jobId); - Map analysisTaskInfos = new HashMap<>(); - // start build analysis tasks + Map analysisTaskInfos = new HashMap<>(); createTaskForEachColumns(stmt.getColumnNames(), taskInfoBuilder, analysisTaskInfos, stmt.isSync()); createTaskForMVIdx(stmt.getTable(), taskInfoBuilder, analysisTaskInfos, stmt.getAnalysisType(), stmt.isSync()); @@ -182,7 +189,7 @@ public class AnalysisManager { } private void createTaskForMVIdx(TableIf table, AnalysisTaskInfoBuilder taskInfoBuilder, - Map analysisTaskInfos, AnalysisType analysisType, + Map analysisTasks, AnalysisType analysisType, boolean isSync) throws DdlException { TableType type = table.getType(); if (analysisType != AnalysisType.INDEX || !type.equals(TableType.OLAP)) { @@ -204,6 +211,7 @@ public class AnalysisManager { long taskId = Env.getCurrentEnv().getNextId(); AnalysisTaskInfo analysisTaskInfo = indexTaskInfoBuilder.setIndexId(indexId) .setTaskId(taskId).build(); + analysisTasks.put(taskId, createTask(analysisTaskInfo)); if (isSync) { return; } @@ -219,14 +227,14 @@ public class AnalysisManager { } private void createTaskForEachColumns(Set colNames, AnalysisTaskInfoBuilder taskInfoBuilder, - Map analysisTaskInfos, boolean isSync) throws DdlException { + Map analysisTasks, boolean isSync) throws DdlException { for (String colName : colNames) { AnalysisTaskInfoBuilder colTaskInfoBuilder = taskInfoBuilder.copy(); long indexId = -1; long taskId = Env.getCurrentEnv().getNextId(); AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setColName(colName) .setIndexId(indexId).setTaskId(taskId).build(); - analysisTaskInfos.put(taskId, analysisTaskInfo); + 
analysisTasks.put(taskId, createTask(analysisTaskInfo)); if (isSync) { continue; } @@ -249,6 +257,7 @@ public class AnalysisManager { params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time); params.put("jobId", String.valueOf(info.jobId)); params.put("taskId", String.valueOf(info.taskId)); + params.put("isAllTask", "false"); try { StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE)); } catch (Exception e) { @@ -256,8 +265,8 @@ public class AnalysisManager { } finally { info.state = jobState; if (analysisJobIdToTaskMap.get(info.jobId).values() - .stream().allMatch(i -> i.state != null - && i.state != AnalysisState.PENDING && i.state != AnalysisState.RUNNING)) { + .stream().allMatch(t -> t.info.state != null + && t.info.state != AnalysisState.PENDING && t.info.state != AnalysisState.RUNNING)) { analysisJobIdToTaskMap.remove(info.jobId); params.put("taskId", String.valueOf(-1)); try { @@ -298,21 +307,14 @@ public class AnalysisManager { return results; } - private void syncExecute(Collection taskInfos) { - List colNames = new ArrayList<>(); - for (AnalysisTaskInfo info : taskInfos) { - try { - TableIf table = StatisticsUtil.findTable(info.catalogName, - info.dbName, info.tblName); - BaseAnalysisTask analysisTask = table.createAnalysisTask(info); - analysisTask.execute(); - } catch (Throwable t) { - colNames.add(info.colName); - LOG.info("Failed to analyze, info: {}", info); - } - } - if (!colNames.isEmpty()) { - throw new RuntimeException("Failed to analyze following columns: " + String.join(",", colNames)); + private void syncExecute(Collection tasks) { + SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks); + ConnectContext ctx = ConnectContext.get(); + try { + ctxToSyncTask.put(ctx, syncTaskCollection); + syncTaskCollection.execute(); + } finally { + ctxToSyncTask.remove(ctx); } } @@ -329,4 +331,91 @@ public class AnalysisManager { } } + public void 
handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException { + Map analysisTaskInfoMap = analysisJobIdToTaskMap.remove(killAnalysisJobStmt.jobId); + if (analysisTaskInfoMap == null) { + throw new DdlException("Job not exists or already finished"); + } + BaseAnalysisTask anyTask = analysisTaskInfoMap.values().stream().findFirst().orElse(null); + if (anyTask == null) { + return; + } + checkPriv(anyTask); + for (BaseAnalysisTask taskInfo : analysisTaskInfoMap.values()) { + taskInfo.markAsKilled(); + } + Map params = new HashMap<>(); + params.put("jobState", AnalysisState.FAILED.toString()); + params.put("message", ", message = 'Killed by user : " + ConnectContext.get().getQualifiedUser() + "'"); + params.put("updateExecTime", ", last_exec_time_in_ms=" + String.valueOf(System.currentTimeMillis())); + params.put("jobId", String.valueOf(killAnalysisJobStmt.jobId)); + params.put("taskId", "'-1'"); + params.put("isAllTask", "true"); + try { + StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE)); + } catch (Exception e) { + LOG.warn("Failed to update status", e); + } + } + + private void checkPriv(BaseAnalysisTask analysisTask) { + String dbName = analysisTask.db.getFullName(); + String tblName = analysisTask.tbl.getName(); + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), dbName, tblName, PrivPredicate.SELECT)) { + throw new RuntimeException("You need at least SELECT PRIV to corresponding table to kill this analyze" + + " job"); + } + } + + public void cancelSyncTask(ConnectContext connectContext) { + SyncTaskCollection syncTaskCollection = ctxToSyncTask.get(connectContext); + if (syncTaskCollection != null) { + syncTaskCollection.cancel(); + } + } + + private BaseAnalysisTask createTask(AnalysisTaskInfo analysisTaskInfo) throws DdlException { + try { + TableIf table = StatisticsUtil.findTable(analysisTaskInfo.catalogName, + analysisTaskInfo.dbName, 
analysisTaskInfo.tblName); + return table.createAnalysisTask(analysisTaskInfo); + } catch (Throwable t) { + LOG.warn("Failed to find table", t); + throw new DdlException("Error when trying to find table", t); + } + } + + private static class SyncTaskCollection { + public volatile boolean cancelled; + + public final Collection tasks; + + public SyncTaskCollection(Collection tasks) { + this.tasks = tasks; + } + + public void cancel() { + cancelled = true; + tasks.forEach(BaseAnalysisTask::markAsKilled); + } + + public void execute() { + List colNames = new ArrayList<>(); + for (BaseAnalysisTask task : tasks) { + if (cancelled) { + continue; + } + try { + task.execute(); + } catch (Throwable t) { + colNames.add(task.info.colName); + LOG.info("Failed to analyze, info: {}", task); + } + } + if (!colNames.isEmpty()) { + throw new RuntimeException("Failed to analyze following columns: " + String.join(",", colNames)); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 919185d287..74a1748a50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -103,21 +103,12 @@ public class AnalysisTaskExecutor extends Thread { private void doFetchAndExecute() { BaseAnalysisTask task = taskScheduler.getPendingTasks(); AnalysisTaskWrapper taskWrapper = new AnalysisTaskWrapper(this, task); - incr(); executors.submit(taskWrapper); Env.getCurrentEnv().getAnalysisManager() .updateTaskStatus(task.info, AnalysisState.RUNNING, "", System.currentTimeMillis()); } - public void decr() { - blockingCounter.decr(); - } - - public void incr() { - blockingCounter.incr(); - } - public void putJob(AnalysisTaskWrapper wrapper) throws Exception { taskQueue.put(wrapper); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java index ede3677161..c854af38d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java @@ -18,8 +18,6 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -44,12 +42,10 @@ public class AnalysisTaskScheduler { private final Set manualJobSet = new HashSet<>(); - public synchronized void schedule(AnalysisTaskInfo analysisTaskInfo) { + public synchronized void schedule(BaseAnalysisTask analysisTask) { try { - TableIf table = StatisticsUtil.findTable(analysisTaskInfo.catalogName, - analysisTaskInfo.dbName, analysisTaskInfo.tblName); - BaseAnalysisTask analysisTask = table.createAnalysisTask(analysisTaskInfo); - switch (analysisTaskInfo.jobType) { + + switch (analysisTask.info.jobType) { case MANUAL: addToManualJobQueue(analysisTask); break; @@ -57,11 +53,11 @@ public class AnalysisTaskScheduler { addToSystemQueue(analysisTask); break; default: - throw new IllegalArgumentException("Unknown job type: " + analysisTaskInfo.jobType); + throw new IllegalArgumentException("Unknown job type: " + analysisTask.info.jobType); } } catch (Throwable t) { Env.getCurrentEnv().getAnalysisManager().updateTaskStatus( - analysisTaskInfo, AnalysisState.FAILED, t.getMessage(), System.currentTimeMillis()); + analysisTask.info, AnalysisState.FAILED, t.getMessage(), System.currentTimeMillis()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java index 3ca55dbd9e..4590e138f6 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java @@ -48,6 +48,9 @@ public class AnalysisTaskWrapper extends FutureTask { startTime = System.currentTimeMillis(); Throwable except = null; try { + if (task.killed) { + return; + } executor.putJob(this); super.run(); Object result = get(); @@ -57,18 +60,19 @@ public class AnalysisTaskWrapper extends FutureTask { } catch (Exception e) { except = e; } finally { - executor.decr(); - if (except != null) { - LOG.warn("Failed to execute task", except); - Env.getCurrentEnv().getAnalysisManager() - .updateTaskStatus(task.info, - AnalysisState.FAILED, except.getMessage(), -1); - } else { - Env.getCurrentEnv().getAnalysisManager() - .updateTaskStatus(task.info, - AnalysisState.FINISHED, "", System.currentTimeMillis()); + if (!task.killed) { + if (except != null) { + LOG.warn("Failed to execute task", except); + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, + AnalysisState.FAILED, except.getMessage(), -1); + } else { + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, + AnalysisState.FINISHED, "", System.currentTimeMillis()); + } + LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime); } - LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime); } } @@ -78,8 +82,6 @@ public class AnalysisTaskWrapper extends FutureTask { task.cancel(); } catch (Exception e) { LOG.warn(String.format("Cancel job failed job info : %s", task.toString())); - } finally { - executor.decr(); } return super.cancel(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 53c268426d..1178355bd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -99,10 +99,10 @@ public abstract class BaseAnalysisTask { protected StmtExecutor stmtExecutor; - protected AnalysisState analysisState; - protected Set unsupportedType = new HashSet<>(); + protected volatile boolean killed; + @VisibleForTesting public BaseAnalysisTask() { @@ -162,6 +162,9 @@ public abstract class BaseAnalysisTask { if (stmtExecutor != null) { stmtExecutor.cancel(); } + if (killed) { + return; + } Env.getCurrentEnv().getAnalysisManager() .updateTaskStatus(info, AnalysisState.FAILED, String.format("Job has been cancelled: %s", info.toString()), -1); @@ -175,10 +178,6 @@ public abstract class BaseAnalysisTask { return info.jobId; } - public AnalysisState getAnalysisState() { - return analysisState; - } - protected String getDataSizeFunction(Column column) { if (column.getType().isStringType()) { return "SUM(LENGTH(`${colName}`))"; @@ -201,4 +200,9 @@ public abstract class BaseAnalysisTask { return String.format("TABLESAMPLE(%d ROWS)", info.sampleRows); } } + + public void markAsKilled() { + this.killed = true; + cancel(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index c16b0a4d36..7a835a0503 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -68,4 +68,5 @@ public class StatisticConstants { public static final int FETCH_INTERVAL_IN_MS = 500; public static final int HISTOGRAM_MAX_BUCKET_NUM = 128; + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 0c17a39059..204cf43182 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -33,6 +33,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; @@ -41,6 +42,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; import org.apache.doris.qe.AutoCloseConnectContext; @@ -229,6 +231,7 @@ public class StatisticsUtil { return dateTimeLiteral.getDouble(); case CHAR: case VARCHAR: + case STRING: VarcharLiteral varchar = new VarcharLiteral(columnValue); return varchar.getDouble(); case HLL: @@ -311,4 +314,35 @@ public class StatisticsUtil { .map(s -> "null".equalsIgnoreCase(s) || s.isEmpty()) .orElse(true); } + + public static boolean statsTblAvailable() { + String dbName = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME; + List statsTbls = new ArrayList<>(); + try { + statsTbls.add( + (OlapTable) StatisticsUtil + .findTable(InternalCatalog.INTERNAL_CATALOG_NAME, + dbName, + StatisticConstants.STATISTIC_TBL_NAME)); + statsTbls.add( + (OlapTable) StatisticsUtil + .findTable(InternalCatalog.INTERNAL_CATALOG_NAME, + dbName, + StatisticConstants.HISTOGRAM_TBL_NAME)); + statsTbls.add((OlapTable) StatisticsUtil.findTable(InternalCatalog.INTERNAL_CATALOG_NAME, + dbName, + StatisticConstants.ANALYSIS_JOB_TABLE)); + } catch (Throwable t) { + return false; + } + for (OlapTable table : statsTbls) { + for (Partition partition : 
table.getPartitions()) { + if (partition.getBaseIndex().getTablets().stream() + .anyMatch(t -> t.getNormalReplicaBackendIds().isEmpty())) { + return false; + } + } + } + return true; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index 09f64621ce..6ec2b70a2c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -19,6 +19,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.common.FeConstants; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; @@ -52,13 +53,14 @@ public class AnalysisJobTest extends TestWithFeService { } catch (Exception e) { throw new RuntimeException(e); } + FeConstants.runningUnitTest = true; } @Test public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) throws Exception { new Expectations() { { - scheduler.schedule((AnalysisTaskInfo) any); + scheduler.schedule((BaseAnalysisTask) any); times = 3; } }; diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java index 78d4e829ab..196fa883f1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HistogramTaskTest.java @@ -18,6 +18,7 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; @@ -66,6 +67,7 @@ public class 
HistogramTaskTest extends TestWithFeService { + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n" + ")"); + FeConstants.runningUnitTest = true; } @Tested @@ -78,19 +80,19 @@ public class HistogramTaskTest extends TestWithFeService { "ANALYZE TABLE t1(col1) UPDATE HISTOGRAM"); Assertions.assertNotNull(executor); - ConcurrentMap> taskMap = + ConcurrentMap> taskMap = Deencapsulation.getField(analysisManager, "analysisJobIdToTaskMap"); Assertions.assertEquals(1, taskMap.size()); - for (Entry> infoMap : taskMap.entrySet()) { - Map taskInfo = infoMap.getValue(); + for (Entry> infoMap : taskMap.entrySet()) { + Map taskInfo = infoMap.getValue(); Assertions.assertEquals(1, taskInfo.size()); - for (Entry infoEntry : taskInfo.entrySet()) { - AnalysisTaskInfo info = infoEntry.getValue(); - Assertions.assertEquals(AnalysisType.HISTOGRAM, info.analysisType); - Assertions.assertEquals("t1", info.tblName); - Assertions.assertEquals("col1", info.colName); + for (Entry infoEntry : taskInfo.entrySet()) { + BaseAnalysisTask task = infoEntry.getValue(); + Assertions.assertEquals(AnalysisType.HISTOGRAM, task.info.analysisType); + Assertions.assertEquals("t1", task.info.tblName); + Assertions.assertEquals("col1", task.info.colName); } } } diff --git a/regression-test/data/statistics/analyze_test.out b/regression-test/data/statistics/analyze_test.out index 497e2f6509..fbe46f36cc 100644 --- a/regression-test/data/statistics/analyze_test.out +++ b/regression-test/data/statistics/analyze_test.out @@ -26,6 +26,9 @@ -- !sql_5 -- 0 +-- !sql_3 -- 0 + -- !sql_4 -- 4 diff --git a/regression-test/suites/statistics/analyze_test.groovy b/regression-test/suites/statistics/analyze_test.groovy index 520f5d1f5e..c793a1c8cd 100644 --- a/regression-test/suites/statistics/analyze_test.groovy +++ b/regression-test/suites/statistics/analyze_test.groovy @@ -263,8 +263,6 @@ suite("analyze_test") { // throw e; // } - - sql """CREATE TABLE ${tblName2} (analyze_test_col1 varchar(11451) 
not null, analyze_test_col2 int not null, analyze_test_col3 int not null) UNIQUE KEY(analyze_test_col1) DISTRIBUTED BY HASH(analyze_test_col1)