infoschema_v2: support create/alter/drop resource group (#51562)
ref pingcap/tidb#50959
This commit is contained in:
@ -83,7 +83,7 @@ go_test(
|
||||
],
|
||||
embed = [":infoschema"],
|
||||
flaky = True,
|
||||
shard_count = 11,
|
||||
shard_count = 12,
|
||||
deps = [
|
||||
"//pkg/ddl/placement",
|
||||
"//pkg/domain",
|
||||
|
||||
@ -72,7 +72,7 @@ func (b *Builder) applyDropPolicy(PolicyID int64) []int64 {
|
||||
return []int64{}
|
||||
}
|
||||
|
||||
func (b *Builder) applyCreateOrAlterResourceGroup(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
func applyCreateOrAlterResourceGroup(b *Builder, m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
group, err := m.GetResourceGroup(diff.SchemaID)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
@ -85,7 +85,7 @@ func (b *Builder) applyCreateOrAlterResourceGroup(m *meta.Meta, diff *model.Sche
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Builder) applyDropResourceGroup(m *meta.Meta, diff *model.SchemaDiff) []int64 {
|
||||
func applyDropResourceGroup(b *Builder, m *meta.Meta, diff *model.SchemaDiff) []int64 {
|
||||
group, ok := b.infoSchema.ResourceGroupByID(diff.SchemaID)
|
||||
if !ok {
|
||||
return nil
|
||||
|
||||
@ -214,17 +214,6 @@ func (is *infoSchema) PolicyByID(id int64) (val *model.PolicyInfo, ok bool) {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (is *infoSchema) ResourceGroupByID(id int64) (val *model.ResourceGroupInfo, ok bool) {
|
||||
is.resourceGroupMutex.RLock()
|
||||
defer is.resourceGroupMutex.RUnlock()
|
||||
for _, v := range is.resourceGroupMap {
|
||||
if v.ID == id {
|
||||
return v, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (is *infoSchema) SchemaByID(id int64) (val *model.DBInfo, ok bool) {
|
||||
for _, v := range is.schemaMap {
|
||||
if v.dbInfo.ID == id {
|
||||
@ -392,6 +381,18 @@ func (is *infoSchemaMisc) ResourceGroupByName(name model.CIStr) (*model.Resource
|
||||
return t, r
|
||||
}
|
||||
|
||||
// ResourceGroupByID is used to find the resource group.
|
||||
func (is *infoSchemaMisc) ResourceGroupByID(id int64) (*model.ResourceGroupInfo, bool) {
|
||||
is.resourceGroupMutex.RLock()
|
||||
defer is.resourceGroupMutex.RUnlock()
|
||||
for _, v := range is.resourceGroupMap {
|
||||
if v.ID == id {
|
||||
return v, true
|
||||
}
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// AllResourceGroups returns all resource groups.
|
||||
func (is *infoSchemaMisc) AllResourceGroups() []*model.ResourceGroupInfo {
|
||||
is.resourceGroupMutex.RLock()
|
||||
|
||||
@ -510,43 +510,49 @@ func (is *infoschemaV2) SchemaTables(schema model.CIStr) (tables []table.Table)
|
||||
|
||||
func loadTableInfo(r autoid.Requirement, infoData *Data, tblID, dbID int64, ts uint64, schemaVersion int64) (table.Table, error) {
|
||||
// Try to avoid repeated concurrency loading.
|
||||
res, err, _ := loadTableSF.Do(fmt.Sprintf("%d-%d-%d", dbID, tblID, schemaVersion), func() (ret any, err error) {
|
||||
res, err, _ := loadTableSF.Do(fmt.Sprintf("%d-%d-%d", dbID, tblID, schemaVersion), func() (any, error) {
|
||||
snapshot := r.Store().GetSnapshot(kv.NewVersion(ts))
|
||||
// Using the KV timeout read feature to address the issue of potential DDL lease expiration when
|
||||
// the meta region leader is slow.
|
||||
snapshot.SetOption(kv.TiKVClientReadTimeout, uint64(3000)) // 3000ms.
|
||||
m := meta.NewSnapshotMeta(snapshot)
|
||||
|
||||
ret, err = m.GetTable(dbID, tblID)
|
||||
return
|
||||
tblInfo, err := m.GetTable(dbID, tblID)
|
||||
|
||||
if err != nil {
|
||||
// TODO load table panic!!!
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// table removed.
|
||||
if tblInfo == nil {
|
||||
return nil, errors.Trace(ErrTableNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Schema ID %d)", dbID),
|
||||
fmt.Sprintf("(Table ID %d)", tblID),
|
||||
))
|
||||
}
|
||||
|
||||
ConvertCharsetCollateToLowerCaseIfNeed(tblInfo)
|
||||
ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo)
|
||||
allocs := autoid.NewAllocatorsFromTblInfo(r, dbID, tblInfo)
|
||||
// TODO: handle cached table!!!
|
||||
ret, err := tables.TableFromMeta(allocs, tblInfo)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return ret, err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
// TODO load table panic!!!
|
||||
panic(err)
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
if res == nil {
|
||||
return nil, err
|
||||
}
|
||||
tblInfo := res.(*model.TableInfo)
|
||||
|
||||
// table removed.
|
||||
if tblInfo == nil {
|
||||
return nil, errors.Trace(ErrTableNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Schema ID %d)", dbID),
|
||||
fmt.Sprintf("(Table ID %d)", tblID),
|
||||
))
|
||||
}
|
||||
|
||||
ConvertCharsetCollateToLowerCaseIfNeed(tblInfo)
|
||||
ConvertOldVersionUTF8ToUTF8MB4IfNeed(tblInfo)
|
||||
allocs := autoid.NewAllocatorsFromTblInfo(r, dbID, tblInfo)
|
||||
// TODO: handle cached table!!!
|
||||
ret, err := tables.TableFromMeta(allocs, tblInfo)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return ret, nil
|
||||
return res.(table.Table), nil
|
||||
}
|
||||
|
||||
var loadTableSF = &singleflight.Group{}
|
||||
@ -624,20 +630,6 @@ func applyAlterPolicy(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64
|
||||
return b.applyAlterPolicy(m, diff)
|
||||
}
|
||||
|
||||
func applyCreateOrAlterResourceGroup(b *Builder, m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
if b.enableV2 {
|
||||
return b.applyCreateOrAlterResourceGroupV2(m, diff)
|
||||
}
|
||||
return b.applyCreateOrAlterResourceGroup(m, diff)
|
||||
}
|
||||
|
||||
func applyDropResourceGroup(b *Builder, m *meta.Meta, diff *model.SchemaDiff) []int64 {
|
||||
if b.enableV2 {
|
||||
return b.applyDropResourceGroupV2(m, diff)
|
||||
}
|
||||
return b.applyDropResourceGroup(m, diff)
|
||||
}
|
||||
|
||||
func applyDropTableOrPartition(b *Builder, m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
if b.enableV2 {
|
||||
// return b.applyDropTableOrPartitionV2(m, diff)
|
||||
@ -745,14 +737,6 @@ func (b *Builder) applyDropPolicyV2(PolicyID int64) []int64 {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
func (b *Builder) applyDropResourceGroupV2(m *meta.Meta, diff *model.SchemaDiff) []int64 {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
func (b *Builder) applyCreateOrAlterResourceGroupV2(m *meta.Meta, diff *model.SchemaDiff) error {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
func (b *Builder) applyTruncateTableOrPartitionV2(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) {
|
||||
panic("TODO")
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
|
||||
"github.com/pingcap/tidb/pkg/infoschema/internal"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
"github.com/pingcap/tidb/pkg/meta"
|
||||
"github.com/pingcap/tidb/pkg/parser/model"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -70,4 +71,68 @@ func TestV2Basic(t *testing.T) {
|
||||
// TODO: support FindTableByPartitionID.
|
||||
}
|
||||
|
||||
// TODO: test misc
|
||||
func TestMisc(t *testing.T) {
|
||||
r := internal.CreateAutoIDRequirement(t)
|
||||
defer func() {
|
||||
r.Store().Close()
|
||||
}()
|
||||
|
||||
builder, err := NewBuilder(r, nil, NewData()).InitWithDBInfos(nil, nil, nil, 1)
|
||||
require.NoError(t, err)
|
||||
is := builder.Build()
|
||||
require.Len(t, is.AllResourceGroups(), 0)
|
||||
|
||||
// test create resource group
|
||||
resourceGroupInfo := internal.MockResourceGroupInfo(t, r.Store(), "test")
|
||||
internal.AddResourceGroup(t, r.Store(), resourceGroupInfo)
|
||||
txn, err := r.Store().Begin()
|
||||
require.NoError(t, err)
|
||||
err = applyCreateOrAlterResourceGroup(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: resourceGroupInfo.ID})
|
||||
require.NoError(t, err)
|
||||
is = builder.Build()
|
||||
require.Len(t, is.AllResourceGroups(), 1)
|
||||
getResourceGroupInfo, ok := is.ResourceGroupByName(resourceGroupInfo.Name)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, resourceGroupInfo, getResourceGroupInfo)
|
||||
require.NoError(t, txn.Rollback())
|
||||
|
||||
// create another resource group
|
||||
resourceGroupInfo2 := internal.MockResourceGroupInfo(t, r.Store(), "test2")
|
||||
internal.AddResourceGroup(t, r.Store(), resourceGroupInfo2)
|
||||
txn, err = r.Store().Begin()
|
||||
require.NoError(t, err)
|
||||
err = applyCreateOrAlterResourceGroup(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: resourceGroupInfo2.ID})
|
||||
require.NoError(t, err)
|
||||
is = builder.Build()
|
||||
require.Len(t, is.AllResourceGroups(), 2)
|
||||
getResourceGroupInfo, ok = is.ResourceGroupByName(resourceGroupInfo2.Name)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, resourceGroupInfo2, getResourceGroupInfo)
|
||||
require.NoError(t, txn.Rollback())
|
||||
|
||||
// test alter resource group
|
||||
resourceGroupInfo.State = model.StatePublic
|
||||
internal.UpdateResourceGroup(t, r.Store(), resourceGroupInfo)
|
||||
txn, err = r.Store().Begin()
|
||||
require.NoError(t, err)
|
||||
err = applyCreateOrAlterResourceGroup(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: resourceGroupInfo.ID})
|
||||
require.NoError(t, err)
|
||||
is = builder.Build()
|
||||
require.Len(t, is.AllResourceGroups(), 2)
|
||||
getResourceGroupInfo, ok = is.ResourceGroupByName(resourceGroupInfo.Name)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, resourceGroupInfo, getResourceGroupInfo)
|
||||
require.NoError(t, txn.Rollback())
|
||||
|
||||
// test drop resource group
|
||||
internal.DropResourceGroup(t, r.Store(), resourceGroupInfo)
|
||||
txn, err = r.Store().Begin()
|
||||
require.NoError(t, err)
|
||||
_ = applyDropResourceGroup(builder, meta.NewMeta(txn), &model.SchemaDiff{SchemaID: resourceGroupInfo.ID})
|
||||
is = builder.Build()
|
||||
require.Len(t, is.AllResourceGroups(), 1)
|
||||
getResourceGroupInfo, ok = is.ResourceGroupByName(resourceGroupInfo2.Name)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, resourceGroupInfo2, getResourceGroupInfo)
|
||||
require.NoError(t, txn.Rollback())
|
||||
}
|
||||
|
||||
@ -180,6 +180,16 @@ func MockTable(t *testing.T, store kv.Storage, tblInfo *model.TableInfo) table.T
|
||||
return tbl
|
||||
}
|
||||
|
||||
// MockResourceGroupInfo mock resource group for testing.
|
||||
func MockResourceGroupInfo(t *testing.T, store kv.Storage, groupName string) *model.ResourceGroupInfo {
|
||||
id, err := GenGlobalID(store)
|
||||
require.NoError(t, err)
|
||||
return &model.ResourceGroupInfo{
|
||||
ID: id,
|
||||
Name: model.NewCIStr(groupName),
|
||||
}
|
||||
}
|
||||
|
||||
// AddTable add mock table for testing.
|
||||
func AddTable(t *testing.T, store kv.Storage, dbInfo *model.DBInfo, tblInfo *model.TableInfo) {
|
||||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
|
||||
@ -224,3 +234,36 @@ func DropDB(t *testing.T, store kv.Storage, dbInfo *model.DBInfo) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// AddResourceGroup add mock resource group for testing.
|
||||
func AddResourceGroup(t *testing.T, store kv.Storage, group *model.ResourceGroupInfo) {
|
||||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
|
||||
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
|
||||
err := meta.NewMeta(txn).AddResourceGroup(group)
|
||||
require.NoError(t, err)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// UpdateResourceGroup update mock resource group for testing.
|
||||
func UpdateResourceGroup(t *testing.T, store kv.Storage, group *model.ResourceGroupInfo) {
|
||||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
|
||||
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
|
||||
err := meta.NewMeta(txn).UpdateResourceGroup(group)
|
||||
require.NoError(t, err)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// DropResourceGroup drop mock resource group for testing.
|
||||
func DropResourceGroup(t *testing.T, store kv.Storage, group *model.ResourceGroupInfo) {
|
||||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
|
||||
err := kv.RunInNewTxn(ctx, store, true, func(ctx context.Context, txn kv.Transaction) error {
|
||||
err := meta.NewMeta(txn).DropResourceGroup(group.ID)
|
||||
require.NoError(t, err)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user