From e72a012ada00f8cccd31a210098b0dd3450f7dd6 Mon Sep 17 00:00:00 2001 From: AKIRA <33112463+Kikyou1997@users.noreply.github.com> Date: Mon, 31 Jul 2023 17:33:20 +0800 Subject: [PATCH] [enhancement](stats) Retry when loading stats (#21849) --- .../doris/common/ThreadPoolManager.java | 11 +++- .../qe/InternalQueryExecutionException.java | 24 ++++++++ .../org/apache/doris/qe/StmtExecutor.java | 2 +- .../ColumnStatisticsCacheLoader.java | 61 +++++++++++++++++-- .../doris/statistics/StatisticConstants.java | 6 ++ 5 files changed, 98 insertions(+), 6 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/qe/InternalQueryExecutionException.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java index be8731b6b2..c4fa964659 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java @@ -17,6 +17,7 @@ package org.apache.doris.common; + import org.apache.doris.metric.Metric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricLabel; @@ -45,7 +46,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; - /** * ThreadPoolManager is a helper class for construct daemon thread pool with limit thread and memory resource. * thread names in thread pool are formatted as poolName-ID, where ID is a unique, sequentially assigned integer. @@ -134,6 +134,15 @@ public class ThreadPoolManager { poolName, needRegisterMetric); } + public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, int queueSize, + String poolName, + boolean needRegisterMetric, + RejectedExecutionHandler handler) { + return newDaemonThreadPool(numThread, numThread, KEEP_ALIVE_TIME, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueSize), handler, + poolName, needRegisterMetric); + } + public static ThreadPoolExecutor newDaemonFixedPriorityThreadPool(int numThread, int initQueueSize, Comparator comparator, Class tClass, String poolName, boolean needRegisterMetric) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InternalQueryExecutionException.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InternalQueryExecutionException.java new file mode 100644 index 0000000000..c368533c53 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InternalQueryExecutionException.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +public class InternalQueryExecutionException extends RuntimeException { + public InternalQueryExecutionException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 4d5d5e8851..592f261db7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2505,7 +2505,7 @@ public class StmtExecutor { coord.exec(); } catch (Exception e) { queryScheduleSpan.recordException(e); - throw new RuntimeException("Failed to execute internal SQL. " + Util.getRootCauseMessage(e), e); + throw new InternalQueryExecutionException(e.getMessage() + Util.getRootCauseMessage(e), e); } finally { queryScheduleSpan.end(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java index d94a90b75f..a44ba7867c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatisticsCacheLoader.java @@ -19,6 +19,8 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.qe.InternalQueryExecutionException; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; @@ -27,16 +29,23 @@ import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Optional; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader> { private static final Logger LOG = LogManager.getLogger(ColumnStatisticsCacheLoader.class); + private static final ThreadPoolExecutor singleThreadPool = ThreadPoolManager.newDaemonFixedThreadPool( + StatisticConstants.RETRY_LOAD_THREAD_POOL_SIZE, + StatisticConstants.RETRY_LOAD_QUEUE_SIZE, "STATS_RELOAD", + true, + new DiscardOldestPolicy()); + @Override protected Optional doLoad(StatisticsCacheKey key) { // Load from statistics table. - Optional columnStatistic = loadFromStatsTable(key.tableId, - key.idxId, key.colName); + Optional columnStatistic = loadFromStatsTable(key); if (columnStatistic.isPresent()) { return columnStatistic; } @@ -52,8 +61,14 @@ public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader loadFromStatsTable(long tableId, long idxId, String colName) { - List columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName); + private Optional loadFromStatsTable(StatisticsCacheKey key) { + List columnResults = null; + try { + columnResults = StatisticsRepository.loadColStats(key.tableId, key.idxId, key.colName); + } catch (InternalQueryExecutionException e) { + retryLoad(key); + return Optional.empty(); + } ColumnStatistic columnStatistics; try { columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResults); @@ -67,4 +82,42 @@ public class ColumnStatisticsCacheLoader extends StatisticsCacheLoader columnResults = null; + try { + columnResults = StatisticsRepository.loadColStats(key.tableId, key.idxId, key.colName); + } catch (InternalQueryExecutionException e) { + if (this.retryTimes < StatisticConstants.LOAD_RETRY_TIMES) { + retryTimes++; + singleThreadPool.submit(this); + } + return; + } + ColumnStatistic columnStatistics; + try { + columnStatistics = StatisticsUtil.deserializeToColumnStatistics(columnResults); + } catch (Exception e) { + LOG.warn("Exception to deserialize column statistics", e); + return; + } + if (columnStatistics != null) { + Env.getCurrentEnv().getStatisticsCache().putCache(key, columnStatistics); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index 072738e19b..1612d618cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -79,6 +79,12 @@ public class StatisticConstants { public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3; + public static final int RETRY_LOAD_QUEUE_SIZE = 1000; + + public static final int RETRY_LOAD_THREAD_POOL_SIZE = 1; + + public static final int LOAD_RETRY_TIMES = 3; + static { STATISTICS_DB_BLACK_LIST.add(SystemInfoService.DEFAULT_CLUSTER + ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);