// Copyright 2026 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package objstore import ( "context" "encoding/json" "fmt" "io" "os" "sync" "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/pkg/objstore/objectio" "github.com/pingcap/tidb/pkg/objstore/storeapi" "go.uber.org/multierr" ) // Effect is an side effect that happens in the batch storage. type Effect any // EffPut is the side effect of a call to `WriteFile`. type EffPut struct { File string `json:"file"` Content []byte `json:"content"` } // EffDeleteFiles is the side effect of a call to `DeleteFiles`. type EffDeleteFiles struct { Files []string `json:"files"` } // EffDeleteFile is the side effect of a call to `DeleteFile`. type EffDeleteFile string // EffRename is the side effect of a call to `Rename`. type EffRename struct { From string `json:"from"` To string `json:"to"` } // JSONEffects converts a slices of effects into json. // The json will be a tagged union: `{"type": $go_type_name, "effect": $effect}` func JSONEffects(es []Effect, output io.Writer) error { type Typed struct { Type string `json:"type"` Eff Effect `json:"effect"` } out := make([]Typed, 0, len(es)) for _, eff := range es { out = append(out, Typed{ Type: fmt.Sprintf("%T", eff), Eff: eff, }) } return json.NewEncoder(output).Encode(out) } // SaveJSONEffectsToTmp save to tmp. func SaveJSONEffectsToTmp(es []Effect) (string, error) { // Save the json to a subdir so user can redirect the output path by symlinking... tmp, err := os.CreateTemp(os.TempDir(), "br-effects-*.json") if err != nil { return "", err } if err := JSONEffects(es, tmp); err != nil { return "", err } return tmp.Name(), nil } // Batched is a wrapper of an external storage that suspends all write operations ("effects"). // If `Close()` without calling `Commit()`, nothing will happen in the underlying external storage. // In that case, we have done a "dry run". // // You may use `ReadOnlyEffects()` to get the history of the effects. // But don't modify the returned slice! // // You may use `Commit()` to execute all suspended effects. type Batched struct { storeapi.Storage effectsMu sync.Mutex // It will be one of: // EffPut, EffDeleteFiles, EffDeleteFile, EffRename effects []Effect } // Batch wraps an external storage instance to a batched version. func Batch(s storeapi.Storage) *Batched { return &Batched{Storage: s} } // ReadOnlyEffects Fetch all effects from the batched storage. // // **The returned slice should not be modified.** func (d *Batched) ReadOnlyEffects() []Effect { d.effectsMu.Lock() defer d.effectsMu.Unlock() return d.effects } // CleanEffects cleans all suspended effects. func (d *Batched) CleanEffects() { d.effectsMu.Lock() defer d.effectsMu.Unlock() d.effects = nil } // DeleteFiles implements the Storage interface. func (d *Batched) DeleteFiles(ctx context.Context, names []string) error { d.effectsMu.Lock() defer d.effectsMu.Unlock() d.effects = append(d.effects, EffDeleteFiles{Files: names}) return nil } // DeleteFile implements the Storage interface. func (d *Batched) DeleteFile(ctx context.Context, name string) error { d.effectsMu.Lock() defer d.effectsMu.Unlock() d.effects = append(d.effects, EffDeleteFile(name)) return nil } // WriteFile implements the Storage interface. func (d *Batched) WriteFile(ctx context.Context, name string, data []byte) error { d.effectsMu.Lock() defer d.effectsMu.Unlock() d.effects = append(d.effects, EffPut{File: name, Content: data}) return nil } // Rename implements the Storage interface. func (d *Batched) Rename(ctx context.Context, oldName, newName string) error { d.effectsMu.Lock() defer d.effectsMu.Unlock() d.effects = append(d.effects, EffRename{From: oldName, To: newName}) return nil } // Create implements the Storage interface. func (d *Batched) Create(ctx context.Context, path string, option *storeapi.WriterOption) (objectio.Writer, error) { return nil, errors.Annotatef(berrors.ErrStorageUnknown, "ExternalStorage.Create isn't allowed in batch mode for now.") } // Commit performs all effects recorded so long in the REAL external storage. // This will cleanup all of the suspended effects. func (d *Batched) Commit(ctx context.Context) error { d.effectsMu.Lock() defer d.effectsMu.Unlock() var err error for _, eff := range d.effects { switch e := eff.(type) { case EffPut: err = multierr.Combine(d.Storage.WriteFile(ctx, e.File, e.Content), err) case EffDeleteFiles: err = multierr.Combine(d.Storage.DeleteFiles(ctx, e.Files), err) case EffDeleteFile: err = multierr.Combine(d.Storage.DeleteFile(ctx, string(e)), err) case EffRename: err = multierr.Combine(d.Storage.Rename(ctx, e.From, e.To), err) default: return errors.Annotatef(berrors.ErrStorageUnknown, "Unknown effect type %T", eff) } } d.effects = nil return nil }