692 lines
19 KiB
Go
692 lines
19 KiB
Go
// Copyright 2020 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package objstore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
goerrors "errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"net/url"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/fsouza/fake-gcs-server/fakestorage"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/tidb/pkg/objstore/recording"
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
"github.com/pingcap/tidb/pkg/util/intest"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
func CheckAccessStats(t *testing.T, rec *recording.AccessStats, expectedGets, expectedPuts, expectedRead, expectWrite int) {
|
|
t.Helper()
|
|
require.EqualValues(t, expectedGets, rec.Requests.Get.Load())
|
|
require.EqualValues(t, expectedPuts, rec.Requests.Put.Load())
|
|
require.EqualValues(t, expectedRead, rec.Traffic.Read.Load())
|
|
require.EqualValues(t, expectWrite, rec.Traffic.Write.Load())
|
|
}
|
|
|
|
func prepareGCSStore(t *testing.T, bucketName string, accessRec *recording.AccessStats) (*fakestorage.Server, *GCSStorage) {
|
|
t.Helper()
|
|
require.True(t, intest.InTest)
|
|
ctx := context.Background()
|
|
|
|
opts := fakestorage.Options{
|
|
NoListener: true,
|
|
}
|
|
server, err := fakestorage.NewServerWithOptions(opts)
|
|
require.NoError(t, err)
|
|
server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})
|
|
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b/",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "Fake Credentials",
|
|
}
|
|
stg, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
AccessRecording: accessRec,
|
|
})
|
|
require.NoError(t, err)
|
|
return server, stg
|
|
}
|
|
|
|
func TestGCS(t *testing.T) {
|
|
require.True(t, intest.InTest)
|
|
ctx := context.Background()
|
|
bucketName := "testbucket"
|
|
server, stg := prepareGCSStore(t, bucketName, nil)
|
|
|
|
err := stg.WriteFile(ctx, "key", []byte("data"))
|
|
require.NoError(t, err)
|
|
|
|
err = stg.WriteFile(ctx, "key1", []byte("data1"))
|
|
require.NoError(t, err)
|
|
|
|
key2Data := []byte("data22223346757222222222289722222")
|
|
err = stg.WriteFile(ctx, "key2", key2Data)
|
|
require.NoError(t, err)
|
|
|
|
rc, err := server.Client().Bucket(bucketName).Object("a/b/key").NewReader(ctx)
|
|
require.NoError(t, err)
|
|
d, err := io.ReadAll(rc)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("data"), d)
|
|
require.NoError(t, rc.Close())
|
|
|
|
d, err = stg.ReadFile(ctx, "key")
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("data"), d)
|
|
|
|
exist, err := stg.FileExists(ctx, "key")
|
|
require.NoError(t, err)
|
|
require.True(t, exist)
|
|
|
|
exist, err = stg.FileExists(ctx, "key_not_exist")
|
|
require.NoError(t, err)
|
|
require.False(t, exist)
|
|
|
|
keyDelete := "key_delete"
|
|
exist, err = stg.FileExists(ctx, keyDelete)
|
|
require.NoError(t, err)
|
|
require.False(t, exist)
|
|
|
|
err = stg.WriteFile(ctx, keyDelete, []byte("data"))
|
|
require.NoError(t, err)
|
|
|
|
exist, err = stg.FileExists(ctx, keyDelete)
|
|
require.NoError(t, err)
|
|
require.True(t, exist)
|
|
|
|
err = stg.DeleteFile(ctx, keyDelete)
|
|
require.NoError(t, err)
|
|
|
|
err = stg.DeleteFile(ctx, keyDelete)
|
|
require.NoError(t, err)
|
|
|
|
exist, err = stg.FileExists(ctx, keyDelete)
|
|
require.NoError(t, err)
|
|
require.False(t, exist)
|
|
|
|
checkWalkDir := func(stg *GCSStorage, opt *storeapi.WalkOption) {
|
|
var totalSize int64 = 0
|
|
err = stg.WalkDir(ctx, opt, func(name string, size int64) error {
|
|
totalSize += size
|
|
// also test can use this path open file
|
|
_, err := stg.Open(ctx, name, nil)
|
|
require.NoError(t, err)
|
|
return nil
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(42), totalSize)
|
|
}
|
|
// test right prefix without sub dir opt
|
|
{
|
|
checkWalkDir(stg, nil)
|
|
}
|
|
|
|
// test right prefix with sub dir opt
|
|
{
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/", // right prefix is /a/b/
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "Fake Credentials",
|
|
}
|
|
stg, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
checkWalkDir(stg, &storeapi.WalkOption{SubDir: "b/"})
|
|
}
|
|
|
|
// test prefix without slash in new bucket without sub dir opt
|
|
{
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b", // right prefix is "a/b/"
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "Fake Credentials",
|
|
}
|
|
stg, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
checkWalkDir(stg, nil)
|
|
}
|
|
// test prefix without slash in new bucket with sub dir opt
|
|
{
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a", // right prefix is "a/b/"
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "Fake Credentials",
|
|
}
|
|
stg, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
checkWalkDir(stg, &storeapi.WalkOption{SubDir: "b/"})
|
|
}
|
|
|
|
// test 1003 files
|
|
var totalSize int64 = 0
|
|
for i := range 1000 {
|
|
err = stg.WriteFile(ctx, fmt.Sprintf("f%d", i), []byte("data"))
|
|
require.NoError(t, err)
|
|
}
|
|
filesSet := make(map[string]struct{}, 1003)
|
|
err = stg.WalkDir(ctx, nil, func(name string, size int64) error {
|
|
filesSet[name] = struct{}{}
|
|
totalSize += size
|
|
return nil
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(42+4000), totalSize)
|
|
_, ok := filesSet["key"]
|
|
require.True(t, ok)
|
|
_, ok = filesSet["key1"]
|
|
require.True(t, ok)
|
|
_, ok = filesSet["key2"]
|
|
require.True(t, ok)
|
|
for i := range 1000 {
|
|
_, ok = filesSet[fmt.Sprintf("f%d", i)]
|
|
require.True(t, ok)
|
|
}
|
|
|
|
efr, err := stg.Open(ctx, "key2", nil)
|
|
require.NoError(t, err)
|
|
|
|
p := make([]byte, 10)
|
|
n, err := efr.Read(p)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 10, n)
|
|
require.Equal(t, "data222233", string(p))
|
|
|
|
p = make([]byte, 40)
|
|
n, err = efr.Read(p)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 23, n)
|
|
require.Equal(t, "46757222222222289722222", string(p[:23]))
|
|
|
|
p = make([]byte, 5)
|
|
offs, err := efr.Seek(3, io.SeekStart)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(3), offs)
|
|
|
|
n, err = efr.Read(p)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 5, n)
|
|
require.Equal(t, "a2222", string(p))
|
|
|
|
p = make([]byte, 5)
|
|
offs, err = efr.Seek(3, io.SeekCurrent)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(11), offs)
|
|
|
|
n, err = efr.Read(p)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 5, n)
|
|
require.Equal(t, "67572", string(p))
|
|
|
|
p = make([]byte, 5)
|
|
offs, err = efr.Seek(int64(-7), io.SeekEnd)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(26), offs)
|
|
|
|
n, err = efr.Read(p)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 5, n)
|
|
require.Equal(t, "97222", string(p))
|
|
|
|
offs, err = efr.Seek(int64(100), io.SeekStart)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(100), offs)
|
|
_, err = efr.Read(p)
|
|
require.Contains(t, err.Error(), "EOF")
|
|
|
|
offs, err = efr.Seek(int64(0), io.SeekEnd)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(len(key2Data)), offs)
|
|
_, err = efr.Read(p)
|
|
require.Contains(t, err.Error(), "EOF")
|
|
|
|
offs, err = efr.Seek(int64(1), io.SeekCurrent)
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(len(key2Data)+1), offs)
|
|
_, err = efr.Read(p)
|
|
require.Contains(t, err.Error(), "EOF")
|
|
|
|
_, err = efr.Seek(int64(-10000), io.SeekEnd)
|
|
require.Error(t, err)
|
|
|
|
err = efr.Close()
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, "gcs://testbucket/a/b/", stg.URI())
|
|
}
|
|
|
|
func TestNewGCSStorage(t *testing.T) {
|
|
require.True(t, intest.InTest)
|
|
ctx := context.Background()
|
|
|
|
opts := fakestorage.Options{
|
|
NoListener: true,
|
|
}
|
|
server, err := fakestorage.NewServerWithOptions(opts)
|
|
require.NoError(t, err)
|
|
bucketName := "testbucket"
|
|
server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})
|
|
testDir := t.TempDir()
|
|
|
|
{
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b/",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "FakeCredentials",
|
|
}
|
|
_, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: true,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, "FakeCredentials", gcs.CredentialsBlob)
|
|
}
|
|
|
|
{
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b/",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "FakeCredentials",
|
|
}
|
|
_, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, "", gcs.CredentialsBlob)
|
|
}
|
|
mustReportCredErr = true
|
|
{
|
|
fakeCredentialsFile, err := os.CreateTemp(testDir, "fakeCredentialsFile")
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, fakeCredentialsFile.Close())
|
|
require.NoError(t, os.Remove(fakeCredentialsFile.Name()))
|
|
}()
|
|
_, err = fakeCredentialsFile.Write([]byte(`{"type": "service_account"}`))
|
|
require.NoError(t, err)
|
|
err = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", fakeCredentialsFile.Name())
|
|
defer func() {
|
|
require.NoError(t, os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS"))
|
|
}()
|
|
require.NoError(t, err)
|
|
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b/",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "",
|
|
}
|
|
_, err = NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: true,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, `{"type": "service_account"}`, gcs.CredentialsBlob)
|
|
}
|
|
|
|
{
|
|
fakeCredentialsFile, err := os.CreateTemp(testDir, "fakeCredentialsFile")
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, fakeCredentialsFile.Close())
|
|
require.NoError(t, os.Remove(fakeCredentialsFile.Name()))
|
|
}()
|
|
_, err = fakeCredentialsFile.Write([]byte(`{"type": "service_account"}`))
|
|
require.NoError(t, err)
|
|
err = os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", fakeCredentialsFile.Name())
|
|
defer func() {
|
|
require.NoError(t, os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS"))
|
|
}()
|
|
require.NoError(t, err)
|
|
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b/",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "",
|
|
}
|
|
s, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, "", gcs.CredentialsBlob)
|
|
require.Equal(t, "a/b/x", s.objectName("x"))
|
|
}
|
|
|
|
{
|
|
require.NoError(t, os.Unsetenv("GOOGLE_APPLICATION_CREDENTIALS"))
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b/",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "",
|
|
}
|
|
_, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: true,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.Error(t, err)
|
|
}
|
|
// without http client
|
|
{
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: `{"type": "service_account"}`,
|
|
}
|
|
_, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
})
|
|
require.NoError(t, err)
|
|
}
|
|
mustReportCredErr = false
|
|
{
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "FakeCredentials",
|
|
}
|
|
s, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
require.Equal(t, "", gcs.CredentialsBlob)
|
|
require.Equal(t, "a/b/x", s.objectName("x"))
|
|
}
|
|
}
|
|
|
|
func createGCSStore(t *testing.T) *GCSStorage {
|
|
t.Helper()
|
|
require.True(t, intest.InTest)
|
|
ctx := context.Background()
|
|
|
|
opts := fakestorage.Options{
|
|
NoListener: true,
|
|
}
|
|
server, err := fakestorage.NewServerWithOptions(opts)
|
|
require.NoError(t, err)
|
|
bucketName := "testbucket"
|
|
server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})
|
|
|
|
gcs := &backuppb.GCS{
|
|
Bucket: bucketName,
|
|
Prefix: "a/b/",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: "Fake Credentials",
|
|
}
|
|
stg, err := NewGCSStorage(ctx, gcs, &storeapi.Options{
|
|
SendCredentials: false,
|
|
CheckPermissions: []storeapi.Permission{storeapi.AccessBuckets},
|
|
HTTPClient: server.HTTPClient(),
|
|
})
|
|
require.NoError(t, err)
|
|
return stg
|
|
}
|
|
|
|
func TestReadRange(t *testing.T) {
|
|
require.True(t, intest.InTest)
|
|
ctx := context.Background()
|
|
|
|
stg := createGCSStore(t)
|
|
defer stg.Close()
|
|
|
|
filename := "key"
|
|
err := stg.WriteFile(ctx, filename, []byte("0123456789"))
|
|
require.NoError(t, err)
|
|
|
|
start := int64(2)
|
|
end := int64(5)
|
|
r, err := stg.Open(ctx, filename, &storeapi.ReaderOption{
|
|
StartOffset: &start,
|
|
EndOffset: &end,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
content := make([]byte, 10)
|
|
n, err := r.Read(content)
|
|
require.NoError(t, err)
|
|
require.Equal(t, []byte("234"), content[:n])
|
|
}
|
|
|
|
var testingStorageURI = flag.String("testing-storage-uri", "", "the URI of the storage used for testing")
|
|
|
|
func openTestingStorage(t *testing.T) storeapi.Storage {
|
|
if *testingStorageURI == "" {
|
|
t.Skip("testingStorageURI is not set")
|
|
}
|
|
s, err := NewFromURL(context.Background(), *testingStorageURI)
|
|
require.NoError(t, err)
|
|
return s
|
|
}
|
|
|
|
func TestMultiPartUpload(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
s := openTestingStorage(t)
|
|
if _, ok := s.(*GCSStorage); !ok {
|
|
t.Skipf("only test GCSStorage, got %T", s)
|
|
}
|
|
|
|
filename := "TestMultiPartUpload"
|
|
// just get some random content, use any seed is enough
|
|
data := make([]byte, 100*1024*1024)
|
|
rand.Read(data)
|
|
w, err := s.Create(ctx, filename, &storeapi.WriterOption{Concurrency: 10})
|
|
require.NoError(t, err)
|
|
_, err = w.Write(ctx, data)
|
|
require.NoError(t, err)
|
|
err = w.Close(ctx)
|
|
require.NoError(t, err)
|
|
|
|
got, err := s.ReadFile(ctx, filename)
|
|
require.NoError(t, err)
|
|
cmp := bytes.Compare(data, got)
|
|
require.Zero(t, cmp)
|
|
}
|
|
|
|
func TestSpeedReadManyFiles(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
s := openTestingStorage(t)
|
|
if _, ok := s.(*GCSStorage); !ok {
|
|
t.Skipf("only test GCSStorage, got %T", s)
|
|
}
|
|
|
|
fileNum := 1000
|
|
filenames := make([]string, fileNum)
|
|
for i := range fileNum {
|
|
filenames[i] = fmt.Sprintf("TestSpeedReadManySmallFiles/%d", i)
|
|
}
|
|
fileSize := 1024
|
|
data := make([]byte, fileSize)
|
|
eg := &errgroup.Group{}
|
|
for i := range fileNum {
|
|
filename := filenames[i]
|
|
eg.Go(func() error {
|
|
return s.WriteFile(ctx, filename, data)
|
|
})
|
|
}
|
|
require.NoError(t, eg.Wait())
|
|
|
|
testSize := []int{10, 100, 1000}
|
|
for _, size := range testSize {
|
|
testFiles := filenames[:size]
|
|
now := time.Now()
|
|
for i := range testFiles {
|
|
filename := testFiles[i]
|
|
eg.Go(func() error {
|
|
_, err := s.ReadFile(ctx, filename)
|
|
return err
|
|
})
|
|
}
|
|
require.NoError(t, eg.Wait())
|
|
t.Logf("read %d small files cost %v", len(testFiles), time.Since(now))
|
|
}
|
|
|
|
// test read 10 * 100MB files
|
|
|
|
fileNum = 30
|
|
filenames = make([]string, fileNum)
|
|
for i := range fileNum {
|
|
filenames[i] = fmt.Sprintf("TestSpeedReadManyLargeFiles/%d", i)
|
|
}
|
|
fileSize = 100 * 1024 * 1024
|
|
data = make([]byte, fileSize)
|
|
for i := range fileNum {
|
|
filename := filenames[i]
|
|
eg.Go(func() error {
|
|
return s.WriteFile(ctx, filename, data)
|
|
})
|
|
}
|
|
require.NoError(t, eg.Wait())
|
|
|
|
testFiles := filenames
|
|
now := time.Now()
|
|
for i := range testFiles {
|
|
filename := testFiles[i]
|
|
eg.Go(func() error {
|
|
_, err := s.ReadFile(ctx, filename)
|
|
return err
|
|
})
|
|
}
|
|
require.NoError(t, eg.Wait())
|
|
t.Logf("read %d large files cost %v", len(testFiles), time.Since(now))
|
|
}
|
|
|
|
func TestGCSShouldRetry(t *testing.T) {
|
|
require.True(t, shouldRetry(&url.Error{Err: goerrors.New("http2: server sent GOAWAY and closed the connectiont"), Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"}))
|
|
require.True(t, shouldRetry(&url.Error{Err: goerrors.New("http2: client connection lost"), Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"}))
|
|
require.True(t, shouldRetry(&url.Error{Err: io.EOF, Op: "Get", URL: "https://storage.googleapis.com/storage/v1/"}))
|
|
}
|
|
|
|
func TestCtxUsage(t *testing.T) {
|
|
httpSvr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
|
|
defer httpSvr.Close()
|
|
|
|
ctx := context.Background()
|
|
gcs := &backuppb.GCS{
|
|
Endpoint: httpSvr.URL,
|
|
Bucket: "test",
|
|
Prefix: "prefix",
|
|
StorageClass: "NEARLINE",
|
|
PredefinedAcl: "private",
|
|
CredentialsBlob: fmt.Sprintf(`
|
|
{
|
|
"type":"external_account",
|
|
"audience":"//iam.googleapis.com/projects/1234567890123/locations/global/workloadIdentityPools/my-pool/providers/my-provider",
|
|
"subject_token_type":"urn:ietf:params:oauth:token-type:access_token",
|
|
"credential_source":{"url":"%s"}
|
|
}`, httpSvr.URL),
|
|
}
|
|
stg, err := NewGCSStorage(ctx, gcs, &storeapi.Options{})
|
|
require.NoError(t, err)
|
|
|
|
_, err = stg.FileExists(ctx, "key")
|
|
// before the fix, it's context canceled error
|
|
require.ErrorContains(t, err, "invalid_request")
|
|
}
|
|
|
|
func TestDeleteFiles(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
stg := createGCSStore(t)
|
|
defer stg.Close()
|
|
|
|
filename := "key"
|
|
err := stg.WriteFile(ctx, filename, []byte("0123456789"))
|
|
require.NoError(t, err)
|
|
require.NoError(t, stg.DeleteFiles(ctx, []string{filename, "not-exist-file"}))
|
|
}
|
|
|
|
func TestGCSAccessRecording(t *testing.T) {
|
|
ctx := context.Background()
|
|
accessRec := &recording.AccessStats{}
|
|
_, store := prepareGCSStore(t, "testbucket", accessRec)
|
|
require.NoError(t, store.WriteFile(ctx, "a.txt", []byte("hello")))
|
|
CheckAccessStats(t, accessRec, 0, 1, 0, 5)
|
|
_, err := store.ReadFile(ctx, "a.txt")
|
|
require.NoError(t, err)
|
|
CheckAccessStats(t, accessRec, 1, 1, 5, 5)
|
|
writer, err := store.Create(ctx, "b.txt", nil)
|
|
require.NoError(t, err)
|
|
_, err = writer.Write(ctx, []byte(" world!"))
|
|
require.NoError(t, err)
|
|
require.NoError(t, writer.Close(ctx))
|
|
CheckAccessStats(t, accessRec, 1, 2, 5, 12)
|
|
reader, err := store.Open(ctx, "b.txt", nil)
|
|
require.NoError(t, err)
|
|
buf := make([]byte, 20)
|
|
n, err := reader.Read(buf)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 7, n)
|
|
require.NoError(t, reader.Close())
|
|
// Open will use 2 get, one for get file size.
|
|
CheckAccessStats(t, accessRec, 3, 2, 12, 12)
|
|
}
|