error handler

This commit is contained in:
ns2kracy 2025-02-21 15:41:38 +08:00
parent 3afc107792
commit 4af421a0f4
No known key found for this signature in database
GPG Key ID: 3E6E5EE4AFFF61DB
2 changed files with 100 additions and 97 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/dircache"
@ -109,7 +110,7 @@ type Fs struct {
root string // the path we are working on
opt Options // parsed options
features *fs.Features // optional features
client *AdriveClient // Aliyun Drive client
client *rest.Client // Aliyun Drive client
dirCache *dircache.DirCache // Map of directory path to directory id
pacer *fs.Pacer // pacer for API calls
tokenRenewer *oauthutil.Renew // renew the token on expiry
@ -139,7 +140,6 @@ type Options struct {
ChunkSize fs.SizeSuffix `config:"chunk_size"`
RemoveWay string `config:"remove_way"`
CheckNameMode string `config:"check_name_mode"`
TokenType string `config:"token_type"`
AccessToken string `config:"access_token"`
RefreshToken string `config:"refresh_token"`
ExpiresAt string `config:"expires_at"`
@ -290,7 +290,7 @@ func (f *Fs) getDriveInfo(ctx context.Context) error {
return fmt.Errorf("failed to get driveinfo: %w", err)
}
f.client.driveID = info.DefaultDriveID
f.driveID = info.DefaultDriveID
return nil
}
@ -558,7 +558,6 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
if err != nil {
return nil, err
}
var iErr error
list, err := f.listAll(ctx, directoryID)
for _, info := range list {
@ -578,9 +577,7 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
if err != nil {
return nil, err
}
if iErr != nil {
return nil, iErr
}
return entries, nil
}
@ -805,6 +802,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
// Create HTTP client
client := fshttp.NewClient(ctx)
var ts *oauthutil.TokenSource
if opt.AccessToken == "" {
@ -819,19 +817,28 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
name: name,
root: root,
opt: *opt,
client: NewAdriveClient(client, rootURL),
client: rest.NewClient(client),
m: m,
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
pacer: fs.NewPacer(
context.Background(),
pacer.NewDefault(
pacer.MinSleep(minSleep),
pacer.MaxSleep(maxSleep),
pacer.DecayConstant(decayConstant),
),
),
}
f.features = (&fs.Features{
CaseInsensitive: true,
CanHaveEmptyDirectories: true,
}).Fill(ctx, f)
f.client.SetErrorHandler(errorHandler)
f.client.SetRoot(rootURL)
// Set up authentication
if f.opt.AccessToken != "" {
f.client.c.SetHeader("Authorization", "Bearer "+f.opt.AccessToken)
f.client.SetHeader("Authorization", "Bearer "+f.opt.AccessToken)
}
if ts != nil {
@ -887,6 +894,89 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
return f, nil
}
// Call makes a call to the API using the params passed in
func (f *Fs) Call(ctx context.Context, opts *rest.Opts) (resp *http.Response, err error) {
return f.CallWithPacer(ctx, opts, f.pacer)
}
// CallWithPacer makes a call to the API using the params passed in using the pacer passed in
func (f *Fs) CallWithPacer(ctx context.Context, opts *rest.Opts, pacer *fs.Pacer) (resp *http.Response, err error) {
err = pacer.Call(func() (bool, error) {
resp, err = f.client.Call(ctx, opts)
return shouldRetry(ctx, resp, err)
})
return resp, err
}
// CallJSON makes an API call and decodes the JSON return packet into response
func (f *Fs) CallJSON(ctx context.Context, opts *rest.Opts, request interface{}, response interface{}) (resp *http.Response, err error) {
return f.CallJSONWithPacer(ctx, opts, f.pacer, request, response)
}
// CallJSONWithPacer makes an API call and decodes the JSON return packet into response using the pacer passed in
func (f *Fs) CallJSONWithPacer(ctx context.Context, opts *rest.Opts, pacer *fs.Pacer, request interface{}, response interface{}) (resp *http.Response, err error) {
err = pacer.Call(func() (bool, error) {
resp, err = f.client.CallJSON(ctx, opts, request, response)
return shouldRetry(ctx, resp, err)
})
return resp, err
}
var retryErrorCodes = []int{
403,
404,
429, // Too Many Requests
500, // Internal Server Error
502, // Bad Gateway
503, // Service Unavailable
504, // Gateway Timeout
509, // Bandwidth Limit Exceeded
}
// shouldRetry returns true if err is nil, or if it's a retryable error
func shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
if fserrors.ContextError(ctx, &err) {
return false, err
}
authRetry := false
if resp != nil && resp.StatusCode == 401 && strings.Contains(resp.Header.Get("Www-Authenticate"), "expired_token") {
authRetry = true
fs.Debugf(nil, "Should retry: %v", err)
}
if _, ok := err.(*api.Error); ok {
fs.Debugf(nil, "Retrying API error %v", err)
return true, err
}
return authRetry || fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
}
// errorHandler parses a non 2xx error response into an error
func errorHandler(resp *http.Response) error {
// Decode error response
apiErr := new(api.Error)
err := rest.DecodeJSON(resp, &apiErr)
if err != nil {
fs.Debugf(nil, "Failed to decode error response: %v", err)
// If we can't decode the error response, create a basic error
apiErr.Code = resp.StatusCode
apiErr.Message = resp.Status
return apiErr
}
// Ensure we have an error code and message
if apiErr.Code == 0 {
apiErr.Code = resp.StatusCode
}
if apiErr.Message == "" {
apiErr.Message = resp.Status
}
return apiErr
}
// Check interfaces
var (
_ fs.Abouter = (*Fs)(nil)

View File

@ -3,12 +3,9 @@ package adrive
import (
"context"
"net/http"
"strings"
"sync"
"github.com/rclone/rclone/backend/adrive/api"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/rest"
)
@ -44,87 +41,3 @@ func NewAdriveClient(c *http.Client, rootURL string) *AdriveClient {
return client
}
// Call makes a call to the API using the params passed in
func (c *AdriveClient) Call(ctx context.Context, opts *rest.Opts) (resp *http.Response, err error) {
return c.CallWithPacer(ctx, opts, c.pacer)
}
// CallWithPacer makes a call to the API using the params passed in using the pacer passed in
func (c *AdriveClient) CallWithPacer(ctx context.Context, opts *rest.Opts, pacer *fs.Pacer) (resp *http.Response, err error) {
err = pacer.Call(func() (bool, error) {
resp, err = c.c.Call(ctx, opts)
return shouldRetry(ctx, resp, err)
})
return resp, err
}
// CallJSON makes an API call and decodes the JSON return packet into response
func (c *AdriveClient) CallJSON(ctx context.Context, opts *rest.Opts, request interface{}, response interface{}) (resp *http.Response, err error) {
return c.CallJSONWithPacer(ctx, opts, c.pacer, request, response)
}
// CallJSONWithPacer makes an API call and decodes the JSON return packet into response using the pacer passed in
func (c *AdriveClient) CallJSONWithPacer(ctx context.Context, opts *rest.Opts, pacer *fs.Pacer, request interface{}, response interface{}) (resp *http.Response, err error) {
err = pacer.Call(func() (bool, error) {
resp, err = c.c.CallJSON(ctx, opts, request, response)
return shouldRetry(ctx, resp, err)
})
return resp, err
}
var retryErrorCodes = []int{
403,
404,
429, // Too Many Requests
500, // Internal Server Error
502, // Bad Gateway
503, // Service Unavailable
504, // Gateway Timeout
509, // Bandwidth Limit Exceeded
}
// shouldRetry returns true if err is nil, or if it's a retryable error
func shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
if fserrors.ContextError(ctx, &err) {
return false, err
}
authRetry := false
if resp != nil && resp.StatusCode == 401 && strings.Contains(resp.Header.Get("Www-Authenticate"), "expired_token") {
authRetry = true
fs.Debugf(nil, "Should retry: %v", err)
}
// TODO
if _, ok := err.(*api.Error); ok {
fs.Debugf(nil, "Retrying API error %v", err)
return true, err
}
return authRetry || fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
}
// errorHandler parses a non 2xx error response into an error
func errorHandler(resp *http.Response) error {
// Decode error response
apiErr := new(api.Error)
err := rest.DecodeJSON(resp, &apiErr)
if err != nil {
fs.Debugf(nil, "Failed to decode error response: %v", err)
// If we can't decode the error response, create a basic error
apiErr.Code = resp.StatusCode
apiErr.Message = resp.Status
return apiErr
}
// Ensure we have an error code and message
if apiErr.Code == 0 {
apiErr.Code = resp.StatusCode
}
if apiErr.Message == "" {
apiErr.Message = resp.Status
}
return apiErr
}