ebs br: control the snapshots batch size for fsr enable/disable (#48506)
close pingcap/tidb#48505
This commit is contained in:
@ -27,6 +27,7 @@ import (
|
||||
const (
|
||||
pollingPendingSnapshotInterval = 30 * time.Second
|
||||
errCodeTooManyPendingSnapshots = "PendingSnapshotLimitExceeded"
|
||||
FsrApiSnapshotsThreshold = 10
|
||||
)
|
||||
|
||||
type EC2Session struct {
|
||||
@ -293,24 +294,32 @@ func (e *EC2Session) EnableDataFSR(meta *config.EBSBasedBRMeta, targetAZ string)
|
||||
|
||||
for availableZone := range snapshotsIDsMap {
|
||||
targetAZ := availableZone
|
||||
eg.Go(func() error {
|
||||
log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ))
|
||||
resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{
|
||||
AvailabilityZones: []*string{&targetAZ},
|
||||
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
|
||||
// We have to control the batch size to avoid the error of "parameter SourceSnapshotIds must be less than or equal to 10"
|
||||
for i := 0; i < len(snapshotsIDsMap[targetAZ]); i += FsrApiSnapshotsThreshold {
|
||||
start := i
|
||||
end := i + FsrApiSnapshotsThreshold
|
||||
if end > len(snapshotsIDsMap[targetAZ]) {
|
||||
end = len(snapshotsIDsMap[targetAZ])
|
||||
}
|
||||
eg.Go(func() error {
|
||||
log.Info("enable fsr for snapshots", zap.String("available zone", targetAZ), zap.Any("snapshots", snapshotsIDsMap[targetAZ][start:end]))
|
||||
resp, err := e.ec2.EnableFastSnapshotRestores(&ec2.EnableFastSnapshotRestoresInput{
|
||||
AvailabilityZones: []*string{&targetAZ},
|
||||
SourceSnapshotIds: snapshotsIDsMap[targetAZ][start:end],
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if len(resp.Unsuccessful) > 0 {
|
||||
log.Warn("not all snapshots enabled FSR")
|
||||
return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
|
||||
}
|
||||
|
||||
return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ][start:end], targetAZ)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if len(resp.Unsuccessful) > 0 {
|
||||
log.Warn("not all snapshots enabled FSR")
|
||||
return errors.Errorf("Some snapshot fails to enable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
|
||||
}
|
||||
|
||||
return e.waitDataFSREnabled(snapshotsIDsMap[targetAZ], targetAZ)
|
||||
})
|
||||
}
|
||||
}
|
||||
return snapshotsIDsMap, eg.Wait()
|
||||
}
|
||||
@ -328,7 +337,7 @@ func (e *EC2Session) waitDataFSREnabled(snapShotIDs []*string, targetAZ string)
|
||||
log.Info("starts check fsr pending snapshots", zap.Any("snapshots", pendingSnapshots), zap.String("available zone", targetAZ))
|
||||
for {
|
||||
if len(pendingSnapshots) == 0 {
|
||||
log.Info("all snapshots fsr enablement is finished", zap.String("available zone", targetAZ))
|
||||
log.Info("all snapshots in current batch fsr enablement is finished", zap.String("available zone", targetAZ), zap.Any("snapshots", snapShotIDs))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -379,25 +388,33 @@ func (e *EC2Session) DisableDataFSR(snapshotsIDsMap map[string][]*string) error
|
||||
|
||||
for availableZone := range snapshotsIDsMap {
|
||||
targetAZ := availableZone
|
||||
eg.Go(func() error {
|
||||
resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{
|
||||
AvailabilityZones: []*string{&targetAZ},
|
||||
SourceSnapshotIds: snapshotsIDsMap[targetAZ],
|
||||
// We have to control the batch size to avoid the error of "parameter SourceSnapshotIds must be less than or equal to 10"
|
||||
for i := 0; i < len(snapshotsIDsMap[targetAZ]); i += FsrApiSnapshotsThreshold {
|
||||
start := i
|
||||
end := i + FsrApiSnapshotsThreshold
|
||||
if end > len(snapshotsIDsMap[targetAZ]) {
|
||||
end = len(snapshotsIDsMap[targetAZ])
|
||||
}
|
||||
eg.Go(func() error {
|
||||
resp, err := e.ec2.DisableFastSnapshotRestores(&ec2.DisableFastSnapshotRestoresInput{
|
||||
AvailabilityZones: []*string{&targetAZ},
|
||||
SourceSnapshotIds: snapshotsIDsMap[targetAZ][start:end],
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if len(resp.Unsuccessful) > 0 {
|
||||
log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ))
|
||||
return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
|
||||
}
|
||||
|
||||
log.Info("Disable FSR issued", zap.String("available zone", targetAZ), zap.Any("snapshots", snapshotsIDsMap[targetAZ][start:end]))
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if len(resp.Unsuccessful) > 0 {
|
||||
log.Warn("not all snapshots disabled FSR", zap.String("available zone", targetAZ))
|
||||
return errors.Errorf("Some snapshot fails to disable FSR for available zone %s, such as %s, error code is %v", targetAZ, *resp.Unsuccessful[0].SnapshotId, resp.Unsuccessful[0].FastSnapshotRestoreStateErrors)
|
||||
}
|
||||
|
||||
log.Info("Disable FSR issued", zap.String("available zone", targetAZ))
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
@ -241,7 +241,9 @@ func (h *restoreEBSMetaHelper) restoreVolumes(progress glue.Progress) (map[strin
|
||||
|
||||
if h.cfg.UseFSR {
|
||||
err = ec2Session.DisableDataFSR(snapshotsIDsMap)
|
||||
log.Error("disable fsr failed", zap.Error(err))
|
||||
if err != nil {
|
||||
log.Error("disable fsr failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user