@ -4,6 +4,8 @@ go_library(
|
||||
name = "infoschema",
|
||||
srcs = [
|
||||
"builder.go",
|
||||
"builder_misc.go",
|
||||
"bundle_builder.go",
|
||||
"cache.go",
|
||||
"cluster.go",
|
||||
"error.go",
|
||||
|
||||
@ -36,137 +36,9 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/table"
|
||||
"github.com/pingcap/tidb/pkg/table/tables"
|
||||
"github.com/pingcap/tidb/pkg/util/domainutil"
|
||||
"github.com/pingcap/tidb/pkg/util/logutil"
|
||||
"github.com/pingcap/tidb/pkg/util/sqlexec"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type policyGetter struct {
|
||||
is *infoSchema
|
||||
}
|
||||
|
||||
func (p *policyGetter) GetPolicy(policyID int64) (*model.PolicyInfo, error) {
|
||||
if policy, ok := p.is.PolicyByID(policyID); ok {
|
||||
return policy, nil
|
||||
}
|
||||
return nil, errors.Errorf("Cannot find placement policy with ID: %d", policyID)
|
||||
}
|
||||
|
||||
type bundleInfoBuilder struct {
|
||||
deltaUpdate bool
|
||||
// tables or partitions that need to update placement bundle
|
||||
updateTables map[int64]any
|
||||
// all tables or partitions referring these policies should update placement bundle
|
||||
updatePolicies map[int64]any
|
||||
// partitions that need to update placement bundle
|
||||
updatePartitions map[int64]any
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) initBundleInfoBuilder() {
|
||||
b.updateTables = make(map[int64]any)
|
||||
b.updatePartitions = make(map[int64]any)
|
||||
b.updatePolicies = make(map[int64]any)
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) SetDeltaUpdateBundles() {
|
||||
b.deltaUpdate = true
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) deleteBundle(is *infoSchema, tblID int64) {
|
||||
delete(is.ruleBundleMap, tblID)
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) markTableBundleShouldUpdate(tblID int64) {
|
||||
b.updateTables[tblID] = struct{}{}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) markPartitionBundleShouldUpdate(partID int64) {
|
||||
b.updatePartitions[partID] = struct{}{}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) markBundlesReferPolicyShouldUpdate(policyID int64) {
|
||||
b.updatePolicies[policyID] = struct{}{}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) updateInfoSchemaBundles(is *infoSchema) {
|
||||
if b.deltaUpdate {
|
||||
b.completeUpdateTables(is)
|
||||
for tblID := range b.updateTables {
|
||||
b.updateTableBundles(is, tblID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// do full update bundles
|
||||
is.ruleBundleMap = make(map[int64]*placement.Bundle)
|
||||
for _, tbls := range is.schemaMap {
|
||||
for _, tbl := range tbls.tables {
|
||||
b.updateTableBundles(is, tbl.Meta().ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) {
|
||||
if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, tbls := range is.schemaMap {
|
||||
for _, tbl := range tbls.tables {
|
||||
tblInfo := tbl.Meta()
|
||||
if tblInfo.PlacementPolicyRef != nil {
|
||||
if _, ok := b.updatePolicies[tblInfo.PlacementPolicyRef.ID]; ok {
|
||||
b.markTableBundleShouldUpdate(tblInfo.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if tblInfo.Partition != nil {
|
||||
for _, par := range tblInfo.Partition.Definitions {
|
||||
if _, ok := b.updatePartitions[par.ID]; ok {
|
||||
b.markTableBundleShouldUpdate(tblInfo.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) updateTableBundles(is *infoSchema, tableID int64) {
|
||||
tbl, ok := is.TableByID(tableID)
|
||||
if !ok {
|
||||
b.deleteBundle(is, tableID)
|
||||
return
|
||||
}
|
||||
|
||||
getter := &policyGetter{is: is}
|
||||
bundle, err := placement.NewTableBundle(getter, tbl.Meta())
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("create table bundle failed", zap.Error(err))
|
||||
} else if bundle != nil {
|
||||
is.ruleBundleMap[tableID] = bundle
|
||||
} else {
|
||||
b.deleteBundle(is, tableID)
|
||||
}
|
||||
|
||||
if tbl.Meta().Partition == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, par := range tbl.Meta().Partition.Definitions {
|
||||
bundle, err = placement.NewPartitionBundle(getter, par)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("create partition bundle failed",
|
||||
zap.Error(err),
|
||||
zap.Int64("partition id", par.ID),
|
||||
)
|
||||
} else if bundle != nil {
|
||||
is.ruleBundleMap[par.ID] = bundle
|
||||
} else {
|
||||
b.deleteBundle(is, par.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Builder builds a new InfoSchema.
|
||||
type Builder struct {
|
||||
enableV2 bool
|
||||
@ -233,25 +105,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
|
||||
}
|
||||
|
||||
func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
tblIDs := make([]int64, 0, len(diff.AffectedOpts))
|
||||
if diff.AffectedOpts != nil {
|
||||
for _, opt := range diff.AffectedOpts {
|
||||
affectedDiff := &model.SchemaDiff{
|
||||
Version: diff.Version,
|
||||
Type: model.ActionCreateTable,
|
||||
SchemaID: opt.SchemaID,
|
||||
TableID: opt.TableID,
|
||||
OldSchemaID: opt.OldSchemaID,
|
||||
OldTableID: opt.OldTableID,
|
||||
}
|
||||
affectedIDs, err := b.ApplyDiff(m, affectedDiff)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
tblIDs = append(tblIDs, affectedIDs...)
|
||||
}
|
||||
}
|
||||
return tblIDs, nil
|
||||
return b.applyAffectedOpts(m, make([]int64, 0, len(diff.AffectedOpts)), diff, model.ActionCreateTable)
|
||||
}
|
||||
|
||||
func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
@ -260,6 +114,7 @@ func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.Schema
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// bundle ops
|
||||
if diff.Type == model.ActionTruncateTable {
|
||||
b.deleteBundle(b.infoSchema, diff.OldTableID)
|
||||
b.markTableBundleShouldUpdate(diff.TableID)
|
||||
@ -284,6 +139,7 @@ func (b *Builder) applyDropTableOrPartition(m *meta.Meta, diff *model.SchemaDiff
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// bundle ops
|
||||
b.markTableBundleShouldUpdate(diff.TableID)
|
||||
for _, opt := range diff.AffectedOpts {
|
||||
b.deleteBundle(b.infoSchema, opt.OldTableID)
|
||||
@ -296,6 +152,8 @@ func (b *Builder) applyReorganizePartition(m *meta.Meta, diff *model.SchemaDiff)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// bundle ops
|
||||
for _, opt := range diff.AffectedOpts {
|
||||
if opt.OldTableID != 0 {
|
||||
b.deleteBundle(b.infoSchema, opt.OldTableID)
|
||||
@ -392,6 +250,7 @@ func (b *Builder) applyRecoverTable(m *meta.Meta, diff *model.SchemaDiff) ([]int
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// bundle ops
|
||||
for _, opt := range diff.AffectedOpts {
|
||||
b.markTableBundleShouldUpdate(opt.TableID)
|
||||
}
|
||||
@ -432,30 +291,34 @@ func updateAutoIDForExchangePartition(store kv.Storage, ptSchemaID, ptID, ntSche
|
||||
return err
|
||||
}
|
||||
|
||||
func (b *Builder) applyAffectedOpts(m *meta.Meta, tblIDs []int64, diff *model.SchemaDiff, tp model.ActionType) ([]int64, error) {
|
||||
if diff.AffectedOpts != nil {
|
||||
for _, opt := range diff.AffectedOpts {
|
||||
affectedDiff := &model.SchemaDiff{
|
||||
Version: diff.Version,
|
||||
Type: tp,
|
||||
SchemaID: opt.SchemaID,
|
||||
TableID: opt.TableID,
|
||||
OldSchemaID: opt.OldSchemaID,
|
||||
OldTableID: opt.OldTableID,
|
||||
}
|
||||
affectedIDs, err := b.ApplyDiff(m, affectedDiff)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
tblIDs = append(tblIDs, affectedIDs...)
|
||||
}
|
||||
}
|
||||
return tblIDs, nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyDefaultAction(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
tblIDs, err := b.applyTableUpdate(m, diff)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
for _, opt := range diff.AffectedOpts {
|
||||
var err error
|
||||
affectedDiff := &model.SchemaDiff{
|
||||
Version: diff.Version,
|
||||
Type: diff.Type,
|
||||
SchemaID: opt.SchemaID,
|
||||
TableID: opt.TableID,
|
||||
OldSchemaID: opt.OldSchemaID,
|
||||
OldTableID: opt.OldTableID,
|
||||
}
|
||||
affectedIDs, err := b.ApplyDiff(m, affectedDiff)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
tblIDs = append(tblIDs, affectedIDs...)
|
||||
}
|
||||
|
||||
return tblIDs, nil
|
||||
return b.applyAffectedOpts(m, tblIDs, diff, diff.Type)
|
||||
}
|
||||
|
||||
func (b *Builder) getTableIDs(diff *model.SchemaDiff) (oldTableID, newTableID int64) {
|
||||
@ -609,78 +472,6 @@ func appendAffectedIDs(affected []int64, tblInfo *model.TableInfo) []int64 {
|
||||
return affected
|
||||
}
|
||||
|
||||
// copySortedTables copies sortedTables for old table and new table for later modification.
|
||||
func (b *Builder) copySortedTables(oldTableID, newTableID int64) {
|
||||
if tableIDIsValid(oldTableID) {
|
||||
b.copySortedTablesBucket(tableBucketIdx(oldTableID))
|
||||
}
|
||||
if tableIDIsValid(newTableID) && newTableID != oldTableID {
|
||||
b.copySortedTablesBucket(tableBucketIdx(newTableID))
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) applyCreateOrAlterResourceGroup(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
group, err := m.GetResourceGroup(diff.SchemaID)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if group == nil {
|
||||
return ErrResourceGroupNotExists.GenWithStackByArgs(fmt.Sprintf("(Group ID %d)", diff.SchemaID))
|
||||
}
|
||||
// TODO: need mark updated?
|
||||
b.infoSchema.setResourceGroup(group)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyDropResourceGroup(m *meta.Meta, diff *model.SchemaDiff) []int64 {
|
||||
group, ok := b.infoSchema.ResourceGroupByID(diff.SchemaID)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
b.infoSchema.deleteResourceGroup(group.Name.L)
|
||||
// TODO: return the related information.
|
||||
return []int64{}
|
||||
}
|
||||
|
||||
func (b *Builder) applyCreatePolicy(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
po, err := m.GetPolicy(diff.SchemaID)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if po == nil {
|
||||
return ErrPlacementPolicyNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Policy ID %d)", diff.SchemaID),
|
||||
)
|
||||
}
|
||||
|
||||
if _, ok := b.infoSchema.PolicyByID(po.ID); ok {
|
||||
// if old policy with the same id exists, it means replace,
|
||||
// so the tables referring this policy's bundle should be updated
|
||||
b.markBundlesReferPolicyShouldUpdate(po.ID)
|
||||
}
|
||||
|
||||
b.infoSchema.setPolicy(po)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyAlterPolicy(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
po, err := m.GetPolicy(diff.SchemaID)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if po == nil {
|
||||
return nil, ErrPlacementPolicyNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Policy ID %d)", diff.SchemaID),
|
||||
)
|
||||
}
|
||||
|
||||
b.infoSchema.setPolicy(po)
|
||||
b.markBundlesReferPolicyShouldUpdate(po.ID)
|
||||
// TODO: return the policy related table ids
|
||||
return []int64{}, nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
di, err := m.GetDatabase(diff.SchemaID)
|
||||
if err != nil {
|
||||
@ -730,16 +521,6 @@ func (b *Builder) applyModifySchemaDefaultPlacement(m *meta.Meta, diff *model.Sc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyDropPolicy(PolicyID int64) []int64 {
|
||||
po, ok := b.infoSchema.PolicyByID(PolicyID)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
b.infoSchema.deletePolicy(po.Name.L)
|
||||
// TODO: return the policy related table ids
|
||||
return []int64{}
|
||||
}
|
||||
|
||||
func (b *Builder) applyDropSchema(schemaID int64) []int64 {
|
||||
di, ok := b.infoSchema.SchemaByID(schemaID)
|
||||
if !ok {
|
||||
@ -784,6 +565,16 @@ func (b *Builder) applyRecoverSchema(m *meta.Meta, diff *model.SchemaDiff) ([]in
|
||||
return b.applyCreateTables(m, diff)
|
||||
}
|
||||
|
||||
// copySortedTables copies sortedTables for old table and new table for later modification.
|
||||
func (b *Builder) copySortedTables(oldTableID, newTableID int64) {
|
||||
if tableIDIsValid(oldTableID) {
|
||||
b.copySortedTablesBucket(tableBucketIdx(oldTableID))
|
||||
}
|
||||
if tableIDIsValid(newTableID) && newTableID != oldTableID {
|
||||
b.copySortedTablesBucket(tableBucketIdx(newTableID))
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copySortedTablesBucket(bucketIdx int) {
|
||||
oldSortedTables := b.infoSchema.sortedTablesBuckets[bucketIdx]
|
||||
newSortedTables := make(sortedTables, len(oldSortedTables))
|
||||
@ -970,12 +761,6 @@ func (b *Builder) applyDropTable(dbInfo *model.DBInfo, tableID int64, affected [
|
||||
return affected
|
||||
}
|
||||
|
||||
// TODO: get rid of this and use infoschemaV2 directly.
|
||||
type infoschemaProxy struct {
|
||||
infoschemaV2
|
||||
v1 InfoSchema
|
||||
}
|
||||
|
||||
// Build builds and returns the built infoschema.
|
||||
func (b *Builder) Build() InfoSchema {
|
||||
b.updateInfoSchemaBundles(b.infoSchema)
|
||||
@ -1020,46 +805,6 @@ func (b *Builder) copySchemasMap(oldIS *infoSchema) {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyBundlesMap(oldIS *infoSchema) {
|
||||
b.infoSchema.ruleBundleMap = make(map[int64]*placement.Bundle)
|
||||
for id, v := range oldIS.ruleBundleMap {
|
||||
b.infoSchema.ruleBundleMap[id] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyPoliciesMap(oldIS *infoSchema) {
|
||||
is := b.infoSchema
|
||||
for _, v := range oldIS.AllPlacementPolicies() {
|
||||
is.policyMap[v.Name.L] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyResourceGroupMap(oldIS *infoSchema) {
|
||||
is := b.infoSchema
|
||||
for _, v := range oldIS.AllResourceGroups() {
|
||||
is.resourceGroupMap[v.Name.L] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyTemporaryTableIDsMap(oldIS *infoSchema) {
|
||||
is := b.infoSchema
|
||||
if len(oldIS.temporaryTableIDs) == 0 {
|
||||
is.temporaryTableIDs = nil
|
||||
return
|
||||
}
|
||||
|
||||
is.temporaryTableIDs = make(map[int64]struct{})
|
||||
for tblID := range oldIS.temporaryTableIDs {
|
||||
is.temporaryTableIDs[tblID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyReferredForeignKeyMap(oldIS *infoSchema) {
|
||||
for k, v := range oldIS.referredForeignKeyMap {
|
||||
b.infoSchema.referredForeignKeyMap[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
// getSchemaAndCopyIfNecessary creates a new schemaTables instance when a table in the database has changed.
|
||||
// It also does modifications on the new one because old schemaTables must be read-only.
|
||||
// And it will only copy the changed database once in the lifespan of the Builder.
|
||||
@ -1081,26 +826,6 @@ func (b *Builder) getSchemaAndCopyIfNecessary(dbName string) *model.DBInfo {
|
||||
return b.infoSchema.schemaMap[dbName].dbInfo
|
||||
}
|
||||
|
||||
func (b *Builder) initMisc(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo) {
|
||||
info := b.infoSchema
|
||||
// build the policies.
|
||||
for _, policy := range policies {
|
||||
info.setPolicy(policy)
|
||||
}
|
||||
|
||||
// build the groups.
|
||||
for _, group := range resourceGroups {
|
||||
info.setResourceGroup(group)
|
||||
}
|
||||
|
||||
// Maintain foreign key reference information.
|
||||
for _, di := range dbInfos {
|
||||
for _, t := range di.Tables {
|
||||
b.infoSchema.addReferredForeignKeys(di.Name, t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) initVirtualTables(schemaVersion int64) error {
|
||||
// Initialize virtual tables.
|
||||
for _, driver := range drivers {
|
||||
@ -1221,13 +946,6 @@ func (b *Builder) addTable(schemaVersion int64, di *model.DBInfo, tblInfo *model
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) addTemporaryTable(tblID int64) {
|
||||
if b.infoSchema.temporaryTableIDs == nil {
|
||||
b.infoSchema.temporaryTableIDs = make(map[int64]struct{})
|
||||
}
|
||||
b.infoSchema.temporaryTableIDs[tblID] = struct{}{}
|
||||
}
|
||||
|
||||
type virtualTableDriver struct {
|
||||
*model.DBInfo
|
||||
TableFromMeta tableFromMetaFunc
|
||||
|
||||
163
pkg/infoschema/builder_misc.go
Normal file
163
pkg/infoschema/builder_misc.go
Normal file
@ -0,0 +1,163 @@
|
||||
// Copyright 2024 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package infoschema
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/pkg/ddl/placement"
|
||||
"github.com/pingcap/tidb/pkg/meta"
|
||||
"github.com/pingcap/tidb/pkg/parser/model"
|
||||
)
|
||||
|
||||
func (b *Builder) applyCreatePolicy(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
po, err := m.GetPolicy(diff.SchemaID)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if po == nil {
|
||||
return ErrPlacementPolicyNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Policy ID %d)", diff.SchemaID),
|
||||
)
|
||||
}
|
||||
|
||||
if _, ok := b.infoSchema.PolicyByID(po.ID); ok {
|
||||
// if old policy with the same id exists, it means replace,
|
||||
// so the tables referring this policy's bundle should be updated
|
||||
b.markBundlesReferPolicyShouldUpdate(po.ID)
|
||||
}
|
||||
|
||||
b.infoSchema.setPolicy(po)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyAlterPolicy(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
po, err := m.GetPolicy(diff.SchemaID)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
if po == nil {
|
||||
return nil, ErrPlacementPolicyNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Policy ID %d)", diff.SchemaID),
|
||||
)
|
||||
}
|
||||
|
||||
b.infoSchema.setPolicy(po)
|
||||
b.markBundlesReferPolicyShouldUpdate(po.ID)
|
||||
// TODO: return the policy related table ids
|
||||
return []int64{}, nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyDropPolicy(PolicyID int64) []int64 {
|
||||
po, ok := b.infoSchema.PolicyByID(PolicyID)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
b.infoSchema.deletePolicy(po.Name.L)
|
||||
// TODO: return the policy related table ids
|
||||
return []int64{}
|
||||
}
|
||||
|
||||
func (b *Builder) applyCreateOrAlterResourceGroup(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
group, err := m.GetResourceGroup(diff.SchemaID)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if group == nil {
|
||||
return ErrResourceGroupNotExists.GenWithStackByArgs(fmt.Sprintf("(Group ID %d)", diff.SchemaID))
|
||||
}
|
||||
// TODO: need mark updated?
|
||||
b.infoSchema.setResourceGroup(group)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyDropResourceGroup(m *meta.Meta, diff *model.SchemaDiff) []int64 {
|
||||
group, ok := b.infoSchema.ResourceGroupByID(diff.SchemaID)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
b.infoSchema.deleteResourceGroup(group.Name.L)
|
||||
// TODO: return the related information.
|
||||
return []int64{}
|
||||
}
|
||||
|
||||
func (b *Builder) addTemporaryTable(tblID int64) {
|
||||
if b.infoSchema.temporaryTableIDs == nil {
|
||||
b.infoSchema.temporaryTableIDs = make(map[int64]struct{})
|
||||
}
|
||||
b.infoSchema.temporaryTableIDs[tblID] = struct{}{}
|
||||
}
|
||||
|
||||
func (b *Builder) copyBundlesMap(oldIS *infoSchema) {
|
||||
b.infoSchema.ruleBundleMap = make(map[int64]*placement.Bundle)
|
||||
for id, v := range oldIS.ruleBundleMap {
|
||||
b.infoSchema.ruleBundleMap[id] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyPoliciesMap(oldIS *infoSchema) {
|
||||
is := b.infoSchema
|
||||
for _, v := range oldIS.AllPlacementPolicies() {
|
||||
is.policyMap[v.Name.L] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyResourceGroupMap(oldIS *infoSchema) {
|
||||
is := b.infoSchema
|
||||
for _, v := range oldIS.AllResourceGroups() {
|
||||
is.resourceGroupMap[v.Name.L] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyTemporaryTableIDsMap(oldIS *infoSchema) {
|
||||
is := b.infoSchema
|
||||
if len(oldIS.temporaryTableIDs) == 0 {
|
||||
is.temporaryTableIDs = nil
|
||||
return
|
||||
}
|
||||
|
||||
is.temporaryTableIDs = make(map[int64]struct{})
|
||||
for tblID := range oldIS.temporaryTableIDs {
|
||||
is.temporaryTableIDs[tblID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) copyReferredForeignKeyMap(oldIS *infoSchema) {
|
||||
for k, v := range oldIS.referredForeignKeyMap {
|
||||
b.infoSchema.referredForeignKeyMap[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Builder) initMisc(dbInfos []*model.DBInfo, policies []*model.PolicyInfo, resourceGroups []*model.ResourceGroupInfo) {
|
||||
info := b.infoSchema
|
||||
// build the policies.
|
||||
for _, policy := range policies {
|
||||
info.setPolicy(policy)
|
||||
}
|
||||
|
||||
// build the groups.
|
||||
for _, group := range resourceGroups {
|
||||
info.setResourceGroup(group)
|
||||
}
|
||||
|
||||
// Maintain foreign key reference information.
|
||||
for _, di := range dbInfos {
|
||||
for _, t := range di.Tables {
|
||||
b.infoSchema.addReferredForeignKeys(di.Name, t)
|
||||
}
|
||||
}
|
||||
}
|
||||
149
pkg/infoschema/bundle_builder.go
Normal file
149
pkg/infoschema/bundle_builder.go
Normal file
@ -0,0 +1,149 @@
|
||||
// Copyright 2024 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package infoschema
|
||||
|
||||
import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/tidb/pkg/ddl/placement"
|
||||
"github.com/pingcap/tidb/pkg/parser/model"
|
||||
"github.com/pingcap/tidb/pkg/util/logutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type policyGetter struct {
|
||||
is *infoSchema
|
||||
}
|
||||
|
||||
func (p *policyGetter) GetPolicy(policyID int64) (*model.PolicyInfo, error) {
|
||||
if policy, ok := p.is.PolicyByID(policyID); ok {
|
||||
return policy, nil
|
||||
}
|
||||
return nil, errors.Errorf("Cannot find placement policy with ID: %d", policyID)
|
||||
}
|
||||
|
||||
type bundleInfoBuilder struct {
|
||||
deltaUpdate bool
|
||||
// tables or partitions that need to update placement bundle
|
||||
updateTables map[int64]any
|
||||
// all tables or partitions referring these policies should update placement bundle
|
||||
updatePolicies map[int64]any
|
||||
// partitions that need to update placement bundle
|
||||
updatePartitions map[int64]any
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) initBundleInfoBuilder() {
|
||||
b.updateTables = make(map[int64]any)
|
||||
b.updatePartitions = make(map[int64]any)
|
||||
b.updatePolicies = make(map[int64]any)
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) SetDeltaUpdateBundles() {
|
||||
b.deltaUpdate = true
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) deleteBundle(is *infoSchema, tblID int64) {
|
||||
delete(is.ruleBundleMap, tblID)
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) markTableBundleShouldUpdate(tblID int64) {
|
||||
b.updateTables[tblID] = struct{}{}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) markPartitionBundleShouldUpdate(partID int64) {
|
||||
b.updatePartitions[partID] = struct{}{}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) markBundlesReferPolicyShouldUpdate(policyID int64) {
|
||||
b.updatePolicies[policyID] = struct{}{}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) updateInfoSchemaBundles(is *infoSchema) {
|
||||
if b.deltaUpdate {
|
||||
b.completeUpdateTables(is)
|
||||
for tblID := range b.updateTables {
|
||||
b.updateTableBundles(is, tblID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// do full update bundles
|
||||
is.ruleBundleMap = make(map[int64]*placement.Bundle)
|
||||
for _, tbls := range is.schemaMap {
|
||||
for _, tbl := range tbls.tables {
|
||||
b.updateTableBundles(is, tbl.Meta().ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) {
|
||||
if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, tbls := range is.schemaMap {
|
||||
for _, tbl := range tbls.tables {
|
||||
tblInfo := tbl.Meta()
|
||||
if tblInfo.PlacementPolicyRef != nil {
|
||||
if _, ok := b.updatePolicies[tblInfo.PlacementPolicyRef.ID]; ok {
|
||||
b.markTableBundleShouldUpdate(tblInfo.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if tblInfo.Partition != nil {
|
||||
for _, par := range tblInfo.Partition.Definitions {
|
||||
if _, ok := b.updatePartitions[par.ID]; ok {
|
||||
b.markTableBundleShouldUpdate(tblInfo.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bundleInfoBuilder) updateTableBundles(is *infoSchema, tableID int64) {
|
||||
tbl, ok := is.TableByID(tableID)
|
||||
if !ok {
|
||||
b.deleteBundle(is, tableID)
|
||||
return
|
||||
}
|
||||
|
||||
getter := &policyGetter{is: is}
|
||||
bundle, err := placement.NewTableBundle(getter, tbl.Meta())
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("create table bundle failed", zap.Error(err))
|
||||
} else if bundle != nil {
|
||||
is.ruleBundleMap[tableID] = bundle
|
||||
} else {
|
||||
b.deleteBundle(is, tableID)
|
||||
}
|
||||
|
||||
if tbl.Meta().Partition == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, par := range tbl.Meta().Partition.Definitions {
|
||||
bundle, err = placement.NewPartitionBundle(getter, par)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("create partition bundle failed",
|
||||
zap.Error(err),
|
||||
zap.Int64("partition id", par.ID),
|
||||
)
|
||||
} else if bundle != nil {
|
||||
is.ruleBundleMap[par.ID] = bundle
|
||||
} else {
|
||||
b.deleteBundle(is, par.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user