ddl: add a framework for placement rules cache (#20086)
This commit is contained in:
@ -76,6 +76,14 @@ type Rule struct {
|
||||
IsolationLevel string `json:"isolation_level,omitempty"`
|
||||
}
|
||||
|
||||
// Bundle is a group of all rules and configurations. It is used to support rule cache.
|
||||
type Bundle struct {
|
||||
ID string `json:"group_id"`
|
||||
Index int `json:"group_index"`
|
||||
Override bool `json:"group_override"`
|
||||
Rules []*Rule `json:"rules"`
|
||||
}
|
||||
|
||||
// RuleOpType indicates the operation type.
|
||||
type RuleOpType string
|
||||
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
package placement
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
@ -69,3 +70,8 @@ func CheckLabelConstraints(labels []string) ([]LabelConstraint, error) {
|
||||
}
|
||||
return constraints, nil
|
||||
}
|
||||
|
||||
// GroupID accepts a tableID or whatever integer, and encode the integer into a valid GroupID for PD.
|
||||
func GroupID(id int64) string {
|
||||
return fmt.Sprintf("TIDB_DDL_%d", id)
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ import (
|
||||
"github.com/pingcap/tidb/bindinfo"
|
||||
"github.com/pingcap/tidb/config"
|
||||
"github.com/pingcap/tidb/ddl"
|
||||
"github.com/pingcap/tidb/ddl/placement"
|
||||
"github.com/pingcap/tidb/domain/infosync"
|
||||
"github.com/pingcap/tidb/errno"
|
||||
"github.com/pingcap/tidb/infoschema"
|
||||
@ -140,7 +141,18 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
|
||||
return 0, nil, fullLoad, err
|
||||
}
|
||||
|
||||
newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, neededSchemaVersion)
|
||||
bundles, err := infosync.GetAllRuleBundles(nil)
|
||||
failpoint.Inject("FailPlacement", func(val failpoint.Value) {
|
||||
if val.(bool) {
|
||||
bundles = []*placement.Bundle{}
|
||||
err = nil
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return 0, nil, fullLoad, err
|
||||
}
|
||||
|
||||
newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, bundles, neededSchemaVersion)
|
||||
if err != nil {
|
||||
return 0, nil, fullLoad, err
|
||||
}
|
||||
|
||||
@ -89,6 +89,11 @@ func unixSocketAvailable() bool {
|
||||
}
|
||||
|
||||
func TestInfo(t *testing.T) {
|
||||
err := failpoint.Enable("github.com/pingcap/tidb/domain/FailPlacement", `return(true)`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
|
||||
}
|
||||
@ -214,6 +219,11 @@ func TestInfo(t *testing.T) {
|
||||
if err != nil || len(infos) != 0 {
|
||||
t.Fatalf("err %v, infos %v", err, infos)
|
||||
}
|
||||
|
||||
err = failpoint.Disable("github.com/pingcap/tidb/domain/FailPlacement")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
type mockSessionManager struct {
|
||||
|
||||
@ -81,6 +81,9 @@ const (
|
||||
// ErrPrometheusAddrIsNotSet is the error that Prometheus address is not set in PD and etcd
|
||||
var ErrPrometheusAddrIsNotSet = terror.ClassDomain.New(errno.ErrPrometheusAddrIsNotSet, errno.MySQLErrName[errno.ErrPrometheusAddrIsNotSet])
|
||||
|
||||
// errPlacementRulesDisabled is exported for internal usage, indicating PD rejected the request due to disabled placement feature.
|
||||
var errPlacementRulesDisabled = errors.New("placement rules feature is disabled")
|
||||
|
||||
// InfoSyncer stores server info to etcd when the tidb-server starts and delete when tidb-server shuts down.
|
||||
type InfoSyncer struct {
|
||||
etcdCli *clientv3.Client
|
||||
@ -295,12 +298,19 @@ func doRequest(ctx context.Context, addrs []string, route, method string, body i
|
||||
|
||||
res, err = http.DefaultClient.Do(req)
|
||||
if err == nil {
|
||||
defer terror.Call(res.Body.Close)
|
||||
|
||||
bodyBytes, err := ioutil.ReadAll(res.Body)
|
||||
if res.StatusCode != http.StatusOK {
|
||||
err = errors.Wrapf(err, "%s", bodyBytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.StatusCode != http.StatusOK {
|
||||
err = errors.Errorf("%s", bodyBytes)
|
||||
// ignore if placement rules feature is not enabled
|
||||
if strings.HasPrefix(err.Error(), `"placement rules feature is disabled"`) {
|
||||
err = nil
|
||||
bodyBytes = nil
|
||||
}
|
||||
}
|
||||
terror.Log(res.Body.Close())
|
||||
return bodyBytes, err
|
||||
}
|
||||
}
|
||||
@ -324,17 +334,12 @@ func GetPlacementRules(ctx context.Context) ([]*placement.RuleOp, error) {
|
||||
return nil, errors.Errorf("pd unavailable")
|
||||
}
|
||||
|
||||
rules := []*placement.RuleOp{}
|
||||
res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "rules"), http.MethodGet, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err == nil && res != nil {
|
||||
err = json.Unmarshal(res, &rules)
|
||||
}
|
||||
|
||||
var rules []*placement.RuleOp
|
||||
err = json.Unmarshal(res, &rules)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rules, nil
|
||||
return rules, err
|
||||
}
|
||||
|
||||
// UpdatePlacementRules is used to notify PD changes of placement rules.
|
||||
@ -367,6 +372,57 @@ func UpdatePlacementRules(ctx context.Context, rules []*placement.RuleOp) error
|
||||
return err
|
||||
}
|
||||
|
||||
// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema.
|
||||
func GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) {
|
||||
is, err := getGlobalInfoSyncer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bundles := []*placement.Bundle{}
|
||||
if is.etcdCli == nil {
|
||||
return bundles, nil
|
||||
}
|
||||
|
||||
addrs := is.etcdCli.Endpoints()
|
||||
|
||||
if len(addrs) == 0 {
|
||||
return nil, errors.Errorf("pd unavailable")
|
||||
}
|
||||
|
||||
res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil)
|
||||
if err == nil && res != nil {
|
||||
err = json.Unmarshal(res, &bundles)
|
||||
}
|
||||
return bundles, err
|
||||
}
|
||||
|
||||
// GetRuleBundle is used to get one specific rule bundle from PD.
|
||||
func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) {
|
||||
is, err := getGlobalInfoSyncer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bundle := &placement.Bundle{ID: name}
|
||||
|
||||
if is.etcdCli == nil {
|
||||
return bundle, nil
|
||||
}
|
||||
|
||||
addrs := is.etcdCli.Endpoints()
|
||||
|
||||
if len(addrs) == 0 {
|
||||
return nil, errors.Errorf("pd unavailable")
|
||||
}
|
||||
|
||||
res, err := doRequest(ctx, addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil)
|
||||
if err == nil && res != nil {
|
||||
err = json.Unmarshal(res, bundle)
|
||||
}
|
||||
return bundle, err
|
||||
}
|
||||
|
||||
func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
|
||||
allInfo := make(map[string]*ServerInfo)
|
||||
if is.etcdCli == nil {
|
||||
|
||||
@ -23,6 +23,8 @@ import (
|
||||
"github.com/pingcap/parser/charset"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/tidb/config"
|
||||
"github.com/pingcap/tidb/ddl/placement"
|
||||
"github.com/pingcap/tidb/domain/infosync"
|
||||
"github.com/pingcap/tidb/meta"
|
||||
"github.com/pingcap/tidb/meta/autoid"
|
||||
"github.com/pingcap/tidb/table"
|
||||
@ -40,13 +42,16 @@ type Builder struct {
|
||||
// Return the detail updated table IDs that are produced from SchemaDiff and an error.
|
||||
func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
b.is.schemaMetaVersion = diff.Version
|
||||
if diff.Type == model.ActionCreateSchema {
|
||||
switch diff.Type {
|
||||
case model.ActionCreateSchema:
|
||||
return nil, b.applyCreateSchema(m, diff)
|
||||
} else if diff.Type == model.ActionDropSchema {
|
||||
case model.ActionDropSchema:
|
||||
tblIDs := b.applyDropSchema(diff.SchemaID)
|
||||
return tblIDs, nil
|
||||
} else if diff.Type == model.ActionModifySchemaCharsetAndCollate {
|
||||
case model.ActionModifySchemaCharsetAndCollate:
|
||||
return nil, b.applyModifySchemaCharsetAndCollate(m, diff)
|
||||
case model.ActionAlterTableAlterPartition:
|
||||
return nil, b.applyPartitionPlacementUpdate(m, diff)
|
||||
}
|
||||
roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
|
||||
if !ok {
|
||||
@ -364,11 +369,24 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [
|
||||
return affected
|
||||
}
|
||||
|
||||
func (b *Builder) applyPartitionPlacementUpdate(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
tID := placement.GroupID(diff.TableID)
|
||||
|
||||
bundle, err := infosync.GetRuleBundle(nil, tID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.is.ruleBundleMap[tID] = bundle
|
||||
return nil
|
||||
}
|
||||
|
||||
// InitWithOldInfoSchema initializes an empty new InfoSchema by copies all the data from old InfoSchema.
|
||||
func (b *Builder) InitWithOldInfoSchema() *Builder {
|
||||
oldIS := b.handle.Get().(*infoSchema)
|
||||
b.is.schemaMetaVersion = oldIS.schemaMetaVersion
|
||||
b.copySchemasMap(oldIS)
|
||||
b.copyBundlesMap(oldIS)
|
||||
copy(b.is.sortedTablesBuckets, oldIS.sortedTablesBuckets)
|
||||
return b
|
||||
}
|
||||
@ -379,6 +397,12 @@ func (b *Builder) copySchemasMap(oldIS *infoSchema) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyBundlesMap(oldIS *infoSchema) {
|
||||
for k, v := range oldIS.ruleBundleMap {
|
||||
b.is.ruleBundleMap[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// copySchemaTables creates a new schemaTables instance when a table in the database has changed.
|
||||
// It also does modifications on the new one because old schemaTables must be read-only.
|
||||
// Note: please make sure the dbName is in lowercase.
|
||||
@ -395,10 +419,15 @@ func (b *Builder) copySchemaTables(dbName string) *model.DBInfo {
|
||||
return newSchemaTables.dbInfo
|
||||
}
|
||||
|
||||
// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo and schema version.
|
||||
func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, schemaVersion int64) (*Builder, error) {
|
||||
// InitWithDBInfos initializes an empty new InfoSchema with a slice of DBInfo, all placement rules, and schema version.
|
||||
func (b *Builder) InitWithDBInfos(dbInfos []*model.DBInfo, bundles []*placement.Bundle, schemaVersion int64) (*Builder, error) {
|
||||
info := b.is
|
||||
info.schemaMetaVersion = schemaVersion
|
||||
info.ruleBundleMap = make(map[string]*placement.Bundle, len(bundles))
|
||||
for _, bundle := range bundles {
|
||||
info.ruleBundleMap[bundle.ID] = bundle
|
||||
}
|
||||
|
||||
for _, di := range dbInfos {
|
||||
err := b.createSchemaTablesForDB(di, tables.TableFromMeta)
|
||||
if err != nil {
|
||||
@ -429,6 +458,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF
|
||||
tables: make(map[string]table.Table, len(di.Tables)),
|
||||
}
|
||||
b.is.schemaMap[di.Name.L] = schTbls
|
||||
|
||||
for _, t := range di.Tables {
|
||||
allocs := autoid.NewAllocatorsFromTblInfo(b.handle.store, di.ID, t)
|
||||
var tbl table.Table
|
||||
@ -445,7 +475,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo, tableFromMeta tableF
|
||||
|
||||
type virtualTableDriver struct {
|
||||
*model.DBInfo
|
||||
TableFromMeta func(alloc autoid.Allocators, tblInfo *model.TableInfo) (table.Table, error)
|
||||
TableFromMeta tableFromMetaFunc
|
||||
}
|
||||
|
||||
var drivers []*virtualTableDriver
|
||||
@ -466,6 +496,7 @@ func NewBuilder(handle *Handle) *Builder {
|
||||
b.handle = handle
|
||||
b.is = &infoSchema{
|
||||
schemaMap: map[string]*schemaTables{},
|
||||
ruleBundleMap: map[string]*placement.Bundle{},
|
||||
sortedTablesBuckets: make([]sortedTables, bucketCount),
|
||||
}
|
||||
return b
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/tidb/ddl/placement"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/meta/autoid"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
@ -53,6 +54,8 @@ type InfoSchema interface {
|
||||
// TableIsSequence indicates whether the schema.table is a sequence.
|
||||
TableIsSequence(schema, table model.CIStr) bool
|
||||
FindTableByPartitionID(partitionID int64) (table.Table, *model.DBInfo)
|
||||
// BundleByName is used to get a rule bundle.
|
||||
BundleByName(name string) (*placement.Bundle, bool)
|
||||
}
|
||||
|
||||
type sortedTables []table.Table
|
||||
@ -87,6 +90,9 @@ type schemaTables struct {
|
||||
const bucketCount = 512
|
||||
|
||||
type infoSchema struct {
|
||||
// ruleBundleMap stores all placement rules
|
||||
ruleBundleMap map[string]*placement.Bundle
|
||||
|
||||
schemaMap map[string]*schemaTables
|
||||
|
||||
// sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount).
|
||||
@ -390,3 +396,8 @@ func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema {
|
||||
}
|
||||
return is
|
||||
}
|
||||
|
||||
func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) {
|
||||
t, r := is.ruleBundleMap[name]
|
||||
return t, r
|
||||
}
|
||||
|
||||
@ -110,7 +110,7 @@ func (*testSuite) TestT(c *C) {
|
||||
})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(dbInfos, 1)
|
||||
builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(dbInfos, nil, 1)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn, err := store.Begin()
|
||||
@ -282,7 +282,7 @@ func (*testSuite) TestInfoTables(c *C) {
|
||||
c.Assert(err, IsNil)
|
||||
defer store.Close()
|
||||
handle := infoschema.NewHandle(store)
|
||||
builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, 0)
|
||||
builder, err := infoschema.NewBuilder(handle).InitWithDBInfos(nil, nil, 0)
|
||||
c.Assert(err, IsNil)
|
||||
builder.Build()
|
||||
is := handle.Get()
|
||||
|
||||
Reference in New Issue
Block a user