// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package external

import (
	"context"
	goerrors "errors"
	"io"
	"net/http/httptest"
	"testing"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/johannesboyne/gofakes3"
	"github.com/johannesboyne/gofakes3/backend/s3mem"
	"github.com/pingcap/errors"
	backuppb "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/lightning/membuf"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/objectio"
	"github.com/pingcap/tidb/pkg/objstore/s3like"
	"github.com/pingcap/tidb/pkg/objstore/s3store"
	"github.com/stretchr/testify/require"
	"golang.org/x/exp/rand"
)

// mockExtStore is only used for testing.
type mockExtStore struct {
	src []byte
	idx uint64
}

func (s *mockExtStore) Read(p []byte) (n int, err error) {
	// Read from src to p.
	if s.idx >= uint64(len(s.src)) {
		return 0, io.EOF
	}
	n = copy(p, s.src[s.idx:])
	s.idx += uint64(n)
	return n, nil
}

func (s *mockExtStore) Seek(_ int64, _ int) (int64, error) {
	return 0, errors.Errorf("unsupported operation")
}

func (*mockExtStore) Close() error {
	return nil
}

func (s *mockExtStore) GetFileSize() (int64, error) {
	return int64(len(s.src)), nil
}

func TestByteReader(t *testing.T) {
	st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
	defer clean()

	// Prepare
	err := st.WriteFile(context.Background(), "testfile", []byte("abcde"))
	require.NoError(t, err)

	newRsc := func() objectio.Reader {
		rsc, err := st.Open(context.Background(), "testfile", nil)
		require.NoError(t, err)
		return rsc
	}

	// Test basic next() usage.
	br, err := newByteReader(context.Background(), newRsc(), 3)
	require.NoError(t, err)
	n, bs := br.next(1)
	require.Equal(t, 1, n)
	require.Equal(t, [][]byte{{'a'}}, bs)
	n, bs = br.next(2)
	require.Equal(t, 2, n)
	require.Equal(t, [][]byte{{'b', 'c'}}, bs)
	require.NoError(t, br.Close())

	// Test basic readNBytes() usage.
	br, err = newByteReader(context.Background(), newRsc(), 3)
	require.NoError(t, err)
	x, err := br.readNBytes(2)
	require.NoError(t, err)
	require.Equal(t, 2, len(x))
	require.Equal(t, byte('a'), x[0])
	require.Equal(t, byte('b'), x[1])
	require.NoError(t, br.Close())

	br, err = newByteReader(context.Background(), newRsc(), 3)
	require.NoError(t, err)
	x, err = br.readNBytes(5) // Read all the data.
	require.NoError(t, err)
	require.Equal(t, 5, len(x))
	require.Equal(t, byte('e'), x[4])
	_, err = br.readNBytes(1) // EOF
	require.ErrorIs(t, err, io.EOF)
	require.NoError(t, br.Close())

	br, err = newByteReader(context.Background(), newRsc(), 3)
	require.NoError(t, err)
	_, err = br.readNBytes(7) // EOF
	require.ErrorIs(t, err, io.ErrUnexpectedEOF)

	err = st.WriteFile(context.Background(), "testfile", []byte("abcdef"))
	require.NoError(t, err)

	ms := &mockExtStore{src: []byte("abcdef")}
	br, err = newByteReader(context.Background(), ms, 2)
	require.NoError(t, err)
	x, err = br.readNBytes(3)
	require.NoError(t, err)
	// Pollute mockExtStore to verify that the returned slice is not affected.
	copy(ms.src, "xyz")
	require.Equal(t, 3, len(x))
	require.Equal(t, byte('c'), x[2])
	require.NoError(t, br.Close())

	ms = &mockExtStore{src: []byte("abcdef")}
	br, err = newByteReader(context.Background(), ms, 2)
	require.NoError(t, err)
	x, err = br.readNBytes(2)
	require.NoError(t, err)
	// Pollute mockExtStore to verify that the returned slice is not affected.
	copy(ms.src, "xyz")
	require.Equal(t, 2, len(x))
	require.Equal(t, byte('b'), x[1])
	require.NoError(t, br.Close())
}

func TestByteReaderAuxBuf(t *testing.T) {
	ms := &mockExtStore{src: []byte("0123456789")}
	br, err := newByteReader(context.Background(), ms, 1)
	require.NoError(t, err)
	y1, err := br.readNBytes(1)
	require.NoError(t, err)
	require.Equal(t, []byte("0"), y1)
	y2, err := br.readNBytes(2)
	require.NoError(t, err)
	require.Equal(t, []byte("12"), y2)
	y3, err := br.readNBytes(1)
	require.NoError(t, err)
	require.Equal(t, []byte("3"), y3)
	y4, err := br.readNBytes(2)
	require.NoError(t, err)
	require.Equal(t, []byte("45"), y4)
}

