ddl: fetch schemas in work pool in case of reporting a error in one goroutine. (#55841)
ref pingcap/tidb#54436
This commit is contained in:
@ -135,7 +135,7 @@ go_test(
|
||||
],
|
||||
embed = [":domain"],
|
||||
flaky = True,
|
||||
shard_count = 28,
|
||||
shard_count = 30,
|
||||
deps = [
|
||||
"//pkg/config",
|
||||
"//pkg/ddl",
|
||||
@ -144,6 +144,7 @@ go_test(
|
||||
"//pkg/infoschema",
|
||||
"//pkg/keyspace",
|
||||
"//pkg/kv",
|
||||
"//pkg/meta",
|
||||
"//pkg/meta/model",
|
||||
"//pkg/metrics",
|
||||
"//pkg/parser/ast",
|
||||
@ -159,6 +160,7 @@ go_test(
|
||||
"//pkg/testkit/testsetup",
|
||||
"//pkg/types",
|
||||
"//pkg/util",
|
||||
"//pkg/util/mathutil",
|
||||
"//pkg/util/mock",
|
||||
"//pkg/util/replayer",
|
||||
"//pkg/util/stmtsummary/v2:stmtsummary",
|
||||
|
||||
@ -16,6 +16,7 @@ package domain_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -24,9 +25,14 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/ddl"
|
||||
"github.com/pingcap/tidb/pkg/domain/infosync"
|
||||
"github.com/pingcap/tidb/pkg/keyspace"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
"github.com/pingcap/tidb/pkg/meta"
|
||||
"github.com/pingcap/tidb/pkg/server"
|
||||
"github.com/pingcap/tidb/pkg/session"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/pkg/store/mockstore"
|
||||
"github.com/pingcap/tidb/pkg/testkit"
|
||||
"github.com/pingcap/tidb/pkg/util/mathutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -125,3 +131,75 @@ func TestAbnormalSessionPool(t *testing.T) {
|
||||
failpoint.Disable("github.com/pingcap/tidb/pkg/util/mockSessionPoolReturnError")
|
||||
require.Equal(t, svr.InternalSessionExists(se), false)
|
||||
}
|
||||
|
||||
func TestTetchAllSchemasWithTables(t *testing.T) {
|
||||
lease := 100 * time.Millisecond
|
||||
store, err := mockstore.NewMockStore()
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := store.Close()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
session.SetSchemaLease(lease)
|
||||
domain, err := session.BootstrapSession(store)
|
||||
require.NoError(t, err)
|
||||
defer domain.Close()
|
||||
|
||||
snapshot := store.GetSnapshot(kv.NewVersion(mathutil.MaxUint))
|
||||
m := meta.NewSnapshotMeta(snapshot)
|
||||
dbs, err := domain.FetchAllSchemasWithTables(m)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(dbs), 3)
|
||||
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("create database test1")
|
||||
tk.MustExec("use test1")
|
||||
tk.MustExec("create table t1(i int, s varchar(20), index index_t(i, s))")
|
||||
tk.MustExec("create table t2(i int, s varchar(20), index index_t(i, s))")
|
||||
tk.MustExec("create database test2")
|
||||
dbs, err = domain.FetchAllSchemasWithTables(m)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(dbs), 5)
|
||||
}
|
||||
|
||||
func TestTetchAllSchemasWithTablesWithFailpoint(t *testing.T) {
|
||||
lease := 100 * time.Millisecond
|
||||
store, err := mockstore.NewMockStore()
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := store.Close()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
session.SetSchemaLease(lease)
|
||||
domain, err := session.BootstrapSession(store)
|
||||
require.NoError(t, err)
|
||||
defer domain.Close()
|
||||
|
||||
snapshot := store.GetSnapshot(kv.NewVersion(mathutil.MaxUint))
|
||||
m := meta.NewSnapshotMeta(snapshot)
|
||||
dbs, err := domain.FetchAllSchemasWithTables(m)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(dbs), 3)
|
||||
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
|
||||
for i := 1; i <= 1000; i++ {
|
||||
dbName := fmt.Sprintf("test_%d", i)
|
||||
tk.MustExec("create database " + dbName)
|
||||
}
|
||||
variable.SchemaCacheSize.Store(1000000)
|
||||
|
||||
dbs, err = domain.FetchAllSchemasWithTables(m)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(dbs), 1003)
|
||||
|
||||
// inject the failpoint
|
||||
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/failed-fetch-schemas-with-tables", "1*return()"))
|
||||
defer func() {
|
||||
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/failed-fetch-schemas-with-tables"))
|
||||
}()
|
||||
dbs, err = domain.FetchAllSchemasWithTables(m)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err.Error(), "failpoint: failed to fetch schemas with tables")
|
||||
require.Nil(t, dbs)
|
||||
}
|
||||
|
||||
@ -442,16 +442,23 @@ func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, erro
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
splittedSchemas := do.splitForConcurrentFetch(allSchemas)
|
||||
doneCh := make(chan error, len(splittedSchemas))
|
||||
for _, schemas := range splittedSchemas {
|
||||
go do.fetchSchemasWithTables(schemas, m, doneCh)
|
||||
if len(allSchemas) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
for range splittedSchemas {
|
||||
err = <-doneCh
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
splittedSchemas := do.splitForConcurrentFetch(allSchemas)
|
||||
concurrency := min(len(splittedSchemas), 128)
|
||||
|
||||
eg, ectx := util.NewErrorGroupWithRecoverWithCtx(context.Background())
|
||||
eg.SetLimit(concurrency)
|
||||
for _, schemas := range splittedSchemas {
|
||||
ss := schemas
|
||||
eg.Go(func() error {
|
||||
return do.fetchSchemasWithTables(ectx, ss, m)
|
||||
})
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return allSchemas, nil
|
||||
}
|
||||
@ -481,23 +488,29 @@ func (*Domain) splitForConcurrentFetch(schemas []*model.DBInfo) [][]*model.DBInf
|
||||
return splitted
|
||||
}
|
||||
|
||||
func (*Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, done chan error) {
|
||||
func (*Domain) fetchSchemasWithTables(ctx context.Context, schemas []*model.DBInfo, m *meta.Meta) error {
|
||||
failpoint.Inject("failed-fetch-schemas-with-tables", func() {
|
||||
failpoint.Return(errors.New("failpoint: failed to fetch schemas with tables"))
|
||||
})
|
||||
|
||||
for _, di := range schemas {
|
||||
// if the ctx has been canceled, stop fetching schemas.
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
var tables []*model.TableInfo
|
||||
var err error
|
||||
if variable.SchemaCacheSize.Load() > 0 && !infoschema.IsSpecialDB(di.Name.L) {
|
||||
name2ID, specialTableInfos, err := meta.GetAllNameToIDAndTheMustLoadedTableInfo(m, di.ID)
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
return err
|
||||
}
|
||||
di.TableName2ID = name2ID
|
||||
tables = specialTableInfos
|
||||
} else {
|
||||
tables, err = m.ListTables(di.ID)
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
return err
|
||||
}
|
||||
}
|
||||
// If TreatOldVersionUTF8AsUTF8MB4 was enable, need to convert the old version schema UTF8 charset to UTF8MB4.
|
||||
@ -524,7 +537,7 @@ func (*Domain) fetchSchemasWithTables(schemas []*model.DBInfo, m *meta.Meta, don
|
||||
}
|
||||
di.Deprecated.Tables = diTables
|
||||
}
|
||||
done <- nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldUseV2 decides whether to use infoschema v2.
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap/tidb/pkg/infoschema"
|
||||
"github.com/pingcap/tidb/pkg/meta"
|
||||
"github.com/pingcap/tidb/pkg/meta/model"
|
||||
pmodel "github.com/pingcap/tidb/pkg/parser/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -50,3 +51,8 @@ func (do *Domain) MustGetPartitionAt(t *testing.T, dbName, tableName string, idx
|
||||
ti := do.MustGetTableInfo(t, dbName, tableName)
|
||||
return ti.Partition.Definitions[idx].ID
|
||||
}
|
||||
|
||||
// FetchAllSchemasWithTables calls the internal function. Only used in unit tests.
|
||||
func (do *Domain) FetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) {
|
||||
return do.fetchAllSchemasWithTables(m)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user