739 lines
21 KiB
Go
739 lines
21 KiB
Go
// Copyright 2020 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package objstore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
goerrors "errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
|
|
"cloud.google.com/go/storage"
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/log"
|
|
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
|
"github.com/pingcap/tidb/pkg/objstore/compressedio"
|
|
"github.com/pingcap/tidb/pkg/objstore/objectio"
|
|
"github.com/pingcap/tidb/pkg/objstore/recording"
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
"github.com/pingcap/tidb/pkg/util"
|
|
"github.com/pingcap/tidb/pkg/util/intest"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"github.com/pingcap/tidb/pkg/util/prefetch"
|
|
"github.com/spf13/pflag"
|
|
"go.uber.org/atomic"
|
|
"go.uber.org/zap"
|
|
"golang.org/x/net/http2"
|
|
"golang.org/x/oauth2/google"
|
|
"google.golang.org/api/googleapi"
|
|
"google.golang.org/api/iterator"
|
|
"google.golang.org/api/option"
|
|
htransport "google.golang.org/api/transport/http"
|
|
)
|
|
|
|
const (
|
|
gcsEndpointOption = "gcs.endpoint"
|
|
gcsStorageClassOption = "gcs.storage-class"
|
|
gcsPredefinedACL = "gcs.predefined-acl"
|
|
gcsCredentialsFile = "gcs.credentials-file"
|
|
)
|
|
|
|
// GCSBackendOptions are options for configuration the GCS storage.
type GCSBackendOptions struct {
	// Endpoint is a custom GCS endpoint URL; empty means the SDK default.
	Endpoint string `json:"endpoint" toml:"endpoint"`
	// StorageClass is the GCS storage class applied to written objects.
	StorageClass string `json:"storage-class" toml:"storage-class"`
	// PredefinedACL is the predefined ACL applied to written objects.
	PredefinedACL string `json:"predefined-acl" toml:"predefined-acl"`
	// CredentialsFile is the path of a GCS credentials JSON file; its content
	// is inlined into the backend message by apply().
	CredentialsFile string `json:"credentials-file" toml:"credentials-file"`
}
|
|
|
|
func (options *GCSBackendOptions) apply(gcs *backuppb.GCS) error {
|
|
gcs.Endpoint = options.Endpoint
|
|
gcs.StorageClass = options.StorageClass
|
|
gcs.PredefinedAcl = options.PredefinedACL
|
|
|
|
if options.CredentialsFile != "" {
|
|
b, err := os.ReadFile(options.CredentialsFile)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
gcs.CredentialsBlob = string(b)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func defineGCSFlags(flags *pflag.FlagSet) {
|
|
// TODO: remove experimental tag if it's stable
|
|
flags.String(gcsEndpointOption, "", "(experimental) Set the GCS endpoint URL")
|
|
flags.String(gcsStorageClassOption, "", "(experimental) Specify the GCS storage class for objects")
|
|
flags.String(gcsPredefinedACL, "", "(experimental) Specify the GCS predefined acl for objects")
|
|
flags.String(gcsCredentialsFile, "", "(experimental) Set the GCS credentials file path")
|
|
}
|
|
|
|
func hiddenGCSFlags(flags *pflag.FlagSet) {
|
|
_ = flags.MarkHidden(gcsEndpointOption)
|
|
_ = flags.MarkHidden(gcsStorageClassOption)
|
|
_ = flags.MarkHidden(gcsPredefinedACL)
|
|
_ = flags.MarkHidden(gcsCredentialsFile)
|
|
}
|
|
|
|
func (options *GCSBackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
|
|
var err error
|
|
options.Endpoint, err = flags.GetString(gcsEndpointOption)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.StorageClass, err = flags.GetString(gcsStorageClassOption)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.PredefinedACL, err = flags.GetString(gcsPredefinedACL)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.CredentialsFile, err = flags.GetString(gcsCredentialsFile)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GCSStorage defines some standard operations for BR/Lightning on the GCS storage.
// It implements the `Storage` interface.
type GCSStorage struct {
	gcs *backuppb.GCS
	// idx is a shared round-robin cursor used to pick the next client/handle.
	idx       *atomic.Int64
	clientCnt int64
	// clientOps are the SDK options used to (re)create the client pool in Reset.
	clientOps []option.ClientOption

	// handles[i] is the bucket handle backed by clients[i].
	handles []*storage.BucketHandle
	// clients is a pool of GCS clients used to spread request load.
	clients []*storage.Client
	// clientCancel cancels the context all pooled clients were created under.
	clientCancel context.CancelFunc
	// accessRec records read/write traffic statistics.
	accessRec *recording.AccessStats
}
|
|
|
|
// CopyFrom implements Copier.
|
|
func (s *GCSStorage) CopyFrom(ctx context.Context, e storeapi.Storage, spec storeapi.CopySpec) error {
|
|
es, ok := e.(*GCSStorage)
|
|
if !ok {
|
|
return errors.Annotatef(berrors.ErrStorageInvalidConfig, "GCSStorage.CopyFrom supports only GCSStorage, get %T", e)
|
|
}
|
|
dstName := s.objectName(spec.To)
|
|
srcName := es.objectName(spec.From)
|
|
// A note here:
|
|
// https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite
|
|
// It seems extra configuration is needed when doing cross-region copying.
|
|
copier := s.GetBucketHandle().Object(dstName).CopierFrom(es.GetBucketHandle().Object(srcName))
|
|
_, err := copier.Run(ctx)
|
|
if err != nil {
|
|
return errors.Annotatef(
|
|
err,
|
|
"failed to copy %s/%s to %s/%s",
|
|
es.gcs.Bucket,
|
|
srcName,
|
|
s.gcs.Bucket,
|
|
dstName,
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// MarkStrongConsistency implements Storage interface. It is a no-op because
// GCS object operations are already strongly consistent.
func (s *GCSStorage) MarkStrongConsistency() {
	// See https://cloud.google.com/storage/docs/consistency#strongly_consistent_operations
}
|
|
|
|
// GetBucketHandle gets the handle to the GCS API on the bucket.
|
|
func (s *GCSStorage) GetBucketHandle() *storage.BucketHandle {
|
|
i := s.idx.Inc() % int64(len(s.handles))
|
|
return s.handles[i]
|
|
}
|
|
|
|
// getClient gets the GCS client.
|
|
func (s *GCSStorage) getClient() *storage.Client {
|
|
i := s.idx.Inc() % int64(len(s.clients))
|
|
return s.clients[i]
|
|
}
|
|
|
|
// GetOptions gets the external storage operations for the GCS.
// It exposes the underlying protobuf configuration message.
func (s *GCSStorage) GetOptions() *backuppb.GCS {
	return s.gcs
}
|
|
|
|
// DeleteFile delete the file in storage
|
|
func (s *GCSStorage) DeleteFile(ctx context.Context, name string) error {
|
|
object := s.objectName(name)
|
|
err := s.GetBucketHandle().Object(object).Delete(ctx)
|
|
// for delete single file, files are deleted should be considered
|
|
if err != nil {
|
|
if goerrors.Is(err, storage.ErrObjectNotExist) {
|
|
return nil
|
|
}
|
|
}
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
// DeleteFiles delete the files in storage.
|
|
// If the file does not exist, we will ignore it.
|
|
func (s *GCSStorage) DeleteFiles(ctx context.Context, names []string) error {
|
|
for _, name := range names {
|
|
err := s.DeleteFile(ctx, name)
|
|
if err != nil {
|
|
// some real-TiKV test also delete objects, so we ignore the error if
|
|
// the object does not exist
|
|
if goerrors.Is(err, storage.ErrObjectNotExist) {
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// objectName joins the configured storage prefix with name to form the full
// GCS object key.
func (s *GCSStorage) objectName(name string) string {
	return path.Join(s.gcs.Prefix, name)
}
|
|
|
|
// WriteFile writes data to a file to storage.
|
|
func (s *GCSStorage) WriteFile(ctx context.Context, name string, data []byte) error {
|
|
object := s.objectName(name)
|
|
wc := s.GetBucketHandle().Object(object).NewWriter(ctx)
|
|
wc.StorageClass = s.gcs.StorageClass
|
|
wc.PredefinedACL = s.gcs.PredefinedAcl
|
|
_, err := wc.Write(data)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
s.accessRec.RecWrite(len(data))
|
|
return wc.Close()
|
|
}
|
|
|
|
// ReadFile reads the file from the storage and returns the contents.
|
|
func (s *GCSStorage) ReadFile(ctx context.Context, name string) ([]byte, error) {
|
|
object := s.objectName(name)
|
|
rc, err := s.GetBucketHandle().Object(object).NewReader(ctx)
|
|
if err != nil {
|
|
return nil, errors.Annotatef(err,
|
|
"failed to read gcs file, file info: input.bucket='%s', input.key='%s'",
|
|
s.gcs.Bucket, object)
|
|
}
|
|
defer rc.Close()
|
|
|
|
size := rc.Attrs.Size
|
|
var b []byte
|
|
if size < 0 {
|
|
// happened when using fake-gcs-server in integration test
|
|
b, err = io.ReadAll(rc)
|
|
} else {
|
|
b = make([]byte, size)
|
|
_, err = io.ReadFull(rc, b)
|
|
}
|
|
s.accessRec.RecRead(len(b))
|
|
return b, errors.Trace(err)
|
|
}
|
|
|
|
// FileExists return true if file exists.
|
|
func (s *GCSStorage) FileExists(ctx context.Context, name string) (bool, error) {
|
|
object := s.objectName(name)
|
|
_, err := s.GetBucketHandle().Object(object).Attrs(ctx)
|
|
if err != nil {
|
|
if errors.Cause(err) == storage.ErrObjectNotExist { // nolint:errorlint
|
|
return false, nil
|
|
}
|
|
return false, errors.Trace(err)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// Open a Reader by file path.
|
|
func (s *GCSStorage) Open(ctx context.Context, path string, o *storeapi.ReaderOption) (objectio.Reader, error) {
|
|
object := s.objectName(path)
|
|
handle := s.GetBucketHandle().Object(object)
|
|
|
|
attrs, err := handle.Attrs(ctx)
|
|
if err != nil {
|
|
if goerrors.Is(err, storage.ErrObjectNotExist) {
|
|
return nil, errors.Annotatef(err,
|
|
"the object doesn't exist, file info: input.bucket='%s', input.key='%s'",
|
|
s.gcs.Bucket, path)
|
|
}
|
|
return nil, errors.Annotatef(err,
|
|
"failed to get gcs file attribute, file info: input.bucket='%s', input.key='%s'",
|
|
s.gcs.Bucket, path)
|
|
}
|
|
pos := int64(0)
|
|
endPos := attrs.Size
|
|
prefetchSize := 0
|
|
if o != nil {
|
|
if o.StartOffset != nil {
|
|
pos = *o.StartOffset
|
|
}
|
|
if o.EndOffset != nil {
|
|
endPos = min(endPos, *o.EndOffset)
|
|
}
|
|
prefetchSize = o.PrefetchSize
|
|
}
|
|
|
|
return &gcsObjectReader{
|
|
storage: s,
|
|
name: path,
|
|
objHandle: handle,
|
|
reader: nil, // lazy create
|
|
ctx: ctx,
|
|
pos: pos,
|
|
endPos: endPos,
|
|
prefetchSize: prefetchSize,
|
|
totalSize: attrs.Size,
|
|
}, nil
|
|
}
|
|
|
|
// WalkDir traverse all the files in a dir.
|
|
//
|
|
// fn is the function called for each regular file visited by WalkDir.
|
|
// The first argument is the file path that can be used in `Open`
|
|
// function; the second argument is the size in byte of the file determined
|
|
// by path.
|
|
func (s *GCSStorage) WalkDir(ctx context.Context, opt *storeapi.WalkOption, fn func(string, int64) error) error {
|
|
if opt == nil {
|
|
opt = &storeapi.WalkOption{}
|
|
}
|
|
prefix := path.Join(s.gcs.Prefix, opt.SubDir)
|
|
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
|
|
prefix += "/"
|
|
}
|
|
if len(opt.ObjPrefix) != 0 {
|
|
prefix += opt.ObjPrefix
|
|
}
|
|
|
|
query := &storage.Query{Prefix: prefix}
|
|
// only need each object's name and size
|
|
err := query.SetAttrSelection([]string{"Name", "Size"})
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
iter := s.GetBucketHandle().Objects(ctx, query)
|
|
for {
|
|
attrs, err := iter.Next()
|
|
if err == iterator.Done {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
// when walk on specify directory, the result include storage.Prefix,
|
|
// which can not be reuse in other API(Open/Read) directly.
|
|
// so we use TrimPrefix to filter Prefix for next Open/Read.
|
|
path := strings.TrimPrefix(attrs.Name, s.gcs.Prefix)
|
|
// trim the prefix '/' to ensure that the path returned is consistent with the local storage
|
|
path = strings.TrimPrefix(path, "/")
|
|
if err = fn(path, attrs.Size); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// URI implements Storage interface.
|
|
func (s *GCSStorage) URI() string {
|
|
return "gcs://" + s.gcs.Bucket + "/" + s.gcs.Prefix
|
|
}
|
|
|
|
// Create implements Storage interface.
|
|
func (s *GCSStorage) Create(ctx context.Context, name string, wo *storeapi.WriterOption) (objectio.Writer, error) {
|
|
// NewGCSWriter requires real testing environment on Google Cloud.
|
|
mockGCS := intest.InTest && strings.Contains(s.gcs.GetEndpoint(), "127.0.0.1")
|
|
if wo == nil || wo.Concurrency <= 1 || mockGCS {
|
|
object := s.objectName(name)
|
|
wc := s.GetBucketHandle().Object(object).NewWriter(ctx)
|
|
wc.StorageClass = s.gcs.StorageClass
|
|
wc.PredefinedACL = s.gcs.PredefinedAcl
|
|
return newFlushStorageWriter(wc, &objectio.EmptyFlusher{}, wc, s.accessRec), nil
|
|
}
|
|
uri := s.objectName(name)
|
|
// 5MB is the minimum part size for GCS.
|
|
partSize := max(wo.PartSize, int64(gcsMinimumChunkSize))
|
|
w, err := NewGCSWriter(ctx, s.getClient(), uri, partSize, wo.Concurrency, s.gcs.Bucket)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
fw := newFlushStorageWriter(w, &objectio.EmptyFlusher{}, w, s.accessRec)
|
|
// we already pass the accessRec to flushStorageWriter.
|
|
bw := objectio.NewBufferedWriter(fw, int(partSize), compressedio.NoCompression, nil)
|
|
return bw, nil
|
|
}
|
|
|
|
// Rename file name from oldFileName to newFileName.
|
|
func (s *GCSStorage) Rename(ctx context.Context, oldFileName, newFileName string) error {
|
|
data, err := s.ReadFile(ctx, oldFileName)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
err = s.WriteFile(ctx, newFileName, data)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
return s.DeleteFile(ctx, oldFileName)
|
|
}
|
|
|
|
// Close implements Storage interface.
|
|
func (s *GCSStorage) Close() {
|
|
s.clientCancel()
|
|
for _, client := range s.clients {
|
|
if err := client.Close(); err != nil {
|
|
log.Warn("failed to close gcs client", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
// mustReportCredErr is used in tests: when true, credential/transport setup
// errors in NewGCSStorage are returned even under intest, instead of falling
// back to anonymous access.
var mustReportCredErr = false

// gcsClientCnt is the number of pooled GCS clients (and bucket handles)
// created per storage to spread request load.
const gcsClientCnt = 16
|
|
|
|
// NewGCSStorage creates a GCS external storage implementation.
//
// Credential resolution:
//   - opts.NoCredentials: anonymous access.
//   - gcs.CredentialsBlob empty: discover application default credentials;
//     in tests (unless mustReportCredErr) fall back to anonymous access when
//     discovery fails.
//   - otherwise: use the JSON credentials blob carried in `gcs`.
//
// Unless opts.SendCredentials is set, the credentials blob is cleared from
// `gcs` before returning so it will not be forwarded to TiKV.
func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *storeapi.Options) (*GCSStorage, error) {
	var clientOps []option.ClientOption
	if opts.NoCredentials {
		clientOps = append(clientOps, option.WithoutAuthentication())
	} else {
		if gcs.CredentialsBlob == "" {
			creds, err := google.FindDefaultCredentials(ctx, storage.ScopeReadWrite)
			if err != nil {
				// Test-only leniency: fall back to anonymous access instead
				// of failing the whole construction.
				if intest.InTest && !mustReportCredErr {
					clientOps = append(clientOps, option.WithoutAuthentication())
					goto skipHandleCred
				}
				return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "%v Or you should provide '--%s'", err, gcsCredentialsFile)
			}
			if opts.SendCredentials {
				// Forwarding credentials to TiKV requires the raw JSON form.
				if len(creds.JSON) <= 0 {
					return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig,
						"You should provide '--%s' when '--send-credentials-to-tikv' is true", gcsCredentialsFile)
				}
				gcs.CredentialsBlob = string(creds.JSON)
			}
			if creds != nil {
				clientOps = append(clientOps, option.WithCredentials(creds))
			}
		} else {
			clientOps = append(clientOps, option.WithCredentialsJSON([]byte(gcs.GetCredentialsBlob())))
		}
	}
skipHandleCred:

	if gcs.Endpoint != "" {
		clientOps = append(clientOps, option.WithEndpoint(gcs.Endpoint))
	}

	httpClient := opts.HTTPClient
	if opts.AccessRecording != nil {
		if httpClient == nil {
			// Clone the default transport so the wrapper does not mutate the
			// process-wide http.DefaultTransport.
			transport, _ := http.DefaultTransport.(*http.Transport)
			httpClient = &http.Client{Transport: transport.Clone()}
		}
		// Wrap the transport so every outgoing request is recorded.
		httpClient.Transport = &roundTripperWrapper{
			RoundTripper: httpClient.Transport,
			accessRec:    opts.AccessRecording,
		}
	}
	if httpClient != nil {
		// see https://github.com/pingcap/tidb/issues/47022#issuecomment-1722913455
		// https://www.googleapis.com/auth/cloud-platform must be set to use service_account
		// type of credential-file.
		newTransport, err := htransport.NewTransport(ctx, httpClient.Transport,
			append(clientOps, option.WithScopes(storage.ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"))...)
		if err != nil {
			// Same test-only leniency as credential discovery above.
			if intest.InTest && !mustReportCredErr {
				goto skipHandleTransport
			}
			return nil, errors.Trace(err)
		}
		httpClient.Transport = newTransport
	skipHandleTransport:
		clientOps = append(clientOps, option.WithHTTPClient(httpClient))
	}

	if !opts.SendCredentials {
		// Clear the credentials if exists so that they will not be sent to TiKV
		gcs.CredentialsBlob = ""
	}

	ret := &GCSStorage{
		gcs:       gcs,
		idx:       atomic.NewInt64(0),
		clientCnt: gcsClientCnt,
		clientOps: clientOps,
		accessRec: opts.AccessRecording,
	}
	// Reset builds the pooled clients and bucket handles.
	if err := ret.Reset(ctx); err != nil {
		return nil, errors.Trace(err)
	}
	return ret, nil
}
|
|
|
|
// Reset resets the GCS storage. Reset should not be used concurrently with
// Close.
//
// It tears down any existing client pool, then concurrently creates
// gcsClientCnt fresh clients and collects them into s.clients. On any
// creation error the partially built pool is closed and the first error is
// returned.
func (s *GCSStorage) Reset(ctx context.Context) error {
	logutil.Logger(ctx).Info("resetting gcs storage")

	s.cancelAndCloseGCSClients()

	// The clients must outlive `ctx` (they live until Close/Reset), so they
	// are created under a background-derived context that s.clientCancel owns.
	clientCtx, clientCancel := context.WithCancel(context.Background())
	s.clients = make([]*storage.Client, 0, gcsClientCnt)
	wg := util.WaitGroupWrapper{}
	cliCh := make(chan *storage.Client)
	// Collector goroutine: gathers the freshly created clients; only this
	// goroutine appends to s.clients, so no extra locking is needed.
	wg.RunWithLog(func() {
		for range gcsClientCnt {
			select {
			case cli := <-cliCh:
				s.clients = append(s.clients, cli)
			case <-ctx.Done():
				// NOTE(review): on this path firstErr stays nil, so the
				// handle-building loop below may index past the clients that
				// were actually collected — confirm callers only cancel ctx
				// when the storage is being discarded.
				clientCancel()
				return
			case <-clientCtx.Done():
				return
			}
		}
	})
	firstErr := atomic.NewError(nil)
	for range gcsClientCnt {
		wg.RunWithLog(func() {
			client, err := storage.NewClient(clientCtx, s.clientOps...)
			if err != nil {
				// Keep only the first error and stop the other creators.
				firstErr.CompareAndSwap(nil, err)
				clientCancel()
				return
			}
			// Retry transient failures aggressively; shouldRetry extends the
			// SDK's default classification.
			client.SetRetry(storage.WithErrorFunc(shouldRetry), storage.WithPolicy(storage.RetryAlways))
			select {
			case cliCh <- client:
			case <-clientCtx.Done():
			}
		})
	}
	wg.Wait()
	if err := firstErr.Load(); err != nil {
		// Close whatever subset of clients made it into s.clients.
		s.cancelAndCloseGCSClients()
		return errors.Trace(err)
	}

	s.clientCancel = clientCancel
	s.handles = make([]*storage.BucketHandle, gcsClientCnt)
	for i := range s.handles {
		s.handles[i] = s.clients[i].Bucket(s.gcs.Bucket)
	}
	return nil
}
|
|
|
|
func (s *GCSStorage) cancelAndCloseGCSClients() {
|
|
if s.clientCancel != nil {
|
|
s.clientCancel()
|
|
s.clientCancel = nil
|
|
}
|
|
|
|
for _, client := range s.clients {
|
|
if client != nil {
|
|
_ = client.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
func shouldRetry(err error) bool {
|
|
if storage.ShouldRetry(err) {
|
|
return true
|
|
}
|
|
|
|
if err == nil {
|
|
return false
|
|
}
|
|
|
|
// workaround for https://github.com/googleapis/google-cloud-go/issues/7440
|
|
if e := (http2.StreamError{}); goerrors.As(err, &e) {
|
|
if e.Code == http2.ErrCodeInternal {
|
|
log.Warn("retrying gcs request due to internal HTTP2 error", zap.Error(err))
|
|
return true
|
|
}
|
|
}
|
|
|
|
// workaround for https://github.com/googleapis/google-cloud-go/issues/9262
|
|
if e := (&googleapi.Error{}); goerrors.As(err, &e) {
|
|
if e.Code == 401 {
|
|
log.Warn("retrying gcs request due to internal authentication error", zap.Error(err))
|
|
return true
|
|
}
|
|
}
|
|
|
|
// workaround for https://github.com/googleapis/google-cloud-go/issues/7090
|
|
// seems it's a bug of golang net/http: https://github.com/golang/go/issues/53472
|
|
if e := (&url.Error{}); goerrors.As(err, &e) {
|
|
if goerrors.Is(e.Err, io.EOF) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
errMsg := err.Error()
|
|
// workaround for strange unknown errors
|
|
retryableErrMsg := []string{
|
|
"http2: client connection force closed via ClientConn.Close",
|
|
"broken pipe",
|
|
"http2: client connection lost",
|
|
// See https://stackoverflow.com/questions/45209168/http2-server-sent-goaway-and-closed-the-connection-laststreamid-1999 for details.
|
|
"http2: server sent GOAWAY",
|
|
}
|
|
|
|
for _, msg := range retryableErrMsg {
|
|
if strings.Contains(errMsg, msg) {
|
|
log.Warn("retrying gcs request", zap.Error(err))
|
|
return true
|
|
}
|
|
}
|
|
|
|
// just log the new unknown error, in case we can add it to this function
|
|
if !goerrors.Is(err, context.Canceled) {
|
|
log.Warn("other error when requesting gcs",
|
|
zap.Error(err),
|
|
zap.String("info", fmt.Sprintf("type: %T, value: %#v", err, err)))
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// gcsObjectReader wrap storage.Reader and add the `Seek` method.
type gcsObjectReader struct {
	storage   *GCSStorage
	name      string
	objHandle *storage.ObjectHandle
	// reader is created lazily on the first Read (or after a Seek), so
	// opening the file itself stays cheap.
	reader io.ReadCloser
	// [pos, endPos) is the range of the file to read.
	pos    int64
	endPos int64
	// totalSize is the full object size captured at Open time.
	totalSize int64

	// prefetchSize, when positive, wraps the range reader in a prefetching
	// buffer of that many bytes.
	prefetchSize int
	// reader context used for implement `io.Seek`
	ctx context.Context
}
|
|
|
|
// Read implement the io.Reader interface. The underlying range reader is
// created lazily on the first call and covers [pos, endPos); traffic is
// recorded and pos advanced by however many bytes were read.
func (r *gcsObjectReader) Read(p []byte) (n int, err error) {
	// Test-only failpoint: randomly inject an unexpected EOF mid-stream (only
	// once reading has progressed and prefetch is on) to exercise retry paths.
	failpoint.Inject("GCSReadUnexpectedEOF", func(n failpoint.Value) {
		if r.prefetchSize > 0 && r.pos > 0 && rand.Intn(2) == 0 {
			log.Info("ingest error in gcs reader read")
			failpoint.Return(n.(int), io.ErrUnexpectedEOF)
		}
	})
	if r.reader == nil {
		// Lazily open the range reader for the remaining [pos, endPos) window.
		length := r.endPos - r.pos
		rc, err := r.objHandle.NewRangeReader(r.ctx, r.pos, length)
		if err != nil {
			return 0, errors.Annotatef(err,
				"failed to read gcs file, file info: input.bucket='%s', input.key='%s'",
				r.storage.gcs.Bucket, r.name)
		}
		r.reader = rc
		if r.prefetchSize > 0 {
			r.reader = prefetch.NewReader(r.reader, length, r.prefetchSize)
		}
	}
	n, err = r.reader.Read(p)
	// Record traffic and advance the logical position by what was read, even
	// when err is non-nil (partial reads still consumed bytes).
	r.storage.accessRec.RecRead(n)
	r.pos += int64(n)
	return n, err
}
|
|
|
|
// Close implement the io.Closer interface.
|
|
func (r *gcsObjectReader) Close() error {
|
|
if r.reader == nil {
|
|
return nil
|
|
}
|
|
return r.reader.Close()
|
|
}
|
|
|
|
// Seek implement the io.Seeker interface.
//
// Currently, tidb-lightning depends on this method to read parquet file for gcs storage.
func (r *gcsObjectReader) Seek(offset int64, whence int) (int64, error) {
	var realOffset int64
	switch whence {
	case io.SeekStart:
		realOffset = offset
	case io.SeekCurrent:
		realOffset = r.pos + offset
	case io.SeekEnd:
		// Only seeking backwards (or to) the end is supported.
		if offset > 0 {
			return 0, errors.Annotatef(berrors.ErrInvalidArgument, "Seek: offset '%v' should be negative.", offset)
		}
		realOffset = offset + r.totalSize
	default:
		return 0, errors.Annotatef(berrors.ErrStorageUnknown, "Seek: invalid whence '%d'", whence)
	}

	if realOffset < 0 {
		return 0, errors.Annotatef(berrors.ErrInvalidArgument, "Seek: offset '%v' out of range. current pos is '%v'. total size is '%v'", offset, r.pos, r.totalSize)
	}

	// Seeking to the current position keeps the open stream untouched.
	if realOffset == r.pos {
		return realOffset, nil
	}

	// Any other target invalidates the current stream.
	if r.reader != nil {
		_ = r.reader.Close()
		r.reader = nil
	}
	r.pos = realOffset
	// Seeking at or past EOF installs an empty reader instead of issuing an
	// API call; subsequent reads return io.EOF immediately.
	if realOffset >= r.totalSize {
		r.reader = io.NopCloser(bytes.NewReader(nil))
		return realOffset, nil
	}
	// NOTE(review): the re-opened range reader is unbounded (length -1), so
	// after a Seek the stream can read past endPos even though Open may have
	// set a narrower [pos, endPos) window (the prefetch wrapper below still
	// uses endPos) — confirm whether this is intended.
	rc, err := r.objHandle.NewRangeReader(r.ctx, r.pos, -1)
	if err != nil {
		return 0, errors.Annotatef(err,
			"failed to read gcs file, file info: input.bucket='%s', input.key='%s'",
			r.storage.gcs.Bucket, r.name)
	}
	r.reader = rc
	if r.prefetchSize > 0 {
		r.reader = prefetch.NewReader(r.reader, r.endPos-r.pos, r.prefetchSize)
	}

	return realOffset, nil
}
|
|
|
|
// GetFileSize returns the total object size captured when the reader was
// opened. It never fails.
func (r *gcsObjectReader) GetFileSize() (int64, error) {
	return r.totalSize, nil
}
|
|
|
|
// roundTripperWrapper decorates an http.RoundTripper so that every outgoing
// request is recorded into the access statistics before being sent.
type roundTripperWrapper struct {
	http.RoundTripper
	accessRec *recording.AccessStats
}
|
|
|
|
// RoundTrip records the request into the access stats and then delegates to
// the wrapped RoundTripper.
func (rt *roundTripperWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
	rt.accessRec.RecRequest(req)
	return rt.RoundTripper.RoundTrip(req)
}
|