Files
tidb/lightning/pkg/server/checkpoint_control_test.go

438 lines
12 KiB
Go

// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"errors"
"path/filepath"
"testing"
mockimport "github.com/pingcap/tidb/lightning/pkg/importinto/mock"
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)
// TestImportIntoCheckpointControl drives ImportIntoCheckpointControl against a
// mocked checkpoint manager. Each table entry programs the mock in setup, runs
// one control method in operation, and the loop checks whether an error was
// expected. Every case that reaches the manager also expects one Close call.
func TestImportIntoCheckpointControl(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockMgr := mockimport.NewMockCheckpointManager(ctrl)
	cfg := config.NewConfig()
	cfg.TikvImporter.Backend = config.BackendImportInto
	control := &ImportIntoCheckpointControl{
		cfg: cfg,
		mgr: mockMgr,
	}
	ctx := context.Background()

	tests := []struct {
		name string
		// operation receives the subtest's *testing.T so that assertions and
		// temporary directories belong to the subtest that is actually
		// running rather than to the parent test.
		operation func(t *testing.T) error
		// setup registers the mock expectations for this case.
		setup   func()
		wantErr bool
	}{
		{
			name: "Remove",
			operation: func(_ *testing.T) error {
				return control.Remove(ctx, "db.t1")
			},
			setup: func() {
				mockMgr.EXPECT().Remove(ctx, "db.t1").Return(nil)
				mockMgr.EXPECT().Close().Return(nil)
			},
			wantErr: false,
		},
		{
			name: "IgnoreError",
			operation: func(_ *testing.T) error {
				return control.IgnoreError(ctx, "db.t1")
			},
			setup: func() {
				mockMgr.EXPECT().IgnoreError(ctx, "db.t1").Return(nil)
				mockMgr.EXPECT().Close().Return(nil)
			},
			wantErr: false,
		},
		{
			name: "Dump",
			operation: func(t *testing.T) error {
				tempDir := t.TempDir()
				err := control.Dump(ctx, tempDir)
				if err == nil {
					// A successful dump must leave the three CSV files behind.
					require.FileExists(t, filepath.Join(tempDir, "tables.csv"))
					require.FileExists(t, filepath.Join(tempDir, "engines.csv"))
					require.FileExists(t, filepath.Join(tempDir, "chunks.csv"))
				}
				return err
			},
			setup: func() {
				mockMgr.EXPECT().DumpTables(ctx, gomock.Any()).Return(nil)
				mockMgr.EXPECT().DumpEngines(ctx, gomock.Any()).Return(nil)
				mockMgr.EXPECT().DumpChunks(ctx, gomock.Any()).Return(nil)
				mockMgr.EXPECT().Close().Return(nil)
			},
			wantErr: false,
		},
		{
			name: "GetLocalStoringTables",
			operation: func(t *testing.T) error {
				res, err := control.GetLocalStoringTables(ctx)
				require.Nil(t, res)
				return err
			},
			// No expectations: the mock must not be touched by this call.
			setup:   func() {},
			wantErr: false,
		},
		{
			name: "RemoveError",
			operation: func(_ *testing.T) error {
				return control.Remove(ctx, "db.t1")
			},
			setup: func() {
				mockMgr.EXPECT().Remove(ctx, "db.t1").Return(errors.New("remove error"))
				mockMgr.EXPECT().Close().Return(nil)
			},
			wantErr: true,
		},
		{
			name: "IgnoreErrorError",
			operation: func(_ *testing.T) error {
				return control.IgnoreError(ctx, "db.t1")
			},
			setup: func() {
				mockMgr.EXPECT().IgnoreError(ctx, "db.t1").Return(errors.New("ignore error error"))
				mockMgr.EXPECT().Close().Return(nil)
			},
			wantErr: true,
		},
		{
			name: "DumpError",
			operation: func(t *testing.T) error {
				tempDir := t.TempDir()
				return control.Dump(ctx, tempDir)
			},
			setup: func() {
				// Only DumpTables is expected: the first failure ends the dump.
				mockMgr.EXPECT().DumpTables(ctx, gomock.Any()).Return(errors.New("dump error"))
				mockMgr.EXPECT().Close().Return(nil)
			},
			wantErr: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.setup()
			err := tt.operation(t)
			if tt.wantErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)
		})
	}
}
// newTestConfig builds a config for file-checkpoint tests: local backend,
// checkpointing enabled with the file driver, and both the sorted-KV
// directory and the checkpoint file located under a per-test temp directory.
func newTestConfig(t *testing.T) *config.Config {
	tmp := t.TempDir()
	c := config.NewConfig()

	// Checkpoint settings: file driver writing to <tmp>/cp.pb.
	c.Checkpoint.Enable = true
	c.Checkpoint.Driver = config.CheckpointDriverFile
	c.Checkpoint.DSN = filepath.Join(tmp, "cp.pb")

	// Backend settings: local backend sorting into <tmp>/sorted-kv.
	c.TikvImporter.Backend = config.BackendLocal
	c.TikvImporter.SortedKVDir = filepath.Join(tmp, "sorted-kv")

	// Fixed task / source / target values.
	c.TaskID = 0
	c.Mydumper.SourceDir = "/data"
	c.TiDB.Port = 4000
	c.TiDB.PdAddr = "127.0.0.1:2379"

	return c
}
// setupFileCheckpointsDB opens a file-based checkpoint DB at cfg.Checkpoint.DSN
// and seeds it with test data: tables db1.t1, db1.t2 and db2.t3, engine
// checkpoints 0 (one chunk) and -1 for `db1`.`t2`, and engine checkpoint -1
// for `db2`.`t3`. The DB is closed automatically when the test finishes.
func setupFileCheckpointsDB(t *testing.T, cfg *config.Config) *checkpoints.FileCheckpointsDB {
	t.Helper()

	ctx := context.Background()
	cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, cfg.Checkpoint.DSN)
	require.NoError(t, err)
	// Register the cleanup as soon as the DB exists so it is closed even when
	// one of the seeding steps below fails the test.
	t.Cleanup(func() {
		_ = cpdb.Close()
	})

	// Initialize with checkpoint data
	err = cpdb.Initialize(ctx, cfg, map[string]*checkpoints.TidbDBInfo{
		"db1": {
			Name: "db1",
			Tables: map[string]*checkpoints.TidbTableInfo{
				"t1": {Name: "t1"},
				"t2": {Name: "t2"},
			},
		},
		"db2": {
			Name: "db2",
			Tables: map[string]*checkpoints.TidbTableInfo{
				"t3": {Name: "t3"},
			},
		},
	})
	require.NoError(t, err)

	// Insert engine checkpoints
	err = cpdb.InsertEngineCheckpoints(ctx, "`db1`.`t2`", map[int32]*checkpoints.EngineCheckpoint{
		0: {
			Status: checkpoints.CheckpointStatusLoaded,
			Chunks: []*checkpoints.ChunkCheckpoint{{
				Key: checkpoints.ChunkCheckpointKey{
					Path:   "/tmp/path/1.sql",
					Offset: 0,
				},
				FileMeta: mydump.SourceFileMeta{
					Path:     "/tmp/path/1.sql",
					Type:     mydump.SourceTypeSQL,
					FileSize: 12345,
				},
				Chunk: mydump.Chunk{
					Offset:       12,
					RealOffset:   10,
					EndOffset:    102400,
					PrevRowIDMax: 1,
					RowIDMax:     5000,
				},
			}},
		},
		-1: {
			Status: checkpoints.CheckpointStatusLoaded,
			Chunks: nil,
		},
	})
	require.NoError(t, err)

	err = cpdb.InsertEngineCheckpoints(ctx, "`db2`.`t3`", map[int32]*checkpoints.EngineCheckpoint{
		-1: {
			Status: checkpoints.CheckpointStatusLoaded,
			Chunks: nil,
		},
	})
	require.NoError(t, err)

	return cpdb
}
// setErrorStatus puts `db1`.`t2` and `db2`.`t3` into an error state for
// testing. It builds a status merger for engine -1 with
// CheckpointStatusAllWritten, flags it with SetInvalid, merges it into a
// table checkpoint diff and applies that same diff to both tables.
//
// The helper has no *testing.T, so a failed Update panics instead of being
// silently dropped; otherwise the calling test would run against checkpoints
// that were never marked invalid.
func setErrorStatus(cpdb *checkpoints.FileCheckpointsDB) {
	cpd := checkpoints.NewTableCheckpointDiff()
	scm := checkpoints.StatusCheckpointMerger{
		EngineID: -1,
		Status:   checkpoints.CheckpointStatusAllWritten,
	}
	scm.SetInvalid()
	scm.MergeInto(cpd)

	err := cpdb.Update(context.Background(), map[string]*checkpoints.TableCheckpointDiff{
		"`db1`.`t2`": cpd,
		"`db2`.`t3`": cpd,
	})
	if err != nil {
		panic(err)
	}
}
// TestLegacyCheckpointControl exercises LegacyCheckpointControl against a real
// file-based checkpoint DB. For every case a fresh config and seeded DB are
// created, setup optionally mutates the DB, operation invokes one control
// method, and verify re-opens the checkpoint file to inspect the outcome.
// verify is only run when the operation is expected to succeed.
func TestLegacyCheckpointControl(t *testing.T) {
	tests := []struct {
		name string
		// setup adjusts the seeded checkpoint DB before the control is built.
		setup func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB)
		// operation receives the subtest's *testing.T so that assertions and
		// temporary directories belong to the running subtest, not the parent.
		operation func(t *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error
		// verify inspects the checkpoint file after a successful operation.
		verify  func(t *testing.T, ctx context.Context, cfg *config.Config)
		wantErr bool
		// errMsg, when non-empty, must be contained in the returned error.
		errMsg string
	}{
		{
			name:  "Remove single checkpoint",
			setup: func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB) {},
			operation: func(_ *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error {
				return ctl.Remove(ctx, "`db1`.`t2`")
			},
			verify: func(t *testing.T, ctx context.Context, cfg *config.Config) {
				cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, cfg.Checkpoint.DSN)
				require.NoError(t, err)
				defer cpdb.Close()

				// The removed table is gone ...
				cp, err := cpdb.Get(ctx, "`db1`.`t2`")
				require.Nil(t, cp)
				require.Error(t, err)

				// ... while the other table is untouched.
				cp, err = cpdb.Get(ctx, "`db2`.`t3`")
				require.NoError(t, err)
				require.NotNil(t, cp)
			},
		},
		{
			name:  "Remove all checkpoints",
			setup: func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB) {},
			operation: func(_ *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error {
				return ctl.Remove(ctx, "all")
			},
			verify: func(t *testing.T, ctx context.Context, cfg *config.Config) {
				cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, cfg.Checkpoint.DSN)
				require.NoError(t, err)
				defer cpdb.Close()

				cp, err := cpdb.Get(ctx, "`db1`.`t2`")
				require.Nil(t, cp)
				require.Error(t, err)

				cp, err = cpdb.Get(ctx, "`db2`.`t3`")
				require.Nil(t, cp)
				require.Error(t, err)
			},
		},
		{
			name: "IgnoreError single checkpoint",
			setup: func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB) {
				setErrorStatus(cpdb)
			},
			operation: func(_ *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error {
				return ctl.IgnoreError(ctx, "`db1`.`t2`")
			},
			verify: func(t *testing.T, ctx context.Context, cfg *config.Config) {
				cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, cfg.Checkpoint.DSN)
				require.NoError(t, err)
				defer cpdb.Close()

				// The targeted table is reset to the loaded status.
				cp, err := cpdb.Get(ctx, "`db1`.`t2`")
				require.NoError(t, err)
				require.Equal(t, checkpoints.CheckpointStatusLoaded, cp.Status)

				// The other table keeps the status written by setErrorStatus.
				cp, err = cpdb.Get(ctx, "`db2`.`t3`")
				require.NoError(t, err)
				require.Equal(t, checkpoints.CheckpointStatusAllWritten/10, cp.Status)
			},
		},
		{
			name: "IgnoreError all checkpoints",
			setup: func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB) {
				setErrorStatus(cpdb)
			},
			operation: func(_ *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error {
				return ctl.IgnoreError(ctx, "all")
			},
			verify: func(t *testing.T, ctx context.Context, cfg *config.Config) {
				cpdb, err := checkpoints.NewFileCheckpointsDB(ctx, cfg.Checkpoint.DSN)
				require.NoError(t, err)
				defer cpdb.Close()

				cp, err := cpdb.Get(ctx, "`db1`.`t2`")
				require.NoError(t, err)
				require.Equal(t, checkpoints.CheckpointStatusLoaded, cp.Status)

				cp, err = cpdb.Get(ctx, "`db2`.`t3`")
				require.NoError(t, err)
				require.Equal(t, checkpoints.CheckpointStatusLoaded, cp.Status)
			},
		},
		{
			name:  "Dump not supported for file checkpoint",
			setup: func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB) {},
			operation: func(t *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error {
				dumpDir := filepath.Join(t.TempDir(), "dump")
				return ctl.Dump(ctx, dumpDir)
			},
			verify:  func(t *testing.T, ctx context.Context, cfg *config.Config) {},
			wantErr: true,
			errMsg:  "not unsupported",
		},
		{
			name: "GetLocalStoringTables with partial progress",
			setup: func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB) {
				// Advance the single chunk of engine 0 part-way through.
				cpd := checkpoints.NewTableCheckpointDiff()
				ccm := checkpoints.ChunkCheckpointMerger{
					EngineID: 0,
					Key:      checkpoints.ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0},
					Pos:      100,
					RealPos:  100,
					RowID:    50,
				}
				ccm.MergeInto(cpd)
				err := cpdb.Update(context.Background(), map[string]*checkpoints.TableCheckpointDiff{"`db1`.`t2`": cpd})
				require.NoError(t, err)
			},
			operation: func(t *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error {
				tables, err := ctl.GetLocalStoringTables(ctx)
				if err != nil {
					return err
				}
				require.Contains(t, tables, "`db1`.`t2`")
				require.Contains(t, tables["`db1`.`t2`"], int32(0))
				return nil
			},
			verify: func(t *testing.T, ctx context.Context, cfg *config.Config) {},
		},
		{
			name: "GetLocalStoringTables empty when imported",
			setup: func(t *testing.T, cpdb *checkpoints.FileCheckpointsDB) {
				// Mark engine 0 of `db1`.`t2` as imported.
				cpd := checkpoints.NewTableCheckpointDiff()
				scm := checkpoints.StatusCheckpointMerger{
					EngineID: 0,
					Status:   checkpoints.CheckpointStatusImported,
				}
				scm.MergeInto(cpd)
				err := cpdb.Update(context.Background(), map[string]*checkpoints.TableCheckpointDiff{"`db1`.`t2`": cpd})
				require.NoError(t, err)
			},
			operation: func(t *testing.T, ctx context.Context, ctl *LegacyCheckpointControl, cfg *config.Config) error {
				tables, err := ctl.GetLocalStoringTables(ctx)
				if err != nil {
					return err
				}
				require.Empty(t, tables)
				return nil
			},
			verify: func(t *testing.T, ctx context.Context, cfg *config.Config) {},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			ctx := context.Background()
			cfg := newTestConfig(t)
			cpdb := setupFileCheckpointsDB(t, cfg)
			tt.setup(t, cpdb)

			ctl, err := NewLegacyCheckpointControl(cfg, nil)
			require.NoError(t, err)

			err = tt.operation(t, ctx, ctl, cfg)
			if tt.wantErr {
				require.Error(t, err)
				if tt.errMsg != "" {
					require.Contains(t, err.Error(), tt.errMsg)
				}
				return
			}
			require.NoError(t, err)
			tt.verify(t, ctx, cfg)
		})
	}
}
// TestNewCheckpointControl_LegacyBackend checks that NewCheckpointControl
// succeeds for a config using the local backend and that the returned value
// has the type *LegacyCheckpointControl.
func TestNewCheckpointControl_LegacyBackend(t *testing.T) {
	localCfg := newTestConfig(t)
	// Set explicitly so the backend under test is visible in this test.
	localCfg.TikvImporter.Backend = config.BackendLocal

	got, err := NewCheckpointControl(localCfg, nil)
	require.NoError(t, err)

	// Only the dynamic type is compared, so a typed nil pointer is enough.
	var want *LegacyCheckpointControl
	require.IsType(t, want, got)
}