298 lines
11 KiB
Go
298 lines
11 KiB
Go
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
|
|
|
|
package ddl_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
|
|
"github.com/pingcap/tidb/pkg/ddl"
|
|
"github.com/pingcap/tidb/pkg/domain"
|
|
"github.com/pingcap/tidb/pkg/domain/affinity"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/meta"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/tablecodec"
|
|
"github.com/pingcap/tidb/pkg/testkit"
|
|
"github.com/stretchr/testify/require"
|
|
tikv "github.com/tikv/client-go/v2/tikv"
|
|
pdhttp "github.com/tikv/pd/client/http"
|
|
)
|
|
|
|
// Unit tests for keyrange building
|
|
|
|
type mockCodec struct {
|
|
tikv.Codec
|
|
}
|
|
|
|
func (mockCodec) EncodeRegionRange(start, end []byte) ([]byte, []byte) {
|
|
return append([]byte("k:"), start...), append([]byte("k:"), end...)
|
|
}
|
|
|
|
func TestAffinityBuildGroupDefinitionsTable(t *testing.T) {
|
|
tbl := &model.TableInfo{
|
|
ID: 123,
|
|
Affinity: &model.TableAffinityInfo{Level: ast.TableAffinityLevelTable},
|
|
}
|
|
|
|
groups, err := ddl.BuildAffinityGroupDefinitionsForTest(&mockCodec{}, tbl, nil)
|
|
require.NoError(t, err)
|
|
require.Len(t, groups, 1)
|
|
|
|
ranges := groups[ddl.GetTableAffinityGroupID(123)]
|
|
require.Len(t, ranges, 1)
|
|
require.Equal(t, pdhttp.AffinityGroupKeyRange{
|
|
StartKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(123)...),
|
|
EndKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(124)...),
|
|
}, ranges[0])
|
|
}
|
|
|
|
func TestAffinityBuildGroupDefinitionsPartition(t *testing.T) {
|
|
tbl := &model.TableInfo{
|
|
ID: 50,
|
|
Affinity: &model.TableAffinityInfo{Level: ast.TableAffinityLevelPartition},
|
|
Partition: &model.PartitionInfo{
|
|
Enable: true,
|
|
Definitions: []model.PartitionDefinition{
|
|
{ID: 1},
|
|
{ID: 3},
|
|
},
|
|
},
|
|
}
|
|
|
|
groups, err := ddl.BuildAffinityGroupDefinitionsForTest(&mockCodec{}, tbl, nil)
|
|
require.NoError(t, err)
|
|
require.Len(t, groups, 2)
|
|
|
|
require.Equal(t, []pdhttp.AffinityGroupKeyRange{{
|
|
StartKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(1)...),
|
|
EndKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(2)...),
|
|
}}, groups[ddl.GetPartitionAffinityGroupID(50, 1)])
|
|
require.Equal(t, []pdhttp.AffinityGroupKeyRange{{
|
|
StartKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(3)...),
|
|
EndKey: append([]byte("k:"), tablecodec.EncodeTablePrefix(4)...),
|
|
}}, groups[ddl.GetPartitionAffinityGroupID(50, 3)])
|
|
}
|
|
|
|
func TestAffinityBuildGroupDefinitionsPartitionMissing(t *testing.T) {
|
|
tbl := &model.TableInfo{
|
|
ID: 1,
|
|
Affinity: &model.TableAffinityInfo{Level: ast.TableAffinityLevelPartition},
|
|
}
|
|
|
|
_, err := ddl.BuildAffinityGroupDefinitionsForTest(nil, tbl, nil)
|
|
require.Error(t, err)
|
|
}
|
|
|
|
// PD interaction tests
|
|
|
|
// affinityGroupCheck describes the affinity groups a test expects to find (or
// not find) for one table; its check method performs the verification.
type affinityGroupCheck struct {
	// tableID is the ID of the table whose groups are verified.
	tableID int64
	// partitionIDs lists the table's partition IDs. When empty, the single
	// table-level group is checked; otherwise one group per partition is.
	partitionIDs []int64
	// shouldExist selects between asserting that every group is present and
	// asserting that none is.
	shouldExist bool
	// rangeCount is the expected RangeCount of each group; values <= 0
	// disable that comparison.
	rangeCount int
	// comment is attached to assertion failure messages for context.
	comment string
}
|
|
|
|
func (c *affinityGroupCheck) check(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
// Collect all group IDs to check
|
|
var groupIDs []string
|
|
if len(c.partitionIDs) == 0 {
|
|
// Non-partitioned table
|
|
groupIDs = []string{ddl.GetTableAffinityGroupID(c.tableID)}
|
|
} else {
|
|
// Partitioned table
|
|
for _, partID := range c.partitionIDs {
|
|
groupIDs = append(groupIDs, ddl.GetPartitionAffinityGroupID(c.tableID, partID))
|
|
}
|
|
}
|
|
|
|
// Get affinity groups from PD
|
|
groups, err := affinity.GetGroups(ctx, groupIDs)
|
|
require.NoError(t, err, c.comment)
|
|
|
|
if c.shouldExist {
|
|
require.Equal(t, len(groupIDs), len(groups), c.comment)
|
|
for _, id := range groupIDs {
|
|
group, ok := groups[id]
|
|
require.True(t, ok, "group %s should exist, comment: %s", id, c.comment)
|
|
require.NotNil(t, group, c.comment)
|
|
require.Equal(t, id, group.ID, c.comment)
|
|
if c.rangeCount > 0 {
|
|
require.Equal(t, c.rangeCount, group.RangeCount, c.comment)
|
|
}
|
|
}
|
|
} else {
|
|
require.Empty(t, groups, "groups should not exist, comment: %s", c.comment)
|
|
}
|
|
}
|
|
|
|
func checkAffinityGroupsInPD(t *testing.T, do *domain.Domain, dbName, tbName string, shouldExist bool) {
|
|
tblInfo, err := do.InfoSchema().TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tbName))
|
|
require.NoError(t, err)
|
|
|
|
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
|
|
require.NoError(t, kv.RunInNewTxn(ctx, do.Store(), false, func(ctx context.Context, txn kv.Transaction) error {
|
|
_ = meta.NewMutator(txn)
|
|
|
|
check := &affinityGroupCheck{
|
|
tableID: tblInfo.Meta().ID,
|
|
shouldExist: shouldExist,
|
|
rangeCount: 1,
|
|
comment: fmt.Sprintf("table: %s.%s", dbName, tbName),
|
|
}
|
|
|
|
if tblInfo.Meta().Partition != nil {
|
|
check.partitionIDs = make([]int64, 0, len(tblInfo.Meta().Partition.Definitions))
|
|
for _, def := range tblInfo.Meta().Partition.Definitions {
|
|
check.partitionIDs = append(check.partitionIDs, def.ID)
|
|
}
|
|
}
|
|
|
|
check.check(t)
|
|
return nil
|
|
}))
|
|
}
|
|
|
|
func TestAffinityPDInteraction(t *testing.T) {
|
|
store, dom := testkit.CreateMockStoreAndDomain(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
dropTable := func() {
|
|
tk.MustExec("drop table if exists t1, t2, t3, tp1, tp2, tp3")
|
|
}
|
|
dropTable()
|
|
defer dropTable()
|
|
|
|
// Test 1: Create table with affinity='table'
|
|
tk.MustExec("create table t1(a int) affinity = 'table'")
|
|
checkAffinityGroupsInPD(t, dom, "test", "t1", true)
|
|
|
|
// Test 2: Create partitioned table with affinity='partition'
|
|
tk.MustExec("create table tp1(a int) affinity = 'partition' partition by hash(a) partitions 4")
|
|
checkAffinityGroupsInPD(t, dom, "test", "tp1", true)
|
|
|
|
// Test 3: Alter table to remove affinity
|
|
tk.MustExec("alter table t1 affinity = ''")
|
|
checkAffinityGroupsInPD(t, dom, "test", "t1", false)
|
|
|
|
// Test 4: Alter table to add affinity back
|
|
tk.MustExec("alter table t1 affinity = 'table'")
|
|
checkAffinityGroupsInPD(t, dom, "test", "t1", true)
|
|
|
|
// Test 5: Drop table should clean up affinity groups
|
|
// Get table ID before drop
|
|
t1Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t1"))
|
|
require.NoError(t, err)
|
|
t1ID := t1Info.Meta().ID
|
|
|
|
tk.MustExec("drop table t1")
|
|
// Verify affinity groups are deleted
|
|
ctx := context.Background()
|
|
groups, err := affinity.GetGroups(ctx, []string{ddl.GetTableAffinityGroupID(t1ID)})
|
|
require.NoError(t, err)
|
|
require.Empty(t, groups, "affinity groups should be deleted after dropping table")
|
|
|
|
// Test 6: TRUNCATE TABLE should preserve affinity
|
|
tk.MustExec("create table t2(a int) affinity = 'table'")
|
|
checkAffinityGroupsInPD(t, dom, "test", "t2", true)
|
|
|
|
t2Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t2"))
|
|
require.NoError(t, err)
|
|
oldTableID := t2Info.Meta().ID
|
|
|
|
tk.MustExec("truncate table t2")
|
|
checkAffinityGroupsInPD(t, dom, "test", "t2", true)
|
|
// Old table ID's affinity group should be deleted
|
|
groups, err = affinity.GetGroups(ctx, []string{ddl.GetTableAffinityGroupID(oldTableID)})
|
|
require.NoError(t, err)
|
|
require.Empty(t, groups, "old table's affinity groups should be deleted after truncate")
|
|
|
|
// Test 7: TRUNCATE PARTITION should preserve affinity
|
|
tk.MustExec("create table tp2(a int) affinity = 'partition' partition by range(a) (partition p0 values less than (10), partition p1 values less than (20))")
|
|
checkAffinityGroupsInPD(t, dom, "test", "tp2", true)
|
|
|
|
// Get partition ID before truncate
|
|
tblInfo, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("tp2"))
|
|
require.NoError(t, err)
|
|
oldPartitionID := tblInfo.Meta().Partition.Definitions[0].ID
|
|
|
|
tk.MustExec("alter table tp2 truncate partition p0")
|
|
checkAffinityGroupsInPD(t, dom, "test", "tp2", true)
|
|
// Old partition's affinity group should be deleted
|
|
groups, err = affinity.GetGroups(ctx, []string{ddl.GetPartitionAffinityGroupID(tblInfo.Meta().ID, oldPartitionID)})
|
|
require.NoError(t, err)
|
|
require.Empty(t, groups, "old partition's affinity group should be deleted after truncate partition")
|
|
|
|
// Test 8: ALTER TABLE AFFINITY idempotency - partition -> partition
|
|
tk.MustExec("create table tp3(a int) affinity = 'partition' partition by hash(a) partitions 3")
|
|
checkAffinityGroupsInPD(t, dom, "test", "tp3", true)
|
|
// Set affinity to partition again (idempotent operation)
|
|
tk.MustExec("alter table tp3 affinity = 'partition'")
|
|
checkAffinityGroupsInPD(t, dom, "test", "tp3", true)
|
|
|
|
// Test 9: ALTER TABLE AFFINITY idempotency - table -> table
|
|
tk.MustExec("create table t3(a int) affinity = 'table'")
|
|
checkAffinityGroupsInPD(t, dom, "test", "t3", true)
|
|
// Set affinity to table again (idempotent operation)
|
|
tk.MustExec("alter table t3 affinity = 'table'")
|
|
checkAffinityGroupsInPD(t, dom, "test", "t3", true)
|
|
}
|
|
|
|
func TestAffinityDropDatabase(t *testing.T) {
|
|
store, dom := testkit.CreateMockStoreAndDomain(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
|
|
tk.MustExec("drop database if exists test_affinity_db")
|
|
tk.MustExec("create database test_affinity_db")
|
|
tk.MustExec("use test_affinity_db")
|
|
|
|
// Create multiple tables with affinity
|
|
tk.MustExec("create table t1(a int) affinity = 'table'")
|
|
tk.MustExec("create table t2(a int) affinity = 'table'")
|
|
tk.MustExec("create table tp1(a int) affinity = 'partition' partition by hash(a) partitions 2")
|
|
|
|
checkAffinityGroupsInPD(t, dom, "test_affinity_db", "t1", true)
|
|
checkAffinityGroupsInPD(t, dom, "test_affinity_db", "t2", true)
|
|
checkAffinityGroupsInPD(t, dom, "test_affinity_db", "tp1", true)
|
|
|
|
// Get table IDs before dropping database
|
|
t1Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test_affinity_db"), ast.NewCIStr("t1"))
|
|
require.NoError(t, err)
|
|
t2Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test_affinity_db"), ast.NewCIStr("t2"))
|
|
require.NoError(t, err)
|
|
tp1Info, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test_affinity_db"), ast.NewCIStr("tp1"))
|
|
require.NoError(t, err)
|
|
|
|
// Drop database should clean up all affinity groups
|
|
tk.MustExec("drop database test_affinity_db")
|
|
|
|
ctx := context.Background()
|
|
groupIDs := make([]string, 0, 2+len(tp1Info.Meta().Partition.Definitions))
|
|
groupIDs = append(groupIDs, ddl.GetTableAffinityGroupID(t1Info.Meta().ID))
|
|
groupIDs = append(groupIDs, ddl.GetTableAffinityGroupID(t2Info.Meta().ID))
|
|
for _, def := range tp1Info.Meta().Partition.Definitions {
|
|
groupIDs = append(groupIDs, ddl.GetPartitionAffinityGroupID(tp1Info.Meta().ID, def.ID))
|
|
}
|
|
|
|
groups, err := affinity.GetGroups(ctx, groupIDs)
|
|
require.NoError(t, err)
|
|
require.Empty(t, groups, "all affinity groups should be deleted after dropping database")
|
|
}
|