[feature](Nereids): when cost time > 5s, throw timeout Exception (#18316)
This commit is contained in:
@ -93,6 +93,8 @@ public class CascadesContext implements ScheduleContext, PlanSource {
|
||||
|
||||
private boolean isRewriteRoot;
|
||||
|
||||
private volatile boolean isTimeout = false;
|
||||
|
||||
private Optional<Scope> outerScope = Optional.empty();
|
||||
|
||||
public CascadesContext(Plan plan, Memo memo, StatementContext statementContext,
|
||||
@ -135,6 +137,14 @@ public class CascadesContext implements ScheduleContext, PlanSource {
|
||||
return new CascadesContext(initPlan, null, statementContext, cteContext, PhysicalProperties.ANY);
|
||||
}
|
||||
|
||||
public synchronized void setIsTimeout(boolean isTimeout) {
|
||||
this.isTimeout = isTimeout;
|
||||
}
|
||||
|
||||
public synchronized boolean isTimeout() {
|
||||
return isTimeout;
|
||||
}
|
||||
|
||||
public void toMemo() {
|
||||
this.memo = new Memo(plan);
|
||||
}
|
||||
@ -282,7 +292,9 @@ public class CascadesContext implements ScheduleContext, PlanSource {
|
||||
this.outerScope = Optional.ofNullable(outerScope);
|
||||
}
|
||||
|
||||
/** getAndCacheSessionVariable */
|
||||
/**
|
||||
* getAndCacheSessionVariable
|
||||
*/
|
||||
public <T> T getAndCacheSessionVariable(String cacheName,
|
||||
T defaultValue, Function<SessionVariable, T> variableSupplier) {
|
||||
ConnectContext connectContext = getConnectContext();
|
||||
|
||||
@ -65,6 +65,10 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -187,6 +191,11 @@ public class NereidsPlanner extends Planner {
|
||||
}
|
||||
}
|
||||
|
||||
Optional<ScheduledExecutorService> timeoutExecutor = Optional.empty();
|
||||
if (ConnectContext.get().getSessionVariable().enableNereidsTimeout) {
|
||||
timeoutExecutor = Optional.of(runTimeoutExecutor());
|
||||
}
|
||||
|
||||
// rule-based optimize
|
||||
rewrite();
|
||||
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
|
||||
@ -224,6 +233,8 @@ public class NereidsPlanner extends Planner {
|
||||
optimizedPlan = physicalPlan;
|
||||
}
|
||||
|
||||
timeoutExecutor.ifPresent(ExecutorService::shutdown);
|
||||
|
||||
return physicalPlan;
|
||||
}
|
||||
}
|
||||
@ -343,6 +354,14 @@ public class NereidsPlanner extends Planner {
|
||||
}
|
||||
}
|
||||
|
||||
private ScheduledExecutorService runTimeoutExecutor() {
|
||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
Runnable task = () -> cascadesContext.setIsTimeout(true);
|
||||
executor.schedule(task, 5, TimeUnit.SECONDS);
|
||||
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getExplainString(ExplainOptions explainOptions) {
|
||||
ExplainLevel explainLevel = getExplainLevel(explainOptions);
|
||||
|
||||
@ -26,7 +26,7 @@ import java.util.Objects;
|
||||
* cascade optimizer.
|
||||
*/
|
||||
public class CascadesOptimizer {
|
||||
private CascadesContext cascadesContext;
|
||||
private final CascadesContext cascadesContext;
|
||||
|
||||
public CascadesOptimizer(CascadesContext cascadesContext) {
|
||||
this.cascadesContext = Objects.requireNonNull(cascadesContext, "cascadesContext cannot be null");
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.nereids.jobs.scheduler;
|
||||
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.jobs.Job;
|
||||
|
||||
/**
|
||||
@ -27,6 +28,10 @@ public class SimpleJobScheduler implements JobScheduler {
|
||||
public void executeJobPool(ScheduleContext scheduleContext) {
|
||||
JobPool pool = scheduleContext.getJobPool();
|
||||
while (!pool.isEmpty()) {
|
||||
CascadesContext context = (CascadesContext) scheduleContext;
|
||||
if (context.isTimeout()) {
|
||||
throw new RuntimeException("Nereids cost too much time ( > 5s )");
|
||||
}
|
||||
Job job = pool.pop();
|
||||
job.execute();
|
||||
}
|
||||
|
||||
@ -208,6 +208,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final String DISABLE_NEREIDS_RULES = "disable_nereids_rules";
|
||||
public static final String ENABLE_NEW_COST_MODEL = "enable_new_cost_model";
|
||||
public static final String ENABLE_FALLBACK_TO_ORIGINAL_PLANNER = "enable_fallback_to_original_planner";
|
||||
public static final String ENABLE_NEREIDS_TIMEOUT = "enable_nereids_timeout";
|
||||
|
||||
public static final String FORBID_UNKNOWN_COLUMN_STATS = "forbid_unknown_col_stats";
|
||||
public static final String BROADCAST_RIGHT_TABLE_SCALE_FACTOR = "broadcast_right_table_scale_factor";
|
||||
@ -708,6 +709,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER, needForward = true)
|
||||
public boolean enableFallbackToOriginalPlanner = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_NEREIDS_TIMEOUT, needForward = true)
|
||||
public boolean enableNereidsTimeout = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD)
|
||||
public boolean enableNewShuffleHashMethod = true;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user