mirror of
https://github.com/rclone/rclone.git
synced 2025-04-21 19:38:53 +08:00

This commit modernizes Go usage. This was done with: go run golang.org/x/tools/gopls/internal/analysis/modernize/cmd/modernize@latest -fix -test ./... Then files needed to be `go fmt`ed and a few comments needed to be restored. The modernizations include replacing - if/else conditional assignment by a call to the built-in min or max functions added in go1.21 - sort.Slice(x, func(i, j int) bool) { return s[i] < s[j] } by a call to slices.Sort(s), added in go1.21 - interface{} by the 'any' type added in go1.18 - append([]T(nil), s...) by slices.Clone(s) or slices.Concat(s), added in go1.21 - loop around an m[k]=v map update by a call to one of the Collect, Copy, Clone, or Insert functions from the maps package, added in go1.21 - []byte(fmt.Sprintf...) by fmt.Appendf(nil, ...), added in go1.19 - append(s[:i], s[i+1]...) by slices.Delete(s, i, i+1), added in go1.21 - a 3-clause for i := 0; i < n; i++ {} loop by for i := range n {}, added in go1.22
276 lines
6.6 KiB
Go
276 lines
6.6 KiB
Go
package batcher
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
type (
|
|
Result string
|
|
Item string
|
|
)
|
|
|
|
func TestBatcherNew(t *testing.T) {
|
|
ctx := context.Background()
|
|
ci := fs.GetConfig(ctx)
|
|
|
|
opt := Options{
|
|
Mode: "async",
|
|
Size: 100,
|
|
Timeout: 1 * time.Second,
|
|
MaxBatchSize: 1000,
|
|
DefaultTimeoutSync: 500 * time.Millisecond,
|
|
DefaultTimeoutAsync: 10 * time.Second,
|
|
DefaultBatchSizeAsync: 100,
|
|
}
|
|
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
|
|
return nil
|
|
}
|
|
|
|
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
require.True(t, b.Batching())
|
|
b.Shutdown()
|
|
|
|
opt.Mode = "sync"
|
|
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
require.True(t, b.Batching())
|
|
b.Shutdown()
|
|
|
|
opt.Mode = "off"
|
|
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
require.False(t, b.Batching())
|
|
b.Shutdown()
|
|
|
|
opt.Mode = "bad"
|
|
_, err = New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.ErrorContains(t, err, "batch mode")
|
|
|
|
opt.Mode = "async"
|
|
opt.Size = opt.MaxBatchSize + 1
|
|
_, err = New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.ErrorContains(t, err, "batch size")
|
|
|
|
opt.Mode = "sync"
|
|
opt.Size = 0
|
|
opt.Timeout = 0
|
|
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, ci.Transfers, b.opt.Size)
|
|
assert.Equal(t, opt.DefaultTimeoutSync, b.opt.Timeout)
|
|
b.Shutdown()
|
|
|
|
opt.Mode = "async"
|
|
opt.Size = 0
|
|
opt.Timeout = 0
|
|
b, err = New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, opt.DefaultBatchSizeAsync, b.opt.Size)
|
|
assert.Equal(t, opt.DefaultTimeoutAsync, b.opt.Timeout)
|
|
b.Shutdown()
|
|
|
|
// Check we get an error on commit
|
|
_, err = b.Commit(ctx, "last", Item("last"))
|
|
require.ErrorContains(t, err, "shutting down")
|
|
|
|
}
|
|
|
|
func TestBatcherCommit(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
opt := Options{
|
|
Mode: "sync",
|
|
Size: 3,
|
|
Timeout: 1 * time.Second,
|
|
MaxBatchSize: 1000,
|
|
DefaultTimeoutSync: 500 * time.Millisecond,
|
|
DefaultTimeoutAsync: 10 * time.Second,
|
|
DefaultBatchSizeAsync: 100,
|
|
}
|
|
var wg sync.WaitGroup
|
|
errFail := errors.New("fail")
|
|
var commits int
|
|
var totalSize int
|
|
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
|
|
commits += 1
|
|
totalSize += len(items)
|
|
for i := range items {
|
|
if items[i] == "5" {
|
|
errors[i] = errFail
|
|
} else {
|
|
results[i] = Result(items[i]) + " result"
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
defer b.Shutdown()
|
|
|
|
for i := range 10 {
|
|
wg.Add(1)
|
|
s := fmt.Sprintf("%d", i)
|
|
go func() {
|
|
defer wg.Done()
|
|
result, err := b.Commit(ctx, s, Item(s))
|
|
if s == "5" {
|
|
assert.True(t, errors.Is(err, errFail))
|
|
} else {
|
|
require.NoError(t, err)
|
|
assert.Equal(t, Result(s+" result"), result)
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
assert.Equal(t, 4, commits)
|
|
assert.Equal(t, 10, totalSize)
|
|
}
|
|
|
|
func TestBatcherCommitFail(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
opt := Options{
|
|
Mode: "sync",
|
|
Size: 3,
|
|
Timeout: 1 * time.Second,
|
|
MaxBatchSize: 1000,
|
|
DefaultTimeoutSync: 500 * time.Millisecond,
|
|
DefaultTimeoutAsync: 10 * time.Second,
|
|
DefaultBatchSizeAsync: 100,
|
|
}
|
|
var wg sync.WaitGroup
|
|
errFail := errors.New("fail")
|
|
var commits int
|
|
var totalSize int
|
|
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
|
|
commits += 1
|
|
totalSize += len(items)
|
|
return errFail
|
|
}
|
|
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
defer b.Shutdown()
|
|
|
|
for i := range 10 {
|
|
wg.Add(1)
|
|
s := fmt.Sprintf("%d", i)
|
|
go func() {
|
|
defer wg.Done()
|
|
_, err := b.Commit(ctx, s, Item(s))
|
|
assert.True(t, errors.Is(err, errFail))
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
assert.Equal(t, 4, commits)
|
|
assert.Equal(t, 10, totalSize)
|
|
}
|
|
|
|
func TestBatcherCommitShutdown(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
opt := Options{
|
|
Mode: "sync",
|
|
Size: 3,
|
|
Timeout: 1 * time.Second,
|
|
MaxBatchSize: 1000,
|
|
DefaultTimeoutSync: 500 * time.Millisecond,
|
|
DefaultTimeoutAsync: 10 * time.Second,
|
|
DefaultBatchSizeAsync: 100,
|
|
}
|
|
var wg sync.WaitGroup
|
|
var commits int
|
|
var totalSize int
|
|
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
|
|
commits += 1
|
|
totalSize += len(items)
|
|
for i := range items {
|
|
results[i] = Result(items[i])
|
|
}
|
|
return nil
|
|
}
|
|
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
|
|
for i := range 10 {
|
|
wg.Add(1)
|
|
s := fmt.Sprintf("%d", i)
|
|
go func() {
|
|
defer wg.Done()
|
|
result, err := b.Commit(ctx, s, Item(s))
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, Result(s), result)
|
|
}()
|
|
}
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
b.Shutdown() // shutdown with batches outstanding
|
|
|
|
wg.Wait()
|
|
assert.Equal(t, 4, commits)
|
|
assert.Equal(t, 10, totalSize)
|
|
}
|
|
|
|
func TestBatcherCommitAsync(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
opt := Options{
|
|
Mode: "async",
|
|
Size: 3,
|
|
Timeout: 1 * time.Second,
|
|
MaxBatchSize: 1000,
|
|
DefaultTimeoutSync: 500 * time.Millisecond,
|
|
DefaultTimeoutAsync: 10 * time.Second,
|
|
DefaultBatchSizeAsync: 100,
|
|
}
|
|
var wg sync.WaitGroup
|
|
errFail := errors.New("fail")
|
|
var commits atomic.Int32
|
|
var totalSize atomic.Int32
|
|
commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) {
|
|
wg.Add(1)
|
|
defer wg.Done()
|
|
// t.Logf("commit %d", len(items))
|
|
commits.Add(1)
|
|
totalSize.Add(int32(len(items)))
|
|
for i := range items {
|
|
if items[i] == "5" {
|
|
errors[i] = errFail
|
|
} else {
|
|
results[i] = Result(items[i]) + " result"
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
b, err := New[Item, Result](ctx, nil, commitBatch, opt)
|
|
require.NoError(t, err)
|
|
defer b.Shutdown()
|
|
|
|
for i := range 10 {
|
|
wg.Add(1)
|
|
s := fmt.Sprintf("%d", i)
|
|
go func() {
|
|
defer wg.Done()
|
|
result, err := b.Commit(ctx, s, Item(s))
|
|
// Async just returns straight away
|
|
require.NoError(t, err)
|
|
assert.Equal(t, Result(""), result)
|
|
}()
|
|
}
|
|
time.Sleep(2 * time.Second) // wait for batch timeout - needed with async
|
|
wg.Wait()
|
|
|
|
assert.Equal(t, int32(4), commits.Load())
|
|
assert.Equal(t, int32(10), totalSize.Load())
|
|
}
|