307 lines
8.5 KiB
Go
307 lines
8.5 KiB
Go
// Copyright 2023 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package external
|
|
|
|
import (
|
|
"context"
|
|
goerrors "errors"
|
|
"io"
|
|
"net/http/httptest"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/johannesboyne/gofakes3"
|
|
"github.com/johannesboyne/gofakes3/backend/s3mem"
|
|
"github.com/pingcap/errors"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/tidb/pkg/lightning/common"
|
|
"github.com/pingcap/tidb/pkg/lightning/membuf"
|
|
"github.com/pingcap/tidb/pkg/objstore"
|
|
"github.com/pingcap/tidb/pkg/objstore/objectio"
|
|
"github.com/pingcap/tidb/pkg/objstore/s3like"
|
|
"github.com/pingcap/tidb/pkg/objstore/s3store"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/exp/rand"
|
|
)
|
|
|
|
// mockExtStore is only used for test.
// It serves a fixed byte slice sequentially through Read.
type mockExtStore struct {
	// src holds the bytes handed out by Read.
	src []byte
	// idx is the offset into src of the next unread byte.
	idx uint64
}

// Read copies the unread part of src into p and advances idx by the number of
// bytes copied. Once idx has reached the end of src it returns 0 and io.EOF.
func (s *mockExtStore) Read(p []byte) (n int, err error) {
	if total := uint64(len(s.src)); s.idx < total {
		n = copy(p, s.src[s.idx:])
		s.idx += uint64(n)
		return n, nil
	}
	return 0, io.EOF
}
|
|
|
|
func (s *mockExtStore) Seek(_ int64, _ int) (int64, error) {
|
|
return 0, errors.Errorf("unsupported operation")
|
|
}
|
|
|
|
func (*mockExtStore) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func (s *mockExtStore) GetFileSize() (int64, error) {
|
|
return int64(len(s.src)), nil
|
|
}
|
|
|
|
func TestByteReader(t *testing.T) {
|
|
st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
|
|
defer clean()
|
|
|
|
// Prepare
|
|
err := st.WriteFile(context.Background(), "testfile", []byte("abcde"))
|
|
require.NoError(t, err)
|
|
|
|
newRsc := func() objectio.Reader {
|
|
rsc, err := st.Open(context.Background(), "testfile", nil)
|
|
require.NoError(t, err)
|
|
return rsc
|
|
}
|
|
|
|
// Test basic next() usage.
|
|
br, err := newByteReader(context.Background(), newRsc(), 3)
|
|
require.NoError(t, err)
|
|
n, bs := br.next(1)
|
|
require.Equal(t, 1, n)
|
|
require.Equal(t, [][]byte{{'a'}}, bs)
|
|
n, bs = br.next(2)
|
|
require.Equal(t, 2, n)
|
|
require.Equal(t, [][]byte{{'b', 'c'}}, bs)
|
|
require.NoError(t, br.Close())
|
|
|
|
// Test basic readNBytes() usage.
|
|
br, err = newByteReader(context.Background(), newRsc(), 3)
|
|
require.NoError(t, err)
|
|
x, err := br.readNBytes(2)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 2, len(x))
|
|
require.Equal(t, byte('a'), x[0])
|
|
require.Equal(t, byte('b'), x[1])
|
|
require.NoError(t, br.Close())
|
|
|
|
br, err = newByteReader(context.Background(), newRsc(), 3)
|
|
require.NoError(t, err)
|
|
x, err = br.readNBytes(5) // Read all the data.
|
|
require.NoError(t, err)
|
|
require.Equal(t, 5, len(x))
|
|
require.Equal(t, byte('e'), x[4])
|
|
_, err = br.readNBytes(1) // EOF
|
|
require.ErrorIs(t, err, io.EOF)
|
|
require.NoError(t, br.Close())
|
|
|
|
br, err = newByteReader(context.Background(), newRsc(), 3)
|
|
require.NoError(t, err)
|
|
_, err = br.readNBytes(7) // EOF
|
|
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
|
|
|
|
err = st.WriteFile(context.Background(), "testfile", []byte("abcdef"))
|
|
require.NoError(t, err)
|
|
|
|
ms := &mockExtStore{src: []byte("abcdef")}
|
|
br, err = newByteReader(context.Background(), ms, 2)
|
|
require.NoError(t, err)
|
|
x, err = br.readNBytes(3)
|
|
require.NoError(t, err)
|
|
// Pollute mockExtStore to verify if the slice is not affected.
|
|
copy(ms.src, "xyz")
|
|
require.Equal(t, 3, len(x))
|
|
require.Equal(t, byte('c'), x[2])
|
|
require.NoError(t, br.Close())
|
|
|
|
ms = &mockExtStore{src: []byte("abcdef")}
|
|
br, err = newByteReader(context.Background(), ms, 2)
|
|
require.NoError(t, err)
|
|
x, err = br.readNBytes(2)
|
|
require.NoError(t, err)
|
|
// Pollute mockExtStore to verify if the slice is not affected.
|
|
copy(ms.src, "xyz")
|
|
require.Equal(t, 2, len(x))
|
|
require.Equal(t, byte('b'), x[1])
|
|
require.NoError(t, br.Close())
|
|
}
|
|
|
|
func TestByteReaderAuxBuf(t *testing.T) {
|
|
ms := &mockExtStore{src: []byte("0123456789")}
|
|
br, err := newByteReader(context.Background(), ms, 1)
|
|
require.NoError(t, err)
|
|
y1, err := br.readNBytes(1)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("0"), y1)
|
|
y2, err := br.readNBytes(2)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("12"), y2)
|
|
|
|
y3, err := br.readNBytes(1)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("3"), y3)
|
|
y4, err := br.readNBytes(2)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("45"), y4)
|
|
}
|
|
|
|
func TestUnexpectedEOF(t *testing.T) {
|
|
st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
|
|
defer func() {
|
|
clean()
|
|
}()
|
|
|
|
// Prepare
|
|
err := st.WriteFile(context.Background(), "testfile", []byte("0123456789"))
|
|
require.NoError(t, err)
|
|
|
|
newRsc := func() objectio.Reader {
|
|
rsc, err := st.Open(context.Background(), "testfile", nil)
|
|
require.NoError(t, err)
|
|
return rsc
|
|
}
|
|
|
|
br, err := newByteReader(context.Background(), newRsc(), 3)
|
|
require.NoError(t, err)
|
|
_, err = br.readNBytes(100)
|
|
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
|
|
|
|
br, err = newByteReader(context.Background(), newRsc(), 3)
|
|
require.NoError(t, err)
|
|
_, err = br.readNBytes(100)
|
|
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
|
|
}
|
|
|
|
func TestEmptyContent(t *testing.T) {
|
|
ms := &mockExtStore{src: []byte{}}
|
|
_, err := newByteReader(context.Background(), ms, 100)
|
|
require.ErrorIs(t, err, io.EOF)
|
|
|
|
st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
|
|
defer clean()
|
|
|
|
// Prepare
|
|
err = st.WriteFile(context.Background(), "testfile", []byte(""))
|
|
require.NoError(t, err)
|
|
|
|
newRsc := func() objectio.Reader {
|
|
rsc, err := st.Open(context.Background(), "testfile", nil)
|
|
require.NoError(t, err)
|
|
return rsc
|
|
}
|
|
_, err = newByteReader(context.Background(), newRsc(), 100)
|
|
require.ErrorIs(t, err, io.EOF)
|
|
}
|
|
|
|
func TestSwitchMode(t *testing.T) {
|
|
seed := time.Now().Unix()
|
|
rand.Seed(uint64(seed))
|
|
t.Logf("seed: %d", seed)
|
|
st := objstore.NewMemStorage()
|
|
// Prepare
|
|
var kvAndStat [2]string
|
|
ctx := context.Background()
|
|
writer := NewWriterBuilder().
|
|
SetPropSizeDistance(100).
|
|
SetPropKeysDistance(2).
|
|
SetOnCloseFunc(func(summary *WriterSummary) { kvAndStat = summary.MultipleFilesStats[0].Filenames[0] }).
|
|
BuildOneFile(st, "/test", "0")
|
|
|
|
writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
|
|
|
|
kvCnt := 1000000
|
|
kvs := make([]common.KvPair, kvCnt)
|
|
for i := range kvCnt {
|
|
randLen := rand.Intn(10) + 1
|
|
kvs[i].Key = make([]byte, randLen)
|
|
_, err := rand.Read(kvs[i].Key)
|
|
require.NoError(t, err)
|
|
randLen = rand.Intn(10) + 1
|
|
kvs[i].Val = make([]byte, randLen)
|
|
_, err = rand.Read(kvs[i].Val)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
for _, item := range kvs {
|
|
err := writer.WriteRow(ctx, item.Key, item.Val)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
err := writer.Close(ctx)
|
|
require.NoError(t, err)
|
|
pool := membuf.NewPool()
|
|
ConcurrentReaderBufferSizePerConc = rand.Intn(100) + 1
|
|
kvReader, err := NewKVReader(context.Background(), kvAndStat[0], st, 0, 64*1024)
|
|
require.NoError(t, err)
|
|
kvReader.byteReader.enableConcurrentRead(st, kvAndStat[0], 100, ConcurrentReaderBufferSizePerConc, pool.NewBuffer())
|
|
modeUseCon := false
|
|
i := 0
|
|
for {
|
|
if rand.Intn(5) == 0 {
|
|
if modeUseCon {
|
|
kvReader.byteReader.switchConcurrentMode(false)
|
|
modeUseCon = false
|
|
} else {
|
|
kvReader.byteReader.switchConcurrentMode(true)
|
|
modeUseCon = true
|
|
}
|
|
}
|
|
key, val, err := kvReader.NextKV()
|
|
if goerrors.Is(err, io.EOF) {
|
|
break
|
|
}
|
|
require.NoError(t, err)
|
|
require.Equal(t, kvs[i].Key, key)
|
|
require.Equal(t, kvs[i].Val, val)
|
|
i++
|
|
}
|
|
}
|
|
|
|
// NewS3WithBucketAndPrefix creates a new S3Storage for testing.
|
|
func NewS3WithBucketAndPrefix(t *testing.T, bucketName, prefixName string) (*s3like.Storage, func()) {
|
|
backend := s3mem.New()
|
|
faker := gofakes3.New(backend)
|
|
ts := httptest.NewServer(faker.Server())
|
|
err := backend.CreateBucket("test")
|
|
require.NoError(t, err)
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.Background(),
|
|
config.WithRegion("region"),
|
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy-access", "dummy-secret", "")),
|
|
)
|
|
require.NoError(t, err)
|
|
|
|
svc := s3.NewFromConfig(cfg, func(o *s3.Options) {
|
|
o.BaseEndpoint = aws.String(ts.URL)
|
|
o.UsePathStyle = true // Removes need for subdomain
|
|
})
|
|
|
|
st := s3store.NewS3StorageForTest(svc, &backuppb.S3{
|
|
Region: "region",
|
|
Bucket: bucketName,
|
|
Prefix: prefixName,
|
|
Acl: "acl",
|
|
Sse: "sse",
|
|
StorageClass: "sc",
|
|
}, nil)
|
|
return st, ts.Close
|
|
}
|