From 2e21df066105c078f3a7d435ab8232eae644172a Mon Sep 17 00:00:00 2001 From: New Future Date: Thu, 3 Apr 2025 20:43:21 +0800 Subject: [PATCH] feat(driver): add Azure Blob Storage driver (#8261) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add azure-blob driver * fix nested folders copy * feat(driver): add Azure Blob Storage driver 实现 Azure Blob Storage 驱动,支持以下功能: - 使用共享密钥身份验证初始化连接 - 列出目录和文件 - 生成临时 SAS URL 进行文件访问 - 创建目录 - 移动和重命名文件/文件夹 - 复制文件/文件夹 - 删除文件/文件夹 - 上传文件并支持进度跟踪 此驱动允许用户通过 AList 平台无缝访问和管理 Azure Blob Storage 中的数据。 * feat(driver): update help doc for Azure Blob * doc(readme): add new driver * Update drivers/azure_blob/driver.go fix(azure): fix name check Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update README.md doc(readme): fix the link Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix(azure): fix log and link --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- README.md | 1 + drivers/all.go | 1 + drivers/azure_blob/driver.go | 313 +++++++++++++++++++++++++++ drivers/azure_blob/meta.go | 27 +++ drivers/azure_blob/types.go | 20 ++ drivers/azure_blob/util.go | 401 +++++++++++++++++++++++++++++++++++ go.mod | 6 + go.sum | 6 + 8 files changed, 775 insertions(+) create mode 100644 drivers/azure_blob/driver.go create mode 100644 drivers/azure_blob/meta.go create mode 100644 drivers/azure_blob/types.go create mode 100644 drivers/azure_blob/util.go diff --git a/README.md b/README.md index d1189188..1261839e 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,7 @@ English | [中文](./README_cn.md) | [日本語](./README_ja.md) | [Contributing - [x] [Dropbox](https://www.dropbox.com/) - [x] [FeijiPan](https://www.feijipan.com/) - [x] [dogecloud](https://www.dogecloud.com/product/oss) + - [x] [Azure Blob Storage](https://azure.microsoft.com/products/storage/blobs) - [x] Easy to deploy and out-of-the-box - [x] File preview (PDF, markdown, code, plain 
text, ...)
  - [x] Image preview in gallery mode

