[fix](fe) Fix mem leaks (#14570)

1. Fix memory leaks in StmtExecutor::executeInternalQuery
2. Limit the number of concurrent running load task for statistics cache
This commit is contained in:
Kikyou1997
2022-11-25 09:16:54 +08:00
committed by GitHub
parent 0ae246a93b
commit d12112b930
3 changed files with 90 additions and 62 deletions

View File

@ -1792,52 +1792,56 @@ public class StmtExecutor implements ProfileWriter {
}
public List<ResultRow> executeInternalQuery() {
analyzer = new Analyzer(context.getEnv(), context);
try {
analyze(context.getSessionVariable().toThrift());
} catch (UserException e) {
LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
return null;
}
planner.getFragments();
RowBatch batch;
coord = new Coordinator(context, analyzer, planner);
try {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
} catch (UserException e) {
LOG.warn(e.getMessage(), e);
}
coord.setProfileWriter(this);
Span queryScheduleSpan = context.getTracer()
.spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan();
try (Scope scope = queryScheduleSpan.makeCurrent()) {
coord.exec();
} catch (Exception e) {
queryScheduleSpan.recordException(e);
LOG.warn("Unexpected exception when SQL running", e);
} finally {
queryScheduleSpan.end();
}
Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result")
.setParent(Context.current()).startSpan();
List<ResultRow> resultRows = new ArrayList<>();
try (Scope scope = fetchResultSpan.makeCurrent()) {
while (true) {
batch = coord.getNext();
if (batch == null || batch.isEos()) {
return resultRows;
} else {
resultRows.addAll(convertResultBatchToResultRows(batch.getBatch()));
}
analyzer = new Analyzer(context.getEnv(), context);
try {
analyze(context.getSessionVariable().toThrift());
} catch (UserException e) {
LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
return null;
}
planner.getFragments();
RowBatch batch;
coord = new Coordinator(context, analyzer, planner);
try {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
} catch (UserException e) {
LOG.warn(e.getMessage(), e);
}
coord.setProfileWriter(this);
Span queryScheduleSpan = context.getTracer()
.spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan();
try (Scope scope = queryScheduleSpan.makeCurrent()) {
coord.exec();
} catch (Exception e) {
queryScheduleSpan.recordException(e);
LOG.warn("Unexpected exception when SQL running", e);
} finally {
queryScheduleSpan.end();
}
Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result")
.setParent(Context.current()).startSpan();
List<ResultRow> resultRows = new ArrayList<>();
try (Scope scope = fetchResultSpan.makeCurrent()) {
while (true) {
batch = coord.getNext();
if (batch == null || batch.isEos()) {
return resultRows;
} else {
resultRows.addAll(convertResultBatchToResultRows(batch.getBatch()));
}
}
} catch (Exception e) {
LOG.warn("Unexpected exception when SQL running", e);
fetchResultSpan.recordException(e);
return null;
} finally {
fetchResultSpan.end();
}
} catch (Exception e) {
LOG.warn("Unexpected exception when SQL running", e);
fetchResultSpan.recordException(e);
return null;
} finally {
fetchResultSpan.end();
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
}

View File

@ -53,8 +53,11 @@ public class StatisticConstants {
public static final long STATISTICS_RECORDS_CACHE_SIZE = 100000;
/**
* If analysys job execution time exceeds this time, it would be cancelled.
* If analysis job execution time exceeds this time, it would be cancelled.
*/
public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = TimeUnit.MINUTES.toMillis(10);
public static final int LOAD_TASK_LIMITS = 10;
}

View File

@ -43,28 +43,49 @@ public class StatisticsCacheLoader implements AsyncCacheLoader<StatisticsCacheKe
+ "." + StatisticConstants.STATISTIC_TBL_NAME + " WHERE "
+ "id = CONCAT('${tblId}', '-', '${colId}')";
private static int CUR_RUNNING_LOAD = 0;
private static final Object LOCK = new Object();
// TODO: Maybe we should trigger a analyze job when the required ColumnStatistic doesn't exists.
@Override
public @NonNull CompletableFuture<ColumnStatistic> asyncLoad(@NonNull StatisticsCacheKey key,
@NonNull Executor executor) {
return CompletableFuture.supplyAsync(() -> {
Map<String, String> params = new HashMap<>();
params.put("tblId", String.valueOf(key.tableId));
params.put("colId", String.valueOf(key.colName));
List<ResultRow> resultBatches =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(QUERY_COLUMN_STATISTICS));
List<ColumnStatistic> columnStatistics = null;
try {
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(resultBatches);
} catch (Exception e) {
LOG.warn("Failed to deserialize column statistics", e);
throw new CompletionException(e);
synchronized (LOCK) {
if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) {
try {
LOCK.wait();
} catch (InterruptedException e) {
LOG.warn("Ignore interruption", e);
}
}
if (CollectionUtils.isEmpty(columnStatistics)) {
return ColumnStatistic.DEFAULT;
}
return columnStatistics.get(0);
});
CUR_RUNNING_LOAD++;
return CompletableFuture.supplyAsync(() -> {
try {
Map<String, String> params = new HashMap<>();
params.put("tblId", String.valueOf(key.tableId));
params.put("colId", String.valueOf(key.colName));
List<ResultRow> resultBatches =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(QUERY_COLUMN_STATISTICS));
List<ColumnStatistic> columnStatistics = null;
try {
columnStatistics = StatisticsUtil.deserializeToColumnStatistics(resultBatches);
} catch (Exception e) {
LOG.warn("Failed to deserialize column statistics", e);
throw new CompletionException(e);
}
if (CollectionUtils.isEmpty(columnStatistics)) {
return ColumnStatistic.DEFAULT;
}
return columnStatistics.get(0);
} finally {
synchronized (LOCK) {
CUR_RUNNING_LOAD--;
LOCK.notify();
}
}
});
}
}
}