local: add filesystem notification support

Adds ChangeNotify() support for the local backend using the fsnotify package.
This commit is contained in:
Lawrence Murray 2025-01-21 14:33:49 +07:00
parent 431386085f
commit 96ec03c24c
7 changed files with 517 additions and 1 deletions

View File

@ -0,0 +1,275 @@
//go:build !windows
package local
import (
"context"
"os"
"path/filepath"
"time"
"github.com/fsnotify/fsnotify"
"github.com/moby/sys/mountinfo"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/walk"
)
// ChangeNotify calls the passed function with a path that has had changes.
// Close the returned channel to stop being notified.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
// Will not work with an NFS mounted filesystem, error in this case
infos, err := mountinfo.GetMounts(mountinfo.ParentsFilter(f.root))
if err == nil {
for i := 0; i < len(infos); i++ {
if infos[i].FSType == "nfs" {
fs.Error(f, "ChangeNotify does not support NFS mounts")
return
}
}
}
// Create new watcher
watcher, err := fsnotify.NewWatcher()
if err != nil {
fs.Errorf(f, "Failed to create watcher: %s", err)
return
}
// All known files and directories, used to call notifyFunc() with correct
// entry type even on remove and rename events.
known := make(map[string]fs.EntryType)
// Files and directories that have changed in the last poll window.
changed := make(map[string]fs.EntryType)
// Channel to handle new paths. Buffered ensures filesystem events keep
// being consumed.
watchChan := make(chan string)
// Channel to synchronize with the watch goroutine
replyChan := make(chan bool)
// Start goroutine to handle filesystem events
go func() {
// Polling is imitated by accumulating events between ticks. While
// notifyFunc() could be called immediately on each filesystem event,
// accumulating turns out to have some advantages in accurately keeping
// track of entry types (i.e. file or directory), under the
// interpretation that the notifications sent at each tick are a diff of
// the state of the filesystem at that tick compared to the previous. It
// is also assumed by some tests.
var ticker *time.Ticker
var tickerC <-chan time.Time
loop:
for {
select {
case pollInterval, ok := <-pollIntervalChan:
// Update ticker
if !ok {
if ticker != nil {
ticker.Stop()
}
break loop
}
if ticker != nil {
ticker.Stop()
ticker, tickerC = nil, nil
}
if pollInterval != 0 {
ticker = time.NewTicker(pollInterval)
tickerC = ticker.C
}
case <-tickerC:
// notify for all changed paths since last tick
for entryPath, entryType := range changed {
notifyFunc(filepath.ToSlash(entryPath), entryType)
}
changed = make(map[string]fs.EntryType)
case event, ok := <-watcher.Events:
if !ok {
break loop
}
if event.Has(fsnotify.Create) {
fs.Debugf(f, "Create: %s", event.Name)
}
if event.Has(fsnotify.Remove) {
fs.Debugf(f, "Remove: %s", event.Name)
}
if event.Has(fsnotify.Rename) {
fs.Debugf(f, "Rename: %s", event.Name)
}
if event.Has(fsnotify.Write) {
fs.Debugf(f, "Write: %s", event.Name)
}
if event.Has(fsnotify.Chmod) {
fs.Debugf(f, "Chmod: %s", event.Name)
}
if event.Has(fsnotify.Create) {
fs.Debugf(f, "Create: %s", event.Name)
watchChan <- event.Name
<-replyChan // implies mutex on 'known' and 'changed'
} else {
// Determine the entry type (file or directory) using 'known'. This
// is instead of Stat(), say, which is both expensive (a system
// call) and does not work if the entry has been removed (including
// removed before a creation or write event is handled).
entryPath, _ := filepath.Rel(f.root, event.Name)
entryType, ok := known[entryPath]
if !ok {
// By the time the create event was handled for this entry, it was
// already deleted, and it could not be determined whether it was
// a file or directory. It is ignored, as it does not affect the
// state of the filesystem between the previous tick and the next
// tick.
} else {
changed[entryPath] = entryType
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
delete(known, entryPath)
}
}
// Internally, fsnotify stops watching directories that are removed
// or renamed, so it is not necessary to make updates to the watch
// list.
}
case err, ok := <-watcher.Errors:
if !ok {
break loop
}
fs.Errorf(f, "Error: %s", err.Error())
}
}
// Close channels
close(watchChan)
close(replyChan)
// Close watcher
err := watcher.Close()
if err != nil {
fs.Errorf(f, "Failed to close watcher: %s", err)
}
}()
// Start goroutine to establish watchers and update 'known'
go func() {
for {
path, ok := <-watchChan
if !ok {
break
}
// Is this the initial watch?
initial := path == f.root
// Determine entry path
entryPath := ""
if !initial {
entryPath, err = filepath.Rel(f.root, path)
if err != nil {
// Not in this remote
replyChan <- true
continue
}
}
// Determine entry type
entryType := fs.EntryObject
if initial {
// Known to be a directory, but also cannot Lstat() some mounts
entryType = fs.EntryDirectory
} else {
info, err := os.Lstat(path)
if err != nil {
fs.Errorf(f, "Failed to stat %s, already removed? %s", path, err)
replyChan <- true
continue
} else if info.IsDir() {
entryType = fs.EntryDirectory
}
}
// Record known and possibly changed
known[entryPath] = entryType
if !initial {
changed[entryPath] = entryType
}
if entryType == fs.EntryDirectory {
// Recursively watch the directory and populate 'known'
err := watcher.Add(path)
if err != nil {
fs.Errorf(f, "Failed to start watching %s, already removed? %s", path, err)
} else {
fs.Logf(f, "Started watching %s", path)
}
err = walk.Walk(ctx, f, entryPath, false, -1, func(entryPath string, entries fs.DirEntries, err error) error {
if err != nil {
// The entry has already been removed, and we do not know what
// type it was. It can be ignored, as this means it has been both
// created and removed since the last tick, which will not change
// the diff at the next tick.
fs.Errorf(f, "Failed to walk %s, already removed? %s", path, err)
}
for _, d := range entries {
entryPath := d.Remote()
entryType := fs.EntryObject
path := filepath.Join(f.root, entryPath)
info, err := os.Lstat(path)
if err != nil {
fs.Errorf(f, "Failed to stat %s, already removed? %s", path, err)
continue
}
if info.IsDir() {
entryType = fs.EntryDirectory
}
known[entryPath] = entryType
if !initial {
changed[entryPath] = entryType
}
if info.IsDir() {
// Watch the directory.
//
// Establishing a watch on a directory before listing its
// contents ensures that no entries are missed and all changes
// are notified, even for entries created or modified while
// the watch is being established.
//
// An entry may be created between establishing the watch on
// the directory and listing the directory. In this case it is
// marked as changed both by this walk and the subsequent
// handling of the associated filesystem event. Because
// changes are accumulated up to the next tick, however, only
// a single notification is sent at the next tick.
//
// If an entry exists when the walk begins, but is removed
// before the walk reaches it, it is as though that entry
// never existed. But as both occur since the last tick, this
// does not affect the diff at the next tick.
err := watcher.Add(path)
if err != nil {
fs.Errorf(f, "Failed to start watching %s, already removed? %s", entryPath, err)
} else {
fs.Logf(f, "Started watching %s", entryPath)
}
}
}
return nil
})
if err != nil {
fs.Errorf(f, "Failed to walk %s, already removed? %s", entryPath, err)
}
}
replyChan <- true
}
}()
// Recursively watch all subdirectories from the root
watchChan <- f.root
// Wait until initial watch is established before returning
<-replyChan
}

