store, metrics: Add metrics for safetTS updating (#24687)
This commit is contained in:
@ -18,6 +18,7 @@ import (
|
||||
"crypto/tls"
|
||||
"math"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -436,17 +437,20 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
|
||||
storeAddr := store.addr
|
||||
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
|
||||
defer wg.Done()
|
||||
// TODO: add metrics for updateSafeTS
|
||||
resp, err := tikvClient.SendRequest(ctx, storeAddr, tikvrpc.NewRequest(tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{KeyRange: &kvrpcpb.KeyRange{
|
||||
StartKey: []byte(""),
|
||||
EndKey: []byte(""),
|
||||
}}), ReadTimeoutShort)
|
||||
storeIDStr := strconv.Itoa(int(storeID))
|
||||
if err != nil {
|
||||
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
|
||||
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
|
||||
return
|
||||
}
|
||||
safeTSResp := resp.Resp.(*kvrpcpb.StoreSafeTSResponse)
|
||||
s.setSafeTS(storeID, safeTSResp.GetSafeTs())
|
||||
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", storeIDStr).Inc()
|
||||
metrics.TiKVSafeTSUpdateStats.WithLabelValues(storeIDStr).Set(float64(safeTSResp.GetSafeTs()))
|
||||
}(ctx, wg, storeID, storeAddr)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
@ -59,6 +59,8 @@ var (
|
||||
TiKVPanicCounter *prometheus.CounterVec
|
||||
TiKVForwardRequestCounter *prometheus.CounterVec
|
||||
TiKVTSFutureWaitDuration prometheus.Histogram
|
||||
TiKVSafeTSUpdateCounter *prometheus.CounterVec
|
||||
TiKVSafeTSUpdateStats *prometheus.GaugeVec
|
||||
)
|
||||
|
||||
// Label constants.
|
||||
@ -414,6 +416,22 @@ func initMetrics(namespace, subsystem string) {
|
||||
Buckets: prometheus.ExponentialBuckets(0.000005, 2, 30), // 5us ~ 2560s
|
||||
})
|
||||
|
||||
TiKVSafeTSUpdateCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "safets_update_counter",
|
||||
Help: "Counter of tikv safe_ts being updated.",
|
||||
}, []string{LblResult, LblStore})
|
||||
|
||||
TiKVSafeTSUpdateStats = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "safets_update_stats",
|
||||
Help: "stat of tikv updating safe_ts stats",
|
||||
}, []string{LblStore})
|
||||
|
||||
initShortcuts()
|
||||
}
|
||||
|
||||
@ -468,6 +486,8 @@ func RegisterMetrics() {
|
||||
prometheus.MustRegister(TiKVPanicCounter)
|
||||
prometheus.MustRegister(TiKVForwardRequestCounter)
|
||||
prometheus.MustRegister(TiKVTSFutureWaitDuration)
|
||||
prometheus.MustRegister(TiKVSafeTSUpdateCounter)
|
||||
prometheus.MustRegister(TiKVSafeTSUpdateStats)
|
||||
}
|
||||
|
||||
// readCounter reads the value of a prometheus.Counter.
|
||||
|
||||
Reference in New Issue
Block a user