From e4bd81b00ecb563d0bc11db2fcee7e89ff410a6e Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 16 Jan 2026 12:28:38 +0800 Subject: [PATCH] objstore: extract AWS SDK related logic into separate client and extract common code of s3 like store (#65561) ref pingcap/tidb#65461 --- Makefile | 1 + pkg/objstore/s3like/BUILD.bazel | 25 +- pkg/objstore/s3like/interface.go | 99 ++++++ pkg/objstore/s3like/mock/BUILD.bazel | 13 + pkg/objstore/s3like/mock/client_mock.go | 233 +++++++++++++ pkg/objstore/s3like/permission.go | 50 +++ pkg/objstore/s3like/permission_test.go | 68 ++++ pkg/objstore/s3store/BUILD.bazel | 5 +- pkg/objstore/s3store/client.go | 414 ++++++++++++++++++++++++ pkg/objstore/s3store/client_test.go | 253 +++++++++++++++ pkg/objstore/s3store/io.go | 28 +- pkg/objstore/s3store/main_test.go | 10 +- pkg/objstore/s3store/s3.go | 410 ++++------------------- pkg/objstore/s3store/s3_test.go | 27 +- 14 files changed, 1252 insertions(+), 384 deletions(-) create mode 100644 pkg/objstore/s3like/interface.go create mode 100644 pkg/objstore/s3like/mock/BUILD.bazel create mode 100644 pkg/objstore/s3like/mock/client_mock.go create mode 100644 pkg/objstore/s3like/permission.go create mode 100644 pkg/objstore/s3like/permission_test.go create mode 100644 pkg/objstore/s3store/client.go create mode 100644 pkg/objstore/s3store/client_test.go diff --git a/Makefile b/Makefile index 957d1f67ce..cabb100d6c 100644 --- a/Makefile +++ b/Makefile @@ -567,6 +567,7 @@ gen_mock: mockgen tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go tools/bin/mockgen -package mockobjstore github.com/pingcap/tidb/pkg/objstore Storage > pkg/objstore/mockobjstore/objstore_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/objstore/s3store S3API > pkg/objstore/s3store/mock/s3api_mock.go + tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/objstore/s3like PrefixClient > pkg/objstore/s3like/mock/client_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl SchemaLoader > pkg/ddl/mock/schema_loader_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl/systable Manager > pkg/ddl/mock/systable_manager_mock.go diff --git a/pkg/objstore/s3like/BUILD.bazel b/pkg/objstore/s3like/BUILD.bazel index 02e1cb3296..71e0d3bc2d 100644 --- a/pkg/objstore/s3like/BUILD.bazel +++ b/pkg/objstore/s3like/BUILD.bazel @@ -1,13 +1,19 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "s3like", - srcs = ["retry.go"], + srcs = [ + "interface.go", + "permission.go", + "retry.go", + ], importpath = "github.com/pingcap/tidb/pkg/objstore/s3like", visibility = ["//visibility:public"], deps = [ "//br/pkg/logutil", "//pkg/metrics", + "//pkg/objstore/objectio", + "//pkg/objstore/storeapi", "@com_github_aliyun_alibabacloud_oss_go_sdk_v2//oss/retry", "@com_github_aws_aws_sdk_go_v2//aws", "@com_github_pingcap_errors//:errors", @@ -16,3 +22,18 @@ go_library( "@org_uber_go_zap//:zap", ], ) + +go_test( + name = "s3like_test", + timeout = "short", + srcs = ["permission_test.go"], + flaky = True, + deps = [ + ":s3like", + "//pkg/objstore/s3like/mock", + "//pkg/objstore/storeapi", + "@com_github_pingcap_errors//:errors", + "@com_github_stretchr_testify//require", + "@org_uber_go_mock//gomock", + ], +) diff --git a/pkg/objstore/s3like/interface.go b/pkg/objstore/s3like/interface.go new file mode 100644 index 0000000000..f635cb4349 --- 
/dev/null +++ b/pkg/objstore/s3like/interface.go @@ -0,0 +1,99 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package s3like + +import ( + "context" + "io" + + "github.com/pingcap/tidb/pkg/objstore/objectio" + "github.com/pingcap/tidb/pkg/objstore/storeapi" +) + +// GetResp is the response of GetObject. +type GetResp struct { + Body io.ReadCloser + IsFullRange bool + ContentLength *int64 + ContentRange *string +} + +// Object is the object info. +type Object struct { + Key string + Size int64 +} + +// ListResp is the response of ListObjects. +type ListResp struct { + NextMarker *string + IsTruncated bool + Objects []Object +} + +// CopyInput is the input of CopyObject. +type CopyInput struct { + FromLoc storeapi.BucketPrefix + // relative to FromLoc + FromKey string + // relative to the PrefixClient + ToKey string +} + +// Uploader is used to abstract the concurrent multipart uploader, +// such as the one in the S3 SDK (manager.Uploader). +type Uploader interface { + // Upload uploads the data from the reader. + // It should be run in a separate goroutine. + Upload(ctx context.Context, rd io.Reader) error +} + +// PrefixClient is the client for a given bucket prefix. +type PrefixClient interface { + // CheckBucketExistence checks the existence of the bucket. + CheckBucketExistence(ctx context.Context) error + // CheckListObjects checks the permission of listObjects + CheckListObjects(ctx context.Context) error + // CheckGetObject checks the permission of getObject + CheckGetObject(ctx context.Context) error + // CheckPutAndDeleteObject checks the permission of putObject and deleteObject + CheckPutAndDeleteObject(ctx context.Context) (err error) + // GetObject gets the object with the given name and range [startOffset, endOffset). + GetObject(ctx context.Context, name string, startOffset, endOffset int64) (*GetResp, error) + // PutObject puts the object with the given name and data. + PutObject(ctx context.Context, name string, data []byte) error + // DeleteObject deletes the object with the given name. + DeleteObject(ctx context.Context, name string) error + // DeleteObjects deletes multiple objects with the given names. + DeleteObjects(ctx context.Context, names []string) error + // IsObjectExists checks whether the object with the given name exists. + IsObjectExists(ctx context.Context, name string) (bool, error) + // ListObjects lists objects with the given extra prefix, marker and maxKeys. + // The marker is the key to start after; if nil, listing starts from the beginning. + // maxKeys is the maximum number of keys to return. + // Note: the extraPrefix is directly appended to the storeapi.Prefix of the + // PrefixClient, so the caller should make sure the input extraPrefix is correct. + ListObjects(ctx context.Context, extraPrefix string, marker *string, maxKeys int) (*ListResp, error) + // CopyObject copies an object from the source to the destination.
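+ // For example (illustrative values, a sketch of the key resolution): with this + // client at bucket "dst" and prefix "p/", CopyInput{FromLoc: {Bucket: "src", + // Prefix: "q/"}, FromKey: "a", ToKey: "dir/a"} copies object "q/a" in bucket + // "src" to object "p/dir/a" in bucket "dst".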
+ CopyObject(ctx context.Context, params *CopyInput) error + // MultipartWriter creates a multipart writer for the object with the given + // name. each write to the returned writer will be uploaded as a part, so + // the caller should control the size of each write to fit the part size + // limit of the underlying S3-like storage. + MultipartWriter(ctx context.Context, name string) (objectio.Writer, error) + // MultipartUploader creates a multipart uploader for the object. + // unlike MultipartWriter, this method allows concurrent uploading of parts. + MultipartUploader(name string, partSize int64, concurrency int) Uploader +} diff --git a/pkg/objstore/s3like/mock/BUILD.bazel b/pkg/objstore/s3like/mock/BUILD.bazel new file mode 100644 index 0000000000..eaead91ee2 --- /dev/null +++ b/pkg/objstore/s3like/mock/BUILD.bazel @@ -0,0 +1,13 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["client_mock.go"], + importpath = "github.com/pingcap/tidb/pkg/objstore/s3like/mock", + visibility = ["//visibility:public"], + deps = [ + "//pkg/objstore/objectio", + "//pkg/objstore/s3like", + "@org_uber_go_mock//gomock", + ], +) diff --git a/pkg/objstore/s3like/mock/client_mock.go b/pkg/objstore/s3like/mock/client_mock.go new file mode 100644 index 0000000000..326021f120 --- /dev/null +++ b/pkg/objstore/s3like/mock/client_mock.go @@ -0,0 +1,233 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/pingcap/tidb/pkg/objstore/s3like (interfaces: PrefixClient) +// +// Generated by this command: +// +// mockgen -package mock github.com/pingcap/tidb/pkg/objstore/s3like PrefixClient +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + objectio "github.com/pingcap/tidb/pkg/objstore/objectio" + s3like "github.com/pingcap/tidb/pkg/objstore/s3like" + gomock "go.uber.org/mock/gomock" +) + +// MockPrefixClient is a mock of PrefixClient interface. +type MockPrefixClient struct { + ctrl *gomock.Controller + recorder *MockPrefixClientMockRecorder +} + +// MockPrefixClientMockRecorder is the mock recorder for MockPrefixClient. +type MockPrefixClientMockRecorder struct { + mock *MockPrefixClient +} + +// NewMockPrefixClient creates a new mock instance. +func NewMockPrefixClient(ctrl *gomock.Controller) *MockPrefixClient { + mock := &MockPrefixClient{ctrl: ctrl} + mock.recorder = &MockPrefixClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPrefixClient) EXPECT() *MockPrefixClientMockRecorder { + return m.recorder +} + +// ISGOMOCK indicates that this struct is a gomock mock. +func (m *MockPrefixClient) ISGOMOCK() struct{} { + return struct{}{} +} + +// CheckBucketExistence mocks base method. +func (m *MockPrefixClient) CheckBucketExistence(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckBucketExistence", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckBucketExistence indicates an expected call of CheckBucketExistence. +func (mr *MockPrefixClientMockRecorder) CheckBucketExistence(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckBucketExistence", reflect.TypeOf((*MockPrefixClient)(nil).CheckBucketExistence), arg0) +} + +// CheckGetObject mocks base method. 
+func (m *MockPrefixClient) CheckGetObject(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckGetObject", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckGetObject indicates an expected call of CheckGetObject. +func (mr *MockPrefixClientMockRecorder) CheckGetObject(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckGetObject", reflect.TypeOf((*MockPrefixClient)(nil).CheckGetObject), arg0) +} + +// CheckListObjects mocks base method. +func (m *MockPrefixClient) CheckListObjects(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckListObjects", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckListObjects indicates an expected call of CheckListObjects. +func (mr *MockPrefixClientMockRecorder) CheckListObjects(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckListObjects", reflect.TypeOf((*MockPrefixClient)(nil).CheckListObjects), arg0) +} + +// CheckPutAndDeleteObject mocks base method. +func (m *MockPrefixClient) CheckPutAndDeleteObject(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckPutAndDeleteObject", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckPutAndDeleteObject indicates an expected call of CheckPutAndDeleteObject. +func (mr *MockPrefixClientMockRecorder) CheckPutAndDeleteObject(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckPutAndDeleteObject", reflect.TypeOf((*MockPrefixClient)(nil).CheckPutAndDeleteObject), arg0) +} + +// CopyObject mocks base method. +func (m *MockPrefixClient) CopyObject(arg0 context.Context, arg1 *s3like.CopyInput) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CopyObject", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// CopyObject indicates an expected call of CopyObject. +func (mr *MockPrefixClientMockRecorder) CopyObject(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CopyObject", reflect.TypeOf((*MockPrefixClient)(nil).CopyObject), arg0, arg1) +} + +// DeleteObject mocks base method. +func (m *MockPrefixClient) DeleteObject(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObject", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObject indicates an expected call of DeleteObject. +func (mr *MockPrefixClientMockRecorder) DeleteObject(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockPrefixClient)(nil).DeleteObject), arg0, arg1) +} + +// DeleteObjects mocks base method. +func (m *MockPrefixClient) DeleteObjects(arg0 context.Context, arg1 []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteObjects", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteObjects indicates an expected call of DeleteObjects. +func (mr *MockPrefixClientMockRecorder) DeleteObjects(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObjects", reflect.TypeOf((*MockPrefixClient)(nil).DeleteObjects), arg0, arg1) +} + +// GetObject mocks base method. 
+func (m *MockPrefixClient) GetObject(arg0 context.Context, arg1 string, arg2, arg3 int64) (*s3like.GetResp, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetObject", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*s3like.GetResp) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetObject indicates an expected call of GetObject. +func (mr *MockPrefixClientMockRecorder) GetObject(arg0, arg1, arg2, arg3 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockPrefixClient)(nil).GetObject), arg0, arg1, arg2, arg3) +} + +// IsObjectExists mocks base method. +func (m *MockPrefixClient) IsObjectExists(arg0 context.Context, arg1 string) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsObjectExists", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsObjectExists indicates an expected call of IsObjectExists. +func (mr *MockPrefixClientMockRecorder) IsObjectExists(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsObjectExists", reflect.TypeOf((*MockPrefixClient)(nil).IsObjectExists), arg0, arg1) +} + +// ListObjects mocks base method. +func (m *MockPrefixClient) ListObjects(arg0 context.Context, arg1 string, arg2 *string, arg3 int) (*s3like.ListResp, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListObjects", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*s3like.ListResp) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListObjects indicates an expected call of ListObjects. +func (mr *MockPrefixClientMockRecorder) ListObjects(arg0, arg1, arg2, arg3 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListObjects", reflect.TypeOf((*MockPrefixClient)(nil).ListObjects), arg0, arg1, arg2, arg3) +} + +// MultipartUploader mocks base method. +func (m *MockPrefixClient) MultipartUploader(arg0 string, arg1 int64, arg2 int) s3like.Uploader { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MultipartUploader", arg0, arg1, arg2) + ret0, _ := ret[0].(s3like.Uploader) + return ret0 +} + +// MultipartUploader indicates an expected call of MultipartUploader. +func (mr *MockPrefixClientMockRecorder) MultipartUploader(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MultipartUploader", reflect.TypeOf((*MockPrefixClient)(nil).MultipartUploader), arg0, arg1, arg2) +} + +// MultipartWriter mocks base method. +func (m *MockPrefixClient) MultipartWriter(arg0 context.Context, arg1 string) (objectio.Writer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MultipartWriter", arg0, arg1) + ret0, _ := ret[0].(objectio.Writer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MultipartWriter indicates an expected call of MultipartWriter. +func (mr *MockPrefixClientMockRecorder) MultipartWriter(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MultipartWriter", reflect.TypeOf((*MockPrefixClient)(nil).MultipartWriter), arg0, arg1) +} + +// PutObject mocks base method. +func (m *MockPrefixClient) PutObject(arg0 context.Context, arg1 string, arg2 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PutObject", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// PutObject indicates an expected call of PutObject. 
+func (mr *MockPrefixClientMockRecorder) PutObject(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutObject", reflect.TypeOf((*MockPrefixClient)(nil).PutObject), arg0, arg1, arg2) +} diff --git a/pkg/objstore/s3like/permission.go b/pkg/objstore/s3like/permission.go new file mode 100644 index 0000000000..71cb267e0d --- /dev/null +++ b/pkg/objstore/s3like/permission.go @@ -0,0 +1,50 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package s3like + +import ( + "context" + goerrors "errors" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/objstore/storeapi" +) + +// CheckPermissions checks whether the client has the given permissions. +func CheckPermissions(ctx context.Context, cli PrefixClient, perms []storeapi.Permission) error { + for _, perm := range perms { + switch perm { + case storeapi.AccessBuckets: + if err := cli.CheckBucketExistence(ctx); err != nil { + return errors.Annotatef(err, "permission %s", perm) + } + case storeapi.ListObjects: + if err := cli.CheckListObjects(ctx); err != nil { + return errors.Annotatef(err, "permission %s", perm) + } + case storeapi.GetObject: + if err := cli.CheckGetObject(ctx); err != nil { + return errors.Annotatef(err, "permission %s", perm) + } + case storeapi.PutAndDeleteObject: + if err := cli.CheckPutAndDeleteObject(ctx); err != nil { + return errors.Annotatef(err, "permission %s", perm) + } + default: + return goerrors.New("unknown permission: " + string(perm)) + } + } + return nil +} diff --git a/pkg/objstore/s3like/permission_test.go b/pkg/objstore/s3like/permission_test.go new file mode 100644 index 0000000000..77d5168416 --- /dev/null +++ b/pkg/objstore/s3like/permission_test.go @@ -0,0 +1,68 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package s3like_test + +import ( + "context" + "fmt" + "testing" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/objstore/s3like" + "github.com/pingcap/tidb/pkg/objstore/s3like/mock" + "github.com/pingcap/tidb/pkg/objstore/storeapi" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestCheckPermissions(t *testing.T) { + ctrl := gomock.NewController(t) + mockCli := mock.NewMockPrefixClient(ctrl) + ctx := context.Background() + for _, c := range []struct { + perm storeapi.Permission + mockFn func(any) *gomock.Call + }{ + {storeapi.AccessBuckets, mockCli.EXPECT().CheckBucketExistence}, + {storeapi.ListObjects, mockCli.EXPECT().CheckListObjects}, + {storeapi.GetObject, mockCli.EXPECT().CheckGetObject}, + {storeapi.PutAndDeleteObject, mockCli.EXPECT().CheckPutAndDeleteObject}, + } { + c.mockFn(gomock.Any()).Return(errors.New("some error")) + err := s3like.CheckPermissions(ctx, mockCli, []storeapi.Permission{c.perm}) + require.ErrorContains(t, err, fmt.Sprintf("permission %s: some error", c.perm)) + require.True(t, ctrl.Satisfied()) + } + + require.ErrorContains(t, s3like.CheckPermissions(ctx, mockCli, []storeapi.Permission{storeapi.PutObject}), + "unknown permission: PutObject") + + for _, mFn := range []func(any) *gomock.Call{ + mockCli.EXPECT().CheckBucketExistence, + mockCli.EXPECT().CheckListObjects, + mockCli.EXPECT().CheckGetObject, + mockCli.EXPECT().CheckPutAndDeleteObject, + } { + mFn(gomock.Any()).Return(nil) + } + err := s3like.CheckPermissions(ctx, mockCli, []storeapi.Permission{ + storeapi.AccessBuckets, + storeapi.ListObjects, + storeapi.GetObject, + storeapi.PutAndDeleteObject, + }) + require.NoError(t, err) + require.True(t, ctrl.Satisfied()) +} diff --git a/pkg/objstore/s3store/BUILD.bazel b/pkg/objstore/s3store/BUILD.bazel index f53bb188b0..e924021d9a 100644 --- a/pkg/objstore/s3store/BUILD.bazel +++ b/pkg/objstore/s3store/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "s3store", srcs = [ + "client.go", "interface.go", "io.go", "ks3.go", @@ -58,6 +59,7 @@ go_test( name = "s3store_test", timeout = "short", srcs = [ + "client_test.go", "main_test.go", "retry_test.go", "s3_flags_test.go", @@ -65,11 +67,12 @@ go_test( ], embed = [":s3store"], flaky = True, - shard_count = 49, + shard_count = 50, deps = [ "//pkg/objstore", "//pkg/objstore/objectio", "//pkg/objstore/recording", + "//pkg/objstore/s3like", "//pkg/objstore/s3store/mock", "//pkg/objstore/storeapi", "//pkg/testkit/testfailpoint", diff --git a/pkg/objstore/s3store/client.go b/pkg/objstore/s3store/client.go new file mode 100644 index 0000000000..2815d7a5a6 --- /dev/null +++ b/pkg/objstore/s3store/client.go @@ -0,0 +1,414 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package s3store + +import ( + "bytes" + "context" + goerrors "errors" + "fmt" + "io" + "path" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go" + "github.com/aws/smithy-go/middleware" + smithyhttp "github.com/aws/smithy-go/transport/http" + "github.com/google/uuid" + "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/pkg/objstore/objectio" + "github.com/pingcap/tidb/pkg/objstore/s3like" + "github.com/pingcap/tidb/pkg/objstore/storeapi" + "go.uber.org/zap" +) + +type s3Client struct { + svc S3API + storeapi.BucketPrefix + options *backuppb.S3 + // used to indicate that the S3 storage is not the official AWS S3, but an + // S3-compatible storage, such as minio/KS3/OSS. + // SDK v2 has some compliance issues with its doc; e.g. for DeleteObjects, v2 + // doesn't send the Content-MD5 header while the doc says it must be sent, + // and might report "Missing required header for this request: Content-Md5" + s3Compatible bool +} + +var _ s3like.PrefixClient = (*s3Client)(nil) + +func (c *s3Client) CheckBucketExistence(ctx context.Context) error { + input := &s3.HeadBucketInput{ + Bucket: aws.String(c.Bucket), + } + _, err := c.svc.HeadBucket(ctx, input) + return errors.Trace(err) +} + +func (c *s3Client) CheckListObjects(ctx context.Context) error { + input := &s3.ListObjectsInput{ + Bucket: aws.String(c.Bucket), + Prefix: aws.String(c.Prefix.String()), + MaxKeys: aws.Int32(1), + } + _, err := c.svc.ListObjects(ctx, input) + if err != nil { + return errors.Trace(err) + } + return nil +} + +// CheckGetObject checks the permission of getObject +func (c *s3Client) CheckGetObject(ctx context.Context) error { + input := &s3.GetObjectInput{ + Bucket: aws.String(c.Bucket), + Key: aws.String("not-exists"), + } + _, err := c.svc.GetObject(ctx, input) + var aerr smithy.APIError + if goerrors.As(err, &aerr) { + if aerr.ErrorCode() == noSuchKey { + // if the key does not exist and we still reach this error, it means + // we have the correct permission to GetObject; otherwise we would + // get a different error + return nil + } + } + return errors.Trace(err) +} + +// CheckPutAndDeleteObject checks the permission of putObject and deleteObject. +// The S3 API doesn't provide a way to check the permission directly, so we have +// to put an object to check it. +func (c *s3Client) CheckPutAndDeleteObject(ctx context.Context) (err error) { + file := fmt.Sprintf("access-check/%s", uuid.New().String()) + key := c.Prefix.ObjectKey(file) + defer func() { + // we always delete the object used for permission check, + // even on error, since the object might be created successfully even + // when it returns an error.
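+ // e.g. with prefix "backup/", the key resolved above is + // "backup/access-check/<uuid>" (illustrative prefix).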
+ input := &s3.DeleteObjectInput{ + Bucket: aws.String(c.Bucket), + Key: aws.String(key), + } + _, err2 := c.svc.DeleteObject(ctx, input) + var noSuchKey *types.NoSuchKey + if err2 != nil && !goerrors.As(err2, &noSuchKey) { + log.Warn("failed to delete object used for permission check", + zap.String("bucket", c.Bucket), + zap.String("key", key), zap.Error(err2)) + } + if err == nil { + err = errors.Trace(err2) + } + }() + // when we have no permission, AWS returns an error with code "AccessDenied" + input := &s3.PutObjectInput{ + Body: bytes.NewReader([]byte("check")), + Bucket: aws.String(c.Bucket), + Key: aws.String(key), + } + _, err = c.svc.PutObject(ctx, input) + return errors.Trace(err) +} + +func (c *s3Client) GetObject(ctx context.Context, name string, startOffset, endOffset int64) (*s3like.GetResp, error) { + key := c.Prefix.ObjectKey(name) + input := &s3.GetObjectInput{ + Bucket: aws.String(c.Bucket), + Key: aws.String(key), + } + fullRange, rangeVal := storeapi.GetHTTPRange(startOffset, endOffset) + if rangeVal != "" { + input.Range = aws.String(rangeVal) + } + result, err := c.svc.GetObject(ctx, input) + if err != nil { + return nil, errors.Trace(err) + } + return &s3like.GetResp{ + Body: result.Body, + IsFullRange: fullRange, + ContentLength: result.ContentLength, + ContentRange: result.ContentRange, + }, nil +} + +func (c *s3Client) PutObject(ctx context.Context, name string, data []byte) error { + // we don't need to calculate contentMD5 if S3 object lock is enabled, + // since aws-go-sdk already does it in #computeBodyHashes + // https://github.com/aws/aws-sdk-go/blob/bcb2cf3fc2263c8c28b3119b07d2dbb44d7c93a0/service/s3/body_hash.go#L30 + input := c.buildPutObjectInput(c.options, name, data) + _, err := c.svc.PutObject(ctx, input) + return errors.Trace(err) +} + +func (c *s3Client) buildPutObjectInput(options *backuppb.S3, file string, data []byte) *s3.PutObjectInput { + key := c.Prefix.ObjectKey(file) + input := &s3.PutObjectInput{ + Body: bytes.NewReader(data), + Bucket: aws.String(options.Bucket), + Key: aws.String(key), + } + if options.Acl != "" { + input.ACL = types.ObjectCannedACL(options.Acl) + } + if options.Sse != "" { + input.ServerSideEncryption = types.ServerSideEncryption(options.Sse) + } + if options.SseKmsKeyId != "" { + input.SSEKMSKeyId = aws.String(options.SseKmsKeyId) + } + if options.StorageClass != "" { + input.StorageClass = types.StorageClass(options.StorageClass) + } + return input +} + +func (c *s3Client) DeleteObject(ctx context.Context, name string) error { + key := c.Prefix.ObjectKey(name) + input := &s3.DeleteObjectInput{ + Bucket: aws.String(c.Bucket), + Key: aws.String(key), + } + + _, err := c.svc.DeleteObject(ctx, input) + return errors.Trace(err) +} + +func (c *s3Client) DeleteObjects(ctx context.Context, names []string) error { + if len(names) == 0 { + return nil + } + objects := make([]types.ObjectIdentifier, 0, len(names)) + for _, file := range names { + key := c.Prefix.ObjectKey(file) + objects = append(objects, types.ObjectIdentifier{ + Key: aws.String(key), + }) + } + input := &s3.DeleteObjectsInput{ + Bucket: aws.String(c.Bucket), + Delete: &types.Delete{ + Objects: objects, + Quiet: aws.Bool(false), + }, + } + var optFns []func(*s3.Options) + // needed when using the AWS SDK to access S3-compatible storage, such as KS3. + if c.s3Compatible { + optFns = []func(*s3.Options){withContentMD5} + } + _, err := c.svc.DeleteObjects(ctx, input, optFns...)
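+ // Note: per-object failures are reported in the response's Errors field, + // which is not inspected here; only request-level errors surface through err.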
+ return errors.Trace(err) +} + +func (c *s3Client) IsObjectExists(ctx context.Context, name string) (bool, error) { + key := c.Prefix.ObjectKey(name) + input := &s3.HeadObjectInput{ + Bucket: aws.String(c.Bucket), + Key: aws.String(key), + } + + _, err := c.svc.HeadObject(ctx, input) + if err != nil { + var aerr smithy.APIError + if goerrors.As(errors.Cause(err), &aerr) { + switch aerr.ErrorCode() { + case noSuchBucket, noSuchKey, notFound: + return false, nil + } + } + return false, errors.Trace(err) + } + return true, nil +} + +func (c *s3Client) ListObjects(ctx context.Context, extraPrefix string, marker *string, maxKeys int) (*s3like.ListResp, error) { + prefix := c.Prefix.ObjectKey(extraPrefix) + req := &s3.ListObjectsInput{ + Bucket: aws.String(c.Bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int32(int32(maxKeys)), + Marker: marker, + } + // FIXME: We can't use ListObjectsV2, it is not universally supported. + // (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th) + // (as of 2020, DigitalOcean Spaces still does not support V2 - https://developers.digitalocean.com/documentation/spaces/#list-bucket-contents) + res, err := c.svc.ListObjects(ctx, req) + if err != nil { + return nil, errors.Trace(err) + } + var ( + nextMarker *string + objects = make([]s3like.Object, 0, len(res.Contents)) + ) + for _, obj := range res.Contents { + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html#AmazonS3-ListObjects-response-NextMarker - + // + // `res.NextMarker` is populated only if we specify req.Delimiter. + // Aliyun OSS and minio will populate NextMarker no matter what, + // but this documented behavior does apply to AWS S3: + // + // "If response does not include the NextMarker and it is truncated, + // you can use the value of the last Key in the response as the marker + // in the subsequent request to get the next set of object keys." + nextMarker = obj.Key + objects = append(objects, s3like.Object{ + Key: aws.ToString(obj.Key), + Size: aws.ToInt64(obj.Size), + }) + } + return &s3like.ListResp{ + NextMarker: nextMarker, + IsTruncated: aws.ToBool(res.IsTruncated), + Objects: objects, + }, nil +} + +func (c *s3Client) CopyObject(ctx context.Context, params *s3like.CopyInput) error { + fromKey := params.FromLoc.Prefix.ObjectKey(params.FromKey) + toKey := c.Prefix.ObjectKey(params.ToKey) + copyInput := &s3.CopyObjectInput{ + Bucket: aws.String(c.Bucket), + // NOTE: Perhaps we need to allow copy cross regions / accounts. + CopySource: aws.String(path.Join(params.FromLoc.Bucket, fromKey)), + Key: aws.String(toKey), + } + + // We must use the client of the target region. 
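+ // e.g. CopySource "source-bucket/source-prefix/source-object" with Key + // "prefix/dir/dest-object" (illustrative values; see TestClientCopyObject).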
+ _, err := c.svc.CopyObject(ctx, copyInput) + return errors.Trace(err) +} + +func (c *s3Client) MultipartWriter(ctx context.Context, name string) (objectio.Writer, error) { + key := c.Prefix.ObjectKey(name) + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(c.Bucket), + Key: aws.String(key), + } + if c.options.Acl != "" { + input.ACL = types.ObjectCannedACL(c.options.Acl) + } + if c.options.Sse != "" { + input.ServerSideEncryption = types.ServerSideEncryption(c.options.Sse) + } + if c.options.SseKmsKeyId != "" { + input.SSEKMSKeyId = aws.String(c.options.SseKmsKeyId) + } + if c.options.StorageClass != "" { + input.StorageClass = types.StorageClass(c.options.StorageClass) + } + + resp, err := c.svc.CreateMultipartUpload(ctx, input) + if err != nil { + return nil, errors.Trace(err) + } + return &multipartWriter{ + svc: c.svc, + createOutput: resp, + completeParts: make([]types.CompletedPart, 0, 128), + }, nil +} + +func (c *s3Client) MultipartUploader(name string, partSize int64, concurrency int) s3like.Uploader { + up := manager.NewUploader(c.svc, func(u *manager.Uploader) { + u.PartSize = partSize + u.Concurrency = concurrency + u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(concurrency * HardcodedChunkSize) + }) + return &multipartUploader{ + uploader: up, + BucketPrefix: c.BucketPrefix, + key: c.Prefix.ObjectKey(name), + } +} + +// withContentMD5 removes all flexible checksum procedures from an operation, +// instead computing an MD5 checksum for the request payload. +func withContentMD5(o *s3.Options) { + o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error { + _, _ = stack.Initialize.Remove("AWSChecksum:SetupInputContext") + _, _ = stack.Build.Remove("AWSChecksum:RequestMetricsTracking") + _, _ = stack.Finalize.Remove("AWSChecksum:ComputeInputPayloadChecksum") + _, _ = stack.Finalize.Remove("addInputChecksumTrailer") + return smithyhttp.AddContentChecksumMiddleware(stack) + }) +} + +// multipartWriter does multi-part upload to s3. +type multipartWriter struct { + svc S3API + createOutput *s3.CreateMultipartUploadOutput + completeParts []types.CompletedPart +} + +// Write uploads one part to s3; CreateMultipartUpload should be called to start +// the upload, and CompleteMultipartUpload to finish it. +func (u *multipartWriter) Write(ctx context.Context, data []byte) (int, error) { + partInput := &s3.UploadPartInput{ + Body: bytes.NewReader(data), + Bucket: u.createOutput.Bucket, + Key: u.createOutput.Key, + PartNumber: aws.Int32(int32(len(u.completeParts) + 1)), + UploadId: u.createOutput.UploadId, + ContentLength: aws.Int64(int64(len(data))), + } + + uploadResult, err := u.svc.UploadPart(ctx, partInput) + if err != nil { + return 0, errors.Trace(err) + } + u.completeParts = append(u.completeParts, types.CompletedPart{ + ETag: uploadResult.ETag, + PartNumber: partInput.PartNumber, + }) + return len(data), nil +} + +// Close completes the multipart upload request.
+func (u *multipartWriter) Close(ctx context.Context) error { + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: u.createOutput.Bucket, + Key: u.createOutput.Key, + UploadId: u.createOutput.UploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: u.completeParts, + }, + } + _, err := u.svc.CompleteMultipartUpload(ctx, completeInput) + return errors.Trace(err) +} + +type multipartUploader struct { + uploader *manager.Uploader + storeapi.BucketPrefix + key string +} + +func (u *multipartUploader) Upload(ctx context.Context, rd io.Reader) error { + upParams := &s3.PutObjectInput{ + Bucket: aws.String(u.Bucket), + Key: aws.String(u.key), + Body: rd, + } + _, err := u.uploader.Upload(ctx, upParams) + return errors.Trace(err) +} diff --git a/pkg/objstore/s3store/client_test.go b/pkg/objstore/s3store/client_test.go new file mode 100644 index 0000000000..5696bd156f --- /dev/null +++ b/pkg/objstore/s3store/client_test.go @@ -0,0 +1,253 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package s3store + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/objstore/s3like" + "github.com/pingcap/tidb/pkg/objstore/storeapi" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestClientPermission(t *testing.T) { + s := CreateS3Suite(t) + ctx := context.Background() + cli := &s3Client{ + svc: s.MockS3, + BucketPrefix: storeapi.NewBucketPrefix("bucket", "prefix/"), + } + + t.Run("test access buckets", func(t *testing.T) { + s.MockS3.EXPECT().HeadBucket(gomock.Any(), gomock.Any()).Return(nil, nil) + require.NoError(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.AccessBuckets})) + s.MockS3.EXPECT().HeadBucket(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock head bucket error")) + require.ErrorContains(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.AccessBuckets}), "mock head bucket error") + require.True(t, s.Controller.Satisfied()) + }) + + t.Run("test list objects", func(t *testing.T) { + s.MockS3.EXPECT().ListObjects(gomock.Any(), gomock.Any()).Return(&s3.ListObjectsOutput{}, nil) + require.NoError(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.ListObjects})) + + s.MockS3.EXPECT().ListObjects(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock list error")) + require.ErrorContains(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.ListObjects}), "mock list error") + require.True(t, s.Controller.Satisfied()) + }) + + t.Run("test get objects", func(t *testing.T) { + s.MockS3.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(&s3.GetObjectOutput{}, nil) + require.NoError(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.GetObject})) + s.MockS3.EXPECT().GetObject(gomock.Any(), 
gomock.Any()).Return(nil, &types.NoSuchKey{}) + require.NoError(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.GetObject})) + + s.MockS3.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock get error")) + require.ErrorContains(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.GetObject}), "mock get error") + require.True(t, s.Controller.Satisfied()) + }) + + t.Run("test put-and-delete object", func(t *testing.T) { + s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + require.NoError(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.PutAndDeleteObject})) + + s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock put error")) + s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + require.ErrorContains(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.PutAndDeleteObject}), "mock put error") + + s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock del error")) + require.ErrorContains(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.PutAndDeleteObject}), "mock del error") + + s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, &smithy.GenericAPIError{Code: "AccessDenied", Message: "AccessDenied", Fault: smithy.FaultUnknown}) + require.ErrorContains(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.PutAndDeleteObject}), "AccessDenied") + + s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock put error")) + s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock del error")) + require.ErrorContains(t, s3like.CheckPermissions(ctx, cli, []storeapi.Permission{storeapi.PutAndDeleteObject}), "mock put error") + require.True(t, s.Controller.Satisfied()) + }) +} + +func TestClientGetObject(t *testing.T) { + s := CreateS3Suite(t) + ctx := context.Background() + cli := &s3Client{ + svc: s.MockS3, + BucketPrefix: storeapi.NewBucketPrefix("bucket", "prefix/"), + } + + s.MockS3.EXPECT().GetObject(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, input *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + require.Equal(t, "prefix/object", *input.Key) + require.Equal(t, "bytes=0-9", *input.Range) + return &s3.GetObjectOutput{ + ContentLength: aws.Int64(10), + ContentRange: aws.String("bytes 0-9/100"), + }, nil + }, + ) + resp, err := cli.GetObject(ctx, "object", 0, 10) + require.NoError(t, err) + require.False(t, resp.IsFullRange) + require.Equal(t, int64(10), *resp.ContentLength) + require.Equal(t, "bytes 0-9/100", *resp.ContentRange) + require.True(t, s.Controller.Satisfied()) + + s.MockS3.EXPECT().GetObject(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock get error")) + _, err = cli.GetObject(ctx, "object", 0, 10) + require.ErrorContains(t, err, "mock get error") + require.True(t, s.Controller.Satisfied()) +} + +func TestClientDeleteObjects(t *testing.T) { + s := CreateS3Suite(t) + ctx := context.Background() + cli := &s3Client{ + svc: s.MockS3, + BucketPrefix: 
storeapi.NewBucketPrefix("bucket", "prefix/"), + } + + s.MockS3.EXPECT().DeleteObjects(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, input *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + require.Len(t, input.Delete.Objects, 3) + require.Equal(t, "prefix/sub/object1", *input.Delete.Objects[0].Key) + require.Equal(t, "prefix/object2", *input.Delete.Objects[1].Key) + require.Equal(t, "prefix/sub/sub2/object3", *input.Delete.Objects[2].Key) + return &s3.DeleteObjectsOutput{}, nil + }, + ) + err := cli.DeleteObjects(ctx, []string{"sub/object1", "object2", "sub/sub2/object3"}) + require.NoError(t, err) + require.True(t, s.Controller.Satisfied()) + + s.MockS3.EXPECT().DeleteObjects(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock delete error")) + err = cli.DeleteObjects(ctx, []string{"sub/object1", "object2"}) + require.ErrorContains(t, err, "mock delete error") + require.True(t, s.Controller.Satisfied()) +} + +func TestClientIsObjectExists(t *testing.T) { + s := CreateS3Suite(t) + ctx := context.Background() + cli := &s3Client{ + svc: s.MockS3, + BucketPrefix: storeapi.NewBucketPrefix("bucket", "prefix/"), + } + for _, mockErr := range []error{ + &types.NotFound{}, + &types.NoSuchKey{}, + &types.NoSuchBucket{}, + } { + s.MockS3.EXPECT().HeadObject(gomock.Any(), gomock.Any()).Return(nil, mockErr) + exists, err := cli.IsObjectExists(ctx, "object") + require.NoError(t, err) + require.False(t, exists) + require.True(t, s.Controller.Satisfied()) + } + + s.MockS3.EXPECT().HeadObject(gomock.Any(), gomock.Any()).Return(nil, errors.New("some error")) + exists, err := cli.IsObjectExists(ctx, "object") + require.ErrorContains(t, err, "some error") + require.False(t, exists) + require.True(t, s.Controller.Satisfied()) + + s.MockS3.EXPECT().HeadObject(gomock.Any(), gomock.Any()).Return(nil, nil) + exists, err = cli.IsObjectExists(ctx, "object") + require.NoError(t, err) + require.True(t, exists) + require.True(t, s.Controller.Satisfied()) +} + +func TestClientListObjects(t *testing.T) { + s := CreateS3Suite(t) + ctx := context.Background() + cli := &s3Client{ + svc: s.MockS3, + BucketPrefix: storeapi.NewBucketPrefix("bucket", "prefix/"), + } + + s.MockS3.EXPECT().ListObjects(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, input *s3.ListObjectsInput, optFns ...func(*s3.Options)) (*s3.ListObjectsOutput, error) { + require.Equal(t, "prefix/target", *input.Prefix) + require.Nil(t, input.Marker) + require.Equal(t, int32(100), aws.ToInt32(input.MaxKeys)) + return &s3.ListObjectsOutput{ + IsTruncated: aws.Bool(true), + Contents: []types.Object{ + {Key: aws.String("prefix/target/object1"), Size: aws.Int64(10)}, + {Key: aws.String("prefix/target/sub/"), Size: aws.Int64(0)}, + {Key: aws.String("prefix/target/sub/object2"), Size: aws.Int64(20)}, + }, + }, nil + }, + ) + resp, err := cli.ListObjects(ctx, "target", nil, 100) + require.NoError(t, err) + require.True(t, resp.IsTruncated) + require.EqualValues(t, "prefix/target/sub/object2", *resp.NextMarker) + require.Equal(t, []s3like.Object{ + {Key: "prefix/target/object1", Size: 10}, + {Key: "prefix/target/sub/", Size: 0}, + {Key: "prefix/target/sub/object2", Size: 20}, + }, resp.Objects) + require.True(t, s.Controller.Satisfied()) + + s.MockS3.EXPECT().ListObjects(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock list error")) + _, err = cli.ListObjects(ctx, "target", nil, 100) + require.ErrorContains(t, err, "mock list error") + require.True(t, 
s.Controller.Satisfied()) } + +func TestClientCopyObject(t *testing.T) { + s := CreateS3Suite(t) + ctx := context.Background() + cli := &s3Client{ + svc: s.MockS3, + BucketPrefix: storeapi.NewBucketPrefix("bucket", "prefix/"), + } + + s.MockS3.EXPECT().CopyObject(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, input *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { + require.Equal(t, "source-bucket/source-prefix/source-object", *input.CopySource) + require.Equal(t, "prefix/dir/dest-object", *input.Key) + return &s3.CopyObjectOutput{}, nil + }, + ) + err := cli.CopyObject(ctx, &s3like.CopyInput{ + FromLoc: storeapi.NewBucketPrefix("source-bucket", "source-prefix"), + FromKey: "/source-object", // we purposely add a leading '/' to test the trimming behavior + ToKey: "dir/dest-object", + }) + require.NoError(t, err) + require.True(t, s.Controller.Satisfied()) + + s.MockS3.EXPECT().CopyObject(gomock.Any(), gomock.Any()).Return(nil, errors.New("mock copy error")) + err = cli.CopyObject(ctx, &s3like.CopyInput{ + FromLoc: storeapi.NewBucketPrefix("source-bucket", "source-prefix"), + FromKey: "source-object", + ToKey: "dir/dest-object", + }) + require.ErrorContains(t, err, "mock copy error") + require.True(t, s.Controller.Satisfied()) +} diff --git a/pkg/objstore/s3store/io.go b/pkg/objstore/s3store/io.go index a764f719fa..610c040610 100644 --- a/pkg/objstore/s3store/io.go +++ b/pkg/objstore/s3store/io.go @@ -25,6 +25,7 @@ import ( errors2 "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/pkg/metrics" + "github.com/pingcap/tidb/pkg/objstore/s3like" "github.com/pingcap/tidb/pkg/util/injectfailpoint" "github.com/pingcap/tidb/pkg/util/prefetch" "go.uber.org/zap" @@ -163,17 +164,34 @@ func (r *s3ObjectReader) GetFileSize() (int64, error) { } type asyncWriter struct { - wd *io.PipeWriter - wg *sync.WaitGroup - err error + rd *io.PipeReader + wd *io.PipeWriter + wg *sync.WaitGroup + uploader s3like.Uploader + err error + name string } -// Write implement the io.Writer interface. +func (s *asyncWriter) start(ctx context.Context) { + s.wg.Add(1) + go func() { + err := s.uploader.Upload(ctx, s.rd) + // like a channel, only the sender closes the pipe in the happy path + if err != nil { + log.Warn("upload to s3 failed", zap.String("filename", s.name), zap.Error(err)) + _ = s.rd.CloseWithError(err) + } + s.err = err + s.wg.Done() + }() +} + +// Write implements the objectio.Writer interface. func (s *asyncWriter) Write(_ context.Context, p []byte) (int, error) { return s.wd.Write(p) } -// Close implement the io.Closer interface. +// Close implements the objectio.Writer interface.
func (s *asyncWriter) Close(_ context.Context) error { err := s.wd.Close() if err != nil { diff --git a/pkg/objstore/s3store/main_test.go b/pkg/objstore/s3store/main_test.go index 38e615b2d2..2285ab3223 100644 --- a/pkg/objstore/s3store/main_test.go +++ b/pkg/objstore/s3store/main_test.go @@ -29,18 +29,18 @@ import ( "go.uber.org/mock/gomock" ) -type S3Suite struct { +type Suite struct { Controller *gomock.Controller MockS3 *mock.MockS3API Storage *S3Storage } -func CreateS3Suite(t *testing.T) *S3Suite { +func CreateS3Suite(t *testing.T) *Suite { return CreateS3SuiteWithRec(t, nil) } -func CreateS3SuiteWithRec(t *testing.T, accessRec *recording.AccessStats) *S3Suite { - s := new(S3Suite) +func CreateS3SuiteWithRec(t *testing.T, accessRec *recording.AccessStats) *Suite { + s := new(Suite) s.Controller = gomock.NewController(t) s.MockS3 = mock.NewMockS3API(s.Controller) s.Storage = NewS3StorageForTest( @@ -63,7 +63,7 @@ func CreateS3SuiteWithRec(t *testing.T, accessRec *recording.AccessStats) *S3Sui return s } -func (s *S3Suite) ExpectedCalls(t *testing.T, data []byte, startOffsets []int, newReader func(data []byte, offset int) io.ReadCloser) { +func (s *Suite) ExpectedCalls(t *testing.T, data []byte, startOffsets []int, newReader func(data []byte, offset int) io.ReadCloser) { var lastCall *gomock.Call for _, offset := range startOffsets { thisOffset := offset diff --git a/pkg/objstore/s3store/s3.go b/pkg/objstore/s3store/s3.go index 109c677397..e153a6ea27 100644 --- a/pkg/objstore/s3store/s3.go +++ b/pkg/objstore/s3store/s3.go @@ -15,13 +15,10 @@ package s3store import ( - "bytes" "context" - goerrors "errors" "fmt" "io" "net/url" - "path" "regexp" "strconv" "strings" @@ -38,10 +35,8 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sts" - "github.com/aws/smithy-go" "github.com/aws/smithy-go/middleware" smithyhttp "github.com/aws/smithy-go/transport/http" - "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" @@ -92,13 +87,6 @@ const ( domainAWS = "amazonaws.com" ) -var permissionCheckFn = map[storeapi.Permission]func(context.Context, S3API, *backuppb.S3) error{ - storeapi.AccessBuckets: s3BucketExistenceCheck, - storeapi.ListObjects: listObjectsCheck, - storeapi.GetObject: getObjectCheck, - storeapi.PutAndDeleteObject: PutAndDeleteObjectCheck, -} - // WriteBufferSize is the size of the buffer used for writing. (64K may be a better choice) var WriteBufferSize = 5 * 1024 * 1024 @@ -106,15 +94,10 @@ var WriteBufferSize = 5 * 1024 * 1024 // It implements the `Storage` interface. type S3Storage struct { svc S3API + s3Cli s3like.PrefixClient bucketPrefix storeapi.BucketPrefix options *backuppb.S3 accessRec *recording.AccessStats - // used to indicate that the S3 storage is not the official AWS S3, but a - // S3-compatible storage, such as minio/KS3/OSS. - // SDK v2 has some compliance issue with its doc, such as DeleteObjects, v2 - // doesn't send the Content-MD5 header while the doc says it must be sent, - // and might report "Missing required header for this request: Content-Md5" - s3Compatible bool } // MarkStrongConsistency implements the Storage interface. @@ -128,66 +111,17 @@ func (rs *S3Storage) GetOptions() *backuppb.S3 { } // CopyFrom implements the Storage interface. 
-func (rs *S3Storage) CopyFrom(ctx context.Context, e storeapi.Storage, spec storeapi.CopySpec) error { - s, ok := e.(*S3Storage) +func (rs *S3Storage) CopyFrom(ctx context.Context, inStore storeapi.Storage, spec storeapi.CopySpec) error { + srcStore, ok := inStore.(*S3Storage) if !ok { - return errors.Annotatef(berrors.ErrStorageInvalidConfig, "S3Storage.CopyFrom supports S3 storage only, get %T", e) + return errors.Annotatef(berrors.ErrStorageInvalidConfig, "S3Storage.CopyFrom supports S3 storage only, get %T", inStore) } - copyInput := &s3.CopyObjectInput{ - Bucket: aws.String(rs.options.Bucket), - // NOTE: Perhaps we need to allow copy cross regions / accounts. - CopySource: aws.String(path.Join(s.options.Bucket, s.options.Prefix, spec.From)), - Key: aws.String(rs.options.Prefix + spec.To), - } - - // We must use the client of the target region. - _, err := rs.svc.CopyObject(ctx, copyInput) - return err -} - -// S3Uploader does multi-part upload to s3. -type S3Uploader struct { - svc S3API - createOutput *s3.CreateMultipartUploadOutput - completeParts []types.CompletedPart -} - -// UploadPart update partial data to s3, we should call CreateMultipartUpload to start it, -// and call CompleteMultipartUpload to finish it. -func (u *S3Uploader) Write(ctx context.Context, data []byte) (int, error) { - partInput := &s3.UploadPartInput{ - Body: bytes.NewReader(data), - Bucket: u.createOutput.Bucket, - Key: u.createOutput.Key, - PartNumber: aws.Int32(int32(len(u.completeParts) + 1)), - UploadId: u.createOutput.UploadId, - ContentLength: aws.Int64(int64(len(data))), - } - - uploadResult, err := u.svc.UploadPart(ctx, partInput) - if err != nil { - return 0, errors.Trace(err) - } - u.completeParts = append(u.completeParts, types.CompletedPart{ - ETag: uploadResult.ETag, - PartNumber: partInput.PartNumber, + return rs.s3Cli.CopyObject(ctx, &s3like.CopyInput{ + FromLoc: srcStore.bucketPrefix, + FromKey: spec.From, + ToKey: spec.To, }) - return len(data), nil -} - -// Close complete multi upload request. -func (u *S3Uploader) Close(ctx context.Context) error { - completeInput := &s3.CompleteMultipartUploadInput{ - Bucket: u.createOutput.Bucket, - Key: u.createOutput.Key, - UploadId: u.createOutput.UploadId, - MultipartUpload: &types.CompletedMultipartUpload{ - Parts: u.completeParts, - }, - } - _, err := u.svc.CompleteMultipartUpload(ctx, completeInput) - return errors.Trace(err) } // S3BackendOptions contains options for s3 storage. 
@@ -346,7 +280,12 @@ func (options *S3BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { func NewS3StorageForTest(svc S3API, options *backuppb.S3, accessRec *recording.AccessStats) *S3Storage { bucketPrefix := storeapi.NewBucketPrefix(options.Bucket, options.Prefix) return &S3Storage{ - svc: svc, + svc: svc, + s3Cli: &s3Client{ + svc: svc, + BucketPrefix: bucketPrefix, + options: options, + }, bucketPrefix: bucketPrefix, options: options, accessRec: accessRec, @@ -618,22 +557,25 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *storeapi.Opti qs.Prefix = storeapi.NewPrefix(qs.Prefix).String() bucketPrefix := storeapi.NewBucketPrefix(qs.Bucket, qs.Prefix) + s3Cli := &s3Client{ + svc: client, + BucketPrefix: bucketPrefix, + options: &qs, + s3Compatible: !officialS3, + } // Perform permission checks - for _, p := range opts.CheckPermissions { - err := permissionCheckFn[p](ctx, client, &qs) - if err != nil { - return nil, errors.Annotatef(berrors.ErrStorageInvalidPermission, "check permission %s failed due to %v", p, err) - } + if err := s3like.CheckPermissions(ctx, s3Cli, opts.CheckPermissions); err != nil { + return nil, errors.Annotatef(berrors.ErrStorageInvalidPermission, "check permission failed due to %v", err) } // Create final S3Storage instance s3Storage := &S3Storage{ svc: client, + s3Cli: s3Cli, bucketPrefix: bucketPrefix, options: &qs, accessRec: opts.AccessRecording, - s3Compatible: !officialS3, } // Check object lock status if requested @@ -644,80 +586,6 @@ func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *storeapi.Opti return s3Storage, nil } -// s3BucketExistenceCheck checks if a bucket exists. -func s3BucketExistenceCheck(ctx context.Context, svc S3API, qs *backuppb.S3) error { - input := &s3.HeadBucketInput{ - Bucket: aws.String(qs.Bucket), - } - _, err := svc.HeadBucket(ctx, input) - return errors.Trace(err) -} - -// listObjectsCheck checks the permission of listObjects -func listObjectsCheck(ctx context.Context, svc S3API, qs *backuppb.S3) error { - input := &s3.ListObjectsInput{ - Bucket: aws.String(qs.Bucket), - Prefix: aws.String(qs.Prefix), - MaxKeys: aws.Int32(1), - } - _, err := svc.ListObjects(ctx, input) - if err != nil { - return errors.Trace(err) - } - return nil -} - -// getObjectCheck checks the permission of getObject -func getObjectCheck(ctx context.Context, svc S3API, qs *backuppb.S3) error { - input := &s3.GetObjectInput{ - Bucket: aws.String(qs.Bucket), - Key: aws.String("not-exists"), - } - _, err := svc.GetObject(ctx, input) - var aerr smithy.APIError - if goerrors.As(err, &aerr) { - if aerr.ErrorCode() == noSuchKey { - // if key not exists and we reach this error, that - // means we have the correct permission to GetObject - // other we will get another error - return nil - } - return errors.Trace(err) - } - return nil -} - -// PutAndDeleteObjectCheck checks the permission of putObject -// S3 API doesn't provide a way to check the permission, we have to put an -// object to check the permission. -// exported for testing. -func PutAndDeleteObjectCheck(ctx context.Context, svc S3API, options *backuppb.S3) (err error) { - file := fmt.Sprintf("access-check/%s", uuid.New().String()) - defer func() { - // we always delete the object used for permission check, - // even on error, since the object might be created successfully even - // when it returns an error. 
-		input := &s3.DeleteObjectInput{
-			Bucket: aws.String(options.Bucket),
-			Key:    aws.String(options.Prefix + file),
-		}
-		_, err2 := svc.DeleteObject(ctx, input)
-		var noSuchKey *types.NoSuchKey
-		if !goerrors.As(err2, &noSuchKey) {
-			log.Warn("failed to delete object used for permission check",
-				zap.String("bucket", options.Bucket),
-				zap.String("key", *input.Key), zap.Error(err2))
-		}
-		if err == nil {
-			err = errors.Trace(err2)
-		}
-	}()
-	// when no permission, aws returns err with code "AccessDenied"
-	input := buildPutObjectInput(options, file, []byte("check"))
-	_, err = svc.PutObject(ctx, input)
-	return errors.Trace(err)
-}
-
 // IsObjectLockEnabled checks whether the S3 bucket has Object Lock enabled.
 func (rs *S3Storage) IsObjectLockEnabled() bool {
 	input := &s3.GetObjectLockConfigurationInput{
@@ -736,34 +604,9 @@ func (rs *S3Storage) IsObjectLockEnabled() bool {
 	return false
 }

-func buildPutObjectInput(options *backuppb.S3, file string, data []byte) *s3.PutObjectInput {
-	input := &s3.PutObjectInput{
-		Body:   bytes.NewReader(data),
-		Bucket: aws.String(options.Bucket),
-		Key:    aws.String(options.Prefix + file),
-	}
-	if options.Acl != "" {
-		input.ACL = types.ObjectCannedACL(options.Acl)
-	}
-	if options.Sse != "" {
-		input.ServerSideEncryption = types.ServerSideEncryption(options.Sse)
-	}
-	if options.SseKmsKeyId != "" {
-		input.SSEKMSKeyId = aws.String(options.SseKmsKeyId)
-	}
-	if options.StorageClass != "" {
-		input.StorageClass = types.StorageClass(options.StorageClass)
-	}
-	return input
-}
-
 // WriteFile writes data to a file to storage.
 func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error {
-	input := buildPutObjectInput(rs.options, file, data)
-	// we don't need to calculate contentMD5 if s3 object lock enabled.
-	// since aws-go-sdk already did it in #computeBodyHashes
-	// https://github.com/aws/aws-sdk-go/blob/bcb2cf3fc2263c8c28b3119b07d2dbb44d7c93a0/service/s3/body_hash.go#L30
-	_, err := rs.svc.PutObject(ctx, input)
+	err := rs.s3Cli.PutObject(ctx, file, data)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -810,15 +653,11 @@ func (rs *S3Storage) doReadFile(ctx context.Context, file string) ([]byte, error
 		readErr error
 	)
 	for retryCnt := range maxErrorRetries {
-		input := &s3.GetObjectInput{
-			Bucket: aws.String(rs.options.Bucket),
-			Key:    aws.String(rs.options.Prefix + file),
-		}
-		result, err := rs.svc.GetObject(ctx, input)
+		result, err := rs.s3Cli.GetObject(ctx, file, 0, 0)
 		if err != nil {
 			return nil, errors.Annotatef(err,
 				"failed to read s3 file, file info: input.bucket='%s', input.key='%s'",
-				*input.Bucket, *input.Key)
+				rs.options.Bucket, rs.bucketPrefix.ObjectKey(file))
 		}
 		data, readErr = io.ReadAll(result.Body)
 		// close the body of response since data has been already read out
@@ -832,7 +671,7 @@ func (rs *S3Storage) doReadFile(ctx context.Context, file string) ([]byte, error
 		if readErr != nil {
 			if s3like.IsDeadlineExceedError(readErr) || isCancelError(readErr) {
 				return nil, errors.Annotatef(readErr, "failed to read body from get object result, file info: input.bucket='%s', input.key='%s', retryCnt='%d'",
-					*input.Bucket, *input.Key, retryCnt)
+					rs.options.Bucket, rs.bucketPrefix.ObjectKey(file), retryCnt)
 			}
 			metrics.RetryableErrorCount.WithLabelValues(readErr.Error()).Inc()
 			continue
@@ -841,18 +680,12 @@ func (rs *S3Storage) doReadFile(ctx context.Context, file string) ([]byte, error
 	}
 	// retry too much, should be failed
 	return nil, errors.Annotatef(readErr, "failed to read body from get object result (retry too much), file info: input.bucket='%s', input.key='%s'",
-		rs.options.Bucket, rs.options.Prefix+file)
+		rs.options.Bucket, rs.bucketPrefix.ObjectKey(file))
 }

 // DeleteFile delete the file in s3 storage
 func (rs *S3Storage) DeleteFile(ctx context.Context, file string) error {
-	input := &s3.DeleteObjectInput{
-		Bucket: aws.String(rs.options.Bucket),
-		Key:    aws.String(rs.options.Prefix + file),
-	}
-
-	_, err := rs.svc.DeleteObject(ctx, input)
-	return errors.Trace(err)
+	return rs.s3Cli.DeleteObject(ctx, file)
 }

 // s3DeleteObjectsLimit is the upper limit of objects in a delete request.
@@ -866,24 +699,7 @@ func (rs *S3Storage) DeleteFiles(ctx context.Context, files []string) error {
 		if len(batch) > s3DeleteObjectsLimit {
 			batch = batch[:s3DeleteObjectsLimit]
 		}
-		objects := make([]types.ObjectIdentifier, 0, len(batch))
-		for _, file := range batch {
-			objects = append(objects, types.ObjectIdentifier{
-				Key: aws.String(rs.options.Prefix + file),
-			})
-		}
-		input := &s3.DeleteObjectsInput{
-			Bucket: aws.String(rs.options.Bucket),
-			Delete: &types.Delete{
-				Objects: objects,
-				Quiet:   aws.Bool(false),
-			},
-		}
-		var optFns []func(*s3.Options)
-		if rs.s3Compatible {
-			optFns = []func(*s3.Options){withContentMD5}
-		}
-		_, err := rs.svc.DeleteObjects(ctx, input, optFns...)
+		err := rs.s3Cli.DeleteObjects(ctx, batch)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -894,23 +710,7 @@ func (rs *S3Storage) DeleteFiles(ctx context.Context, files []string) error {
 // FileExists check if file exists on s3 storage.
 func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error) {
-	input := &s3.HeadObjectInput{
-		Bucket: aws.String(rs.options.Bucket),
-		Key:    aws.String(rs.options.Prefix + file),
-	}
-
-	_, err := rs.svc.HeadObject(ctx, input)
-	if err != nil {
-		var aerr smithy.APIError
-		if goerrors.As(errors.Cause(err), &aerr) {
-			switch aerr.ErrorCode() {
-			case noSuchBucket, noSuchKey, notFound:
-				return false, nil
-			}
-		}
-		return false, errors.Trace(err)
-	}
-	return true, nil
+	return rs.s3Cli.IsObjectExists(ctx, file)
 }

 // WalkDir traverse all the files in a dir.
@@ -923,57 +723,41 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *storeapi.WalkOption, fn f
 	if opt == nil {
 		opt = &storeapi.WalkOption{}
 	}
-	prefix := rs.bucketPrefix.Prefix.JoinStr(opt.SubDir).ObjectKey(opt.ObjPrefix)
-
-	maxKeys := int64(1000)
+	prefix := storeapi.NewPrefix(opt.SubDir).ObjectKey(opt.ObjPrefix)
+	var maxKeys = 1000
 	if opt.ListCount > 0 {
-		maxKeys = opt.ListCount
-	}
-	req := &s3.ListObjectsInput{
-		Bucket:  aws.String(rs.options.Bucket),
-		Prefix:  aws.String(prefix),
-		MaxKeys: aws.Int32(int32(maxKeys)),
+		maxKeys = int(opt.ListCount)
 	}
-	cliPrefix := rs.bucketPrefix.PrefixStr()
+	var (
		marker    *string
+		cliPrefix = rs.bucketPrefix.PrefixStr()
+	)
 	for {
-		// FIXME: We can't use ListObjectsV2, it is not universally supported.
-		// (Ceph RGW supported ListObjectsV2 since v15.1.0, released 2020 Jan 30th)
-		// (as of 2020, DigitalOcean Spaces still does not support V2 - https://developers.digitalocean.com/documentation/spaces/#list-bucket-contents)
-		res, err := rs.svc.ListObjects(ctx, req)
+		res, err := rs.s3Cli.ListObjects(ctx, prefix, marker, maxKeys)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		for _, r := range res.Contents {
-			// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html#AmazonS3-ListObjects-response-NextMarker -
-			//
-			// `res.NextMarker` is populated only if we specify req.Delimiter.
-			// Aliyun OSS and minio will populate NextMarker no matter what,
-			// but this documented behavior does apply to AWS S3:
-			//
-			// "If response does not include the NextMarker and it is truncated,
-			// you can use the value of the last Key in the response as the marker
-			// in the subsequent request to get the next set of object keys."
-			req.Marker = r.Key
-
-			// when walk on specify directory, the result include storage.Prefix,
+		for _, r := range res.Objects {
+			// when walking a specific directory, the result includes the client prefix,
 			// which can not be reuse in other API(Open/Read) directly.
 			// so we use TrimPrefix to filter Prefix for next Open/Read.
-			path := strings.TrimPrefix(*r.Key, cliPrefix)
+			trimmedKey := strings.TrimPrefix(r.Key, cliPrefix)
 			// trim the prefix '/' to ensure that the path returned is consistent with the local storage
-			path = strings.TrimPrefix(path, "/")
-			itemSize := *r.Size
+			trimmedKey = strings.TrimPrefix(trimmedKey, "/")
+			itemSize := r.Size
 			// filter out s3's empty directory items
-			if itemSize <= 0 && strings.HasSuffix(path, "/") {
-				log.Info("this path is an empty directory and cannot be opened in S3. Skip it", zap.String("path", path))
+			if itemSize <= 0 && strings.HasSuffix(trimmedKey, "/") {
+				log.Info("skip empty directory which cannot be opened", zap.String("key", trimmedKey))
 				continue
 			}
-			if err = fn(path, itemSize); err != nil {
+			if err = fn(trimmedKey, itemSize); err != nil {
 				return errors.Trace(err)
 			}
 		}
-		if !aws.ToBool(res.IsTruncated) {
+		marker = res.NextMarker
+		if !res.IsTruncated {
 			break
 		}
 	}
@@ -1044,16 +828,7 @@ func (rs *S3Storage) open(
 	path string,
 	startOffset, endOffset int64,
 ) (io.ReadCloser, RangeInfo, error) {
-	input := &s3.GetObjectInput{
-		Bucket: aws.String(rs.options.Bucket),
-		Key:    aws.String(rs.options.Prefix + path),
-	}
-
-	isFullRangeRequest, rangeOffset := storeapi.GetHTTPRange(startOffset, endOffset)
-	if rangeOffset != "" {
-		input.Range = aws.String(rangeOffset)
-	}
-	result, err := rs.svc.GetObject(ctx, input)
+	result, err := rs.s3Cli.GetObject(ctx, path, startOffset, endOffset)
 	if err != nil {
 		return nil, RangeInfo{}, errors.Trace(err)
 	}

 	var r RangeInfo
 	// Those requests without a `Range` will have no `ContentRange` in the response,
 	// In this case, we'll parse the `ContentLength` field instead.
-	if isFullRangeRequest {
+	if result.IsFullRange {
 		// We must ensure the `ContentLengh` has data even if for empty objects,
 		// otherwise we have no places to get the object size
 		if result.ContentLength == nil {
@@ -1095,8 +870,8 @@ func (rs *S3Storage) open(
 			rangeStr = *result.ContentRange
 		}
 		return nil, r, errors.Annotatef(berrors.ErrStorageUnknown,
-			"open file '%s' failed, expected range: %s, got: %s",
-			path, rangeOffset, rangeStr)
+			"open file '%s' failed, expected range: [%d,%d), got: %s",
+			path, startOffset, endOffset, rangeStr)
 	}

 	return result.Body, r, nil
@@ -1134,76 +909,33 @@ func ParseRangeInfo(info *string) (ri RangeInfo, err error) {
 	return
 }

-// createUploader create multi upload request.
-func (rs *S3Storage) createUploader(ctx context.Context, name string) (objectio.Writer, error) {
-	input := &s3.CreateMultipartUploadInput{
-		Bucket: aws.String(rs.options.Bucket),
-		Key:    aws.String(rs.options.Prefix + name),
-	}
-	if rs.options.Acl != "" {
-		input.ACL = types.ObjectCannedACL(rs.options.Acl)
-	}
-	if rs.options.Sse != "" {
-		input.ServerSideEncryption = types.ServerSideEncryption(rs.options.Sse)
-	}
-	if rs.options.SseKmsKeyId != "" {
-		input.SSEKMSKeyId = aws.String(rs.options.SseKmsKeyId)
-	}
-	if rs.options.StorageClass != "" {
-		input.StorageClass = types.StorageClass(rs.options.StorageClass)
-	}
-
-	resp, err := rs.svc.CreateMultipartUpload(ctx, input)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	return &S3Uploader{
-		svc:           rs.svc,
-		createOutput:  resp,
-		completeParts: make([]types.CompletedPart, 0, 128),
-	}, nil
-}
-
 // Create creates multi upload request.
 func (rs *S3Storage) Create(ctx context.Context, name string, option *storeapi.WriterOption) (objectio.Writer, error) {
-	var uploader objectio.Writer
+	var writer objectio.Writer
 	var err error
 	if option == nil || option.Concurrency <= 1 {
-		uploader, err = rs.createUploader(ctx, name)
+		writer, err = rs.s3Cli.MultipartWriter(ctx, name)
 		if err != nil {
 			return nil, err
 		}
 	} else {
-		up := manager.NewUploader(rs.svc, func(u *manager.Uploader) {
-			u.PartSize = option.PartSize
-			u.Concurrency = option.Concurrency
-			u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(option.Concurrency * HardcodedChunkSize)
-		})
+		up := rs.s3Cli.MultipartUploader(name, option.PartSize, option.Concurrency)
 		rd, wd := io.Pipe()
-		upParams := &s3.PutObjectInput{
-			Bucket: aws.String(rs.options.Bucket),
-			Key:    aws.String(rs.options.Prefix + name),
-			Body:   rd,
+		asyncW := &asyncWriter{
+			rd:       rd,
+			wd:       wd,
+			wg:       &sync.WaitGroup{},
+			uploader: up,
+			name:     name,
 		}
-		s3Writer := &asyncWriter{wd: wd, wg: &sync.WaitGroup{}}
-		s3Writer.wg.Add(1)
-		go func() {
-			_, err := up.Upload(ctx, upParams)
-			// like a channel we only let sender close the pipe in happy path
-			if err != nil {
-				log.Warn("upload to s3 failed", zap.String("filename", name), zap.Error(err))
-				_ = rd.CloseWithError(err)
-			}
-			s3Writer.err = err
-			s3Writer.wg.Done()
-		}()
-		uploader = s3Writer
+		asyncW.start(ctx)
+		writer = asyncW
 	}
 	bufSize := WriteBufferSize
 	if option != nil && option.PartSize > 0 {
 		bufSize = int(option.PartSize)
 	}
-	uploaderWriter := objectio.NewBufferedWriter(uploader, bufSize, compressedio.NoCompression, rs.accessRec)
+	uploaderWriter := objectio.NewBufferedWriter(writer, bufSize, compressedio.NoCompression, rs.accessRec)
 	return uploaderWriter, nil
 }
@@ -1226,18 +958,6 @@ func (rs *S3Storage) Rename(ctx context.Context, oldFileName, newFileName string
 // Close implements Storage interface.
 func (*S3Storage) Close() {}

-// withContentMD5 removes all flexible checksum procecdures from an operation,
-// instead computing an MD5 checksum for the request payload.
-func withContentMD5(o *s3.Options) {
-	o.APIOptions = append(o.APIOptions, func(stack *middleware.Stack) error {
-		_, _ = stack.Initialize.Remove("AWSChecksum:SetupInputContext")
-		_, _ = stack.Build.Remove("AWSChecksum:RequestMetricsTracking")
-		_, _ = stack.Finalize.Remove("AWSChecksum:ComputeInputPayloadChecksum")
-		_, _ = stack.Finalize.Remove("addInputChecksumTrailer")
-		return smithyhttp.AddContentChecksumMiddleware(stack)
-	})
-}
-
 func isCancelError(err error) bool {
 	return strings.Contains(err.Error(), "context canceled")
 }
diff --git a/pkg/objstore/s3store/s3_test.go b/pkg/objstore/s3store/s3_test.go
index f3d4235371..8659f77741 100644
--- a/pkg/objstore/s3store/s3_test.go
+++ b/pkg/objstore/s3store/s3_test.go
@@ -731,31 +731,6 @@ func TestOpenReadSlowly(t *testing.T) {
 	require.Equal(t, []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ"), res)
 }

-func TestPutAndDeleteObjectCheck(t *testing.T) {
-	s := CreateS3Suite(t)
-	ctx := context.Background()
-
-	s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
-	s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
-	require.NoError(t, PutAndDeleteObjectCheck(ctx, s.MockS3, &backuppb.S3{}))
-
-	s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock put error"))
-	s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
-	require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.MockS3, &backuppb.S3{}), "mock put error")
-
-	s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
-	s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock del error"))
-	require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.MockS3, &backuppb.S3{}), "mock del error")
-
-	s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
-	s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, &smithy.GenericAPIError{Code: "AccessDenied", Message: "AccessDenied", Fault: smithy.FaultUnknown})
-	require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.MockS3, &backuppb.S3{}), "AccessDenied")
-
-	s.MockS3.EXPECT().PutObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock put error"))
-	s.MockS3.EXPECT().DeleteObject(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, errors.New("mock del error"))
-	require.ErrorContains(t, PutAndDeleteObjectCheck(ctx, s.MockS3, &backuppb.S3{}), "mock put error")
-}
-
 // TestOpenSeek checks that Seek is implemented correctly.
 func TestOpenSeek(t *testing.T) {
 	s := CreateS3Suite(t)
@@ -1537,7 +1512,7 @@ func TestOpenRangeMismatchErrorMsg(t *testing.T) {
 		}, nil
 	})
 	reader, err := s.Storage.Open(ctx, "test", &storeapi.ReaderOption{StartOffset: &start, EndOffset: &end})
-	require.ErrorContains(t, err, "expected range: bytes=10-29, got: bytes 10-20/20")
+	require.ErrorContains(t, err, "expected range: [10,30), got: bytes 10-20/20")
 	require.Nil(t, reader)

 	s.MockS3.EXPECT().
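
[Sketch for review; not part of the patch.] The Create hunk above replaces the inline upload goroutine with asyncW.start(ctx). Assuming asyncWriter keeps the removed pipe-and-goroutine logic, and assuming the uploader exposes an Upload(ctx, io.Reader) error method (neither is shown in this diff), start presumably looks close to:

	// Hypothetical sketch of asyncWriter.start: field names come from the
	// diff above, the Upload signature is an assumption.
	func (w *asyncWriter) start(ctx context.Context) {
		w.wg.Add(1)
		go func() {
			// Done runs after w.err is set, so waiters observe the final error.
			defer w.wg.Done()
			err := w.uploader.Upload(ctx, w.rd)
			if err != nil {
				log.Warn("upload to s3 failed", zap.String("filename", w.name), zap.Error(err))
				// like a channel we only let sender close the pipe in happy path;
				// on error, unblock the writer side explicitly.
				_ = w.rd.CloseWithError(err)
			}
			w.err = err
		}()
	}

The writer's Close would then presumably close wd, wait on wg, and return w.err, mirroring the removed s3Writer flow.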