271 lines
7.4 KiB
Go
271 lines
7.4 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package web
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sync"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
|
|
"github.com/pingcap/tidb/pkg/lightning/common"
|
|
"github.com/pingcap/tidb/pkg/lightning/mydump"
|
|
"go.uber.org/atomic"
|
|
)
|
|
|
|
// checkpointsMap is a concurrent map (table name → checkpoints).
|
|
//
|
|
// Implementation note: Currently the checkpointsMap is only written from a
|
|
// single goroutine inside (*RestoreController).listenCheckpointUpdates(), so
|
|
// all writes are going to be single threaded. Writing to checkpoint is not
|
|
// considered performance critical. The map can be read from any HTTP connection
|
|
// goroutine. Therefore, we simply implement the concurrent map using a single
|
|
// RWMutex. We may switch to more complicated data structure if contention is
|
|
// shown to be a problem.
|
|
//
|
|
// Do not implement this using a sync.Map, its mutex can't protect the content
|
|
// of a pointer.
|
|
type checkpointsMap struct {
|
|
mu sync.RWMutex
|
|
checkpoints map[string]*checkpoints.TableCheckpoint
|
|
}
|
|
|
|
func makeCheckpointsMap() (res checkpointsMap) {
|
|
res.checkpoints = make(map[string]*checkpoints.TableCheckpoint)
|
|
return
|
|
}
|
|
|
|
func (cpm *checkpointsMap) clear() {
|
|
cpm.mu.Lock()
|
|
cpm.checkpoints = make(map[string]*checkpoints.TableCheckpoint)
|
|
cpm.mu.Unlock()
|
|
}
|
|
|
|
func (cpm *checkpointsMap) insert(key string, cp *checkpoints.TableCheckpoint) {
|
|
cpm.mu.Lock()
|
|
cpm.checkpoints[key] = cp
|
|
cpm.mu.Unlock()
|
|
}
|
|
|
|
type totalWritten struct {
|
|
key string
|
|
totalWritten int64
|
|
}
|
|
|
|
func (cpm *checkpointsMap) update(diffs map[string]*checkpoints.TableCheckpointDiff) []totalWritten {
|
|
totalWrittens := make([]totalWritten, 0, len(diffs))
|
|
|
|
cpm.mu.Lock()
|
|
defer cpm.mu.Unlock()
|
|
|
|
for key, diff := range diffs {
|
|
cp := cpm.checkpoints[key]
|
|
cp.Apply(diff)
|
|
|
|
tw := int64(0)
|
|
for _, engine := range cp.Engines {
|
|
for _, chunk := range engine.Chunks {
|
|
if engine.Status >= checkpoints.CheckpointStatusAllWritten {
|
|
tw += chunk.TotalSize()
|
|
} else {
|
|
tw += chunk.Chunk.Offset - chunk.Key.Offset
|
|
}
|
|
}
|
|
}
|
|
totalWrittens = append(totalWrittens, totalWritten{key: key, totalWritten: tw})
|
|
}
|
|
return totalWrittens
|
|
}
|
|
|
|
func (cpm *checkpointsMap) marshal(key string) ([]byte, error) {
|
|
cpm.mu.RLock()
|
|
defer cpm.mu.RUnlock()
|
|
|
|
if cp, ok := cpm.checkpoints[key]; ok {
|
|
return json.Marshal(cp)
|
|
}
|
|
return nil, errors.NotFoundf("table %s", key)
|
|
}
|
|
|
|
type taskStatus uint8
|
|
|
|
const (
|
|
taskStatusRunning taskStatus = 1
|
|
taskStatusCompleted taskStatus = 2
|
|
)
|
|
|
|
type tableInfo struct {
|
|
TotalWritten int64 `json:"w"`
|
|
TotalSize int64 `json:"z"`
|
|
Status taskStatus `json:"s"`
|
|
Message string `json:"m,omitempty"`
|
|
Progresses []tableProgress `json:"progresses,omitempty"`
|
|
}
|
|
|
|
type tableProgress struct {
|
|
Step string `json:"step"`
|
|
Progress float64 `json:"progress"`
|
|
}
|
|
|
|
type taskProgress struct {
|
|
mu sync.RWMutex
|
|
Tables map[string]*tableInfo `json:"t"`
|
|
Status taskStatus `json:"s"`
|
|
Message string `json:"m,omitempty"`
|
|
|
|
// The contents have their own mutex for protection
|
|
checkpoints checkpointsMap
|
|
}
|
|
|
|
var (
|
|
currentProgress *taskProgress
|
|
// whether progress is enabled
|
|
progressEnabled = atomic.NewBool(false)
|
|
)
|
|
|
|
// EnableCurrentProgress init current progress struct on demand.
|
|
// NOTE: this call is not thread safe, so it should only be inited once at the very beginning of progress start.
|
|
func EnableCurrentProgress() {
|
|
currentProgress = &taskProgress{
|
|
checkpoints: makeCheckpointsMap(),
|
|
}
|
|
progressEnabled.Store(true)
|
|
}
|
|
|
|
// BroadcastStartTask sets the current task status to running.
|
|
func BroadcastStartTask() {
|
|
if !progressEnabled.Load() {
|
|
return
|
|
}
|
|
currentProgress.mu.Lock()
|
|
currentProgress.Status = taskStatusRunning
|
|
currentProgress.mu.Unlock()
|
|
|
|
currentProgress.checkpoints.clear()
|
|
}
|
|
|
|
// BroadcastEndTask sets the current task status to completed.
|
|
func BroadcastEndTask(err error) {
|
|
if !progressEnabled.Load() {
|
|
return
|
|
}
|
|
errString := errors.ErrorStack(err)
|
|
|
|
currentProgress.mu.Lock()
|
|
currentProgress.Status = taskStatusCompleted
|
|
currentProgress.Message = errString
|
|
currentProgress.mu.Unlock()
|
|
}
|
|
|
|
// BroadcastInitProgress sets the total size of each table.
|
|
func BroadcastInitProgress(databases []*mydump.MDDatabaseMeta) {
|
|
if !progressEnabled.Load() {
|
|
return
|
|
}
|
|
tables := make(map[string]*tableInfo, len(databases))
|
|
|
|
for _, db := range databases {
|
|
for _, tbl := range db.Tables {
|
|
name := common.UniqueTable(db.Name, tbl.Name)
|
|
tables[name] = &tableInfo{TotalSize: tbl.TotalSize}
|
|
}
|
|
}
|
|
|
|
currentProgress.mu.Lock()
|
|
currentProgress.Tables = tables
|
|
currentProgress.mu.Unlock()
|
|
}
|
|
|
|
// BroadcastTableCheckpoint updates the checkpoint of a table.
|
|
func BroadcastTableCheckpoint(tableName string, cp *checkpoints.TableCheckpoint) {
|
|
if !progressEnabled.Load() {
|
|
return
|
|
}
|
|
currentProgress.mu.Lock()
|
|
currentProgress.Tables[tableName].Status = taskStatusRunning
|
|
currentProgress.mu.Unlock()
|
|
|
|
// create a deep copy to avoid false sharing
|
|
currentProgress.checkpoints.insert(tableName, cp.DeepCopy())
|
|
}
|
|
|
|
// BroadcastTableProgress updates the progress of a table.
|
|
func BroadcastTableProgress(tableName string, step string, progress float64) {
|
|
if !progressEnabled.Load() {
|
|
return
|
|
}
|
|
currentProgress.mu.Lock()
|
|
progresses := currentProgress.Tables[tableName].Progresses
|
|
var present bool
|
|
for i, p := range progresses {
|
|
if p.Step == step {
|
|
progresses[i].Progress = progress
|
|
present = true
|
|
}
|
|
}
|
|
if !present {
|
|
progresses = append(progresses, tableProgress{Step: step, Progress: progress})
|
|
}
|
|
currentProgress.Tables[tableName].Progresses = progresses
|
|
currentProgress.mu.Unlock()
|
|
}
|
|
|
|
// BroadcastCheckpointDiff updates the total written size of each table.
|
|
func BroadcastCheckpointDiff(diffs map[string]*checkpoints.TableCheckpointDiff) {
|
|
if !progressEnabled.Load() {
|
|
return
|
|
}
|
|
totalWrittens := currentProgress.checkpoints.update(diffs)
|
|
|
|
currentProgress.mu.Lock()
|
|
for _, tw := range totalWrittens {
|
|
currentProgress.Tables[tw.key].TotalWritten = tw.totalWritten
|
|
}
|
|
currentProgress.mu.Unlock()
|
|
}
|
|
|
|
// BroadcastError sets the error message of a table.
|
|
func BroadcastError(tableName string, err error) {
|
|
if !progressEnabled.Load() {
|
|
return
|
|
}
|
|
errString := errors.ErrorStack(err)
|
|
|
|
currentProgress.mu.Lock()
|
|
if tbl := currentProgress.Tables[tableName]; tbl != nil {
|
|
tbl.Status = taskStatusCompleted
|
|
tbl.Message = errString
|
|
}
|
|
currentProgress.mu.Unlock()
|
|
}
|
|
|
|
// MarshalTaskProgress returns the current progress in JSON format.
|
|
func MarshalTaskProgress() ([]byte, error) {
|
|
if !progressEnabled.Load() {
|
|
return nil, errors.New("progress is not enabled")
|
|
}
|
|
currentProgress.mu.RLock()
|
|
defer currentProgress.mu.RUnlock()
|
|
return json.Marshal(¤tProgress)
|
|
}
|
|
|
|
// MarshalTableCheckpoints returns the checkpoint of a table in JSON format.
|
|
func MarshalTableCheckpoints(tableName string) ([]byte, error) {
|
|
if !progressEnabled.Load() {
|
|
return nil, errors.New("progress is not enabled")
|
|
}
|
|
return currentProgress.checkpoints.marshal(tableName)
|
|
}
|