// tidb/pkg/lightning/checkpoints/checkpoints_test.go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoints

import (
	"context"
	"path/filepath"
	"testing"

	"github.com/pingcap/tidb/pkg/lightning/checkpoints/checkpointspb"
	"github.com/pingcap/tidb/pkg/lightning/mydump"
	"github.com/pingcap/tidb/pkg/lightning/verification"
	"github.com/stretchr/testify/require"
)
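
// TestMergeStatusCheckpoint verifies that StatusCheckpointMerger records the new
// status for the targeted engine, and that merging with WholeTableEngineID also
// updates the table-level status in the diff.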
func TestMergeStatusCheckpoint(t *testing.T) {
	cpd := NewTableCheckpointDiff()

	m := StatusCheckpointMerger{EngineID: 0, Status: CheckpointStatusImported}
	m.MergeInto(cpd)
	require.Equal(t, &TableCheckpointDiff{
		hasStatus: false,
		engines: map[int32]engineCheckpointDiff{
			0: {
				hasStatus: true,
				status: CheckpointStatusImported,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
		},
	}, cpd)

	m = StatusCheckpointMerger{EngineID: -1, Status: CheckpointStatusLoaded}
	m.MergeInto(cpd)
	require.Equal(t, &TableCheckpointDiff{
		hasStatus: false,
		engines: map[int32]engineCheckpointDiff{
			0: {
				hasStatus: true,
				status: CheckpointStatusImported,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
			-1: {
				hasStatus: true,
				status: CheckpointStatusLoaded,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
		},
	}, cpd)

	m = StatusCheckpointMerger{EngineID: WholeTableEngineID, Status: CheckpointStatusClosed}
	m.MergeInto(cpd)
	require.Equal(t, &TableCheckpointDiff{
		hasStatus: true,
		status: CheckpointStatusClosed,
		engines: map[int32]engineCheckpointDiff{
			0: {
				hasStatus: true,
				status: CheckpointStatusImported,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
			-1: {
				hasStatus: true,
				status: CheckpointStatusLoaded,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
		},
	}, cpd)

	m = StatusCheckpointMerger{EngineID: -1, Status: CheckpointStatusAllWritten}
	m.MergeInto(cpd)
	require.Equal(t, &TableCheckpointDiff{
		hasStatus: true,
		status: CheckpointStatusClosed,
		engines: map[int32]engineCheckpointDiff{
			0: {
				hasStatus: true,
				status: CheckpointStatusImported,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
			-1: {
				hasStatus: true,
				status: CheckpointStatusAllWritten,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
		},
	}, cpd)
}
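
// TestMergeInvalidStatusCheckpoint verifies that SetInvalid causes the merger to
// record a downgraded status (the original status divided by 10) for both the
// targeted engine and the whole table.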
func TestMergeInvalidStatusCheckpoint(t *testing.T) {
	cpd := NewTableCheckpointDiff()

	m := StatusCheckpointMerger{EngineID: 0, Status: CheckpointStatusLoaded}
	m.MergeInto(cpd)

	m = StatusCheckpointMerger{EngineID: -1, Status: CheckpointStatusAllWritten}
	m.SetInvalid()
	m.MergeInto(cpd)

	require.Equal(t, &TableCheckpointDiff{
		hasStatus: true,
		status: CheckpointStatusAllWritten / 10,
		engines: map[int32]engineCheckpointDiff{
			0: {
				hasStatus: true,
				status: CheckpointStatusLoaded,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
			-1: {
				hasStatus: true,
				status: CheckpointStatusAllWritten / 10,
				chunks: make(map[ChunkCheckpointKey]chunkCheckpointDiff),
			},
		},
	}, cpd)
}
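
// TestMergeChunkCheckpoint verifies that ChunkCheckpointMerger records a chunk's
// position, real position, row ID, and checksum in the diff, and that a second
// merge for the same chunk key overwrites the earlier values.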
func TestMergeChunkCheckpoint(t *testing.T) {
	cpd := NewTableCheckpointDiff()
	key := ChunkCheckpointKey{Path: "/tmp/path/1.sql", Offset: 0}

	m := ChunkCheckpointMerger{
		EngineID: 2,
		Key: key,
		Checksum: verification.MakeKVChecksum(700, 15, 1234567890),
		Pos: 1055,
		RealPos: 1053,
		RowID: 31,
	}
	m.MergeInto(cpd)
	require.Equal(t, &TableCheckpointDiff{
		engines: map[int32]engineCheckpointDiff{
			2: {
				chunks: map[ChunkCheckpointKey]chunkCheckpointDiff{
					key: {
						pos: 1055,
						realPos: 1053,
						rowID: 31,
						checksum: verification.MakeKVChecksum(700, 15, 1234567890),
					},
				},
			},
		},
	}, cpd)

	m = ChunkCheckpointMerger{
		EngineID: 2,
		Key: key,
		Checksum: verification.MakeKVChecksum(800, 20, 1357924680),
		Pos: 1080,
		RealPos: 1070,
		RowID: 42,
	}
	m.MergeInto(cpd)
	require.Equal(t, &TableCheckpointDiff{
		engines: map[int32]engineCheckpointDiff{
			2: {
				chunks: map[ChunkCheckpointKey]chunkCheckpointDiff{
					key: {
						pos: 1080,
						realPos: 1070,
						rowID: 42,
						checksum: verification.MakeKVChecksum(800, 20, 1357924680),
					},
				},
			},
		},
	}, cpd)
}
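
// TestRebaseCheckpoint verifies that RebaseCheckpointMerger stores the auto-ID
// bases in the diff and that merging smaller bases afterwards does not move the
// recorded values backwards.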
func TestRebaseCheckpoint(t *testing.T) {
	cpd := NewTableCheckpointDiff()

	m := RebaseCheckpointMerger{
		AutoRandBase: 132861,
		AutoIncrBase: 132862,
		AutoRowIDBase: 132863,
	}
	m.MergeInto(cpd)
	expected := &TableCheckpointDiff{
		hasRebase: true,
		autoRandBase: 132861,
		autoIncrBase: 132862,
		autoRowIDBase: 132863,
		engines: make(map[int32]engineCheckpointDiff),
	}
	require.Equal(t, expected, cpd)

	// shouldn't go backwards
	m2 := RebaseCheckpointMerger{
		AutoRandBase: 131,
		AutoIncrBase: 132,
		AutoRowIDBase: 133,
	}
	m2.MergeInto(cpd)
	require.Equal(t, expected, cpd)
}
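
// TestApplyDiff verifies that TableCheckpoint.Apply folds a merged diff back into
// the checkpoint: the table and engine statuses and the auto-ID bases are updated,
// chunk positions and checksums are applied, and diff entries for engines or chunks
// that are not present in the checkpoint are ignored.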
func TestApplyDiff(t *testing.T) {
	cp := TableCheckpoint{
		Status: CheckpointStatusLoaded,
		AutoRandBase: 131,
		AutoIncrBase: 132,
		AutoRowIDBase: 133,
		Engines: map[int32]*EngineCheckpoint{
			-1: {
				Status: CheckpointStatusLoaded,
			},
			0: {
				Status: CheckpointStatusLoaded,
				Chunks: []*ChunkCheckpoint{
					{
						Key: ChunkCheckpointKey{Path: "/tmp/01.sql"},
						Chunk: mydump.Chunk{
							Offset: 0,
							RealOffset: 0,
							EndOffset: 20000,
							PrevRowIDMax: 0,
							RowIDMax: 1000,
						},
					},
					{
						Key: ChunkCheckpointKey{Path: "/tmp/04.sql"},
						Chunk: mydump.Chunk{
							Offset: 0,
							RealOffset: 0,
							EndOffset: 15000,
							PrevRowIDMax: 1000,
							RowIDMax: 1300,
						},
					},
				},
			},
		},
	}

	cpd := NewTableCheckpointDiff()
	(&StatusCheckpointMerger{EngineID: -1, Status: CheckpointStatusImported}).MergeInto(cpd)
	(&StatusCheckpointMerger{EngineID: WholeTableEngineID, Status: CheckpointStatusAllWritten}).MergeInto(cpd)
	// engine 1234 is not in cp, so this status change has no effect on the applied checkpoint below.
	(&StatusCheckpointMerger{EngineID: 1234, Status: CheckpointStatusAnalyzeSkipped}).MergeInto(cpd)
	(&RebaseCheckpointMerger{
		AutoRandBase: 1131,
		AutoIncrBase: 1132,
		AutoRowIDBase: 1133,
	}).MergeInto(cpd)
	(&ChunkCheckpointMerger{
		EngineID: 0,
		Key: ChunkCheckpointKey{Path: "/tmp/01.sql"},
		Checksum: verification.MakeKVChecksum(3333, 4444, 5555),
		Pos: 6666,
		RealPos: 6565,
		RowID: 777,
	}).MergeInto(cpd)
	// the remaining merges target an engine (5678) or chunks (/tmp/03.sql, /tmp/10.sql)
	// that are not in cp, so they also have no effect on the applied checkpoint below.
	(&ChunkCheckpointMerger{
		EngineID: 5678,
		Key: ChunkCheckpointKey{Path: "/tmp/04.sql"},
		Pos: 9999,
		RealPos: 9888,
		RowID: 888,
	}).MergeInto(cpd)
	(&ChunkCheckpointMerger{
		EngineID: 0,
		Key: ChunkCheckpointKey{Path: "/tmp/03.sql"},
		Pos: 3636,
		RealPos: 3535,
		RowID: 2222,
	}).MergeInto(cpd)
	(&ChunkCheckpointMerger{
		EngineID: 0,
		Key: ChunkCheckpointKey{Path: "/tmp/10.sql"},
		Pos: 4949,
		RealPos: 4848,
		RowID: 444,
	}).MergeInto(cpd)

	cp.Apply(cpd)

	require.Equal(t, TableCheckpoint{
		Status: CheckpointStatusAllWritten,
		AutoRandBase: 1131,
		AutoIncrBase: 1132,
		AutoRowIDBase: 1133,
		Engines: map[int32]*EngineCheckpoint{
			-1: {
				Status: CheckpointStatusImported,
			},
			0: {
				Status: CheckpointStatusLoaded,
				Chunks: []*ChunkCheckpoint{
					{
						Key: ChunkCheckpointKey{Path: "/tmp/01.sql"},
						Chunk: mydump.Chunk{
							Offset: 6666,
							RealOffset: 6565,
							EndOffset: 20000,
							PrevRowIDMax: 777,
							RowIDMax: 1000,
						},
						Checksum: verification.MakeKVChecksum(3333, 4444, 5555),
					},
					{
						Key: ChunkCheckpointKey{Path: "/tmp/04.sql"},
						Chunk: mydump.Chunk{
							Offset: 0,
							RealOffset: 0,
							EndOffset: 15000,
							PrevRowIDMax: 1000,
							RowIDMax: 1300,
						},
					},
				},
			},
		},
	}, cp)
}
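
// TestCheckpointMarshallUnmarshall verifies that a file-backed checkpoints DB
// round-trips through Close and reopen while preserving an explicitly initialized
// (but empty) Engines map.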
func TestCheckpointMarshallUnmarshall(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "filecheckpoint")
	ctx := context.Background()

	fileChkp, err := NewFileCheckpointsDB(ctx, path)
	require.NoError(t, err)
	fileChkp.checkpoints.Checkpoints["a"] = &checkpointspb.TableCheckpointModel{
		Status: uint32(CheckpointStatusLoaded),
		Engines: map[int32]*checkpointspb.EngineCheckpointModel{},
	}
	err = fileChkp.Close()
	require.NoError(t, err)

	fileChkp2, err := NewFileCheckpointsDB(ctx, path)
	require.NoError(t, err)
	// if the empty map is not restored explicitly, it becomes nil after reloading
	require.NotNil(t, fileChkp2.checkpoints.Checkpoints["a"].Engines)
}
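
// TestSeparateCompletePath verifies that separateCompletePath splits a checkpoint
// destination into its file name and parent path for plain local paths, file://
// URLs, and s3:// URLs, including percent-encoded components and query parameters.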
func TestSeparateCompletePath(t *testing.T) {
	testCases := []struct {
		complete       string
		expectFileName string
		expectPath     string
	}{
		{"", "", ""},
		{"/a/", "", "/a/"},
		{"test.log", "test.log", "."},
		{"./test.log", "test.log", "."},
		{"./tmp/test.log", "test.log", "tmp"},
		{"tmp/test.log", "test.log", "tmp"},
		{"/test.log", "test.log", "/"},
		{"/tmp/test.log", "test.log", "/tmp"},
		{"/a%3F%2Fbc/a%3F%2Fbc.log", "a%3F%2Fbc.log", "/a%3F%2Fbc"},
		{"/a??bc/a??bc.log", "a??bc.log", "/a??bc"},
		{"/t-%C3%8B%21s%60t/t-%C3%8B%21s%60t.log", "t-%C3%8B%21s%60t.log", "/t-%C3%8B%21s%60t"},
		{"/t-Ë!s`t/t-Ë!s`t.log", "t-Ë!s`t.log", "/t-Ë!s`t"},
		{"file:///a%3F%2Fbc/a%3F%2Fcd.log", "cd.log", "file:///a%3F/bc/a%3F"},
		{"file:///a?/bc/a?/cd.log", "a", "file:///?/bc/a?/cd.log"},
		{"file:///a/?/bc/a?/cd.log", "", "file:///a/?/bc/a?/cd.log"},
		{"file:///t-%C3%8B%21s%60t/t-%C3%8B%21s%60t.log", "t-Ë!s`t.log", "file:///t-%C3%8B%21s%60t"},
		{"file:///t-Ë!s`t/t-Ë!s`t.log", "t-Ë!s`t.log", "file:///t-%C3%8B%21s%60t"},
		{"s3://bucket2/test.log", "test.log", "s3://bucket2/"},
		{"s3://bucket2/test/test.log", "test.log", "s3://bucket2/test"},
		{"s3://bucket3/prefix/test.log?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt+PaIbYKrKlEEMMF/ExCiJEX=XMLPUANw", "test.log",
			"s3://bucket3/prefix?access-key=NXN7IPIOSAAKDEEOLMAF&secret-access-key=nREY/7Dt%2BPaIbYKrKlEEMMF/ExCiJEX=XMLPUANw"},
	}

	for _, testCase := range testCases {
		fileName, newPath, err := separateCompletePath(testCase.complete)
		require.NoError(t, err)
		require.Equal(t, testCase.expectFileName, fileName)
		require.Equal(t, testCase.expectPath, newPath)
	}
}
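
// TestTableCheckpointApplyBases verifies that applying a rebase diff whose bases
// are smaller than the checkpoint's current AutoRowIDBase/AutoIncrBase/AutoRandBase
// leaves the existing values unchanged.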
func TestTableCheckpointApplyBases(t *testing.T) {
	tblCP := TableCheckpoint{
		AutoRowIDBase: 11,
		AutoIncrBase: 12,
		AutoRandBase: 13,
	}
	tblCP.Apply(&TableCheckpointDiff{
		hasRebase: true,
		autoRowIDBase: 1,
		autoIncrBase: 2,
		autoRandBase: 3,
	})
	require.EqualValues(t, 11, tblCP.AutoRowIDBase)
	require.EqualValues(t, 12, tblCP.AutoIncrBase)
	require.EqualValues(t, 13, tblCP.AutoRandBase)
}