// Copyright 2023 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package web import ( "encoding/json" "sync" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/lightning/checkpoints" "github.com/pingcap/tidb/pkg/lightning/common" "github.com/pingcap/tidb/pkg/lightning/mydump" "go.uber.org/atomic" ) // checkpointsMap is a concurrent map (table name → checkpoints). // // Implementation note: Currently the checkpointsMap is only written from a // single goroutine inside (*RestoreController).listenCheckpointUpdates(), so // all writes are going to be single threaded. Writing to checkpoint is not // considered performance critical. The map can be read from any HTTP connection // goroutine. Therefore, we simply implement the concurrent map using a single // RWMutex. We may switch to more complicated data structure if contention is // shown to be a problem. // // Do not implement this using a sync.Map, its mutex can't protect the content // of a pointer. type checkpointsMap struct { mu sync.RWMutex checkpoints map[string]*checkpoints.TableCheckpoint } func makeCheckpointsMap() (res checkpointsMap) { res.checkpoints = make(map[string]*checkpoints.TableCheckpoint) return } func (cpm *checkpointsMap) clear() { cpm.mu.Lock() cpm.checkpoints = make(map[string]*checkpoints.TableCheckpoint) cpm.mu.Unlock() } func (cpm *checkpointsMap) insert(key string, cp *checkpoints.TableCheckpoint) { cpm.mu.Lock() cpm.checkpoints[key] = cp cpm.mu.Unlock() } type totalWritten struct { key string totalWritten int64 } func (cpm *checkpointsMap) update(diffs map[string]*checkpoints.TableCheckpointDiff) []totalWritten { totalWrittens := make([]totalWritten, 0, len(diffs)) cpm.mu.Lock() defer cpm.mu.Unlock() for key, diff := range diffs { cp := cpm.checkpoints[key] cp.Apply(diff) tw := int64(0) for _, engine := range cp.Engines { for _, chunk := range engine.Chunks { if engine.Status >= checkpoints.CheckpointStatusAllWritten { tw += chunk.TotalSize() } else { tw += chunk.Chunk.Offset - chunk.Key.Offset } } } totalWrittens = append(totalWrittens, totalWritten{key: key, totalWritten: tw}) } return totalWrittens } func (cpm *checkpointsMap) marshal(key string) ([]byte, error) { cpm.mu.RLock() defer cpm.mu.RUnlock() if cp, ok := cpm.checkpoints[key]; ok { return json.Marshal(cp) } return nil, errors.NotFoundf("table %s", key) } type taskStatus uint8 const ( taskStatusRunning taskStatus = 1 taskStatusCompleted taskStatus = 2 ) type tableInfo struct { TotalWritten int64 `json:"w"` TotalSize int64 `json:"z"` Status taskStatus `json:"s"` Message string `json:"m,omitempty"` Progresses []tableProgress `json:"progresses,omitempty"` } type tableProgress struct { Step string `json:"step"` Progress float64 `json:"progress"` } type taskProgress struct { mu sync.RWMutex Tables map[string]*tableInfo `json:"t"` Status taskStatus `json:"s"` Message string `json:"m,omitempty"` // The contents have their own mutex for protection checkpoints checkpointsMap } var ( currentProgress *taskProgress // whether progress is enabled progressEnabled = atomic.NewBool(false) ) // EnableCurrentProgress init current progress struct on demand. // NOTE: this call is not thread safe, so it should only be inited once at the very beginning of progress start. func EnableCurrentProgress() { currentProgress = &taskProgress{ checkpoints: makeCheckpointsMap(), } progressEnabled.Store(true) } // BroadcastStartTask sets the current task status to running. func BroadcastStartTask() { if !progressEnabled.Load() { return } currentProgress.mu.Lock() currentProgress.Status = taskStatusRunning currentProgress.mu.Unlock() currentProgress.checkpoints.clear() } // BroadcastEndTask sets the current task status to completed. func BroadcastEndTask(err error) { if !progressEnabled.Load() { return } errString := errors.ErrorStack(err) currentProgress.mu.Lock() currentProgress.Status = taskStatusCompleted currentProgress.Message = errString currentProgress.mu.Unlock() } // BroadcastInitProgress sets the total size of each table. func BroadcastInitProgress(databases []*mydump.MDDatabaseMeta) { if !progressEnabled.Load() { return } tables := make(map[string]*tableInfo, len(databases)) for _, db := range databases { for _, tbl := range db.Tables { name := common.UniqueTable(db.Name, tbl.Name) tables[name] = &tableInfo{TotalSize: tbl.TotalSize} } } currentProgress.mu.Lock() currentProgress.Tables = tables currentProgress.mu.Unlock() } // BroadcastTableCheckpoint updates the checkpoint of a table. func BroadcastTableCheckpoint(tableName string, cp *checkpoints.TableCheckpoint) { if !progressEnabled.Load() { return } currentProgress.mu.Lock() currentProgress.Tables[tableName].Status = taskStatusRunning currentProgress.mu.Unlock() // create a deep copy to avoid false sharing currentProgress.checkpoints.insert(tableName, cp.DeepCopy()) } // BroadcastTableProgress updates the progress of a table. func BroadcastTableProgress(tableName string, step string, progress float64) { if !progressEnabled.Load() { return } currentProgress.mu.Lock() progresses := currentProgress.Tables[tableName].Progresses var present bool for i, p := range progresses { if p.Step == step { progresses[i].Progress = progress present = true } } if !present { progresses = append(progresses, tableProgress{Step: step, Progress: progress}) } currentProgress.Tables[tableName].Progresses = progresses currentProgress.mu.Unlock() } // BroadcastCheckpointDiff updates the total written size of each table. func BroadcastCheckpointDiff(diffs map[string]*checkpoints.TableCheckpointDiff) { if !progressEnabled.Load() { return } totalWrittens := currentProgress.checkpoints.update(diffs) currentProgress.mu.Lock() for _, tw := range totalWrittens { currentProgress.Tables[tw.key].TotalWritten = tw.totalWritten } currentProgress.mu.Unlock() } // BroadcastError sets the error message of a table. func BroadcastError(tableName string, err error) { if !progressEnabled.Load() { return } errString := errors.ErrorStack(err) currentProgress.mu.Lock() if tbl := currentProgress.Tables[tableName]; tbl != nil { tbl.Status = taskStatusCompleted tbl.Message = errString } currentProgress.mu.Unlock() } // MarshalTaskProgress returns the current progress in JSON format. func MarshalTaskProgress() ([]byte, error) { if !progressEnabled.Load() { return nil, errors.New("progress is not enabled") } currentProgress.mu.RLock() defer currentProgress.mu.RUnlock() return json.Marshal(¤tProgress) } // MarshalTableCheckpoints returns the checkpoint of a table in JSON format. func MarshalTableCheckpoints(tableName string) ([]byte, error) { if !progressEnabled.Load() { return nil, errors.New("progress is not enabled") } return currentProgress.checkpoints.marshal(tableName) }