[enhancement](nereids) Remove useless config option #20905
1. Remove useless config option 2. Fix timeout cancel, before this PR an OlapAnalysisTask would continue running even if it's already timeout.
This commit is contained in:
@ -1586,12 +1586,6 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int be_exec_version = max_be_exec_version;
|
||||
|
||||
@ConfField(mutable = false)
|
||||
public static int statistic_job_scheduler_execution_interval_ms = 1000;
|
||||
|
||||
@ConfField(mutable = false)
|
||||
public static int statistic_task_scheduler_execution_interval_ms = 1000;
|
||||
|
||||
/*
|
||||
* mtmv is still under dev, remove this config when it is graduate.
|
||||
*/
|
||||
|
||||
@ -711,7 +711,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
checkPriv(anyTask);
|
||||
logKilled(analysisJobInfoMap.get(anyTask.getJobId()));
|
||||
for (BaseAnalysisTask taskInfo : analysisTaskMap.values()) {
|
||||
taskInfo.markAsKilled();
|
||||
taskInfo.cancel();
|
||||
logKilled(taskInfo.info);
|
||||
}
|
||||
}
|
||||
@ -780,7 +780,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
|
||||
public void cancel() {
|
||||
cancelled = true;
|
||||
tasks.forEach(BaseAnalysisTask::markAsKilled);
|
||||
tasks.forEach(BaseAnalysisTask::cancel);
|
||||
}
|
||||
|
||||
public void execute() {
|
||||
|
||||
@ -21,7 +21,6 @@ import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
|
||||
import org.apache.doris.statistics.util.BlockingCounter;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -46,9 +45,6 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
|
||||
private final AnalysisTaskScheduler taskScheduler;
|
||||
|
||||
private final BlockingCounter blockingCounter =
|
||||
new BlockingCounter(Config.statistics_simultaneously_running_task_num);
|
||||
|
||||
private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
|
||||
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
|
||||
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
|
||||
@ -75,7 +71,8 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
try {
|
||||
AnalysisTaskWrapper taskWrapper = taskQueue.take();
|
||||
try {
|
||||
long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes);
|
||||
long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes)
|
||||
- (System.currentTimeMillis() - taskWrapper.getStartTime());
|
||||
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
|
||||
} catch (Exception e) {
|
||||
taskWrapper.cancel(e.getMessage());
|
||||
|
||||
@ -83,6 +83,7 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
|
||||
} catch (Exception e) {
|
||||
LOG.warn(String.format("Cancel job failed job info : %s", msg));
|
||||
}
|
||||
// Interrupt thread when it's writing metadata would cause FE crush.
|
||||
return super.cancel(false);
|
||||
}
|
||||
|
||||
|
||||
@ -163,12 +163,10 @@ public abstract class BaseAnalysisTask {
|
||||
public abstract void execute() throws Exception;
|
||||
|
||||
public void cancel() {
|
||||
killed = true;
|
||||
if (stmtExecutor != null) {
|
||||
stmtExecutor.cancel();
|
||||
}
|
||||
if (killed) {
|
||||
return;
|
||||
}
|
||||
Env.getCurrentEnv().getAnalysisManager()
|
||||
.updateTaskStatus(info, AnalysisState.FAILED,
|
||||
String.format("Job has been cancelled: %s", info.toString()), -1);
|
||||
@ -202,9 +200,4 @@ public abstract class BaseAnalysisTask {
|
||||
return String.format("TABLESAMPLE(%d ROWS)", info.sampleRows);
|
||||
}
|
||||
}
|
||||
|
||||
public void markAsKilled() {
|
||||
this.killed = true;
|
||||
cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,8 +20,10 @@ package org.apache.doris.statistics;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.QueryState;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -108,10 +110,19 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
|
||||
@VisibleForTesting
|
||||
public void execSQL(String sql) throws Exception {
|
||||
QueryState queryState = StatisticsUtil.execUpdate(sql);
|
||||
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
|
||||
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s",
|
||||
info.catalogName, info.dbName, info.colName, queryState.getErrorMessage()));
|
||||
if (killed) {
|
||||
return;
|
||||
}
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
|
||||
stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
r.connectContext.setExecutor(stmtExecutor);
|
||||
stmtExecutor.execute();
|
||||
QueryState queryState = r.connectContext.getState();
|
||||
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
|
||||
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s",
|
||||
info.catalogName, info.dbName, info.colName, queryState.getErrorMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,7 +24,6 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
|
||||
import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
|
||||
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.JobType;
|
||||
import org.apache.doris.statistics.util.BlockingCounter;
|
||||
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
@ -33,7 +32,6 @@ import mockit.Expectations;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import mockit.Mocked;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
@ -86,13 +84,6 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
|
||||
Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
|
||||
b.put(analysisTaskWrapper);
|
||||
analysisTaskExecutor.start();
|
||||
BlockingCounter counter = Deencapsulation.getField(analysisTaskExecutor, "blockingCounter");
|
||||
int sleepTime = 500;
|
||||
while (counter.getVal() != 0 && sleepTime > 0) {
|
||||
sleepTime -= 100;
|
||||
Thread.sleep(100);
|
||||
}
|
||||
Assertions.assertEquals(0, counter.getVal());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user