Merge d4a77166612fd6d193d88f860f4059eb15f1289b into 0b9671313b14ffe839ecbd7dd2ae5ac7f6f05db8

This commit is contained in:
bodqhrohro 2025-04-15 23:02:08 +00:00 committed by GitHub
commit 9c0d304682
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -38,7 +38,6 @@ import (
// Constants // Constants
const ( const (
directoryMarkerContentType = "application/directory" // content type of directory marker objects directoryMarkerContentType = "application/directory" // content type of directory marker objects
listChunks = 1000 // chunk size to read directory listings
defaultChunkSize = 5 * fs.Gibi defaultChunkSize = 5 * fs.Gibi
minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep. minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
segmentsContainerSuffix = "_segments" segmentsContainerSuffix = "_segments"
@ -310,6 +309,21 @@ of the Swift API that do not implement pagination as expected. See
also "fetch_until_empty_page".`, also "fetch_until_empty_page".`,
Default: 0, Default: 0,
Advanced: true, Advanced: true,
}, {
Name: "list_chunks",
Help: `Maximum amount of directory entries per listing request.
Reduce this if you have errors with listing large directories.`,
Default: 1000,
Advanced: true,
}, {
Name: "retry_errors",
Help: `Additional comma-separated list of error codes to retry on.
Blomp is known to return arbitrary errors on directory listing requests.
Known ones are 400, 403 and 404.`,
Default: fs.CommaSepList{},
Advanced: true,
}}, SharedOptions...), }}, SharedOptions...),
}) })
} }
@ -342,6 +356,8 @@ type Options struct {
Enc encoder.MultiEncoder `config:"encoding"` Enc encoder.MultiEncoder `config:"encoding"`
FetchUntilEmptyPage bool `config:"fetch_until_empty_page"` FetchUntilEmptyPage bool `config:"fetch_until_empty_page"`
PartialPageFetchThreshold int `config:"partial_page_fetch_threshold"` PartialPageFetchThreshold int `config:"partial_page_fetch_threshold"`
ListChunks int `config:"list_chunks"`
RetryErrors fs.CommaSepList `config:"retry_errors"`
} }
// Fs represents a remote swift server // Fs represents a remote swift server
@ -413,7 +429,7 @@ var retryErrorCodes = []int{
// shouldRetry returns a boolean as to whether this err deserves to be // shouldRetry returns a boolean as to whether this err deserves to be
// retried. It returns the err as a convenience // retried. It returns the err as a convenience
func shouldRetry(ctx context.Context, err error) (bool, error) { func shouldRetry(ctx context.Context, err error, retryErrors fs.CommaSepList) (bool, error) {
if fserrors.ContextError(ctx, &err) { if fserrors.ContextError(ctx, &err) {
return false, err return false, err
} }
@ -422,6 +438,12 @@ func shouldRetry(ctx context.Context, err error) (bool, error) {
if slices.Contains(retryErrorCodes, swiftError.StatusCode) { if slices.Contains(retryErrorCodes, swiftError.StatusCode) {
return true, err return true, err
} }
for _, code := range retryErrors {
intCode, intError := strconv.Atoi(code)
if intError == nil && swiftError.StatusCode == intCode {
return true, err
}
}
} }
// Check for generic failure conditions // Check for generic failure conditions
return fserrors.ShouldRetry(err), err return fserrors.ShouldRetry(err), err
@ -430,7 +452,7 @@ func shouldRetry(ctx context.Context, err error) (bool, error) {
// shouldRetryHeaders returns a boolean as to whether this err // shouldRetryHeaders returns a boolean as to whether this err
// deserves to be retried. It reads the headers passed in looking for // deserves to be retried. It reads the headers passed in looking for
// `Retry-After`. It returns the err as a convenience // `Retry-After`. It returns the err as a convenience
func shouldRetryHeaders(ctx context.Context, headers swift.Headers, err error) (bool, error) { func shouldRetryHeaders(ctx context.Context, headers swift.Headers, err error, retryErrors fs.CommaSepList) (bool, error) {
if swiftError, ok := err.(*swift.Error); ok && swiftError.StatusCode == 429 { if swiftError, ok := err.(*swift.Error); ok && swiftError.StatusCode == 429 {
if value := headers["Retry-After"]; value != "" { if value := headers["Retry-After"]; value != "" {
retryAfter, parseErr := strconv.Atoi(value) retryAfter, parseErr := strconv.Atoi(value)
@ -449,7 +471,7 @@ func shouldRetryHeaders(ctx context.Context, headers swift.Headers, err error) (
} }
} }
} }
return shouldRetry(ctx, err) return shouldRetry(ctx, err, retryErrors)
} }
// parsePath parses a remote 'url' // parsePath parses a remote 'url'
@ -598,7 +620,7 @@ func NewFsWithConnection(ctx context.Context, opt *Options, name, root string, c
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers var rxHeaders swift.Headers
info, rxHeaders, err = f.c.Object(ctx, f.rootContainer, encodedDirectory) info, rxHeaders, err = f.c.Object(ctx, f.rootContainer, encodedDirectory)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.RetryErrors)
}) })
if err == nil && info.ContentType != directoryMarkerContentType { if err == nil && info.ContentType != directoryMarkerContentType {
newRoot := path.Dir(f.root) newRoot := path.Dir(f.root)
@ -696,7 +718,7 @@ func (f *Fs) listContainerRoot(ctx context.Context, container, directory, prefix
// Options for ObjectsWalk // Options for ObjectsWalk
opts := swift.ObjectsOpts{ opts := swift.ObjectsOpts{
Prefix: directory, Prefix: directory,
Limit: listChunks, Limit: f.opt.ListChunks,
} }
if !recurse { if !recurse {
opts.Delimiter = '/' opts.Delimiter = '/'
@ -706,7 +728,7 @@ func (f *Fs) listContainerRoot(ctx context.Context, container, directory, prefix
var err error var err error
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
objects, err = f.c.Objects(ctx, container, opts) objects, err = f.c.Objects(ctx, container, opts)
return shouldRetry(ctx, err) return shouldRetry(ctx, err, f.opt.RetryErrors)
}) })
if err == nil { if err == nil {
for i := range objects { for i := range objects {
@ -795,7 +817,7 @@ func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err err
var containers []swift.Container var containers []swift.Container
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
containers, err = f.c.ContainersAll(ctx, nil) containers, err = f.c.ContainersAll(ctx, nil)
return shouldRetry(ctx, err) return shouldRetry(ctx, err, f.opt.RetryErrors)
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("container listing failed: %w", err) return nil, fmt.Errorf("container listing failed: %w", err)
@ -888,7 +910,7 @@ func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) {
var container swift.Container var container swift.Container
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
container, _, err = f.c.Container(ctx, f.rootContainer) container, _, err = f.c.Container(ctx, f.rootContainer)
return shouldRetry(ctx, err) return shouldRetry(ctx, err, f.opt.RetryErrors)
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("container info failed: %w", err) return nil, fmt.Errorf("container info failed: %w", err)
@ -900,7 +922,7 @@ func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) {
var containers []swift.Container var containers []swift.Container
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
containers, err = f.c.ContainersAll(ctx, nil) containers, err = f.c.ContainersAll(ctx, nil)
return shouldRetry(ctx, err) return shouldRetry(ctx, err, f.opt.RetryErrors)
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("container listing failed: %w", err) return nil, fmt.Errorf("container listing failed: %w", err)
@ -957,7 +979,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers var rxHeaders swift.Headers
_, rxHeaders, err = f.c.Container(ctx, container) _, rxHeaders, err = f.c.Container(ctx, container)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.RetryErrors)
}) })
} }
if err == swift.ContainerNotFound { if err == swift.ContainerNotFound {
@ -967,7 +989,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
} }
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
err = f.c.ContainerCreate(ctx, container, headers) err = f.c.ContainerCreate(ctx, container, headers)
return shouldRetry(ctx, err) return shouldRetry(ctx, err, f.opt.RetryErrors)
}) })
if err == nil { if err == nil {
fs.Infof(f, "Container %q created", container) fs.Infof(f, "Container %q created", container)
@ -988,7 +1010,7 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
err := f.cache.Remove(container, func() error { err := f.cache.Remove(container, func() error {
err := f.pacer.Call(func() (bool, error) { err := f.pacer.Call(func() (bool, error) {
err := f.c.ContainerDelete(ctx, container) err := f.c.ContainerDelete(ctx, container)
return shouldRetry(ctx, err) return shouldRetry(ctx, err, f.opt.RetryErrors)
}) })
if err == nil { if err == nil {
fs.Infof(f, "Container %q removed", container) fs.Infof(f, "Container %q removed", container)
@ -1066,7 +1088,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil) rxHeaders, err = f.c.ObjectCopy(ctx, srcContainer, srcPath, dstContainer, dstPath, nil)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.RetryErrors)
}) })
} }
if err != nil { if err != nil {
@ -1161,7 +1183,7 @@ func (su *segmentedUpload) uploadManifest(ctx context.Context, contentType strin
err = su.f.pacer.Call(func() (bool, error) { err = su.f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers var rxHeaders swift.Headers
rxHeaders, err = su.f.c.ObjectPut(ctx, su.dstContainer, su.dstPath, emptyReader, true, "", contentType, headers) rxHeaders, err = su.f.c.ObjectPut(ctx, su.dstContainer, su.dstPath, emptyReader, true, "", contentType, headers)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, su.f.opt.RetryErrors)
}) })
return err return err
} }
@ -1185,7 +1207,7 @@ func (f *Fs) copyLargeObject(ctx context.Context, src *Object, dstContainer stri
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers var rxHeaders swift.Headers
rxHeaders, err = f.c.ObjectCopy(ctx, srcSegmentsContainer, srcSegment, su.container, dstSegment, nil) rxHeaders, err = f.c.ObjectCopy(ctx, srcSegmentsContainer, srcSegment, su.container, dstSegment, nil)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, f.opt.RetryErrors)
}) })
if err != nil { if err != nil {
return err return err
@ -1332,7 +1354,7 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
container, containerPath := o.split() container, containerPath := o.split()
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
info, h, err = o.fs.c.Object(ctx, container, containerPath) info, h, err = o.fs.c.Object(ctx, container, containerPath)
return shouldRetryHeaders(ctx, h, err) return shouldRetryHeaders(ctx, h, err, o.fs.opt.RetryErrors)
}) })
if err != nil { if err != nil {
if err == swift.ObjectNotFound { if err == swift.ObjectNotFound {
@ -1388,7 +1410,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
container, containerPath := o.split() container, containerPath := o.split()
return o.fs.pacer.Call(func() (bool, error) { return o.fs.pacer.Call(func() (bool, error) {
err = o.fs.c.ObjectUpdate(ctx, container, containerPath, newHeaders) err = o.fs.c.ObjectUpdate(ctx, container, containerPath, newHeaders)
return shouldRetry(ctx, err) return shouldRetry(ctx, err, o.fs.opt.RetryErrors)
}) })
} }
@ -1409,7 +1431,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
var rxHeaders swift.Headers var rxHeaders swift.Headers
in, rxHeaders, err = o.fs.c.ObjectOpen(ctx, container, containerPath, !isRanging, headers) in, rxHeaders, err = o.fs.c.ObjectOpen(ctx, container, containerPath, !isRanging, headers)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, o.fs.opt.RetryErrors)
}) })
return return
} }
@ -1493,7 +1515,7 @@ func (o *Object) updateChunks(ctx context.Context, in0 io.Reader, headers swift.
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
var rxHeaders swift.Headers var rxHeaders swift.Headers
rxHeaders, err = o.fs.c.ObjectPut(ctx, su.container, segmentPath, segmentReader, true, "", "", headers) rxHeaders, err = o.fs.c.ObjectPut(ctx, su.container, segmentPath, segmentReader, true, "", "", headers)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, o.fs.opt.RetryErrors)
}) })
if err != nil { if err != nil {
return err return err
@ -1557,7 +1579,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
var rxHeaders swift.Headers var rxHeaders swift.Headers
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.CallNoRetry(func() (bool, error) {
rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, in, true, "", contentType, headers) rxHeaders, err = o.fs.c.ObjectPut(ctx, container, containerPath, in, true, "", contentType, headers)
return shouldRetryHeaders(ctx, rxHeaders, err) return shouldRetryHeaders(ctx, rxHeaders, err, o.fs.opt.RetryErrors)
}) })
if err != nil { if err != nil {
return err return err
@ -1622,7 +1644,7 @@ func (o *Object) Remove(ctx context.Context) (err error) {
fs.Errorf(o, "Dangling object - ignoring: %v", err) fs.Errorf(o, "Dangling object - ignoring: %v", err)
err = nil err = nil
} }
return shouldRetry(ctx, err) return shouldRetry(ctx, err, o.fs.opt.RetryErrors)
}) })
if err != nil { if err != nil {
return err return err