New Future 2e21df0661
feat(driver): add Azure Blob Storage driver (#8261)
* add azure-blob driver

* fix nested folders copy

* feat(driver): add Azure Blob Storage driver

Implement an Azure Blob Storage driver with support for:
- Initializing the connection with shared key authentication
- Listing directories and files
- Generating temporary SAS URLs for file access
- Creating directories
- Moving and renaming files/folders
- Copying files/folders
- Deleting files/folders
- Uploading files with progress tracking

This driver lets users seamlessly access and manage data in Azure Blob Storage through the AList platform.
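
For orientation, here is a minimal, self-contained sketch (not taken from this driver) of what shared-key initialization and temporary SAS URL generation look like with the Azure SDK for Go; the account name, key, container and blob path are placeholder assumptions:

package main

import (
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
)

func main() {
    // Placeholder account name and key; real values come from the storage configuration.
    cred, err := azblob.NewSharedKeyCredential("myaccount", "<base64-account-key>")
    if err != nil {
        panic(err)
    }
    client, err := azblob.NewClientWithSharedKeyCredential("https://myaccount.blob.core.windows.net/", cred, nil)
    if err != nil {
        panic(err)
    }
    // Issue a short-lived, read-only SAS URL for a single blob.
    blobClient := client.ServiceClient().NewContainerClient("mycontainer").NewBlobClient("path/to/file.txt")
    sasURL, err := blobClient.GetSASURL(sas.BlobPermissions{Read: true}, time.Now().Add(time.Hour), nil)
    if err != nil {
        panic(err)
    }
    fmt.Println(sasURL)
}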

* feat(driver): update help doc for Azure Blob

* doc(readme): add new driver

* Update drivers/azure_blob/driver.go

fix(azure): fix name check

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update README.md

doc(readme): fix the link

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix(azure): fix log and link

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-04-03 20:43:21 +08:00


package azure_blob
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"path"
"sort"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
log "github.com/sirupsen/logrus"
)
const (
// MaxRetries defines the maximum number of retry attempts for Azure operations
MaxRetries = 3
// RetryDelay defines the base delay between retries
RetryDelay = 3 * time.Second
// MaxBatchSize defines the maximum number of operations in a single batch request
MaxBatchSize = 128
)
// extractAccountName extracts the account name from an Azure Storage endpoint
func extractAccountName(endpoint string) string {
// Strip the protocol prefix
endpoint = strings.TrimPrefix(endpoint, "https://")
endpoint = strings.TrimPrefix(endpoint, "http://")
// Take the part before the first dot (i.e. the account name)
parts := strings.Split(endpoint, ".")
if len(parts) > 0 {
// Normalize to lower case
return strings.ToLower(parts[0])
}
return ""
}
// isNotFoundError checks if the error is a "not found" type error
func isNotFoundError(err error) bool {
var storageErr *azcore.ResponseError
if errors.As(err, &storageErr) {
return storageErr.StatusCode == 404
}
// Fallback to string matching for backwards compatibility
return err != nil && strings.Contains(err.Error(), "BlobNotFound")
}
// flattenListBlobs lists all blobs under the given prefix, following pagination until the listing is exhausted
func (d *AzureBlob) flattenListBlobs(ctx context.Context, prefix string) ([]container.BlobItem, error) {
// Standardize prefix format
prefix = ensureTrailingSlash(prefix)
var blobItems []container.BlobItem
pager := d.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
Prefix: &prefix,
Include: container.ListBlobsInclude{
Metadata: true,
},
})
for pager.More() {
page, err := pager.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list blobs: %w", err)
}
for _, blob := range page.Segment.BlobItems {
blobItems = append(blobItems, *blob)
}
}
return blobItems, nil
}
// batchDeleteBlobs deletes the given blob paths in batches of at most MaxBatchSize
func (d *AzureBlob) batchDeleteBlobs(ctx context.Context, blobPaths []string) error {
if len(blobPaths) == 0 {
return nil
}
// Process in batches of MaxBatchSize
for i := 0; i < len(blobPaths); i += MaxBatchSize {
end := min(i+MaxBatchSize, len(blobPaths))
currentBatch := blobPaths[i:end]
// Create batch builder
batchBuilder, err := d.containerClient.NewBatchBuilder()
if err != nil {
return fmt.Errorf("failed to create batch builder: %w", err)
}
// Add delete operations
for _, blobPath := range currentBatch {
if err := batchBuilder.Delete(blobPath, nil); err != nil {
return fmt.Errorf("failed to add delete operation for %s: %w", blobPath, err)
}
}
// Submit batch
responses, err := d.containerClient.SubmitBatch(ctx, batchBuilder, nil)
if err != nil {
return fmt.Errorf("batch delete request failed: %w", err)
}
// Check responses
for _, resp := range responses.Responses {
if resp.Error != nil && !isNotFoundError(resp.Error) {
// Resolve the blob name so the error message is more descriptive
blobName := "unknown"
if resp.BlobName != nil {
blobName = *resp.BlobName
}
return fmt.Errorf("failed to delete blob %s: %v", blobName, resp.Error)
}
}
}
return nil
}
// deleteFolder recursively deletes a directory and all its contents
func (d *AzureBlob) deleteFolder(ctx context.Context, prefix string) error {
// Ensure directory path ends with slash
prefix = ensureTrailingSlash(prefix)
// Get all blobs under the directory using flattenListBlobs
globs, err := d.flattenListBlobs(ctx, prefix)
if err != nil {
return fmt.Errorf("failed to list blobs for deletion: %w", err)
}
// If there are blobs in the directory, delete them
if len(globs) > 0 {
// Separate file blobs from directory markers
var filePaths []string
var dirPaths []string
for _, blob := range globs {
blobName := *blob.Name
if isDirectory(blob) {
// remove trailing slash for directory names
dirPaths = append(dirPaths, strings.TrimSuffix(blobName, "/"))
} else {
filePaths = append(filePaths, blobName)
}
}
// Delete files first, then directories
if len(filePaths) > 0 {
if err := d.batchDeleteBlobs(ctx, filePaths); err != nil {
return err
}
}
if len(dirPaths) > 0 {
// Group directories by path depth
depthMap := make(map[int][]string)
for _, dir := range dirPaths {
depth := strings.Count(dir, "/") // count separators to determine depth
depthMap[depth] = append(depthMap[depth], dir)
}
// Sort depths in descending order
var depths []int
for depth := range depthMap {
depths = append(depths, depth)
}
sort.Sort(sort.Reverse(sort.IntSlice(depths)))
// Batch-delete level by level, deepest first
for _, depth := range depths {
batch := depthMap[depth]
if err := d.batchDeleteBlobs(ctx, batch); err != nil {
return err
}
}
}
}
// Finally, delete the directory marker itself
return d.deleteEmptyDirectory(ctx, prefix)
}
// deleteFile deletes a single blob; not-found errors are tolerated when deleting directory markers
func (d *AzureBlob) deleteFile(ctx context.Context, path string, isDir bool) error {
blobClient := d.containerClient.NewBlobClient(path)
_, err := blobClient.Delete(ctx, nil)
if err != nil && !(isDir && isNotFoundError(err)) {
return err
}
return nil
}
// copyFile copies a single blob from source path to destination path
func (d *AzureBlob) copyFile(ctx context.Context, srcPath, dstPath string) error {
srcBlob := d.containerClient.NewBlobClient(srcPath)
dstBlob := d.containerClient.NewBlobClient(dstPath)
// Use configured expiration time for SAS URL
expireDuration := time.Hour * time.Duration(d.SignURLExpire)
srcURL, err := srcBlob.GetSASURL(sas.BlobPermissions{Read: true}, time.Now().Add(expireDuration), nil)
if err != nil {
return fmt.Errorf("failed to generate source SAS URL: %w", err)
}
_, err = dstBlob.StartCopyFromURL(ctx, srcURL, nil)
return err
}
// createContainerIfNotExists creates the container if it does not already exist and stores the container client on the driver
func (d *AzureBlob) createContainerIfNotExists(ctx context.Context, containerName string) error {
serviceClient := d.client.ServiceClient()
containerClient := serviceClient.NewContainerClient(containerName)
options := service.CreateContainerOptions{}
_, err := containerClient.Create(ctx, &options)
if err != nil {
var responseErr *azcore.ResponseError
if errors.As(err, &responseErr) && responseErr.ErrorCode != "ContainerAlreadyExists" {
return fmt.Errorf("failed to create or access container [%s]: %w", containerName, err)
}
}
d.containerClient = containerClient
return nil
}
// mkDir creates a virtual directory marker by uploading an empty blob with metadata.
func (d *AzureBlob) mkDir(ctx context.Context, fullDirName string) error {
dirPath := ensureTrailingSlash(fullDirName)
blobClient := d.containerClient.NewBlockBlobClient(dirPath)
// Upload an empty blob with metadata indicating it's a directory
_, err := blobClient.Upload(ctx, struct {
*bytes.Reader
io.Closer
}{
Reader: bytes.NewReader([]byte{}),
Closer: io.NopCloser(nil),
}, &blockblob.UploadOptions{
Metadata: map[string]*string{
"hdi_isfolder": to.Ptr("true"),
},
})
return err
}
// ensureTrailingSlash ensures the provided path ends with a trailing slash.
func ensureTrailingSlash(path string) string {
if !strings.HasSuffix(path, "/") {
return path + "/"
}
return path
}
// moveOrRename moves or renames blobs or directories from source to destination.
func (d *AzureBlob) moveOrRename(ctx context.Context, srcPath, dstPath string, isDir bool, srcSize int64) error {
if isDir {
// Normalize paths for directory operations
srcPath = ensureTrailingSlash(srcPath)
dstPath = ensureTrailingSlash(dstPath)
// List all blobs under the source directory
blobs, err := d.flattenListBlobs(ctx, srcPath)
if err != nil {
return fmt.Errorf("failed to list blobs: %w", err)
}
// Iterate and copy each blob to the destination
for _, item := range blobs {
srcBlobName := *item.Name
relPath := strings.TrimPrefix(srcBlobName, srcPath)
itemDstPath := path.Join(dstPath, relPath)
if isDirectory(item) {
// Create directory marker at destination
if err := d.mkDir(ctx, itemDstPath); err != nil {
return fmt.Errorf("failed to create directory marker [%s]: %w", itemDstPath, err)
}
} else {
// Copy file blob to destination
if err := d.copyFile(ctx, srcBlobName, itemDstPath); err != nil {
return fmt.Errorf("failed to copy blob [%s]: %w", srcBlobName, err)
}
}
}
// Handle empty directories by creating a marker at destination
if len(blobs) == 0 {
if err := d.mkDir(ctx, dstPath); err != nil {
return fmt.Errorf("failed to create directory [%s]: %w", dstPath, err)
}
}
// Delete source directory and its contents
if err := d.deleteFolder(ctx, srcPath); err != nil {
log.Warnf("failed to delete source directory [%s]: %v\n, and try again", srcPath, err)
// Retry deletion once more and ignore the result
if err := d.deleteFolder(ctx, srcPath); err != nil {
log.Errorf("Retry deletion of source directory [%s] failed: %v", srcPath, err)
}
}
return nil
}
// Single file move or rename operation
if err := d.copyFile(ctx, srcPath, dstPath); err != nil {
return fmt.Errorf("failed to copy file: %w", err)
}
// Delete source file after successful copy
if err := d.deleteFile(ctx, srcPath, false); err != nil {
log.Errorf("Error deleting source file [%s]: %v", srcPath, err)
}
return nil
}
// optimizedUploadOptions returns the optimal upload options based on file size
func optimizedUploadOptions(fileSize int64) *azblob.UploadStreamOptions {
options := &azblob.UploadStreamOptions{
BlockSize: 4 * 1024 * 1024, // 4MB block size
Concurrency: 4, // Default concurrency
}
// For large files, increase block size and concurrency
if fileSize > 256*1024*1024 { // For files larger than 256MB
options.BlockSize = 8 * 1024 * 1024 // 8MB blocks
options.Concurrency = 8 // More concurrent uploads
}
// For very large files (>1GB)
if fileSize > 1024*1024*1024 {
options.BlockSize = 16 * 1024 * 1024 // 16MB blocks
options.Concurrency = 16 // Higher concurrency
}
return options
}
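// --- Illustrative example (added for documentation, not part of the original driver) ---
// A minimal sketch of how the tuned options from optimizedUploadOptions might be passed
// to the SDK's streaming upload on a block blob client. The client, reader and size
// parameters here are assumptions for the example, not names used elsewhere in the driver.
func exampleUploadStream(ctx context.Context, blobClient *blockblob.Client, body io.Reader, size int64) error {
    opts := optimizedUploadOptions(size)
    // blockblob.UploadStreamOptions exposes the same BlockSize/Concurrency knobs,
    // so the tuned values are copied across directly.
    _, err := blobClient.UploadStream(ctx, body, &blockblob.UploadStreamOptions{
        BlockSize:   int64(opts.BlockSize),
        Concurrency: opts.Concurrency,
    })
    return err
}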
// isDirectory determines if a blob represents a directory
// Checks multiple indicators: path suffix, metadata, and content type
func isDirectory(blob container.BlobItem) bool {
// Check path suffix
if strings.HasSuffix(*blob.Name, "/") {
return true
}
// Check metadata for directory marker
if blob.Metadata != nil {
if val, ok := blob.Metadata["hdi_isfolder"]; ok && val != nil && *val == "true" {
return true
}
// Azure Storage Explorer and other tools may use different metadata keys
if val, ok := blob.Metadata["is_directory"]; ok && val != nil && strings.ToLower(*val) == "true" {
return true
}
}
// Check content type (some tools mark directories with specific content types)
if blob.Properties != nil && blob.Properties.ContentType != nil {
contentType := strings.ToLower(*blob.Properties.ContentType)
if blob.Properties.ContentLength != nil && *blob.Properties.ContentLength == 0 && (contentType == "application/directory" || contentType == "directory") {
return true
}
}
return false
}
// deleteEmptyDirectory removes a directory's marker blob; callers are expected to have deleted its contents first
func (d *AzureBlob) deleteEmptyDirectory(ctx context.Context, dirPath string) error {
// Delete the directory marker, first trying the name without a trailing slash
blobClient := d.containerClient.NewBlobClient(strings.TrimSuffix(dirPath, "/"))
_, err := blobClient.Delete(ctx, nil)
// Also try deleting with trailing slash (for different directory marker formats)
if err != nil && isNotFoundError(err) {
blobClient = d.containerClient.NewBlobClient(dirPath)
_, err = blobClient.Delete(ctx, nil)
}
// Ignore not found errors
if err != nil && isNotFoundError(err) {
log.Infof("Directory [%s] not found during deletion: %v", dirPath, err)
return nil
}
return err
}