Files
tidb/br/pkg/stream/stream_misc_test.go

174 lines
5.4 KiB
Go

// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
package stream_test
import (
"context"
"slices"
"testing"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/stretchr/testify/require"
)
// TestGetCheckpointOfTask verifies that TaskStatus.GetMinStoreCheckpoint
// reports the smallest TS among the task's checkpoints, and that the reported
// value changes as individual checkpoints are advanced.
func TestGetCheckpointOfTask(t *testing.T) {
	task := stream.TaskStatus{
		Info: backuppb.StreamBackupTaskInfo{
			StartTs: 8,
		},
		Checkpoints: []streamhelper.Checkpoint{
			{ID: 1, TS: 10},
			{ID: 2, TS: 8},
			{ID: 6, TS: 12},
		},
	}

	// Initially the checkpoint with ID 2 (TS 8) is the minimum.
	// testify expects (expected, actual); keeping that order makes failure
	// output label the two values correctly.
	require.Equal(t, uint64(8), task.GetMinStoreCheckpoint().TS, "progress = %v", task.Checkpoints)

	// Advance ID 2 past the others: ID 1 (TS 10) becomes the minimum.
	task.Checkpoints[1].TS = 18
	require.Equal(t, uint64(10), task.GetMinStoreCheckpoint().TS, "progress = %v", task.Checkpoints)

	// Advance ID 1 as well: ID 6 (TS 12) becomes the minimum.
	task.Checkpoints[0].TS = 14
	require.Equal(t, uint64(12), task.GetMinStoreCheckpoint().TS, "progress = %v", task.Checkpoints)
}
// TestMetadataHelperReadFile exercises MetadataHelper.ReadFile against a local
// storage rooted in a temporary directory. It covers three reads that must all
// yield the same plain payload:
//
//   - a whole file holding only the plain payload (offset 0, length 0),
//   - the leading plain range of a file holding plain + compressed data,
//   - the trailing compressed range of that file, read with ZSTD decoding.
func TestMetadataHelperReadFile(t *testing.T) {
	ctx := context.Background()
	tmpdir := t.TempDir()
	s, err := objstore.NewLocalStorage(tmpdir)
	require.NoError(t, err)
	helper := stream.NewMetadataHelper()

	filename1 := "full_data"
	filename2 := "misc_data"
	data1 := []byte("Test MetadataHelper. The data contains bare data (or maybe compressed data).")
	// data2 is the compressed content from data1. It starts with the
	// Zstandard frame magic bytes 0x28 0xb5 0x2f 0xfd.
	data2 := []byte{0x28, 0xb5, 0x2f, 0xfd, 0x0, 0x58, 0x15, 0x2, 0x0, 0x52, 0x44, 0xe, 0x14, 0xb0, 0x37, 0x1, 0xe7,
		0xa4, 0x1c, 0xd9, 0x3d, 0xc7, 0xee, 0xe7, 0x3e, 0x15, 0x14, 0x80, 0xdc, 0x25, 0x14, 0xc, 0x60, 0x24, 0x70,
		0xda, 0x6a, 0x47, 0xfc, 0x2d, 0xa4, 0x4e, 0x6f, 0xe, 0xa3, 0x9e, 0x2, 0xce, 0xa6, 0x98, 0xa, 0x67, 0x52,
		0xa7, 0x4e, 0x5b, 0x4f, 0x94, 0x92, 0xfb, 0x32, 0x2e, 0x38, 0x6d, 0xce, 0xf, 0x4d, 0x7c, 0x9, 0x1, 0x0,
		0xb2, 0x6c, 0x96, 0x1}

	// Write the fixtures first: filename1 holds only the plain payload,
	// filename2 holds the plain payload followed by its compressed form.
	err = s.WriteFile(ctx, filename1, data1)
	require.NoError(t, err)
	err = s.WriteFile(ctx, filename2, slices.Concat(data1, data2))
	require.NoError(t, err)

	// NOTE(review): presumably registers filename2 in the helper's cache with
	// an expected count of 2 reads, matching the two ranged reads below —
	// confirm against MetadataHelper.InitCacheEntry.
	helper.InitCacheEntry(filename2, 2)

	// Offset 0 with length 0 on the plain file is expected to return all of it.
	gotData, err := helper.ReadFile(ctx, filename1, 0, 0, backuppb.CompressionType_UNKNOWN, s, nil)
	require.NoError(t, err)
	require.Equal(t, data1, gotData)

	// Leading range of the mixed file: the uncompressed payload.
	gotData, err = helper.ReadFile(ctx, filename2, 0, uint64(len(data1)), backuppb.CompressionType_UNKNOWN, s, nil)
	require.NoError(t, err)
	require.Equal(t, data1, gotData)

	// Trailing range of the mixed file: the compressed payload, which must
	// decode back to data1.
	gotData, err = helper.ReadFile(ctx, filename2, uint64(len(data1)), uint64(len(data2)),
		backuppb.CompressionType_ZSTD, s, nil)
	require.NoError(t, err)
	require.Equal(t, data1, gotData)
}
func TestFilterPath(t *testing.T) {
type args struct {
path string
shiftStartTS uint64
restoreTS uint64
}
tests := []struct {
name string
args args
expected string
}{
{
name: "normal: minDefaultTs < minTs",
args: args{
path: "v1/backupmeta/000000000000000a-0000000000000005-000000000000000a-000000000000001e.meta", // flush=10, minDefault=5, min=10, max=30
shiftStartTS: 5,
restoreTS: 10,
},
expected: "v1/backupmeta/000000000000000a-0000000000000005-000000000000000a-000000000000001e.meta",
},
{
name: "normal: minDefaultTs == minTs",
args: args{
path: "v1/backupmeta/000000000000000a-000000000000000a-000000000000000a-000000000000001e.meta", // all = 10
shiftStartTS: 5,
restoreTS: 10,
},
expected: "v1/backupmeta/000000000000000a-000000000000000a-000000000000000a-000000000000001e.meta",
},
{
name: "fallback: minDefaultTs == 0",
args: args{
path: "v1/backupmeta/000000000000000a-0000000000000000-000000000000000a-000000000000001e.meta", // minDefault=0, min=10
shiftStartTS: 5,
restoreTS: 10,
},
expected: "v1/backupmeta/000000000000000a-0000000000000000-000000000000000a-000000000000001e.meta",
},
{
name: "fallback: minDefaultTs > minTs, should fallback to minTs",
args: args{
path: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta", // minDefault=20, min=10
shiftStartTS: 5,
restoreTS: 11, // fallback to 10, 11>10
},
expected: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta",
},
{
name: "restoreTS < fallback minTs, should be filtered",
args: args{
path: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta", // fallback to min=10
shiftStartTS: 5,
restoreTS: 9, // 9 < 10, should be filtered
},
expected: "",
},
{
name: "maxTs < shiftStartTS, should be filtered",
args: args{
path: "v1/backupmeta/000000000000000a-0000000000000005-000000000000000a-0000000000000004.meta", // max=4 < 5
shiftStartTS: 5,
restoreTS: 10,
},
expected: "",
},
{
name: "invalid: minDefaultTs > minTs, fallback, but restoreTS < fallback",
args: args{
path: "v1/backupmeta/000000000000000a-0000000000000014-000000000000000a-000000000000001e.meta",
shiftStartTS: 5,
restoreTS: 8, // fallback to 10, 8 < 10
},
expected: "",
},
{
name: "non-matching file name format, preserved for compatibility",
args: args{
path: "v1/backupmeta/unexpected_format.meta",
shiftStartTS: 10,
restoreTS: 10,
},
expected: "v1/backupmeta/unexpected_format.meta",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := stream.FilterPathByTs(tt.args.path, tt.args.shiftStartTS, tt.args.restoreTS)
require.Equal(t, tt.expected, got)
})
}
}