Files
tidb/lightning/pkg/server/checkpoint_control.go

316 lines
10 KiB
Go

// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"os"
"path/filepath"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/lightning/pkg/importer"
"github.com/pingcap/tidb/lightning/pkg/importinto"
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/log"
"go.uber.org/zap"
)
// CheckpointControl is the set of checkpoint maintenance operations, independent
// of which import backend produced the checkpoints. Use NewCheckpointControl to
// obtain the implementation matching the configured backend.
type CheckpointControl interface {
	// GetLocalStoringTables reports tables whose engines are still stored
	// locally (the map values are presumably engine IDs — confirm per
	// implementation).
	GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
	// Dump writes the checkpoint contents into dumpFolder for inspection.
	Dump(ctx context.Context, dumpFolder string) error
	// DestroyError removes failed checkpoints for tableName along with the
	// data they produced.
	DestroyError(ctx context.Context, tableName string) error
	// IgnoreError clears the failed state of checkpoints for tableName so the
	// import can be resumed.
	IgnoreError(ctx context.Context, tableName string) error
	// Remove deletes the checkpoints for tableName.
	Remove(ctx context.Context, tableName string) error
}
// NewCheckpointControl creates the CheckpointControl that matches the
// configured backend. It returns the import-into implementation when
// cfg.TikvImporter.Backend is config.BackendImportInto, and the legacy
// checkpoints-DB implementation otherwise.
//
// On failure it returns a literal nil interface together with the error. The
// concrete constructors return typed pointers, and passing a nil *T through
// the interface result would produce a non-nil CheckpointControl.
func NewCheckpointControl(cfg *config.Config, tls *common.TLS) (CheckpointControl, error) {
	if cfg.TikvImporter.Backend == config.BackendImportInto {
		ctl, err := NewImportIntoCheckpointControl(cfg, tls)
		if err != nil {
			return nil, err
		}
		return ctl, nil
	}
	ctl, err := NewLegacyCheckpointControl(cfg, tls)
	if err != nil {
		return nil, err
	}
	return ctl, nil
}
// LegacyCheckpointControl implements CheckpointControl on top of the
// checkpoints DB described by cfg (opened via checkpoints.OpenCheckpointsDB).
// The DB is opened afresh for every operation and closed when it finishes.
type LegacyCheckpointControl struct {
	// cfg locates the checkpoints DB and supplies the TiDB and importer
	// settings used by DestroyError.
	cfg *config.Config
	// tls is used when connecting to the target TiDB.
	tls *common.TLS
}

