Add report queue size limit to avoid too many report requests (#846)
This commit is contained in:
@ -744,5 +744,19 @@ public class Config extends ConfigBase {
|
||||
*/
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static boolean disable_balance = false;
|
||||
|
||||
// This threshold is to avoid piling up too many report task in FE, which may cause OOM exception.
|
||||
// In some large Doris cluster, eg: 100 Backends with ten million replicas, a tablet report may cost
|
||||
// several seconds after some modification of metadata(drop partition, etc..).
|
||||
// And one Backend will report tablets info every 1 min, so unlimited receiving reports is unacceptable.
|
||||
// TODO(cmy): we will optimize the processing speed of tablet report in future, but now, just discard
|
||||
// the report if queue size exceeding limit.
|
||||
// Some online time cost:
|
||||
// 1. disk report: 0-1 ms
|
||||
// 2. task report: 0-1 ms
|
||||
// 3. tablet report
|
||||
// 10000 replicas: 200ms
|
||||
@ConfField(mutable = true, masterOnly = true)
|
||||
public static int report_queue_size = 100;
|
||||
}
|
||||
|
||||
|
||||
@ -32,6 +32,8 @@ import org.apache.doris.clone.CloneChecker;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.common.util.Daemon;
|
||||
import org.apache.doris.metric.GaugeMetric;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.persist.BackendTabletsInfo;
|
||||
import org.apache.doris.persist.ReplicaPersistInfo;
|
||||
import org.apache.doris.system.Backend;
|
||||
@ -84,7 +86,17 @@ public class ReportHandler extends Daemon {
|
||||
|
||||
private BlockingQueue<ReportTask> reportQueue = Queues.newLinkedBlockingQueue();
|
||||
|
||||
private GaugeMetric<Long> gaugeQueueSize;
|
||||
|
||||
public ReportHandler() {
|
||||
gaugeQueueSize = (GaugeMetric<Long>) new GaugeMetric<Long>(
|
||||
"report_queue_size", "report queue size") {
|
||||
@Override
|
||||
public Long getValue() {
|
||||
return (long) reportQueue.size();
|
||||
}
|
||||
};
|
||||
MetricRepo.addMetric(gaugeQueueSize);
|
||||
}
|
||||
|
||||
public TMasterResult handleReport(TReportRequest request) throws TException {
|
||||
@ -140,8 +152,8 @@ public class ReportHandler extends Daemon {
|
||||
|
||||
ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, forceRecovery);
|
||||
try {
|
||||
reportQueue.put(reportTask);
|
||||
} catch (InterruptedException e) {
|
||||
putToQueue(reportTask);
|
||||
} catch (Exception e) {
|
||||
tStatus.setStatus_code(TStatusCode.INTERNAL_ERROR);
|
||||
List<String> errorMsgs = Lists.newArrayList();
|
||||
errorMsgs.add("failed to put report task to queue. queue size: " + reportQueue.size());
|
||||
@ -155,6 +167,16 @@ public class ReportHandler extends Daemon {
|
||||
return result;
|
||||
}
|
||||
|
||||
private void putToQueue(ReportTask reportTask) throws Exception {
|
||||
int currentSize = reportQueue.size();
|
||||
if (currentSize > Config.report_queue_size) {
|
||||
LOG.warn("the report queue size exceeds the limit: {}. current: {}", Config.report_queue_size, currentSize);
|
||||
throw new Exception(
|
||||
"the report queue size exceeds the limit: " + Config.report_queue_size + ". current: " + currentSize);
|
||||
}
|
||||
reportQueue.put(reportTask);
|
||||
}
|
||||
|
||||
private Map<Long, TTablet> buildTabletMap(List<TTablet> tabletList) {
|
||||
Map<Long, TTablet> tabletMap = Maps.newHashMap();
|
||||
for (TTablet tTablet : tabletList) {
|
||||
@ -271,7 +293,7 @@ public class ReportHandler extends Daemon {
|
||||
// handleForceCreateReplica(createReplicaTasks, backendId, forceRecovery);
|
||||
|
||||
long end = System.currentTimeMillis();
|
||||
LOG.info("tablet report from backend[{}] cost: {}", backendId, (end - start));
|
||||
LOG.info("tablet report from backend[{}] cost: {} ms", backendId, (end - start));
|
||||
}
|
||||
|
||||
private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
|
||||
|
||||
@ -312,5 +312,10 @@ public final class MetricRepo {
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static void addMetric(Metric<?> metric) {
|
||||
init();
|
||||
PALO_METRIC_REGISTER.addPaloMetrics(metric);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user