Files
tidb/lightning/pkg/web/progress.go

271 lines
7.4 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package web
import (
"encoding/json"
"sync"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"go.uber.org/atomic"
)
// checkpointsMap is a concurrent map (table name → checkpoints).
//
// Implementation note: Currently the checkpointsMap is only written from a
// single goroutine inside (*RestoreController).listenCheckpointUpdates(), so
// all writes are going to be single threaded. Writing to checkpoint is not
// considered performance critical. The map can be read from any HTTP connection
// goroutine. Therefore, we simply implement the concurrent map using a single
// RWMutex. We may switch to more complicated data structure if contention is
// shown to be a problem.
//
// Do not implement this using a sync.Map, its mutex can't protect the content
// of a pointer.
type checkpointsMap struct {
mu sync.RWMutex
checkpoints map[string]*checkpoints.TableCheckpoint
}
func makeCheckpointsMap() (res checkpointsMap) {
res.checkpoints = make(map[string]*checkpoints.TableCheckpoint)
return
}
func (cpm *checkpointsMap) clear() {
cpm.mu.Lock()
cpm.checkpoints = make(map[string]*checkpoints.TableCheckpoint)
cpm.mu.Unlock()
}
func (cpm *checkpointsMap) insert(key string, cp *checkpoints.TableCheckpoint) {
cpm.mu.Lock()
cpm.checkpoints[key] = cp
cpm.mu.Unlock()
}
type totalWritten struct {
key string
totalWritten int64
}
func (cpm *checkpointsMap) update(diffs map[string]*checkpoints.TableCheckpointDiff) []totalWritten {
totalWrittens := make([]totalWritten, 0, len(diffs))
cpm.mu.Lock()
defer cpm.mu.Unlock()
for key, diff := range diffs {
cp := cpm.checkpoints[key]
cp.Apply(diff)
tw := int64(0)
for _, engine := range cp.Engines {
for _, chunk := range engine.Chunks {
if engine.Status >= checkpoints.CheckpointStatusAllWritten {
tw += chunk.TotalSize()
} else {
tw += chunk.Chunk.Offset - chunk.Key.Offset
}
}
}
totalWrittens = append(totalWrittens, totalWritten{key: key, totalWritten: tw})
}
return totalWrittens
}
func (cpm *checkpointsMap) marshal(key string) ([]byte, error) {
cpm.mu.RLock()
defer cpm.mu.RUnlock()
if cp, ok := cpm.checkpoints[key]; ok {
return json.Marshal(cp)
}
return nil, errors.NotFoundf("table %s", key)
}
type taskStatus uint8
const (
taskStatusRunning taskStatus = 1
taskStatusCompleted taskStatus = 2
)
type tableInfo struct {
TotalWritten int64 `json:"w"`
TotalSize int64 `json:"z"`
Status taskStatus `json:"s"`
Message string `json:"m,omitempty"`
Progresses []tableProgress `json:"progresses,omitempty"`
}
type tableProgress struct {
Step string `json:"step"`
Progress float64 `json:"progress"`
}
type taskProgress struct {
mu sync.RWMutex
Tables map[string]*tableInfo `json:"t"`
Status taskStatus `json:"s"`
Message string `json:"m,omitempty"`
// The contents have their own mutex for protection
checkpoints checkpointsMap
}
var (
currentProgress *taskProgress
// whether progress is enabled
progressEnabled = atomic.NewBool(false)
)
// EnableCurrentProgress init current progress struct on demand.
// NOTE: this call is not thread safe, so it should only be inited once at the very beginning of progress start.
func EnableCurrentProgress() {
currentProgress = &taskProgress{
checkpoints: makeCheckpointsMap(),
}
progressEnabled.Store(true)
}
// BroadcastStartTask sets the current task status to running.
func BroadcastStartTask() {
if !progressEnabled.Load() {
return
}
currentProgress.mu.Lock()
currentProgress.Status = taskStatusRunning
currentProgress.mu.Unlock()
currentProgress.checkpoints.clear()
}
// BroadcastEndTask sets the current task status to completed.
func BroadcastEndTask(err error) {
if !progressEnabled.Load() {
return
}
errString := errors.ErrorStack(err)
currentProgress.mu.Lock()
currentProgress.Status = taskStatusCompleted
currentProgress.Message = errString
currentProgress.mu.Unlock()
}
// BroadcastInitProgress sets the total size of each table.
func BroadcastInitProgress(databases []*mydump.MDDatabaseMeta) {
if !progressEnabled.Load() {
return
}
tables := make(map[string]*tableInfo, len(databases))
for _, db := range databases {
for _, tbl := range db.Tables {
name := common.UniqueTable(db.Name, tbl.Name)
tables[name] = &tableInfo{TotalSize: tbl.TotalSize}
}
}
currentProgress.mu.Lock()
currentProgress.Tables = tables
currentProgress.mu.Unlock()
}
// BroadcastTableCheckpoint updates the checkpoint of a table.
func BroadcastTableCheckpoint(tableName string, cp *checkpoints.TableCheckpoint) {
if !progressEnabled.Load() {
return
}
currentProgress.mu.Lock()
currentProgress.Tables[tableName].Status = taskStatusRunning
currentProgress.mu.Unlock()
// create a deep copy to avoid false sharing
currentProgress.checkpoints.insert(tableName, cp.DeepCopy())
}
// BroadcastTableProgress updates the progress of a table.
func BroadcastTableProgress(tableName string, step string, progress float64) {
if !progressEnabled.Load() {
return
}
currentProgress.mu.Lock()
progresses := currentProgress.Tables[tableName].Progresses
var present bool
for i, p := range progresses {
if p.Step == step {
progresses[i].Progress = progress
present = true
}
}
if !present {
progresses = append(progresses, tableProgress{Step: step, Progress: progress})
}
currentProgress.Tables[tableName].Progresses = progresses
currentProgress.mu.Unlock()
}
// BroadcastCheckpointDiff updates the total written size of each table.
func BroadcastCheckpointDiff(diffs map[string]*checkpoints.TableCheckpointDiff) {
if !progressEnabled.Load() {
return
}
totalWrittens := currentProgress.checkpoints.update(diffs)
currentProgress.mu.Lock()
for _, tw := range totalWrittens {
currentProgress.Tables[tw.key].TotalWritten = tw.totalWritten
}
currentProgress.mu.Unlock()
}
// BroadcastError sets the error message of a table.
func BroadcastError(tableName string, err error) {
if !progressEnabled.Load() {
return
}
errString := errors.ErrorStack(err)
currentProgress.mu.Lock()
if tbl := currentProgress.Tables[tableName]; tbl != nil {
tbl.Status = taskStatusCompleted
tbl.Message = errString
}
currentProgress.mu.Unlock()
}
// MarshalTaskProgress returns the current progress in JSON format.
func MarshalTaskProgress() ([]byte, error) {
if !progressEnabled.Load() {
return nil, errors.New("progress is not enabled")
}
currentProgress.mu.RLock()
defer currentProgress.mu.RUnlock()
return json.Marshal(&currentProgress)
}
// MarshalTableCheckpoints returns the checkpoint of a table in JSON format.
func MarshalTableCheckpoints(tableName string) ([]byte, error) {
if !progressEnabled.Load() {
return nil, errors.New("progress is not enabled")
}
return currentProgress.checkpoints.marshal(tableName)
}