diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel
index 78a0b3ecf6..908cf10775 100644
--- a/pkg/ddl/BUILD.bazel
+++ b/pkg/ddl/BUILD.bazel
@@ -94,6 +94,7 @@ go_library(
         "//pkg/ddl/util",
         "//pkg/distsql",
         "//pkg/distsql/context",
+        "//pkg/disttask/framework/dxfmetric",
        "//pkg/disttask/framework/handle",
        "//pkg/disttask/framework/metering",
        "//pkg/disttask/framework/proto",
diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go
index e156234e3b..2cff9f5bfd 100644
--- a/pkg/ddl/backfilling_dist_scheduler.go
+++ b/pkg/ddl/backfilling_dist_scheduler.go
@@ -19,6 +19,7 @@ import (
     "context"
     "encoding/hex"
     "encoding/json"
+    "fmt"
     "math"
     "sort"
     "time"
@@ -30,6 +31,7 @@ import (
     "github.com/pingcap/tidb/pkg/config/kerneltype"
     "github.com/pingcap/tidb/pkg/ddl/ingest"
     "github.com/pingcap/tidb/pkg/ddl/logutil"
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     "github.com/pingcap/tidb/pkg/disttask/framework/handle"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
     "github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
@@ -146,7 +148,14 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
         logger.Info("available local disk space resource", zap.String("size", units.BytesSize(float64(availableDisk))))
         return generateReadIndexPlan(ctx, sch.d, store, tbl, job, sch.GlobalSort, nodeCnt, logger)
     case proto.BackfillStepMergeSort:
-        return generateMergeSortPlan(ctx, taskHandle, task, nodeCnt, backfillMeta.CloudStorageURI, logger)
+        metaBytes, err2 := generateMergeSortPlan(ctx, taskHandle, task, nodeCnt, backfillMeta.CloudStorageURI, logger)
+        if err2 != nil {
+            return nil, err2
+        }
+        if len(metaBytes) > 0 {
+            dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventMergeSort).Inc()
+        }
+        return metaBytes, nil
     case proto.BackfillStepWriteAndIngest:
         if sch.GlobalSort {
             failpoint.Inject("mockWriteIngest", func() {
diff --git a/pkg/ddl/tests/partition/error_injection_test.go b/pkg/ddl/tests/partition/error_injection_test.go
index dac8aca75e..a046164779 100644
--- a/pkg/ddl/tests/partition/error_injection_test.go
+++ b/pkg/ddl/tests/partition/error_injection_test.go
@@ -15,6 +15,7 @@
 package partition
 
 import (
+    "fmt"
     "testing"
 
     "github.com/pingcap/failpoint"
@@ -117,18 +118,20 @@ func TestTruncatePartitionListFailures(t *testing.T) {
 func testDDLWithInjectedErrors(t *testing.T, tests FailureTest, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterRollback, afterRecover [][]any, skipTests ...string) {
 TEST:
-    for _, test := range tests.Tests {
+    for i, test := range tests.Tests {
         for _, skip := range skipTests {
             if test.Name == skip {
                 continue TEST
             }
         }
-        if test.Recoverable {
-            runOneTest(t, test, true, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRecover)
-        }
-        if test.Rollback {
-            runOneTest(t, test, false, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRollback)
-        }
+        t.Run(fmt.Sprint(i), func(t *testing.T) {
+            if test.Recoverable {
+                runOneTest(t, test, true, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRecover)
+            }
+            if test.Rollback {
+                runOneTest(t, test, false, tests.FailpointPrefix, createSQL, alterSQL, beforeDML, beforeResult, afterDML, afterRollback)
+            }
+        })
     }
 }
diff --git a/pkg/disttask/framework/dxfmetric/BUILD.bazel b/pkg/disttask/framework/dxfmetric/BUILD.bazel
new file mode 100644
index 0000000000..0365383a93
--- /dev/null
+++ b/pkg/disttask/framework/dxfmetric/BUILD.bazel
@@ -0,0 +1,18 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "dxfmetric",
+    srcs = [
+        "collector.go",
+        "metric.go",
+    ],
+    importpath = "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//pkg/disttask/framework/proto",
+        "//pkg/metrics/common",
+        "//pkg/util/intest",
+        "@com_github_google_uuid//:uuid",
+        "@com_github_prometheus_client_golang//prometheus",
+    ],
+)
diff --git a/pkg/disttask/framework/scheduler/collector.go b/pkg/disttask/framework/dxfmetric/collector.go
similarity index 75%
rename from pkg/disttask/framework/scheduler/collector.go
rename to pkg/disttask/framework/dxfmetric/collector.go
index acef9ebd21..9e15372107 100644
--- a/pkg/disttask/framework/scheduler/collector.go
+++ b/pkg/disttask/framework/dxfmetric/collector.go
@@ -12,30 +12,28 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package scheduler
+package dxfmetric
 
 import (
     "strconv"
     "sync/atomic"
     "time"
 
+    "github.com/google/uuid"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
+    metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
+    "github.com/pingcap/tidb/pkg/util/intest"
     "github.com/prometheus/client_golang/prometheus"
 )
 
-var disttaskCollector = newCollector()
-
-func init() {
-    prometheus.MustRegister(disttaskCollector)
-}
-
+// Collector is a custom Prometheus collector for DXF metrics.
 // Because the exec_id of a subtask may change, after all tasks
 // are successful, subtasks will be migrated from tidb_subtask_background
 // to tidb_subtask_background_history. In the above situation,
 // the built-in collector of Prometheus needs to delete the previously
 // added metrics, which is quite troublesome.
 // Therefore, a custom collector is used.
-type collector struct {
+type Collector struct {
     subtaskInfo atomic.Pointer[[]*proto.SubtaskBase]
     taskInfo    atomic.Pointer[[]*proto.TaskBase]
 
@@ -44,40 +42,53 @@ type collector struct {
     subtaskDuration *prometheus.Desc
 }
 
-func newCollector() *collector {
-    return &collector{
-        tasks: prometheus.NewDesc(
+// NewCollector creates a new Collector.
+func NewCollector() *Collector {
+    var constLabels prometheus.Labels
+    // we might create multiple domains in the same process in tests, so we
+    // add a uuid label to avoid conflicts.
+    if intest.InTest {
+        constLabels = prometheus.Labels{"server_id": uuid.New().String()}
+    }
+    return &Collector{
+        tasks: metricscommon.NewDesc(
             "tidb_disttask_task_status",
             "Number of tasks.",
-            []string{"task_type", "status"}, nil,
+            []string{"task_type", "status"}, constLabels,
         ),
-        subtasks: prometheus.NewDesc(
+        subtasks: metricscommon.NewDesc(
             "tidb_disttask_subtasks",
             "Number of subtasks.",
-            []string{"task_type", "task_id", "status", "exec_id"}, nil,
+            []string{"task_type", "task_id", "status", "exec_id"}, constLabels,
         ),
-        subtaskDuration: prometheus.NewDesc(
+        subtaskDuration: metricscommon.NewDesc(
             "tidb_disttask_subtask_duration",
             "Duration of subtasks in different states.",
-            []string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, nil,
+            []string{"task_type", "task_id", "status", "subtask_id", "exec_id"}, constLabels,
         ),
     }
 }
 
+// UpdateInfo updates the task and subtask info in the collector.
+func (c *Collector) UpdateInfo(tasks []*proto.TaskBase, subtasks []*proto.SubtaskBase) {
+    c.taskInfo.Store(&tasks)
+    c.subtaskInfo.Store(&subtasks)
+}
+
 // Describe implements the prometheus.Collector interface.
-func (c *collector) Describe(ch chan<- *prometheus.Desc) {
+func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
     ch <- c.tasks
     ch <- c.subtasks
     ch <- c.subtaskDuration
 }
 
 // Collect implements the prometheus.Collector interface.
-func (c *collector) Collect(ch chan<- prometheus.Metric) {
+func (c *Collector) Collect(ch chan<- prometheus.Metric) {
     c.collectTasks(ch)
     c.collectSubtasks(ch)
 }
 
-func (c *collector) collectTasks(ch chan<- prometheus.Metric) {
+func (c *Collector) collectTasks(ch chan<- prometheus.Metric) {
     p := c.taskInfo.Load()
     if p == nil {
         return
@@ -104,7 +115,7 @@ func (c *collector) collectTasks(ch chan<- prometheus.Metric) {
     }
 }
 
-func (c *collector) collectSubtasks(ch chan<- prometheus.Metric) {
+func (c *Collector) collectSubtasks(ch chan<- prometheus.Metric) {
     p := c.subtaskInfo.Load()
     if p == nil {
         return
@@ -142,7 +153,7 @@ func (c *collector) collectSubtasks(ch chan<- prometheus.Metric) {
     }
 }
 
-func (c *collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.SubtaskBase) {
+func (c *Collector) setDistSubtaskDuration(ch chan<- prometheus.Metric, subtask *proto.SubtaskBase) {
     switch subtask.State {
     case proto.SubtaskStatePending:
         ch <- prometheus.MustNewConstMetric(c.subtaskDuration, prometheus.GaugeValue,
diff --git a/pkg/disttask/framework/dxfmetric/metric.go b/pkg/disttask/framework/dxfmetric/metric.go
new file mode 100644
index 0000000000..804b36cf1b
--- /dev/null
+++ b/pkg/disttask/framework/dxfmetric/metric.go
@@ -0,0 +1,99 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dxfmetric
+
+import (
+    metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
+    "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+    namespaceTiDB = "tidb"
+    subsystemDXF  = "dxf"
+    lblType       = "type"
+    lblEvent      = "event"
+    lblState      = "state"
+
+    // LblTaskID is the label for task ID.
+    LblTaskID = "task_id"
+)
+
+// event names during schedule and execute
+const (
+    EventSubtaskScheduledAway = "subtask-scheduled-away"
+    EventSubtaskRerun         = "subtask-rerun"
+    EventSubtaskSlow          = "subtask-slow"
+    EventRetry                = "retry"
+    EventTooManyIdx           = "too-many-idx"
+    EventMergeSort            = "merge-sort"
+)
+
+// DXF metrics
+var (
+    UsedSlotsGauge       *prometheus.GaugeVec
+    WorkerCount          *prometheus.GaugeVec
+    FinishedTaskCounter  *prometheus.CounterVec
+    ScheduleEventCounter *prometheus.CounterVec
+    ExecuteEventCounter  *prometheus.CounterVec
+)
+
+// InitDistTaskMetrics initializes disttask metrics.
+func InitDistTaskMetrics() {
+    UsedSlotsGauge = metricscommon.NewGaugeVec(
+        prometheus.GaugeOpts{
+            Namespace: namespaceTiDB,
+            Subsystem: "disttask",
+            Name:      "used_slots",
+            Help:      "Gauge of used slots on an executor node.",
+        }, []string{"service_scope"})
+    WorkerCount = metricscommon.NewGaugeVec(
+        prometheus.GaugeOpts{
+            Namespace: namespaceTiDB,
+            Subsystem: subsystemDXF,
+            Name:      "worker_count",
+            Help:      "Gauge of DXF worker count.",
+        }, []string{lblType})
+    FinishedTaskCounter = metricscommon.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: namespaceTiDB,
+            Subsystem: subsystemDXF,
+            Name:      "finished_task_total",
+            Help:      "Counter of finished DXF tasks.",
+        }, []string{lblState})
+    ScheduleEventCounter = metricscommon.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: namespaceTiDB,
+            Subsystem: subsystemDXF,
+            Name:      "schedule_event_total",
+            Help:      "Counter of DXF schedule events for tasks.",
+        }, []string{LblTaskID, lblEvent})
+    // we use task ID instead of subtask ID to avoid too many lines of metrics.
+    ExecuteEventCounter = metricscommon.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: namespaceTiDB,
+            Subsystem: subsystemDXF,
+            Name:      "execute_event_total",
+            Help:      "Counter of DXF execute events for tasks.",
+        }, []string{LblTaskID, lblEvent})
+}
+
+// Register registers DXF metrics.
+func Register(register prometheus.Registerer) {
+    register.MustRegister(UsedSlotsGauge)
+    register.MustRegister(WorkerCount)
+    register.MustRegister(FinishedTaskCounter)
+    register.MustRegister(ScheduleEventCounter)
+    register.MustRegister(ExecuteEventCounter)
+}
diff --git a/pkg/disttask/framework/handle/status.go b/pkg/disttask/framework/handle/status.go
index 75218b9015..734ee68a82 100644
--- a/pkg/disttask/framework/handle/status.go
+++ b/pkg/disttask/framework/handle/status.go
@@ -57,10 +57,7 @@ func GetScheduleStatus(ctx context.Context) (*schstatus.Status, error) {
     if err != nil {
         return nil, errors.Trace(err)
     }
-    requiredNodes := calculateRequiredNodes(tasks, nodeCPU)
-    // make sure 1 node exist for DXF owner and works as a reserved node, to make
-    // small tasks more responsive.
-    requiredNodes = max(requiredNodes, 1)
+    requiredNodes := CalculateRequiredNodes(tasks, nodeCPU)
     status := &schstatus.Status{
         Version:   schstatus.Version1,
         TaskQueue: schstatus.TaskQueue{ScheduledCount: len(tasks)},
@@ -135,10 +132,10 @@ func GetBusyNodes(ctx context.Context, manager *storage.TaskManager) ([]schstatu
     return busyNodes, nil
 }
 
-// this function simulates how scheduler and balancer schedules tasks, and
-// calculates the required node count to run the tasks.
+// CalculateRequiredNodes simulates how the scheduler and balancer schedule tasks,
+// and calculates the required node count to run the tasks.
 // 'tasks' must be ordered by its rank, see TaskBase for more info about task rank.
-func calculateRequiredNodes(tasks []*proto.TaskBase, cpuCount int) int {
+func CalculateRequiredNodes(tasks []*proto.TaskBase, cpuCount int) int {
     availResources := make([]int, 0, len(tasks))
     // for each task, at most MaxNodeCount subtasks can be run in parallel, and
     // on each node, each task can have at most 1 subtask running. we will try to
@@ -162,7 +159,9 @@ func calculateRequiredNodes(tasks []*proto.TaskBase, cpuCount int) int {
             availResources = append(availResources, cpuCount-t.Concurrency)
         }
     }
-    return len(availResources)
+    // make sure 1 node exists for the DXF owner and works as a reserved node, to make
+    // small tasks more responsive.
+    return max(len(availResources), 1)
 }
 
 // GetScheduleFlags returns the schedule flags, such as pause-scale-in flag.
diff --git a/pkg/disttask/framework/handle/status_test.go b/pkg/disttask/framework/handle/status_test.go
index 9e4ad427d4..fb939b2d9c 100644
--- a/pkg/disttask/framework/handle/status_test.go
+++ b/pkg/disttask/framework/handle/status_test.go
@@ -29,6 +29,8 @@ func TestCalculateRequiredNodes(t *testing.T) {
         params   [][]int
         expected int
     }{
+        // no task
+        {cpuCount: 8, params: [][]int{}, expected: 1},
         // single task cases
         {cpuCount: 8, params: [][]int{{1, 1}}, expected: 1},
         {cpuCount: 8, params: [][]int{{1, 3}}, expected: 3},
@@ -67,7 +69,7 @@ func TestCalculateRequiredNodes(t *testing.T) {
             }
             tasks = append(tasks, task)
         }
-        require.Equal(t, c.expected, calculateRequiredNodes(tasks, c.cpuCount))
+        require.Equal(t, c.expected, CalculateRequiredNodes(tasks, c.cpuCount))
     })
 }
 }
diff --git a/pkg/disttask/framework/scheduler/BUILD.bazel b/pkg/disttask/framework/scheduler/BUILD.bazel
index d47a9bbba5..05459edf4f 100644
--- a/pkg/disttask/framework/scheduler/BUILD.bazel
+++ b/pkg/disttask/framework/scheduler/BUILD.bazel
@@ -5,7 +5,6 @@ go_library(
     srcs = [
         "autoscaler.go",
        "balancer.go",
-        "collector.go",
        "interface.go",
        "nodes.go",
        "scheduler.go",
@@ -17,6 +16,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/config/kerneltype",
+        "//pkg/disttask/framework/dxfmetric",
         "//pkg/disttask/framework/handle",
         "//pkg/disttask/framework/proto",
         "//pkg/disttask/framework/schstatus",
@@ -26,6 +26,7 @@ go_library(
         "//pkg/domain/serverinfo",
         "//pkg/kv",
         "//pkg/lightning/log",
+        "//pkg/metrics",
         "//pkg/sessionctx",
         "//pkg/sessionctx/vardef",
         "//pkg/util",
@@ -61,10 +62,11 @@ go_test(
     embed = [":scheduler"],
     flaky = True,
     race = "off",
-    shard_count = 42,
+    shard_count = 43,
     deps = [
         "//pkg/config",
         "//pkg/config/kerneltype",
+        "//pkg/disttask/framework/dxfmetric",
         "//pkg/disttask/framework/handle",
         "//pkg/disttask/framework/mock",
         "//pkg/disttask/framework/proto",
@@ -78,6 +80,7 @@ go_test(
         "//pkg/testkit",
         "//pkg/testkit/testfailpoint",
         "//pkg/testkit/testsetup",
+        "//pkg/util",
         "//pkg/util/cpu",
         "//pkg/util/disttask",
         "//pkg/util/logutil",
@@ -86,6 +89,8 @@ go_test(
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_prometheus_client_golang//prometheus",
+        "@com_github_prometheus_client_model//go",
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
         "@com_github_tikv_client_go_v2//util",
diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go
index 77ae61d067..4044683c3f 100644
--- a/pkg/disttask/framework/scheduler/scheduler.go
+++ b/pkg/disttask/framework/scheduler/scheduler.go
@@ -17,6 +17,7 @@ package scheduler
 import (
     "context"
     goerrors "errors"
+    "fmt"
     "math/rand"
     "strings"
     "sync/atomic"
@@ -24,6 +25,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     "github.com/pingcap/tidb/pkg/disttask/framework/handle"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
     "github.com/pingcap/tidb/pkg/disttask/framework/storage"
@@ -36,6 +38,7 @@ import (
     disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
     "github.com/pingcap/tidb/pkg/util/intest"
     "github.com/pingcap/tidb/pkg/util/logutil"
+    "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
 )
 
@@ -123,7 +126,8 @@ func (s *BaseScheduler) ScheduleTask() {
 }
 
 // Close closes the scheduler.
-func (*BaseScheduler) Close() {
+func (s *BaseScheduler) Close() {
+    dxfmetric.ScheduleEventCounter.DeletePartialMatch(prometheus.Labels{dxfmetric.LblTaskID: fmt.Sprint(s.GetTask().ID)})
 }
 
 // GetTask implements the Scheduler interface.
@@ -351,6 +355,7 @@ func (s *BaseScheduler) onReverting() error {
         }
         task.State = proto.TaskStateReverted
         s.task.Store(task)
+        onTaskFinished(task.State, task.Error)
         return nil
     }
     // Wait all subtasks in this step finishes.
@@ -469,6 +474,7 @@ func (s *BaseScheduler) switch2NextStep() error {
         task.Step = nextStep
         task.State = proto.TaskStateSucceed
         s.task.Store(task)
+        onTaskFinished(task.State, task.Error)
         return nil
     }
 
@@ -579,6 +585,7 @@ func (s *BaseScheduler) handlePlanErr(err error) error {
     task := s.getTaskClone()
     s.logger.Warn("generate plan failed", zap.Error(err), zap.Stringer("state", task.State))
     if s.IsRetryableErr(err) {
+        dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventRetry).Inc()
         return err
     }
     return s.revertTask(err)
@@ -697,6 +704,9 @@ func (s *BaseScheduler) GetLogger() *zap.Logger {
 
 // IsCancelledErr checks if the error is a cancelled error.
 func IsCancelledErr(err error) bool {
+    if err == nil {
+        return false
+    }
     return strings.Contains(err.Error(), taskCancelMsg)
 }
 
@@ -715,3 +725,21 @@ func getEligibleNodes(ctx context.Context, sch Scheduler, managedNodes []string)
     return serverNodes, nil
 }
+
+func onTaskFinished(state proto.TaskState, taskErr error) {
+    // when a task finishes, we classify it into succeed/failed/cancelled
+    var metricState string
+
+    if state == proto.TaskStateSucceed || state == proto.TaskStateFailed {
+        metricState = state.String()
+    } else if state == proto.TaskStateReverted {
+        metricState = proto.TaskStateFailed.String()
+        if IsCancelledErr(taskErr) {
+            metricState = "cancelled"
+        }
+    }
+    if len(metricState) > 0 {
+        dxfmetric.FinishedTaskCounter.WithLabelValues("all").Inc()
+        dxfmetric.FinishedTaskCounter.WithLabelValues(metricState).Inc()
+    }
+}
diff --git a/pkg/disttask/framework/scheduler/scheduler_manager.go b/pkg/disttask/framework/scheduler/scheduler_manager.go
index d5a44e2b52..473596afa1 100644
--- a/pkg/disttask/framework/scheduler/scheduler_manager.go
+++ b/pkg/disttask/framework/scheduler/scheduler_manager.go
@@ -21,9 +21,13 @@ import (
 
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
+    "github.com/pingcap/tidb/pkg/config/kerneltype"
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     "github.com/pingcap/tidb/pkg/disttask/framework/handle"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
+    "github.com/pingcap/tidb/pkg/disttask/framework/storage"
     "github.com/pingcap/tidb/pkg/kv"
+    "github.com/pingcap/tidb/pkg/metrics"
     tidbutil "github.com/pingcap/tidb/pkg/util"
     "github.com/pingcap/tidb/pkg/util/intest"
     "github.com/pingcap/tidb/pkg/util/logutil"
@@ -38,8 +42,10 @@ var (
     // defaultHistorySubtaskTableGcInterval is the interval of gc history subtask table.
     defaultHistorySubtaskTableGcInterval = 24 * time.Hour
     // DefaultCleanUpInterval is the interval of cleanup routine.
-    DefaultCleanUpInterval        = 10 * time.Minute
-    defaultCollectMetricsInterval = 5 * time.Second
+    DefaultCleanUpInterval = 10 * time.Minute
+    // metric scraping mostly happens at 15s intervals; it's pointless to update
+    // the internally collected data more frequently, so we align with that.
+    defaultCollectMetricsInterval = 15 * time.Second
 )
 
 func (sm *Manager) getSchedulerCount() int {
@@ -118,6 +124,8 @@ type Manager struct {
         schedulers []Scheduler
     }
     nodeRes *proto.NodeResource
+    // initialized on demand
+    metricCollector *dxfmetric.Collector
 }
 
 // NewManager creates a scheduler struct.
@@ -187,6 +195,10 @@ func (sm *Manager) Stop() {
     sm.clearSchedulers()
     sm.initialized = false
     close(sm.finishCh)
+
+    // clear existing counters on owner change
+    dxfmetric.WorkerCount.Reset()
+    dxfmetric.FinishedTaskCounter.Reset()
 }
 
 // Initialized check the manager initialized.
@@ -295,6 +307,8 @@ func (sm *Manager) failTask(id int64, currState proto.TaskState, err error) {
     if err2 := sm.taskMgr.FailTask(sm.ctx, id, currState, err); err2 != nil {
         sm.logger.Warn("failed to update task state to failed",
             zap.Int64("task-id", id), zap.Error(err2))
+    } else {
+        onTaskFinished(proto.TaskStateFailed, err)
     }
 }
 
@@ -447,6 +461,11 @@ func (sm *Manager) collectLoop() {
     sm.logger.Info("collect loop start")
     ticker := time.NewTicker(defaultCollectMetricsInterval)
     defer ticker.Stop()
+    sm.metricCollector = dxfmetric.NewCollector()
+    metrics.Register(sm.metricCollector)
+    defer func() {
+        metrics.Unregister(sm.metricCollector)
+    }()
     for {
         select {
         case <-sm.ctx.Done():
@@ -468,8 +487,36 @@ func (sm *Manager) collect() {
         sm.logger.Warn("get all subtasks failed", zap.Error(err))
         return
     }
-    disttaskCollector.taskInfo.Store(&tasks)
-    disttaskCollector.subtaskInfo.Store(&subtasks)
+    sm.metricCollector.UpdateInfo(tasks, subtasks)
+
+    if kerneltype.IsNextGen() {
+        sm.collectWorkerMetrics(tasks)
+    }
+}
+
+func (sm *Manager) collectWorkerMetrics(tasks []*proto.TaskBase) {
+    manager, err := storage.GetTaskManager()
+    if err != nil {
+        sm.logger.Warn("failed to get task manager", zap.Error(err))
+        return
+    }
+    nodeCount, nodeCPU, err := handle.GetNodesInfo(sm.ctx, manager)
+    if err != nil {
+        sm.logger.Warn("failed to get nodes info", zap.Error(err))
+        return
+    }
+    scheduledTasks := make([]*proto.TaskBase, 0, len(tasks))
+    for _, t := range tasks {
+        if t.State == proto.TaskStateRunning || t.State == proto.TaskStateModifying {
+            scheduledTasks = append(scheduledTasks, t)
+        }
+    }
+    slices.SortFunc(scheduledTasks, func(i, j *proto.TaskBase) int {
+        return i.Compare(j)
+    })
+    requiredNodes := handle.CalculateRequiredNodes(scheduledTasks, nodeCPU)
+    dxfmetric.WorkerCount.WithLabelValues("required").Set(float64(requiredNodes))
+    dxfmetric.WorkerCount.WithLabelValues("current").Set(float64(nodeCount))
 }
 
 // MockScheduler mock one scheduler for one task, only used for tests.
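The collectLoop change above ties the custom collector's lifetime to DXF ownership: the Collector is registered when the collect loop starts and unregistered when this node stops being owner. A minimal, self-contained sketch of that snapshot-collector pattern — `snapshotCollector` and `demo_task_status` are illustrative names, not part of the PR:

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// snapshotCollector emits const metrics built from whatever snapshot was
// stored last; label sets that disappear from the snapshot simply stop being
// emitted on the next scrape, so nothing has to be deleted explicitly.
type snapshotCollector struct {
	snapshot atomic.Pointer[map[string]float64] // status -> task count
	desc     *prometheus.Desc
}

func newSnapshotCollector() *snapshotCollector {
	return &snapshotCollector{
		desc: prometheus.NewDesc("demo_task_status", "Number of tasks.", []string{"status"}, nil),
	}
}

// UpdateInfo mirrors dxfmetric's Collector.UpdateInfo: a background loop
// swaps in a new snapshot atomically; scrapes never block the collect loop.
func (c *snapshotCollector) UpdateInfo(s map[string]float64) { c.snapshot.Store(&s) }

func (c *snapshotCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.desc }

func (c *snapshotCollector) Collect(ch chan<- prometheus.Metric) {
	p := c.snapshot.Load()
	if p == nil {
		return
	}
	for status, n := range *p {
		ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, n, status)
	}
}

func main() {
	reg := prometheus.NewRegistry()
	c := newSnapshotCollector()
	reg.MustRegister(c) // registered while this node owns the DXF scheduler...
	c.UpdateInfo(map[string]float64{"running": 2, "succeed": 5})
	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), len(mf.GetMetric()))
	}
	reg.Unregister(c) // ...and dropped on owner change, like collectLoop's defer
}
```

This is why the framework avoids the built-in vector types for task/subtask status: when subtasks migrate to the history table, a plain GaugeVec would need DeletePartialMatch bookkeeping, while the snapshot collector forgets stale series automatically.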
diff --git a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go
index 129f6d1d45..6214b617c6 100644
--- a/pkg/disttask/framework/scheduler/scheduler_nokit_test.go
+++ b/pkg/disttask/framework/scheduler/scheduler_nokit_test.go
@@ -22,11 +22,15 @@ import (
 
     "github.com/pingcap/errors"
     "github.com/pingcap/tidb/pkg/config"
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     "github.com/pingcap/tidb/pkg/disttask/framework/mock"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
     schmock "github.com/pingcap/tidb/pkg/disttask/framework/scheduler/mock"
     "github.com/pingcap/tidb/pkg/disttask/framework/storage"
     "github.com/pingcap/tidb/pkg/kv"
+    tidbutil "github.com/pingcap/tidb/pkg/util"
+    "github.com/prometheus/client_golang/prometheus"
+    dto "github.com/prometheus/client_model/go"
     "github.com/stretchr/testify/require"
     "github.com/tikv/client-go/v2/util"
     "go.uber.org/mock/gomock"
@@ -572,3 +576,44 @@ func TestSchedulerMaintainTaskFields(t *testing.T) {
         require.True(t, ctrl.Satisfied())
     })
 }
+
+func TestOnTaskFinished(t *testing.T) {
+    bak := dxfmetric.FinishedTaskCounter
+    t.Cleanup(func() {
+        dxfmetric.FinishedTaskCounter = bak
+    })
+    dxfmetric.FinishedTaskCounter = prometheus.NewCounterVec(prometheus.CounterOpts{Name: "test"}, []string{"state"})
+    collectMetricsFn := func() map[string]int {
+        var ch = make(chan prometheus.Metric)
+        items := make([]*dto.Metric, 0)
+        var wg tidbutil.WaitGroupWrapper
+        wg.Run(func() {
+            for m := range ch {
+                dm := &dto.Metric{}
+                require.NoError(t, m.Write(dm))
+                items = append(items, dm)
+            }
+        })
+        dxfmetric.FinishedTaskCounter.Collect(ch)
+        close(ch)
+        wg.Wait()
+        values := make(map[string]int)
+        for _, it := range items {
+            values[*it.GetLabel()[0].Value] = int(it.GetCounter().GetValue())
+        }
+        return values
+    }
+    onTaskFinished(proto.TaskStateSucceed, nil)
+    require.EqualValues(t, map[string]int{"all": 1, "succeed": 1}, collectMetricsFn())
+    onTaskFinished(proto.TaskStateReverted, nil)
+    require.EqualValues(t, map[string]int{"all": 2, "succeed": 1, "failed": 1}, collectMetricsFn())
+    onTaskFinished(proto.TaskStateReverted, errors.New("some err"))
+    require.EqualValues(t, map[string]int{"all": 3, "succeed": 1, "failed": 2}, collectMetricsFn())
+    onTaskFinished(proto.TaskStateReverted, errors.New(taskCancelMsg))
+    require.EqualValues(t, map[string]int{"all": 4, "succeed": 1, "failed": 2, "cancelled": 1}, collectMetricsFn())
+    onTaskFinished(proto.TaskStateFailed, errors.New("some err"))
+    require.EqualValues(t, map[string]int{"all": 5, "succeed": 1, "failed": 3, "cancelled": 1}, collectMetricsFn())
+    // noop for non-finished state.
+    onTaskFinished(proto.TaskStateRunning, nil)
+    require.EqualValues(t, map[string]int{"all": 5, "succeed": 1, "failed": 3, "cancelled": 1}, collectMetricsFn())
+}
diff --git a/pkg/disttask/framework/scheduler/scheduler_test.go b/pkg/disttask/framework/scheduler/scheduler_test.go
index f810ee9a71..06fc3ce2b5 100644
--- a/pkg/disttask/framework/scheduler/scheduler_test.go
+++ b/pkg/disttask/framework/scheduler/scheduler_test.go
@@ -402,6 +402,7 @@ func TestVerifyTaskStateTransform(t *testing.T) {
 }
 
 func TestIsCancelledErr(t *testing.T) {
+    require.False(t, scheduler.IsCancelledErr(nil))
     require.False(t, scheduler.IsCancelledErr(errors.New("some err")))
     require.False(t, scheduler.IsCancelledErr(context.Canceled))
     require.True(t, scheduler.IsCancelledErr(errors.New("cancelled by user")))
diff --git a/pkg/disttask/framework/taskexecutor/BUILD.bazel b/pkg/disttask/framework/taskexecutor/BUILD.bazel
index 420522ecf5..e4494fb569 100644
--- a/pkg/disttask/framework/taskexecutor/BUILD.bazel
+++ b/pkg/disttask/framework/taskexecutor/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
     deps = [
         "//br/pkg/storage",
         "//pkg/config",
+        "//pkg/disttask/framework/dxfmetric",
         "//pkg/disttask/framework/handle",
         "//pkg/disttask/framework/proto",
         "//pkg/disttask/framework/scheduler",
@@ -33,6 +34,7 @@ go_library(
         "//pkg/util/memory",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
+        "@com_github_prometheus_client_golang//prometheus",
         "@org_uber_go_zap//:zap",
     ],
 )
diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index e2ae02c349..937d8a579d 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -23,6 +23,7 @@ import (
     "github.com/pingcap/failpoint"
     litstorage "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/pkg/config"
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     "github.com/pingcap/tidb/pkg/disttask/framework/handle"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
     "github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
@@ -149,7 +150,7 @@ func (m *Manager) handleTasksLoop() {
         m.handleTasks()
 
         // service scope might change, so we call WithLabelValues every time.
-        metrics.DistTaskUsedSlotsGauge.WithLabelValues(vardef.ServiceScope.Load()).
+        dxfmetric.UsedSlotsGauge.WithLabelValues(vardef.ServiceScope.Load()).
            Set(float64(m.slotManager.usedSlots()))
         metrics.GlobalSortUploadWorkerCount.Set(float64(litstorage.GetActiveUploadWorkerCount()))
     }
diff --git a/pkg/disttask/framework/taskexecutor/task_executor.go b/pkg/disttask/framework/taskexecutor/task_executor.go
index 4bbfa1945e..4ed89fbe97 100644
--- a/pkg/disttask/framework/taskexecutor/task_executor.go
+++ b/pkg/disttask/framework/taskexecutor/task_executor.go
@@ -18,6 +18,7 @@ import (
     "bytes"
     "context"
     goerrors "errors"
+    "fmt"
     "runtime"
     "sync"
     "sync/atomic"
@@ -25,6 +26,7 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     "github.com/pingcap/tidb/pkg/disttask/framework/handle"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
     "github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
@@ -39,6 +41,7 @@ import (
     "github.com/pingcap/tidb/pkg/util/intest"
     "github.com/pingcap/tidb/pkg/util/logutil"
     "github.com/pingcap/tidb/pkg/util/memory"
+    "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
 )
 
@@ -138,6 +141,7 @@ func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *Ba
 // - If current running subtask are scheduled away from this node, i.e. this node
 //   is taken as down, cancel running.
 func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCancel context.CancelFunc) {
+    start := time.Now()
     ticker := time.NewTicker(checkBalanceSubtaskInterval)
     defer ticker.Stop()
     for {
@@ -148,6 +152,12 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCa
         }
 
         task := e.task.Load()
+        if time.Since(start) > time.Hour {
+            start = time.Now()
+            e.logger.Info("subtask running for too long", zap.Int64("subtaskID", e.currSubtaskID.Load()))
+            dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventSubtaskSlow).Inc()
+        }
+
         subtasks, err := e.taskTable.GetSubtasksByExecIDAndStepAndStates(ctx, e.execID, task.ID, task.Step,
             proto.SubtaskStateRunning)
         if err != nil {
@@ -158,6 +168,7 @@ func (e *BaseTaskExecutor) checkBalanceSubtask(ctx context.Context, subtaskCtxCa
             e.logger.Info("subtask is scheduled away, cancel running",
                 zap.Int64("subtaskID", e.currSubtaskID.Load()))
             // cancels runStep, but leave the subtask state unchanged.
+            dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventSubtaskScheduledAway).Inc()
             if subtaskCtxCancel != nil {
                 subtaskCtxCancel()
             }
@@ -369,6 +380,7 @@ func (e *BaseTaskExecutor) createStepExecutor() error {
 
     if err := stepExecutor.Init(e.ctx); err != nil {
         if e.IsRetryableError(err) {
+            dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventRetry).Inc()
             e.logger.Info("meet retryable err when init step executor", zap.Error(err))
         } else {
             e.logger.Info("failed to init step executor", zap.Error(err))
@@ -423,6 +435,7 @@ func (e *BaseTaskExecutor) runSubtask(subtask *proto.Subtask) (resErr error) {
             }
             return ErrNonIdempotentSubtask
         }
+        dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(subtask.TaskID), dxfmetric.EventSubtaskRerun).Inc()
         e.logger.Info("subtask in running state and is idempotent",
             zap.Int64("subtask-id", subtask.ID))
     } else {
@@ -626,6 +639,8 @@ func (e *BaseTaskExecutor) Cancel() {
 
 // Close closes the TaskExecutor when all the subtasks are complete.
 func (e *BaseTaskExecutor) Close() {
     e.Cancel()
+
+    dxfmetric.ExecuteEventCounter.DeletePartialMatch(prometheus.Labels{dxfmetric.LblTaskID: fmt.Sprint(e.GetTaskBase().ID)})
 }
 
 func (e *BaseTaskExecutor) cancelRunStepWith(cause error) {
@@ -709,6 +724,7 @@ func (e *BaseTaskExecutor) markSubTaskCanceledOrFailed(ctx context.Context, subt
         e.logger.Info("meet context canceled for gracefully shutdown")
     } else if e.IsRetryableError(stErr) {
+        dxfmetric.ExecuteEventCounter.WithLabelValues(fmt.Sprint(subtask.TaskID), dxfmetric.EventRetry).Inc()
         e.logger.Warn("meet retryable error", zap.Error(stErr))
     } else {
         e.logger.Warn("subtask failed", zap.Error(stErr))
diff --git a/pkg/disttask/importinto/BUILD.bazel b/pkg/disttask/importinto/BUILD.bazel
index 5c59cec1a4..a106206e34 100644
--- a/pkg/disttask/importinto/BUILD.bazel
+++ b/pkg/disttask/importinto/BUILD.bazel
@@ -23,6 +23,7 @@ go_library(
         "//pkg/config",
         "//pkg/config/kerneltype",
         "//pkg/ddl",
+        "//pkg/disttask/framework/dxfmetric",
         "//pkg/disttask/framework/handle",
         "//pkg/disttask/framework/metering",
         "//pkg/disttask/framework/planner",
diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go
index bbe9c7e620..8b1b5c15f2 100644
--- a/pkg/disttask/importinto/scheduler.go
+++ b/pkg/disttask/importinto/scheduler.go
@@ -17,6 +17,7 @@ package importinto
 import (
     "context"
     "encoding/json"
+    "fmt"
     "strconv"
     "sync"
     "time"
@@ -26,6 +27,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/utils"
     tidb "github.com/pingcap/tidb/pkg/config"
     "github.com/pingcap/tidb/pkg/config/kerneltype"
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     "github.com/pingcap/tidb/pkg/disttask/framework/handle"
     "github.com/pingcap/tidb/pkg/disttask/framework/planner"
     "github.com/pingcap/tidb/pkg/disttask/framework/proto"
@@ -51,6 +53,11 @@ import (
 )
 
 const (
+    // warningIndexCount is the threshold to log a warning for too many indexes on
+    // the target table; import is known to be slow in this case.
+    // the value is chosen as most tables have fewer than 32 indexes; we can adjust
+    // it later if needed.
+    warningIndexCount      = 32
     registerTaskTTL        = 10 * time.Minute
     refreshTaskTTLInterval = 3 * time.Minute
     registerTimeout        = 5 * time.Second
@@ -314,6 +321,9 @@ func (sch *importScheduler) OnNextSubtasksBatch(
         if err = sch.startJob(ctx, logger, taskMeta, jobStep); err != nil {
             return nil, err
         }
+        if importer.GetNumOfIndexGenKV(taskMeta.Plan.TableInfo) > warningIndexCount {
+            dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventTooManyIdx).Inc()
+        }
     case proto.ImportStepMergeSort:
         sortAndEncodeMeta, err := taskHandle.GetPreviousSubtaskMetas(task.ID, proto.ImportStepEncodeAndSort)
         if err != nil {
@@ -391,6 +401,9 @@ func (sch *importScheduler) OnNextSubtasksBatch(
     }
 
     logger.Info("generate subtasks", zap.Int("subtask-count", len(metaBytes)))
+    if nextStep == proto.ImportStepMergeSort && len(metaBytes) > 0 {
+        dxfmetric.ScheduleEventCounter.WithLabelValues(fmt.Sprint(task.ID), dxfmetric.EventMergeSort).Inc()
+    }
     return metaBytes, nil
 }
 
diff --git a/pkg/domain/ru_stats_test.go b/pkg/domain/ru_stats_test.go
index 7c34c2917e..06d5ed2444 100644
--- a/pkg/domain/ru_stats_test.go
+++ b/pkg/domain/ru_stats_test.go
@@ -16,6 +16,7 @@ package domain_test
 
 import (
     "context"
+    "fmt"
     "testing"
     "time"
 
@@ -30,15 +31,15 @@ import (
 )
 
 func TestWriteRUStatistics(t *testing.T) {
-    tz, _ := time.LoadLocation("Asia/Shanghai")
-    testWriteRUStatisticsTz(t, tz)
-
+    tzShanghai, _ := time.LoadLocation("Asia/Shanghai")
     // test with DST timezone.
-    tz, _ = time.LoadLocation("Australia/Lord_Howe")
-    testWriteRUStatisticsTz(t, tz)
+    tzLord, _ := time.LoadLocation("Australia/Lord_Howe")
 
-    testWriteRUStatisticsTz(t, time.Local)
-    testWriteRUStatisticsTz(t, time.UTC)
+    for i, tz := range []*time.Location{tzShanghai, tzLord, time.Local, time.UTC} {
+        t.Run(fmt.Sprint(i), func(t *testing.T) {
+            testWriteRUStatisticsTz(t, tz)
+        })
+    }
 }
 
 func testWriteRUStatisticsTz(t *testing.T, tz *time.Location) {
diff --git a/pkg/executor/point_get_test.go b/pkg/executor/point_get_test.go
index 14eb73c7aa..a87670f50b 100644
--- a/pkg/executor/point_get_test.go
+++ b/pkg/executor/point_get_test.go
@@ -229,7 +229,7 @@ func TestPartitionMemCacheReadLock(t *testing.T) {
 }
 
 func TestPointGetLockExistKey(t *testing.T) {
-    testLock := func(rc bool, key string, tableName string) {
+    testLock := func(t *testing.T, rc bool, key string, tableName string) {
         store := testkit.CreateMockStore(t)
         tk1, tk2 := testkit.NewTestKit(t, store), testkit.NewTestKit(t, store)
 
@@ -336,9 +336,11 @@ func TestPointGetLockExistKey(t *testing.T) {
         {rc: true, key: "unique key"},
     } {
         tableName := fmt.Sprintf("t_%d", i)
-        func(rc bool, key string, tableName string) {
-            testLock(rc, key, tableName)
-        }(one.rc, one.key, tableName)
+        t.Run(tableName, func(t *testing.T) {
+            func(rc bool, key string, tableName string) {
+                testLock(t, rc, key, tableName)
+            }(one.rc, one.key, tableName)
+        })
     }
 }
 
diff --git a/pkg/executor/sortexec/sort_test.go b/pkg/executor/sortexec/sort_test.go
index c5cd467487..e0c75628b1 100644
--- a/pkg/executor/sortexec/sort_test.go
+++ b/pkg/executor/sortexec/sort_test.go
@@ -32,8 +32,11 @@ import (
 // TODO remove spill as it should be tested in sort_spill_test.go
 // TODO add some new fail points, as some fail point may be aborted after the refine
 func TestSortInDisk(t *testing.T) {
-    testSortInDisk(t, false)
-    testSortInDisk(t, true)
+    for i, v := range []bool{false, true} {
+        t.Run(fmt.Sprint(i), func(t *testing.T) {
+            testSortInDisk(t, v)
+        })
+    }
 }
 
 // TODO remove failpoint
diff --git a/pkg/metrics/BUILD.bazel b/pkg/metrics/BUILD.bazel
index 6eed882aba..3e381f7a92 100644
--- a/pkg/metrics/BUILD.bazel
+++ b/pkg/metrics/BUILD.bazel
@@ -7,7 +7,6 @@ go_library(
         "br.go",
         "ddl.go",
         "distsql.go",
-        "disttask.go",
         "domain.go",
         "executor.go",
         "gc_worker.go",
@@ -32,6 +31,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/pkg/metrics",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/disttask/framework/dxfmetric",
         "//pkg/lightning/metric",
         "//pkg/metrics/common",
         "//pkg/parser/terror",
diff --git a/pkg/metrics/common/BUILD.bazel b/pkg/metrics/common/BUILD.bazel
index 937d770a6b..54e89ef041 100644
--- a/pkg/metrics/common/BUILD.bazel
+++ b/pkg/metrics/common/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "common",
@@ -7,3 +7,15 @@ go_library(
     visibility = ["//visibility:public"],
     deps = ["@com_github_prometheus_client_golang//prometheus"],
 )
+
+go_test(
+    name = "common_test",
+    timeout = "short",
+    srcs = ["wrapper_test.go"],
+    embed = [":common"],
+    flaky = True,
+    deps = [
+        "@com_github_prometheus_client_golang//prometheus",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/pkg/metrics/common/wrapper.go b/pkg/metrics/common/wrapper.go
index 093b9db216..f668ceb7c7 100644
--- a/pkg/metrics/common/wrapper.go
+++ b/pkg/metrics/common/wrapper.go
@@ -16,6 +16,7 @@ package metricscommon
 
 import (
     "fmt"
+    "maps"
     "strings"
 
"github.com/prometheus/client_golang/prometheus" @@ -28,6 +29,19 @@ func GetConstLabels() prometheus.Labels { return constLabels } +// GetMergedConstLabels merges input constant labels with package-level constant labels. +func GetMergedConstLabels(in prometheus.Labels) prometheus.Labels { + res := constLabels + if len(in) > 0 { + res = make(prometheus.Labels, len(constLabels)+len(in)) + // merge in and constLabels, but constLabels defined in this package has + // higher priority. + maps.Copy(res, in) + maps.Copy(res, constLabels) + } + return res +} + // SetConstLabels sets constant labels for metrics. func SetConstLabels(kv ...string) { if len(kv)%2 == 1 { @@ -80,3 +94,9 @@ func NewSummaryVec(opts prometheus.SummaryOpts, labelNames []string) *prometheus opts.ConstLabels = constLabels return prometheus.NewSummaryVec(opts, labelNames) } + +// NewDesc wraps a prometheus.NewDesc. +func NewDesc(fqName, help string, variableLabels []string, inConstLbls prometheus.Labels) *prometheus.Desc { + cstLabels := GetMergedConstLabels(inConstLbls) + return prometheus.NewDesc(fqName, help, variableLabels, cstLabels) +} diff --git a/pkg/metrics/common/wrapper_test.go b/pkg/metrics/common/wrapper_test.go new file mode 100644 index 0000000000..ac4791ac7e --- /dev/null +++ b/pkg/metrics/common/wrapper_test.go @@ -0,0 +1,39 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metricscommon + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestGetMergedConstLabels(t *testing.T) { + bak := constLabels + t.Cleanup(func() { + constLabels = bak + }) + SetConstLabels() + require.EqualValues(t, prometheus.Labels{}, GetMergedConstLabels(nil)) + require.EqualValues(t, prometheus.Labels{"c": "3"}, GetMergedConstLabels(prometheus.Labels{"c": "3"})) + + SetConstLabels("a", "1", "b", "2") + require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2"}, GetMergedConstLabels(nil)) + require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2"}, GetMergedConstLabels(prometheus.Labels{})) + require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2", "c": "3"}, GetMergedConstLabels(prometheus.Labels{"c": "3"})) + // constLabels has higher priority + require.EqualValues(t, prometheus.Labels{"a": "1", "b": "2"}, GetMergedConstLabels(prometheus.Labels{"a": "100"})) +} diff --git a/pkg/metrics/disttask.go b/pkg/metrics/disttask.go deleted file mode 100644 index cbc6a1dfa9..0000000000 --- a/pkg/metrics/disttask.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package metrics
-
-import (
-    metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
-    "github.com/prometheus/client_golang/prometheus"
-)
-
-var (
-    // DistTaskUsedSlotsGauge is the gauge of used slots on executor node.
-    DistTaskUsedSlotsGauge *prometheus.GaugeVec
-)
-
-// InitDistTaskMetrics initializes disttask metrics.
-func InitDistTaskMetrics() {
-    DistTaskUsedSlotsGauge = metricscommon.NewGaugeVec(
-        prometheus.GaugeOpts{
-            Namespace: "tidb",
-            Subsystem: "disttask",
-            Name:      "used_slots",
-            Help:      "Gauge of used slots on a executor node.",
-        }, []string{"service_scope"})
-}
diff --git a/pkg/metrics/import.go b/pkg/metrics/import.go
index 5d6194a852..a515ca64b2 100644
--- a/pkg/metrics/import.go
+++ b/pkg/metrics/import.go
@@ -16,6 +16,7 @@ package metrics
 
 import (
     "github.com/pingcap/tidb/pkg/lightning/metric"
+    metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
     "github.com/pingcap/tidb/pkg/util/promutil"
     "github.com/prometheus/client_golang/prometheus"
 )
 
@@ -24,7 +25,8 @@ const importMetricSubsystem = "import"
 
 // GetRegisteredImportMetrics returns the registered import metrics.
 func GetRegisteredImportMetrics(factory promutil.Factory, constLabels prometheus.Labels) *metric.Common {
-    metrics := metric.NewCommon(factory, TiDB, importMetricSubsystem, constLabels)
+    mergedCstLabels := metricscommon.GetMergedConstLabels(constLabels)
+    metrics := metric.NewCommon(factory, TiDB, importMetricSubsystem, mergedCstLabels)
     metrics.RegisterTo(prometheus.DefaultRegisterer)
     return metrics
 }
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index a319de84db..2ce1a7ba20 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -17,6 +17,7 @@ package metrics
 import (
     "sync"
 
+    "github.com/pingcap/tidb/pkg/disttask/framework/dxfmetric"
     metricscommon "github.com/pingcap/tidb/pkg/metrics/common"
     timermetrics "github.com/pingcap/tidb/pkg/timer/metrics"
     "github.com/pingcap/tidb/pkg/util/logutil"
@@ -95,7 +96,7 @@ func InitMetrics() {
     InitTelemetryMetrics()
     InitTopSQLMetrics()
     InitTTLMetrics()
-    InitDistTaskMetrics()
+    dxfmetric.InitDistTaskMetrics()
     InitResourceGroupMetrics()
     InitGlobalSortMetrics()
     InitInfoSchemaV2Metrics()
@@ -282,7 +283,8 @@ func RegisterMetrics() {
     prometheus.MustRegister(PlanReplayerTaskCounter)
     prometheus.MustRegister(PlanReplayerRegisterTaskGauge)
-    prometheus.MustRegister(DistTaskUsedSlotsGauge)
+    dxfmetric.Register(prometheus.DefaultRegisterer)
+
     prometheus.MustRegister(RunawayCheckerCounter)
     prometheus.MustRegister(GlobalSortWriteToCloudStorageDuration)
     prometheus.MustRegister(GlobalSortWriteToCloudStorageRate)
@@ -335,6 +337,18 @@ func RegisterMetrics() {
     tikvmetrics.TiKVPanicCounter = PanicCounter // reset tidb metrics for tikv metrics
 }
 
+// Register registers custom collectors.
+func Register(cs ...prometheus.Collector) {
+    prometheus.MustRegister(cs...)
+}
+
+// Unregister unregisters custom collectors.
+func Unregister(cs ...prometheus.Collector) {
+    for _, c := range cs {
+        prometheus.Unregister(c)
+    }
+}
+
 var mode struct {
     sync.Mutex
     isSimplified bool
diff --git a/pkg/planner/core/plan_cache_test.go b/pkg/planner/core/plan_cache_test.go
index 71ad3c3bf5..2653dfaa50 100644
--- a/pkg/planner/core/plan_cache_test.go
+++ b/pkg/planner/core/plan_cache_test.go
@@ -837,13 +837,15 @@ func planCacheIndexMergePrepareData(tk *testkit.TestKit) {
 }
 
 func TestPlanCacheRandomCases(t *testing.T) {
-    store := testkit.CreateMockStore(t)
-    tk := testkit.NewTestKit(t, store)
-    tk.MustExec("use test")
-
-    testRandomPlanCacheCases(t, planCacheIndexMergePrepareData, planCacheIndexMergeQueries)
-    testRandomPlanCacheCases(t, planCacheIntConvertPrepareData, planCacheIntConvertQueries)
-    testRandomPlanCacheCases(t, planCachePointGetPrepareData, planCachePointGetQueries)
+    t.Run("1", func(t *testing.T) {
+        testRandomPlanCacheCases(t, planCacheIndexMergePrepareData, planCacheIndexMergeQueries)
+    })
+    t.Run("2", func(t *testing.T) {
+        testRandomPlanCacheCases(t, planCacheIntConvertPrepareData, planCacheIntConvertQueries)
+    })
+    t.Run("3", func(t *testing.T) {
+        testRandomPlanCacheCases(t, planCachePointGetPrepareData, planCachePointGetQueries)
+    })
 }
 
 func testRandomPlanCacheCases(t *testing.T,
diff --git a/pkg/server/tests/cursor/cursor_test.go b/pkg/server/tests/cursor/cursor_test.go
index c6df0a503c..94f52d44a8 100644
--- a/pkg/server/tests/cursor/cursor_test.go
+++ b/pkg/server/tests/cursor/cursor_test.go
@@ -167,8 +167,11 @@ func TestCursorFetchExecuteCheck(t *testing.T) {
 }
 
 func TestConcurrentExecuteAndFetch(t *testing.T) {
-    runTestConcurrentExecuteAndFetch(t, false)
-    runTestConcurrentExecuteAndFetch(t, true)
+    for i, v := range []bool{false, true} {
+        t.Run(fmt.Sprint(i), func(t *testing.T) {
+            runTestConcurrentExecuteAndFetch(t, v)
+        })
+    }
 }
 
 func runTestConcurrentExecuteAndFetch(t *testing.T, lazy bool) {
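A recurring cleanup across the test files in this PR (error_injection_test.go, ru_stats_test.go, point_get_test.go, sort_test.go, plan_cache_test.go, cursor_test.go) converts back-to-back helper invocations into named t.Run subtests. A minimal sketch of the pattern, with hypothetical names standing in for helpers like runTestConcurrentExecuteAndFetch:

```go
package demo

import (
	"fmt"
	"testing"
)

// runOneCase stands in for a per-case helper in the PR's test files.
func runOneCase(t *testing.T, lazy bool) {
	t.Helper()
	_ = lazy // exercise the lazy or non-lazy path here
}

func TestBothModes(t *testing.T) {
	for i, lazy := range []bool{false, true} {
		t.Run(fmt.Sprint(i), func(t *testing.T) {
			// each case now fails and reports independently, and can be
			// selected alone, e.g. go test -run 'TestBothModes/1'
			runOneCase(t, lazy)
		})
	}
}
```

Compared to sequential helper calls, this keeps one failing case from aborting the rest and gives each case its own name in test output, which is what the converted tests gain here.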