func TestUnexpectedEOF(t *testing.T) {
	st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
	defer func() {
		clean()
	}()

	// Prepare
	err := st.WriteFile(context.Background(), "testfile", []byte("0123456789"))
	require.NoError(t, err)

	newRsc := func() objectio.Reader {
		rsc, err := st.Open(context.Background(), "testfile", nil)
		require.NoError(t, err)
		return rsc
	}

	br, err := newByteReader(context.Background(), newRsc(), 3)
	require.NoError(t, err)
	_, err = br.readNBytes(100)
	require.ErrorIs(t, err, io.ErrUnexpectedEOF)

	br, err = newByteReader(context.Background(), newRsc(), 3)
	require.NoError(t, err)
	_, err = br.readNBytes(100)
	require.ErrorIs(t, err, io.ErrUnexpectedEOF)
}

func TestEmptyContent(t *testing.T) {
	ms := &mockExtStore{src: []byte{}}
	_, err := newByteReader(context.Background(), ms, 100)
	require.ErrorIs(t, err, io.EOF)

	st, clean := NewS3WithBucketAndPrefix(t, "test", "testprefix")
	defer clean()

	// Prepare
	err = st.WriteFile(context.Background(), "testfile", []byte(""))
	require.NoError(t, err)

	newRsc := func() objectio.Reader {
		rsc, err := st.Open(context.Background(), "testfile", nil)
		require.NoError(t, err)
		return rsc
	}

	_, err = newByteReader(context.Background(), newRsc(), 100)
	require.ErrorIs(t, err, io.EOF)
}

// TestSwitchMode verifies that randomly switching the byte reader between
// sequential and concurrent read modes still returns every KV pair in the
// order it was written.
func TestSwitchMode(t *testing.T) {
	seed := time.Now().Unix()
	rand.Seed(uint64(seed))
	t.Logf("seed: %d", seed)
	st := objstore.NewMemStorage()

	// Prepare
	var kvAndStat [2]string
	ctx := context.Background()
	writer := NewWriterBuilder().
		SetPropSizeDistance(100).
		SetPropKeysDistance(2).
		SetOnCloseFunc(func(summary *WriterSummary) {
			kvAndStat = summary.MultipleFilesStats[0].Filenames[0]
		}).
		BuildOneFile(st, "/test", "0")
	writer.InitPartSizeAndLogger(ctx, 5*1024*1024)

	kvCnt := 1000000
	kvs := make([]common.KvPair, kvCnt)
	for i := range kvCnt {
		randLen := rand.Intn(10) + 1
		kvs[i].Key = make([]byte, randLen)
		_, err := rand.Read(kvs[i].Key)
		require.NoError(t, err)
		randLen = rand.Intn(10) + 1
		kvs[i].Val = make([]byte, randLen)
		_, err = rand.Read(kvs[i].Val)
		require.NoError(t, err)
	}

	for _, item := range kvs {
		err := writer.WriteRow(ctx, item.Key, item.Val)
		require.NoError(t, err)
	}
	err := writer.Close(ctx)
	require.NoError(t, err)

	pool := membuf.NewPool()
	ConcurrentReaderBufferSizePerConc = rand.Intn(100) + 1
	kvReader, err := NewKVReader(context.Background(), kvAndStat[0], st, 0, 64*1024)
	require.NoError(t, err)
	kvReader.byteReader.enableConcurrentRead(st, kvAndStat[0], 100, ConcurrentReaderBufferSizePerConc, pool.NewBuffer())
	modeUseCon := false
	i := 0
	for {
		if rand.Intn(5) == 0 {
			if modeUseCon {
				kvReader.byteReader.switchConcurrentMode(false)
				modeUseCon = false
			} else {
				kvReader.byteReader.switchConcurrentMode(true)
				modeUseCon = true
			}
		}
		key, val, err := kvReader.NextKV()
		if goerrors.Is(err, io.EOF) {
			break
		}
		require.NoError(t, err)
		require.Equal(t, kvs[i].Key, key)
		require.Equal(t, kvs[i].Val, val)
		i++
	}
}

// NewS3WithBucketAndPrefix creates a new S3Storage for testing.
func NewS3WithBucketAndPrefix(t *testing.T, bucketName, prefixName string) (*s3like.Storage, func()) {
	backend := s3mem.New()
	faker := gofakes3.New(backend)
	ts := httptest.NewServer(faker.Server())
	err := backend.CreateBucket("test")
	require.NoError(t, err)

	cfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion("region"),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy-access", "dummy-secret", "")),
	)
	require.NoError(t, err)

	svc := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.BaseEndpoint = aws.String(ts.URL)
		o.UsePathStyle = true // Removes need for subdomain
	})

	st := s3store.NewS3StorageForTest(svc, &backuppb.S3{
		Region:       "region",
		Bucket:       bucketName,
		Prefix:       prefixName,
		Acl:          "acl",
		Sse:          "sse",
		StorageClass: "sc",
	}, nil)
	return st, ts.Close
}