From d12112b9304871fe820bf61c6e8a6145126e1593 Mon Sep 17 00:00:00 2001 From: Kikyou1997 <33112463+Kikyou1997@users.noreply.github.com> Date: Fri, 25 Nov 2022 09:16:54 +0800 Subject: [PATCH] [fix](fe) Fix mem leaks (#14570) 1. Fix memory leaks in StmtExecutor::executeInternalQuery 2. Limit the number of concurrent running load task for statistics cache --- .../org/apache/doris/qe/StmtExecutor.java | 90 ++++++++++--------- .../doris/statistics/StatisticConstants.java | 5 +- .../statistics/StatisticsCacheLoader.java | 57 ++++++++---- 3 files changed, 90 insertions(+), 62 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c22cb0c2d8..cd8df03f24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1792,52 +1792,56 @@ public class StmtExecutor implements ProfileWriter { } public List executeInternalQuery() { - analyzer = new Analyzer(context.getEnv(), context); try { - analyze(context.getSessionVariable().toThrift()); - } catch (UserException e) { - LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e); - return null; - } - planner.getFragments(); - RowBatch batch; - coord = new Coordinator(context, analyzer, planner); - try { - QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), - new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - } catch (UserException e) { - LOG.warn(e.getMessage(), e); - } - - coord.setProfileWriter(this); - Span queryScheduleSpan = context.getTracer() - .spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan(); - try (Scope scope = queryScheduleSpan.makeCurrent()) { - coord.exec(); - } catch (Exception e) { - queryScheduleSpan.recordException(e); - LOG.warn("Unexpected exception when SQL running", e); - } finally { - queryScheduleSpan.end(); - } - Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result") - .setParent(Context.current()).startSpan(); - List resultRows = new ArrayList<>(); - try (Scope scope = fetchResultSpan.makeCurrent()) { - while (true) { - batch = coord.getNext(); - if (batch == null || batch.isEos()) { - return resultRows; - } else { - resultRows.addAll(convertResultBatchToResultRows(batch.getBatch())); - } + analyzer = new Analyzer(context.getEnv(), context); + try { + analyze(context.getSessionVariable().toThrift()); + } catch (UserException e) { + LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e); + return null; + } + planner.getFragments(); + RowBatch batch; + coord = new Coordinator(context, analyzer, planner); + try { + QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), + new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); + } catch (UserException e) { + LOG.warn(e.getMessage(), e); + } + + coord.setProfileWriter(this); + Span queryScheduleSpan = context.getTracer() + .spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan(); + try (Scope scope = queryScheduleSpan.makeCurrent()) { + coord.exec(); + } catch (Exception e) { + queryScheduleSpan.recordException(e); + LOG.warn("Unexpected exception when SQL running", e); + } finally { + queryScheduleSpan.end(); + } + Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result") + .setParent(Context.current()).startSpan(); + List resultRows = new ArrayList<>(); + try (Scope scope = fetchResultSpan.makeCurrent()) { + while (true) { + batch = coord.getNext(); + if (batch == null || batch.isEos()) { + return resultRows; + } else { + resultRows.addAll(convertResultBatchToResultRows(batch.getBatch())); + } + } + } catch (Exception e) { + LOG.warn("Unexpected exception when SQL running", e); + fetchResultSpan.recordException(e); + return null; + } finally { + fetchResultSpan.end(); } - } catch (Exception e) { - LOG.warn("Unexpected exception when SQL running", e); - fetchResultSpan.recordException(e); - return null; } finally { - fetchResultSpan.end(); + QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index d6d04683bf..dc89cc42fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -53,8 +53,11 @@ public class StatisticConstants { public static final long STATISTICS_RECORDS_CACHE_SIZE = 100000; /** - * If analysys job execution time exceeds this time, it would be cancelled. + * If analysis job execution time exceeds this time, it would be cancelled. */ public static final long STATISTICS_TASKS_TIMEOUT_IN_MS = TimeUnit.MINUTES.toMillis(10); + + public static final int LOAD_TASK_LIMITS = 10; + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java index be34a65622..d27f5893b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCacheLoader.java @@ -43,28 +43,49 @@ public class StatisticsCacheLoader implements AsyncCacheLoader asyncLoad(@NonNull StatisticsCacheKey key, @NonNull Executor executor) { - return CompletableFuture.supplyAsync(() -> { - Map params = new HashMap<>(); - params.put("tblId", String.valueOf(key.tableId)); - params.put("colId", String.valueOf(key.colName)); - List resultBatches = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(QUERY_COLUMN_STATISTICS)); - List columnStatistics = null; - try { - columnStatistics = StatisticsUtil.deserializeToColumnStatistics(resultBatches); - } catch (Exception e) { - LOG.warn("Failed to deserialize column statistics", e); - throw new CompletionException(e); + synchronized (LOCK) { + if (CUR_RUNNING_LOAD > StatisticConstants.LOAD_TASK_LIMITS) { + try { + LOCK.wait(); + } catch (InterruptedException e) { + LOG.warn("Ignore interruption", e); + } } - if (CollectionUtils.isEmpty(columnStatistics)) { - return ColumnStatistic.DEFAULT; - } - return columnStatistics.get(0); - }); + CUR_RUNNING_LOAD++; + return CompletableFuture.supplyAsync(() -> { + try { + Map params = new HashMap<>(); + params.put("tblId", String.valueOf(key.tableId)); + params.put("colId", String.valueOf(key.colName)); + List resultBatches = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(QUERY_COLUMN_STATISTICS)); + List columnStatistics = null; + try { + columnStatistics = StatisticsUtil.deserializeToColumnStatistics(resultBatches); + } catch (Exception e) { + LOG.warn("Failed to deserialize column statistics", e); + throw new CompletionException(e); + } + if (CollectionUtils.isEmpty(columnStatistics)) { + return ColumnStatistic.DEFAULT; + } + return columnStatistics.get(0); + } finally { + synchronized (LOCK) { + CUR_RUNNING_LOAD--; + LOCK.notify(); + } + } + }); + } } }