921 lines
28 KiB
Go
921 lines
28 KiB
Go
// Copyright 2021 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package objstore
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/docker/go-units"
|
|
"github.com/google/uuid"
|
|
"github.com/pingcap/errors"
|
|
backuppb "github.com/pingcap/kvproto/pkg/brpb"
|
|
"github.com/pingcap/log"
|
|
berrors "github.com/pingcap/tidb/br/pkg/errors"
|
|
"github.com/pingcap/tidb/br/pkg/logutil"
|
|
"github.com/pingcap/tidb/pkg/objstore/compressedio"
|
|
"github.com/pingcap/tidb/pkg/objstore/objectio"
|
|
"github.com/pingcap/tidb/pkg/objstore/storeapi"
|
|
"github.com/spf13/pflag"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
azblobEndpointOption = "azblob.endpoint"
|
|
azblobAccessTierOption = "azblob.access-tier"
|
|
azblobAccountName = "azblob.account-name"
|
|
azblobAccountKey = "azblob.account-key"
|
|
azblobSASToken = "azblob.sas-token"
|
|
azblobEncryptionScope = "azblob.encryption-scope"
|
|
azblobEncryptionKey = "azblob.encryption-key"
|
|
|
|
azblobPremisedCopySpeedPerSecond = 100 * units.MiB
|
|
azblobPremisedCopySpeedPerMilliSecond = azblobPremisedCopySpeedPerSecond / 1000
|
|
azblobCopyPollPendingMinimalDuration = 2 * time.Second
|
|
)
|
|
|
|
const azblobRetryTimes int32 = 5
|
|
|
|
func getDefaultClientOptions() *azblob.ClientOptions {
|
|
return &azblob.ClientOptions{
|
|
ClientOptions: azcore.ClientOptions{
|
|
Retry: policy.RetryOptions{
|
|
MaxRetries: azblobRetryTimes,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// AzblobBackendOptions is the options for Azure Blob storage.
|
|
type AzblobBackendOptions struct {
|
|
Endpoint string `json:"endpoint" toml:"endpoint"`
|
|
AccountName string `json:"account-name" toml:"account-name"`
|
|
AccountKey string `json:"account-key" toml:"account-key"`
|
|
AccessTier string `json:"access-tier" toml:"access-tier"`
|
|
SASToken string `json:"sas-token" toml:"sas-token"`
|
|
EncryptionScope string `json:"encryption-scope" toml:"encryption-scope"`
|
|
EncryptionKey string `json:"encryption-key" toml:"encryption-key"`
|
|
}
|
|
|
|
func (options *AzblobBackendOptions) apply(azblob *backuppb.AzureBlobStorage) error {
|
|
azblob.Endpoint = options.Endpoint
|
|
azblob.StorageClass = options.AccessTier
|
|
azblob.AccountName = options.AccountName
|
|
azblob.SharedKey = options.AccountKey
|
|
azblob.AccessSig = options.SASToken
|
|
azblob.EncryptionScope = options.EncryptionScope
|
|
|
|
if len(options.EncryptionKey) == 0 {
|
|
options.EncryptionKey = os.Getenv("AZURE_ENCRYPTION_KEY")
|
|
}
|
|
|
|
if len(options.EncryptionKey) > 0 {
|
|
keySlice := []byte(options.EncryptionKey)
|
|
keySha256 := sha256.Sum256(keySlice)
|
|
azblob.EncryptionKey = &backuppb.AzureCustomerKey{
|
|
EncryptionKey: base64.StdEncoding.EncodeToString(keySlice),
|
|
EncryptionKeySha256: base64.StdEncoding.EncodeToString(keySha256[:]),
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func defineAzblobFlags(flags *pflag.FlagSet) {
|
|
flags.String(azblobEndpointOption, "", "(experimental) Set the Azblob endpoint URL")
|
|
flags.String(azblobAccessTierOption, "", "Specify the storage class for azblob")
|
|
flags.String(azblobAccountName, "", "Specify the account name for azblob")
|
|
flags.String(azblobAccountKey, "", "Specify the account key for azblob")
|
|
flags.String(azblobSASToken, "", "Specify the SAS (shared access signatures) for azblob")
|
|
flags.String(azblobEncryptionScope, "", "Specify the server side encryption scope")
|
|
flags.String(azblobEncryptionKey, "", "Specify the server side encryption customer provided key")
|
|
}
|
|
|
|
func hiddenAzblobFlags(flags *pflag.FlagSet) {
|
|
_ = flags.MarkHidden(azblobEndpointOption)
|
|
_ = flags.MarkHidden(azblobAccessTierOption)
|
|
_ = flags.MarkHidden(azblobAccountName)
|
|
_ = flags.MarkHidden(azblobAccountKey)
|
|
_ = flags.MarkHidden(azblobSASToken)
|
|
_ = flags.MarkHidden(azblobEncryptionScope)
|
|
_ = flags.MarkHidden(azblobEncryptionKey)
|
|
}
|
|
|
|
func (options *AzblobBackendOptions) parseFromFlags(flags *pflag.FlagSet) error {
|
|
var err error
|
|
options.Endpoint, err = flags.GetString(azblobEndpointOption)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.AccessTier, err = flags.GetString(azblobAccessTierOption)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.AccountName, err = flags.GetString(azblobAccountName)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.AccountKey, err = flags.GetString(azblobAccountKey)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.SASToken, err = flags.GetString(azblobSASToken)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.EncryptionScope, err = flags.GetString(azblobEncryptionScope)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
options.EncryptionKey, err = flags.GetString(azblobEncryptionKey)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ClientBuilder provides common method to build a service client.
|
|
type ClientBuilder interface {
|
|
// Example of serviceURL: https://<your_storage_account>.blob.core.windows.net
|
|
GetServiceClient() (*azblob.Client, error)
|
|
GetAccountName() string
|
|
GetServiceURL() string
|
|
}
|
|
|
|
func urlOfObjectByEndpoint(endpoint, container, object string) (string, error) {
|
|
u, err := url.Parse(endpoint)
|
|
if err != nil {
|
|
return "", errors.Annotatef(err, "%s isn't a valid url", endpoint)
|
|
}
|
|
u.Path = path.Join(u.Path, container, object)
|
|
return u.String(), nil
|
|
}
|
|
|
|
type defaultClientBuilder struct {
|
|
defaultCred *azidentity.DefaultAzureCredential
|
|
|
|
accountName string
|
|
serviceURL string
|
|
clientOpts *azblob.ClientOptions
|
|
}
|
|
|
|
// GetAccountName implements ClientBuilder.
|
|
func (d *defaultClientBuilder) GetAccountName() string {
|
|
return d.accountName
|
|
}
|
|
|
|
// GetServiceClient implements ClientBuilder.
|
|
func (d *defaultClientBuilder) GetServiceClient() (*azblob.Client, error) {
|
|
return azblob.NewClient(d.serviceURL, d.defaultCred, d.clientOpts)
|
|
}
|
|
|
|
// GetServiceURL implements ClientBuilder.
|
|
func (d *defaultClientBuilder) GetServiceURL() string {
|
|
return d.serviceURL
|
|
}
|
|
|
|
// use shared key to access azure blob storage
|
|
type sharedKeyClientBuilder struct {
|
|
cred *azblob.SharedKeyCredential
|
|
accountName string
|
|
serviceURL string
|
|
|
|
clientOptions *azblob.ClientOptions
|
|
}
|
|
|
|
// GetServiceURL implements ClientBuilder.
|
|
func (b *sharedKeyClientBuilder) GetServiceURL() string {
|
|
return b.serviceURL
|
|
}
|
|
|
|
func (b *sharedKeyClientBuilder) GetServiceClient() (*azblob.Client, error) {
|
|
return azblob.NewClientWithSharedKeyCredential(b.serviceURL, b.cred, b.clientOptions)
|
|
}
|
|
|
|
func (b *sharedKeyClientBuilder) GetAccountName() string {
|
|
return b.accountName
|
|
}
|
|
|
|
// use SAS to access azure blob storage
|
|
type sasClientBuilder struct {
|
|
accountName string
|
|
// Example of serviceURL: https://<account>.blob.core.windows.net/?<sas token>
|
|
serviceURL string
|
|
|
|
clientOptions *azblob.ClientOptions
|
|
}
|
|
|
|
// GetServiceURL implements ClientBuilder.
|
|
func (b *sasClientBuilder) GetServiceURL() string {
|
|
return b.serviceURL
|
|
}
|
|
|
|
func (b *sasClientBuilder) GetServiceClient() (*azblob.Client, error) {
|
|
return azblob.NewClientWithNoCredential(b.serviceURL, b.clientOptions)
|
|
}
|
|
|
|
func (b *sasClientBuilder) GetAccountName() string {
|
|
return b.accountName
|
|
}
|
|
|
|
// use token to access azure blob storage
|
|
type tokenClientBuilder struct {
|
|
cred *azidentity.ClientSecretCredential
|
|
accountName string
|
|
serviceURL string
|
|
|
|
clientOptions *azblob.ClientOptions
|
|
}
|
|
|
|
// GetServiceURL implements ClientBuilder.
|
|
func (b *tokenClientBuilder) GetServiceURL() string {
|
|
return b.serviceURL
|
|
}
|
|
|
|
func (b *tokenClientBuilder) GetServiceClient() (*azblob.Client, error) {
|
|
return azblob.NewClient(b.serviceURL, b.cred, b.clientOptions)
|
|
}
|
|
|
|
func (b *tokenClientBuilder) GetAccountName() string {
|
|
return b.accountName
|
|
}
|
|
|
|
func getAuthorizerFromEnvironment() (clientID, tenantID, clientSecret string) {
|
|
return os.Getenv("AZURE_CLIENT_ID"),
|
|
os.Getenv("AZURE_TENANT_ID"),
|
|
os.Getenv("AZURE_CLIENT_SECRET")
|
|
}
|
|
|
|
// get azure service client from options and environment
|
|
func getAzureServiceClientBuilder(options *backuppb.AzureBlobStorage, opts *storeapi.Options) (ClientBuilder, error) {
|
|
if len(options.Bucket) == 0 {
|
|
return nil, errors.New("bucket(container) cannot be empty to access azure blob storage")
|
|
}
|
|
|
|
clientOptions := getDefaultClientOptions()
|
|
if opts != nil && opts.HTTPClient != nil {
|
|
clientOptions.Transport = opts.HTTPClient
|
|
}
|
|
|
|
if len(options.AccountName) > 0 && len(options.AccessSig) > 0 {
|
|
serviceURL := options.Endpoint
|
|
if len(serviceURL) == 0 {
|
|
if strings.HasPrefix(options.AccessSig, "?") {
|
|
serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/%s", options.AccountName, options.AccessSig)
|
|
} else {
|
|
serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net/?%s", options.AccountName, options.AccessSig)
|
|
}
|
|
}
|
|
return &sasClientBuilder{
|
|
options.AccountName,
|
|
serviceURL,
|
|
|
|
clientOptions,
|
|
}, nil
|
|
}
|
|
|
|
if len(options.AccountName) > 0 && len(options.SharedKey) > 0 {
|
|
serviceURL := options.Endpoint
|
|
if len(serviceURL) == 0 {
|
|
serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net", options.AccountName)
|
|
}
|
|
cred, err := azblob.NewSharedKeyCredential(options.AccountName, options.SharedKey)
|
|
if err != nil {
|
|
return nil, errors.Annotate(err, "Failed to get azure sharedKey credential")
|
|
}
|
|
return &sharedKeyClientBuilder{
|
|
cred,
|
|
options.AccountName,
|
|
serviceURL,
|
|
|
|
clientOptions,
|
|
}, nil
|
|
}
|
|
|
|
accountName := options.AccountName
|
|
if len(accountName) == 0 {
|
|
val := os.Getenv("AZURE_STORAGE_ACCOUNT")
|
|
if len(val) <= 0 {
|
|
return nil, errors.New("account name cannot be empty to access azure blob storage")
|
|
}
|
|
accountName = val
|
|
}
|
|
|
|
serviceURL := options.Endpoint
|
|
if len(serviceURL) == 0 {
|
|
serviceURL = fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
|
|
}
|
|
|
|
if clientID, tenantID, clientSecret := getAuthorizerFromEnvironment(); len(clientID) > 0 && len(tenantID) > 0 && len(clientSecret) > 0 {
|
|
cred, err := azidentity.NewClientSecretCredential(tenantID, clientID, clientSecret, nil)
|
|
if err == nil {
|
|
// send account-name to TiKV
|
|
if opts != nil && opts.SendCredentials {
|
|
options.AccountName = accountName
|
|
}
|
|
return &tokenClientBuilder{
|
|
cred,
|
|
accountName,
|
|
serviceURL,
|
|
|
|
clientOptions,
|
|
}, nil
|
|
}
|
|
log.Warn("Failed to get azure token credential but environment variables exist, try to use shared key.", zap.String("tenantId", tenantID), zap.String("clientId", clientID), zap.String("clientSecret", "?"))
|
|
}
|
|
|
|
var sharedKey string
|
|
val := os.Getenv("AZURE_STORAGE_KEY")
|
|
if len(val) > 0 {
|
|
log.Info("Get azure sharedKey from environment variable $AZURE_STORAGE_KEY")
|
|
sharedKey = val
|
|
|
|
cred, err := azblob.NewSharedKeyCredential(accountName, sharedKey)
|
|
if err != nil {
|
|
return nil, errors.Annotate(err, "Failed to get azure sharedKey credential")
|
|
}
|
|
// if BR can only get credential info from environment variable `sharedKey`,
|
|
// BR will send it to TiKV so that there is no need to set environment variable for TiKV.
|
|
if opts != nil && opts.SendCredentials {
|
|
options.AccountName = accountName
|
|
options.SharedKey = sharedKey
|
|
}
|
|
return &sharedKeyClientBuilder{
|
|
cred,
|
|
accountName,
|
|
serviceURL,
|
|
|
|
clientOptions,
|
|
}, nil
|
|
}
|
|
|
|
defaultCred, err := azidentity.NewDefaultAzureCredential(nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &defaultClientBuilder{
|
|
defaultCred: defaultCred,
|
|
accountName: accountName,
|
|
serviceURL: serviceURL,
|
|
clientOpts: clientOptions,
|
|
}, nil
|
|
}
|
|
|
|
// AzureBlobStorage is a storage engine that stores data in Azure Blob Storage.
|
|
type AzureBlobStorage struct {
|
|
options *backuppb.AzureBlobStorage
|
|
|
|
containerClient *container.Client
|
|
|
|
accessTier blob.AccessTier
|
|
|
|
cpkScope *blob.CPKScopeInfo
|
|
cpkInfo *blob.CPKInfo
|
|
|
|
// resolvedAccountName is the final account name we are going to use.
|
|
resolvedAccountName string
|
|
resolvedServiceEndpoint string
|
|
}
|
|
|
|
// CopyFrom implements Copier.
|
|
func (s *AzureBlobStorage) CopyFrom(ctx context.Context, e storeapi.Storage, spec storeapi.CopySpec) error {
|
|
es, ok := e.(*AzureBlobStorage)
|
|
if !ok {
|
|
return errors.Annotatef(berrors.ErrStorageInvalidConfig,
|
|
"AzureBlobStorage.CopyFrom supports *AzureBlobStorage only, got %T", e)
|
|
}
|
|
|
|
url, err := urlOfObjectByEndpoint(es.resolvedServiceEndpoint, es.options.Bucket, es.withPrefix(spec.From))
|
|
if err != nil {
|
|
return errors.Annotatef(err, "failed to get url of object %s", spec.From)
|
|
}
|
|
dstBlob := s.containerClient.NewBlobClient(s.withPrefix(spec.To))
|
|
|
|
// NOTE: `CopyFromURL` supports files up to 256 MiB, which might not be enough for huger regions.
|
|
// Hence we use the asynchronous version and wait for finish.
|
|
// But this might not as effect as the syncrhonous version for small files.
|
|
// It is possible to use syncrhonous version for small files if necessary.
|
|
// REF: https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob-from-url
|
|
resp, err := dstBlob.StartCopyFromURL(ctx, url, &blob.StartCopyFromURLOptions{})
|
|
if err != nil {
|
|
return errors.Annotatef(err, "failed to copy blob from %s to %s", url, spec.To)
|
|
}
|
|
copyID := resp.CopyID
|
|
deref := aws.ToString
|
|
|
|
for {
|
|
prop, err := dstBlob.GetProperties(ctx, &blob.GetPropertiesOptions{})
|
|
if err != nil {
|
|
return errors.Annotate(err, "failed to check asynchronous copy status")
|
|
}
|
|
if prop.CopyID == nil || deref(prop.CopyID) != deref(copyID) {
|
|
return errors.Annotatef(berrors.ErrStorageUnknown,
|
|
"failed to check copy status: copy ID not match; copy id = %v; resp.copyID = %v;", deref(copyID), deref(prop.CopyID))
|
|
}
|
|
cStat := deref((*string)(prop.CopyStatus))
|
|
// REF: https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob-properties?tabs=microsoft-entra-id#response-headers
|
|
switch cStat {
|
|
case "success":
|
|
return nil
|
|
case "failed", "aborted":
|
|
return errors.Annotatef(berrors.ErrStorageUnknown, "asynchronous copy failed or aborted: %s", deref(prop.CopyStatusDescription))
|
|
case "pending":
|
|
finished, total, err := progress(deref(prop.CopyProgress))
|
|
if err != nil {
|
|
return errors.Annotate(err, "failed to parse progress")
|
|
}
|
|
rem := total - finished
|
|
// In practice, most copies finish when the initial request returns.
|
|
// To avoid a busy loop of requesting, we need a minimal sleep duration.
|
|
toSleep := max(time.Duration(rem/azblobPremisedCopySpeedPerMilliSecond)*time.Millisecond, azblobCopyPollPendingMinimalDuration)
|
|
logutil.CL(ctx).Info("AzureBlobStorage: asynchronous copy triggered",
|
|
zap.Int("finished", finished), zap.Int("total", total),
|
|
zap.Stringp("copy-id", prop.CopyID), zap.Duration("to-sleep", toSleep),
|
|
zap.Stringp("copy-desc", prop.CopyStatusDescription),
|
|
)
|
|
time.Sleep(toSleep)
|
|
continue
|
|
default:
|
|
return errors.Annotatef(berrors.ErrStorageUnknown, "unknown copy status: %v", cStat)
|
|
}
|
|
}
|
|
}
|
|
|
|
// progress parses the format "bytes copied/bytes total".
|
|
// REF: https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob-properties?tabs=microsoft-entra-id#response-headers
|
|
func progress(s string) (finished, total int, err error) {
|
|
n, err := fmt.Sscanf(s, "%d/%d", &finished, &total)
|
|
if n != 2 {
|
|
err = errors.Errorf("failed to parse progress %s", s)
|
|
}
|
|
return
|
|
}
|
|
|
|
// MarkStrongConsistency implements Storage.
|
|
func (*AzureBlobStorage) MarkStrongConsistency() {
|
|
// See https://github.com/MicrosoftDocs/azure-docs/issues/105331#issuecomment-1450252384
|
|
}
|
|
|
|
func newAzureBlobStorage(ctx context.Context, options *backuppb.AzureBlobStorage, opts *storeapi.Options) (*AzureBlobStorage, error) {
|
|
clientBuilder, err := getAzureServiceClientBuilder(options, opts)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
return newAzureBlobStorageWithClientBuilder(ctx, options, clientBuilder)
|
|
}
|
|
|
|
func newAzureBlobStorageWithClientBuilder(ctx context.Context, options *backuppb.AzureBlobStorage, clientBuilder ClientBuilder) (*AzureBlobStorage, error) {
|
|
serviceClient, err := clientBuilder.GetServiceClient()
|
|
if err != nil {
|
|
return nil, errors.Annotate(err, "Failed to create azure service client")
|
|
}
|
|
|
|
containerClient := serviceClient.ServiceClient().NewContainerClient(options.Bucket)
|
|
if _, err = containerClient.GetProperties(ctx, &container.GetPropertiesOptions{}); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
if (len(options.EncryptionScope) > 0 || options.EncryptionKey != nil) && len(options.StorageClass) > 0 {
|
|
return nil, errors.Errorf("Set Blob Tier cannot be used with customer-provided key/scope. " +
|
|
"Please don't supply the access-tier when use encryption-key or encryption-scope.")
|
|
} else if len(options.EncryptionScope) > 0 && options.EncryptionKey != nil {
|
|
return nil, errors.Errorf("Undefined input: There are both encryption-scope and customer provided key. " +
|
|
"Please select only one to encrypt blobs.")
|
|
}
|
|
|
|
var cpkScope *blob.CPKScopeInfo = nil
|
|
if len(options.EncryptionScope) > 0 {
|
|
cpkScope = &blob.CPKScopeInfo{
|
|
EncryptionScope: &options.EncryptionScope,
|
|
}
|
|
}
|
|
|
|
var cpkInfo *blob.CPKInfo = nil
|
|
if options.EncryptionKey != nil {
|
|
defaultAlgorithm := blob.EncryptionAlgorithmTypeAES256
|
|
cpkInfo = &blob.CPKInfo{
|
|
EncryptionAlgorithm: &defaultAlgorithm,
|
|
EncryptionKey: &options.EncryptionKey.EncryptionKey,
|
|
EncryptionKeySHA256: &options.EncryptionKey.EncryptionKeySha256,
|
|
}
|
|
}
|
|
|
|
// parse storage access-tier
|
|
var accessTier blob.AccessTier
|
|
switch options.StorageClass {
|
|
case "Archive", "archive":
|
|
accessTier = blob.AccessTierArchive
|
|
case "Cool", "cool":
|
|
accessTier = blob.AccessTierCool
|
|
case "Hot", "hot":
|
|
accessTier = blob.AccessTierHot
|
|
default:
|
|
accessTier = blob.AccessTier(options.StorageClass)
|
|
}
|
|
|
|
log.Debug("select accessTier", zap.String("accessTier", string(accessTier)))
|
|
|
|
return &AzureBlobStorage{
|
|
options,
|
|
containerClient,
|
|
accessTier,
|
|
cpkScope,
|
|
cpkInfo,
|
|
clientBuilder.GetAccountName(),
|
|
clientBuilder.GetServiceURL(),
|
|
}, nil
|
|
}
|
|
|
|
func (s *AzureBlobStorage) withPrefix(name string) string {
|
|
return path.Join(s.options.Prefix, name)
|
|
}
|
|
|
|
// WriteFile writes a file to Azure Blob Storage.
|
|
func (s *AzureBlobStorage) WriteFile(ctx context.Context, name string, data []byte) error {
|
|
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
|
|
// the encryption scope/key and the access tier can not be both in the HTTP headers
|
|
options := &blockblob.UploadBufferOptions{
|
|
CPKScopeInfo: s.cpkScope,
|
|
CPKInfo: s.cpkInfo,
|
|
}
|
|
|
|
if len(s.accessTier) > 0 {
|
|
options.AccessTier = &s.accessTier
|
|
}
|
|
_, err := client.UploadBuffer(ctx, data, options)
|
|
if err != nil {
|
|
return errors.Annotatef(err, "Failed to write azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReadFile reads a file from Azure Blob Storage.
|
|
func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, error) {
|
|
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
|
|
resp, err := client.DownloadStream(ctx, &blob.DownloadStreamOptions{
|
|
CPKInfo: s.cpkInfo,
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name))
|
|
}
|
|
body := resp.NewRetryReader(ctx, &blob.RetryReaderOptions{
|
|
MaxRetries: azblobRetryTimes,
|
|
})
|
|
data, err := io.ReadAll(body)
|
|
if err != nil {
|
|
return nil, errors.Annotatef(err, "Failed to read azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name))
|
|
}
|
|
return data, body.Close()
|
|
}
|
|
|
|
// FileExists checks if a file exists in Azure Blob Storage.
|
|
func (s *AzureBlobStorage) FileExists(ctx context.Context, name string) (bool, error) {
|
|
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
|
|
_, err := client.GetProperties(ctx, nil)
|
|
if err != nil {
|
|
if bloberror.HasCode(err, bloberror.BlobNotFound) {
|
|
return false, nil
|
|
}
|
|
return false, errors.Trace(err)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
// DeleteFile deletes the file with the given name.
|
|
func (s *AzureBlobStorage) DeleteFile(ctx context.Context, name string) error {
|
|
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
|
|
_, err := client.Delete(ctx, nil)
|
|
if err != nil {
|
|
return errors.Annotatef(err, "Failed to delete azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteFiles deletes the files with the given names.
|
|
func (s *AzureBlobStorage) DeleteFiles(ctx context.Context, names []string) error {
|
|
for _, name := range names {
|
|
err := s.DeleteFile(ctx, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Open implements the StorageReader interface.
|
|
func (s *AzureBlobStorage) Open(ctx context.Context, name string, o *storeapi.ReaderOption) (objectio.Reader, error) {
|
|
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
|
|
resp, err := client.GetProperties(ctx, nil)
|
|
if err != nil {
|
|
return nil, errors.Annotate(err, "Failed to get properties from the azure blob")
|
|
}
|
|
|
|
pos := int64(0)
|
|
totalSize := *resp.ContentLength
|
|
endPos := totalSize
|
|
if o != nil {
|
|
if o.StartOffset != nil {
|
|
pos = *o.StartOffset
|
|
}
|
|
if o.EndOffset != nil {
|
|
endPos = *o.EndOffset
|
|
}
|
|
}
|
|
|
|
return &azblobObjectReader{
|
|
blobClient: client,
|
|
|
|
pos: pos,
|
|
endPos: endPos,
|
|
totalSize: totalSize,
|
|
|
|
ctx: ctx,
|
|
|
|
cpkInfo: s.cpkInfo,
|
|
}, nil
|
|
}
|
|
|
|
// WalkDir implements the StorageReader interface.
|
|
func (s *AzureBlobStorage) WalkDir(ctx context.Context, opt *storeapi.WalkOption, fn func(path string, size int64) error) error {
|
|
if opt == nil {
|
|
opt = &storeapi.WalkOption{}
|
|
}
|
|
prefix := path.Join(s.options.Prefix, opt.SubDir)
|
|
if len(prefix) > 0 && !strings.HasSuffix(prefix, "/") {
|
|
prefix += "/"
|
|
}
|
|
if len(opt.ObjPrefix) != 0 {
|
|
prefix += opt.ObjPrefix
|
|
}
|
|
|
|
pager := s.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
|
|
Prefix: &prefix,
|
|
})
|
|
for pager.More() {
|
|
page, err := pager.NextPage(ctx)
|
|
if err != nil {
|
|
return errors.Annotatef(err, "Failed to list azure blobs, bucket(container)='%s'", s.options.Bucket)
|
|
}
|
|
|
|
for _, blob := range page.Segment.BlobItems {
|
|
// when walk on specify directory, the result include storage.Prefix,
|
|
// which can not be reuse in other API(Open/Read) directly.
|
|
// so we use TrimPrefix to filter Prefix for next Open/Read.
|
|
path := strings.TrimPrefix((*blob.Name), s.options.Prefix)
|
|
// trim the prefix '/' to ensure that the path returned is consistent with the local storage
|
|
path = strings.TrimPrefix(path, "/")
|
|
if err := fn(path, *blob.Properties.ContentLength); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// URI implements the StorageReader interface.
|
|
func (s *AzureBlobStorage) URI() string {
|
|
return "azure://" + s.options.Bucket + "/" + s.options.Prefix
|
|
}
|
|
|
|
const azblobChunkSize = 64 * 1024 * 1024
|
|
|
|
// Create implements the StorageWriter interface.
|
|
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *storeapi.WriterOption) (objectio.Writer, error) {
|
|
client := s.containerClient.NewBlockBlobClient(s.withPrefix(name))
|
|
uploader := &azblobUploader{
|
|
blobClient: client,
|
|
|
|
blockIDList: make([]string, 0, 4),
|
|
|
|
accessTier: s.accessTier,
|
|
|
|
cpkScope: s.cpkScope,
|
|
cpkInfo: s.cpkInfo,
|
|
}
|
|
|
|
uploaderWriter := objectio.NewBufferedWriter(uploader, azblobChunkSize, compressedio.NoCompression, nil)
|
|
return uploaderWriter, nil
|
|
}
|
|
|
|
// Rename implements the StorageWriter interface.
|
|
func (s *AzureBlobStorage) Rename(ctx context.Context, oldFileName, newFileName string) error {
|
|
data, err := s.ReadFile(ctx, oldFileName)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
err = s.WriteFile(ctx, newFileName, data)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
return s.DeleteFile(ctx, oldFileName)
|
|
}
|
|
|
|
// Close implements the Storage interface.
|
|
func (*AzureBlobStorage) Close() {}
|
|
|
|
type azblobObjectReader struct {
|
|
blobClient *blockblob.Client
|
|
|
|
pos int64
|
|
endPos int64
|
|
totalSize int64
|
|
|
|
ctx context.Context
|
|
|
|
cpkInfo *blob.CPKInfo
|
|
// opened lazily
|
|
reader io.ReadCloser
|
|
}
|
|
|
|
// Read implement the io.Reader interface.
|
|
func (r *azblobObjectReader) Read(p []byte) (n int, err error) {
|
|
maxCnt := min(r.endPos-r.pos, int64(len(p)))
|
|
if maxCnt == 0 {
|
|
return 0, io.EOF
|
|
}
|
|
if r.reader == nil {
|
|
if err2 := r.reopenReader(); err2 != nil {
|
|
return 0, err2
|
|
}
|
|
}
|
|
buf := p[:maxCnt]
|
|
n, err = r.reader.Read(buf)
|
|
if err != nil && err != io.EOF {
|
|
return 0, errors.Annotatef(err, "Failed to read data from azure blob response, data info: pos='%d', count='%d'", r.pos, maxCnt)
|
|
}
|
|
r.pos += int64(n)
|
|
return n, nil
|
|
}
|
|
|
|
// Close implement the io.Closer interface.
|
|
func (r *azblobObjectReader) Close() error {
|
|
if r.reader != nil {
|
|
err := errors.Trace(r.reader.Close())
|
|
r.reader = nil
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *azblobObjectReader) Seek(offset int64, whence int) (int64, error) {
|
|
var realOffset int64
|
|
switch whence {
|
|
case io.SeekStart:
|
|
if offset < 0 {
|
|
return 0, errors.Annotatef(berrors.ErrInvalidArgument, "Seek: offset '%v' out of range.", offset)
|
|
}
|
|
realOffset = offset
|
|
case io.SeekCurrent:
|
|
realOffset = r.pos + offset
|
|
if r.pos < 0 && realOffset >= 0 {
|
|
return 0, errors.Annotatef(berrors.ErrInvalidArgument, "Seek: offset '%v' out of range. current pos is '%v'.", offset, r.pos)
|
|
}
|
|
case io.SeekEnd:
|
|
if offset > 0 {
|
|
return 0, errors.Annotatef(berrors.ErrInvalidArgument, "Seek: offset '%v' should be negative.", offset)
|
|
}
|
|
realOffset = offset + r.totalSize
|
|
default:
|
|
return 0, errors.Annotatef(berrors.ErrStorageUnknown, "Seek: invalid whence '%d'", whence)
|
|
}
|
|
|
|
if realOffset < 0 || realOffset > r.totalSize {
|
|
return 0, errors.Annotatef(berrors.ErrInvalidArgument, "Seek: offset is %d, but length of content is only %d", realOffset, r.totalSize)
|
|
}
|
|
if realOffset == r.pos {
|
|
return r.pos, nil
|
|
}
|
|
r.pos = realOffset
|
|
// azblob reader can only read forward, so we need to reopen the reader
|
|
if err := r.reopenReader(); err != nil {
|
|
return 0, err
|
|
}
|
|
return r.pos, nil
|
|
}
|
|
|
|
func (r *azblobObjectReader) reopenReader() error {
|
|
if r.reader != nil {
|
|
err := errors.Trace(r.reader.Close())
|
|
if err != nil {
|
|
log.Warn("failed to close azblob reader", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if r.pos == r.totalSize {
|
|
r.reader = io.NopCloser(bytes.NewReader(nil))
|
|
return nil
|
|
}
|
|
|
|
resp, err := r.blobClient.DownloadStream(r.ctx, &blob.DownloadStreamOptions{
|
|
Range: blob.HTTPRange{
|
|
Offset: r.pos,
|
|
},
|
|
CPKInfo: r.cpkInfo,
|
|
})
|
|
if err != nil {
|
|
return errors.Annotatef(err, "Failed to read data from azure blob, data info: pos='%d'", r.pos)
|
|
}
|
|
body := resp.NewRetryReader(r.ctx, &blob.RetryReaderOptions{
|
|
MaxRetries: azblobRetryTimes,
|
|
})
|
|
r.reader = body
|
|
return nil
|
|
}
|
|
|
|
func (r *azblobObjectReader) GetFileSize() (int64, error) {
|
|
return r.totalSize, nil
|
|
}
|
|
|
|
type nopCloser struct {
|
|
io.ReadSeeker
|
|
}
|
|
|
|
func newNopCloser(r io.ReadSeeker) nopCloser {
|
|
return nopCloser{r}
|
|
}
|
|
|
|
func (nopCloser) Close() error {
|
|
return nil
|
|
}
|
|
|
|
type azblobUploader struct {
|
|
blobClient *blockblob.Client
|
|
|
|
blockIDList []string
|
|
|
|
accessTier blob.AccessTier
|
|
|
|
cpkScope *blob.CPKScopeInfo
|
|
cpkInfo *blob.CPKInfo
|
|
}
|
|
|
|
func (u *azblobUploader) Write(ctx context.Context, data []byte) (int, error) {
|
|
generatedUUID, err := uuid.NewUUID()
|
|
if err != nil {
|
|
return 0, errors.Annotate(err, "Fail to generate uuid")
|
|
}
|
|
blockID := base64.StdEncoding.EncodeToString([]byte(generatedUUID.String()))
|
|
|
|
_, err = u.blobClient.StageBlock(ctx, blockID, newNopCloser(bytes.NewReader(data)), &blockblob.StageBlockOptions{
|
|
CPKScopeInfo: u.cpkScope,
|
|
CPKInfo: u.cpkInfo,
|
|
})
|
|
if err != nil {
|
|
return 0, errors.Annotate(err, "Failed to upload block to azure blob")
|
|
}
|
|
u.blockIDList = append(u.blockIDList, blockID)
|
|
|
|
return len(data), nil
|
|
}
|
|
|
|
func (u *azblobUploader) Close(ctx context.Context) error {
|
|
// the encryption scope and the access tier can not be both in the HTTP headers
|
|
options := &blockblob.CommitBlockListOptions{
|
|
CPKScopeInfo: u.cpkScope,
|
|
CPKInfo: u.cpkInfo,
|
|
}
|
|
|
|
if len(u.accessTier) > 0 {
|
|
options.Tier = &u.accessTier
|
|
}
|
|
_, err := u.blobClient.CommitBlockList(ctx, u.blockIDList, options)
|
|
return errors.Trace(err)
|
|
}
|