Add tablet compaction score metrics (#2427)

[Metric] Add tablet compaction score metrics

Backend:
    Add metric "tablet_max_compaction_score" to monitor the current max compaction
    score of tablets on this Backend. This metric will be updated each time
    the compaction thread picking tablets to compact.

Frontend:
    Add metric "tablet_max_compaction_score" for each Backend. These metrics will
    be updated when backends report tablet.
    And also add a calculated metric "max_tablet_compaction_core" to monitor the
    max compaction core of tablets on all Backends.
This commit is contained in:
Mingyu Chen
2019-12-12 17:46:59 +08:00
committed by ZHAO Chun
parent a5f52f80df
commit c39d35df4c
14 changed files with 97 additions and 12 deletions

View File

@ -1287,6 +1287,9 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) {
return (void*)0;
#endif
}
int64_t max_compaction_score = std::max(DorisMetrics::tablet_cumulative_max_compaction_score.value(),
DorisMetrics::tablet_base_max_compaction_score.value());
request.__set_tablet_max_compaction_score(max_compaction_score);
TMasterResult result;
status = worker_pool_this->_master_client->report(request, &result);

View File

@ -65,7 +65,7 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
_schema(tablet_meta->tablet_schema()),
_data_dir(data_dir),
_is_bad(false),
_last_compaction_failure_time(UnixMillis()) {
_last_compaction_failure_time(0) {
_tablet_path.append(_data_dir->path());
_tablet_path.append(DATA_PREFIX);
_tablet_path.append("/");

View File

@ -759,6 +759,11 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
LOG(INFO) << "find best tablet to do compaction."
<< " type: " << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "cumulative" : "base")
<< ", tablet id: " << best_tablet->tablet_id() << ", score: " << highest_score;
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
DorisMetrics::tablet_cumulative_max_compaction_score.set_value(highest_score);
} else {
DorisMetrics::tablet_base_max_compaction_score.set_value(highest_score);
}
}
return best_tablet;
}

View File

