From 65e233edf37129cbcb04c7bb2cd779ee4c910ff0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Thu, 28 Nov 2024 20:17:32 +0800 Subject: [PATCH] br/stream: added `--delete-compactions` to log truncate (#56761) close pingcap/tidb#56758 --- br/pkg/glue/BUILD.bazel | 2 + br/pkg/glue/console_glue.go | 34 +++- br/pkg/glue/progressing.go | 19 ++- br/pkg/storage/local.go | 2 +- br/pkg/stream/BUILD.bazel | 3 +- br/pkg/stream/stream_metas.go | 40 +++-- br/pkg/stream/stream_metas_test.go | 242 ++++++++++++----------------- br/pkg/task/stream.go | 55 +++++-- 8 files changed, 222 insertions(+), 175 deletions(-) diff --git a/br/pkg/glue/BUILD.bazel b/br/pkg/glue/BUILD.bazel index 1ab377fc60..819414ff70 100644 --- a/br/pkg/glue/BUILD.bazel +++ b/br/pkg/glue/BUILD.bazel @@ -18,10 +18,12 @@ go_library( "//pkg/parser/model", "//pkg/sessionctx", "@com_github_fatih_color//:color", + "@com_github_pingcap_log//:log", "@com_github_tikv_pd_client//:client", "@com_github_vbauerster_mpb_v7//:mpb", "@com_github_vbauerster_mpb_v7//decor", "@org_golang_x_term//:term", + "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/glue/console_glue.go b/br/pkg/glue/console_glue.go index a780d67dd2..fcd1abd944 100644 --- a/br/pkg/glue/console_glue.go +++ b/br/pkg/glue/console_glue.go @@ -11,6 +11,8 @@ import ( "time" "github.com/fatih/color" + "github.com/pingcap/log" + "go.uber.org/zap" "golang.org/x/term" ) @@ -32,8 +34,13 @@ type ExtraField func() [2]string // WithTimeCost adds the task information of time costing for `ShowTask`. func WithTimeCost() ExtraField { start := time.Now() + var cached time.Duration + return func() [2]string { - return [2]string{"take", time.Since(start).Round(time.Millisecond).String()} + if cached == 0 { + cached = time.Since(start).Round(time.Millisecond) + } + return [2]string{"take", cached.String()} } } @@ -65,14 +72,10 @@ func printFinalMessage(extraFields []ExtraField) func() string { // ShowTask prints a task start information, and mark as finished when the returned function called. // This is for TUI presenting. func (ops ConsoleOperations) ShowTask(message string, extraFields ...ExtraField) func() { - ops.Print(message) + bar := ops.StartProgressBar(message, OnlyOneTask, extraFields...) return func() { - fields := make([]string, 0, len(extraFields)) - for _, fieldFunc := range extraFields { - field := fieldFunc() - fields = append(fields, fmt.Sprintf("%s = %s", field[0], color.New(color.Bold).Sprint(field[1]))) - } - ops.Printf("%s { %s }\n", color.HiGreenString("DONE"), strings.Join(fields, ", ")) + bar.Inc() + bar.Close() } } @@ -84,6 +87,21 @@ func (ops ConsoleOperations) RootFrame() Frame { } } +func PrintList[T any](ops ConsoleOperations, title string, items []T, maxItemsDisplay int) { + log.Info("Print list: all items.", zap.String("title", title), zap.Any("items", items)) + ops.Println(title) + toPrint := items + if maxItemsDisplay > 0 { + toPrint = items[:min(len(items), maxItemsDisplay)] + } + for _, item := range toPrint { + ops.Printf("- %v\n", item) + } + if len(items) > len(toPrint) { + ops.Printf("... and %d more ...", len(items)-len(toPrint)) + } +} + // PromptBool prompts a boolean from the user. func (ops ConsoleOperations) PromptBool(p string) bool { if !ops.IsInteractive() { diff --git a/br/pkg/glue/progressing.go b/br/pkg/glue/progressing.go index 3182e46ba5..fd1616f035 100644 --- a/br/pkg/glue/progressing.go +++ b/br/pkg/glue/progressing.go @@ -18,7 +18,15 @@ import ( const OnlyOneTask int = -1 -var spinnerText []string = []string{".", "..", "..."} +func coloredSpinner(s []string) []string { + c := color.New(color.Bold, color.FgGreen) + for i := range s { + s[i] = c.Sprint(s[i]) + } + return s +} + +var spinnerText []string = coloredSpinner([]string{"/", "-", "\\", "|"}) type pbProgress struct { bar *mpb.Bar @@ -44,6 +52,13 @@ func (p pbProgress) GetCurrent() int64 { // Close marks the progress as 100% complete and that Inc() can no longer be // called. func (p pbProgress) Close() { + // This wait shouldn't block. + // We are just waiting the progress bar refresh to the finished state. + defer func() { + p.bar.Wait() + p.progress.Wait() + }() + if p.bar.Completed() || p.bar.Aborted() { return } @@ -162,7 +177,7 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ... } var ( - spinnerDoneText = fmt.Sprintf("... %s", color.GreenString("DONE")) + spinnerDoneText = fmt.Sprintf(":: %s", color.GreenString("DONE")) ) func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar { diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index b825c79e90..24f5301090 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -52,7 +52,7 @@ func (l *LocalStorage) DeleteFile(_ context.Context, name string) error { os.IsNotExist(err) { return nil } - return err + return errors.Annotatef(err, "failed to delete file %v", name) } // DeleteFiles deletes the files. diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index e537b651c3..225d50cb5a 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -65,7 +65,7 @@ go_test( ], embed = [":stream"], flaky = True, - shard_count = 46, + shard_count = 48, deps = [ "//br/pkg/storage", "//br/pkg/streamhelper", @@ -88,6 +88,7 @@ go_test( "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", "@org_golang_x_exp//maps", + "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/stream/stream_metas.go b/br/pkg/stream/stream_metas.go index bfbebafebd..6801035ce7 100644 --- a/br/pkg/stream/stream_metas.go +++ b/br/pkg/stream/stream_metas.go @@ -794,8 +794,8 @@ func (m MigrationExt) MergeAndMigrateTo( err = m.writeBase(ctx, newBase) if err != nil { result.Warnings = append( - result.MigratedTo.Warnings, - errors.Annotatef(err, "failed to save the merged new base, nothing will happen"), + result.Warnings, + errors.Annotate(err, "failed to save the merged new base"), ) // Put the new BASE here anyway. The caller may want this. result.NewBase = newBase @@ -817,9 +817,9 @@ func (m MigrationExt) MergeAndMigrateTo( result.MigratedTo = m.MigrateTo(ctx, newBase, MTMaybeSkipTruncateLog(!config.alwaysRunTruncate && canSkipTruncate)) // Put the final BASE. - err = m.writeBase(ctx, result.MigratedTo.NewBase) + err = m.writeBase(ctx, result.NewBase) if err != nil { - result.Warnings = append(result.MigratedTo.Warnings, errors.Annotatef(err, "failed to save the new base")) + result.Warnings = append(result.Warnings, errors.Annotatef(err, "failed to save the new base")) } return } @@ -853,8 +853,6 @@ func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ... result := MigratedTo{ NewBase: NewMigration(), } - // Fills: EditMeta for new Base. - m.doMetaEdits(ctx, mig, &result) // Fills: TruncatedTo, Compactions, DesctructPrefix. if !opt.skipTruncateLog { m.doTruncating(ctx, mig, &result) @@ -864,6 +862,10 @@ func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ... result.NewBase.TruncatedTo = mig.TruncatedTo } + // We do skip truncate log first, so metas removed by truncating can be removed in this execution. + // Fills: EditMeta for new Base. + m.doMetaEdits(ctx, mig, &result) + return result } @@ -880,6 +882,7 @@ func (m MigrationExt) writeBase(ctx context.Context, mig *pb.Migration) error { } // doMetaEdits applies the modification to the meta files in the storage. +// This will delete data files firstly. Make sure the new BASE was persisted before calling this. func (m MigrationExt) doMetaEdits(ctx context.Context, mig *pb.Migration, out *MigratedTo) { m.Hooks.StartHandlingMetaEdits(mig.EditMeta) @@ -887,14 +890,26 @@ func (m MigrationExt) doMetaEdits(ctx context.Context, mig *pb.Migration, out *M if isEmptyEdition(medit) { return } + + // Sometimes, the meta file will be deleted by truncating. + // We clean up those meta edits. + // NOTE: can we unify the deletion of truncating and meta editing? + // Say, add a "normalize" phase that load all files to be deleted to the migration. + // The problem here is a huge migration may be created in memory then leading to OOM. + exists, errChkExist := m.s.FileExists(ctx, medit.Path) + if errChkExist == nil && !exists { + log.Warn("The meta file doesn't exist, skipping the edit", zap.String("path", medit.Path)) + return + } + + // Firstly delete data so they won't leak when BR crashes. + m.cleanUpFor(ctx, medit, out) err := m.applyMetaEdit(ctx, medit) if err != nil { out.NewBase.EditMeta = append(out.NewBase.EditMeta, medit) out.Warnings = append(out.Warnings, errors.Annotatef(err, "failed to apply meta edit %s to meta file", medit.Path)) return } - - m.cleanUpFor(ctx, medit, out) } defer m.Hooks.HandingMetaEditDone() @@ -936,6 +951,13 @@ func (m MigrationExt) cleanUpFor(ctx context.Context, medit *pb.MetaEdit, out *M } } + if len(out.Warnings) > 0 { + log.Warn( + "Failed to clean up for meta edit.", + zap.String("meta-edit", medit.Path), + zap.Errors("warnings", out.Warnings), + ) + } if !isEmptyEdition(newMetaEdit) { out.NewBase.EditMeta = append(out.NewBase.EditMeta, newMetaEdit) } @@ -974,7 +996,6 @@ func (m MigrationExt) applyMetaEditTo(ctx context.Context, medit *pb.MetaEdit, m }) metadata.FileGroups = slices.DeleteFunc(metadata.FileGroups, func(dfg *pb.DataFileGroup) bool { del := slices.Contains(medit.DeletePhysicalFiles, dfg.Path) - fmt.Println(medit.Path, medit.DeletePhysicalFiles, dfg.Path, del) return del }) for _, group := range metadata.FileGroups { @@ -1143,6 +1164,7 @@ func (m MigrationExt) doTruncateLogs( // We have already written `truncated-to` to the storage hence // we don't need to worry that the user access files already deleted. aOut := new(MigratedTo) + aOut.NewBase = new(pb.Migration) m.cleanUpFor(ctx, me, aOut) updateResult(func(r *MigratedTo) { r.Warnings = append(r.Warnings, aOut.Warnings...) diff --git a/br/pkg/stream/stream_metas_test.go b/br/pkg/stream/stream_metas_test.go index b292a3f8a9..c0fcbbae62 100644 --- a/br/pkg/stream/stream_metas_test.go +++ b/br/pkg/stream/stream_metas_test.go @@ -3,6 +3,7 @@ package stream import ( + "bytes" "context" "fmt" "math" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/util/intest" "github.com/stretchr/testify/require" + "go.uber.org/multierr" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -347,8 +349,6 @@ func TestTruncateSafepoint(t *testing.T) { } func TestTruncateSafepointForGCS(t *testing.T) { - t.SkipNow() - require.True(t, intest.InTest) ctx := context.Background() opts := fakestorage.Options{ @@ -389,144 +389,6 @@ func TestTruncateSafepointForGCS(t *testing.T) { } } -func fakeMetaDatas(t *testing.T, helper *MetadataHelper, cf string) []*backuppb.Metadata { - ms := []*backuppb.Metadata{ - { - StoreId: 1, - MinTs: 1500, - MaxTs: 2000, - Files: []*backuppb.DataFileInfo{ - { - MinTs: 1500, - MaxTs: 2000, - Cf: cf, - MinBeginTsInDefaultCf: 800, - }, - }, - }, - { - StoreId: 2, - MinTs: 3000, - MaxTs: 4000, - Files: []*backuppb.DataFileInfo{ - { - MinTs: 3000, - MaxTs: 4000, - Cf: cf, - MinBeginTsInDefaultCf: 2000, - }, - }, - }, - { - StoreId: 3, - MinTs: 5100, - MaxTs: 6100, - Files: []*backuppb.DataFileInfo{ - { - MinTs: 5100, - MaxTs: 6100, - Cf: cf, - MinBeginTsInDefaultCf: 1800, - }, - }, - }, - } - - m2s := make([]*backuppb.Metadata, 0, len(ms)) - for _, m := range ms { - raw, err := m.Marshal() - require.NoError(t, err) - m2, err := helper.ParseToMetadata(raw) - require.NoError(t, err) - m2s = append(m2s, m2) - } - return m2s -} - -func fakeMetaDataV2s(t *testing.T, helper *MetadataHelper, cf string) []*backuppb.Metadata { - ms := []*backuppb.Metadata{ - { - StoreId: 1, - MinTs: 1500, - MaxTs: 6100, - FileGroups: []*backuppb.DataFileGroup{ - { - MinTs: 1500, - MaxTs: 6100, - DataFilesInfo: []*backuppb.DataFileInfo{ - { - MinTs: 1500, - MaxTs: 2000, - Cf: cf, - MinBeginTsInDefaultCf: 800, - }, - { - MinTs: 3000, - MaxTs: 4000, - Cf: cf, - MinBeginTsInDefaultCf: 2000, - }, - { - MinTs: 5200, - MaxTs: 6100, - Cf: cf, - MinBeginTsInDefaultCf: 1700, - }, - }, - }, - { - MinTs: 1000, - MaxTs: 5100, - DataFilesInfo: []*backuppb.DataFileInfo{ - { - MinTs: 9000, - MaxTs: 10000, - Cf: cf, - MinBeginTsInDefaultCf: 0, - }, - { - MinTs: 3000, - MaxTs: 4000, - Cf: cf, - MinBeginTsInDefaultCf: 2000, - }, - }, - }, - }, - MetaVersion: backuppb.MetaVersion_V2, - }, - { - StoreId: 2, - MinTs: 4100, - MaxTs: 5100, - FileGroups: []*backuppb.DataFileGroup{ - { - MinTs: 4100, - MaxTs: 5100, - DataFilesInfo: []*backuppb.DataFileInfo{ - { - MinTs: 4100, - MaxTs: 5100, - Cf: cf, - MinBeginTsInDefaultCf: 1800, - }, - }, - }, - }, - MetaVersion: backuppb.MetaVersion_V2, - }, - } - m2s := make([]*backuppb.Metadata, 0, len(ms)) - for _, m := range ms { - raw, err := m.Marshal() - require.NoError(t, err) - m2, err := helper.ParseToMetadata(raw) - require.NoError(t, err) - m2s = append(m2s, m2) - } - return m2s -} - func ff(minTS, maxTS uint64) *backuppb.DataFileGroup { return f(0, minTS, maxTS, DefaultCF, 0) } @@ -707,12 +569,26 @@ func pmt(s storage.ExternalStorage, path string, mt *backuppb.Metadata) { } } +func pmlt(s storage.ExternalStorage, path string, mt *backuppb.Metadata, logPath func(i int) string) { + for i, g := range mt.FileGroups { + g.Path = logPath(i) + maxLen := uint64(0) + for _, sg := range g.DataFilesInfo { + if sg.RangeOffset+sg.Length > maxLen { + maxLen = sg.RangeOffset + sg.Length + } + } + os.WriteFile(g.Path, bytes.Repeat([]byte("0"), int(maxLen)), 0o644) + } + pmt(s, path, mt) +} + func pmig(s storage.ExternalStorage, num uint64, mt *backuppb.Migration) string { numS := fmt.Sprintf("%08d", num) - if num == baseMigrationSN { - numS = baseMigrationName - } name := fmt.Sprintf("%s_%08X.mgrt", numS, hashMigration(mt)) + if num == baseMigrationSN { + name = baseMigrationName + } p := path.Join(migrationPrefix, name) data, err := mt.Marshal() @@ -2837,6 +2713,86 @@ func TestWithSimpleTruncate(t *testing.T) { } } +func TestAppendingMigs(t *testing.T) { + s := tmp(t) + ctx := context.Background() + mN := func(n uint64) string { return fmt.Sprintf("v1/backupmeta/%05d.meta", n) } + lN := func(mn int) func(n int) string { + return func(n int) string { return fmt.Sprintf("v1/%05d_%05d.log", mn, n) } + } + placeholder := func(pfx string) string { + path := path.Join(pfx, "monolith") + require.NoError(t, s.WriteFile(ctx, path, []byte("🪨"))) + return path + } + // asp appends a span to the data file info. + asp := func(b *backuppb.DataFileInfo, span *backuppb.Span) *backuppb.DataFileInfo { + b.RangeOffset = span.Offset + b.RangeLength = span.Length + return b + } + + pmlt(s, mN(1), mf(1, [][]*backuppb.DataFileInfo{ + { + asp(fi(10, 20, DefaultCF, 0), sp(0, 10)), + asp(fi(15, 30, WriteCF, 8), sp(10, 15)), + asp(fi(25, 35, WriteCF, 11), sp(25, 10)), + asp(fi(42, 65, WriteCF, 20), sp(35, 10)), + }, + }), lN(1)) + pmlt(s, mN(2), mf(2, [][]*backuppb.DataFileInfo{ + { + asp(fi(45, 64, WriteCF, 32), sp(0, 19)), + asp(fi(65, 70, WriteCF, 55), sp(19, 5)), + asp(fi(50, 60, DefaultCF, 0), sp(24, 10)), + asp(fi(80, 85, WriteCF, 72), sp(34, 5)), + }, + }), lN(2)) + est := MigerationExtension(s) + + cDir := func(n uint64) string { return fmt.Sprintf("%05d/output", n) } + aDir := func(n uint64) string { return fmt.Sprintf("%05d/metas", n) } + compaction := mCompaction(placeholder(cDir(1)), placeholder(aDir(1)), 15, 66) + del11 := mLogDel(mN(1), spans(lN(1)(0), 45, sp(0, 10), sp(10, 15))) + del12 := mLogDel(mN(1), spans(lN(1)(0), 45, sp(35, 10), sp(25, 10))) + del2 := mLogDel(mN(2), spans(lN(2)(0), 39, sp(24, 10))) + m := mig(compaction, del11, del2) + pmig(s, 1, m) + pmig(s, 2, mig(del12)) + + res := est.MergeAndMigrateTo(ctx, math.MaxInt, MMOptAlwaysRunTruncate(), MMOptAppendPhantomMigration(*mig(mTruncatedTo(65)))) + require.NoError(t, multierr.Combine(res.Warnings...)) + requireMigrationsEqual(t, res.NewBase, mig(mTruncatedTo(65), compaction, del2)) + require.FileExists(t, filepath.Join(s.Base(), cDir(1), "monolith")) + + res = est.MergeAndMigrateTo(ctx, math.MaxInt, MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { + return true + }), MMOptAlwaysRunTruncate(), MMOptAppendPhantomMigration(*mig(mTruncatedTo(100)))) + require.NoError(t, multierr.Combine(res.Warnings...)) + requireMigrationsEqual(t, res.NewBase, mig(mTruncatedTo(100))) + require.NoFileExists(t, filepath.Join(s.Base(), cDir(1), "monolith")) + require.NoFileExists(t, filepath.Join(s.Base(), mN(1))) + require.NoFileExists(t, filepath.Join(s.Base(), lN(1)(0))) +} + +func TestUserAbort(t *testing.T) { + s := tmp(t) + ctx := context.Background() + + pmig(s, 0, mig(mTruncatedTo(42))) + pmig(s, 1, mig(mTruncatedTo(96))) + est := MigerationExtension(s) + var res MergeAndMigratedTo + effs := est.DryRun(func(me MigrationExt) { + res = me.MergeAndMigrateTo(ctx, 1, MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { + return false + })) + }) + require.Len(t, res.Warnings, 1) + require.ErrorContains(t, res.Warnings[0], "aborted") + require.Empty(t, effs) +} + func TestUnsupportedVersion(t *testing.T) { s := tmp(t) m := mig(mVersion(backuppb.MigrationVersion(65535))) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 678fda6cbd..92771bf920 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -64,13 +64,14 @@ import ( ) const ( - flagYes = "yes" - flagUntil = "until" - flagStreamJSONOutput = "json" - flagStreamTaskName = "task-name" - flagStreamStartTS = "start-ts" - flagStreamEndTS = "end-ts" - flagGCSafePointTTS = "gc-ttl" + flagYes = "yes" + flagCleanUpCompactions = "clean-up-compactions" + flagUntil = "until" + flagStreamJSONOutput = "json" + flagStreamTaskName = "task-name" + flagStreamStartTS = "start-ts" + flagStreamEndTS = "end-ts" + flagGCSafePointTTS = "gc-ttl" truncateLockPath = "truncating.lock" hintOnTruncateLock = "There might be another truncate task running, or a truncate task that didn't exit properly. " + @@ -119,9 +120,10 @@ type StreamConfig struct { SafePointTTL int64 `json:"safe-point-ttl" toml:"safe-point-ttl"` // Spec for the command `truncate`, we should truncate the until when? - Until uint64 `json:"until" toml:"until"` - DryRun bool `json:"dry-run" toml:"dry-run"` - SkipPrompt bool `json:"skip-prompt" toml:"skip-prompt"` + Until uint64 `json:"until" toml:"until"` + DryRun bool `json:"dry-run" toml:"dry-run"` + SkipPrompt bool `json:"skip-prompt" toml:"skip-prompt"` + CleanUpCompactions bool `json:"clean-up-compactions" toml:"clean-up-compactions"` // Spec for the command `status`. JSONOutput bool `json:"json-output" toml:"json-output"` @@ -184,6 +186,7 @@ func DefineStreamTruncateLogFlags(flags *pflag.FlagSet) { "(support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23+0800'.)") flags.Bool(flagDryRun, false, "Run the command but don't really delete the files.") flags.BoolP(flagYes, "y", false, "Skip all prompts and always execute the command.") + flags.Bool(flagCleanUpCompactions, false, "Clean up compaction files. Including the compacted log files and expired SST files.") } func (cfg *StreamConfig) ParseStreamStatusFromFlags(flags *pflag.FlagSet) error { @@ -214,6 +217,9 @@ func (cfg *StreamConfig) ParseStreamTruncateFromFlags(flags *pflag.FlagSet) erro if cfg.DryRun, err = flags.GetBool(flagDryRun); err != nil { return errors.Trace(err) } + if cfg.CleanUpCompactions, err = flags.GetBool(flagCleanUpCompactions); err != nil { + return errors.Trace(err) + } return nil } @@ -1039,7 +1045,34 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre } } - readMetaDone := console.ShowTask("Reading log backup metadata... ", glue.WithTimeCost()) + if cfg.CleanUpCompactions { + est := stream.MigerationExtension(extStorage) + est.Hooks = stream.NewProgressBarHooks(console) + newSN := math.MaxInt + optPrompt := stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { + console.Println("We are going to do the following: ") + tbl := console.CreateTable() + stream.AddMigrationToTable(m, tbl) + tbl.Print() + return console.PromptBool("Continue? ") + }) + optAppend := stream.MMOptAppendPhantomMigration(backuppb.Migration{TruncatedTo: cfg.Until}) + opts := []stream.MergeAndMigrateToOpt{optPrompt, optAppend, stream.MMOptAlwaysRunTruncate()} + var res stream.MergeAndMigratedTo + if cfg.DryRun { + est.DryRun(func(me stream.MigrationExt) { + res = me.MergeAndMigrateTo(ctx, newSN, opts...) + }) + } else { + res = est.MergeAndMigrateTo(ctx, newSN, opts...) + } + if len(res.Warnings) > 0 { + glue.PrintList(console, "the following errors happened", res.Warnings, 10) + } + return nil + } + + readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost()) metas := stream.StreamMetadataSet{ MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize, Helper: stream.NewMetadataHelper(),