[Enhancement](metrics) add more metrics (#11693)

* Add `AutoMappedMetric` to measure dynamic object.
* Add query instance and rpc metrics
* Add thrift rpc metrics
* Add txn metrics
* Reorganize metrics init routine.

Co-authored-by: 迟成 <chicheng@meituan.com>
This commit is contained in:
ccoffline
2022-10-26 08:31:03 +08:00
committed by GitHub
parent 458cb8f072
commit 9691db7918
15 changed files with 303 additions and 70 deletions

View File

@ -17,7 +17,7 @@
package org.apache.doris.common;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
@ -37,6 +37,8 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
/**
* ThreadPoolManager is a helper class for constructing daemon thread pools with limited thread and memory resources.
@ -75,29 +77,35 @@ public class ThreadPoolManager {
}
public static void registerThreadPoolMetric(String poolName, ThreadPoolExecutor threadPool) {
for (String poolMetricType : poolMetricTypes) {
GaugeMetric<Integer> gauge = new GaugeMetric<Integer>(
"thread_pool", MetricUnit.NOUNIT, "thread_pool statistics") {
@Override
public Integer getValue() {
String metricType = this.getLabels().get(1).getValue();
switch (metricType) {
case "pool_size":
return threadPool.getPoolSize();
case "active_thread_num":
return threadPool.getActiveCount();
case "task_in_queue":
return threadPool.getQueue().size();
default:
return 0;
}
}
};
gauge.addLabel(new MetricLabel("name", poolName)).addLabel(new MetricLabel("type", poolMetricType));
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
Metric.MetricType gauge = Metric.MetricType.GAUGE;
Metric.MetricType counter = Metric.MetricType.COUNTER;
MetricUnit nounit = MetricUnit.NOUNIT;
registerMetric(poolName, "pool_size", gauge, nounit, threadPool::getPoolSize);
registerMetric(poolName, "active_thread_num", gauge, nounit, threadPool::getActiveCount);
registerMetric(poolName, "active_thread_pct", gauge, MetricUnit.PERCENT,
() -> 1.0 * threadPool.getActiveCount() / threadPool.getMaximumPoolSize());
registerMetric(poolName, "task_in_queue", gauge, nounit, () -> threadPool.getQueue().size());
registerMetric(poolName, "task_count", counter, nounit, threadPool::getTaskCount);
registerMetric(poolName, "completed_task_count", counter, nounit, threadPool::getCompletedTaskCount);
RejectedExecutionHandler rejectedHandler = threadPool.getRejectedExecutionHandler();
if (rejectedHandler instanceof LogDiscardPolicy) {
registerMetric(poolName, "task_rejected", counter, nounit,
((LogDiscardPolicy) rejectedHandler).rejectedNum::get);
}
}
/**
 * Registers a single thread-pool metric under the shared "thread_pool" metric name,
 * distinguished from its siblings by the "name" (pool) and "type" (metric) labels.
 *
 * @param poolName   value of the "name" label: the thread pool being measured
 * @param metricName value of the "type" label, e.g. "pool_size" or "task_count"
 * @param type       metric type (GAUGE or COUNTER) reported to the registry
 * @param unit       unit of the reported value
 * @param supplier   invoked on every scrape to read the current value
 */
private static <T> void registerMetric(String poolName, String metricName,
        Metric.MetricType type, MetricUnit unit, Supplier<T> supplier) {
    // Named "metric" rather than "gauge": callers also register COUNTER-typed metrics here.
    Metric<T> metric = new Metric<T>("thread_pool", type, unit, "thread_pool statistics") {
        @Override
        public T getValue() {
            return supplier.get();
        }
    };
    metric.addLabel(new MetricLabel("name", poolName)).addLabel(new MetricLabel("type", metricName));
    MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
}
public static ThreadPoolExecutor newDaemonCacheThreadPool(int maxNumThread,
String poolName, boolean needRegisterMetric) {
return newDaemonThreadPool(0, maxNumThread, KEEP_ALIVE_TIME,
@ -165,14 +173,17 @@ public class ThreadPoolManager {
private static final Logger LOG = LogManager.getLogger(LogDiscardPolicy.class);
private String threadPoolName;
private AtomicLong rejectedNum;
// Creates a discard policy for the named pool; the pool name is used only for
// the warning log line, and rejectedNum starts at zero for metric reporting.
public LogDiscardPolicy(String threadPoolName) {
    this.threadPoolName = threadPoolName;
    this.rejectedNum = new AtomicLong(0);
}
/**
 * Logs the rejected task and counts it; the task itself is discarded (never re-run).
 * rejectedNum feeds the "task_rejected" metric registered in registerThreadPoolMetric.
 */
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    // Parameterized logging avoids building the message string when WARN is disabled.
    LOG.warn("Task {} rejected from {} {}", r, threadPoolName, executor);
    this.rejectedNum.incrementAndGet();
}
}

View File

@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.metric;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
 * Lazily creates and caches one metric instance per key (typically a label value
 * such as a user name, database name, or backend host).
 *
 * <p>Thread-safe: backed by a ConcurrentHashMap, so at most one metric is created
 * per key even under concurrent access.
 *
 * @param <M> the concrete metric type held per key
 */
public class AutoMappedMetric<M> {

    // key -> already-created metric instance
    private final Map<String, M> metricsByKey = new ConcurrentHashMap<>();
    // invoked once per new key to build its metric
    private final Function<String, M> factory;

    public AutoMappedMetric(Function<String, M> metricSupplier) {
        this.factory = metricSupplier;
    }

    /** Returns the metric for {@code name}, creating and caching it on first access. */
    public M getOrAdd(String name) {
        return metricsByKey.computeIfAbsent(name, factory);
    }
}

View File

@ -51,7 +51,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
public final class MetricRepo {
private static final Logger LOG = LogManager.getLogger(MetricRepo.class);
@ -71,6 +71,13 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_QUERY_ERR;
public static LongCounterMetric COUNTER_QUERY_TABLE;
public static LongCounterMetric COUNTER_QUERY_OLAP_TABLE;
public static Histogram HISTO_QUERY_LATENCY;
public static AutoMappedMetric<Histogram> DB_HISTO_QUERY_LATENCY;
public static AutoMappedMetric<GaugeMetricImpl<Long>> USER_GAUGE_QUERY_INSTANCE_NUM;
public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_INSTANCE_BEGIN;
public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_ALL;
public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_FAILED;
public static AutoMappedMetric<LongCounterMetric> BE_COUNTER_QUERY_RPC_SIZE;
public static LongCounterMetric COUNTER_CACHE_ADDED_SQL;
public static LongCounterMetric COUNTER_CACHE_ADDED_PARTITION;
@ -80,27 +87,33 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES;
public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
public static LongCounterMetric COUNTER_IMAGE_WRITE_SUCCESS;
public static LongCounterMetric COUNTER_IMAGE_WRITE_FAILED;
public static LongCounterMetric COUNTER_IMAGE_PUSH_SUCCESS;
public static LongCounterMetric COUNTER_IMAGE_PUSH_FAILED;
public static LongCounterMetric COUNTER_IMAGE_CLEAN_SUCCESS;
public static LongCounterMetric COUNTER_IMAGE_CLEAN_FAILED;
public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_SUCCESS;
public static LongCounterMetric COUNTER_EDIT_LOG_CLEAN_FAILED;
public static LongCounterMetric COUNTER_TXN_REJECT;
public static LongCounterMetric COUNTER_TXN_BEGIN;
public static LongCounterMetric COUNTER_TXN_FAILED;
public static LongCounterMetric COUNTER_TXN_SUCCESS;
public static Histogram HISTO_TXN_EXEC_LATENCY;
public static Histogram HISTO_TXN_PUBLISH_LATENCY;
public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_NUM;
public static AutoMappedMetric<GaugeMetricImpl<Long>> DB_GAUGE_TXN_REPLICA_NUM;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
public static Histogram HISTO_QUERY_LATENCY;
public static Histogram HISTO_EDIT_LOG_WRITE_LATENCY;
public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_LATENCY;
// following metrics will be updated by metric calculator
public static GaugeMetricImpl<Double> GAUGE_QUERY_PER_SECOND;
@ -118,7 +131,6 @@ public final class MetricRepo {
return;
}
// 1. gauge
// load jobs
LoadManager loadManger = Env.getCurrentEnv().getLoadManager();
for (EtlJobType jobType : EtlJobType.values()) {
@ -228,21 +240,6 @@ public final class MetricRepo {
};
DORIS_METRIC_REGISTER.addMetrics(scheduledTabletNum);
GaugeMetric<Long> maxInstanceNum = new GaugeMetric<Long>("max_instances_num_per_user",
MetricUnit.NOUNIT, "max instances num of all current users") {
@Override
public Long getValue() {
try {
return ((QeProcessorImpl) QeProcessorImpl.INSTANCE).getInstancesNumPerUser().values().stream()
.reduce(-1, BinaryOperator.maxBy(Integer::compareTo)).longValue();
} catch (Throwable ex) {
LOG.warn("Get max_instances_num_per_user error", ex);
return -2L;
}
}
};
DORIS_METRIC_REGISTER.addMetrics(maxInstanceNum);
// txn status
for (TransactionStatus status : TransactionStatus.values()) {
GaugeMetric<Long> gauge = new GaugeMetric<Long>("txn_status", MetricUnit.NOUNIT, "txn statistics") {
@ -274,19 +271,53 @@ public final class MetricRepo {
DORIS_METRIC_REGISTER.addMetrics(GAUGE_MAX_TABLET_COMPACTION_SCORE);
GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L);
// 2. counter
// query
COUNTER_REQUEST_ALL = new LongCounterMetric("request_total", MetricUnit.REQUESTS, "total request");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_REQUEST_ALL);
COUNTER_QUERY_ALL = new LongCounterMetric("query_total", MetricUnit.REQUESTS, "total query");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ALL);
COUNTER_QUERY_ERR = new LongCounterMetric("query_err", MetricUnit.REQUESTS, "total error query");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_ERR);
COUNTER_QUERY_TABLE = new LongCounterMetric("query_table", MetricUnit.REQUESTS, "total query from table");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_TABLE);
COUNTER_QUERY_OLAP_TABLE = new LongCounterMetric("query_olap_table", MetricUnit.REQUESTS,
"total query from olap table");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_OLAP_TABLE);
HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("query", "latency", "ms"));
DB_HISTO_QUERY_LATENCY = new AutoMappedMetric<>(name -> {
String metricName = MetricRegistry.name("query", "latency", "ms", "db=" + name);
return METRIC_REGISTER.histogram(metricName);
});
USER_COUNTER_QUERY_INSTANCE_BEGIN = addLabeledMetrics("user", () ->
new LongCounterMetric("query_instance_begin", MetricUnit.NOUNIT,
"number of query instance begin"));
USER_GAUGE_QUERY_INSTANCE_NUM = addLabeledMetrics("user", () ->
new GaugeMetricImpl<>("query_instance_num", MetricUnit.NOUNIT,
"number of running query instances of current user"));
GaugeMetric<Long> queryInstanceNum = new GaugeMetric<Long>("query_instance_num",
MetricUnit.NOUNIT, "number of query instances of all current users") {
@Override
public Long getValue() {
QeProcessorImpl qe = ((QeProcessorImpl) QeProcessorImpl.INSTANCE);
long totalInstanceNum = 0;
for (Map.Entry<String, Integer> e : qe.getInstancesNumPerUser().entrySet()) {
long value = e.getValue() == null ? 0L : e.getValue().longValue();
totalInstanceNum += value;
USER_GAUGE_QUERY_INSTANCE_NUM.getOrAdd(e.getKey()).setValue(value);
}
return totalInstanceNum;
}
};
DORIS_METRIC_REGISTER.addMetrics(queryInstanceNum);
BE_COUNTER_QUERY_RPC_ALL = addLabeledMetrics("be", () ->
new LongCounterMetric("query_rpc_total", MetricUnit.NOUNIT, ""));
BE_COUNTER_QUERY_RPC_FAILED = addLabeledMetrics("be", () ->
new LongCounterMetric("query_rpc_failed", MetricUnit.NOUNIT, ""));
BE_COUNTER_QUERY_RPC_SIZE = addLabeledMetrics("be", () ->
new LongCounterMetric("query_rpc_size", MetricUnit.BYTES, ""));
// cache
COUNTER_CACHE_ADDED_SQL = new LongCounterMetric("cache_added", MetricUnit.REQUESTS,
"Number of SQL mode cache added");
COUNTER_CACHE_ADDED_SQL.addLabel(new MetricLabel("type", "sql"));
@ -304,6 +335,7 @@ public final class MetricRepo {
COUNTER_CACHE_HIT_PARTITION.addLabel(new MetricLabel("type", "partition"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_CACHE_HIT_PARTITION);
// edit log
COUNTER_EDIT_LOG_WRITE = new LongCounterMetric("edit_log", MetricUnit.OPERATIONS,
"counter of edit log write into bdbje");
COUNTER_EDIT_LOG_WRITE.addLabel(new MetricLabel("type", "write"));
@ -315,6 +347,18 @@ public final class MetricRepo {
COUNTER_EDIT_LOG_SIZE_BYTES = new LongCounterMetric("edit_log", MetricUnit.BYTES, "size of edit log");
COUNTER_EDIT_LOG_SIZE_BYTES.addLabel(new MetricLabel("type", "bytes"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_SIZE_BYTES);
HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("editlog", "write", "latency", "ms"));
// edit log clean
COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
"counter of edit log succeed in cleaning");
COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS);
COUNTER_EDIT_LOG_CLEAN_FAILED = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
"counter of edit log failed to clean");
COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED);
// image generate
COUNTER_IMAGE_WRITE_SUCCESS = new LongCounterMetric("image_write", MetricUnit.OPERATIONS,
@ -345,16 +389,7 @@ public final class MetricRepo {
COUNTER_IMAGE_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_IMAGE_CLEAN_FAILED);
// edit log clean
COUNTER_EDIT_LOG_CLEAN_SUCCESS = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
"counter of edit log succeed in cleaning");
COUNTER_EDIT_LOG_CLEAN_SUCCESS.addLabel(new MetricLabel("type", "success"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_SUCCESS);
COUNTER_EDIT_LOG_CLEAN_FAILED = new LongCounterMetric("edit_log_clean", MetricUnit.OPERATIONS,
"counter of edit log failed to clean");
COUNTER_EDIT_LOG_CLEAN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_EDIT_LOG_CLEAN_FAILED);
// txn
COUNTER_TXN_REJECT = new LongCounterMetric("txn_counter", MetricUnit.REQUESTS,
"counter of rejected transactions");
COUNTER_TXN_REJECT.addLabel(new MetricLabel("type", "reject"));
@ -371,6 +406,30 @@ public final class MetricRepo {
"counter of failed transactions");
COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED);
HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("txn", "exec", "latency", "ms"));
HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("txn", "publish", "latency", "ms"));
GaugeMetric<Long> txnNum = new GaugeMetric<Long>("txn_num", MetricUnit.NOUNIT,
"number of running transactions") {
@Override
public Long getValue() {
return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnNum();
}
};
DORIS_METRIC_REGISTER.addMetrics(txnNum);
DB_GAUGE_TXN_NUM = addLabeledMetrics("db", () ->
new GaugeMetricImpl<>("txn_num", MetricUnit.NOUNIT, "number of running transactions"));
GaugeMetric<Long> txnReplicaNum = new GaugeMetric<Long>("txn_replica_num", MetricUnit.NOUNIT,
"number of writing tablets in all running transactions") {
@Override
public Long getValue() {
return Env.getCurrentGlobalTransactionMgr().getAllRunningTxnReplicaNum();
}
};
DORIS_METRIC_REGISTER.addMetrics(txnReplicaNum);
DB_GAUGE_TXN_REPLICA_NUM = addLabeledMetrics("db", () -> new GaugeMetricImpl<>("txn_replica_num",
MetricUnit.NOUNIT, "number of writing tablets in all running transactions"));
COUNTER_ROUTINE_LOAD_ROWS = new LongCounterMetric("routine_load_rows", MetricUnit.ROWS,
"total rows of routine load");
@ -385,11 +444,11 @@ public final class MetricRepo {
COUNTER_HIT_SQL_BLOCK_RULE = new LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
"total hit sql block rule query");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_HIT_SQL_BLOCK_RULE);
// 3. histogram
HISTO_QUERY_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("query", "latency", "ms"));
HISTO_EDIT_LOG_WRITE_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("editlog", "write", "latency", "ms"));
THRIFT_COUNTER_RPC_ALL = addLabeledMetrics("method", () ->
new LongCounterMetric("thrift_rpc_total", MetricUnit.NOUNIT, ""));
THRIFT_COUNTER_RPC_LATENCY = addLabeledMetrics("method", () ->
new LongCounterMetric("thrift_rpc_latency_ms", MetricUnit.MILLISECONDS, ""));
// init system metrics
initSystemMetrics();
@ -587,6 +646,15 @@ public final class MetricRepo {
return sb.toString();
}
/**
 * Builds an AutoMappedMetric whose per-key metrics are created on demand: each new
 * key gets a fresh metric from {@code metric}, tagged with {@code label}=key and
 * registered in DORIS_METRIC_REGISTER so it is exposed on the metrics endpoint.
 *
 * @param label  label name attached to every created metric (e.g. "user", "db", "be")
 * @param metric factory producing an unlabeled metric instance per key
 */
