diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 8cf5fdd204..ee28132480 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -744,5 +744,19 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static boolean disable_balance = false; + + // This threshold is to avoid piling up too many report task in FE, which may cause OOM exception. + // In some large Doris cluster, eg: 100 Backends with ten million replicas, a tablet report may cost + // several seconds after some modification of metadata(drop partition, etc..). + // And one Backend will report tablets info every 1 min, so unlimited receiving reports is unacceptable. + // TODO(cmy): we will optimize the processing speed of tablet report in future, but now, just discard + // the report if queue size exceeding limit. + // Some online time cost: + // 1. disk report: 0-1 ms + // 2. task report: 0-1 ms + // 3. tablet report + // 10000 replicas: 200ms + @ConfField(mutable = true, masterOnly = true) + public static int report_queue_size = 100; } diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index 548cfabd57..2534fe82d1 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -32,6 +32,8 @@ import org.apache.doris.clone.CloneChecker; import org.apache.doris.common.Config; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.util.Daemon; +import org.apache.doris.metric.GaugeMetric; +import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.system.Backend; @@ -84,7 +86,17 @@ public class ReportHandler extends Daemon { private BlockingQueue reportQueue = Queues.newLinkedBlockingQueue(); + private GaugeMetric gaugeQueueSize; + public ReportHandler() { + gaugeQueueSize = (GaugeMetric) new GaugeMetric( + "report_queue_size", "report queue size") { + @Override + public Long getValue() { + return (long) reportQueue.size(); + } + }; + MetricRepo.addMetric(gaugeQueueSize); } public TMasterResult handleReport(TReportRequest request) throws TException { @@ -140,8 +152,8 @@ public class ReportHandler extends Daemon { ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, forceRecovery); try { - reportQueue.put(reportTask); - } catch (InterruptedException e) { + putToQueue(reportTask); + } catch (Exception e) { tStatus.setStatus_code(TStatusCode.INTERNAL_ERROR); List errorMsgs = Lists.newArrayList(); errorMsgs.add("failed to put report task to queue. queue size: " + reportQueue.size()); @@ -155,6 +167,16 @@ public class ReportHandler extends Daemon { return result; } + private void putToQueue(ReportTask reportTask) throws Exception { + int currentSize = reportQueue.size(); + if (currentSize > Config.report_queue_size) { + LOG.warn("the report queue size exceeds the limit: {}. current: {}", Config.report_queue_size, currentSize); + throw new Exception( + "the report queue size exceeds the limit: " + Config.report_queue_size + ". current: " + currentSize); + } + reportQueue.put(reportTask); + } + private Map buildTabletMap(List tabletList) { Map tabletMap = Maps.newHashMap(); for (TTablet tTablet : tabletList) { @@ -271,7 +293,7 @@ public class ReportHandler extends Daemon { // handleForceCreateReplica(createReplicaTasks, backendId, forceRecovery); long end = System.currentTimeMillis(); - LOG.info("tablet report from backend[{}] cost: {}", backendId, (end - start)); + LOG.info("tablet report from backend[{}] cost: {} ms", backendId, (end - start)); } private static void taskReport(long backendId, Map> runningTasks) { diff --git a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java index 21c38ac3cf..1e2570e050 100644 --- a/fe/src/main/java/org/apache/doris/metric/MetricRepo.java +++ b/fe/src/main/java/org/apache/doris/metric/MetricRepo.java @@ -312,5 +312,10 @@ public final class MetricRepo { return sb.toString(); } + + public static void addMetric(Metric metric) { + init(); + PALO_METRIC_REGISTER.addPaloMetrics(metric); + } }