mirror of
https://github.com/AlistGo/alist.git
synced 2025-04-21 20:18:47 +08:00

* add azure-blob driver * fix nested folders copy * feat(driver): add Azure Blob Storage driver 实现 Azure Blob Storage 驱动,支持以下功能: - 使用共享密钥身份验证初始化连接 - 列出目录和文件 - 生成临时 SAS URL 进行文件访问 - 创建目录 - 移动和重命名文件/文件夹 - 复制文件/文件夹 - 删除文件/文件夹 - 上传文件并支持进度跟踪 此驱动允许用户通过 AList 平台无缝访问和管理 Azure Blob Storage 中的数据。 * feat(driver): update help doc for Azure Blob * doc(readme): add new driver * Update drivers/azure_blob/driver.go fix(azure): fix name check Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update README.md doc(readme): fix the link Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix(azure): fix log and link --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
402 lines
12 KiB
Go
402 lines
12 KiB
Go
package azure_blob
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
const (
|
|
// MaxRetries defines the maximum number of retry attempts for Azure operations
|
|
MaxRetries = 3
|
|
// RetryDelay defines the base delay between retries
|
|
RetryDelay = 3 * time.Second
|
|
// MaxBatchSize defines the maximum number of operations in a single batch request
|
|
MaxBatchSize = 128
|
|
)
|
|
|
|
// extractAccountName 从 Azure 存储 Endpoint 中提取账户名
|
|
func extractAccountName(endpoint string) string {
|
|
// 移除协议前缀
|
|
endpoint = strings.TrimPrefix(endpoint, "https://")
|
|
endpoint = strings.TrimPrefix(endpoint, "http://")
|
|
|
|
// 获取第一个点之前的部分(即账户名)
|
|
parts := strings.Split(endpoint, ".")
|
|
if len(parts) > 0 {
|
|
// to lower case
|
|
return strings.ToLower(parts[0])
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// isNotFoundError checks if the error is a "not found" type error
|
|
func isNotFoundError(err error) bool {
|
|
var storageErr *azcore.ResponseError
|
|
if errors.As(err, &storageErr) {
|
|
return storageErr.StatusCode == 404
|
|
}
|
|
// Fallback to string matching for backwards compatibility
|
|
return err != nil && strings.Contains(err.Error(), "BlobNotFound")
|
|
}
|
|
|
|
// flattenListBlobs - Optimize blob listing to handle pagination better
|
|
func (d *AzureBlob) flattenListBlobs(ctx context.Context, prefix string) ([]container.BlobItem, error) {
|
|
// Standardize prefix format
|
|
prefix = ensureTrailingSlash(prefix)
|
|
|
|
var blobItems []container.BlobItem
|
|
pager := d.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
|
|
Prefix: &prefix,
|
|
Include: container.ListBlobsInclude{
|
|
Metadata: true,
|
|
},
|
|
})
|
|
|
|
for pager.More() {
|
|
page, err := pager.NextPage(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list blobs: %w", err)
|
|
}
|
|
|
|
for _, blob := range page.Segment.BlobItems {
|
|
blobItems = append(blobItems, *blob)
|
|
}
|
|
}
|
|
|
|
return blobItems, nil
|
|
}
|
|
|
|
// batchDeleteBlobs - Simplify batch deletion logic
|
|
func (d *AzureBlob) batchDeleteBlobs(ctx context.Context, blobPaths []string) error {
|
|
if len(blobPaths) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Process in batches of MaxBatchSize
|
|
for i := 0; i < len(blobPaths); i += MaxBatchSize {
|
|
end := min(i+MaxBatchSize, len(blobPaths))
|
|
currentBatch := blobPaths[i:end]
|
|
|
|
// Create batch builder
|
|
batchBuilder, err := d.containerClient.NewBatchBuilder()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create batch builder: %w", err)
|
|
}
|
|
|
|
// Add delete operations
|
|
for _, blobPath := range currentBatch {
|
|
if err := batchBuilder.Delete(blobPath, nil); err != nil {
|
|
return fmt.Errorf("failed to add delete operation for %s: %w", blobPath, err)
|
|
}
|
|
}
|
|
|
|
// Submit batch
|
|
responses, err := d.containerClient.SubmitBatch(ctx, batchBuilder, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("batch delete request failed: %w", err)
|
|
}
|
|
|
|
// Check responses
|
|
for _, resp := range responses.Responses {
|
|
if resp.Error != nil && !isNotFoundError(resp.Error) {
|
|
// 获取 blob 名称以提供更好的错误信息
|
|
blobName := "unknown"
|
|
if resp.BlobName != nil {
|
|
blobName = *resp.BlobName
|
|
}
|
|
return fmt.Errorf("failed to delete blob %s: %v", blobName, resp.Error)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// deleteFolder recursively deletes a directory and all its contents
|
|
func (d *AzureBlob) deleteFolder(ctx context.Context, prefix string) error {
|
|
// Ensure directory path ends with slash
|
|
prefix = ensureTrailingSlash(prefix)
|
|
|
|
// Get all blobs under the directory using flattenListBlobs
|
|
globs, err := d.flattenListBlobs(ctx, prefix)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list blobs for deletion: %w", err)
|
|
}
|
|
|
|
// If there are blobs in the directory, delete them
|
|
if len(globs) > 0 {
|
|
// 分离文件和目录标记
|
|
var filePaths []string
|
|
var dirPaths []string
|
|
|
|
for _, blob := range globs {
|
|
blobName := *blob.Name
|
|
if isDirectory(blob) {
|
|
// remove trailing slash for directory names
|
|
dirPaths = append(dirPaths, strings.TrimSuffix(blobName, "/"))
|
|
} else {
|
|
filePaths = append(filePaths, blobName)
|
|
}
|
|
}
|
|
|
|
// 先删除文件,再删除目录
|
|
if len(filePaths) > 0 {
|
|
if err := d.batchDeleteBlobs(ctx, filePaths); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if len(dirPaths) > 0 {
|
|
// 按路径深度分组
|
|
depthMap := make(map[int][]string)
|
|
for _, dir := range dirPaths {
|
|
depth := strings.Count(dir, "/") // 计算目录深度
|
|
depthMap[depth] = append(depthMap[depth], dir)
|
|
}
|
|
|
|
// 按深度从大到小排序
|
|
var depths []int
|
|
for depth := range depthMap {
|
|
depths = append(depths, depth)
|
|
}
|
|
sort.Sort(sort.Reverse(sort.IntSlice(depths)))
|
|
|
|
// 按深度逐层批量删除
|
|
for _, depth := range depths {
|
|
batch := depthMap[depth]
|
|
if err := d.batchDeleteBlobs(ctx, batch); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 最后删除目录标记本身
|
|
return d.deleteEmptyDirectory(ctx, prefix)
|
|
}
|
|
|
|
// deleteFile deletes a single file or blob with better error handling
|
|
func (d *AzureBlob) deleteFile(ctx context.Context, path string, isDir bool) error {
|
|
blobClient := d.containerClient.NewBlobClient(path)
|
|
_, err := blobClient.Delete(ctx, nil)
|
|
if err != nil && !(isDir && isNotFoundError(err)) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// copyFile copies a single blob from source path to destination path
|
|
func (d *AzureBlob) copyFile(ctx context.Context, srcPath, dstPath string) error {
|
|
srcBlob := d.containerClient.NewBlobClient(srcPath)
|
|
dstBlob := d.containerClient.NewBlobClient(dstPath)
|
|
|
|
// Use configured expiration time for SAS URL
|
|
expireDuration := time.Hour * time.Duration(d.SignURLExpire)
|
|
srcURL, err := srcBlob.GetSASURL(sas.BlobPermissions{Read: true}, time.Now().Add(expireDuration), nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to generate source SAS URL: %w", err)
|
|
}
|
|
|
|
_, err = dstBlob.StartCopyFromURL(ctx, srcURL, nil)
|
|
return err
|
|
|
|
}
|
|
|
|
// createContainerIfNotExists - Create container if not exists
|
|
// Clean up commented code
|
|
func (d *AzureBlob) createContainerIfNotExists(ctx context.Context, containerName string) error {
|
|
serviceClient := d.client.ServiceClient()
|
|
containerClient := serviceClient.NewContainerClient(containerName)
|
|
|
|
var options = service.CreateContainerOptions{}
|
|
_, err := containerClient.Create(ctx, &options)
|
|
if err != nil {
|
|
var responseErr *azcore.ResponseError
|
|
if errors.As(err, &responseErr) && responseErr.ErrorCode != "ContainerAlreadyExists" {
|
|
return fmt.Errorf("failed to create or access container [%s]: %w", containerName, err)
|
|
}
|
|
}
|
|
|
|
d.containerClient = containerClient
|
|
return nil
|
|
}
|
|
|
|
// mkDir creates a virtual directory marker by uploading an empty blob with metadata.
|
|
func (d *AzureBlob) mkDir(ctx context.Context, fullDirName string) error {
|
|
dirPath := ensureTrailingSlash(fullDirName)
|
|
blobClient := d.containerClient.NewBlockBlobClient(dirPath)
|
|
|
|
// Upload an empty blob with metadata indicating it's a directory
|
|
_, err := blobClient.Upload(ctx, struct {
|
|
*bytes.Reader
|
|
io.Closer
|
|
}{
|
|
Reader: bytes.NewReader([]byte{}),
|
|
Closer: io.NopCloser(nil),
|
|
}, &blockblob.UploadOptions{
|
|
Metadata: map[string]*string{
|
|
"hdi_isfolder": to.Ptr("true"),
|
|
},
|
|
})
|
|
return err
|
|
}
|
|
|
|
// ensureTrailingSlash ensures the provided path ends with a trailing slash.
|
|
func ensureTrailingSlash(path string) string {
|
|
if !strings.HasSuffix(path, "/") {
|
|
return path + "/"
|
|
}
|
|
return path
|
|
}
|
|
|
|
// moveOrRename moves or renames blobs or directories from source to destination.
|
|
func (d *AzureBlob) moveOrRename(ctx context.Context, srcPath, dstPath string, isDir bool, srcSize int64) error {
|
|
if isDir {
|
|
// Normalize paths for directory operations
|
|
srcPath = ensureTrailingSlash(srcPath)
|
|
dstPath = ensureTrailingSlash(dstPath)
|
|
|
|
// List all blobs under the source directory
|
|
blobs, err := d.flattenListBlobs(ctx, srcPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list blobs: %w", err)
|
|
}
|
|
|
|
// Iterate and copy each blob to the destination
|
|
for _, item := range blobs {
|
|
srcBlobName := *item.Name
|
|
relPath := strings.TrimPrefix(srcBlobName, srcPath)
|
|
itemDstPath := path.Join(dstPath, relPath)
|
|
|
|
if isDirectory(item) {
|
|
// Create directory marker at destination
|
|
if err := d.mkDir(ctx, itemDstPath); err != nil {
|
|
return fmt.Errorf("failed to create directory marker [%s]: %w", itemDstPath, err)
|
|
}
|
|
} else {
|
|
// Copy file blob to destination
|
|
if err := d.copyFile(ctx, srcBlobName, itemDstPath); err != nil {
|
|
return fmt.Errorf("failed to copy blob [%s]: %w", srcBlobName, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handle empty directories by creating a marker at destination
|
|
if len(blobs) == 0 {
|
|
if err := d.mkDir(ctx, dstPath); err != nil {
|
|
return fmt.Errorf("failed to create directory [%s]: %w", dstPath, err)
|
|
}
|
|
}
|
|
|
|
// Delete source directory and its contents
|
|
if err := d.deleteFolder(ctx, srcPath); err != nil {
|
|
log.Warnf("failed to delete source directory [%s]: %v\n, and try again", srcPath, err)
|
|
// Retry deletion once more and ignore the result
|
|
if err := d.deleteFolder(ctx, srcPath); err != nil {
|
|
log.Errorf("Retry deletion of source directory [%s] failed: %v", srcPath, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Single file move or rename operation
|
|
if err := d.copyFile(ctx, srcPath, dstPath); err != nil {
|
|
return fmt.Errorf("failed to copy file: %w", err)
|
|
}
|
|
|
|
// Delete source file after successful copy
|
|
if err := d.deleteFile(ctx, srcPath, false); err != nil {
|
|
log.Errorf("Error deleting source file [%s]: %v", srcPath, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// optimizedUploadOptions returns the optimal upload options based on file size
|
|
func optimizedUploadOptions(fileSize int64) *azblob.UploadStreamOptions {
|
|
options := &azblob.UploadStreamOptions{
|
|
BlockSize: 4 * 1024 * 1024, // 4MB block size
|
|
Concurrency: 4, // Default concurrency
|
|
}
|
|
|
|
// For large files, increase block size and concurrency
|
|
if fileSize > 256*1024*1024 { // For files larger than 256MB
|
|
options.BlockSize = 8 * 1024 * 1024 // 8MB blocks
|
|
options.Concurrency = 8 // More concurrent uploads
|
|
}
|
|
|
|
// For very large files (>1GB)
|
|
if fileSize > 1024*1024*1024 {
|
|
options.BlockSize = 16 * 1024 * 1024 // 16MB blocks
|
|
options.Concurrency = 16 // Higher concurrency
|
|
}
|
|
|
|
return options
|
|
}
|
|
|
|
// isDirectory determines if a blob represents a directory
|
|
// Checks multiple indicators: path suffix, metadata, and content type
|
|
func isDirectory(blob container.BlobItem) bool {
|
|
// Check path suffix
|
|
if strings.HasSuffix(*blob.Name, "/") {
|
|
return true
|
|
}
|
|
|
|
// Check metadata for directory marker
|
|
if blob.Metadata != nil {
|
|
if val, ok := blob.Metadata["hdi_isfolder"]; ok && val != nil && *val == "true" {
|
|
return true
|
|
}
|
|
// Azure Storage Explorer and other tools may use different metadata keys
|
|
if val, ok := blob.Metadata["is_directory"]; ok && val != nil && strings.ToLower(*val) == "true" {
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Check content type (some tools mark directories with specific content types)
|
|
if blob.Properties != nil && blob.Properties.ContentType != nil {
|
|
contentType := strings.ToLower(*blob.Properties.ContentType)
|
|
if blob.Properties.ContentLength != nil && *blob.Properties.ContentLength == 0 && (contentType == "application/directory" || contentType == "directory") {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// deleteEmptyDirectory deletes a directory only if it's empty
|
|
func (d *AzureBlob) deleteEmptyDirectory(ctx context.Context, dirPath string) error {
|
|
// Directory is empty, delete the directory marker
|
|
blobClient := d.containerClient.NewBlobClient(strings.TrimSuffix(dirPath, "/"))
|
|
_, err := blobClient.Delete(ctx, nil)
|
|
|
|
// Also try deleting with trailing slash (for different directory marker formats)
|
|
if err != nil && isNotFoundError(err) {
|
|
blobClient = d.containerClient.NewBlobClient(dirPath)
|
|
_, err = blobClient.Delete(ctx, nil)
|
|
}
|
|
|
|
// Ignore not found errors
|
|
if err != nil && isNotFoundError(err) {
|
|
log.Infof("Directory [%s] not found during deletion: %v", dirPath, err)
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}
|