[opt](stats) Use single connect context for each olap analyze task

1. add some comment 
2. Fix potential NPE caused by deleting a running analyze job
3. Use single connect  context for each olap analyze task
This commit is contained in:
AKIRA
2023-08-10 15:04:28 +08:00
committed by GitHub
parent f7d00d467a
commit ec0cedab51
3 changed files with 31 additions and 29 deletions

View File

@ -96,17 +96,23 @@ public class AnalysisManager extends Daemon implements Writable {
private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
private ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
// Tracking running manually submitted async tasks, keep in mem only
private final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
private StatisticsCache statisticsCache;
private AnalysisTaskExecutor taskExecutor;
// Store task information in metadata.
private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>());
// Store job information in metadata
private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());
// Tracking system submitted job, keep in mem only
private final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
// Tracking and control sync analyze tasks, keep in mem only
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
@ -127,6 +133,10 @@ public class AnalysisManager extends Daemon implements Writable {
}
info.lastExecTimeInMs = time;
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
// Job may get deleted during execution.
if (job == null) {
return null;
}
// Synchronize the job state change in job level.
synchronized (job) {
job.lastExecTimeInMs = time;
@ -333,8 +343,6 @@ public class AnalysisManager extends Daemon implements Writable {
if (!isSync) {
persistAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
}
if (!isSync) {
try {
updateTableStats(jobInfo);
} catch (Throwable e) {

View File

@ -67,7 +67,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(info.tblName));
params.put("sampleExpr", getSampleExpression());
List<String> partitionAnalysisSQLs = new ArrayList<>();
List<String> sqls = new ArrayList<>();
try {
tbl.readLock();
Set<String> partNames = info.colToPartitions.get(info.colName);
@ -80,46 +80,40 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
// Avoid error when get the default partition
params.put("partName", "`" + partName + "`");
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
sqls.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
}
} finally {
tbl.readUnlock();
}
execSQLs(partitionAnalysisSQLs);
params.remove("partId");
params.put("type", col.getType().toString());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
execSQL(sql);
sqls.add(sql);
execSQLs(sqls);
}
@VisibleForTesting
public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
for (String sql : partitionAnalysisSQLs) {
execSQL(sql);
}
}
@VisibleForTesting
public void execSQL(String sql) throws Exception {
if (killed) {
return;
}
public void execSQLs(List<String> sqls) throws Exception {
long startTime = System.currentTimeMillis();
LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
for (String sql : sqls) {
if (killed) {
return;
}
LOG.info("ANALYZE SQL : " + sql + " start at " + startTime);
stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
}
}
} finally {
LOG.info("Analyze SQL: " + sql + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
LOG.debug("Analyze SQL: " + sqls + " cost time: " + (System.currentTimeMillis() - startTime) + "ms");
}
}
}

View File

@ -89,7 +89,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
new MockUp<OlapAnalysisTask>() {
@Mock
public void execSQL(String sql) throws Exception {
public void execSQLs(List<String> sqls) throws Exception {
}
};