[opt](nereids) Retry when async analyze task failed (#21889)
Retry at most 5 times when async analyze task execution failed
This commit is contained in:
@ -2018,7 +2018,7 @@ public class Config extends ConfigBase {
|
||||
public static boolean enable_feature_binlog = false;
|
||||
|
||||
@ConfField
|
||||
public static int analyze_task_timeout_in_minutes = 120;
|
||||
public static int analyze_task_timeout_in_hours = 12;
|
||||
|
||||
@ConfField(mutable = true, masterOnly = true, description = {
|
||||
"是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
|
||||
|
||||
@ -820,7 +820,7 @@ under the License.
|
||||
<reuseForks>false</reuseForks>
|
||||
<useFile>false</useFile>
|
||||
<argLine>
|
||||
-javaagent:${settings.localRepository}/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar
|
||||
-Xmx512m -javaagent:${settings.localRepository}/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar
|
||||
</argLine>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
@ -816,7 +816,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
task.execute();
|
||||
task.doExecute();
|
||||
updateSyncTaskStatus(task, AnalysisState.FINISHED);
|
||||
} catch (Throwable t) {
|
||||
colNames.add(task.info.colName);
|
||||
|
||||
@ -71,7 +71,7 @@ public class AnalysisTaskExecutor extends Thread {
|
||||
try {
|
||||
AnalysisTaskWrapper taskWrapper = taskQueue.take();
|
||||
try {
|
||||
long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes)
|
||||
long timeout = TimeUnit.HOURS.toMillis(Config.analyze_task_timeout_in_hours)
|
||||
- (System.currentTimeMillis() - taskWrapper.getStartTime());
|
||||
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -160,7 +160,22 @@ public abstract class BaseAnalysisTask {
|
||||
|
||||
}
|
||||
|
||||
public abstract void execute() throws Exception;
|
||||
public void execute() {
|
||||
int retriedTimes = 0;
|
||||
while (retriedTimes <= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
if (killed) {
|
||||
break;
|
||||
}
|
||||
try {
|
||||
doExecute();
|
||||
break;
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void doExecute() throws Exception;
|
||||
|
||||
public void cancel() {
|
||||
killed = true;
|
||||
|
||||
@ -104,7 +104,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
table = (HMSExternalTable) tbl;
|
||||
}
|
||||
|
||||
public void execute() throws Exception {
|
||||
public void doExecute() throws Exception {
|
||||
if (isTableLevelTask) {
|
||||
getTableStats();
|
||||
} else {
|
||||
|
||||
@ -58,7 +58,7 @@ public class HistogramTask extends BaseAnalysisTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute() throws Exception {
|
||||
public void doExecute() throws Exception {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("histogramStatTbl", StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
|
||||
@ -86,7 +86,7 @@ public class MVAnalysisTask extends BaseAnalysisTask {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute() throws Exception {
|
||||
public void doExecute() throws Exception {
|
||||
for (Column column : meta.getSchema()) {
|
||||
SelectStmt selectOne = (SelectStmt) selectStmt.clone();
|
||||
TableRef tableRef = selectOne.getTableRefs().get(0);
|
||||
|
||||
@ -59,7 +59,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
super(info);
|
||||
}
|
||||
|
||||
public void execute() throws Exception {
|
||||
public void doExecute() throws Exception {
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
|
||||
|
||||
@ -84,6 +84,8 @@ public class StatisticConstants {
|
||||
|
||||
public static List<String> STATISTICS_DB_BLACK_LIST = new ArrayList<>();
|
||||
|
||||
public static int ANALYZE_TASK_RETRY_TIMES = 5;
|
||||
|
||||
static {
|
||||
STATISTICS_DB_BLACK_LIST.add(SystemInfoService.DEFAULT_CLUSTER
|
||||
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);
|
||||
|
||||
@ -121,6 +121,9 @@ public class StatisticsUtil {
|
||||
}
|
||||
|
||||
public static List<ResultRow> execStatisticQuery(String sql) {
|
||||
if (FeConstants.disableInternalSchemaDb) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
|
||||
r.connectContext.setExecutor(stmtExecutor);
|
||||
|
||||
@ -130,7 +130,7 @@ public class AnalysisJobTest extends TestWithFeService {
|
||||
.setAnalysisType(AnalysisType.FUNDAMENTALS)
|
||||
.setColToPartitions(colToPartitions)
|
||||
.build();
|
||||
new OlapAnalysisTask(analysisJobInfo).execute();
|
||||
new OlapAnalysisTask(analysisJobInfo).doExecute();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -127,7 +127,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
|
||||
};
|
||||
new Expectations() {
|
||||
{
|
||||
task.execute();
|
||||
task.doExecute();
|
||||
times = 1;
|
||||
}
|
||||
};
|
||||
|
||||
@ -120,7 +120,7 @@ public class HistogramTaskTest extends TestWithFeService {
|
||||
};
|
||||
new Expectations() {
|
||||
{
|
||||
task.execute();
|
||||
task.doExecute();
|
||||
times = 1;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user