View File

@ -0,0 +1,222 @@
//go:build windows
package local
import (
"context"
"os"
"path/filepath"
"time"
_ "unsafe" // use go:linkname
"github.com/fsnotify/fsnotify"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/walk"
)
// Hack to enable recursive watchers in fsnotify, which are available for
// Windows and Linux (although not quite what is needed for Linux here), but
// not yet Mac, hence there is no public interface for this in fsnotify just
// yet. This is currently only needed for, and enabled for, Windows builds.
//
// Setting fsnotify.enableRecurse to true enables recursive handling: paths
// that end with with \... or /... as watched recursively.
//go:linkname enableRecurse github.com/fsnotify/fsnotify.enableRecurse
var enableRecurse bool
// ChangeNotify calls the passed function with a path that has had changes.
// Close the returned channel to stop being notified.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
// Create new watcher
watcher, err := fsnotify.NewWatcher()
if err != nil {
fs.Errorf(f, "Failed to create watcher: %s", err)
return
}
// Recursive watch of base directory. This is indicated by appending \...
// or /... to the path.
enableRecurse = true
err = watcher.Add(filepath.Join(f.root, "..."))
if err != nil {
fs.Errorf(f, "Failed to start watching %s: %s", f.root, err)
} else {
fs.Debugf(f, "Started watching %s", f.root)
}
// All known files and directories, used to call notifyFunc() with correct
// entry type even on remove and rename events.
known := make(map[string]fs.EntryType)
// Files and directories that have changed in the last poll window.
changed := make(map[string]fs.EntryType)
// Walk the root directory to populate 'known'
known[""] = fs.EntryDirectory
err = walk.Walk(ctx, f, "", false, -1, func(entryPath string, entries fs.DirEntries, err error) error {
if err != nil {
fs.Errorf(f, "Failed to walk %s, already removed? %s", entryPath, err)
} else {
entryType := fs.EntryObject
path := filepath.Join(f.root, entryPath)
info, err := os.Lstat(path)
if err != nil {
fs.Errorf(f, "Failed to stat %s, already removed? %s", path, err)
} else {
if info.IsDir() {
entryType = fs.EntryDirectory
}
known[entryPath] = entryType
}
}
return nil
})
if err != nil {
fs.Errorf(f, "Failed to walk root, already removed? %s", err)
}
// Start goroutine to handle filesystem events
go func() {
// Polling is imitated by accumulating events between ticks. While
// notifyFunc() could be called immediately on each filesystem event,
// accumulating turns out to have some advantages in accurately keeping
// track of entry types (i.e. file or directory), under the
// interpretation that the notifications sent at each tick are a diff of
// the state of the filesystem at that tick compared to the previous. It
// is also assumed by some tests.
var ticker *time.Ticker
var tickerC <-chan time.Time
loop:
for {
select {
case pollInterval, ok := <-pollIntervalChan:
// Update ticker
if !ok {
if ticker != nil {
ticker.Stop()
}
break loop
}
if ticker != nil {
ticker.Stop()
ticker, tickerC = nil, nil
}
if pollInterval != 0 {
ticker = time.NewTicker(pollInterval)
tickerC = ticker.C
}
case <-tickerC:
// notify for all changed paths since last tick
for entryPath, entryType := range changed {
notifyFunc(filepath.ToSlash(entryPath), entryType)
}
changed = make(map[string]fs.EntryType)
case event, ok := <-watcher.Events:
if !ok {
break loop
}
if event.Has(fsnotify.Create) {
fs.Debugf(f, "Create: %s", event.Name)
}
if event.Has(fsnotify.Remove) {
fs.Debugf(f, "Remove: %s", event.Name)
}
if event.Has(fsnotify.Rename) {
fs.Debugf(f, "Rename: %s", event.Name)
}
if event.Has(fsnotify.Write) {
fs.Debugf(f, "Write: %s", event.Name)
}
if event.Has(fsnotify.Chmod) {
fs.Debugf(f, "Chmod: %s", event.Name)
}
// Determine the entry type (file or directory) using 'known'. This
// is instead of Stat(), say, which is both expensive (a system
// call) and does not work if the entry has been removed (including
// removed before a creation or write event is handled).
entryPath, _ := filepath.Rel(f.root, event.Name)
entryType := fs.EntryObject
if event.Has(fsnotify.Create) {
// Stat to determine whether entry is a file or directory
info, err := os.Lstat(event.Name)
if err != nil {
// Entry has already been deleted, so cannot determine whether it
// was a file or directory. It is ignored, as it does not affect
// the diff at the next tick.
} else if info.IsDir() {
entryType = fs.EntryDirectory
known[entryPath] = entryType
changed[entryPath] = entryType
// TODO: Recursively add to 'known' and 'changed'
//
// The issue here is that the walk triggers errors, "The
// process cannot access the file because it is being
// used by another process."
//
// err = walk.Walk(ctx, f, entryPath, false, -1, func(entryPath string, entries fs.DirEntries, err error) error {
// if err != nil {
// fs.Errorf(f, "Failed to walk %s, already removed? %s", entryPath, err)
// } else {
// entryType := fs.EntryObject
// path := filepath.Join(f.root, entryPath)
// info, err := os.Lstat(path)
// if err != nil {
// fs.Errorf(f, "Failed to stat %s, already removed? %s", path, err)
// } else {
// if info.IsDir() {
// entryType = fs.EntryDirectory
// }
// known[entryPath] = entryType
// }
// }
// return nil
// })
// if err != nil {
// fs.Errorf(f, "Failed to walk %s, already removed? %s", entryPath, err)
// }
} else {
known[entryPath] = entryType
changed[entryPath] = entryType
}
} else {
entryType, ok := known[entryPath]
if !ok {
// By the time the create event was handled for this
// entry, it was already removed, and it could not be
// determined whether it was a file or directory. It is
// ignored, as it does not affect the diff at the next
// tick.
} else {
changed[entryPath] = entryType
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
delete(known, entryPath)
// TODO: Recursively remove from 'known' and
// add to 'changed'.
}
}
// Internally, fsnotify stops watching directories that are
// removed or renamed, so it is not necessary to make
// updates to the watch list.
}
case err, ok := <-watcher.Errors:
if !ok {
break loop
}
fs.Errorf(f, "Error: %s", err.Error())
}
}
// Close watcher
err := watcher.Close()
if err != nil {
fs.Errorf(f, "Failed to close watcher: %s", err)
}
}()
}

