dxf: add metrics for alert purpose in nextgen (#64100)

ref pingcap/tidb#61702
This commit is contained in:
D3Hunter
2025-10-27 15:27:31 +08:00
committed by GitHub
parent edabae3a31
commit 9dc8189afa
30 changed files with 472 additions and 109 deletions

View File

@ -94,6 +94,7 @@ go_library(
"//pkg/ddl/util",
"//pkg/distsql",
"//pkg/distsql/context",
"//pkg/disttask/framework/dxfmetric",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/metering",
"//pkg/disttask/framework/proto",

View File

@ -19,6 +19,7 @@ import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"sort"
"time"
@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
@ -146,7 +148,14 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
logger.Info("available local disk space resource", zap.String("size", units.BytesSize(float64(availableDisk))))
return generateReadIndexPlan(ctx, sch.d, store, tbl, job, sch.GlobalSort, nodeCnt, logger)
case proto.BackfillStepMergeSort:
return generateMergeSortPlan(ctx, taskHandle, task, nodeCnt, backfillMeta.CloudStorageURI, logger)
metaBytes, err2 := generateMergeSortPlan(ctx, taskHandle, task, nodeCnt, backfillMeta.CloudStorageURI, logger)
if err2 != nil {
return nil, err2
}
if len(metaBytes) > 0 {
dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventMergeSort).Inc()
}
return metaBytes, nil
case proto.BackfillStepWriteAndIngest:
if sch.GlobalSort {
failpoint.Inject("mockWriteIngest", func() {

View File

@ -15,6 +15,7 @@
package partition
import (
"fmt"
"testing"
"github.com/pingcap/failpoint"
@ -117,18 +118,20 @@ func TestTruncatePartitionListFailures(t *testing.T) {
func testDDLWithInjectedErrors(t *testing.T, tests FailureTest, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterRollback, afterRecover [][]any, skipTests ...string) {
TEST:
for _, test := range tests.Tests {
for i, test := range tests.Tests {
for _, skip := range skipTests {
if test.Name == skip {
continue TEST
}
}
if test.Recoverable {
runOneTest(t, test, true, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRecover)
}
if test.Rollback {
runOneTest(t, test, false, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRollback)
}
t.Run(fmt.Sprint(i), func(t *testing.T) {
if test.Recoverable {
runOneTest(t, test, true, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRecover)
}
if test.Rollback {
runOneTest(t, test, false, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRollback)
}
})
}
}

View File

@ -0,0 +1,18 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "dxfmetric",
srcs = [
"collector.go",
"metric.go",
],
importpath = "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric",
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/proto",
"//pkg/metrics/common",
"//pkg/util/intest",
"@com_github_google_uuid//:uuid",
"@com_github_prometheus_client_golang//prometheus",
],
)

View File

@ -12,30 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
package dxfmetric
import (
"strconv"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/prometheus/client_golang/prometheus"
)
var disttaskCollector = newCollector()
func init() {
prometheus.MustRegister(disttaskCollector)
}
// Collector is a custom Prometheus collector for DXF metrics.
// The exec_id of a subtask may change, and after a task finishes successfully
// its subtasks are migrated from tidb_subtask_background to
// tidb_subtask_background_history. With the built-in Prometheus vectors we
// would have to delete the previously added series in both situations, which
// is quite troublesome. Therefore, a custom collector is used.
type collector struct {
type Collector struct {
subtaskInfo atomic.Pointer[[]*proto.SubtaskBase]
taskInfo atomic.Pointer[[]*proto.TaskBase]
@ -44,40 +42,53 @@ type collector struct {
subtaskDuration *prometheus.Desc
}
func newCollector() *collector {
return &collector{
tasks: prometheus.NewDesc(
// NewCollector creates a new Collector.
func NewCollector() *Collector {
var constLabels prometheus.Labels
// we might create multiple domains in the same process in tests, so we
// add a uuid label to avoid conflicts.
if intest.InTest {
constLabels = prometheus.Labels{"server_id": uuid.New().String()}
}
return &Collector{
tasks: metricscommon.NewDesc(
"tidb_disttask_task_status",
"Number of tasks.",
[]string{"task_type", "status"}, nil,
[]string{"task_type", "status"}, constLabels,
),
subtasks: prometheus.NewDesc(
subtasks: metricscommon.NewDesc(
"tidb_disttask_subtasks",
"Number of subtasks.",
[]string{"task_type", "task_id", "status", "exec_id"}, nil,
[]string{"task_type", "task_id", "status", "exec_id"}, constLabels,
),
subtaskDuration: prometheus.NewDesc(
subtaskDuration: metricscommon.NewDesc(
"tidb_disttask_subtask_duration",
"Duration of subtasks in different states.",
[]string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, nil,
[]string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, constLabels,
),
}
}
// UpdateInfo updates the task and subtask info in the collector.
func (c *Collector) UpdateInfo(tasks []*proto.TaskBase, subtasks []*proto.SubtaskBase) {
c.taskInfo.Store(&tasks)
c.subtaskInfo.Store(&subtasks)
}
// Describe implements the prometheus.Collector interface.
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.tasks
ch <- c.subtasks
ch <- c.subtaskDuration
}
// Collect implements the prometheus.Collector interface.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
func (c *Collector) Collect(ch chan<- prometheus.Metric) {
c.collectTasks(ch)
c.collectSubtasks(ch)
}
func (c *collector) collectTasks(ch chan<- prometheus.Metric) {
func (c *Collector) collectTasks(ch chan<- prometheus.Metric) {
p := c.taskInfo.Load()
if p == nil {
return
@ -104,7 +115,7 @@ func (c *collector) collectTasks(ch chan<- prometheus.Metric) {
}
}
func (c *collector) collectSubtasks(ch chan<- prometheus.Metric) {
func (c *Collector) collectSubtasks(ch chan<- prometheus.Metric) {
p := c.subtaskInfo.Load()
if p == nil {
return
@ -142,7 +153,7 @@ func (c *collector) collectSubtasks(ch chan<- prometheus.Metric) {
}
}
func (c *collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.SubtaskBase) {
func (c *Collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.SubtaskBase) {
switch subtask.State {
case proto.SubtaskStatePending:
ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue,
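The comment above explains the design: instead of Prometheus' built-in vectors, a snapshot-based collector rebuilds every series from the latest task/subtask snapshot on each scrape, so series whose exec_id changed or whose subtask moved to the history table simply stop being reported. A minimal, self-contained sketch of that pattern follows; the taskSnapshot type, metric name, and private registry are hypothetical stand-ins, not TiDB's actual Collector or proto types.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// taskSnapshot is a hypothetical stand-in for proto.TaskBase.
type taskSnapshot struct {
	Type  string
	State string
}

// snapshotCollector emits const metrics from the latest snapshot on every
// scrape; stale label combinations disappear on the next scrape instead of
// having to be deleted from a *GaugeVec.
type snapshotCollector struct {
	tasks atomic.Pointer[[]taskSnapshot]
	desc  *prometheus.Desc
}

func newSnapshotCollector() *snapshotCollector {
	return &snapshotCollector{
		desc: prometheus.NewDesc(
			"demo_task_status", "Number of tasks per type and state.",
			[]string{"task_type", "status"}, nil,
		),
	}
}

// UpdateInfo swaps in a new snapshot, mirroring Collector.UpdateInfo above.
func (c *snapshotCollector) UpdateInfo(tasks []taskSnapshot) {
	c.tasks.Store(&tasks)
}

func (c *snapshotCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *snapshotCollector) Collect(ch chan<- prometheus.Metric) {
	p := c.tasks.Load()
	if p == nil {
		return
	}
	counts := make(map[[2]string]int)
	for _, t := range *p {
		counts[[2]string{t.Type, t.State}]++
	}
	for k, n := range counts {
		ch <- prometheus.MustNewConstMetric(
			c.desc, prometheus.GaugeValue, float64(n), k[0], k[1])
	}
}

func main() {
	reg := prometheus.NewRegistry()
	c := newSnapshotCollector()
	reg.MustRegister(c)

	c.UpdateInfo([]taskSnapshot{{"backfill", "running"}, {"import", "succeed"}})
	families, _ := reg.Gather()
	for _, mf := range families {
		fmt.Println(mf.GetName(), "->", len(mf.GetMetric()), "series")
	}
}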

View File

@ -0,0 +1,99 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dxfmetric
import (
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
"github.com/prometheus/client_golang/prometheus"
)
const (
namespaceTiDB = "tidb"
subsystemDXF = "dxf"
lblType = "type"
lblEvent = "event"
lblState = "state"
// LblTaskID is the label for task ID.
LblTaskID = "task_id"
)
// event names during schedule and execute
const (
EventSubtaskScheduledAway = "subtask-scheduled-away"
EventSubtaskRerun = "subtask-rerun"
EventSubtaskSlow = "subtask-slow"
EventRetry = "retry"
EventTooManyIdx = "too-many-idx"
EventMergeSort = "merge-sort"
)
// DXF metrics
var (
UsedSlotsGauge *prometheus.GaugeVec
WorkerCount *prometheus.GaugeVec
FinishedTaskCounter *prometheus.CounterVec
ScheduleEventCounter *prometheus.CounterVec
ExecuteEventCounter *prometheus.CounterVec
)
// InitDistTaskMetrics initializes disttask metrics.
func InitDistTaskMetrics() {
UsedSlotsGauge = metricscommon.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespaceTiDB,
Subsystem: "disttask",
Name: "used_slots",
Help: "Gauge of used slots on a executor node.",
}, []string{"service_scope"})
WorkerCount = metricscommon.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespaceTiDB,
Subsystem: subsystemDXF,
Name: "worker_count",
Help: "Gauge of DXF worker count.",
}, []string{lblType})
FinishedTaskCounter = metricscommon.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespaceTiDB,
Subsystem: subsystemDXF,
Name: "finished_task_total",
Help: "Counter of finished DXF tasks.",
}, []string{lblState})
ScheduleEventCounter = metricscommon.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespaceTiDB,
Subsystem: subsystemDXF,
Name: "schedule_event_total",
Help: "Counter of DXF schedule events fo tasks.",
}, []string{LblTaskID, lblEvent})
// we use task ID instead of subtask ID to avoid too many metric series.
ExecuteEventCounter = metricscommon.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespaceTiDB,
Subsystem: subsystemDXF,
Name: "execute_event_total",
Help: "Counter of DXF execute events fo tasks.",
}, []string{LblTaskID, lblEvent})
}
// Register registers DXF metrics.
func Register(register prometheus.Registerer) {
register.MustRegister(UsedSlotsGauge)
register.MustRegister(WorkerCount)
register.MustRegister(FinishedTaskCounter)
register.MustRegister(ScheduleEventCounter)
register.MustRegister(ExecuteEventCounter)
}
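Elsewhere in this commit these vectors are incremented with a task-ID label during scheduling and execution, and cleaned up in one call when the task's scheduler or executor closes. A minimal sketch of that usage, with a local counter and private registry standing in for the shared dxfmetric globals:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Local stand-in mirroring dxfmetric.ScheduleEventCounter and its labels.
	scheduleEvents := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tidb", Subsystem: "dxf", Name: "schedule_event_total",
		Help: "Counter of DXF schedule events of tasks.",
	}, []string{"task_id", "event"})
	reg := prometheus.NewRegistry()
	reg.MustRegister(scheduleEvents)

	taskID := int64(42)
	// Events are labeled by task ID (not subtask ID) to bound cardinality.
	scheduleEvents.WithLabelValues(fmt.Sprint(taskID), "merge-sort").Inc()
	scheduleEvents.WithLabelValues(fmt.Sprint(taskID), "retry").Inc()

	// When the scheduler for this task closes, every series belonging to the
	// task is dropped in one call, as BaseScheduler.Close does in this commit.
	deleted := scheduleEvents.DeletePartialMatch(prometheus.Labels{"task_id": fmt.Sprint(taskID)})
	fmt.Println("deleted series:", deleted) // 2
}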

View File

@ -57,10 +57,7 @@ func GetScheduleStatus(ctx context.Context) (*schstatus.Status, error) {
if err != nil {
return nil, errors.Trace(err)
}
requiredNodes := calculateRequiredNodes(tasks, nodeCPU)
// make sure 1 node exist for DXF owner and works as a reserved node, to make
// small tasks more responsive.
requiredNodes = max(requiredNodes, 1)
requiredNodes := CalculateRequiredNodes(tasks, nodeCPU)
status := &schstatus.Status{
Version: schstatus.Version1,
TaskQueue: schstatus.TaskQueue{ScheduledCount: len(tasks)},
@ -135,10 +132,10 @@ func GetBusyNodes(ctx context.Context, manager *storage.TaskManager) ([]schstatu
return busyNodes, nil
}
// this function simulates how scheduler and balancer schedules tasks, and
// calculates the required node count to run the tasks.
// CalculateRequiredNodes simulates how scheduler and balancer schedules tasks,
// and calculates the required node count to run the tasks.
// 'tasks' must be ordered by its rank, see TaskBase for more info about task rank.
func calculateRequiredNodes(tasks []*proto.TaskBase, cpuCount int) int {
func CalculateRequiredNodes(tasks []*proto.TaskBase, cpuCount int) int {
availResources := make([]int, 0, len(tasks))
// for each task, at most MaxNodeCount subtasks can be run in parallel, and
// on each node, each task can have at most 1 subtask running. we will try to
@ -162,7 +159,9 @@ func calculateRequiredNodes(tasks []*proto.TaskBase, cpuCount int) int {
availResources = append(availResources, cpuCount-t.Concurrency)
}
}
return len(availResources)
// make sure at least 1 node exists for the DXF owner, working as a reserved
// node, to make small tasks more responsive.
return max(len(availResources), 1)
}
// GetScheduleFlags returns the schedule flags, such as pause-scale-in flag.
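A toy version of the greedy simulation described above may make the idea concrete: walk tasks in rank order, place each pending subtask on an existing node that still has spare CPU (at most one subtask of a task per node), open a new node otherwise, and always reserve one node for the DXF owner. The toyTask struct and requiredNodes function are hypothetical illustrations only, not proto.TaskBase or the real CalculateRequiredNodes (which additionally caps per-task parallelism by MaxNodeCount, per the comment above).

package main

import "fmt"

// toyTask is a hypothetical stand-in for proto.TaskBase: how many subtasks the
// task still wants to run and how many CPUs each of them occupies.
type toyTask struct {
	subtasks    int
	concurrency int
}

// requiredNodes greedily simulates placement: for each task (already ordered
// by rank), reuse opened nodes with enough spare CPU, visiting each node at
// most once per task, then open new nodes for the remaining subtasks.
func requiredNodes(tasks []toyTask, cpuPerNode int) int {
	avail := make([]int, 0) // spare CPU per opened node
	for _, t := range tasks {
		placed := 0
		for i := range avail {
			if placed >= t.subtasks {
				break
			}
			if avail[i] >= t.concurrency {
				avail[i] -= t.concurrency
				placed++
			}
		}
		for ; placed < t.subtasks; placed++ {
			avail = append(avail, cpuPerNode-t.concurrency)
		}
	}
	// keep one node reserved for the DXF owner, matching the change above.
	return max(len(avail), 1)
}

func main() {
	// On 8-CPU nodes, the second task's subtasks fit beside the first one's.
	fmt.Println(requiredNodes([]toyTask{{3, 6}, {3, 2}}, 8)) // 3
	fmt.Println(requiredNodes(nil, 8))                       // 1 (reserved node)
}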

View File

@ -29,6 +29,8 @@ func TestCalculateRequiredNodes(t *testing.T) {
params [][]int
expected int
}{
// no task
{cpuCount: 8, params: [][]int{}, expected: 1},
// single task cases
{cpuCount: 8, params: [][]int{{1, 1}}, expected: 1},
{cpuCount: 8, params: [][]int{{1, 3}}, expected: 3},
@ -67,7 +69,7 @@ func TestCalculateRequiredNodes(t *testing.T) {
}
tasks = append(tasks, task)
}
require.Equal(t, c.expected, calculateRequiredNodes(tasks, c.cpuCount))
require.Equal(t, c.expected, CalculateRequiredNodes(tasks, c.cpuCount))
})
}
}

View File

@ -5,7 +5,6 @@ go_library(
srcs = [
"autoscaler.go",
"balancer.go",
"collector.go",
"interface.go",
"nodes.go",
"scheduler.go",
@ -17,6 +16,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config/kerneltype",
"//pkg/disttask/framework/dxfmetric",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/schstatus",
@ -26,6 +26,7 @@ go_library(
"//pkg/domain/serverinfo",
"//pkg/kv",
"//pkg/lightning/log",
"//pkg/metrics",
"//pkg/sessionctx",
"//pkg/sessionctx/vardef",
"//pkg/util",
@ -61,10 +62,11 @@ go_test(
embed = [":scheduler"],
flaky = True,
race = "off",
shard_count = 42,
shard_count = 43,
deps = [
"//pkg/config",
"//pkg/config/kerneltype",
"//pkg/disttask/framework/dxfmetric",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/proto",
@ -78,6 +80,7 @@ go_test(
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/util",
"//pkg/util/cpu",
"//pkg/util/disttask",
"//pkg/util/logutil",
@ -86,6 +89,8 @@ go_test(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_golang//prometheus",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",

View File

@ -17,6 +17,7 @@ package scheduler
import (
"context"
goerrors "errors"
"fmt"
"math/rand"
"strings"
"sync/atomic"
@ -24,6 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
@ -36,6 +38,7 @@ import (
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -123,7 +126,8 @@ func (s *BaseScheduler) ScheduleTask() {
}
// Close closes the scheduler.
func (*BaseScheduler) Close() {
func (s *BaseScheduler) Close() {
dxfmetric.ScheduleEventCounter.DeletePartialMatch(prometheus.Labels{dxfmetric.LblTaskID: fmt.Sprint(s.GetTask().ID)})
}
// GetTask implements the Scheduler interface.
@ -351,6 +355,7 @@ func (s *BaseScheduler) onReverting() error {
}
task.State = proto.TaskStateReverted
s.task.Store(task)
onTaskFinished(task.State, task.Error)
return nil
}
// Wait all subtasks in this step finishes.
@ -469,6 +474,7 @@ func (s *BaseScheduler) switch2NextStep() error {
task.Step = nextStep
task.State = proto.TaskStateSucceed
s.task.Store(task)
onTaskFinished(task.State, task.Error)
return nil
}
@ -579,6 +585,7 @@ func (s *BaseScheduler) handlePlanErr(err error) error {
task := s.getTaskClone()
s.logger.Warn("generate plan failed", zap.Error(err), zap.Stringer("state", task.State))
if s.IsRetryableErr(err) {
dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventRetry).Inc()
return err
}
return s.revertTask(err)
@ -697,6 +704,9 @@ func (s *BaseScheduler) GetLogger() *zap.Logger {
// IsCancelledErr checks if the error is a cancelled error.
func IsCancelledErr(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), taskCancelMsg)
}
@ -715,3 +725,21 @@ func getEligibleNodes(ctx context.Context, sch Scheduler, managedNodes []string)
return serverNodes, nil
}
func onTaskFinished(state proto.TaskState, taskErr error) {
// when a task finishes, we classify it into succeed/failed/cancelled
var metricState string
if state == proto.TaskStateSucceed || state == proto.TaskStateFailed {
metricState = state.String()
} else if state == proto.TaskStateReverted {
metricState = proto.TaskStateFailed.String()
if IsCancelledErr(taskErr) {
metricState = "cancelled"
}
}
if len(metricState) > 0 {
dxfmetric.FinishedTaskCounter.WithLabelValues("all").Inc()
dxfmetric.FinishedTaskCounter.WithLabelValues(metricState).Inc()
}
}

View File

@ -21,9 +21,13 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
@ -38,8 +42,10 @@ var (
// defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table.
defaultHistorySubtaskTableGcInterval = 24 * time.Hour
// DefaultCleanUpInterval is the interval of cleanup routine.
DefaultCleanUpInterval = 10 * time.Minute
defaultCollectMetricsInterval = 5 * time.Second
DefaultCleanUpInterval = 10 * time.Minute
// metric scraping mostly happens at 15s intervals, so it's meaningless to
// update internally collected data more frequently; we align with that.
defaultCollectMetricsInterval = 15 * time.Second
)
func (sm *Manager) getSchedulerCount() int {
@ -118,6 +124,8 @@ type Manager struct {
schedulers []Scheduler
}
nodeRes *proto.NodeResource
// initialized on demand
metricCollector *dxfmetric.Collector
}
// NewManager creates a scheduler struct.
@ -187,6 +195,10 @@ func (sm *Manager) Stop() {
sm.clearSchedulers()
sm.initialized = false
close(sm.finishCh)
// clear existing counters on owner change
dxfmetric.WorkerCount.Reset()
dxfmetric.FinishedTaskCounter.Reset()
}
// Initialized check the manager initialized.
@ -295,6 +307,8 @@ func (sm *Manager) failTask(id int64, currState proto.TaskState, err error) {
if err2 := sm.taskMgr.FailTask(sm.ctx, id, currState, err); err2 != nil {
sm.logger.Warn("failed to update task state to failed",
zap.Int64("task-id", id), zap.Error(err2))
} else {
onTaskFinished(proto.TaskStateFailed, err)
}
}
@ -447,6 +461,11 @@ func (sm *Manager) collectLoop() {
sm.logger.Info("collect loop start")
ticker := time.NewTicker(defaultCollectMetricsInterval)
defer ticker.Stop()
sm.metricCollector = dxfmetric.NewCollector()
metrics.Register(sm.metricCollector)
defer func() {
metrics.Unregister(sm.metricCollector)
}()
for {
select {
case <-sm.ctx.Done():
@ -468,8 +487,36 @@ func (sm *Manager) collect() {
sm.logger.Warn("get all subtasks failed", zap.Error(err))
return
}
disttaskCollector.taskInfo.Store(&tasks)
disttaskCollector.subtaskInfo.Store(&subtasks)
sm.metricCollector.UpdateInfo(tasks, subtasks)
if kerneltype.IsNextGen() {
sm.collectWorkerMetrics(tasks)
}
}
func (sm *Manager) collectWorkerMetrics(tasks []*proto.TaskBase) {
manager, err := storage.GetTaskManager()
if err != nil {
sm.logger.Warn("failed to get task manager", zap.Error(err))
return
}
nodeCount, nodeCPU, err := handle.GetNodesInfo(sm.ctx, manager)
if err != nil {
sm.logger.Warn("failed to get nodes info", zap.Error(err))
return
}
scheduledTasks := make([]*proto.TaskBase, 0, len(tasks))
for _, t := range tasks {
if t.State == proto.TaskStateRunning || t.State == proto.TaskStateModifying {
scheduledTasks = append(scheduledTasks, t)
}
}
slices.SortFunc(scheduledTasks, func(i, j *proto.TaskBase) int {
return i.Compare(j)
})
requiredNodes := handle.CalculateRequiredNodes(tasks, nodeCPU)
dxfmetric.WorkerCount.WithLabelValues("required").Set(float64(requiredNodes))
dxfmetric.WorkerCount.WithLabelValues("current").Set(float64(nodeCount))
}
// MockScheduler mock one scheduler for one task, only used for tests.
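The manager's collectLoop follows a register-on-ownership pattern: create and register the collector when the loop starts, refresh its snapshot on a ticker aligned with the roughly 15s scrape interval, and unregister it when ownership is lost. A minimal sketch of that lifecycle under those assumptions, with a hypothetical demoCollector and a private registry instead of dxfmetric.Collector and the default registerer:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// demoCollector is a placeholder; any prometheus.Collector works here.
type demoCollector struct{ desc *prometheus.Desc }

func (c *demoCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }
func (c *demoCollector) Collect(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, 1)
}

// collectLoop mirrors the shape of Manager.collectLoop in this commit: the
// collector only exists (and is only registered) while this node is the DXF
// owner, and the refresh interval matches the typical 15s scrape interval.
func collectLoop(ctx context.Context, reg *prometheus.Registry) {
	collector := &demoCollector{
		desc: prometheus.NewDesc("demo_dxf_tasks", "demo gauge", nil, nil),
	}
	reg.MustRegister(collector)
	defer reg.Unregister(collector) // drop owner-scoped series on ownership change

	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// refresh the snapshot the collector serves, e.g. UpdateInfo(...).
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	reg := prometheus.NewRegistry()
	done := make(chan struct{})
	go func() {
		defer close(done)
		collectLoop(ctx, reg)
	}()
	cancel()
	<-done
	fmt.Println("collect loop stopped; collector unregistered")
}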

View File

@ -22,11 +22,15 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
schmock "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/kv"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
"go.uber.org/mock/gomock"
@ -572,3 +576,44 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
require.True(t, ctrl.Satisfied())
})
}
func TestOnTaskFinished(t *testing.T) {
bak := dxfmetric.FinishedTaskCounter
t.Cleanup(func() {
dxfmetric.FinishedTaskCounter = bak
})
dxfmetric.FinishedTaskCounter = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "test"}, []string{"state"})
collectMetricsFn := func() map[string]int {
var ch = make(chan prometheus.Metric)
items := make([]*dto.Metric, 0)
var wg tidbutil.WaitGroupWrapper
wg.Run(func() {
for m := range ch {
dm := &dto.Metric{}
require.NoError(t, m.Write(dm))
items = append(items, dm)
}
})
dxfmetric.FinishedTaskCounter.Collect(ch)
close(ch)
wg.Wait()
values := make(map[string]int)
for _, it := range items {
values[*it.GetLabel()[0].Value] = int(it.GetCounter().GetValue())
}
return values
}
onTaskFinished(proto.TaskStateSucceed, nil)
require.EqualValues(t, map[string]int{"all": 1, "succeed": 1}, collectMetricsFn())
onTaskFinished(proto.TaskStateReverted, nil)
require.EqualValues(t, map[string]int{"all": 2, "succeed": 1, "failed": 1}, collectMetricsFn())
onTaskFinished(proto.TaskStateReverted, errors.New("some err"))
require.EqualValues(t, map[string]int{"all": 3, "succeed": 1, "failed": 2}, collectMetricsFn())
onTaskFinished(proto.TaskStateReverted, errors.New(taskCancelMsg))
require.EqualValues(t, map[string]int{"all": 4, "succeed": 1, "failed": 2, "cancelled": 1}, collectMetricsFn())
onTaskFinished(proto.TaskStateFailed, errors.New("some err"))
require.EqualValues(t, map[string]int{"all": 5, "succeed": 1, "failed": 3, "cancelled": 1}, collectMetricsFn())
// noop for non-finished state.
onTaskFinished(proto.TaskStateRunning, nil)
require.EqualValues(t, map[string]int{"all": 5, "succeed": 1, "failed": 3, "cancelled": 1}, collectMetricsFn())
}

View File

@ -402,6 +402,7 @@ func TestVerifyTaskStateTransform(t *testing.T) {
}
func TestIsCancelledErr(t *testing.T) {
require.False(t, scheduler.IsCancelledErr(nil))
require.False(t, scheduler.IsCancelledErr(errors.New("some err")))
require.False(t, scheduler.IsCancelledErr(context.Canceled))
require.True(t, scheduler.IsCancelledErr(errors.New("cancelled by user")))

View File

@ -14,6 +14,7 @@ go_library(
deps = [
"//br/pkg/storage",
"//pkg/config",
"//pkg/disttask/framework/dxfmetric",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
@ -33,6 +34,7 @@ go_library(
"//pkg/util/memory",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_golang//prometheus",
"@org_uber_go_zap//:zap",
],
)

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/failpoint"
litstorage "github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
@ -149,7 +150,7 @@ func (m *Manager) handleTasksLoop() {
m.handleTasks()
// service scope might change, so we call WithLabelValues every time.
metrics.DistTaskUsedSlotsGauge.WithLabelValues(vardef.ServiceScope.Load()).
dxfmetric.UsedSlotsGauge.WithLabelValues(vardef.ServiceScope.Load()).
Set(float64(m.slotManager.usedSlots()))
metrics.GlobalSortUploadWorkerCount.Set(float64(litstorage.GetActiveUploadWorkerCount()))
}

View File

@ -18,6 +18,7 @@ import (
"bytes"
"context"
goerrors "errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
@ -25,6 +26,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
@ -39,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -138,6 +141,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba
// - If the currently running subtask is scheduled away from this node, i.e.
// this node is taken as down, cancel running.
func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCancel context.CancelFunc) {
start := time.Now()
ticker := time.NewTicker(checkBalanceSubtaskInterval)
defer ticker.Stop()
for {
@ -148,6 +152,12 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCa
}
task := e.task.Load()
if time.Since(start) > time.Hour {
start = time.Now()
e.logger.Info("subtask running for too long", zap.Int64("subtaskID", e.currSubtaskID.Load()))
dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventSubtaskSlow).Inc()
}
subtasks, err := e.taskTable.GetSubtasksByExecIDAndStepAndStates(ctx, e.execID, task.ID, task.Step,
proto.SubtaskStateRunning)
if err != nil {
@ -158,6 +168,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCa
e.logger.Info("subtask is scheduled away, cancel running",
zap.Int64("subtaskID", e.currSubtaskID.Load()))
// cancels runStep, but leave the subtask state unchanged.
dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventSubtaskScheduledAway).Inc()
if subtaskCtxCancel != nil {
subtaskCtxCancel()
}
@ -369,6 +380,7 @@ func (e *BaseTaskExecutor) createStepExecutor() error {
if err := stepExecutor.Init(e.ctx); err != nil {
if e.IsRetryableError(err) {
dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventRetry).Inc()
e.logger.Info("meet retryable err when init step executor", zap.Error(err))
} else {
e.logger.Info("failed to init step executor", zap.Error(err))
@ -423,6 +435,7 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
}
return ErrNonIdempotentSubtask
}
dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(subtask.TaskID), dxfmetric.EventSubtaskRerun).Inc()
e.logger.Info("subtask in running state and is idempotent",
zap.Int64("subtask-id", subtask.ID))
} else {
@ -626,6 +639,8 @@ func (e *BaseTaskExecutor) Cancel() {
// Close closes the TaskExecutor when all the subtasks are complete.
func (e *BaseTaskExecutor) Close() {
e.Cancel()
dxfmetric.ExecuteEventCounter.DeletePartialMatch(prometheus.Labels{dxfmetric.LblTaskID: fmt.Sprint(e.GetTaskBase().ID)})
}
func (e *BaseTaskExecutor) cancelRunStepWith(cause error) {
@ -709,6 +724,7 @@ func (e *BaseTaskExecutor) markSubTaskCanceledOrFailed(ctx context.Context, subt
e.logger.Info("meet context canceled for gracefully shutdown")
} else if e.IsRetryableError(stErr) {
dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(subtask.TaskID), dxfmetric.EventRetry).Inc()
e.logger.Warn("meet retryable error", zap.Error(stErr))
} else {
e.logger.Warn("subtask failed", zap.Error(stErr))

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/config",
"//pkg/config/kerneltype",
"//pkg/ddl",
"//pkg/disttask/framework/dxfmetric",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/metering",
"//pkg/disttask/framework/planner",

View File

@ -17,6 +17,7 @@ package importinto
import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
@ -26,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
tidb "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/planner"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
@ -51,6 +53,11 @@ import (
)
const (
// warningIndexCount is the threshold for warning about too many indexes on
// the target table; importing is known to be slow in this case.
// The value is chosen because most tables have fewer than 32 indexes; we can
// adjust it later if needed.
warningIndexCount = 32
registerTaskTTL = 10 * time.Minute
refreshTaskTTLInterval = 3 * time.Minute
registerTimeout = 5 * time.Second
@ -314,6 +321,9 @@ func (sch *importScheduler) OnNextSubtasksBatch(
if err = sch.startJob(ctx, logger, taskMeta, jobStep); err != nil {
return nil, err
}
if importer.GetNumOfIndexGenKV(taskMeta.Plan.TableInfo) > warningIndexCount {
dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventTooManyIdx).Inc()
}
case proto.ImportStepMergeSort:
sortAndEncodeMeta, err := taskHandle.GetPreviousSubtaskMetas(task.ID, proto.ImportStepEncodeAndSort)
if err != nil {
@ -391,6 +401,9 @@ func (sch *importScheduler) OnNextSubtasksBatch(
}
logger.Info("generate subtasks", zap.Int("subtask-count", len(metaBytes)))
if nextStep == proto.ImportStepMergeSort && len(metaBytes) > 0 {
dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventMergeSort).Inc()
}
return metaBytes, nil
}

View File

@ -16,6 +16,7 @@ package domain_test
import (
"context"
"fmt"
"testing"
"time"
@ -30,15 +31,15 @@ import (
)
func TestWriteRUStatistics(t *testing.T) {
tz, _ := time.LoadLocation("Asia/Shanghai")
testWriteRUStatisticsTz(t, tz)
tzShanghai, _ := time.LoadLocation("Asia/Shanghai")
// test with DST timezone.
tz, _ = time.LoadLocation("Australia/Lord_Howe")
testWriteRUStatisticsTz(t, tz)
tzLord, _ := time.LoadLocation("Australia/Lord_Howe")
testWriteRUStatisticsTz(t, time.Local)
testWriteRUStatisticsTz(t, time.UTC)
for i, tz := range []*time.Location{tzShanghai, tzLord, time.Local, time.UTC} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
testWriteRUStatisticsTz(t, tz)
})
}
}
func testWriteRUStatisticsTz(t *testing.T, tz *time.Location) {

View File

@ -229,7 +229,7 @@ func TestPartitionMemCacheReadLock(t *testing.T) {
}
func TestPointGetLockExistKey(t *testing.T) {
testLock := func(rc bool, key string, tableName string) {
testLock := func(t *testing.T, rc bool, key string, tableName string) {
store := testkit.CreateMockStore(t)
tk1, tk2 := testkit.NewTestKit(t, store), testkit.NewTestKit(t, store)
@ -336,9 +336,11 @@ func TestPointGetLockExistKey(t *testing.T) {
{rc: true, key: "unique key"},
} {
tableName := fmt.Sprintf("t_%d", i)
func(rc bool, key string, tableName string) {
testLock(rc, key, tableName)
}(one.rc, one.key, tableName)
t.Run(tableName, func(t *testing.T) {
func(rc bool, key string, tableName string) {
testLock(t, rc, key, tableName)
}(one.rc, one.key, tableName)
})
}
}

View File

@ -32,8 +32,11 @@ import (
// TODO remove spill as it should be tested in sort_spill_test.go
// TODO add some new fail points, as some fail point may be aborted after the refine
func TestSortInDisk(t *testing.T) {
testSortInDisk(t, false)
testSortInDisk(t, true)
for i, v := range []bool{false, true} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
testSortInDisk(t, v)
})
}
}
// TODO remove failpoint

View File

@ -7,7 +7,6 @@ go_library(
"br.go",
"ddl.go",
"distsql.go",
"disttask.go",
"domain.go",
"executor.go",
"gc_worker.go",
@ -32,6 +31,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/metrics",
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/dxfmetric",
"//pkg/lightning/metric",
"//pkg/metrics/common",
"//pkg/parser/terror",

View File

@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "common",
@ -7,3 +7,15 @@ go_library(
visibility = ["//visibility:public"],
deps = ["@com_github_prometheus_client_golang//prometheus"],
)
go_test(
name = "common_test",
timeout = "short",
srcs = ["wrapper_test.go"],
embed = [":common"],
flaky = True,
deps = [
"@com_github_prometheus_client_golang//prometheus",
"@com_github_stretchr_testify//require",
],
)

View File

@ -16,6 +16,7 @@ package metricscommon
import (
"fmt"
"maps"
"strings"
"github.com/prometheus/client_golang/prometheus"
@ -28,6 +29,19 @@ func GetConstLabels() prometheus.Labels {
return constLabels
}
// GetMergedConstLabels merges input constant labels with package-level constant labels.
func GetMergedConstLabels(in prometheus.Labels) prometheus.Labels {
res := constLabels
if len(in) > 0 {
res = make(prometheus.Labels, len(constLabels)+len(in))
// merge the input labels with constLabels; constLabels defined in this
// package take higher priority.
maps.Copy(res, in)
maps.Copy(res, constLabels)
}
return res
}
// SetConstLabels sets constant labels for metrics.
func SetConstLabels(kv ...string) {
if len(kv)%2 == 1 {
@ -80,3 +94,9 @@ func NewSummaryVec(opts prometheus.SummaryOpts, labelNames []string) *prometheus
opts.ConstLabels = constLabels
return prometheus.NewSummaryVec(opts, labelNames)
}
// NewDesc wraps prometheus.NewDesc and applies the merged constant labels.
func NewDesc(fqName, help string, variableLabels []string, inConstLbls prometheus.Labels) *prometheus.Desc {
cstLabels := GetMergedConstLabels(inConstLbls)
return prometheus.NewDesc(fqName, help, variableLabels, cstLabels)
}

View File

@ -0,0 +1,39 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metricscommon
import (
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
func TestGetMergedConstLabels(t *testing.T) {
bak := constLabels
t.Cleanup(func() {
constLabels = bak
})
SetConstLabels()
require.EqualValues(t, prometheus.Labels{}, GetMergedConstLabels(nil))
require.EqualValues(t, prometheus.Labels{"c": "3"}, GetMergedConstLabels(prometheus.Labels{"c": "3"}))
SetConstLabels("a", "1", "b", "2")
require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2"}, GetMergedConstLabels(nil))
require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2"}, GetMergedConstLabels(prometheus.Labels{}))
require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2", "c": "3"}, GetMergedConstLabels(prometheus.Labels{"c": "3"}))
// constLabels has higher priority
require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2"}, GetMergedConstLabels(prometheus.Labels{"a": "100"}))
}

View File

@ -1,36 +0,0 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
"github.com/prometheus/client_golang/prometheus"
)
var (
// DistTaskUsedSlotsGauge is the gauge of used slots on executor node.
DistTaskUsedSlotsGauge *prometheus.GaugeVec
)
// InitDistTaskMetrics initializes disttask metrics.
func InitDistTaskMetrics() {
DistTaskUsedSlotsGauge = metricscommon.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "disttask",
Name: "used_slots",
Help: "Gauge of used slots on a executor node.",
}, []string{"service_scope"})
}

View File

@ -16,6 +16,7 @@ package metrics
import (
"github.com/pingcap/tidb/pkg/lightning/metric"
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
"github.com/pingcap/tidb/pkg/util/promutil"
"github.com/prometheus/client_golang/prometheus"
)
@ -24,7 +25,8 @@ const importMetricSubsystem = "import"
// GetRegisteredImportMetrics returns the registered import metrics.
func GetRegisteredImportMetrics(factory promutil.Factory, constLabels prometheus.Labels) *metric.Common {
metrics := metric.NewCommon(factory, TiDB, importMetricSubsystem, constLabels)
mergedCstLabels := metricscommon.GetMergedConstLabels(constLabels)
metrics := metric.NewCommon(factory, TiDB, importMetricSubsystem, mergedCstLabels)
metrics.RegisterTo(prometheus.DefaultRegisterer)
return metrics
}

View File

@ -17,6 +17,7 @@ package metrics
import (
"sync"
"github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
timermetrics "github.com/pingcap/tidb/pkg/timer/metrics"
"github.com/pingcap/tidb/pkg/util/logutil"
@ -95,7 +96,7 @@ func InitMetrics() {
InitTelemetryMetrics()
InitTopSQLMetrics()
InitTTLMetrics()
InitDistTaskMetrics()
dxfmetric.InitDistTaskMetrics()
InitResourceGroupMetrics()
InitGlobalSortMetrics()
InitInfoSchemaV2Metrics()
@ -282,7 +283,8 @@ func RegisterMetrics() {
prometheus.MustRegister(PlanReplayerTaskCounter)
prometheus.MustRegister(PlanReplayerRegisterTaskGauge)
prometheus.MustRegister(DistTaskUsedSlotsGauge)
dxfmetric.Register(prometheus.DefaultRegisterer)
prometheus.MustRegister(RunawayCheckerCounter)
prometheus.MustRegister(GlobalSortWriteToCloudStorageDuration)
prometheus.MustRegister(GlobalSortWriteToCloudStorageRate)
@ -335,6 +337,18 @@ func RegisterMetrics() {
tikvmetrics.TiKVPanicCounter = PanicCounter // reset tidb metrics for tikv metrics
}
// Register registers custom collectors.
func Register(cs ...prometheus.Collector) {
prometheus.MustRegister(cs...)
}
// Unregister unregisters custom collectors.
func Unregister(cs ...prometheus.Collector) {
for _, c := range cs {
prometheus.Unregister(c)
}
}
var mode struct {
sync.Mutex
isSimplified bool

View File

@ -837,13 +837,15 @@ func planCacheIndexMergePrepareData(tk *testkit.TestKit) {
}
func TestPlanCacheRandomCases(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
testRandomPlanCacheCases(t, planCacheIndexMergePrepareData, planCacheIndexMergeQueries)
testRandomPlanCacheCases(t, planCacheIntConvertPrepareData, planCacheIntConvertQueries)
testRandomPlanCacheCases(t, planCachePointGetPrepareData, planCachePointGetQueries)
t.Run("1", func(t *testing.T) {
testRandomPlanCacheCases(t, planCacheIndexMergePrepareData, planCacheIndexMergeQueries)
})
t.Run("2", func(t *testing.T) {
testRandomPlanCacheCases(t, planCacheIntConvertPrepareData, planCacheIntConvertQueries)
})
t.Run("3", func(t *testing.T) {
testRandomPlanCacheCases(t, planCachePointGetPrepareData, planCachePointGetQueries)
})
}
func testRandomPlanCacheCases(t *testing.T,

View File

@ -167,8 +167,11 @@ func TestCursorFetchExecuteCheck(t *testing.T) {
}
func TestConcurrentExecuteAndFetch(t *testing.T) {
runTestConcurrentExecuteAndFetch(t, false)
runTestConcurrentExecuteAndFetch(t, true)
for i, v := range []bool{false, true} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
runTestConcurrentExecuteAndFetch(t, v)
})
}
}
func runTestConcurrentExecuteAndFetch(t *testing.T, lazy bool) {