@ -115,6 +115,9 @@ IntGaugeMetricsMap DorisMetrics::disks_avail_capacity;
IntGaugeMetricsMap DorisMetrics::disks_data_used_capacity;
IntGaugeMetricsMap DorisMetrics::disks_state;
IntGauge DorisMetrics::tablet_cumulative_max_compaction_score;
IntGauge DorisMetrics::tablet_base_max_compaction_score;
IntGauge DorisMetrics::push_request_write_bytes_per_second;
IntGauge DorisMetrics::query_scan_bytes_per_second;
IntGauge DorisMetrics::max_disk_io_util_percent;
@ -266,6 +269,9 @@ void DorisMetrics::initialize(
REGISTER_DORIS_METRIC(process_fd_num_limit_soft);
REGISTER_DORIS_METRIC(process_fd_num_limit_hard);
REGISTER_DORIS_METRIC(tablet_cumulative_max_compaction_score);
REGISTER_DORIS_METRIC(tablet_base_max_compaction_score);
// disk usage
for (auto& path : paths) {
IntGauge* gauge = disks_total_capacity.set_key(path);

View File

@ -139,6 +139,12 @@ public:
static IntGaugeMetricsMap disks_data_used_capacity;
static IntGaugeMetricsMap disks_state;
// the max compaction score of all tablets.
// Record base and cumulative scores separately, because
// we need to get the larger of the two.
static IntGauge tablet_cumulative_max_compaction_score;
static IntGauge tablet_base_max_compaction_score;
// The following metrics will be calculated
// by metric calculator
static IntGauge push_request_write_bytes_per_second;

View File

@ -157,6 +157,10 @@ public class ReportHandler extends Daemon {
forceRecovery = request.isForce_recovery();
}
if (request.isSetTablet_max_compaction_score()) {
backend.setTabletMaxCompactionScore(request.getTablet_max_compaction_score());
}
ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, forceRecovery);
try {
putToQueue(reportTask);

View File

@ -38,6 +38,11 @@ public class DorisMetricRegistry {
return Lists.newArrayList(paloMetrics);
}
// the metrics by metric name
public synchronized List<Metric> getPaloMetricsByName(String name) {
return paloMetrics.stream().filter(m -> m.getName().equals(name)).collect(Collectors.toList());
}
public synchronized void removeMetrics(String name) {
paloMetrics = paloMetrics.stream().filter(m -> !(m.getName().equals(name))).collect(Collectors.toList());
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.metric;
import java.util.List;
import java.util.TimerTask;
/*
@ -65,5 +66,15 @@ public class MetricCalculator extends TimerTask {
lastQueryErrCounter = currentErrCounter;
lastTs = currentTs;
// max tabet compaction score of all backends
long maxCompactionScore = 0;
List<Metric> compactionScoreMetrics = MetricRepo.getMetricsByName(MetricRepo.TABLET_MAX_COMPACTION_SCORE);
for (Metric metric : compactionScoreMetrics) {
if (((GaugeMetric<Long>) metric).getValue() > maxCompactionScore) {
maxCompactionScore = ((GaugeMetric<Long>) metric).getValue();
}
}
MetricRepo.GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(maxCompactionScore);
}
}

View File

@ -38,6 +38,7 @@ import com.codahale.metrics.MetricRegistry;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.Timer;
@ -51,6 +52,9 @@ public final class MetricRepo {
public static AtomicBoolean isInit = new AtomicBoolean(false);
public static final String TABLET_NUM = "tablet_num";
public static final String TABLET_MAX_COMPACTION_SCORE = "tablet_max_compaction_score";
public static LongCounterMetric COUNTER_REQUEST_ALL;
public static LongCounterMetric COUNTER_QUERY_ALL;
public static LongCounterMetric COUNTER_QUERY_ERR;
@ -76,6 +80,7 @@ public final class MetricRepo {
public static GaugeMetricImpl<Double> GAUGE_QUERY_PER_SECOND;
public static GaugeMetricImpl<Double> GAUGE_REQUEST_PER_SECOND;
public static GaugeMetricImpl<Double> GAUGE_QUERY_ERR_RATE;
public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
private static Timer metricTimer = new Timer();
private static MetricCalculator metricCalculator = new MetricCalculator();
@ -135,7 +140,7 @@ public final class MetricRepo {
}
// capacity
generateTabletNumMetrics();
generateBackendsTabletMetrics();
// connections
GaugeMetric<Integer> conections = (GaugeMetric<Integer>) new GaugeMetric<Integer>(
@ -182,9 +187,13 @@ public final class MetricRepo {
GAUGE_REQUEST_PER_SECOND = new GaugeMetricImpl<>("rps", "request per second");
GAUGE_REQUEST_PER_SECOND.setValue(0.0);
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_REQUEST_PER_SECOND);
GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", "query_error_rate");
GAUGE_QUERY_ERR_RATE = new GaugeMetricImpl<>("query_err_rate", "query error rate");
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_QUERY_ERR_RATE);
GAUGE_QUERY_ERR_RATE.setValue(0.0);
GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetricImpl<>("max_tablet_compaction_score",
"max tablet compaction score of all backends");
PALO_METRIC_REGISTER.addPaloMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE);
GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L);
// 2. counter
COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", "total request");
@ -238,14 +247,14 @@ public final class MetricRepo {
}
}
// to generate the metrics related to tablets of each backends
// this metric is reentrant, so that we can add or remove metric along with the backend add or remove
// at runtime.
public static void generateTabletNumMetrics() {
final String TABLET_NUM = "tablet_num";
// remove all previous 'tablet_num' metric
public static void generateBackendsTabletMetrics() {
// remove all previous 'tablet' metric
PALO_METRIC_REGISTER.removeMetrics(TABLET_NUM);
PALO_METRIC_REGISTER.removeMetrics(TABLET_MAX_COMPACTION_SCORE);
LOG.info("begin to generate capacity metrics");
SystemInfoService infoService = Catalog.getCurrentSystemInfo();
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
@ -266,9 +275,24 @@ public final class MetricRepo {
return (long) invertedIndex.getTabletNumByBackendId(beId);
}
};
tabletNum.addLabel(new MetricLabel("backend", be.getHost() + ":" + be.getHeartbeatPort()));
PALO_METRIC_REGISTER.addPaloMetrics(tabletNum);
// max compaction score of tablets on each backends
GaugeMetric<Long> tabletMaxCompactionScore = (GaugeMetric<Long>) new GaugeMetric<Long>(
TABLET_MAX_COMPACTION_SCORE,
"tablet max compaction score") {
@Override
public Long getValue() {
if (!Catalog.getInstance().isMaster()) {
return 0L;
}
return be.getTabletMaxCompactionScore();
}
};
tabletMaxCompactionScore.addLabel(new MetricLabel("backend", be.getHost() + ":" + be.getHeartbeatPort()));
PALO_METRIC_REGISTER.addPaloMetrics(tabletMaxCompactionScore);
} // end for backends
}
@ -299,6 +323,10 @@ public final class MetricRepo {
return sb.toString();
}
public static synchronized List<Metric> getMetricsByName(String name) {
return PALO_METRIC_REGISTER.getPaloMetricsByName(name);
}
public static void addMetric(Metric<?> metric) {
init();
PALO_METRIC_REGISTER.addPaloMetrics(metric);

View File

@ -54,6 +54,8 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
public static final String REQUEST_PER_SECOND = "rps";
public static final String QUERY_ERR_RATE = "query_err_rate";
public static final String MAX_TABLET_COMPACTION_SCORE = "max_tablet_compaction_score";
private static final Map<String, String> CORE_METRICS = Maps.newHashMap();
static {
CORE_METRICS.put(MAX_JOURMAL_ID, TYPE_LONG);
@ -62,6 +64,7 @@ public class SimpleCoreMetricVisitor extends MetricVisitor {
CORE_METRICS.put(QUERY_PER_SECOND, TYPE_DOUBLE);
CORE_METRICS.put(REQUEST_PER_SECOND, TYPE_DOUBLE);
CORE_METRICS.put(QUERY_ERR_RATE, TYPE_DOUBLE);
CORE_METRICS.put(MAX_TABLET_COMPACTION_SCORE, TYPE_LONG);
}
public SimpleCoreMetricVisitor(String prefix) {

View File

@ -88,7 +88,10 @@ public class Backend implements Writable {
// after init it, this variable is set to true.
private boolean initPathInfo = false;
long lastMissingHeartbeatTime = -1;
private long lastMissingHeartbeatTime = -1;
// the max tablet compaction score of this backend.
// this field is set by tablet report, and just for metric monitor, no need to persist.
private AtomicLong tabletMaxCompactionScore = new AtomicLong(0);
public Backend() {
this.host = "";
@ -615,5 +618,13 @@ public class Backend implements Writable {
return isChanged;
}
public void setTabletMaxCompactionScore(long compactionScore) {
tabletMaxCompactionScore.set(compactionScore);
}
public long getTabletMaxCompactionScore() {
return tabletMaxCompactionScore.get();
}
}

View File

@ -172,7 +172,7 @@ public class SystemInfoService {
LOG.info("finished to add {} ", newBackend);
// backends is changed, regenerated tablet number metrics
MetricRepo.generateTabletNumMetrics();
MetricRepo.generateBackendsTabletMetrics();
}
public void dropBackends(List<Pair<String, Integer>> hostPortPairs) throws DdlException {
@ -230,7 +230,7 @@ public class SystemInfoService {
LOG.info("finished to drop {}", droppedBackend);
// backends is changed, regenerated tablet number metrics
MetricRepo.generateTabletNumMetrics();
MetricRepo.generateBackendsTabletMetrics();
}
// only for test

View File

@ -75,7 +75,7 @@ public class BackendTest {
backend.updateOnce(bePort, httpPort, beRpcPort);
PowerMock.mockStatic(MetricRepo.class);
MetricRepo.generateTabletNumMetrics();
MetricRepo.generateBackendsTabletMetrics();
EasyMock.expectLastCall().anyTimes();
PowerMock.replay(MetricRepo.class);
}

View File

@ -80,6 +80,9 @@ struct TReportRequest {
5: optional map<string, TDisk> disks // string root_path
6: optional bool force_recovery
7: optional list<TTablet> tablet_list
// the max compaction score of all tablets on a backend,
// this field should be set along with tablet report
8: optional i64 tablet_max_compaction_score
}
struct TMasterResult {