tidb/pkg/ddl/affinity_test.go

// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
	"context"
	"fmt"
	"testing"

	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/domain/affinity"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/stretchr/testify/require"
	tikv "github.com/tikv/client-go/v2/tikv"
	pdhttp "github.com/tikv/pd/client/http"
)

// Unit tests for key range building
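
// mockCodec embeds tikv.Codec and overrides EncodeRegionRange to prefix both
// ends of a range with "k:", so the tests can tell that key ranges went
// through the codec.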
type mockCodec struct {
	tikv.Codec
}

func (mockCodec) EncodeRegionRange(start, end []byte) ([]byte, []byte) {
	return append([]byte("k:"), start...), append([]byte("k:"), end...)
}
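
// TestAffinityBuildGroupDefinitionsTable verifies that a table-level affinity
// produces a single group whose only key range spans the table prefix of
// ID 123 up to the prefix of ID 124.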
func TestAffinityBuildGroupDefinitionsTable(t *testing.T) {
	tbl := &model.TableInfo{
		ID:       123,
		Affinity: &model.TableAffinityInfo{Level: ast.TableAffinityLevelTable},
	}
	groups, err := ddl.BuildAffinityGroupDefinitionsForTest(&mockCodec{}, tbl, nil)
	require.NoError(t, err)
	require.Len(t, groups, 1)
	ranges := groups[ddl.GetTableAffinityGroupID(123)]
	require.Len(t, ranges, 1)
	require.Equal(t, pdhttp.AffinityGroupKeyRange{
		StartKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(123)...),
		EndKey:   append([]byte("k:"), tablecodec.EncodeTablePrefix(124)...),
	}, ranges[0])
}
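
// TestAffinityBuildGroupDefinitionsPartition verifies that a partition-level
// affinity produces one group per partition, each covering that partition's
// key range.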
func TestAffinityBuildGroupDefinitionsPartition(t *testing.T) {
	tbl := &model.TableInfo{
		ID:       50,
		Affinity: &model.TableAffinityInfo{Level: ast.TableAffinityLevelPartition},
		Partition: &model.PartitionInfo{
			Enable: true,
			Definitions: []model.PartitionDefinition{
				{ID: 1},
				{ID: 3},
			},
		},
	}
	groups, err := ddl.BuildAffinityGroupDefinitionsForTest(&mockCodec{}, tbl, nil)
	require.NoError(t, err)
	require.Len(t, groups, 2)
	require.Equal(t, []pdhttp.AffinityGroupKeyRange{{
		StartKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(1)...),
		EndKey:   append([]byte("k:"), tablecodec.EncodeTablePrefix(2)...),
	}}, groups[ddl.GetPartitionAffinityGroupID(50, 1)])
	require.Equal(t, []pdhttp.AffinityGroupKeyRange{{
		StartKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(3)...),
		EndKey:   append([]byte("k:"), tablecodec.EncodeTablePrefix(4)...),
	}}, groups[ddl.GetPartitionAffinityGroupID(50, 3)])
}
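
// TestAffinityBuildGroupDefinitionsPartitionMissing verifies that building
// group definitions fails when the affinity level is partition but the table
// carries no partition info.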
func TestAffinityBuildGroupDefinitionsPartitionMissing(t *testing.T) {
	tbl := &model.TableInfo{
		ID:       1,
		Affinity: &model.TableAffinityInfo{Level: ast.TableAffinityLevelPartition},
	}
	_, err := ddl.BuildAffinityGroupDefinitionsForTest(nil, tbl, nil)
	require.Error(t, err)
}

// PD interaction tests
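
// affinityGroupCheck describes the expected state in PD of the affinity groups
// belonging to one table or to its partitions.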
type affinityGroupCheck struct {
	tableID      int64
	partitionIDs []int64
	shouldExist  bool
	rangeCount   int
	comment      string
}
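
// check collects the expected group IDs, fetches them from PD, and asserts
// that they exist (or are absent) and, when rangeCount is set, that each group
// has the expected number of key ranges.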
func (c *affinityGroupCheck) check(t *testing.T) {
	ctx := context.Background()
	// Collect all group IDs to check
	var groupIDs []string
	if len(c.partitionIDs) == 0 {
		// Non-partitioned table
		groupIDs = []string{ddl.GetTableAffinityGroupID(c.tableID)}
	} else {
		// Partitioned table
		for _, partID := range c.partitionIDs {
			groupIDs = append(groupIDs, ddl.GetPartitionAffinityGroupID(c.tableID, partID))
		}
	}
	// Get affinity groups from PD
	groups, err := affinity.GetGroups(ctx, groupIDs)
	require.NoError(t, err, c.comment)
	if c.shouldExist {
		require.Equal(t, len(groupIDs), len(groups), c.comment)
		for _, id := range groupIDs {
			group, ok := groups[id]
			require.True(t, ok, "group %s should exist, comment: %s", id, c.comment)
			require.NotNil(t, group, c.comment)
			require.Equal(t, id, group.ID, c.comment)
			if c.rangeCount > 0 {
				require.Equal(t, c.rangeCount, group.RangeCount, c.comment)
			}
		}
	} else {
		require.Empty(t, groups, "groups should not exist, comment: %s", c.comment)
	}
}
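
// checkAffinityGroupsInPD resolves the table from the info schema and asserts
// that its affinity groups (one for the table, or one per partition) do or do
// not exist in PD.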
func checkAffinityGroupsInPD(t *testing.T, do *domain.Domain, dbName, tbName string, shouldExist bool) {
	tblInfo, err := do.InfoSchema().TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tbName))
	require.NoError(t, err)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
	require.NoError(t, kv.RunInNewTxn(ctx, do.Store(), false, func(ctx context.Context, txn kv.Transaction) error {
		_ = meta.NewMutator(txn)
		check := &affinityGroupCheck{
			tableID:     tblInfo.Meta().ID,
			shouldExist: shouldExist,
			rangeCount:  1,
			comment:     fmt.Sprintf("table: %s.%s", dbName, tbName),
		}
		if tblInfo.Meta().Partition != nil {
			check.partitionIDs = make([]int64, 0, len(tblInfo.Meta().Partition.Definitions))
			for _, def := range tblInfo.Meta().Partition.Definitions {
				check.partitionIDs = append(check.partitionIDs, def.ID)
			}
		}
		check.check(t)
		return nil
	}))
}
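
// TestAffinityPDInteraction walks through the DDL statements that create,
// alter, truncate, and drop tables or partitions with affinity, checking the
// resulting affinity groups in PD after each step.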
func TestAffinityPDInteraction(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	dropTable := func() {
		tk.MustExec("drop table if exists t1, t2, t3, tp1, tp2, tp3")
	}
	dropTable()
	defer dropTable()

	// Test 1: Create table with affinity='table'
	tk.MustExec("create table t1(a int) affinity = 'table'")
	checkAffinityGroupsInPD(t, dom, "test", "t1", true)

	// Test 2: Create partitioned table with affinity='partition'
	tk.MustExec("create table tp1(a int) affinity = 'partition' partition by hash(a) partitions 4")
	checkAffinityGroupsInPD(t, dom, "test", "tp1", true)

	// Test 3: Alter table to remove affinity
	tk.MustExec("alter table t1 affinity = ''")
	checkAffinityGroupsInPD(t, dom, "test", "t1", false)

	// Test 4: Alter table to add affinity back
	tk.MustExec("alter table t1 affinity = 'table'")
	checkAffinityGroupsInPD(t, dom, "test", "t1", true)

	// Test 5: Drop table should clean up affinity groups
	// Get table ID before drop
	t1Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t1"))
	require.NoError(t, err)
	t1ID := t1Info.Meta().ID
	tk.MustExec("drop table t1")
	// Verify affinity groups are deleted
	ctx := context.Background()
	groups, err := affinity.GetGroups(ctx, []string{ddl.GetTableAffinityGroupID(t1ID)})
	require.NoError(t, err)
	require.Empty(t, groups, "affinity groups should be deleted after dropping table")

	// Test 6: TRUNCATE TABLE should preserve affinity
	tk.MustExec("create table t2(a int) affinity = 'table'")
	checkAffinityGroupsInPD(t, dom, "test", "t2", true)
	t2Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t2"))
	require.NoError(t, err)
	oldTableID := t2Info.Meta().ID
	tk.MustExec("truncate table t2")
	checkAffinityGroupsInPD(t, dom, "test", "t2", true)
	// Old table ID's affinity group should be deleted
	groups, err = affinity.GetGroups(ctx, []string{ddl.GetTableAffinityGroupID(oldTableID)})
	require.NoError(t, err)
	require.Empty(t, groups, "old table's affinity groups should be deleted after truncate")

	// Test 7: TRUNCATE PARTITION should preserve affinity
	tk.MustExec("create table tp2(a int) affinity = 'partition' partition by range(a) (partition p0 values less than (10), partition p1 values less than (20))")
	checkAffinityGroupsInPD(t, dom, "test", "tp2", true)
	// Get partition ID before truncate
	tblInfo, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("tp2"))
	require.NoError(t, err)
	oldPartitionID := tblInfo.Meta().Partition.Definitions[0].ID
	tk.MustExec("alter table tp2 truncate partition p0")
	checkAffinityGroupsInPD(t, dom, "test", "tp2", true)
	// Old partition's affinity group should be deleted
	groups, err = affinity.GetGroups(ctx, []string{ddl.GetPartitionAffinityGroupID(tblInfo.Meta().ID, oldPartitionID)})
	require.NoError(t, err)
	require.Empty(t, groups, "old partition's affinity group should be deleted after truncate partition")

	// Test 8: ALTER TABLE AFFINITY idempotency - partition -> partition
	tk.MustExec("create table tp3(a int) affinity = 'partition' partition by hash(a) partitions 3")
	checkAffinityGroupsInPD(t, dom, "test", "tp3", true)
	// Set affinity to partition again (idempotent operation)
	tk.MustExec("alter table tp3 affinity = 'partition'")
	checkAffinityGroupsInPD(t, dom, "test", "tp3", true)

	// Test 9: ALTER TABLE AFFINITY idempotency - table -> table
	tk.MustExec("create table t3(a int) affinity = 'table'")
	checkAffinityGroupsInPD(t, dom, "test", "t3", true)
	// Set affinity to table again (idempotent operation)
	tk.MustExec("alter table t3 affinity = 'table'")
	checkAffinityGroupsInPD(t, dom, "test", "t3", true)
}
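
// TestAffinityDropDatabase verifies that dropping a database removes the
// affinity groups of every table and partition it contained.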
func TestAffinityDropDatabase(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists test_affinity_db")
	tk.MustExec("create database test_affinity_db")
	tk.MustExec("use test_affinity_db")

	// Create multiple tables with affinity
	tk.MustExec("create table t1(a int) affinity = 'table'")
	tk.MustExec("create table t2(a int) affinity = 'table'")
	tk.MustExec("create table tp1(a int) affinity = 'partition' partition by hash(a) partitions 2")
	checkAffinityGroupsInPD(t, dom, "test_affinity_db", "t1", true)
	checkAffinityGroupsInPD(t, dom, "test_affinity_db", "t2", true)
	checkAffinityGroupsInPD(t, dom, "test_affinity_db", "tp1", true)

	// Get table IDs before dropping database
	t1Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test_affinity_db"), ast.NewCIStr("t1"))
	require.NoError(t, err)
	t2Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test_affinity_db"), ast.NewCIStr("t2"))
	require.NoError(t, err)
	tp1Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test_affinity_db"), ast.NewCIStr("tp1"))
	require.NoError(t, err)

	// Drop database should clean up all affinity groups
	tk.MustExec("drop database test_affinity_db")
	ctx := context.Background()
	groupIDs := make([]string, 0, 2+len(tp1Info.Meta().Partition.Definitions))
	groupIDs = append(groupIDs, ddl.GetTableAffinityGroupID(t1Info.Meta().ID))
	groupIDs = append(groupIDs, ddl.GetTableAffinityGroupID(t2Info.Meta().ID))
	for _, def := range tp1Info.Meta().Partition.Definitions {
		groupIDs = append(groupIDs, ddl.GetPartitionAffinityGroupID(tp1Info.Meta().ID, def.ID))
	}
	groups, err := affinity.GetGroups(ctx, groupIDs)
	require.NoError(t, err)
	require.Empty(t, groups, "all affinity groups should be deleted after dropping database")
}