diff --git a/pkg/objstore/s3store/s3.go b/pkg/objstore/s3store/s3.go index d01ab2412f..bafb38200c 100644 --- a/pkg/objstore/s3store/s3.go +++ b/pkg/objstore/s3store/s3.go @@ -110,9 +110,10 @@ var WriteBufferSize = 5 * 1024 * 1024 // S3Storage defines some standard operations for BR/Lightning on the S3 storage. // It implements the `Storage` interface. type S3Storage struct { - svc S3API - options *backuppb.S3 - accessRec *recording.AccessStats + svc S3API + bucketPrefix storeapi.BucketPrefix + options *backuppb.S3 + accessRec *recording.AccessStats // used to indicate that the S3 storage is not the official AWS S3, but a // S3-compatible storage, such as minio/KS3/OSS. // SDK v2 has some compliance issue with its doc, such as DeleteObjects, v2 @@ -348,10 +349,12 @@ func (options *S3BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { // NewS3StorageForTest creates a new S3Storage for testing only. func NewS3StorageForTest(svc S3API, options *backuppb.S3, accessRec *recording.AccessStats) *S3Storage { + bucketPrefix := storeapi.NewBucketPrefix(options.Bucket, options.Prefix) return &S3Storage{ - svc: svc, - options: options, - accessRec: accessRec, + svc: svc, + bucketPrefix: bucketPrefix, + options: options, + accessRec: accessRec, } } @@ -643,10 +646,8 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *storeapi.Opti } log.Info("succeed to get bucket region from s3", zap.String("bucket region", detectedRegion)) - // Ensure prefix ends with "/" - if len(qs.Prefix) > 0 && !strings.HasSuffix(qs.Prefix, "/") { - qs.Prefix += "/" - } + qs.Prefix = storeapi.NewPrefix(qs.Prefix).String() + bucketPrefix := storeapi.NewBucketPrefix(qs.Bucket, qs.Prefix) // Perform permission checks for _, p := range opts.CheckPermissions { @@ -659,6 +660,7 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *storeapi.Opti // Create final S3Storage instance s3Storage := &S3Storage{ svc: client, + bucketPrefix: bucketPrefix, options: &qs, accessRec: 
opts.AccessRecording, s3Compatible: !officialS3, @@ -951,14 +953,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *storeapi.WalkOption, fn f if opt == nil { opt = &storeapi.WalkOption{} } - prefix := path.Join(rs.options.Prefix, opt.SubDir) - if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") { - prefix += "/" - } - - if len(opt.ObjPrefix) != 0 { - prefix += opt.ObjPrefix - } + prefix := rs.bucketPrefix.Prefix.JoinStr(opt.SubDir).ObjectKey(opt.ObjPrefix) maxKeys := int64(1000) if opt.ListCount > 0 { @@ -970,6 +965,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *storeapi.WalkOption, fn f MaxKeys: aws.Int32(int32(maxKeys)), } + cliPrefix := rs.bucketPrefix.PrefixStr() for { // FIXME: We can't use ListObjectsV2, it is not universally supported. // (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th) @@ -993,7 +989,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *storeapi.WalkOption, fn f // when walk on specify directory, the result include storage.Prefix, // which can not be reuse in other API(Open/Read) directly. // so we use TrimPrefix to filter Prefix for next Open/Read. - path := strings.TrimPrefix(*r.Key, rs.options.Prefix) + path := strings.TrimPrefix(*r.Key, cliPrefix) // trim the prefix '/' to ensure that the path returned is consistent with the local storage path = strings.TrimPrefix(path, "/") itemSize := *r.Size @@ -1017,7 +1013,7 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *storeapi.WalkOption, fn f // URI returns s3:///. func (rs *S3Storage) URI() string { - return "s3://" + rs.options.Bucket + "/" + rs.options.Prefix + return "s3://" + rs.options.Bucket + "/" + rs.bucketPrefix.PrefixStr() } // Open a Reader by file path. @@ -1083,24 +1079,10 @@ func (rs *S3Storage) open( Key: aws.String(rs.options.Prefix + path), } - // If we just open part of the object, we set `Range` in the request. 
- // If we meant to open the whole object, not just a part of it, - // we do not pass the range in the request, - // so that even if the object is empty, we can still get the response without errors. - // Then this behavior is similar to openning an empty file in local file system. - isFullRangeRequest := false - var rangeOffset *string - switch { - case endOffset > startOffset: - // s3 endOffset is inclusive - rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset-1)) - case startOffset == 0: - // openning the whole object, no need to fill the `Range` field in the request - isFullRangeRequest = true - default: - rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset)) + isFullRangeRequest, rangeOffset := storeapi.GetHTTPRange(startOffset, endOffset) + if rangeOffset != "" { + input.Range = aws.String(rangeOffset) } - input.Range = rangeOffset result, err := rs.svc.GetObject(ctx, input) if err != nil { return nil, RangeInfo{}, errors.Trace(err) @@ -1144,7 +1126,7 @@ func (rs *S3Storage) open( } return nil, r, errors.Annotatef(berrors.ErrStorageUnknown, "open file '%s' failed, expected range: %s, got: %s", - path, *rangeOffset, rangeStr) + path, rangeOffset, rangeStr) } return result.Body, r, nil diff --git a/pkg/objstore/storeapi/BUILD.bazel b/pkg/objstore/storeapi/BUILD.bazel index 5aa8a0b28a..d0f6f1d767 100644 --- a/pkg/objstore/storeapi/BUILD.bazel +++ b/pkg/objstore/storeapi/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "storeapi", @@ -11,3 +11,13 @@ go_library( "@com_github_aws_aws_sdk_go_v2//aws/retry", ], ) + +go_test( + name = "storeapi_test", + timeout = "short", + srcs = ["storage_test.go"], + embed = [":storeapi"], + flaky = False, + shard_count = 2, + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/pkg/objstore/storeapi/storage.go b/pkg/objstore/storeapi/storage.go index 
55ff6596ad..95088f2143 100644 --- a/pkg/objstore/storeapi/storage.go +++ b/pkg/objstore/storeapi/storage.go @@ -16,8 +16,10 @@ package storeapi import ( "context" + "fmt" "io" "net/http" + "strings" "github.com/aws/aws-sdk-go-v2/aws/retry" "github.com/pingcap/tidb/pkg/objstore/objectio" @@ -197,3 +199,93 @@ type Options struct { // caused by retry AccessRecording *recording.AccessStats } + +// Prefix is like a folder when it is not empty; we still call it a prefix to match S3 +// terminology. +// If not empty, it cannot start with '/' and must end with a '/', such as +// 'a/b/'. The folder name must be valid; we don't check it here. +type Prefix string + +// NewPrefix returns a new Prefix instance from the given string. +func NewPrefix(prefix string) Prefix { + p := strings.Trim(prefix, "/") + if p != "" { + p += "/" + } + return Prefix(p) +} + +func (p Prefix) join(other Prefix) Prefix { + // Due to the definition of Prefix, we can concatenate them directly. + return p + other +} + +// JoinStr returns a new Prefix by joining the given string to the current Prefix. +func (p Prefix) JoinStr(str string) Prefix { + strPrefix := NewPrefix(str) + return p.join(strPrefix) +} + +// ObjectKey returns the object key by joining the name to the Prefix. +func (p Prefix) ObjectKey(name string) string { + // If p is not empty, it already ends with '/'. + // The name should not start with '/', otherwise there will be a double '/' in the + // key. + // This is existing behavior, so we keep it. + return string(p) + name +} + +// String implements fmt.Stringer interface. +func (p Prefix) String() string { + return string(p) +} + +// BucketPrefix represents a prefix in a bucket. +type BucketPrefix struct { + Bucket string + Prefix Prefix +} + +// NewBucketPrefix returns a new BucketPrefix instance. +func NewBucketPrefix(bucket, prefix string) BucketPrefix { + return BucketPrefix{ + Bucket: bucket, + Prefix: NewPrefix(prefix), + } +} + +// ObjectKey returns the object key by joining the name to the Prefix.
+func (bp *BucketPrefix) ObjectKey(name string) string { + return bp.Prefix.ObjectKey(name) +} + +// PrefixStr returns the Prefix as a string. +func (bp *BucketPrefix) PrefixStr() string { + return bp.Prefix.String() +} + +// GetHTTPRange returns the HTTP Range header value for the given start and end +// offsets. +// If endOffset is not 0, startOffset must be <= endOffset; we don't check the +// validity here. +// If startOffset == 0 and endOffset == 0, `full` is true and `rangeVal` is empty. +// Otherwise, a partial object is requested, `full` is false and `rangeVal` +// contains the Range header value. +func GetHTTPRange(startOffset, endOffset int64) (full bool, rangeVal string) { + // If we just open part of the object, we set `Range` in the request. + // If we meant to open the whole object, not just a part of it, + // we do not pass the range in the request, + // so that even if the object is empty, we can still get the response without errors. + // Then this behavior is similar to opening an empty file in the local file system. + switch { + case endOffset > startOffset: + // both ends of the HTTP Range header are inclusive + rangeVal = fmt.Sprintf("bytes=%d-%d", startOffset, endOffset-1) + case startOffset == 0: + // opening the whole object, no need to fill the `Range` field in the request + full = true + default: + rangeVal = fmt.Sprintf("bytes=%d-", startOffset) + } + return +} diff --git a/pkg/objstore/storeapi/storage_test.go b/pkg/objstore/storeapi/storage_test.go new file mode 100644 index 0000000000..c6a617a2a5 --- /dev/null +++ b/pkg/objstore/storeapi/storage_test.go @@ -0,0 +1,59 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storeapi + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPrefix(t *testing.T) { + require.EqualValues(t, "", NewPrefix("")) + for _, c := range []string{"dir", "/dir", "dir/", "/dir/"} { + require.EqualValues(t, "dir/", NewPrefix(c)) + } + require.EqualValues(t, "dir/sub/sub2/sub3/", NewPrefix("/dir/sub/sub2/sub3/")) + prefix := NewPrefix("") + require.EqualValues(t, "", prefix.JoinStr("")) + for _, c := range []string{"dir", "/dir", "dir/", "/dir/"} { + require.EqualValues(t, "dir/", prefix.JoinStr(c)) + } + prefix = NewPrefix("/parent") + require.EqualValues(t, "parent/", prefix.JoinStr("")) + for _, c := range []string{"dir", "/dir", "dir/", "/dir/"} { + require.EqualValues(t, "parent/dir/", prefix.JoinStr(c)) + } + require.EqualValues(t, "parent/file.txt", prefix.ObjectKey("file.txt")) + require.EqualValues(t, "parent//file.txt", prefix.ObjectKey("/file.txt")) +} + +func TestGetHTTPRange(t *testing.T) { + full, val := GetHTTPRange(0, 0) + require.True(t, full) + require.Empty(t, val) + + full, val = GetHTTPRange(0, 100) + require.False(t, full) + require.EqualValues(t, "bytes=0-99", val) + + full, val = GetHTTPRange(50, 100) + require.False(t, full) + require.EqualValues(t, "bytes=50-99", val) + + full, val = GetHTTPRange(50, 0) + require.False(t, full) + require.EqualValues(t, "bytes=50-", val) +}