public static <M extends Metric<?>> AutoMappedMetric<M> addLabeledMetrics(String label, Supplier<M> metric) {
    return new AutoMappedMetric<>(labelValue -> {
        M created = metric.get();
        created.addLabel(new MetricLabel(label, labelValue));
        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(created);
        return created;
    });
}
// update some metrics to make a ready to be visited
private static void updateMetrics() {
SYSTEM_METRICS.update();

View File

@ -62,6 +62,7 @@ import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TTabletLocation;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
@ -346,6 +347,7 @@ public class OlapTableSink extends DataSink {
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
// BE id -> path hash
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
int replicaNum = 0;
for (Long partitionId : partitionIds) {
Partition partition = table.getPartition(partitionId);
int quorum = table.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2 + 1;
@ -375,6 +377,7 @@ public class OlapTableSink extends DataSink {
Lists.newArrayList(bePathsMap.keySet())));
}
allBePathsMap.putAll(bePathsMap);
replicaNum += bePathsMap.size();
}
}
}
@ -385,6 +388,14 @@ public class OlapTableSink extends DataSink {
if (!st.ok()) {
throw new DdlException(st.getErrorMsg());
}
long dbId = tDataSink.getOlapTableSink().getDbId();
long txnId = tDataSink.getOlapTableSink().getTxnId();
try {
DatabaseTransactionMgr mgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
mgr.registerTxnReplicas(txnId, replicaNum);
} catch (Exception e) {
LOG.error("register txn replica failed, txnId={}, dbId={}", txnId, dbId);
}
return Arrays.asList(locationParam, slaveLocationParam);
}