diff --git a/drivers/all.go b/drivers/all.go
index a14e80fb..083d01dc 100644
--- a/drivers/all.go
+++ b/drivers/all.go
@@ -16,6 +16,7 @@ import (
 	_ "github.com/alist-org/alist/v3/drivers/aliyundrive"
 	_ "github.com/alist-org/alist/v3/drivers/aliyundrive_open"
 	_ "github.com/alist-org/alist/v3/drivers/aliyundrive_share"
+	_ "github.com/alist-org/alist/v3/drivers/azure_blob"
 	_ "github.com/alist-org/alist/v3/drivers/baidu_netdisk"
 	_ "github.com/alist-org/alist/v3/drivers/baidu_photo"
 	_ "github.com/alist-org/alist/v3/drivers/baidu_share"
diff --git a/drivers/azure_blob/driver.go b/drivers/azure_blob/driver.go
new file mode 100644
index 00000000..6836533a
--- /dev/null
+++ b/drivers/azure_blob/driver.go
@@ -0,0 +1,313 @@
package azure_blob

import (
	"context"
	"fmt"
	"io"
	"path"
	"regexp"
	"strings"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
	"github.com/alist-org/alist/v3/internal/driver"
	"github.com/alist-org/alist/v3/internal/model"
)

// AzureBlob is an AList storage driver built on the Azure Blob Storage APIs.
// Directories are virtual: they exist as blob-name prefixes plus optional
// empty marker blobs (see util.go).
// Link: https://learn.microsoft.com/rest/api/storageservices/blob-service-rest-api
type AzureBlob struct {
	model.Storage
	Addition
	client          *azblob.Client    // account-level client, created in Init
	containerClient *container.Client // client bound to the configured container
	config          driver.Config
}

// Config returns the driver configuration.
func (d *AzureBlob) Config() driver.Config {
	return d.config
}

// GetAddition returns additional settings specific to Azure Blob Storage.
func (d *AzureBlob) GetAddition() driver.Additional {
	return &d.Addition
}

// Init initializes the Azure Blob Storage client using shared key authentication.
+func (d *AzureBlob) Init(ctx context.Context) error { + // Validate the endpoint URL + accountName := extractAccountName(d.Addition.Endpoint) + if !regexp.MustCompile(`^[a-z0-9]+$`).MatchString(accountName) { + return fmt.Errorf("invalid storage account name: must be chars of lowercase letters or numbers only") + } + + credential, err := azblob.NewSharedKeyCredential(accountName, d.Addition.AccessKey) + if err != nil { + return fmt.Errorf("failed to create credential: %w", err) + } + + // Check if Endpoint is just account name + endpoint := d.Addition.Endpoint + if accountName == endpoint { + endpoint = fmt.Sprintf("https://%s.blob.core.windows.net/", accountName) + } + // Initialize Azure Blob client with retry policy + client, err := azblob.NewClientWithSharedKeyCredential(endpoint, credential, + &azblob.ClientOptions{ClientOptions: azcore.ClientOptions{ + Retry: policy.RetryOptions{ + MaxRetries: MaxRetries, + RetryDelay: RetryDelay, + }, + }}) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + d.client = client + + // Ensure container exists or create it + containerName := strings.Trim(d.Addition.ContainerName, "/ \\") + if containerName == "" { + return fmt.Errorf("container name cannot be empty") + } + return d.createContainerIfNotExists(ctx, containerName) +} + +// Drop releases resources associated with the Azure Blob client. +func (d *AzureBlob) Drop(ctx context.Context) error { + d.client = nil + return nil +} + +// List retrieves blobs and directories under the specified path. 
+func (d *AzureBlob) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) { + prefix := ensureTrailingSlash(dir.GetPath()) + + pager := d.containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{ + Prefix: &prefix, + }) + + var objs []model.Obj + for pager.More() { + page, err := pager.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list blobs: %w", err) + } + + // Process directories + for _, blobPrefix := range page.Segment.BlobPrefixes { + objs = append(objs, &model.Object{ + Name: path.Base(strings.TrimSuffix(*blobPrefix.Name, "/")), + Path: *blobPrefix.Name, + Modified: *blobPrefix.Properties.LastModified, + Ctime: *blobPrefix.Properties.CreationTime, + IsFolder: true, + }) + } + + // Process files + for _, blob := range page.Segment.BlobItems { + if strings.HasSuffix(*blob.Name, "/") { + continue + } + objs = append(objs, &model.Object{ + Name: path.Base(*blob.Name), + Path: *blob.Name, + Size: *blob.Properties.ContentLength, + Modified: *blob.Properties.LastModified, + Ctime: *blob.Properties.CreationTime, + IsFolder: false, + }) + } + } + return objs, nil +} + +// Link generates a temporary SAS URL for accessing a blob. +func (d *AzureBlob) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) { + blobClient := d.containerClient.NewBlobClient(file.GetPath()) + expireDuration := time.Hour * time.Duration(d.SignURLExpire) + + sasURL, err := blobClient.GetSASURL(sas.BlobPermissions{Read: true}, time.Now().Add(expireDuration), nil) + if err != nil { + return nil, fmt.Errorf("failed to generate SAS URL: %w", err) + } + return &model.Link{URL: sasURL}, nil +} + +// MakeDir creates a virtual directory by uploading an empty blob as a marker. 
+func (d *AzureBlob) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) { + dirPath := path.Join(parentDir.GetPath(), dirName) + if err := d.mkDir(ctx, dirPath); err != nil { + return nil, fmt.Errorf("failed to create directory marker: %w", err) + } + + return &model.Object{ + Path: dirPath, + Name: dirName, + IsFolder: true, + }, nil +} + +// Move relocates an object (file or directory) to a new directory. +func (d *AzureBlob) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + srcPath := srcObj.GetPath() + dstPath := path.Join(dstDir.GetPath(), srcObj.GetName()) + + if err := d.moveOrRename(ctx, srcPath, dstPath, srcObj.IsDir(), srcObj.GetSize()); err != nil { + return nil, fmt.Errorf("move operation failed: %w", err) + } + + return &model.Object{ + Path: dstPath, + Name: srcObj.GetName(), + Modified: time.Now(), + IsFolder: srcObj.IsDir(), + Size: srcObj.GetSize(), + }, nil +} + +// Rename changes the name of an existing object. +func (d *AzureBlob) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) { + srcPath := srcObj.GetPath() + dstPath := path.Join(path.Dir(srcPath), newName) + + if err := d.moveOrRename(ctx, srcPath, dstPath, srcObj.IsDir(), srcObj.GetSize()); err != nil { + return nil, fmt.Errorf("rename operation failed: %w", err) + } + + return &model.Object{ + Path: dstPath, + Name: newName, + Modified: time.Now(), + IsFolder: srcObj.IsDir(), + Size: srcObj.GetSize(), + }, nil +} + +// Copy duplicates an object (file or directory) to a specified destination directory. 
+func (d *AzureBlob) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) { + dstPath := path.Join(dstDir.GetPath(), srcObj.GetName()) + + // Handle directory copying using flat listing + if srcObj.IsDir() { + srcPrefix := srcObj.GetPath() + srcPrefix = ensureTrailingSlash(srcPrefix) + + // Get all blobs under the source directory + blobs, err := d.flattenListBlobs(ctx, srcPrefix) + if err != nil { + return nil, fmt.Errorf("failed to list source directory contents: %w", err) + } + + // Process each blob - copy to destination + for _, blob := range blobs { + // Skip the directory marker itself + if *blob.Name == srcPrefix { + continue + } + + // Calculate relative path from source + relPath := strings.TrimPrefix(*blob.Name, srcPrefix) + itemDstPath := path.Join(dstPath, relPath) + + if strings.HasSuffix(itemDstPath, "/") || (blob.Metadata["hdi_isfolder"] != nil && *blob.Metadata["hdi_isfolder"] == "true") { + // Create directory marker at destination + err := d.mkDir(ctx, itemDstPath) + if err != nil { + return nil, fmt.Errorf("failed to create directory marker [%s]: %w", itemDstPath, err) + } + } else { + // Copy the blob + if err := d.copyFile(ctx, *blob.Name, itemDstPath); err != nil { + return nil, fmt.Errorf("failed to copy %s: %w", *blob.Name, err) + } + } + + } + + // Create directory marker at destination if needed + if len(blobs) == 0 { + err := d.mkDir(ctx, dstPath) + if err != nil { + return nil, fmt.Errorf("failed to create directory [%s]: %w", dstPath, err) + } + } + + return &model.Object{ + Path: dstPath, + Name: srcObj.GetName(), + Modified: time.Now(), + IsFolder: true, + }, nil + } + + // Copy a single file + if err := d.copyFile(ctx, srcObj.GetPath(), dstPath); err != nil { + return nil, fmt.Errorf("failed to copy blob: %w", err) + } + return &model.Object{ + Path: dstPath, + Name: srcObj.GetName(), + Size: srcObj.GetSize(), + Modified: time.Now(), + IsFolder: false, + }, nil +} + +// Remove deletes a specified blob or 
recursively deletes a directory and its contents. +func (d *AzureBlob) Remove(ctx context.Context, obj model.Obj) error { + path := obj.GetPath() + + // Handle recursive directory deletion + if obj.IsDir() { + return d.deleteFolder(ctx, path) + } + + // Delete single file + return d.deleteFile(ctx, path, false) +} + +// Put uploads a file stream to Azure Blob Storage with progress tracking. +func (d *AzureBlob) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) { + blobPath := path.Join(dstDir.GetPath(), stream.GetName()) + blobClient := d.containerClient.NewBlockBlobClient(blobPath) + + // Determine optimal upload options based on file size + options := optimizedUploadOptions(stream.GetSize()) + + // Track upload progress + progressTracker := &progressTracker{ + total: stream.GetSize(), + updateProgress: up, + } + + // Wrap stream to handle context cancellation and progress tracking + limitedStream := driver.NewLimitedUploadStream(ctx, io.TeeReader(stream, progressTracker)) + + // Upload the stream to Azure Blob Storage + _, err := blobClient.UploadStream(ctx, limitedStream, options) + if err != nil { + return nil, fmt.Errorf("failed to upload file: %w", err) + } + + return &model.Object{ + Path: blobPath, + Name: stream.GetName(), + Size: stream.GetSize(), + Modified: time.Now(), + IsFolder: false, + }, nil +} + +// The following methods related to archive handling are not implemented yet. +// func (d *AzureBlob) GetArchiveMeta(...) {...} +// func (d *AzureBlob) ListArchive(...) {...} +// func (d *AzureBlob) Extract(...) {...} +// func (d *AzureBlob) ArchiveDecompress(...) {...} + +// Ensure AzureBlob implements the driver.Driver interface. 
// Compile-time assertion that AzureBlob satisfies the driver.Driver interface.
var _ driver.Driver = (*AzureBlob)(nil)
diff --git a/drivers/azure_blob/meta.go b/drivers/azure_blob/meta.go
new file mode 100644
index 00000000..8e42bdd6
--- /dev/null
+++ b/drivers/azure_blob/meta.go
@@ -0,0 +1,27 @@
package azure_blob

import (
	"github.com/alist-org/alist/v3/internal/driver"
	"github.com/alist-org/alist/v3/internal/op"
)

// Addition holds the user-supplied settings for an Azure Blob storage mount.
type Addition struct {
	// Endpoint is the blob-service URL; Init extracts the account name from it.
	Endpoint string `json:"endpoint" required:"true" default:"https://.blob.core.windows.net/" help:"e.g. https://accountname.blob.core.windows.net/. The full endpoint URL for Azure Storage, including the unique storage account name (3 ~ 24 numbers and lowercase letters only)."`
	// AccessKey is the shared key used for authentication.
	AccessKey string `json:"access_key" required:"true" help:"The access key for Azure Storage, used for authentication. https://learn.microsoft.com/azure/storage/common/storage-account-keys-manage"`
	// ContainerName names the target container; Init creates it when missing.
	ContainerName string `json:"container_name" required:"true" help:"The name of the container in Azure Storage (created in the Azure portal). https://learn.microsoft.com/azure/storage/blobs/blob-containers-portal"`
	// SignURLExpire is the SAS URL lifetime in hours (default 4).
	SignURLExpire int `json:"sign_url_expire" type:"number" default:"4" help:"The expiration time for SAS URLs, in hours."`
}

var config = driver.Config{
	Name:        "Azure Blob Storage",
	LocalSort:   true,
	CheckStatus: true,
}

// init registers the driver factory with the op registry at startup.
func init() {
	op.RegisterDriver(func() driver.Driver {
		return &AzureBlob{
			config: config,
		}
	})
}
diff --git a/drivers/azure_blob/types.go b/drivers/azure_blob/types.go
new file mode 100644
index 00000000..01323e51
--- /dev/null
+++ b/drivers/azure_blob/types.go
@@ -0,0 +1,20 @@
package azure_blob

import "github.com/alist-org/alist/v3/internal/driver"

// progressTracker is used to track upload progress
type progressTracker struct {
	total          int64                 // total bytes expected
	current        int64                 // bytes observed so far
	updateProgress driver.UpdateProgress // progress callback; may be nil
}

// Write implements io.Writer to track progress. It never fails; progress is
// reported as a percentage of total, and skipped when the total is unknown
// or no callback is set.
func (pt *progressTracker) Write(p []byte) (n int, err error) {
	n = len(p)
	pt.current += int64(n)
	if pt.updateProgress != nil && pt.total > 0 {
		pt.updateProgress(float64(pt.current) * 100 / float64(pt.total))
	}
	return n, nil
}
diff --git a/drivers/azure_blob/util.go b/drivers/azure_blob/util.go
new file mode 100644
index 00000000..2adf3a0f
--- /dev/null
+++ b/drivers/azure_blob/util.go
@@ -0,0 +1,401 @@
package azure_blob

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"path"
	"sort"
	"strings"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
	log "github.com/sirupsen/logrus"
)

const (
	// MaxRetries defines the maximum number of retry attempts for Azure operations
	MaxRetries = 3
	// RetryDelay defines the base delay between retries
	RetryDelay = 3 * time.Second
	// MaxBatchSize defines the maximum number of operations in a single batch request
	MaxBatchSize = 128
)

// extractAccountName extracts the storage account name from an Azure storage
// endpoint URL: the host segment before the first dot, lower-cased. When the
// endpoint contains no dot the whole remaining string is returned.
func extractAccountName(endpoint string) string {
	// Strip the protocol prefix
	endpoint = strings.TrimPrefix(endpoint, "https://")
	endpoint = strings.TrimPrefix(endpoint, "http://")

	// Take the part before the first dot (the account name)
	parts := strings.Split(endpoint, ".")
	if len(parts) > 0 {
		// to lower case
		return strings.ToLower(parts[0])
	}
	return ""
}

// isNotFoundError checks if the error is a "not found" type error
func isNotFoundError(err error) bool {
	var storageErr *azcore.ResponseError
	if errors.As(err, &storageErr) {
		return storageErr.StatusCode == 404
	}
	// Fallback to string matching for backwards compatibility
	return err != nil && strings.Contains(err.Error(), "BlobNotFound")
}

// flattenListBlobs - Optimize blob listing to handle
pagination better +func (d *AzureBlob) flattenListBlobs(ctx context.Context, prefix string) ([]container.BlobItem, error) { + // Standardize prefix format + prefix = ensureTrailingSlash(prefix) + + var blobItems []container.BlobItem + pager := d.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ + Prefix: &prefix, + Include: container.ListBlobsInclude{ + Metadata: true, + }, + }) + + for pager.More() { + page, err := pager.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list blobs: %w", err) + } + + for _, blob := range page.Segment.BlobItems { + blobItems = append(blobItems, *blob) + } + } + + return blobItems, nil +} + +// batchDeleteBlobs - Simplify batch deletion logic +func (d *AzureBlob) batchDeleteBlobs(ctx context.Context, blobPaths []string) error { + if len(blobPaths) == 0 { + return nil + } + + // Process in batches of MaxBatchSize + for i := 0; i < len(blobPaths); i += MaxBatchSize { + end := min(i+MaxBatchSize, len(blobPaths)) + currentBatch := blobPaths[i:end] + + // Create batch builder + batchBuilder, err := d.containerClient.NewBatchBuilder() + if err != nil { + return fmt.Errorf("failed to create batch builder: %w", err) + } + + // Add delete operations + for _, blobPath := range currentBatch { + if err := batchBuilder.Delete(blobPath, nil); err != nil { + return fmt.Errorf("failed to add delete operation for %s: %w", blobPath, err) + } + } + + // Submit batch + responses, err := d.containerClient.SubmitBatch(ctx, batchBuilder, nil) + if err != nil { + return fmt.Errorf("batch delete request failed: %w", err) + } + + // Check responses + for _, resp := range responses.Responses { + if resp.Error != nil && !isNotFoundError(resp.Error) { + // 获取 blob 名称以提供更好的错误信息 + blobName := "unknown" + if resp.BlobName != nil { + blobName = *resp.BlobName + } + return fmt.Errorf("failed to delete blob %s: %v", blobName, resp.Error) + } + } + } + + return nil +} + +// deleteFolder recursively deletes a directory and all its 
// deleteFolder recursively deletes a directory and all of its contents:
// plain files first (batched), then directory markers deepest-first so a
// parent marker is never removed before its children, and finally the
// marker of the directory itself.
func (d *AzureBlob) deleteFolder(ctx context.Context, prefix string) error {
	// Ensure directory path ends with slash
	prefix = ensureTrailingSlash(prefix)

	// Get all blobs under the directory using flattenListBlobs
	globs, err := d.flattenListBlobs(ctx, prefix)
	if err != nil {
		return fmt.Errorf("failed to list blobs for deletion: %w", err)
	}

	// If there are blobs in the directory, delete them
	if len(globs) > 0 {
		// Separate plain files from directory markers
		var filePaths []string
		var dirPaths []string

		for _, blob := range globs {
			blobName := *blob.Name
			if isDirectory(blob) {
				// remove trailing slash for directory names
				dirPaths = append(dirPaths, strings.TrimSuffix(blobName, "/"))
			} else {
				filePaths = append(filePaths, blobName)
			}
		}

		// Delete files first, then directory markers
		if len(filePaths) > 0 {
			if err := d.batchDeleteBlobs(ctx, filePaths); err != nil {
				return err
			}
		}
		if len(dirPaths) > 0 {
			// Group directory markers by path depth
			depthMap := make(map[int][]string)
			for _, dir := range dirPaths {
				depth := strings.Count(dir, "/") // depth = number of separators
				depthMap[depth] = append(depthMap[depth], dir)
			}

			// Sort depths in descending order (deepest first)
			var depths []int
			for depth := range depthMap {
				depths = append(depths, depth)
			}
			sort.Sort(sort.Reverse(sort.IntSlice(depths)))

			// Batch-delete one depth level at a time
			for _, depth := range depths {
				batch := depthMap[depth]
				if err := d.batchDeleteBlobs(ctx, batch); err != nil {
					return err
				}
			}
		}
	}

	// Finally delete the marker of the directory itself
	return d.deleteEmptyDirectory(ctx, prefix)
}

// deleteFile deletes a single blob. When isDir is true, a "not found" error
// is tolerated, since a directory marker blob may never have existed.
func (d *AzureBlob) deleteFile(ctx context.Context, path string, isDir bool) error {
	blobClient := d.containerClient.NewBlobClient(path)
	_, err := blobClient.Delete(ctx, nil)
	if err != nil && !(isDir && isNotFoundError(err)) {
		return err
	}
	return nil
}

// copyFile copies a single blob from source path to destination path by
// starting a server-side copy from a read-only SAS URL of the source.
// NOTE(review): StartCopyFromURL is asynchronous and completion is not
// awaited here — confirm callers tolerate a still-pending copy (moveOrRename
// deletes the source immediately afterwards).
func (d *AzureBlob) copyFile(ctx context.Context, srcPath, dstPath string) error {
	srcBlob := d.containerClient.NewBlobClient(srcPath)
	dstBlob := d.containerClient.NewBlobClient(dstPath)

	// Use configured expiration time for SAS URL
	expireDuration := time.Hour * time.Duration(d.SignURLExpire)
	srcURL, err := srcBlob.GetSASURL(sas.BlobPermissions{Read: true}, time.Now().Add(expireDuration), nil)
	if err != nil {
		return fmt.Errorf("failed to generate source SAS URL: %w", err)
	}

	_, err = dstBlob.StartCopyFromURL(ctx, srcURL, nil)
	return err

}

// createContainerIfNotExists creates the container when missing and binds
// d.containerClient to it; "ContainerAlreadyExists" is treated as success.
func (d *AzureBlob) createContainerIfNotExists(ctx context.Context, containerName string) error {
	serviceClient := d.client.ServiceClient()
	containerClient := serviceClient.NewContainerClient(containerName)

	var options = service.CreateContainerOptions{}
	_, err := containerClient.Create(ctx, &options)
	if err != nil {
		var responseErr *azcore.ResponseError
		if errors.As(err, &responseErr) && responseErr.ErrorCode != "ContainerAlreadyExists" {
			return fmt.Errorf("failed to create or access container [%s]: %w", containerName, err)
		}
	}

	d.containerClient = containerClient
	return nil
}

// mkDir creates a virtual directory marker by uploading an empty blob whose
// metadata carries "hdi_isfolder"="true" (the convention recognized by
// Storage Explorer and this driver's isDirectory helper).
func (d *AzureBlob) mkDir(ctx context.Context, fullDirName string) error {
	dirPath := ensureTrailingSlash(fullDirName)
	blobClient := d.containerClient.NewBlockBlobClient(dirPath)

	// Upload an empty blob with metadata indicating it's a directory
	_, err := blobClient.Upload(ctx, struct {
		*bytes.Reader
		io.Closer
	}{
		Reader: bytes.NewReader([]byte{}),
		Closer: io.NopCloser(nil),
	}, &blockblob.UploadOptions{
		Metadata: map[string]*string{
			"hdi_isfolder": to.Ptr("true"),
		},
	})
	return err
}

// ensureTrailingSlash ensures the provided path ends with a trailing slash.
+func ensureTrailingSlash(path string) string { + if !strings.HasSuffix(path, "/") { + return path + "/" + } + return path +} + +// moveOrRename moves or renames blobs or directories from source to destination. +func (d *AzureBlob) moveOrRename(ctx context.Context, srcPath, dstPath string, isDir bool, srcSize int64) error { + if isDir { + // Normalize paths for directory operations + srcPath = ensureTrailingSlash(srcPath) + dstPath = ensureTrailingSlash(dstPath) + + // List all blobs under the source directory + blobs, err := d.flattenListBlobs(ctx, srcPath) + if err != nil { + return fmt.Errorf("failed to list blobs: %w", err) + } + + // Iterate and copy each blob to the destination + for _, item := range blobs { + srcBlobName := *item.Name + relPath := strings.TrimPrefix(srcBlobName, srcPath) + itemDstPath := path.Join(dstPath, relPath) + + if isDirectory(item) { + // Create directory marker at destination + if err := d.mkDir(ctx, itemDstPath); err != nil { + return fmt.Errorf("failed to create directory marker [%s]: %w", itemDstPath, err) + } + } else { + // Copy file blob to destination + if err := d.copyFile(ctx, srcBlobName, itemDstPath); err != nil { + return fmt.Errorf("failed to copy blob [%s]: %w", srcBlobName, err) + } + } + } + + // Handle empty directories by creating a marker at destination + if len(blobs) == 0 { + if err := d.mkDir(ctx, dstPath); err != nil { + return fmt.Errorf("failed to create directory [%s]: %w", dstPath, err) + } + } + + // Delete source directory and its contents + if err := d.deleteFolder(ctx, srcPath); err != nil { + log.Warnf("failed to delete source directory [%s]: %v\n, and try again", srcPath, err) + // Retry deletion once more and ignore the result + if err := d.deleteFolder(ctx, srcPath); err != nil { + log.Errorf("Retry deletion of source directory [%s] failed: %v", srcPath, err) + } + } + + return nil + } + + // Single file move or rename operation + if err := d.copyFile(ctx, srcPath, dstPath); err != nil { + return 
fmt.Errorf("failed to copy file: %w", err) + } + + // Delete source file after successful copy + if err := d.deleteFile(ctx, srcPath, false); err != nil { + log.Errorf("Error deleting source file [%s]: %v", srcPath, err) + } + return nil +} + +// optimizedUploadOptions returns the optimal upload options based on file size +func optimizedUploadOptions(fileSize int64) *azblob.UploadStreamOptions { + options := &azblob.UploadStreamOptions{ + BlockSize: 4 * 1024 * 1024, // 4MB block size + Concurrency: 4, // Default concurrency + } + + // For large files, increase block size and concurrency + if fileSize > 256*1024*1024 { // For files larger than 256MB + options.BlockSize = 8 * 1024 * 1024 // 8MB blocks + options.Concurrency = 8 // More concurrent uploads + } + + // For very large files (>1GB) + if fileSize > 1024*1024*1024 { + options.BlockSize = 16 * 1024 * 1024 // 16MB blocks + options.Concurrency = 16 // Higher concurrency + } + + return options +} + +// isDirectory determines if a blob represents a directory +// Checks multiple indicators: path suffix, metadata, and content type +func isDirectory(blob container.BlobItem) bool { + // Check path suffix + if strings.HasSuffix(*blob.Name, "/") { + return true + } + + // Check metadata for directory marker + if blob.Metadata != nil { + if val, ok := blob.Metadata["hdi_isfolder"]; ok && val != nil && *val == "true" { + return true + } + // Azure Storage Explorer and other tools may use different metadata keys + if val, ok := blob.Metadata["is_directory"]; ok && val != nil && strings.ToLower(*val) == "true" { + return true + } + } + + // Check content type (some tools mark directories with specific content types) + if blob.Properties != nil && blob.Properties.ContentType != nil { + contentType := strings.ToLower(*blob.Properties.ContentType) + if blob.Properties.ContentLength != nil && *blob.Properties.ContentLength == 0 && (contentType == "application/directory" || contentType == "directory") { + return true + } + } + + 
return false +} + +// deleteEmptyDirectory deletes a directory only if it's empty +func (d *AzureBlob) deleteEmptyDirectory(ctx context.Context, dirPath string) error { + // Directory is empty, delete the directory marker + blobClient := d.containerClient.NewBlobClient(strings.TrimSuffix(dirPath, "/")) + _, err := blobClient.Delete(ctx, nil) + + // Also try deleting with trailing slash (for different directory marker formats) + if err != nil && isNotFoundError(err) { + blobClient = d.containerClient.NewBlobClient(dirPath) + _, err = blobClient.Delete(ctx, nil) + } + + // Ignore not found errors + if err != nil && isNotFoundError(err) { + log.Infof("Directory [%s] not found during deletion: %v", dirPath, err) + return nil + } + + return err +} diff --git a/go.mod b/go.mod index f8a238f1..97a477d3 100644 --- a/go.mod +++ b/go.mod @@ -79,6 +79,12 @@ require ( gorm.io/gorm v1.25.11 ) +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 // indirect +) + require ( github.com/STARRY-S/zip v0.2.1 // indirect github.com/aymerick/douceur v0.2.0 // indirect diff --git a/go.sum b/go.sum index 1681a3a0..86fb779e 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,12 @@ cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+ cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 
h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 h1:UXT0o77lXQrikd1kgwIPQOUect7EoR/+sbP4wQKdzxM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0/go.mod h1:cTvi54pg19DoT07ekoeMgE/taAwNtCShVeZqA+Iv2xI= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=