Files
tidb/pkg/ddl/schematracker/dm_tracker.go

1192 lines
37 KiB
Go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2013 The ql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSES/QL-LICENSE file.
package schematracker
import (
"context"
"slices"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/metabuild"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
field_types "github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/dbterror"
)
// SchemaTracker is used to track schema changes by DM. It implements
// ddl.Executor interface and by applying DDL, it updates the table structure to
// keep tracked with upstream changes.
//
// It embeds an InfoStore which stores DBInfo and TableInfo. The DBInfo and
// TableInfo can be treated as immutable, so after reading them by SchemaByName
// or TableByName, later modifications made by SchemaTracker will not change
// them. SchemaTracker is not thread-safe.
type SchemaTracker struct {
*InfoStore
}
var _ ddl.Executor = (*SchemaTracker)(nil)
// NewSchemaTracker creates a SchemaTracker. lowerCaseTableNames has the same meaning as MySQL variable lower_case_table_names.
func NewSchemaTracker(lowerCaseTableNames int) SchemaTracker {
return SchemaTracker{
InfoStore: NewInfoStore(lowerCaseTableNames),
}
}
// CreateSchema implements the DDL interface.
func (d *SchemaTracker) CreateSchema(ctx sessionctx.Context, stmt *ast.CreateDatabaseStmt) error {
// we only consider explicit charset/collate, if not found, fallback to default charset/collate.
charsetOpt := ast.CharsetOpt{}
for _, val := range stmt.Options {
switch val.Tp {
case ast.DatabaseOptionCharset:
charsetOpt.Chs = val.Value
case ast.DatabaseOptionCollate:
charsetOpt.Col = val.Value
}
}
utf8MB4DefaultColl := ""
if ctx != nil {
utf8MB4DefaultColl = ctx.GetSessionVars().DefaultCollationForUTF8MB4
}
chs, coll, err := ddl.ResolveCharsetCollation([]ast.CharsetOpt{charsetOpt}, utf8MB4DefaultColl)
if err != nil {
return errors.Trace(err)
}
dbInfo := &model.DBInfo{Name: stmt.Name, Charset: chs, Collate: coll}
onExist := ddl.OnExistError
if stmt.IfNotExists {
onExist = ddl.OnExistIgnore
}
return d.CreateSchemaWithInfo(ctx, dbInfo, onExist)
}
// CreateTestDB creates the `test` database, which is the default behavior of TiDB.
func (d *SchemaTracker) CreateTestDB(ctx sessionctx.Context) {
_ = d.CreateSchema(ctx, &ast.CreateDatabaseStmt{
Name: ast.NewCIStr("test"),
})
}
// CreateSchemaWithInfo implements the DDL interface.
func (d *SchemaTracker) CreateSchemaWithInfo(_ sessionctx.Context, dbInfo *model.DBInfo, onExist ddl.OnExist) error {
oldInfo := d.SchemaByName(dbInfo.Name)
if oldInfo != nil {
if onExist == ddl.OnExistIgnore {
return nil
}
// not support MariaDB's CREATE OR REPLACE SCHEMA
return infoschema.ErrDatabaseExists.GenWithStackByArgs(dbInfo.Name)
}
d.PutSchema(dbInfo)
return nil
}
// AlterSchema implements the DDL interface.
func (d *SchemaTracker) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) {
dbInfo := d.SchemaByName(stmt.Name)
if dbInfo == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(stmt.Name.O)
}
dbInfo = dbInfo.Clone()
defer func() {
if err == nil {
d.PutSchema(dbInfo)
}
}()
// Resolve target charset and collation from options.
var (
toCharset, toCollate string
)
for _, val := range stmt.Options {
switch val.Tp {
case ast.DatabaseOptionCharset:
if toCharset == "" {
toCharset = val.Value
} else if toCharset != val.Value {
return dbterror.ErrConflictingDeclarations.GenWithStackByArgs(toCharset, val.Value)
}
case ast.DatabaseOptionCollate:
info, errGetCollate := collate.GetCollationByName(val.Value)
if errGetCollate != nil {
return errors.Trace(errGetCollate)
}
if toCharset == "" {
toCharset = info.CharsetName
} else if toCharset != info.CharsetName {
return dbterror.ErrConflictingDeclarations.GenWithStackByArgs(toCharset, info.CharsetName)
}
toCollate = info.Name
}
}
if toCollate == "" {
if toCollate, err = ddl.GetDefaultCollation(toCharset, ctx.GetSessionVars().DefaultCollationForUTF8MB4); err != nil {
return errors.Trace(err)
}
}
dbInfo.Charset = toCharset
dbInfo.Collate = toCollate
return nil
}
// DropSchema implements the DDL interface.
func (d *SchemaTracker) DropSchema(_ sessionctx.Context, stmt *ast.DropDatabaseStmt) error {
ok := d.DeleteSchema(stmt.Name)
if !ok {
if stmt.IfExists {
return nil
}
return infoschema.ErrDatabaseDropExists.GenWithStackByArgs(stmt.Name)
}
return nil
}
// CreateTable implements the DDL interface.
func (d *SchemaTracker) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) error {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
schema := d.SchemaByName(ident.Schema)
if schema == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ident.Schema)
}
var (
referTbl *model.TableInfo
err error
)
if s.ReferTable != nil {
referTbl, err = d.TableByName(context.Background(), s.ReferTable.Schema, s.ReferTable.Name)
if err != nil {
return infoschema.ErrTableNotExists.GenWithStackByArgs(s.ReferTable.Schema, s.ReferTable.Name)
}
}
metaBuildCtx := ddl.NewMetaBuildContextWithSctx(
ctx,
// suppress ErrTooLongKey
metabuild.WithSuppressTooLongIndexErr(true),
// support drop PK
metabuild.WithClusteredIndexDefMode(vardef.ClusteredIndexDefModeOff),
)
// build tableInfo
var (
tbInfo *model.TableInfo
)
if s.ReferTable != nil {
tbInfo, err = ddl.BuildTableInfoWithLike(ident, referTbl, s)
} else {
tbInfo, err = ddl.BuildTableInfoWithStmt(metaBuildCtx, s, schema.Charset, schema.Collate, nil)
}
if err != nil {
return errors.Trace(err)
}
// TODO: to reuse the constant fold of expression in partition range definition we use CheckTableInfoValidWithStmt,
// but it may also introduce unwanted limit check in DM's use case. Should check it later.
if err = ddl.CheckTableInfoValidWithStmt(metaBuildCtx, tbInfo, s); err != nil {
return err
}
onExist := ddl.OnExistError
if s.IfNotExists {
onExist = ddl.OnExistIgnore
}
return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, nil, ddl.WithOnExist(onExist))
}
// CreateTableWithInfo implements the DDL interface.
func (d *SchemaTracker) CreateTableWithInfo(
_ sessionctx.Context,
dbName ast.CIStr,
info *model.TableInfo,
_ []model.InvolvingSchemaInfo,
cs ...ddl.CreateTableOption,
) error {
c := ddl.GetCreateTableConfig(cs)
schema := d.SchemaByName(dbName)
if schema == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName)
}
oldTable, _ := d.TableByName(context.Background(), dbName, info.Name)
if oldTable != nil {
switch c.OnExist {
case ddl.OnExistIgnore:
return nil
case ddl.OnExistReplace:
return d.PutTable(dbName, info)
default:
return infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: dbName, Name: info.Name})
}
}
return d.PutTable(dbName, info)
}
// CreateView implements the DDL interface.
func (d *SchemaTracker) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) error {
viewInfo, err := ddl.BuildViewInfo(s)
if err != nil {
return err
}
cols := make([]*table.Column, len(s.Cols))
for i, v := range s.Cols {
cols[i] = table.ToColumn(&model.ColumnInfo{
Name: v,
ID: int64(i),
Offset: i,
State: model.StatePublic,
})
}
tbInfo, err := ddl.BuildTableInfo(ddl.NewMetaBuildContextWithSctx(ctx), s.ViewName.Name, cols, nil, "", "")
if err != nil {
return err
}
tbInfo.View = viewInfo
onExist := ddl.OnExistError
if s.OrReplace {
onExist = ddl.OnExistReplace
}
return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, nil, ddl.WithOnExist(onExist))
}
// DropTable implements the DDL interface.
func (d *SchemaTracker) DropTable(_ sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
notExistTables := make([]string, 0, len(stmt.Tables))
for _, name := range stmt.Tables {
tb, err := d.TableByName(context.Background(), name.Schema, name.Name)
if err != nil || !tb.IsBaseTable() {
if stmt.IfExists {
continue
}
id := ast.Ident{Schema: name.Schema, Name: name.Name}
notExistTables = append(notExistTables, id.String())
// For statement dropping multiple tables, we should return error after try to drop all tables.
continue
}
// Without IF EXISTS, the statement drops all named tables that do exist, and returns an error indicating which
// nonexisting tables it was unable to drop.
// https://dev.mysql.com/doc/refman/5.7/en/drop-table.html
_ = d.DeleteTable(name.Schema, name.Name)
}
if len(notExistTables) > 0 {
return infoschema.ErrTableDropExists.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
return nil
}
// RecoverTable implements the DDL interface, which is no-op in DM's case.
func (*SchemaTracker) RecoverTable(_ sessionctx.Context, _ *model.RecoverTableInfo) (err error) {
return nil
}
// FlashbackCluster implements the DDL interface, which is no-op in DM's case.
func (*SchemaTracker) FlashbackCluster(_ sessionctx.Context, _ uint64) (err error) {
return nil
}
// RecoverSchema implements the DDL interface, which is no-op in DM's case.
func (*SchemaTracker) RecoverSchema(_ sessionctx.Context, _ *model.RecoverSchemaInfo) (err error) {
return nil
}
// DropView implements the DDL interface.
func (d *SchemaTracker) DropView(_ sessionctx.Context, stmt *ast.DropTableStmt) (err error) {
notExistTables := make([]string, 0, len(stmt.Tables))
for _, name := range stmt.Tables {
tb, err := d.TableByName(context.Background(), name.Schema, name.Name)
if err != nil {
if stmt.IfExists {
continue
}
id := ast.Ident{Schema: name.Schema, Name: name.Name}
notExistTables = append(notExistTables, id.String())
continue
}
// the behaviour is fast fail when type is wrong.
if !tb.IsView() {
return dbterror.ErrWrongObject.GenWithStackByArgs(name.Schema, name.Name, "VIEW")
}
_ = d.DeleteTable(name.Schema, name.Name)
}
if len(notExistTables) > 0 {
return infoschema.ErrTableDropExists.GenWithStackByArgs(strings.Join(notExistTables, ","))
}
return nil
}
// CreateIndex implements the DDL interface.
func (d *SchemaTracker) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
ident := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name}
return d.createIndex(ctx, ident, stmt.KeyType, ast.NewCIStr(stmt.IndexName),
stmt.IndexPartSpecifications, stmt.IndexOption, stmt.IfNotExists)
}
func (d *SchemaTracker) putTableIfNoError(err error, dbName ast.CIStr, tbInfo *model.TableInfo) {
if err != nil {
return
}
_ = d.PutTable(dbName, tbInfo)
}
// createIndex is shared by CreateIndex and AlterTable.
func (d *SchemaTracker) createIndex(
ctx sessionctx.Context,
ti ast.Ident,
keyType ast.IndexKeyType,
indexName ast.CIStr,
indexPartSpecifications []*ast.IndexPartSpecification,
indexOption *ast.IndexOption,
ifNotExists bool,
) (err error) {
unique := keyType == ast.IndexKeyTypeUnique
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}
defer d.putTableIfNoError(err, ti.Schema, tblInfo)
t := tables.MockTableFromMeta(tblInfo)
// Deal with anonymous index.
if len(indexName.L) == 0 {
colName := ast.NewCIStr("expression_index")
if indexPartSpecifications[0].Column != nil {
colName = indexPartSpecifications[0].Column.Name
}
indexName = ddl.GetName4AnonymousIndex(t, colName, ast.NewCIStr(""))
}
if indexInfo := tblInfo.FindIndexByName(indexName.L); indexInfo != nil {
if ifNotExists {
return nil
}
return dbterror.ErrDupKeyName.GenWithStack("index already exist %s", indexName)
}
hiddenCols, err := ddl.BuildHiddenColumnInfo(ddl.NewMetaBuildContextWithSctx(ctx), indexPartSpecifications, indexName, t.Meta(), t.Cols())
if err != nil {
return err
}
for _, hiddenCol := range hiddenCols {
ddl.InitAndAddColumnToTable(tblInfo, hiddenCol)
}
indexInfo, err := ddl.BuildIndexInfo(
ddl.NewMetaBuildContextWithSctx(ctx),
tblInfo,
indexName,
false,
unique,
model.ColumnarIndexTypeNA,
indexPartSpecifications,
indexOption,
model.StatePublic,
)
if err != nil {
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(hiddenCols)]
return err
}
indexInfo.ID = ddl.AllocateIndexID(tblInfo)
tblInfo.Indices = append(tblInfo.Indices, indexInfo)
ddl.AddIndexColumnFlag(tblInfo, indexInfo)
return nil
}
// DropIndex implements the DDL interface.
func (d *SchemaTracker) DropIndex(ctx sessionctx.Context, stmt *ast.DropIndexStmt) error {
ti := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name}
err := d.dropIndex(ctx, ti, ast.NewCIStr(stmt.IndexName), stmt.IfExists)
if (infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err)) && stmt.IfExists {
err = nil
}
return err
}
// dropIndex is shared by DropIndex and AlterTable.
func (d *SchemaTracker) dropIndex(_ sessionctx.Context, ti ast.Ident, indexName ast.CIStr, ifExists bool) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return infoschema.ErrTableNotExists.GenWithStackByArgs(ti.Schema, ti.Name)
}
defer d.putTableIfNoError(err, ti.Schema, tblInfo)
indexInfo := tblInfo.FindIndexByName(indexName.L)
if indexInfo == nil {
if ifExists {
return nil
}
return dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
}
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexInfo.Name.L {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
ddl.DropIndexColumnFlag(tblInfo, indexInfo)
ddl.RemoveDependentHiddenColumns(tblInfo, indexInfo)
return nil
}
// addColumn is used by AlterTable.
func (d *SchemaTracker) addColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) (err error) {
specNewColumn := spec.NewColumns[0]
schema := d.SchemaByName(ti.Schema)
if schema == nil {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(ti.Schema)
}
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}
defer d.putTableIfNoError(err, ti.Schema, tblInfo)
t := tables.MockTableFromMeta(tblInfo)
colName := specNewColumn.Name.Name.O
if col := table.FindCol(t.Cols(), colName); col != nil {
if spec.IfNotExists {
return nil
}
return infoschema.ErrColumnExists.GenWithStackByArgs(colName)
}
col, err := ddl.CreateNewColumn(ctx, schema, spec, t, specNewColumn)
if err != nil {
return errors.Trace(err)
}
err = ddl.CheckAfterPositionExists(tblInfo, spec.Position)
if err != nil {
return errors.Trace(err)
}
columnInfo := ddl.InitAndAddColumnToTable(tblInfo, col.ColumnInfo)
offset, err := ddl.LocateOffsetToMove(columnInfo.Offset, spec.Position, tblInfo)
if err != nil {
return errors.Trace(err)
}
tblInfo.MoveColumnInfo(columnInfo.Offset, offset)
columnInfo.State = model.StatePublic
return nil
}
// dropColumn is used by AlterTable.
func (d *SchemaTracker) dropColumn(_ sessionctx.Context, ti ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return err
}
defer d.putTableIfNoError(err, ti.Schema, tblInfo)
colName := spec.OldColumnName.Name
colInfo := tblInfo.FindPublicColumnByName(colName.L)
if colInfo == nil {
if spec.IfExists {
return nil
}
return dbterror.ErrCantDropFieldOrKey.GenWithStackByArgs(colName)
}
if len(tblInfo.Columns) == 1 {
return dbterror.ErrCantRemoveAllFields.GenWithStack("can't drop only column %s in table %s",
colName, tblInfo.Name)
}
// do drop column
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
i, _ := model.FindIndexColumnByName(idx.Columns, colName.L)
if i == -1 {
newIndices = append(newIndices, idx)
continue
}
idx.Columns = slices.Delete(idx.Columns, i, i+1)
if len(idx.Columns) == 0 {
continue
}
newIndices = append(newIndices, idx)
}
tblInfo.Indices = newIndices
return nil
}
// renameColumn is used by AlterTable.
func (d *SchemaTracker) renameColumn(_ sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
oldColName := spec.OldColumnName.Name
newColName := spec.NewColumnName.Name
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}
defer d.putTableIfNoError(err, ident.Schema, tblInfo)
tbl := tables.MockTableFromMeta(tblInfo)
oldCol := table.FindCol(tbl.VisibleCols(), oldColName.L)
if oldCol == nil {
return infoschema.ErrColumnNotExists.GenWithStackByArgs(oldColName, ident.Name)
}
if oldColName.L == newColName.L {
return nil
}
if newColName.L == model.ExtraHandleName.L {
return dbterror.ErrWrongColumnName.GenWithStackByArgs(newColName.L)
}
allCols := tbl.Cols()
colWithNewNameAlreadyExist := table.FindCol(allCols, newColName.L) != nil
if colWithNewNameAlreadyExist {
return infoschema.ErrColumnExists.GenWithStackByArgs(newColName)
}
// Check generated expression.
for _, col := range allCols {
if col.GeneratedExpr == nil {
continue
}
dependedColNames := ddl.FindColumnNamesInExpr(col.GeneratedExpr.Internal())
for _, name := range dependedColNames {
if name.Name.L == oldColName.L {
if col.Hidden {
return dbterror.ErrDependentByFunctionalIndex.GenWithStackByArgs(oldColName.O)
}
return dbterror.ErrDependentByGeneratedColumn.GenWithStackByArgs(oldColName.O)
}
}
}
oldCol.Name = newColName
return nil
}
// alterColumn is used by AlterTable.
func (d *SchemaTracker) alterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
specNewColumn := spec.NewColumns[0]
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}
defer d.putTableIfNoError(err, ident.Schema, tblInfo)
t := tables.MockTableFromMeta(tblInfo)
colName := specNewColumn.Name.Name
// Check whether alter column has existed.
oldCol := table.FindCol(t.Cols(), colName.L)
if oldCol == nil {
return dbterror.ErrBadField.GenWithStackByArgs(colName, ident.Name)
}
// Clean the NoDefaultValueFlag value.
oldCol.DelFlag(mysql.NoDefaultValueFlag)
if len(specNewColumn.Options) == 0 {
oldCol.DefaultIsExpr = false
err = oldCol.SetDefaultValue(nil)
if err != nil {
return errors.Trace(err)
}
oldCol.AddFlag(mysql.NoDefaultValueFlag)
} else {
_, err := ddl.SetDefaultValue(ctx.GetExprCtx(), oldCol, specNewColumn.Options[0])
if err != nil {
return errors.Trace(err)
}
}
return nil
}
// modifyColumn is used by AlterTable.
func (d *SchemaTracker) modifyColumn(ctx context.Context, sctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
specNewColumn := spec.NewColumns[0]
if len(specNewColumn.Name.Schema.O) != 0 && ident.Schema.L != specNewColumn.Name.Schema.L {
return dbterror.ErrWrongDBName.GenWithStackByArgs(specNewColumn.Name.Schema.O)
}
if len(specNewColumn.Name.Table.O) != 0 && ident.Name.L != specNewColumn.Name.Table.L {
return dbterror.ErrWrongTableName.GenWithStackByArgs(specNewColumn.Name.Table.O)
}
return d.handleModifyColumn(ctx, sctx, ident, specNewColumn.Name.Name, spec)
}
// changeColumn is used by AlterTable.
func (d *SchemaTracker) changeColumn(ctx context.Context, sctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
specNewColumn := spec.NewColumns[0]
if len(specNewColumn.Name.Schema.O) != 0 && ident.Schema.L != specNewColumn.Name.Schema.L {
return dbterror.ErrWrongDBName.GenWithStackByArgs(specNewColumn.Name.Schema.O)
}
if len(spec.OldColumnName.Schema.O) != 0 && ident.Schema.L != spec.OldColumnName.Schema.L {
return dbterror.ErrWrongDBName.GenWithStackByArgs(spec.OldColumnName.Schema.O)
}
if len(specNewColumn.Name.Table.O) != 0 && ident.Name.L != specNewColumn.Name.Table.L {
return dbterror.ErrWrongTableName.GenWithStackByArgs(specNewColumn.Name.Table.O)
}
if len(spec.OldColumnName.Table.O) != 0 && ident.Name.L != spec.OldColumnName.Table.L {
return dbterror.ErrWrongTableName.GenWithStackByArgs(spec.OldColumnName.Table.O)
}
return d.handleModifyColumn(ctx, sctx, ident, spec.OldColumnName.Name, spec)
}
func (d *SchemaTracker) handleModifyColumn(
ctx context.Context,
sctx sessionctx.Context,
ident ast.Ident,
originalColName ast.CIStr,
spec *ast.AlterTableSpec,
) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}
defer d.putTableIfNoError(err, ident.Schema, tblInfo)
schema := d.SchemaByName(ident.Schema)
t := tables.MockTableFromMeta(tblInfo)
jobW, err := ddl.GetModifiableColumnJob(ctx, sctx, nil, ident, originalColName, schema, t, spec)
if err != nil {
if infoschema.ErrColumnNotExists.Equal(err) && spec.IfExists {
sctx.GetSessionVars().StmtCtx.AppendNote(infoschema.ErrColumnNotExists.FastGenByArgs(originalColName, ident.Name))
return nil
}
return errors.Trace(err)
}
args := jobW.JobArgs.(*model.ModifyColumnArgs)
newColInfo := args.Column
tblInfo.AutoRandomBits = args.NewShardBits
oldCol := table.FindCol(t.Cols(), originalColName.L).ColumnInfo
// replace old column and its related index column in-place.
newColInfo.ID = ddl.AllocateColumnID(tblInfo)
newColInfo.Offset = oldCol.Offset
tblInfo.Columns[oldCol.Offset] = newColInfo
indexesToChange := ddl.FindRelatedIndexesToChange(tblInfo, oldCol.Name)
for _, info := range indexesToChange {
ddl.UpdateIndexCol(info.IndexInfo.Columns[info.Offset], newColInfo)
}
destOffset, err := ddl.LocateOffsetToMove(newColInfo.Offset, spec.Position, tblInfo)
if err != nil {
return errors.Trace(err)
}
tblInfo.MoveColumnInfo(newColInfo.Offset, destOffset)
newColInfo.State = model.StatePublic
return nil
}
// renameIndex is used by AlterTable.
func (d *SchemaTracker) renameIndex(_ sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return err
}
defer d.putTableIfNoError(err, ident.Schema, tblInfo)
duplicate, err := ddl.ValidateRenameIndex(spec.FromKey, spec.ToKey, tblInfo)
if duplicate {
return nil
}
if err != nil {
return err
}
idx := tblInfo.FindIndexByName(spec.FromKey.L)
idx.Name = spec.ToKey
return nil
}
// addTablePartitions is used by AlterTable.
func (d *SchemaTracker) addTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
}
defer d.putTableIfNoError(err, ident.Schema, tblInfo)
pi := tblInfo.GetPartitionInfo()
if pi == nil {
return errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
}
partInfo, err := ddl.BuildAddedPartitionInfo(ctx.GetExprCtx(), tblInfo, spec)
if err != nil {
return errors.Trace(err)
}
tblInfo.Partition.Definitions = append(tblInfo.Partition.Definitions, partInfo.Definitions...)
return nil
}
// dropTablePartitions is used by AlterTable.
func (d *SchemaTracker) dropTablePartitions(_ sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
tblInfo, err := d.TableClonedByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
}
defer d.putTableIfNoError(err, ident.Schema, tblInfo)
pi := tblInfo.GetPartitionInfo()
if pi == nil {
return errors.Trace(dbterror.ErrPartitionMgmtOnNonpartitioned)
}
partNames := make([]string, len(spec.PartitionNames))
for i, partCIName := range spec.PartitionNames {
partNames[i] = partCIName.L
}
err = ddl.CheckDropTablePartition(tblInfo, partNames)
if err != nil {
if dbterror.ErrDropPartitionNonExistent.Equal(err) && spec.IfExists {
return nil
}
return errors.Trace(err)
}
newDefs := make([]model.PartitionDefinition, 0, len(tblInfo.Partition.Definitions)-len(partNames))
for _, def := range tblInfo.Partition.Definitions {
found := slices.Contains(partNames, def.Name.L)
if !found {
newDefs = append(newDefs, def)
}
}
tblInfo.Partition.Definitions = newDefs
return nil
}
// createPrimaryKey is used by AlterTable.
func (d *SchemaTracker) createPrimaryKey(
ctx sessionctx.Context,
ti ast.Ident,
indexName ast.CIStr,
indexPartSpecifications []*ast.IndexPartSpecification,
indexOption *ast.IndexOption,
) (err error) {
tblInfo, err := d.TableClonedByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(err)
}
defer d.putTableIfNoError(err, ti.Schema, tblInfo)
indexName = ast.NewCIStr(mysql.PrimaryKeyName)
if indexInfo := tblInfo.FindIndexByName(indexName.L); indexInfo != nil ||
// If the table's PKIsHandle is true, it also means that this table has a primary key.
tblInfo.PKIsHandle {
return infoschema.ErrMultiplePriKey
}
// Primary keys cannot include expression index parts. A primary key requires the generated column to be stored,
// but expression index parts are implemented as virtual generated columns, not stored generated columns.
for _, idxPart := range indexPartSpecifications {
if idxPart.Expr != nil {
return dbterror.ErrFunctionalIndexPrimaryKey
}
}
if _, err = ddl.CheckPKOnGeneratedColumn(tblInfo, indexPartSpecifications); err != nil {
return err
}
indexInfo, err := ddl.BuildIndexInfo(
ddl.NewMetaBuildContextWithSctx(ctx),
tblInfo,
indexName,
true,
true,
model.ColumnarIndexTypeNA,
indexPartSpecifications,
indexOption,
model.StatePublic,
)
if err != nil {
return errors.Trace(err)
}
indexInfo.ID = ddl.AllocateIndexID(tblInfo)
tblInfo.Indices = append(tblInfo.Indices, indexInfo)
// Set column index flag.
ddl.AddIndexColumnFlag(tblInfo, indexInfo)
if err = ddl.UpdateColsNull2NotNull(tblInfo, indexInfo); err != nil {
return errors.Trace(err)
}
return nil
}
// AlterTable implements the DDL interface.
func (d *SchemaTracker) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast.AlterTableStmt) (err error) {
validSpecs, err := ddl.ResolveAlterTableSpec(sctx, stmt.Specs)
if err != nil {
return errors.Trace(err)
}
// TODO: reorder specs to follow MySQL's order, drop index -> drop column -> rename column -> add column -> add index
// https://github.com/mysql/mysql-server/blob/8d8c986e5716e38cb776b627a8eee9e92241b4ce/sql/sql_table.cc#L16698-L16714
ident := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name}
tblInfo, err := d.TableByName(context.Background(), ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
}
// atomicity for multi-schema change
oldTblInfo := tblInfo.Clone()
defer func() {
if err != nil {
_ = d.PutTable(ident.Schema, oldTblInfo)
}
}()
for _, spec := range validSpecs {
var handledCharsetOrCollate bool
switch spec.Tp {
case ast.AlterTableAddColumns:
err = d.addColumn(sctx, ident, spec)
case ast.AlterTableAddPartitions:
err = d.addTablePartitions(sctx, ident, spec)
case ast.AlterTableDropColumn:
err = d.dropColumn(sctx, ident, spec)
case ast.AlterTableDropIndex:
err = d.dropIndex(sctx, ident, ast.NewCIStr(spec.Name), spec.IfExists)
case ast.AlterTableDropPrimaryKey:
err = d.dropIndex(sctx, ident, ast.NewCIStr(mysql.PrimaryKeyName), spec.IfExists)
case ast.AlterTableRenameIndex:
err = d.renameIndex(sctx, ident, spec)
case ast.AlterTableDropPartition:
err = d.dropTablePartitions(sctx, ident, spec)
case ast.AlterTableAddConstraint:
constr := spec.Constraint
switch spec.Constraint.Tp {
case ast.ConstraintKey, ast.ConstraintIndex:
err = d.createIndex(sctx, ident, ast.IndexKeyTypeNone, ast.NewCIStr(constr.Name),
spec.Constraint.Keys, constr.Option, constr.IfNotExists)
case ast.ConstraintUniq, ast.ConstraintUniqIndex, ast.ConstraintUniqKey:
err = d.createIndex(sctx, ident, ast.IndexKeyTypeUnique, ast.NewCIStr(constr.Name),
spec.Constraint.Keys, constr.Option, false) // IfNotExists should be not applied
case ast.ConstraintPrimaryKey:
err = d.createPrimaryKey(sctx, ident, ast.NewCIStr(constr.Name), spec.Constraint.Keys, constr.Option)
case ast.ConstraintForeignKey,
ast.ConstraintFulltext,
ast.ConstraintCheck:
default:
// Nothing to do now.
}
case ast.AlterTableModifyColumn:
err = d.modifyColumn(ctx, sctx, ident, spec)
case ast.AlterTableChangeColumn:
err = d.changeColumn(ctx, sctx, ident, spec)
case ast.AlterTableRenameColumn:
err = d.renameColumn(sctx, ident, spec)
case ast.AlterTableAlterColumn:
err = d.alterColumn(sctx, ident, spec)
case ast.AlterTableRenameTable:
newIdent := ast.Ident{Schema: spec.NewTable.Schema, Name: spec.NewTable.Name}
err = d.renameTable(sctx, []ast.Ident{ident}, []ast.Ident{newIdent}, true)
case ast.AlterTableOption:
for i, opt := range spec.Options {
switch opt.Tp {
case ast.TableOptionShardRowID:
case ast.TableOptionAutoIncrement:
case ast.TableOptionAutoIdCache:
case ast.TableOptionAutoRandomBase:
case ast.TableOptionComment:
tblInfo = tblInfo.Clone()
tblInfo.Comment = opt.StrValue
_ = d.PutTable(ident.Schema, tblInfo)
case ast.TableOptionCharset, ast.TableOptionCollate:
// getCharsetAndCollateInTableOption will get the last charset and collate in the options,
// so it should be handled only once.
if handledCharsetOrCollate {
continue
}
var toCharset, toCollate string
toCharset, toCollate, err = ddl.GetCharsetAndCollateInTableOption(i, spec.Options, sctx.GetSessionVars().DefaultCollationForUTF8MB4)
if err != nil {
return err
}
needsOverwriteCols := ddl.NeedToOverwriteColCharset(spec.Options)
tblInfo = tblInfo.Clone()
if toCharset != "" {
tblInfo.Charset = toCharset
}
if toCollate != "" {
tblInfo.Collate = toCollate
}
if needsOverwriteCols {
// update column charset.
for _, col := range tblInfo.Columns {
if field_types.HasCharset(&col.FieldType) {
col.SetCharset(toCharset)
col.SetCollate(toCollate)
} else {
col.SetCharset(charset.CharsetBin)
col.SetCollate(charset.CharsetBin)
}
}
}
_ = d.PutTable(ident.Schema, tblInfo)
handledCharsetOrCollate = true
case ast.TableOptionPlacementPolicy:
case ast.TableOptionEngine:
default:
err = dbterror.ErrUnsupportedAlterTableOption
}
if err != nil {
return errors.Trace(err)
}
}
case ast.AlterTableIndexInvisible:
tblInfo = tblInfo.Clone()
idx := tblInfo.FindIndexByName(spec.IndexName.L)
if idx == nil {
return errors.Trace(infoschema.ErrKeyNotExists.GenWithStackByArgs(spec.IndexName.O, ident.Name))
}
idx.Invisible = spec.Visibility == ast.IndexVisibilityInvisible
_ = d.PutTable(ident.Schema, tblInfo)
case ast.AlterTablePartitionOptions,
ast.AlterTableDropForeignKey,
ast.AlterTableCoalescePartitions,
ast.AlterTableReorganizePartition,
ast.AlterTableCheckPartitions,
ast.AlterTableRebuildPartition,
ast.AlterTableOptimizePartition,
ast.AlterTableRemovePartitioning,
ast.AlterTableRepairPartition,
ast.AlterTableTruncatePartition,
ast.AlterTableWriteable,
ast.AlterTableExchangePartition,
ast.AlterTablePartition,
ast.AlterTableSetTiFlashReplica,
ast.AlterTableOrderByColumns,
ast.AlterTableAlterCheck,
ast.AlterTableDropCheck,
ast.AlterTableWithValidation,
ast.AlterTableWithoutValidation,
ast.AlterTableAddStatistics,
ast.AlterTableDropStatistics,
ast.AlterTableAttributes,
ast.AlterTablePartitionAttributes,
ast.AlterTableCache,
ast.AlterTableNoCache:
default:
// Nothing to do now.
}
if err != nil {
return errors.Trace(err)
}
}
return err
}
// TruncateTable implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) TruncateTable(_ sessionctx.Context, _ ast.Ident) error {
return nil
}
// RenameTable implements the DDL interface.
func (d *SchemaTracker) RenameTable(ctx sessionctx.Context, stmt *ast.RenameTableStmt) error {
oldIdents := make([]ast.Ident, 0, len(stmt.TableToTables))
newIdents := make([]ast.Ident, 0, len(stmt.TableToTables))
for _, tablePair := range stmt.TableToTables {
oldIdent := ast.Ident{Schema: tablePair.OldTable.Schema, Name: tablePair.OldTable.Name}
newIdent := ast.Ident{Schema: tablePair.NewTable.Schema, Name: tablePair.NewTable.Name}
oldIdents = append(oldIdents, oldIdent)
newIdents = append(newIdents, newIdent)
}
return d.renameTable(ctx, oldIdents, newIdents, false)
}
// renameTable is used by RenameTable and AlterTable.
func (d *SchemaTracker) renameTable(_ sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error {
tablesCache := make(map[string]int64)
is := InfoStoreAdaptor{inner: d.InfoStore}
for i := range oldIdents {
schema, _, err := ddl.ExtractTblInfos(is, oldIdents[i], newIdents[i], isAlterTable, tablesCache)
if err != nil {
return err
}
// no-op for ALTER TABLE RENAME t1 TO T1
if schema == nil && isAlterTable {
return nil
}
}
for i := range oldIdents {
tableInfo, err := d.TableByName(context.Background(), oldIdents[i].Schema, oldIdents[i].Name)
if err != nil {
return err
}
if err = d.DeleteTable(oldIdents[i].Schema, oldIdents[i].Name); err != nil {
return err
}
tableInfo.Name = newIdents[i].Name
if err = d.PutTable(newIdents[i].Schema, tableInfo); err != nil {
return err
}
}
return nil
}
// LockTables implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) LockTables(_ sessionctx.Context, _ *ast.LockTablesStmt) error {
return nil
}
// UnlockTables implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) UnlockTables(_ sessionctx.Context, _ []model.TableLockTpInfo) error {
return nil
}
// AlterTableMode implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) AlterTableMode(_ sessionctx.Context, _ *model.AlterTableModeArgs) error {
return nil
}
// CleanupTableLock implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) CleanupTableLock(_ sessionctx.Context, _ []*ast.TableName) error {
return nil
}
// UpdateTableReplicaInfo implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) UpdateTableReplicaInfo(_ sessionctx.Context, _ int64, _ bool) error {
return nil
}
// RepairTable implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) RepairTable(_ sessionctx.Context, _ *ast.CreateTableStmt) error {
return nil
}
// CreateSequence implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) CreateSequence(_ sessionctx.Context, _ *ast.CreateSequenceStmt) error {
return nil
}
// DropSequence implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) DropSequence(_ sessionctx.Context, _ *ast.DropSequenceStmt) (err error) {
return nil
}
// AlterSequence implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) AlterSequence(_ sessionctx.Context, _ *ast.AlterSequenceStmt) error {
return nil
}
// CreatePlacementPolicy implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) CreatePlacementPolicy(_ sessionctx.Context, _ *ast.CreatePlacementPolicyStmt) error {
return nil
}
// DropPlacementPolicy implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) DropPlacementPolicy(_ sessionctx.Context, _ *ast.DropPlacementPolicyStmt) error {
return nil
}
// AlterPlacementPolicy implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) AlterPlacementPolicy(_ sessionctx.Context, _ *ast.AlterPlacementPolicyStmt) error {
return nil
}
// AddResourceGroup implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) AddResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error {
return nil
}
// DropResourceGroup implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) DropResourceGroup(_ sessionctx.Context, _ *ast.DropResourceGroupStmt) error {
return nil
}
// AlterResourceGroup implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) AlterResourceGroup(_ sessionctx.Context, _ *ast.AlterResourceGroupStmt) error {
return nil
}
// BatchCreateTableWithInfo implements the DDL interface, it will call CreateTableWithInfo for each table.
func (d *SchemaTracker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema ast.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableOption) error {
for _, tableInfo := range info {
if err := d.CreateTableWithInfo(ctx, schema, tableInfo, nil, cs...); err != nil {
return err
}
}
return nil
}
// CreatePlacementPolicyWithInfo implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) CreatePlacementPolicyWithInfo(_ sessionctx.Context, _ *model.PolicyInfo, _ ddl.OnExist) error {
return nil
}
// RefreshMeta implements the DDL interface, it's no-op in DM's case.
func (*SchemaTracker) RefreshMeta(_ sessionctx.Context, _ *model.RefreshMetaArgs) error {
return nil
}