Files
tidb/pkg/ddl/table_mode_test.go
2025-08-11 09:02:45 +00:00

372 lines
15 KiB
Go

// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl_test
import (
"context"
"strings"
"sync"
"testing"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
)
// getClonedTableInfoFromDomain looks up tableName in schema dbName through the
// domain's current info schema and returns a clone of the table's meta, so the
// caller can modify the result without touching the info held by the domain.
// The test fails immediately if the lookup returns an error.
func getClonedTableInfoFromDomain(
	t *testing.T,
	dbName string,
	tableName string,
	dom *domain.Domain,
) *model.TableInfo {
	// Attribute a failed lookup to the calling test line, not to this helper.
	t.Helper()
	tbl, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tableName))
	require.NoError(t, err)
	return tbl.Meta().Clone()
}
// checkErrorCode asserts that the root cause of err is a *terror.Error and that
// its SQL error code, as produced by terror.ToSQLError, equals expected.
func checkErrorCode(t *testing.T, err error, expected int) {
	// Attribute assertion failures to the calling test line, not to this helper.
	t.Helper()
	originErr := errors.Cause(err)
	tErr, ok := originErr.(*terror.Error)
	// Include the actual type and error text so a mismatch can be diagnosed.
	require.Truef(t, ok, "expected root cause of type *terror.Error, got %T: %v", originErr, err)
	sqlErr := terror.ToSQLError(tErr)
	require.Equal(t, expected, int(sqlErr.Code))
}
// TestTableModeBasic exercises table modes end to end on a mock store:
// creating tables directly in ModeImport/ModeRestore, which statements are
// accepted or rejected with ErrProtectedTableMode while a table is in
// ModeRestore, which mode transitions SetTableMode accepts, and batch creation
// of tables carrying different modes.
func TestTableModeBasic(t *testing.T) {
	store, domain := testkit.CreateMockStoreAndDomain(t)
	de := domain.DDLExecutor()
	tk := testkit.NewTestKit(t, store)
	ctx := testkit.NewTestKit(t, store).Session()
	testDB := ast.NewCIStr("test")

	// cloneAs clones the meta of an existing table in schema "test" and gives
	// the clone a new name and table mode.
	cloneAs := func(srcTable, newName string, mode model.TableMode) *model.TableInfo {
		info := getClonedTableInfoFromDomain(t, "test", srcTable, domain)
		info.Name = ast.NewCIStr(newName)
		info.Mode = mode
		return info
	}
	// mustExecAll runs every statement in order and requires each to succeed.
	mustExecAll := func(stmts ...string) {
		for _, stmt := range stmts {
			tk.MustExec(stmt)
		}
	}
	// mustBeProtected runs every statement in order and requires each to fail
	// with ErrProtectedTableMode.
	mustBeProtected := func(stmts ...string) {
		for _, stmt := range stmts {
			tk.MustGetErrCode(stmt, errno.ErrProtectedTableMode)
		}
	}

	// init test
	mustExecAll(
		"use test",
		"create table t1(id int, c1 int, c2 int, index idx1(c1))",
		"create table t2(id int, c1 int, c2 int, index idx1(c1))",
		"create table t3(id int, pid INT, INDEX idx_pid (pid),FOREIGN KEY fk_1 (pid) REFERENCES t1(c1) ON UPDATE SET NULL)",
	)

	// Create a table with a foreign key directly in ModeImport.
	fkInfo := cloneAs("t3", "t1_foreign_key", model.TableModeImport)
	require.NoError(t, de.CreateTableWithInfo(tk.Session(), testDB, fkInfo, nil, ddl.WithOnExist(ddl.OnExistIgnore)))
	dbInfo, ok := domain.InfoSchema().SchemaByName(testDB)
	require.True(t, ok)
	testutil.CheckTableMode(t, store, dbInfo, fkInfo, model.TableModeImport)
	// Dropping the foreign key constraint is not allowed.
	mustBeProtected("ALTER TABLE t1_foreign_key DROP FOREIGN KEY fk_1")

	// Create a table directly in ModeRestore.
	restoreInfo := cloneAs("t1", "t1_restore_import", model.TableModeRestore)
	require.NoError(t, de.CreateTableWithInfo(tk.Session(), testDB, restoreInfo, nil, ddl.WithOnExist(ddl.OnExistIgnore)))
	dbInfo, ok = domain.InfoSchema().SchemaByName(testDB)
	require.True(t, ok)
	testutil.CheckTableMode(t, store, dbInfo, restoreInfo, model.TableModeRestore)

	// Reading table metadata stays allowed while the table is in ModeRestore.
	mustExecAll(
		"show create table t1_restore_import",
		"show table status where Name = 't1_restore_import'",
		"show columns from t1_restore_import",
		"show create table t1_restore_import",
		"show table status where Name = 't1_restore_import'",
		"show index from t1_restore_import",
		"describe t1_restore_import",
		"create table t1_restore_import_2 like t1_restore_import",
		"create view t1_restore_import_view as select * from t1_restore_import",
		"create table foreign_key_child(id int, pid INT, INDEX idx_pid (pid),FOREIGN KEY (pid) REFERENCES t1_restore_import(c1) ON DELETE CASCADE)",
		"drop table foreign_key_child",
		// Special case: admin checksum table is allowed for import into.
		"admin checksum table t1_restore_import;",
	)

	// The statements below are rejected while the table is in a protected mode.
	// DMLs
	mustBeProtected(
		"select * from t1_restore_import",
		"explain select * from t1_restore_import",
		"desc select * from t1_restore_import",
		"insert into t1_restore_import values(1, 1, 1)",
		"replace into t1_restore_import values(1,1,1)",
		"update t1_restore_import set id = 2 where id = 1",
		"delete from t1_restore_import where id = 2",
		"truncate table t1_restore_import",
	)
	// DDLs
	mustBeProtected(
		"drop table t1_restore_import",
		"alter table t1_restore_import rename to t1_new",
		"rename table t1_restore_import to t1_new",
		"rename table t1_restore_import to t1_new, t2 to t2_new",
		"alter table t1_restore_import modify column c2 bigint",
		"alter table t1_restore_import add column c3 int",
		"alter table t1_restore_import drop column c2",
		"alter table t1_restore_import drop index idx1",
		"alter table t1_restore_import add index idx2(c2)",
		"alter table t1_restore_import partition by range(id) (partition p0 values less than (100))",
		"alter table t1_restore_import comment='new comment'",
		"alter table t1_restore_import convert to character set utf8mb4",
		"alter table t1_restore_import rename column c1 to c1_new",
		"alter table t1_restore_import alter column c1 set default 100",
		"alter table t1_restore_import add foreign key fk_1 (c2) REFERENCES t1(c1) ON UPDATE SET NULL ",
	)

	// The same protection applies inside an explicit transaction.
	tk.MustExec("begin")
	mustBeProtected("insert into t1_restore_import values(1,1,1)")
	tk.MustExec("rollback")

	// setMode requests a mode change on t1_restore_import.
	setMode := func(mode model.TableMode) error {
		return testutil.SetTableMode(ctx, t, store, de, dbInfo, restoreInfo, mode)
	}
	// ModeRestore -> ModeImport is not allowed.
	require.ErrorContains(t, setMode(model.TableModeImport), "Invalid mode set from (or by default) Restore to Import for table t1_restore_import")
	// Allowed transitions, applied in order:
	// Restore -> Normal, Normal -> Restore, Restore -> Restore,
	// then Restore -> Normal and Normal -> Import to leave the table in ModeImport.
	for _, mode := range []model.TableMode{
		model.TableModeNormal,
		model.TableModeRestore,
		model.TableModeRestore,
		model.TableModeNormal,
		model.TableModeImport,
	} {
		require.NoError(t, setMode(mode))
	}

	// An existing ModeImport table must not be recreated with ModeRestore (the BR path).
	restoreInfo.Mode = model.TableModeRestore
	err := de.CreateTableWithInfo(tk.Session(), testDB, restoreInfo, nil, ddl.WithOnExist(ddl.OnExistIgnore))
	require.ErrorContains(t, err, "Invalid mode set from (or by default) Import to Restore for table t1_restore_import")

	// Batch create tables that carry different modes.
	batchSpecs := []struct {
		name string
		mode model.TableMode
	}{
		{"t1_1", model.TableModeNormal},
		{"t1_2", model.TableModeImport},
		{"t1_3", model.TableModeRestore},
	}
	batchInfos := make([]*model.TableInfo, 0, len(batchSpecs))
	for _, spec := range batchSpecs {
		batchInfos = append(batchInfos, cloneAs("t1", spec.name, spec.mode))
	}
	err = de.BatchCreateTableWithInfo(
		ctx,
		testDB,
		batchInfos,
		ddl.WithOnExist(ddl.OnExistIgnore),
	)
	require.NoError(t, err)
	for i, spec := range batchSpecs {
		testutil.CheckTableMode(t, store, dbInfo, batchInfos[i], spec.mode)
	}
}
// TestTableModeConcurrent issues two SetTableMode requests for table t1 at the
// same time and checks how they resolve: two requests for the same target mode
// must both succeed, while simultaneous requests for ModeRestore and ModeImport
// must yield exactly one success and one ErrInvalidTableModeSet failure.
func TestTableModeConcurrent(t *testing.T) {
	store, domain := testkit.CreateMockStoreAndDomain(t)
	de := domain.DDLExecutor()
	tk := testkit.NewTestKit(t, store)
	ctx := testkit.NewTestKit(t, store).Session()
	tk.MustExec("use test")
	tk.MustExec("create table t1(id int)")
	dbInfo, ok := domain.InfoSchema().SchemaByName(ast.NewCIStr("test"))
	require.True(t, ok)

	// setModesConcurrently clones t1's info once per requested mode, then runs
	// one SetTableMode call per mode in its own goroutine and waits for all of
	// them. It returns the clones, the number of calls that returned nil, and
	// the last non-nil error received (nil when every call succeeded).
	setModesConcurrently := func(modes ...model.TableMode) (infos []*model.TableInfo, successCount int, lastErr error) {
		// Take all clones before starting any goroutine so every request is
		// built from the same pre-change table info.
		infos = make([]*model.TableInfo, len(modes))
		for i := range modes {
			infos[i] = getClonedTableInfoFromDomain(t, "test", "t1", domain)
		}
		var wg sync.WaitGroup
		// Buffered to len(modes) so no sender blocks before the results are drained.
		errs := make(chan error, len(modes))
		for i, mode := range modes {
			wg.Add(1)
			go func(info *model.TableInfo, m model.TableMode) {
				defer wg.Done()
				errs <- testutil.SetTableMode(ctx, t, store, de, dbInfo, info, m)
			}(infos[i], mode)
		}
		wg.Wait()
		close(errs)
		for e := range errs {
			if e == nil {
				successCount++
			} else {
				lastErr = e
			}
		}
		return infos, successCount, lastErr
	}

	// Concurrency test1: concurrently alter t1 to ModeImport, expecting both success.
	infos, successCount, failedErr := setModesConcurrently(model.TableModeImport, model.TableModeImport)
	require.NoError(t, failedErr)
	require.Equal(t, 2, successCount)
	testutil.CheckTableMode(t, store, dbInfo, infos[0], model.TableModeImport)

	// Concurrency test2: concurrently alter t1 to ModeNormal, expecting both success.
	infos, successCount, failedErr = setModesConcurrently(model.TableModeNormal, model.TableModeNormal)
	require.NoError(t, failedErr)
	require.Equal(t, 2, successCount)
	testutil.CheckTableMode(t, store, dbInfo, infos[0], model.TableModeNormal)

	// Concurrency test3: concurrently alter t1 to ModeRestore, expecting both success.
	infos, successCount, failedErr = setModesConcurrently(model.TableModeRestore, model.TableModeRestore)
	require.NoError(t, failedErr)
	require.Equal(t, 2, successCount)
	testutil.CheckTableMode(t, store, dbInfo, infos[0], model.TableModeRestore)

	// Concurrency test4: concurrently alter t1 to ModeRestore and ModeImport, expecting one success, one failure.
	_, successCount, failedErr = setModesConcurrently(model.TableModeRestore, model.TableModeImport)
	require.Equal(t, 1, successCount)
	require.NotNil(t, failedErr)
	checkErrorCode(t, failedErr, errno.ErrInvalidTableModeSet)
}
// TestTableModeWithRefreshMeta rewrites a table's meta directly through a
// transaction (its table ID is replaced by a partition ID) and verifies that
// SetTableMode fails until RefreshMeta has been run, and works afterwards.
func TestTableModeWithRefreshMeta(t *testing.T) {
	store, domain := testkit.CreateMockStoreAndDomain(t)
	de := domain.DDLExecutor()
	tk := testkit.NewTestKit(t, store)
	sctx := testkit.NewTestKit(t, store).Session()

	// Prepare one plain table and one range-partitioned table, each with rows.
	for _, stmt := range []string{
		"use test",
		"create table nt(id int, c1 int)",
		"create table pt(id int, c1 int) partition by range (c1) (partition p10 values less than (10))",
		"insert into nt values(3, 3), (4, 4), (5, 5)",
		"insert into pt values(1, 1), (2, 2)",
	} {
		tk.MustExec(stmt)
	}

	dbInfo, found := domain.InfoSchema().SchemaByName(ast.NewCIStr("test"))
	require.True(t, found)
	require.NotNil(t, dbInfo)
	ntInfo := getClonedTableInfoFromDomain(t, "test", "nt", domain)
	ptInfo := getClonedTableInfoFromDomain(t, "test", "pt", domain)

	// Change the non-partitioned table's ID to the partition's ID.
	wantID := ptInfo.Partition.Definitions[0].ID
	recreateTableWithPartitionID(t, &store, dbInfo.ID, ntInfo, ptInfo, "p10")
	ntInfo = testutil.GetTableInfoByTxn(t, store, dbInfo.ID, ntInfo.ID)
	require.Equal(t, wantID, ntInfo.ID)

	// setMode requests a mode change on nt using whatever ntInfo currently holds.
	setMode := func(mode model.TableMode) error {
		return testutil.SetTableMode(sctx, t, store, de, dbInfo, ntInfo, mode)
	}

	// Setting the table mode fails before the meta is refreshed.
	require.ErrorContains(t, setMode(model.TableModeImport), "doesn't exist")
	testutil.RefreshMeta(sctx, t, de, dbInfo.ID, ntInfo.ID, dbInfo.Name.O, ntInfo.Name.O)

	// Setting the table mode succeeds after the meta is refreshed.
	require.NoError(t, setMode(model.TableModeImport))
	tk.MustGetErrCode("select * from nt", errno.ErrProtectedTableMode)
	require.NoError(t, setMode(model.TableModeNormal))
	tk.MustExec("select * from nt")
}
// recreateTableWithPartitionID replaces the stored meta of ntInfo with a copy
// whose table ID is the ID of partition partName of ptInfo. Within a single
// transaction it drops the table meta stored under ntInfo's current ID and
// creates it again under the partition's ID. On return ntInfo.ID holds the
// partition ID. The test fails if the partition cannot be found or the
// transaction returns an error.
func recreateTableWithPartitionID(t *testing.T, store *kv.Storage, dbID int64, ntInfo, ptInfo *model.TableInfo, partName string) {
	// Attribute assertion failures to the calling test line, not to this helper.
	t.Helper()
	_, partDef, err := getPartitionDef(ptInfo, partName)
	require.NoError(t, err)
	// Capture the ID to drop before the callback overwrites ntInfo.ID, so that
	// the callback still drops the original table if it is executed again.
	// NOTE(review): the `true` passed to RunInNewTxn is presumably its
	// retryable flag — confirm against the kv package.
	oldTableID := ntInfo.ID
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
	err = kv.RunInNewTxn(ctx, *store, true, func(_ context.Context, txn kv.Transaction) error {
		m := meta.NewMutator(txn)
		// Return errors instead of asserting here, so RunInNewTxn sees the
		// failure; the result is asserted once after the transaction.
		if err := m.DropTableOrView(dbID, oldTableID); err != nil {
			return err
		}
		ntInfo.ID = partDef.ID
		return m.CreateTableOrView(dbID, ntInfo)
	})
	require.NoError(t, err)
}
// getPartitionDef searches tblInfo's partition definitions for one whose
// lower-case name matches partName, ignoring case. On a match it returns the
// definition's position and a pointer to the element inside tblInfo's
// definition slice (not a copy). If nothing matches, or tblInfo carries no
// partition info at all, it returns a zero index, a nil definition and
// table.ErrUnknownPartition.
func getPartitionDef(tblInfo *model.TableInfo, partName string) (index int, def *model.PartitionDefinition, _ error) {
	// Guard the pointer dereference: with a nil Partition there is nothing to
	// search, so fall through to the not-found error instead of panicking.
	if tblInfo.Partition != nil {
		defs := tblInfo.Partition.Definitions
		for i := range defs {
			if strings.EqualFold(defs[i].Name.L, strings.ToLower(partName)) {
				return i, &(defs[i]), nil
			}
		}
	}
	return index, nil, table.ErrUnknownPartition.GenWithStackByArgs(partName, tblInfo.Name.O)
}