Merge 1aede367d1718aaaf87646b5d3b8182994b6ff8d into 0b9671313b14ffe839ecbd7dd2ae5ac7f6f05db8

This commit is contained in:
Georg Welzel 2025-04-14 18:37:01 +08:00 committed by GitHub
commit a84d31f007
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 64 additions and 33 deletions

View File

@ -1352,10 +1352,11 @@ func (w *writerAt) Close() error {
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
info := fs.OpenWriterAtInfo{}
err := f.mkParentDir(ctx, remote)
if err != nil {
return nil, fmt.Errorf("OpenWriterAt: failed to create parent directory: %w", err)
return info, nil, fmt.Errorf("OpenWriterAt: failed to create parent directory: %w", err)
}
fc := f.fileClient(remote)
if size < 0 {
@ -1363,7 +1364,7 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
}
_, err = fc.Create(ctx, size, nil)
if err != nil {
return nil, fmt.Errorf("OpenWriterAt: unable to create file: %w", err)
return info, nil, fmt.Errorf("OpenWriterAt: unable to create file: %w", err)
}
w := &writerAt{
ctx: ctx,
@ -1371,7 +1372,7 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
fc: fc,
size: size,
}
return w, nil
return info, w, nil
}
// About gets quota information

View File

@ -1061,14 +1061,14 @@ func (f *Fs) CleanUp(ctx context.Context) error {
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
u, uRemote, err := f.findUpstream(remote)
if err != nil {
return nil, err
return fs.OpenWriterAtInfo{}, nil, err
}
do := u.f.Features().OpenWriterAt
if do == nil {
return nil, fs.ErrorNotImplemented
return fs.OpenWriterAtInfo{}, nil, fs.ErrorNotImplemented
}
return do(ctx, uRemote, size)
}

View File

@ -1486,22 +1486,23 @@ var sparseWarning sync.Once
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
// Temporary Object under construction
o := f.newObject(remote)
info := fs.OpenWriterAtInfo{}
err := o.mkdirAll()
if err != nil {
return nil, err
return info, nil, err
}
if o.translatedLink {
return nil, errors.New("can't open a symlink for random writing")
return info, nil, errors.New("can't open a symlink for random writing")
}
out, err := file.OpenFile(o.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil {
return nil, err
return info, nil, err
}
// Pre-allocate the file for performance reasons
if !f.opt.NoPreAllocate {
@ -1521,7 +1522,7 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
}
}
return out, nil
return info, out, nil
}
// setMetadata sets the file info from the os.FileInfo passed in

View File

@ -383,22 +383,28 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
info := fs.OpenWriterAtInfo{
BufferSize: int64(16 * 1024 * 1024),
ChunkSize: int64(16 * 1024 * 1024),
LeavePartsOnError: true,
}
client, err := f.newSingleConnClient(ctx)
if err != nil {
return nil, fmt.Errorf("create client: %w", err)
return info, nil, fmt.Errorf("create client: %w", err)
}
// init an empty file
leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, true)
if err != nil {
return nil, fmt.Errorf("resolve src: %w", err)
return info, nil, fmt.Errorf("resolve src: %w", err)
}
openResult, err := fileOpenNew(ctx, client, f, directoryID, leaf)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
return info, nil, fmt.Errorf("open file: %w", err)
}
if _, err := fileClose(ctx, client, f.pacer, openResult.FileDescriptor); err != nil {
return nil, fmt.Errorf("close file: %w", err)
return info, nil, fmt.Errorf("close file: %w", err)
}
writer := &writerAt{
@ -409,7 +415,7 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
fileID: openResult.Fileid,
}
return writer, nil
return info, writer, nil
}
// Create a new http client, accepting keep-alive headers, limited to single connection.

View File

