diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index 08240250c6..ff60c6a180 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -93,6 +93,8 @@ public class CascadesContext implements ScheduleContext, PlanSource { private boolean isRewriteRoot; + private volatile boolean isTimeout = false; + private Optional outerScope = Optional.empty(); public CascadesContext(Plan plan, Memo memo, StatementContext statementContext, @@ -135,6 +137,14 @@ public class CascadesContext implements ScheduleContext, PlanSource { return new CascadesContext(initPlan, null, statementContext, cteContext, PhysicalProperties.ANY); } + public synchronized void setIsTimeout(boolean isTimeout) { + this.isTimeout = isTimeout; + } + + public synchronized boolean isTimeout() { + return isTimeout; + } + public void toMemo() { this.memo = new Memo(plan); } @@ -282,7 +292,9 @@ public class CascadesContext implements ScheduleContext, PlanSource { this.outerScope = Optional.ofNullable(outerScope); } - /** getAndCacheSessionVariable */ + /** + * getAndCacheSessionVariable + */ public T getAndCacheSessionVariable(String cacheName, T defaultValue, Function variableSupplier) { ConnectContext connectContext = getConnectContext(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 24fba6776f..b14f7a67c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -65,6 +65,10 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -187,6 +191,11 @@ public class NereidsPlanner extends Planner { } } + Optional timeoutExecutor = Optional.empty(); + if (ConnectContext.get().getSessionVariable().enableNereidsTimeout) { + timeoutExecutor = Optional.of(runTimeoutExecutor()); + } + // rule-based optimize rewrite(); if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) { @@ -224,6 +233,8 @@ public class NereidsPlanner extends Planner { optimizedPlan = physicalPlan; } + timeoutExecutor.ifPresent(ExecutorService::shutdown); + return physicalPlan; } } @@ -343,6 +354,14 @@ public class NereidsPlanner extends Planner { } } + private ScheduledExecutorService runTimeoutExecutor() { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + Runnable task = () -> cascadesContext.setIsTimeout(true); + executor.schedule(task, 5, TimeUnit.SECONDS); + + return executor; + } + @Override public String getExplainString(ExplainOptions explainOptions) { ExplainLevel explainLevel = getExplainLevel(explainOptions); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/CascadesOptimizer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/CascadesOptimizer.java index 9b7d15c9fc..fc0f0e12b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/CascadesOptimizer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/CascadesOptimizer.java @@ -26,7 +26,7 @@ import java.util.Objects; * cascade optimizer. */ public class CascadesOptimizer { - private CascadesContext cascadesContext; + private final CascadesContext cascadesContext; public CascadesOptimizer(CascadesContext cascadesContext) { this.cascadesContext = Objects.requireNonNull(cascadesContext, "cascadesContext cannot be null"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java index a93270d48e..940f9d572f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.jobs.scheduler; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.Job; /** @@ -27,6 +28,10 @@ public class SimpleJobScheduler implements JobScheduler { public void executeJobPool(ScheduleContext scheduleContext) { JobPool pool = scheduleContext.getJobPool(); while (!pool.isEmpty()) { + CascadesContext context = (CascadesContext) scheduleContext; + if (context.isTimeout()) { + throw new RuntimeException("Nereids cost too much time ( > 5s )"); + } Job job = pool.pop(); job.execute(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b3a253103a..4478479e0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -208,6 +208,7 @@ public class SessionVariable implements Serializable, Writable { public static final String DISABLE_NEREIDS_RULES = "disable_nereids_rules"; public static final String ENABLE_NEW_COST_MODEL = "enable_new_cost_model"; public static final String ENABLE_FALLBACK_TO_ORIGINAL_PLANNER = "enable_fallback_to_original_planner"; + public static final String ENABLE_NEREIDS_TIMEOUT = "enable_nereids_timeout"; public static final String FORBID_UNKNOWN_COLUMN_STATS = "forbid_unknown_col_stats"; public static final String BROADCAST_RIGHT_TABLE_SCALE_FACTOR = "broadcast_right_table_scale_factor"; @@ -708,6 +709,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER, needForward = true) public boolean enableFallbackToOriginalPlanner = true; + @VariableMgr.VarAttr(name = ENABLE_NEREIDS_TIMEOUT, needForward = true) + public boolean enableNereidsTimeout = true; + @VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD) public boolean enableNewShuffleHashMethod = true; diff --git a/regression-test/suites/nereids_tpcds_p0/query_empty_table/nereids_tpcds_query_empty_table.groovy b/regression-test/suites/nereids_tpcds_p0/query_empty_table/nereids_tpcds_query_empty_table.groovy index 5c1e845454..1acfc38239 100644 --- a/regression-test/suites/nereids_tpcds_p0/query_empty_table/nereids_tpcds_query_empty_table.groovy +++ b/regression-test/suites/nereids_tpcds_p0/query_empty_table/nereids_tpcds_query_empty_table.groovy @@ -54,6 +54,7 @@ suite("nereids_tpcds_query_empty_table") { sql "set enable_nereids_planner=true" sql "set enable_fallback_to_original_planner=false" sql "set query_timeout=60" + sql "set enable_nereids_timeout=false" logger.info("execute ${sqlFile.getName()} [${++num}/${sqlFiles.size()}]".toString()) try {