rclone/fs/list/sorter.go
Nick Craig-Wood 7523300ef6
Some checks failed
build / windows (push) Has been cancelled
build / other_os (push) Has been cancelled
build / mac_amd64 (push) Has been cancelled
build / mac_arm64 (push) Has been cancelled
build / linux (push) Has been cancelled
build / go1.23 (push) Has been cancelled
build / linux_386 (push) Has been cancelled
build / lint (push) Has been cancelled
build / android-all (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/386 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/amd64 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/arm/v6 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/arm/v7 (push) Has been cancelled
Build & Push Docker Images / Build Docker Image for linux/arm64 (push) Has been cancelled
Build & Push Docker Images / Merge & Push Final Docker Image (push) Has been cancelled
sync: implement --list-cutoff to allow on disk sorting for reduced memory use
Before this change, rclone had to load an entire directory into RAM in
order to sort it so it could be synced.

With directories with millions of entries, this used too much memory.

This fixes the problem by using an on disk sort when there are more
than --list-cutoff entries in a directory.

Fixes #7974
2025-04-08 15:18:08 +01:00

343 lines
9.4 KiB
Go

package list
import (
"cmp"
"context"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"
"github.com/lanrat/extsort"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/errcount"
"golang.org/x/sync/errgroup"
)
// NewObjecter is the minimum facilities we need from the fs.Fs passed into NewSorter.
//
// Its NewObject method is used when sorting on disk to re-create
// objects from the remote names that come back out of the external
// sort. The value is also passed to the log functions as the subject of
// the Sorter's log messages.
type NewObjecter interface {
	// NewObject finds the Object at remote. If it can't be found
	// it returns the error ErrorObjectNotFound.
	NewObject(ctx context.Context, remote string) (fs.Object, error)
}
// Sorter implements an efficient mechanism for sorting list entries.
//
// Entries are sorted in memory until the number accumulated by Add
// reaches `--list-cutoff`; from then on they are sorted on disk with an
// external sort instead.
//
// Supply entries with the Add method, call Send at the end to deliver
// the sorted entries and finalise with CleanUp regardless of whether
// you called Add or Send.
//
// Sorted entries are delivered to the callback supplied to NewSorter
// when the Send method is called.
type Sorter struct {
	ctx        context.Context       // context for everything; cancelled by CleanUp
	ci         *fs.ConfigInfo        // config we are using
	cancel     func()                // cancel all background operations
	mu         sync.Mutex            // protect the below
	f          NewObjecter           // fs that we are listing
	callback   fs.ListRCallback      // where to send the sorted entries to
	entries    fs.DirEntries         // accumulated entries (in memory mode only; nil once extSort is set)
	keyFn      KeyFn                 // transform an entry into a sort key
	cutoff     int                   // number of accumulated entries at which Add switches to extsort
	extSort    bool                  // true if we are ext sorting
	inputChan  chan string           // for sending data to the ext sort; closed by Send
	outputChan chan string           // for receiving data from the ext sort
	errChan    chan error            // for getting errors from the ext sort
	sorter     *extsort.StringSorter // external string sort
	errs       *errcount.ErrCount    // accumulate errors re-creating entries during Send
}
// KeyFn turns an entry into a sort key
//
// Entries are ordered by comparing their keys as plain strings (with
// cmp.Compare when sorting in memory). When sorting on disk,
// entryToKey joins the key and the entry's remote with a "\x00"
// separator and keyToEntry splits them at the first NUL, so a key must
// not itself contain a NUL byte.
type KeyFn func(entry fs.DirEntry) string
// identityKeyFn is the default KeyFn: an entry's sort key is simply
// its Remote.
func identityKeyFn(e fs.DirEntry) string {
	key := e.Remote()
	return key
}
// NewSorter returns a Sorter which will deliver sorted entries to
// callback when Send is called.
//
// keyFn maps each entry to the key it is sorted by; pass nil to sort
// by entry.Remote(). The threshold for switching to on disk sorting is
// the ListCutoff setting of the config found in ctx. The returned error
// is currently always nil.
func NewSorter(ctx context.Context, f NewObjecter, callback fs.ListRCallback, keyFn KeyFn) (*Sorter, error) {
	if keyFn == nil {
		keyFn = identityKeyFn
	}
	ci := fs.GetConfig(ctx)

	// All background work hangs off this context so CleanUp can stop it.
	sortCtx, cancel := context.WithCancel(ctx)

	ls := &Sorter{
		ctx:      sortCtx,
		ci:       ci,
		cancel:   cancel,
		f:        f,
		callback: callback,
		keyFn:    keyFn,
		cutoff:   ci.ListCutoff,
		errs:     errcount.New(),
	}
	return ls, nil
}
// entryToKey serialises entry into a single string for the external
// sort: the sort key, a NUL separator, then the entry's Remote.
//
// Trailing slashes are stripped from the Remote and exactly one "/" is
// put back for directories, which is how keyToEntry tells directories
// from objects. Only the Remote is stored, so objects have to be looked
// up again when the key is turned back into an entry.
func (ls *Sorter) entryToKey(entry fs.DirEntry) string {
	name := strings.TrimRight(entry.Remote(), "/")
	if _, ok := entry.(fs.Directory); ok {
		name += "/"
	}
	return ls.keyFn(entry) + "\x00" + name
}
// keyToEntry reverses entryToKey, turning a key read back from the
// external sort into a directory entry.
//
// Everything after the first NUL byte is the remote. A trailing "/"
// marks a directory; anything else is looked up again with
// ls.f.NewObject, and a failed lookup is logged and returned as an
// error wrapping the lookup error.
func (ls *Sorter) keyToEntry(ctx context.Context, key string) (entry fs.DirEntry, err error) {
	_, remote, found := strings.Cut(key, "\x00")
	if !found {
		return nil, errors.New("sorter: failed to deserialize: missing null")
	}

	if dirName, isDir := strings.CutSuffix(remote, "/"); isDir {
		// Note this creates a very minimal directory entry (zero
		// modification time) which should be fine for the bucket based
		// remotes this code will be run on.
		return fs.NewDir(dirName, time.Time{}), nil
	}

	obj, err := ls.f.NewObject(ctx, remote)
	if err != nil {
		fs.Errorf(ls.f, "sorter: failed to re-create object %q: %v", remote, err)
		return nil, fmt.Errorf("sorter: failed to re-create object: %w", err)
	}
	return obj, nil
}
// sendEntriesToExtSort pushes the key for each of entries into the
// external sort's input channel.
//
// While waiting to send, it also watches errChan and returns the first
// non-nil error reported by the external sort. After all entries have
// been sent, errChan is polled once more without blocking.
//
// NOTE(review): if a nil is received from errChan while an entry is
// waiting to be sent (as happens once errChan has been closed), that
// entry is skipped rather than retried - confirm this is intended.
func (ls *Sorter) sendEntriesToExtSort(entries fs.DirEntries) (err error) {
	for _, entry := range entries {
		key := ls.entryToKey(entry)
		select {
		case ls.inputChan <- key:
		case sortErr := <-ls.errChan:
			if sortErr != nil {
				return sortErr
			}
		}
	}

	// Pick up an error the sorter may already have queued.
	select {
	case err = <-ls.errChan:
		return err
	default:
		return nil
	}
}
// startExtSort switches the Sorter over to on disk sorting.
//
// It creates the external sort, starts it in the background under
// ls.ctx, marks the Sorter as ext sorting, then pushes every entry
// accumulated so far into it and releases the in-memory copy. It is
// called from Add with ls.mu held.
func (ls *Sorter) startExtSort() error {
	fs.Logf(ls.f, "Switching to on disk sorting as more than %d entries in one directory detected", ls.cutoff)
	ls.inputChan = make(chan string, 100)

	// Tuning for the external sort. The library defaults are
	// ChunkSize 1e6 (records per chunk written to disk), NumWorkers 2,
	// ChanBuffSize 1, SortedChanBuffSize 10 and TempFilesDir "" (use
	// the OS default temporary directory).
	cfg := extsort.Config{
		ChunkSize:          32 * 1024, // tuned for 50 char records (UUID sized)
		NumWorkers:         8,         // small effect
		ChanBuffSize:       1024,      // small effect
		SortedChanBuffSize: 1024,      // makes a lot of difference
	}
	ls.sorter, ls.outputChan, ls.errChan = extsort.Strings(ls.inputChan, &cfg)
	go ls.sorter.Sort(ls.ctx)
	ls.extSort = true

	fs.Debugf(ls.f, "Sending accumulated directory entries to disk")
	err := ls.sendEntriesToExtSort(ls.entries)
	fs.Debugf(ls.f, "Done sending accumulated directory entries to disk")

	// From now on entries go straight to the external sort.
	clear(ls.entries)
	ls.entries = nil
	return err
}
// Add queues entries for sorting.
//
// While sorting in memory the entries are accumulated, and once the
// accumulated count reaches the cutoff the Sorter switches to sorting
// on disk. After that, entries are sent straight to the external sort.
//
// Does not call the callback. Safe to call from concurrent go routines
// as it holds ls.mu throughout.
func (ls *Sorter) Add(entries fs.DirEntries) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	if ls.extSort {
		return ls.sendEntriesToExtSort(entries)
	}

	ls.entries = append(ls.entries, entries...)
	if len(ls.entries) < ls.cutoff {
		return nil
	}
	return ls.startExtSort()
}
// listHelperBatchSize is the number of keys the list helper converts
// back into entries in one batch.
//
// newListHelper sizes listHelper.entries and listHelper.errs to this,
// so no batch may hold more keys than this.
const listHelperBatchSize = 100
// listHelper is used to turn keys into entries concurrently
//
// It buffers keys read from the external sort. Once a batch is full (or
// on Flush) it re-creates the entries concurrently and passes them to
// the parent Sorter's callback.
type listHelper struct {
	ls      *Sorter       // parent
	keys    []string      // keys being built up
	entries fs.DirEntries // entries processed concurrently as a batch; indexed like keys
	errs    []error       // errors processed concurrently; indexed like keys
}
// newListHelper returns a listHelper that converts keys from the
// external sort back into entries, in batches of up to
// listHelperBatchSize, and delivers them to ls.callback.
//
// entries and errs are allocated at full batch length because send
// writes to them by index from concurrent goroutines; keys starts
// empty with room for a whole batch.
func (ls *Sorter) newListHelper() *listHelper {
	return &listHelper{
		ls:      ls,
		keys:    make([]string, 0, listHelperBatchSize),
		entries: make(fs.DirEntries, listHelperBatchSize),
		errs:    make([]error, listHelperBatchSize),
	}
}
// send converts the buffered keys into entries and delivers them to the
// callback, but only once at least threshold keys are buffered.
//
// The entries are re-created concurrently, at most ls.ci.Checkers at a
// time. Keys that fail to convert are recorded in the Sorter's errs and
// left out of the batch, as are keys that convert to a nil entry. The
// buffers are then reset for the next batch and the callback's error
// (if any) is returned.
func (lh *listHelper) send(threshold int) (err error) {
	n := len(lh.keys)
	if n < threshold {
		return nil
	}
	ls := lh.ls

	g, gCtx := errgroup.WithContext(ls.ctx)
	g.SetLimit(ls.ci.Checkers)
	for i, key := range lh.keys {
		i, key := i, key // can remove when go1.22 is minimum version
		// Each goroutine only writes its own slot, so no locking is needed.
		g.Go(func() error {
			lh.entries[i], lh.errs[i] = ls.keyToEntry(gCtx, key)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return err
	}

	// Compact the good entries to the front of lh.entries, keeping order.
	batch := lh.entries[:0]
	for i := 0; i < n; i++ {
		if e := lh.errs[i]; e != nil {
			ls.errs.Add(e)
			continue
		}
		if entry := lh.entries[i]; entry != nil {
			batch = append(batch, entry)
		}
	}

	err = ls.callback(batch)

	// Reset for the next batch; clearing also drops references to the entries.
	clear(lh.entries)
	clear(lh.errs)
	lh.keys = lh.keys[:0]
	return err
}
// Add buffers key. Once listHelperBatchSize keys are buffered, it
// converts them into entries and sends them to the callback.
//
// The threshold must not exceed listHelperBatchSize, because send
// stores each batch's results in slices of that length (see
// newListHelper), so it is tied to the constant, not hard-coded.
func (lh *listHelper) Add(key string) error {
	lh.keys = append(lh.keys, key)
	return lh.send(listHelperBatchSize)
}

// Flush converts any keys still buffered into entries and sends them to
// the callback. It does nothing if no keys are buffered.
func (lh *listHelper) Flush() error {
	return lh.send(1)
}
// Send delivers the sorted entries to the callback.
//
// When sorting in memory, the entries are stable sorted by key and
// passed to the callback in a single call. When sorting on disk, the
// input to the external sort is closed. Its sorted output is then
// turned back into entries and passed to the callback in batches of up
// to listHelperBatchSize. Entries that could not be re-created are
// skipped and counted in ls.errs, which is reported via errs.Err at the
// end.
//
// Errors from the external sort or the callback are returned
// immediately.
func (ls *Sorter) Send() (err error) {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	if ls.extSort {
		// Closing the input tells the external sort there is no more data.
		close(ls.inputChan)

		list := ls.newListHelper()
	outer:
		for {
			select {
			case key, ok := <-ls.outputChan:
				if !ok {
					break outer
				}
				err := list.Add(key)
				if err != nil {
					return err
				}
			case err := <-ls.errChan:
				if err != nil {
					return err
				}
			}
		}

		// If the external sort queued an error on errChan and then
		// closed outputChan, both cases above were ready at once and
		// select chooses between ready cases at random, so the close
		// may have won. Check errChan once more, without blocking, so
		// a failed sort is not mistaken for a complete (possibly empty)
		// listing.
		select {
		case err := <-ls.errChan:
			if err != nil {
				return err
			}
		default:
		}

		err = list.Flush()
		if err != nil {
			return err
		}

		return ls.errs.Err("sorter")
	}

	// Sort the directory entries by key (entry.Remote() unless a KeyFn
	// was supplied to NewSorter).
	//
	// We use a stable sort here just in case there are
	// duplicates. Assuming the remote delivers the entries in a
	// consistent order, this will give the best user experience
	// in syncing as it will use the first entry for the sync
	// comparison.
	slices.SortStableFunc(ls.entries, func(a, b fs.DirEntry) int {
		return cmp.Compare(ls.keyFn(a), ls.keyFn(b))
	})
	return ls.callback(ls.entries)
}
// CleanUp stops any background work and releases the Sorter's
// in-memory entries.
//
// Call it whether or not Send was called; doing so is safe and
// encouraged. It never invokes the callback.
//
// NOTE(review): removal of the external sort's temporary files is
// presumably triggered by cancelling ls.ctx - confirm with extsort.
func (ls *Sorter) CleanUp() {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	// The external sort and the batch lookups in send both run under
	// ls.ctx, so cancelling it signals them to stop.
	ls.cancel()

	// Drop the in-memory state.
	clear(ls.entries)
	ls.entries, ls.extSort = nil, false
}
// SortToChan returns a fs.ListRCallback, suitable for NewSorter, that
// forwards every sorted entry to out in the order it was delivered.
//
// Sends on out may block, so something must be receiving from out
// while Send is running. The callback never returns an error.
func SortToChan(out chan<- fs.DirEntry) fs.ListRCallback {
	forward := func(batch fs.DirEntries) error {
		for i := range batch {
			out <- batch[i]
		}
		return nil
	}
	return forward
}