View File

@ -1700,6 +1700,7 @@ var (
_ fs.OpenWriterAter = &Fs{}
_ fs.DirSetModTimer = &Fs{}
_ fs.MkdirMetadataer = &Fs{}
_ fs.ChangeNotifier = &Fs{}
_ fs.Object = &Object{}
_ fs.Metadataer = &Object{}
_ fs.SetMetadataer = &Object{}

View File

@ -20,6 +20,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -878,17 +879,20 @@ func Run(t *testing.T, opt *Opt) {
pollInterval := make(chan time.Duration)
dirChanges := map[string]struct{}{}
objChanges := map[string]struct{}{}
var mutex sync.Mutex
doChangeNotify(ctx, func(x string, e fs.EntryType) {
fs.Debugf(nil, "doChangeNotify(%q, %+v)", x, e)
if strings.HasPrefix(x, file1.Path[:5]) || strings.HasPrefix(x, file2.Path[:5]) {
fs.Debugf(nil, "Ignoring notify for file1 or file2: %q, %v", x, e)
return
}
mutex.Lock()
if e == fs.EntryDirectory {
dirChanges[x] = struct{}{}
} else if e == fs.EntryObject {
objChanges[x] = struct{}{}
}
mutex.Unlock()
}, pollInterval)
defer func() { close(pollInterval) }()
pollInterval <- time.Second
@ -928,7 +932,9 @@ func Run(t *testing.T, opt *Opt) {
wantObjChanges := []string{"dir/file2", "dir/file4", "dir/file3"}
ok := false
for tries := 1; tries < 10; tries++ {
mutex.Lock()
ok = contains(dirChanges, wantDirChanges) && contains(objChanges, wantObjChanges)
mutex.Unlock()
if ok {
break
}

1
go.mod
View File

@ -143,6 +143,7 @@ require (
github.com/emersion/go-vcard v0.0.0-20230815062825-8fda7d206ec9 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flynn/noise v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
github.com/gdamore/encoding v1.0.1 // indirect
github.com/geoffgarside/ber v1.1.0 // indirect
github.com/go-ini/ini v1.67.0 // indirect

2
go.sum
View File

@ -241,6 +241,8 @@ github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0X
github.com/frankban/quicktest v1.14.4/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/gdamore/encoding v1.0.1 h1:YzKZckdBL6jVt2Gc+5p82qhrGiqMdG/eNs6Wy0u3Uhw=

View File

@ -3,6 +3,7 @@ package vfs
import (
"context"
"testing"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/rc"
@ -97,7 +98,15 @@ func TestRcPollInterval(t *testing.T) {
}
out, err := call.Fn(context.Background(), nil)
require.NoError(t, err)
assert.Equal(t, rc.Params{}, out)
assert.Equal(t, rc.Params{
"enabled": true,
"interval": map[string]interface{}{
"raw": fs.Duration(60000000000),
"seconds": time.Duration(60),
"string": "1m0s",
},
"supported": true,
}, out)
// FIXME needs more tests
}