From 722fbdc679cdbaedf7961ec9ac69fd02ab876575 Mon Sep 17 00:00:00 2001 From: Zack Zhao <57036248+joccau@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:40:52 +0800 Subject: [PATCH] ddl: fetch schemas in work pool in case of reporting an error in one goroutine. (#55841) ref pingcap/tidb#54436 --- pkg/domain/BUILD.bazel | 4 +- pkg/domain/db_test.go | 78 +++++++++++++++++++++++++++++++++++++++ pkg/domain/domain.go | 43 +++++++++++++-------- pkg/domain/test_helper.go | 6 +++ 4 files changed, 115 insertions(+), 16 deletions(-) diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index e624d00fa1..c04fb87d01 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -135,7 +135,7 @@ go_test( ], embed = [":domain"], flaky = True, - shard_count = 28, + shard_count = 30, deps = [ "//pkg/config", "//pkg/ddl", @@ -144,6 +144,7 @@ go_test( "//pkg/infoschema", "//pkg/keyspace", "//pkg/kv", + "//pkg/meta", "//pkg/meta/model", "//pkg/metrics", "//pkg/parser/ast", @@ -159,6 +160,7 @@ go_test( "//pkg/testkit/testsetup", "//pkg/types", "//pkg/util", + "//pkg/util/mathutil", "//pkg/util/mock", "//pkg/util/replayer", "//pkg/util/stmtsummary/v2:stmtsummary", diff --git a/pkg/domain/db_test.go b/pkg/domain/db_test.go index 60d5003374..19d2e13563 100644 --- a/pkg/domain/db_test.go +++ b/pkg/domain/db_test.go @@ -16,6 +16,7 @@ package domain_test import ( "context" + "fmt" "testing" "time" @@ -24,9 +25,14 @@ import ( "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/keyspace" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/server" "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/store/mockstore" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/mathutil" "github.com/stretchr/testify/require" ) @@ -125,3 +131,75 @@ func TestAbnormalSessionPool(t *testing.T) { 
failpoint.Disable("github.com/pingcap/tidb/pkg/util/mockSessionPoolReturnError") require.Equal(t, svr.InternalSessionExists(se), false) } + +func TestTetchAllSchemasWithTables(t *testing.T) { + lease := 100 * time.Millisecond + store, err := mockstore.NewMockStore() + require.NoError(t, err) + defer func() { + err := store.Close() + require.NoError(t, err) + }() + session.SetSchemaLease(lease) + domain, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domain.Close() + + snapshot := store.GetSnapshot(kv.NewVersion(mathutil.MaxUint)) + m := meta.NewSnapshotMeta(snapshot) + dbs, err := domain.FetchAllSchemasWithTables(m) + require.NoError(t, err) + require.Equal(t, len(dbs), 3) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("create database test1") + tk.MustExec("use test1") + tk.MustExec("create table t1(i int, s varchar(20), index index_t(i, s))") + tk.MustExec("create table t2(i int, s varchar(20), index index_t(i, s))") + tk.MustExec("create database test2") + dbs, err = domain.FetchAllSchemasWithTables(m) + require.NoError(t, err) + require.Equal(t, len(dbs), 5) +} + +func TestTetchAllSchemasWithTablesWithFailpoint(t *testing.T) { + lease := 100 * time.Millisecond + store, err := mockstore.NewMockStore() + require.NoError(t, err) + defer func() { + err := store.Close() + require.NoError(t, err) + }() + session.SetSchemaLease(lease) + domain, err := session.BootstrapSession(store) + require.NoError(t, err) + defer domain.Close() + + snapshot := store.GetSnapshot(kv.NewVersion(mathutil.MaxUint)) + m := meta.NewSnapshotMeta(snapshot) + dbs, err := domain.FetchAllSchemasWithTables(m) + require.NoError(t, err) + require.Equal(t, len(dbs), 3) + + tk := testkit.NewTestKit(t, store) + + for i := 1; i <= 1000; i++ { + dbName := fmt.Sprintf("test_%d", i) + tk.MustExec("create database " + dbName) + } + variable.SchemaCacheSize.Store(1000000) + + dbs, err = domain.FetchAllSchemasWithTables(m) + require.NoError(t, err) + require.Equal(t, 
len(dbs), 1003) + + // inject the failpoint + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/failed-fetch-schemas-with-tables", "1*return()")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/failed-fetch-schemas-with-tables")) + }() + dbs, err = domain.FetchAllSchemasWithTables(m) + require.Error(t, err) + require.Equal(t, err.Error(), "failpoint: failed to fetch schemas with tables") + require.Nil(t, dbs) +} diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 49055a9778..d059b7c734 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -442,16 +442,23 @@ func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, erro if err != nil { return nil, err } - splittedSchemas := do.splitForConcurrentFetch(allSchemas) - doneCh := make(chan error, len(splittedSchemas)) - for _, schemas := range splittedSchemas { - go do.fetchSchemasWithTables(schemas, m, doneCh) + if len(allSchemas) == 0 { + return nil, nil } - for range splittedSchemas { - err = <-doneCh - if err != nil { - return nil, err - } + + splittedSchemas := do.splitForConcurrentFetch(allSchemas) + concurrency := min(len(splittedSchemas), 128) + + eg, ectx := util.NewErrorGroupWithRecoverWithCtx(context.Background()) + eg.SetLimit(concurrency) + for _, schemas := range splittedSchemas { + ss := schemas + eg.Go(func() error { + return do.fetchSchemasWithTables(ectx, ss, m) + }) + } + if err := eg.Wait(); err != nil { + return nil, err } return allSchemas, nil } @@ -481,23 +488,29 @@ func (*Domain) splitForConcurrentFetch(schemas []*model.DBInfo) [][]*model.DBInf return splitted } -func (*Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, done chan error) { +func (*Domain) fetchSchemasWithTables(ctx context.Context, schemas []*model.DBInfo, m *meta.Meta) error { + failpoint.Inject("failed-fetch-schemas-with-tables", func() { + failpoint.Return(errors.New("failpoint: failed to fetch schemas with 
tables")) + }) + for _, di := range schemas { + // if the ctx has been canceled, stop fetching schemas. + if err := ctx.Err(); err != nil { + return err + } var tables []*model.TableInfo var err error if variable.SchemaCacheSize.Load() > 0 && !infoschema.IsSpecialDB(di.Name.L) { name2ID, specialTableInfos, err := meta.GetAllNameToIDAndTheMustLoadedTableInfo(m, di.ID) if err != nil { - done <- err - return + return err } di.TableName2ID = name2ID tables = specialTableInfos } else { tables, err = m.ListTables(di.ID) if err != nil { - done <- err - return + return err } } // If TreatOldVersionUTF8AsUTF8MB4 was enable, need to convert the old version schema UTF8 charset to UTF8MB4. @@ -524,7 +537,7 @@ func (*Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, don } di.Deprecated.Tables = diTables } - done <- nil + return nil } // shouldUseV2 decides whether to use infoschema v2. diff --git a/pkg/domain/test_helper.go b/pkg/domain/test_helper.go index 7721ed55dd..2fa17a2a16 100644 --- a/pkg/domain/test_helper.go +++ b/pkg/domain/test_helper.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/stretchr/testify/require" @@ -50,3 +51,8 @@ func (do *Domain) MustGetPartitionAt(t *testing.T, dbName, tableName string, idx ti := do.MustGetTableInfo(t, dbName, tableName) return ti.Partition.Definitions[idx].ID } + +// FetchAllSchemasWithTables calls the internal function. Only used in unit tests. +func (do *Domain) FetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) { + return do.fetchAllSchemasWithTables(m) +}