From 87258a13c4e134cf5eb5b3ad4c28a2281fb869f8 Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Tue, 20 Jun 2023 11:37:46 +0900 Subject: [PATCH] [enhancement](nereids) Remove useless config option #20905 1. Remove useless config option 2. Fix timeout cancel, before this PR an OlapAnalysisTask would continue running even if it's already timeout. --- .../java/org/apache/doris/common/Config.java | 6 ------ .../doris/statistics/AnalysisManager.java | 4 ++-- .../statistics/AnalysisTaskExecutor.java | 7 ++----- .../doris/statistics/AnalysisTaskWrapper.java | 1 + .../doris/statistics/BaseAnalysisTask.java | 9 +-------- .../doris/statistics/OlapAnalysisTask.java | 19 +++++++++++++++---- .../statistics/AnalysisTaskExecutorTest.java | 9 --------- 7 files changed, 21 insertions(+), 34 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index f44a9833d5..c01cf90d1b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1586,12 +1586,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int be_exec_version = max_be_exec_version; - @ConfField(mutable = false) - public static int statistic_job_scheduler_execution_interval_ms = 1000; - - @ConfField(mutable = false) - public static int statistic_task_scheduler_execution_interval_ms = 1000; - /* * mtmv is still under dev, remove this config when it is graduate. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index b5b182dd62..99c4922850 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -711,7 +711,7 @@ public class AnalysisManager extends Daemon implements Writable { checkPriv(anyTask); logKilled(analysisJobInfoMap.get(anyTask.getJobId())); for (BaseAnalysisTask taskInfo : analysisTaskMap.values()) { - taskInfo.markAsKilled(); + taskInfo.cancel(); logKilled(taskInfo.info); } } @@ -780,7 +780,7 @@ public class AnalysisManager extends Daemon implements Writable { public void cancel() { cancelled = true; - tasks.forEach(BaseAnalysisTask::markAsKilled); + tasks.forEach(BaseAnalysisTask::cancel); } public void execute() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 3f22d1ccf3..00e2b9fc76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -21,7 +21,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; -import org.apache.doris.statistics.util.BlockingCounter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,9 +45,6 @@ public class AnalysisTaskExecutor extends Thread { private final AnalysisTaskScheduler taskScheduler; - private final BlockingCounter blockingCounter = - new BlockingCounter(Config.statistics_simultaneously_running_task_num); - private final BlockingQueue taskQueue = new PriorityBlockingQueue(20, Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); @@ -75,7 +71,8 @@ public class AnalysisTaskExecutor extends Thread { try { AnalysisTaskWrapper taskWrapper = taskQueue.take(); try { - long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes); + long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes) + - (System.currentTimeMillis() - taskWrapper.getStartTime()); taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS); } catch (Exception e) { taskWrapper.cancel(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java index b2615e5d05..864c9100d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java @@ -83,6 +83,7 @@ public class AnalysisTaskWrapper extends FutureTask { } catch (Exception e) { LOG.warn(String.format("Cancel job failed job info : %s", msg)); } + // Interrupt thread when it's writing metadata would cause FE crush. return super.cancel(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 1fc97c10dc..a1a52c0a11 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -163,12 +163,10 @@ public abstract class BaseAnalysisTask { public abstract void execute() throws Exception; public void cancel() { + killed = true; if (stmtExecutor != null) { stmtExecutor.cancel(); } - if (killed) { - return; - } Env.getCurrentEnv().getAnalysisManager() .updateTaskStatus(info, AnalysisState.FAILED, String.format("Job has been cancelled: %s", info.toString()), -1); @@ -202,9 +200,4 @@ public abstract class BaseAnalysisTask { return String.format("TABLESAMPLE(%d ROWS)", info.sampleRows); } } - - public void markAsKilled() { - this.killed = true; - cancel(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 4391d87f52..d33248e873 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -20,8 +20,10 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; @@ -108,10 +110,19 @@ public class OlapAnalysisTask extends BaseAnalysisTask { @VisibleForTesting public void execSQL(String sql) throws Exception { - QueryState queryState = StatisticsUtil.execUpdate(sql); - if (queryState.getStateType().equals(MysqlStateType.ERR)) { - throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s", - info.catalogName, info.dbName, info.colName, queryState.getErrorMessage())); + if (killed) { + return; + } + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + stmtExecutor = new StmtExecutor(r.connectContext, sql); + r.connectContext.setExecutor(stmtExecutor); + stmtExecutor.execute(); + QueryState queryState = r.connectContext.getState(); + if (queryState.getStateType().equals(MysqlStateType.ERR)) { + throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s", + info.catalogName, info.dbName, info.colName, queryState.getErrorMessage())); + } } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index 522a28d9c2..264015dd9e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -24,7 +24,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.AnalysisMode; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; -import org.apache.doris.statistics.util.BlockingCounter; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.utframe.TestWithFeService; @@ -33,7 +32,6 @@ import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -86,13 +84,6 @@ public class AnalysisTaskExecutorTest extends TestWithFeService { Deencapsulation.setField(analysisTaskWrapper, "startTime", 5); b.put(analysisTaskWrapper); analysisTaskExecutor.start(); - BlockingCounter counter = Deencapsulation.getField(analysisTaskExecutor, "blockingCounter"); - int sleepTime = 500; - while (counter.getVal() != 0 && sleepTime > 0) { - sleepTime -= 100; - Thread.sleep(100); - } - Assertions.assertEquals(0, counter.getVal()); } @Test