diff --git a/ddl/placement/types.go b/ddl/placement/types.go index 895ab30b7d..d6d6bfeb6d 100644 --- a/ddl/placement/types.go +++ b/ddl/placement/types.go @@ -76,6 +76,14 @@ type Rule struct { IsolationLevel string `json:"isolation_level,omitempty"` } +// Bundle is a group of all rules and configurations. It is used to support rule cache. +type Bundle struct { + ID string `json:"group_id"` + Index int `json:"group_index"` + Override bool `json:"group_override"` + Rules []*Rule `json:"rules"` +} + // RuleOpType indicates the operation type. type RuleOpType string diff --git a/ddl/placement/utils.go b/ddl/placement/utils.go index b251b9f016..87c4a37a4e 100644 --- a/ddl/placement/utils.go +++ b/ddl/placement/utils.go @@ -14,6 +14,7 @@ package placement import ( + "fmt" "strings" "github.com/pingcap/errors" @@ -69,3 +70,8 @@ func CheckLabelConstraints(labels []string) ([]LabelConstraint, error) { } return constraints, nil } + +// GroupID accepts a tableID or whatever integer, and encode the integer into a valid GroupID for PD. +func GroupID(id int64) string { + return fmt.Sprintf("TIDB_DDL_%d", id) +} diff --git a/domain/domain.go b/domain/domain.go index 2d7c254f16..a2557f71d3 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/infoschema" @@ -140,7 +141,18 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in return 0, nil, fullLoad, err } - newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, neededSchemaVersion) + bundles, err := infosync.GetAllRuleBundles(nil) + failpoint.Inject("FailPlacement", func(val failpoint.Value) { + if val.(bool) { + bundles = []*placement.Bundle{} + err = nil + } + }) + if err != nil { + return 0, nil, fullLoad, err + } + + newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, bundles, neededSchemaVersion) if err != nil { return 0, nil, fullLoad, err } diff --git a/domain/domain_test.go b/domain/domain_test.go index a54a572ad4..099a045d33 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -89,6 +89,11 @@ func unixSocketAvailable() bool { } func TestInfo(t *testing.T) { + err := failpoint.Enable("github.com/pingcap/tidb/domain/FailPlacement", `return(true)`) + if err != nil { + t.Fatal(err) + } + if runtime.GOOS == "windows" { t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") } @@ -214,6 +219,11 @@ func TestInfo(t *testing.T) { if err != nil || len(infos) != 0 { t.Fatalf("err %v, infos %v", err, infos) } + + err = failpoint.Disable("github.com/pingcap/tidb/domain/FailPlacement") + if err != nil { + t.Fatal(err) + } } type mockSessionManager struct { diff --git a/domain/infosync/info.go b/domain/infosync/info.go index b025c9cc3c..fd427341ba 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -81,6 +81,9 @@ const ( // ErrPrometheusAddrIsNotSet is the error that Prometheus address is not set in PD and etcd var ErrPrometheusAddrIsNotSet = terror.ClassDomain.New(errno.ErrPrometheusAddrIsNotSet, errno.MySQLErrName[errno.ErrPrometheusAddrIsNotSet]) +// errPlacementRulesDisabled is exported for internal usage, indicating PD rejected the request due to disabled placement feature. +var errPlacementRulesDisabled = errors.New("placement rules feature is disabled") + // InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down. type InfoSyncer struct { etcdCli *clientv3.Client @@ -295,12 +298,19 @@ func doRequest(ctx context.Context, addrs []string, route, method string, body i res, err = http.DefaultClient.Do(req) if err == nil { - defer terror.Call(res.Body.Close) - bodyBytes, err := ioutil.ReadAll(res.Body) - if res.StatusCode != http.StatusOK { - err = errors.Wrapf(err, "%s", bodyBytes) + if err != nil { + return nil, err } + if res.StatusCode != http.StatusOK { + err = errors.Errorf("%s", bodyBytes) + // ignore if placement rules feature is not enabled + if strings.HasPrefix(err.Error(), `"placement rules feature is disabled"`) { + err = nil + bodyBytes = nil + } + } + terror.Log(res.Body.Close()) return bodyBytes, err } } @@ -324,17 +334,12 @@ func GetPlacementRules(ctx context.Context) ([]*placement.RuleOp, error) { return nil, errors.Errorf("pd unavailable") } + rules := []*placement.RuleOp{} res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "rules"), http.MethodGet, nil) - if err != nil { - return nil, err + if err == nil && res != nil { + err = json.Unmarshal(res, &rules) } - - var rules []*placement.RuleOp - err = json.Unmarshal(res, &rules) - if err != nil { - return nil, err - } - return rules, nil + return rules, err } // UpdatePlacementRules is used to notify PD changes of placement rules. @@ -367,6 +372,57 @@ func UpdatePlacementRules(ctx context.Context, rules []*placement.RuleOp) error return err } +// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema. +func GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) { + is, err := getGlobalInfoSyncer() + if err != nil { + return nil, err + } + + bundles := []*placement.Bundle{} + if is.etcdCli == nil { + return bundles, nil + } + + addrs := is.etcdCli.Endpoints() + + if len(addrs) == 0 { + return nil, errors.Errorf("pd unavailable") + } + + res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil) + if err == nil && res != nil { + err = json.Unmarshal(res, &bundles) + } + return bundles, err +} + +// GetRuleBundle is used to get one specific rule bundle from PD. +func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) { + is, err := getGlobalInfoSyncer() + if err != nil { + return nil, err + } + + bundle := &placement.Bundle{ID: name} + + if is.etcdCli == nil { + return bundle, nil + } + + addrs := is.etcdCli.Endpoints() + + if len(addrs) == 0 { + return nil, errors.Errorf("pd unavailable") + } + + res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil) + if err == nil && res != nil { + err = json.Unmarshal(res, bundle) + } + return bundle, err +} + func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { allInfo := make(map[string]*ServerInfo) if is.etcdCli == nil { diff --git a/infoschema/builder.go b/infoschema/builder.go index 6c38352528..6416c6f226 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -23,6 +23,8 @@ import ( "github.com/pingcap/parser/charset" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/ddl/placement" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" @@ -40,13 +42,16 @@ type Builder struct { // Return the detail updated table IDs that are produced from SchemaDiff and an error. func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { b.is.schemaMetaVersion = diff.Version - if diff.Type == model.ActionCreateSchema { + switch diff.Type { + case model.ActionCreateSchema: return nil, b.applyCreateSchema(m, diff) - } else if diff.Type == model.ActionDropSchema { + case model.ActionDropSchema: tblIDs := b.applyDropSchema(diff.SchemaID) return tblIDs, nil - } else if diff.Type == model.ActionModifySchemaCharsetAndCollate { + case model.ActionModifySchemaCharsetAndCollate: return nil, b.applyModifySchemaCharsetAndCollate(m, diff) + case model.ActionAlterTableAlterPartition: + return nil, b.applyPartitionPlacementUpdate(m, diff) } roDBInfo, ok := b.is.SchemaByID(diff.SchemaID) if !ok { @@ -364,11 +369,24 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [ return affected } +func (b *Builder) applyPartitionPlacementUpdate(m *meta.Meta, diff *model.SchemaDiff) error { + tID := placement.GroupID(diff.TableID) + + bundle, err := infosync.GetRuleBundle(nil, tID) + if err != nil { + return err + } + + b.is.ruleBundleMap[tID] = bundle + return nil +} + // InitWithOldInfoSchema initializes an empty new InfoSchema by copies all the data from old InfoSchema. func (b *Builder) InitWithOldInfoSchema() *Builder { oldIS := b.handle.Get().(*infoSchema) b.is.schemaMetaVersion = oldIS.schemaMetaVersion b.copySchemasMap(oldIS) + b.copyBundlesMap(oldIS) copy(b.is.sortedTablesBuckets, oldIS.sortedTablesBuckets) return b } @@ -379,6 +397,12 @@ func (b *Builder) copySchemasMap(oldIS *infoSchema) { } } +func (b *Builder) copyBundlesMap(oldIS *infoSchema) { + for k, v := range oldIS.ruleBundleMap { + b.is.ruleBundleMap[k] = v + } +} + // copySchemaTables creates a new schemaTables instance when a table in the database has changed. // It also does modifications on the new one because old schemaTables must be read-only. // Note: please make sure the dbName is in lowercase. @@ -395,10 +419,15 @@ func (b *Builder) copySchemaTables(dbName string) *model.DBInfo { return newSchemaTables.dbInfo } -// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo and schema version. -func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, schemaVersion int64) (*Builder, error) { +// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo, all placement rules, and schema version. +func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.Bundle, schemaVersion int64) (*Builder, error) { info := b.is info.schemaMetaVersion = schemaVersion + info.ruleBundleMap = make(map[string]*placement.Bundle, len(bundles)) + for _, bundle := range bundles { + info.ruleBundleMap[bundle.ID] = bundle + } + for _, di := range dbInfos { err := b.createSchemaTablesForDB(di, tables.TableFromMeta) if err != nil { @@ -429,6 +458,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF tables: make(map[string]table.Table, len(di.Tables)), } b.is.schemaMap[di.Name.L] = schTbls + for _, t := range di.Tables { allocs := autoid.NewAllocatorsFromTblInfo(b.handle.store, di.ID, t) var tbl table.Table @@ -445,7 +475,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF type virtualTableDriver struct { *model.DBInfo - TableFromMeta func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error) + TableFromMeta tableFromMetaFunc } var drivers []*virtualTableDriver @@ -466,6 +496,7 @@ func NewBuilder(handle *Handle) *Builder { b.handle = handle b.is = &infoSchema{ schemaMap: map[string]*schemaTables{}, + ruleBundleMap: map[string]*placement.Bundle{}, sortedTablesBuckets: make([]sortedTables, bucketCount), } return b diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 72a2fe48d8..f2e1033ad7 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/sessionctx" @@ -53,6 +54,8 @@ type InfoSchema interface { // TableIsSequence indicates whether the schema.table is a sequence. TableIsSequence(schema, table model.CIStr) bool FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo) + // BundleByName is used to get a rule bundle. + BundleByName(name string) (*placement.Bundle, bool) } type sortedTables []table.Table @@ -87,6 +90,9 @@ type schemaTables struct { const bucketCount = 512 type infoSchema struct { + // ruleBundleMap stores all placement rules + ruleBundleMap map[string]*placement.Bundle + schemaMap map[string]*schemaTables // sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount). @@ -390,3 +396,8 @@ func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema { } return is } + +func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) { + t, r := is.ruleBundleMap[name] + return t, r +} diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index ecf6db2ec7..5c2e9241c0 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -110,7 +110,7 @@ func (*testSuite) TestT(c *C) { }) c.Assert(err, IsNil) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(dbInfos, 1) + builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(dbInfos, nil, 1) c.Assert(err, IsNil) txn, err := store.Begin() @@ -282,7 +282,7 @@ func (*testSuite) TestInfoTables(c *C) { c.Assert(err, IsNil) defer store.Close() handle := infoschema.NewHandle(store) - builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, 0) + builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0) c.Assert(err, IsNil) builder.Build() is := handle.Get()