// NewLegacyCheckpointControl creates a LegacyCheckpointControl. It performs no
// I/O and never fails; the error result keeps the signature parallel to
// NewImportIntoCheckpointControl.
func NewLegacyCheckpointControl(cfg *config.Config, tls *common.TLS) (*LegacyCheckpointControl, error) {
	ctl := new(LegacyCheckpointControl)
	ctl.cfg, ctl.tls = cfg, tls
	return ctl, nil
}
// withDB opens the checkpoints DB described by c.cfg, passes it to fn, and
// closes it once fn returns. A failure to close is only logged as a warning.
// The result is the traced open error, or otherwise whatever fn returned.
func (c *LegacyCheckpointControl) withDB(ctx context.Context, fn func(checkpoints.DB) error) error {
	db, openErr := checkpoints.OpenCheckpointsDB(ctx, c.cfg)
	if openErr != nil {
		return errors.Trace(openErr)
	}
	closeDB := func() {
		err := db.Close()
		if err == nil {
			return
		}
		log.L().Warn("failed to close checkpoint db", zap.Error(err))
	}
	defer closeDB()
	return fn(db)
}
// Remove deletes the checkpoints for tableName from the legacy checkpoints DB.
// If the task checkpoint shows an initialized task (non-zero TaskID), the
// table metas are first cleaned up via CleanupMetas. A failure there aborts
// the removal.
func (c *LegacyCheckpointControl) Remove(ctx context.Context, tableName string) error {
	return c.withDB(ctx, func(db checkpoints.DB) error {
		taskCp, err := db.TaskCheckpoint(ctx)
		if err != nil {
			return errors.Trace(err)
		}
		// An empty TaskID means the task was never initialized, so there are
		// no metas to look at.
		taskInited := taskCp != nil && taskCp.TaskID != 0
		if taskInited {
			if cleanupErr := CleanupMetas(ctx, c.cfg, tableName); cleanupErr != nil {
				return errors.Trace(cleanupErr)
			}
		}
		return errors.Trace(db.RemoveCheckpoint(ctx, tableName))
	})
}
// IgnoreError asks the legacy checkpoints DB to clear the error state of the
// checkpoints for tableName (via IgnoreErrorCheckpoint) so the import can be
// resumed.
func (c *LegacyCheckpointControl) IgnoreError(ctx context.Context, tableName string) error {
	ignore := func(db checkpoints.DB) error {
		err := db.IgnoreErrorCheckpoint(ctx, tableName)
		return errors.Trace(err)
	}
	return c.withDB(ctx, ignore)
}
// DestroyError removes the failed checkpoints for tableName from the legacy
// checkpoints DB and drops every affected table from the target TiDB. For the
// local backend, it also cleans up each affected engine under
// TikvImporter.SortedKVDir.
//
// Drop and cleanup failures do not stop the loop; they are collected and
// returned joined together. Table metas are cleaned up only when nothing
// failed.
func (c *LegacyCheckpointControl) DestroyError(ctx context.Context, tableName string) error {
	return c.withDB(ctx, func(db checkpoints.DB) error {
		tidbMgr, err := importer.NewTiDBManager(ctx, c.cfg.TiDB, c.tls)
		if err != nil {
			return errors.Trace(err)
		}
		defer tidbMgr.Close()

		destroyed, err := db.DestroyErrorCheckpoint(ctx, tableName)
		if err != nil {
			return errors.Trace(err)
		}

		var failures []error
		for _, tbl := range destroyed {
			log.L().Info("Dropping table", zap.String("table", tbl.TableName))
			if dropErr := tidbMgr.DropTable(ctx, tbl.TableName); dropErr != nil {
				log.L().Error("Encountered error while dropping table", zap.Error(dropErr))
				failures = append(failures, dropErr)
			}
		}

		if c.cfg.TikvImporter.Backend == config.BackendLocal {
			sortedKVDir := c.cfg.TikvImporter.SortedKVDir
			for _, tbl := range destroyed {
				for id := tbl.MinEngineID; id <= tbl.MaxEngineID; id++ {
					log.L().Info("Closing and cleaning up engine", zap.String("table", tbl.TableName), zap.Int32("engineID", id))
					_, engineUUID := backend.MakeUUID(tbl.TableName, int64(id))
					eng := local.Engine{UUID: engineUUID}
					if cleanErr := eng.Cleanup(sortedKVDir); cleanErr != nil {
						log.L().Error("Encountered error while cleanup engine", zap.Error(cleanErr))
						failures = append(failures, cleanErr)
					}
				}
			}
		}

		// Only clean up metas when every drop and engine cleanup succeeded.
		if len(failures) == 0 {
			failures = append(failures, CleanupMetas(ctx, c.cfg, tableName))
		}
		return errors.Trace(errors.Join(failures...))
	})
}
// Dump exports the legacy checkpoints DB into dumpFolder for inspection. The
// folder is created with mode 0750 if missing. The tables, engines and chunks
// dumps are written to tables.csv, engines.csv and chunks.csv; os.Create
// truncates any existing files with those names.
//
// Errors from closing the output files are returned when no earlier error
// occurred, because a failed Close can be the only report that written data
// did not reach storage.
func (c *LegacyCheckpointControl) Dump(ctx context.Context, dumpFolder string) error {
	return c.withDB(ctx, func(cpdb checkpoints.DB) (retErr error) {
		if err := os.MkdirAll(dumpFolder, 0o750); err != nil {
			return errors.Trace(err)
		}
		// closeFile closes f and records its error unless an earlier error is
		// already being returned.
		closeFile := func(f *os.File, name string) {
			if err := f.Close(); err != nil && retErr == nil {
				retErr = errors.Annotatef(err, "failed to close %s", name)
			}
		}

		tablesFileName := filepath.Join(dumpFolder, "tables.csv")
		tablesFile, err := os.Create(tablesFileName)
		if err != nil {
			return errors.Annotatef(err, "failed to create %s", tablesFileName)
		}
		defer closeFile(tablesFile, tablesFileName)

		enginesFileName := filepath.Join(dumpFolder, "engines.csv")
		enginesFile, err := os.Create(enginesFileName)
		if err != nil {
			return errors.Annotatef(err, "failed to create %s", enginesFileName)
		}
		defer closeFile(enginesFile, enginesFileName)

		chunksFileName := filepath.Join(dumpFolder, "chunks.csv")
		chunksFile, err := os.Create(chunksFileName)
		if err != nil {
			return errors.Annotatef(err, "failed to create %s", chunksFileName)
		}
		defer closeFile(chunksFile, chunksFileName)

		if err := cpdb.DumpTables(ctx, tablesFile); err != nil {
			return errors.Trace(err)
		}
		if err := cpdb.DumpEngines(ctx, enginesFile); err != nil {
			return errors.Trace(err)
		}
		if err := cpdb.DumpChunks(ctx, chunksFile); err != nil {
			return errors.Trace(err)
		}
		return nil
	})
}
// GetLocalStoringTables returns what the legacy checkpoints DB reports as
// tables whose engines are still being persisted locally. It is a thin wrapper
// over checkpoints.DB.GetLocalStoringTables.
//
// On error the returned map is nil and the error is traced, consistent with
// the other LegacyCheckpointControl methods.
func (c *LegacyCheckpointControl) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error) {
	var tables map[string][]int32
	err := c.withDB(ctx, func(cpdb checkpoints.DB) error {
		var dbErr error
		tables, dbErr = cpdb.GetLocalStoringTables(ctx)
		return errors.Trace(dbErr)
	})
	if err != nil {
		return nil, err
	}
	return tables, nil
}
// ImportIntoCheckpointControl implements CheckpointControl for import into checkpoints.
//
// Every method except GetLocalStoringTables closes mgr when it returns (via
// closeManager). Reusing an instance after such a call would therefore operate
// on a closed manager, so treat an instance as single-use.
type ImportIntoCheckpointControl struct {
	// cfg is the lightning configuration; DestroyError reads its TiDB section
	// and passes it to CleanupMetas.
	cfg *config.Config
	// mgr performs the checkpoint operations; it is created by
	// NewImportIntoCheckpointControl.
	mgr importinto.CheckpointManager
	// tls is used when connecting to the target TiDB in DestroyError.
	tls *common.TLS
}
// closeManager closes the import-into checkpoint manager if one is set. A
// close failure is only logged as a warning.
//
// NOTE(review): c.mgr is not reset to nil after closing, so calling this twice
// closes the same manager again. Confirm that is harmless, or that callers
// never reuse an instance.
func (c *ImportIntoCheckpointControl) closeManager() {
	if mgr := c.mgr; mgr != nil {
		if err := mgr.Close(); err != nil {
			log.L().Warn("failed to close import-into checkpoint manager", zap.Error(err))
		}
	}
}
// NewImportIntoCheckpointControl creates an ImportIntoCheckpointControl backed
// by a checkpoint manager built from cfg. The manager is created eagerly, so
// any error from importinto.NewCheckpointManager is returned here (traced).
func NewImportIntoCheckpointControl(cfg *config.Config, tls *common.TLS) (*ImportIntoCheckpointControl, error) {
	ctl := &ImportIntoCheckpointControl{cfg: cfg, tls: tls}
	mgr, err := importinto.NewCheckpointManager(cfg)
	if err != nil {
		return nil, errors.Trace(err)
	}
	ctl.mgr = mgr
	return ctl, nil
}
// Remove deletes the checkpoints for tableName through the import-into
// checkpoint manager, then closes the manager. The error is traced,
// consistent with the other CheckpointControl implementations in this file.
//
// NOTE(review): unlike LegacyCheckpointControl.Remove, this does not call
// CleanupMetas. Confirm that import-into tasks leave no metas behind.
func (c *ImportIntoCheckpointControl) Remove(ctx context.Context, tableName string) error {
	defer c.closeManager()
	return errors.Trace(c.mgr.Remove(ctx, tableName))
}
// IgnoreError asks the import-into checkpoint manager to reset the failed
// checkpoints for tableName so the import can be resumed, then closes the
// manager. The error is traced, consistent with the legacy implementation.
func (c *ImportIntoCheckpointControl) IgnoreError(ctx context.Context, tableName string) error {
	defer c.closeManager()
	return errors.Trace(c.mgr.IgnoreError(ctx, tableName))
}
// DestroyError removes the failed checkpoints for tableName through the
// import-into checkpoint manager. It then drops every table the manager
// reports from the target TiDB.
//
// Drop failures are collected and returned joined together. Table metas are
// cleaned up only when every drop succeeded. The manager is closed on return.
func (c *ImportIntoCheckpointControl) DestroyError(ctx context.Context, tableName string) error {
	defer c.closeManager()

	tidbMgr, err := importer.NewTiDBManager(ctx, c.cfg.TiDB, c.tls)
	if err != nil {
		return errors.Trace(err)
	}
	defer tidbMgr.Close()

	destroyed, err := c.mgr.DestroyError(ctx, tableName)
	if err != nil {
		return errors.Trace(err)
	}

	var failures []error
	for _, tbl := range destroyed {
		log.L().Info("Dropping table", zap.String("table", tbl.TableName))
		dropErr := tidbMgr.DropTable(ctx, tbl.TableName)
		if dropErr == nil {
			continue
		}
		log.L().Error("Encountered error while dropping table", zap.Error(dropErr))
		failures = append(failures, dropErr)
	}

	// Metas are only cleaned up when all drops succeeded.
	if len(failures) == 0 {
		failures = append(failures, CleanupMetas(ctx, c.cfg, tableName))
	}
	return errors.Trace(errors.Join(failures...))
}
// Dump exports the import-into checkpoints into dumpFolder for inspection,
// then closes the checkpoint manager. The folder is created with mode 0750 if
// missing. The tables, engines and chunks dumps are written to tables.csv,
// engines.csv and chunks.csv; os.Create truncates existing files. All three
// files are always created, keeping the output layout identical to the legacy
// backend's.
//
// Errors from closing the output files are returned when no earlier error
// occurred, because a failed Close can be the only report that written data
// did not reach storage.
func (c *ImportIntoCheckpointControl) Dump(ctx context.Context, dumpFolder string) (retErr error) {
	defer c.closeManager()
	if err := os.MkdirAll(dumpFolder, 0o750); err != nil {
		return errors.Trace(err)
	}
	// closeFile closes f and records its error unless an earlier error is
	// already being returned.
	closeFile := func(f *os.File, name string) {
		if err := f.Close(); err != nil && retErr == nil {
			retErr = errors.Annotatef(err, "failed to close %s", name)
		}
	}

	tablesFileName := filepath.Join(dumpFolder, "tables.csv")
	tablesFile, err := os.Create(tablesFileName)
	if err != nil {
		return errors.Annotatef(err, "failed to create %s", tablesFileName)
	}
	defer closeFile(tablesFile, tablesFileName)

	enginesFileName := filepath.Join(dumpFolder, "engines.csv")
	enginesFile, err := os.Create(enginesFileName)
	if err != nil {
		return errors.Annotatef(err, "failed to create %s", enginesFileName)
	}
	defer closeFile(enginesFile, enginesFileName)

	chunksFileName := filepath.Join(dumpFolder, "chunks.csv")
	chunksFile, err := os.Create(chunksFileName)
	if err != nil {
		return errors.Annotatef(err, "failed to create %s", chunksFileName)
	}
	defer closeFile(chunksFile, chunksFileName)

	if err := c.mgr.DumpTables(ctx, tablesFile); err != nil {
		return errors.Trace(err)
	}
	if err := c.mgr.DumpEngines(ctx, enginesFile); err != nil {
		return errors.Trace(err)
	}
	if err := c.mgr.DumpChunks(ctx, chunksFile); err != nil {
		return errors.Trace(err)
	}
	return nil
}
// GetLocalStoringTables always reports no tables and no error; per the
// original design note, import-into does not keep local engines.
//
// NOTE(review): unlike the other methods of this type, it does not close the
// checkpoint manager.
func (*ImportIntoCheckpointControl) GetLocalStoringTables(context.Context) (map[string][]int32, error) {
	var none map[string][]int32
	return none, nil
}