View File

@ -192,6 +192,7 @@ public class ConnectProcessor {
} else if (ctx.getState().getStateType() == MysqlStateType.OK) {
// ok query
MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
MetricRepo.DB_HISTO_QUERY_LATENCY.getOrAdd(ctx.getDatabase()).update(elapseMs);
if (elapseMs > Config.qe_slow_log_ms) {
String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest());
ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);

View File

@ -38,6 +38,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
@ -724,8 +725,10 @@ public class Coordinator {
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L);
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
case THRIFT_RPC_ERROR:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(pair.first.brpcAddr.hostname).increase(1L);
SimpleScheduler.addToBlacklist(pair.first.beId, errMsg);
throw new RpcException(pair.first.brpcAddr.hostname, errMsg, exception);
default:

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ProfileWriter;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
@ -113,6 +114,7 @@ public final class QeProcessorImpl implements QeProcessor {
}
queryToInstancesNum.put(queryId, instancesNum);
userToInstancesCount.computeIfAbsent(user, ignored -> new AtomicInteger(0)).addAndGet(instancesNum);
MetricRepo.USER_COUNTER_QUERY_INSTANCE_BEGIN.getOrAdd(user).increase(instancesNum.longValue());
}
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.rpc;
import org.apache.doris.common.Config;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
import org.apache.doris.proto.Types;
@ -122,6 +123,8 @@ public class BackendServiceProxy {
builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_2);
final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
MetricRepo.BE_COUNTER_QUERY_RPC_ALL.getOrAdd(address.hostname).increase(1L);
MetricRepo.BE_COUNTER_QUERY_RPC_SIZE.getOrAdd(address.hostname).increase((long) pRequest.getSerializedSize());
try {
final BackendServiceClient client = getProxy(address);
if (twoPhaseExecution) {

View File

@ -18,6 +18,7 @@
package org.apache.doris.service;
import org.apache.doris.common.ThriftServer;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.thrift.FrontendService;
import org.apache.logging.log4j.LogManager;
@ -25,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TProcessor;
import java.io.IOException;
import java.lang.reflect.Proxy;
/**
* Doris frontend thrift server
@ -40,9 +42,21 @@ public class FeServer {
}
public void start() throws IOException {
FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance(
FrontendServiceImpl.class.getClassLoader(),
FrontendServiceImpl.class.getInterfaces(),
(proxy, method, args) -> {
long begin = System.currentTimeMillis();
String name = method.getName();
MetricRepo.THRIFT_COUNTER_RPC_ALL.getOrAdd(name).increase(1L);
Object r = method.invoke(service, args);
long end = System.currentTimeMillis();
MetricRepo.THRIFT_COUNTER_RPC_LATENCY.getOrAdd(name).increase(end - begin);
return r;
});
// setup frontend server
TProcessor tprocessor = new FrontendService.Processor<FrontendService.Iface>(
new FrontendServiceImpl(ExecuteEnv.getInstance()));
TProcessor tprocessor = new FrontendService.Processor<>(instance);
server = new ThriftServer(port, tprocessor);
server.start();
LOG.info("thrift server started.");

View File

@ -128,6 +128,7 @@ public class DatabaseTransactionMgr {
// count the number of running txns of database, except for the routine load txn
private volatile int runningTxnNums = 0;
private volatile int runningTxnReplicaNums = 0;
// count only the number of running routine load txns of database
private volatile int runningRoutineLoadTxnNums = 0;
@ -984,7 +985,11 @@ public class DatabaseTransactionMgr {
return;
}
// update transaction state version
transactionState.setCommitTime(System.currentTimeMillis());
long commitTime = System.currentTimeMillis();
transactionState.setCommitTime(commitTime);
if (MetricRepo.isInit) {
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(commitTime - transactionState.getPrepareTime());
}
transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
transactionState.setErrorReplicas(errorReplicaIds);
for (long tableId : tableToPartition.keySet()) {
@ -1095,6 +1100,38 @@ public class DatabaseTransactionMgr {
updateTxnLabels(transactionState);
}
/**
 * Records how many tablet replicas a running transaction writes to, and adds that
 * count to this database's running-replica total (feeds the txn_replica_num metric).
 *
 * @throws UserException if no running transaction with {@code txnId} exists
 */
public void registerTxnReplicas(long txnId, int replicaNum) throws UserException {
    // write lock keeps the state update and the counter increment atomic
    writeLock();
    try {
        TransactionState transactionState = idToRunningTransactionState.get(txnId);
        if (transactionState == null) {
            throw new UserException("running transaction not found, txnId=" + txnId);
        }
        transactionState.setReplicaNum(replicaNum);
        runningTxnReplicaNums += replicaNum;
    } finally {
        writeUnlock();
    }
}
/** Returns the number of running (non-routine-load) transactions in this database. */
public int getRunningTxnNum() {
    // read lock for a consistent snapshot of the counter
    readLock();
    try {
        return runningTxnNums;
    } finally {
        readUnlock();
    }
}
/** Returns the total replica count registered by all running transactions of this database. */
public int getRunningTxnReplicaNum() {
    // read lock for a consistent snapshot of the counter
    readLock();
    try {
        return runningTxnReplicaNums;
    } finally {
        readUnlock();
    }
}
private void updateTxnLabels(TransactionState transactionState) {
Set<Long> txnIds = labelToTxnIds.get(transactionState.getLabel());
if (txnIds == null) {

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
import org.apache.doris.persist.EditLog;
import org.apache.doris.thrift.TStatus;
@ -640,4 +641,31 @@ public class GlobalTransactionMgr implements Writable {
}
throw new TimeoutException("Operation is timeout");
}
/**
 * Sums the running-transaction count over all databases. As a side effect, publishes
 * each database's own count into the per-db DB_GAUGE_TXN_NUM metric (skipping
 * databases that no longer resolve in the catalog).
 */
public long getAllRunningTxnNum() {
    long sum = 0;
    for (DatabaseTransactionMgr dbMgr : dbIdToDatabaseTransactionMgrs.values()) {
        long running = dbMgr.getRunningTxnNum();
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbMgr.getDbId());
        if (db != null) {
            MetricRepo.DB_GAUGE_TXN_NUM.getOrAdd(db.getFullName()).setValue(running);
        }
        sum += running;
    }
    return sum;
}
/**
 * Sums the replica count of running transactions over all databases. As a side effect,
 * publishes each database's own count into the per-db DB_GAUGE_TXN_REPLICA_NUM metric
 * (skipping databases that no longer resolve in the catalog).
 */
public long getAllRunningTxnReplicaNum() {
    long sum = 0;
    for (DatabaseTransactionMgr dbMgr : dbIdToDatabaseTransactionMgrs.values()) {
        long replicas = dbMgr.getRunningTxnReplicaNum();
        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbMgr.getDbId());
        if (db != null) {
            MetricRepo.DB_GAUGE_TXN_REPLICA_NUM.getOrAdd(db.getFullName()).setValue(replicas);
        }
        sum += replicas;
    }
    return sum;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
@ -255,6 +256,10 @@ public class PublishVersionDaemon extends MasterDaemon {
for (PublishVersionTask task : transactionState.getPublishVersionTasks().values()) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
if (MetricRepo.isInit) {
long publishTime = transactionState.getPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
}
} // end for readyTransactionStates
}

View File

@ -171,6 +171,7 @@ public class TransactionState implements Writable {
private long dbId;
private List<Long> tableIdList;
private int replicaNum = 0;
private long transactionId;
private String label;
// requestId is used to judge whether a begin request is a internal retry request.
@ -493,6 +494,14 @@ public class TransactionState implements Writable {
return tableIdList;
}
// Number of tablet replicas this transaction writes to; 0 until registered by the sink.
public int getReplicaNum() {
    return replicaNum;
}
// Set once the plan's sink knows how many replicas the transaction will write.
public void setReplicaNum(int replicaNum) {
    this.replicaNum = replicaNum;
}
public Map<Long, TableCommitInfo> getIdToTableCommitInfos() {
return idToTableCommitInfos;
}

View File

@ -17,13 +17,9 @@
package org.apache.doris.common;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricRepo;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
public class ThreadPoolManagerTest {
@ -37,9 +33,6 @@ public class ThreadPoolManagerTest {
ThreadPoolManager.registerThreadPoolMetric("test_cache_pool", testCachedPool);
ThreadPoolManager.registerThreadPoolMetric("test_fixed_thread_pool", testFixedThreaddPool);
List<Metric> metricList = MetricRepo.getMetricsByName("thread_pool");
Assert.assertEquals(6, metricList.size());
Assert.assertEquals(ThreadPoolManager.LogDiscardPolicy.class,
testCachedPool.getRejectedExecutionHandler().getClass());
Assert.assertEquals(ThreadPoolManager.BlockedPolicy.class,