@ -182,7 +182,7 @@ type Features struct {
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
OpenWriterAt func(ctx context.Context, remote string, size int64) (WriterAtCloser, error)
OpenWriterAt OpenWriterAtFn
// OpenChunkWriter returns the chunk size and a ChunkWriter
//
@ -728,11 +728,19 @@ type OpenWriterAter interface {
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
OpenWriterAt(ctx context.Context, remote string, size int64) (WriterAtCloser, error)
OpenWriterAt(ctx context.Context, remote string, size int64) (OpenWriterAtInfo, WriterAtCloser, error)
}
// OpenWriterAtInfo describes how a backend would like ChunkWriter called
type OpenWriterAtInfo struct {
BufferSize int64 // preferred buffer size
ChunkSize int64 // preferred chunk size
Concurrency int // how many chunks to write at once
LeavePartsOnError bool // if set don't delete parts uploaded so far on error
}
// OpenWriterAtFn describes the OpenWriterAt function pointer
type OpenWriterAtFn func(ctx context.Context, remote string, size int64) (WriterAtCloser, error)
type OpenWriterAtFn func(ctx context.Context, remote string, size int64) (OpenWriterAtInfo, WriterAtCloser, error)
// ChunkWriterInfo describes how a backend would like ChunkWriter called
type ChunkWriterInfo struct {

View File

@ -135,7 +135,9 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
if openWriterAt == nil {
return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported")
}
openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f)
openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, f)
// If we are using OpenWriterAt we don't seek the chunks so don't need to buffer
fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt")
noBuffering = true
@ -343,31 +345,44 @@ func (w *writerAtChunkWriter) Abort(ctx context.Context) error {
}
// openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize
func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize int64, writeBufferSize int64, f fs.Fs) fs.OpenChunkWriterFn {
func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, f fs.Fs) fs.OpenChunkWriterFn {
return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
ci := fs.GetConfig(ctx)
writerAt, err := openWriterAt(ctx, remote, src.Size())
writerAtInfo, writerAt, err := openWriterAt(ctx, remote, src.Size())
if err != nil {
return info, nil, err
}
if !fs.ConfigOptionsInfo.Get("multi_thread_chunk_size").IsDefault() || writerAtInfo.ChunkSize == 0 {
// if user did provided a specifc value or the backend didn't provide hint, use config value
writerAtInfo.ChunkSize = int64(ci.MultiThreadChunkSize)
}
if !fs.ConfigOptionsInfo.Get("multi_thread_write_buffer_size").IsDefault() || writerAtInfo.BufferSize == 0 {
// if user did provided a specifc value or the backend didn't provide hint, use config value
writerAtInfo.BufferSize = int64(ci.MultiThreadWriteBufferSize)
}
if !fs.ConfigOptionsInfo.Get("multi_thread_streams").IsDefault() || writerAtInfo.Concurrency == 0 {
// if user did provided a specifc value or the backend didn't provide hint, use config value
writerAtInfo.Concurrency = ci.MultiThreadStreams
}
if writeBufferSize > 0 {
fs.Debugf(src.Remote(), "multi-thread copy: write buffer set to %v", writeBufferSize)
if writerAtInfo.BufferSize > 0 {
fs.Debugf(src.Remote(), "multi-thread copy: write buffer set to %v", writerAtInfo.BufferSize)
}
chunkWriter := &writerAtChunkWriter{
remote: remote,
size: src.Size(),
chunkSize: chunkSize,
chunks: calculateNumChunks(src.Size(), chunkSize),
chunkSize: writerAtInfo.ChunkSize,
chunks: calculateNumChunks(src.Size(), writerAtInfo.ChunkSize),
writerAt: writerAt,
writeBufferSize: writeBufferSize,
writeBufferSize: writerAtInfo.BufferSize,
f: f,
}
info = fs.ChunkWriterInfo{
ChunkSize: chunkSize,
Concurrency: ci.MultiThreadStreams,
ChunkSize: writerAtInfo.ChunkSize,
Concurrency: writerAtInfo.Concurrency,
LeavePartsOnError: writerAtInfo.LeavePartsOnError,
}
return info, chunkWriter, nil
}

View File

@ -44,7 +44,7 @@ func TestDoMultiThreadCopy(t *testing.T) {
ci.MultiThreadStreams, ci.MultiThreadCutoff = 4, 50
ci.MultiThreadSet = false
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.OpenWriterAtInfo, fs.WriterAtCloser, error) {
panic("don't call me")
}
f.Features().OpenWriterAt = nullWriterAt

View File

@ -786,7 +786,7 @@ func Run(t *testing.T, opt *Opt) {
t.Skip("FS has no OpenWriterAt interface")
}
path := "writer-at-subdir/writer-at-file"
out, err := openWriterAt(ctx, path, -1)
_, out, err := openWriterAt(ctx, path, -1)
require.NoError(t, err)
var n int