2203 lines
82 KiB
Go
2203 lines
82 KiB
Go
// Copyright 2024 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package ddl
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/ddl/logutil"
|
|
"github.com/pingcap/tidb/pkg/ddl/notifier"
|
|
sess "github.com/pingcap/tidb/pkg/ddl/session"
|
|
"github.com/pingcap/tidb/pkg/errctx"
|
|
"github.com/pingcap/tidb/pkg/expression"
|
|
"github.com/pingcap/tidb/pkg/expression/exprctx"
|
|
"github.com/pingcap/tidb/pkg/infoschema"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/meta"
|
|
"github.com/pingcap/tidb/pkg/meta/autoid"
|
|
"github.com/pingcap/tidb/pkg/meta/metabuild"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/metrics"
|
|
"github.com/pingcap/tidb/pkg/parser"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/parser/charset"
|
|
"github.com/pingcap/tidb/pkg/parser/format"
|
|
"github.com/pingcap/tidb/pkg/parser/mysql"
|
|
"github.com/pingcap/tidb/pkg/sessionctx"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
|
|
"github.com/pingcap/tidb/pkg/table"
|
|
"github.com/pingcap/tidb/pkg/table/tables"
|
|
"github.com/pingcap/tidb/pkg/types"
|
|
"github.com/pingcap/tidb/pkg/util"
|
|
"github.com/pingcap/tidb/pkg/util/collate"
|
|
"github.com/pingcap/tidb/pkg/util/dbterror"
|
|
"github.com/pingcap/tidb/pkg/util/filter"
|
|
"github.com/pingcap/tidb/pkg/util/intest"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func hasModifyFlag(col *model.ColumnInfo) bool {
|
|
return col.ChangingFieldType != nil || mysql.HasPreventNullInsertFlag(col.GetFlag())
|
|
}
|
|
|
|
func isNullToNotNullChange(oldCol, newCol *model.ColumnInfo) bool {
|
|
return !mysql.HasNotNullFlag(oldCol.GetFlag()) && mysql.HasNotNullFlag(newCol.GetFlag())
|
|
}
|
|
|
|
func isIntegerChange(from, to *model.ColumnInfo) bool {
|
|
return mysql.IsIntegerType(from.GetType()) && mysql.IsIntegerType(to.GetType())
|
|
}
|
|
|
|
func isCharChange(from, to *model.ColumnInfo) bool {
|
|
return types.IsTypeChar(from.GetType()) && types.IsTypeChar(to.GetType())
|
|
}
|
|
|
|
// getModifyColumnType gets the modify column type.
|
|
// 1. ModifyTypeNoReorg: The range of new type is a superset of the old type
|
|
// 2. ModifyTypeNoReorgWithCheck: The range of new type is a subset of the old type, but we are running in strict SQL mode.
|
|
// And there is no index on this column.
|
|
// The difference between ModifyTypeNoReorgWithCheck and ModifyTypeNoReorg is that we need some additional checks.
|
|
// 3. ModifyTypeIndexReorg: The range of new type is a subset of the old type, and there are indexes on this column.
|
|
// 4. ModifyTypeReorg: other
|
|
//
|
|
// We need to ensure it's compatible with job submitted from older version of TiDB.
|
|
func getModifyColumnType(
|
|
args *model.ModifyColumnArgs,
|
|
tblInfo *model.TableInfo,
|
|
oldCol *model.ColumnInfo,
|
|
sqlMode mysql.SQLMode) byte {
|
|
newCol := args.Column
|
|
if noReorgDataStrict(tblInfo, oldCol, args.Column) {
|
|
// It's not NULL->NOTNULL change
|
|
if !isNullToNotNullChange(oldCol, newCol) {
|
|
return model.ModifyTypeNoReorg
|
|
}
|
|
return model.ModifyTypeNoReorgWithCheck
|
|
}
|
|
|
|
// For backward compatibility
|
|
if args.ModifyColumnType == mysql.TypeNull {
|
|
return model.ModifyTypeReorg
|
|
}
|
|
|
|
// FIXME(joechenrh): handle partition and TiFlash replica case
|
|
if tblInfo.Partition != nil || (tblInfo.TiFlashReplica != nil && tblInfo.TiFlashReplica.Count > 0) {
|
|
return model.ModifyTypeReorg
|
|
}
|
|
|
|
failpoint.Inject("disableLossyDDLOptimization", func(val failpoint.Value) {
|
|
if v, ok := val.(bool); ok && v {
|
|
failpoint.Return(model.ModifyTypeReorg)
|
|
}
|
|
})
|
|
|
|
// FIXME(joechenrh): remove this when stats correctness is resolved.
|
|
// Since stats may store bytes encoded by codec.EncodeKey, we should disable the optimization
|
|
// if the same data produces different encoded bytes for the old and new types.
|
|
if (isIntegerChange(oldCol, args.Column) &&
|
|
mysql.HasUnsignedFlag(oldCol.GetFlag()) != mysql.HasUnsignedFlag(args.Column.GetFlag())) ||
|
|
(isCharChange(oldCol, args.Column) &&
|
|
!collate.CompatibleCollate(oldCol.GetCollate(), args.Column.GetCollate())) {
|
|
return model.ModifyTypeReorg
|
|
}
|
|
|
|
if !sqlMode.HasStrictMode() {
|
|
return model.ModifyTypeReorg
|
|
}
|
|
|
|
if needRowReorg(oldCol, args.Column) {
|
|
return model.ModifyTypeReorg
|
|
}
|
|
|
|
relatedIndexes := getRelatedIndexIDs(tblInfo, oldCol.ID, false)
|
|
if len(relatedIndexes) == 0 || !needIndexReorg(oldCol, args.Column) {
|
|
return model.ModifyTypeNoReorgWithCheck
|
|
}
|
|
return model.ModifyTypeIndexReorg
|
|
}
|
|
|
|
func getChangingCol(
|
|
args *model.ModifyColumnArgs,
|
|
tblInfo *model.TableInfo,
|
|
oldCol *model.ColumnInfo,
|
|
) (*model.ColumnInfo, error) {
|
|
changingCol := args.ChangingColumn
|
|
if changingCol == nil {
|
|
changingCol = args.Column.Clone()
|
|
changingCol.Name = ast.NewCIStr(model.GenUniqueChangingColumnName(tblInfo, oldCol))
|
|
changingCol.ChangeStateInfo = &model.ChangeStateInfo{DependencyColumnOffset: oldCol.Offset}
|
|
InitAndAddColumnToTable(tblInfo, changingCol)
|
|
|
|
var redundantIdxs []int64
|
|
indexesToChange := FindRelatedIndexesToChange(tblInfo, oldCol.Name)
|
|
for _, info := range indexesToChange {
|
|
newIdxID := AllocateIndexID(tblInfo)
|
|
if !info.isTemp {
|
|
// We create a temp index for each normal index.
|
|
tmpIdx := info.IndexInfo.Clone()
|
|
tmpIdx.ID = newIdxID
|
|
tmpIdx.Name = ast.NewCIStr(model.GenUniqueChangingIndexName(tblInfo, info.IndexInfo))
|
|
UpdateIndexCol(tmpIdx.Columns[info.Offset], changingCol)
|
|
tblInfo.Indices = append(tblInfo.Indices, tmpIdx)
|
|
} else {
|
|
// The index is a temp index created by previous modify column job(s).
|
|
// We can overwrite it to reduce reorg cost, because it will be dropped eventually.
|
|
tmpIdx := info.IndexInfo
|
|
oldTempIdxID := tmpIdx.ID
|
|
tmpIdx.ID = newIdxID
|
|
UpdateIndexCol(tmpIdx.Columns[info.Offset], changingCol)
|
|
redundantIdxs = append(redundantIdxs, oldTempIdxID)
|
|
}
|
|
}
|
|
args.RedundantIdxs = redundantIdxs
|
|
return changingCol, nil
|
|
}
|
|
|
|
changingCol = model.FindColumnInfoByID(tblInfo.Columns, args.ChangingColumn.ID)
|
|
if changingCol == nil {
|
|
logutil.DDLLogger().Error("the changing column has been removed")
|
|
return nil, errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(oldCol.Name, tblInfo.Name))
|
|
}
|
|
|
|
return changingCol, nil
|
|
}
|
|
|
|
func initializeChangingIndexes(
|
|
args *model.ModifyColumnArgs,
|
|
tblInfo *model.TableInfo,
|
|
oldCol *model.ColumnInfo,
|
|
) error {
|
|
if args.ChangingColumn != nil {
|
|
return nil
|
|
}
|
|
|
|
if err := checkColumnAlreadyExists(tblInfo, args); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
tmpCol := oldCol.Clone()
|
|
tmpCol.FieldType = args.Column.FieldType
|
|
|
|
var redundantIdxs []int64
|
|
indexesToChange := FindRelatedIndexesToChange(tblInfo, oldCol.Name)
|
|
for _, info := range indexesToChange {
|
|
newIdxID := AllocateIndexID(tblInfo)
|
|
if !info.isTemp {
|
|
// We create a temp index for each normal index.
|
|
tmpIdx := info.IndexInfo.Clone()
|
|
tmpIdx.State = model.StateNone
|
|
tmpIdx.ID = newIdxID
|
|
tmpIdx.Name = ast.NewCIStr(model.GenUniqueChangingIndexName(tblInfo, info.IndexInfo))
|
|
tmpIdx.Columns[info.Offset].UseChangingType = true
|
|
UpdateIndexCol(tmpIdx.Columns[info.Offset], tmpCol)
|
|
tblInfo.Indices = append(tblInfo.Indices, tmpIdx)
|
|
} else {
|
|
// The index is a temp index created by previous modify column job(s).
|
|
// We can just allocate a new ID, and gc the old one later.
|
|
tmpIdx := info.IndexInfo
|
|
tmpIdx.State = model.StateNone
|
|
oldTempIdxID := tmpIdx.ID
|
|
tmpIdx.ID = newIdxID
|
|
tmpIdx.Columns[info.Offset].UseChangingType = true
|
|
UpdateIndexCol(tmpIdx.Columns[info.Offset], tmpCol)
|
|
redundantIdxs = append(redundantIdxs, oldTempIdxID)
|
|
}
|
|
}
|
|
args.RedundantIdxs = redundantIdxs
|
|
args.ChangingColumn = args.Column.Clone()
|
|
return nil
|
|
}
|
|
|
|
func (w *worker) onModifyColumn(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
|
|
args, err := model.GetModifyColumnArgs(job)
|
|
defer func() {
|
|
failpoint.InjectCall("getModifyColumnType", args.ModifyColumnType)
|
|
}()
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
dbInfo, tblInfo, oldCol, err := getModifyColumnInfo(jobCtx.metaMut, job, args)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
// Check index prefix length for the first time
|
|
if job.SchemaState == model.StateNone {
|
|
columns := getReplacedColumns(tblInfo, oldCol, args.Column)
|
|
allIdxs := buildRelatedIndexInfos(tblInfo, oldCol.ID)
|
|
for _, idx := range allIdxs {
|
|
if err := checkIndexInModifiableColumns(columns, idx); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// For first time running this job, or the job from older version,
|
|
// we need to fill the ModifyColumnType.
|
|
if args.ModifyColumnType == model.ModifyTypeNone ||
|
|
args.ModifyColumnType == mysql.TypeNull {
|
|
// Check the modify types again, as the type may be changed by other no-reorg modify-column jobs.
|
|
// For example, there are two DDLs submitted in parallel:
|
|
// CREATE TABLE t (c VARCHAR(255) charset utf8);
|
|
// ALTER TABLE t MODIFY COLUMN c VARCHAR(255) charset utf8mb4;
|
|
// ALTER TABLE t MODIFY COLUMN c VARCHAR(100) charset utf8;
|
|
// Previously, the second DDL will be submitted successfully (VARCHAR(255) utf8 -> VARCHAR(100) utf8 is OK)
|
|
// but fail during execution, since the columnID has changed. However, as we now may reuse the old column,
|
|
// we must check the type again here as utf8mb4->utf8 is an invalid change.
|
|
if err = checkModifyTypes(oldCol, args.Column, isColumnWithIndex(oldCol.Name.L, tblInfo.Indices)); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
args.ModifyColumnType = getModifyColumnType(args, tblInfo, oldCol, job.SQLMode)
|
|
logutil.DDLLogger().Info("get type for modify column",
|
|
zap.String("query", job.Query),
|
|
zap.String("oldColumnName", args.OldColumnName.L),
|
|
zap.Int64("oldColumnID", oldCol.ID),
|
|
zap.String("type", model.ModifyTypeToString(args.ModifyColumnType)),
|
|
)
|
|
if oldCol.GetType() == mysql.TypeVarchar && args.Column.GetType() == mysql.TypeString &&
|
|
(args.ModifyColumnType == model.ModifyTypeNoReorgWithCheck ||
|
|
args.ModifyColumnType == model.ModifyTypeIndexReorg) {
|
|
logutil.DDLLogger().Info("meet varchar to char modify column, change type to precheck")
|
|
args.ModifyColumnType = model.ModifyTypePrecheck
|
|
}
|
|
}
|
|
|
|
if job.IsRollingback() {
|
|
switch args.ModifyColumnType {
|
|
case model.ModifyTypePrecheck, model.ModifyTypeNoReorg, model.ModifyTypeNoReorgWithCheck:
|
|
// For those column-type-change jobs which don't need reorg, or checking.
|
|
return rollbackModifyColumnJob(jobCtx, tblInfo, job, args.Column, oldCol)
|
|
case model.ModifyTypeReorg:
|
|
// For those column-type-change jobs which need reorg.
|
|
return rollbackModifyColumnJobWithReorg(jobCtx, tblInfo, job, oldCol, args)
|
|
case model.ModifyTypeIndexReorg:
|
|
// For those column-type-change jobs which reorg the index only.
|
|
return rollbackModifyColumnJobWithIndexReorg(jobCtx, tblInfo, job, oldCol, args)
|
|
}
|
|
}
|
|
|
|
// Do some checks for all modify column types.
|
|
err = checkAndApplyAutoRandomBits(jobCtx, dbInfo, tblInfo, oldCol, args.Column, args.NewShardBits)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
if err = checkModifyTypes(oldCol, args.Column, isColumnWithIndex(oldCol.Name.L, tblInfo.Indices)); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
if err = checkColumnReferencedByPartialCondition(tblInfo, args.Column.Name); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
if args.ChangingColumn == nil {
|
|
if err := checkColumnAlreadyExists(tblInfo, args); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
}
|
|
|
|
switch args.ModifyColumnType {
|
|
case model.ModifyTypePrecheck:
|
|
return w.precheckForVarcharToChar(jobCtx, job, args, dbInfo, tblInfo, oldCol)
|
|
case model.ModifyTypeNoReorgWithCheck:
|
|
return w.doModifyColumnWithCheck(jobCtx, job, dbInfo, tblInfo, args.Column, oldCol, args.Position)
|
|
case model.ModifyTypeNoReorg:
|
|
return w.doModifyColumnNoCheck(jobCtx, job, tblInfo, args.Column, oldCol, args.Position)
|
|
}
|
|
|
|
// Checks for ModifyTypeReorg and ModifyTypeIndexReorg.
|
|
if err = isGeneratedRelatedColumn(tblInfo, args.Column, oldCol); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
if isColumnarIndexColumn(tblInfo, oldCol) {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("columnar indexes on the column"))
|
|
}
|
|
if mysql.HasPriKeyFlag(oldCol.GetFlag()) {
|
|
job.State = model.JobStateCancelled
|
|
return ver, dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("can't modify column in primary key")
|
|
}
|
|
|
|
if job.SchemaState == model.StateNone {
|
|
err = postCheckPartitionModifiableColumn(w, tblInfo, oldCol, args.Column)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, err
|
|
}
|
|
}
|
|
|
|
defer checkTableInfo(tblInfo)
|
|
|
|
if args.ModifyColumnType == model.ModifyTypeIndexReorg {
|
|
if err := initializeChangingIndexes(args, tblInfo, oldCol); err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
return w.doModifyColumnIndexReorg(
|
|
jobCtx, job, dbInfo, tblInfo, oldCol, args)
|
|
}
|
|
|
|
changingCol, err := getChangingCol(args, tblInfo, oldCol)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
return w.doModifyColumnTypeWithData(
|
|
jobCtx, job, dbInfo, tblInfo, changingCol, oldCol, args)
|
|
}
|
|
|
|
func checkColumnAlreadyExists(tblInfo *model.TableInfo, args *model.ModifyColumnArgs) error {
|
|
// If we want to rename the column name, we need to check whether it already exists.
|
|
if args.Column.Name.L != args.OldColumnName.L {
|
|
c := model.FindColumnInfo(tblInfo.Columns, args.Column.Name.L)
|
|
if c != nil {
|
|
return errors.Trace(infoschema.ErrColumnExists.GenWithStackByArgs(args.Column.Name))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// rollbackModifyColumnJob rollbacks the job who doesn't need to reorg the data.
|
|
func rollbackModifyColumnJob(
|
|
jobCtx *jobContext, tblInfo *model.TableInfo, job *model.Job,
|
|
newCol, oldCol *model.ColumnInfo) (ver int64, err error) {
|
|
if oldCol.ID == newCol.ID {
|
|
// Clean the flag info in oldCol.
|
|
if hasModifyFlag(tblInfo.Columns[oldCol.Offset]) {
|
|
tblInfo.Columns[oldCol.Offset].DelFlag(mysql.PreventNullInsertFlag)
|
|
tblInfo.Columns[oldCol.Offset].DelFlag(mysql.NotNullFlag)
|
|
tblInfo.Columns[oldCol.Offset].ChangingFieldType = nil
|
|
}
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
}
|
|
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
|
|
// For those column changes which doesn't need reorg data, we should also mock empty args for delete range.
|
|
job.FillFinishedArgs(&model.ModifyColumnArgs{})
|
|
return ver, nil
|
|
}
|
|
|
|
// rollbackModifyColumnJobWithReorg is used to rollback modify-column job which need to
|
|
// reorg both the row and index data.
|
|
func rollbackModifyColumnJobWithReorg(
|
|
jobCtx *jobContext, tblInfo *model.TableInfo, job *model.Job,
|
|
oldCol *model.ColumnInfo, args *model.ModifyColumnArgs) (ver int64, err error) {
|
|
// Clean the flag info in oldCol.
|
|
if hasModifyFlag(tblInfo.Columns[oldCol.Offset]) {
|
|
tblInfo.Columns[oldCol.Offset].DelFlag(mysql.PreventNullInsertFlag)
|
|
tblInfo.Columns[oldCol.Offset].DelFlag(mysql.NotNullFlag)
|
|
tblInfo.Columns[oldCol.Offset].ChangingFieldType = nil
|
|
}
|
|
|
|
var changingIdxIDs []int64
|
|
if args.ChangingColumn != nil {
|
|
changingIdxIDs = getRelatedIndexIDs(tblInfo, args.ChangingColumn.ID, job.ReorgMeta.ReorgTp.NeedMergeProcess())
|
|
// The job is in the middle state. The appended changingCol and changingIndex should
|
|
// be removed from the tableInfo as well.
|
|
removeChangingColAndIdxs(tblInfo, args.ChangingColumn.ID)
|
|
}
|
|
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
|
|
args.IndexIDs = changingIdxIDs
|
|
args.PartitionIDs = getPartitionIDs(tblInfo)
|
|
job.FillFinishedArgs(args)
|
|
return ver, nil
|
|
}
|
|
|
|
// rollbackModifyColumnJobWithIndexReorg is used to rollback modify-column job which
|
|
// only need to reorg the index.
|
|
func rollbackModifyColumnJobWithIndexReorg(
|
|
jobCtx *jobContext, tblInfo *model.TableInfo, job *model.Job,
|
|
oldCol *model.ColumnInfo, args *model.ModifyColumnArgs) (ver int64, err error) {
|
|
// Clean the flag info in oldCol.
|
|
if hasModifyFlag(tblInfo.Columns[oldCol.Offset]) {
|
|
tblInfo.Columns[oldCol.Offset].DelFlag(mysql.PreventNullInsertFlag)
|
|
tblInfo.Columns[oldCol.Offset].DelFlag(mysql.NotNullFlag)
|
|
tblInfo.Columns[oldCol.Offset].ChangingFieldType = nil
|
|
}
|
|
|
|
allIdxs := buildRelatedIndexInfos(tblInfo, oldCol.ID)
|
|
changingIdxInfos := make([]*model.IndexInfo, 0, len(allIdxs)/2)
|
|
for _, idx := range allIdxs {
|
|
for _, idxCol := range idx.Columns {
|
|
if idxCol.Name.L == oldCol.Name.L && idxCol.UseChangingType {
|
|
changingIdxInfos = append(changingIdxInfos, idx)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
for _, idx := range changingIdxInfos {
|
|
args.IndexIDs = append(args.IndexIDs, idx.ID)
|
|
}
|
|
args.IndexIDs = append(args.IndexIDs, getIngestTempIndexIDs(job, changingIdxInfos)...)
|
|
removeOldIndexes(tblInfo, changingIdxInfos)
|
|
|
|
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
|
|
args.PartitionIDs = getPartitionIDs(tblInfo)
|
|
job.FillFinishedArgs(args)
|
|
return ver, nil
|
|
}
|
|
|
|
func getModifyColumnInfo(
|
|
t *meta.Mutator, job *model.Job, args *model.ModifyColumnArgs,
|
|
) (*model.DBInfo, *model.TableInfo, *model.ColumnInfo, error) {
|
|
dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
|
|
if err != nil {
|
|
return nil, nil, nil, errors.Trace(err)
|
|
}
|
|
|
|
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
|
|
if err != nil {
|
|
return nil, nil, nil, errors.Trace(err)
|
|
}
|
|
|
|
var oldCol *model.ColumnInfo
|
|
// Use column ID to locate the old column.
|
|
// It is persisted to job arguments after the first execution.
|
|
if args.OldColumnID > 0 {
|
|
oldCol = model.FindColumnInfoByID(tblInfo.Columns, args.OldColumnID)
|
|
} else {
|
|
// Lower version TiDB doesn't persist the old column ID to job arguments.
|
|
// We have to use the old column name to locate the old column.
|
|
oldCol = model.FindColumnInfo(tblInfo.Columns, model.GenRemovingObjName(args.OldColumnName.L))
|
|
if oldCol == nil {
|
|
// The old column maybe not in removing state.
|
|
oldCol = model.FindColumnInfo(tblInfo.Columns, args.OldColumnName.L)
|
|
}
|
|
if oldCol != nil {
|
|
args.OldColumnID = oldCol.ID
|
|
logutil.DDLLogger().Info("run modify column job, find old column by name",
|
|
zap.Int64("jobID", job.ID),
|
|
zap.String("oldColumnName", args.OldColumnName.L),
|
|
zap.Int64("oldColumnID", oldCol.ID),
|
|
)
|
|
}
|
|
}
|
|
if oldCol == nil {
|
|
job.State = model.JobStateCancelled
|
|
return nil, nil, nil, errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(args.OldColumnName, tblInfo.Name))
|
|
}
|
|
return dbInfo, tblInfo, oldCol, errors.Trace(err)
|
|
}
|
|
|
|
func finishModifyColumnWithoutReorg(
|
|
jobCtx *jobContext,
|
|
job *model.Job,
|
|
tblInfo *model.TableInfo,
|
|
newCol, oldCol *model.ColumnInfo,
|
|
pos *ast.ColumnPosition,
|
|
) (ver int64, _ error) {
|
|
if err := adjustTableInfoAfterModifyColumn(tblInfo, newCol, oldCol, pos); err != nil {
|
|
job.State = model.JobStateRollingback
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
childTableInfos, err := adjustForeignKeyChildTableInfoAfterModifyColumn(jobCtx.infoCache, jobCtx.metaMut, job, tblInfo, newCol, oldCol)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true, childTableInfos...)
|
|
if err != nil {
|
|
// Modified the type definition of 'null' to 'not null' before this, so rollBack the job when an error occurs.
|
|
job.State = model.JobStateRollingback
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
|
|
// For those column-type-change type which doesn't need reorg data, we should also mock the job args for delete range.
|
|
job.FillFinishedArgs(&model.ModifyColumnArgs{})
|
|
return ver, nil
|
|
}
|
|
|
|
// doModifyColumnNoCheck updates the column information and reorders all columns. It does not support modifying column data.
|
|
func (*worker) doModifyColumnNoCheck(
|
|
jobCtx *jobContext,
|
|
job *model.Job,
|
|
tblInfo *model.TableInfo,
|
|
newCol, oldCol *model.ColumnInfo,
|
|
pos *ast.ColumnPosition,
|
|
) (ver int64, _ error) {
|
|
if oldCol.ID != newCol.ID {
|
|
job.State = model.JobStateRollingback
|
|
return ver, dbterror.ErrColumnInChange.GenWithStackByArgs(oldCol.Name, newCol.ID)
|
|
}
|
|
|
|
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
|
|
job.MarkNonRevertible()
|
|
// Store the mark and enter the next DDL handling loop.
|
|
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
|
|
}
|
|
|
|
return finishModifyColumnWithoutReorg(jobCtx, job, tblInfo, newCol, oldCol, pos)
|
|
}
|
|
|
|
// precheckForVarcharToChar updates the column information and reorders all columns with data check.
|
|
func (w *worker) precheckForVarcharToChar(
|
|
jobCtx *jobContext,
|
|
job *model.Job,
|
|
args *model.ModifyColumnArgs,
|
|
dbInfo *model.DBInfo,
|
|
tblInfo *model.TableInfo,
|
|
oldCol *model.ColumnInfo,
|
|
) (ver int64, _ error) {
|
|
newCol := args.Column
|
|
if oldCol.ID != newCol.ID {
|
|
job.State = model.JobStateRollingback
|
|
return ver, dbterror.ErrColumnInChange.GenWithStackByArgs(oldCol.Name, newCol.ID)
|
|
}
|
|
|
|
// For the first time we get here, just add the flag
|
|
// and check the existing data in the next round.
|
|
if !hasModifyFlag(oldCol) {
|
|
if isNullToNotNullChange(oldCol, newCol) {
|
|
oldCol.AddFlag(mysql.PreventNullInsertFlag)
|
|
}
|
|
if !noReorgDataStrict(tblInfo, oldCol, newCol) {
|
|
oldCol.ChangingFieldType = &newCol.FieldType
|
|
}
|
|
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
|
|
}
|
|
|
|
// Second time we get here, do the data check.
|
|
checked, err := checkModifyColumnData(
|
|
jobCtx.stepCtx, w,
|
|
dbInfo.Name, tblInfo.Name,
|
|
oldCol, newCol, true)
|
|
|
|
// If existing data is in range, we continue running without row reorg.
|
|
// The flag is retained, so string with trailing spaces still can't be inserted.
|
|
if err == nil {
|
|
args.ModifyColumnType = getModifyColumnType(args, tblInfo, oldCol, job.SQLMode)
|
|
logutil.DDLLogger().Info("precheck done, change modify type",
|
|
zap.String("query", job.Query),
|
|
zap.String("oldColumnName", args.OldColumnName.L),
|
|
zap.Int64("oldColumnID", oldCol.ID),
|
|
zap.String("type", model.ModifyTypeToString(args.ModifyColumnType)),
|
|
)
|
|
return ver, nil
|
|
}
|
|
|
|
// Meet error during checking, let outer side handle it.
|
|
if !checked {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
// Fallback to normal reorg type.
|
|
oldCol.DelFlag(mysql.PreventNullInsertFlag)
|
|
oldCol.ChangingFieldType = nil
|
|
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
logutil.DDLLogger().Info("precheck done, fallback to normal reorg",
|
|
zap.String("query", job.Query),
|
|
zap.String("oldColumnName", args.OldColumnName.L),
|
|
zap.Int64("oldColumnID", oldCol.ID),
|
|
)
|
|
args.ModifyColumnType = model.ModifyTypeReorg
|
|
return ver, nil
|
|
}
|
|
|
|
// doModifyColumnWithCheck updates the column information and reorders all columns with data check.
|
|
func (w *worker) doModifyColumnWithCheck(
|
|
jobCtx *jobContext,
|
|
job *model.Job,
|
|
dbInfo *model.DBInfo,
|
|
tblInfo *model.TableInfo,
|
|
newCol, oldCol *model.ColumnInfo,
|
|
pos *ast.ColumnPosition,
|
|
) (ver int64, _ error) {
|
|
if oldCol.ID != newCol.ID {
|
|
job.State = model.JobStateRollingback
|
|
return ver, dbterror.ErrColumnInChange.GenWithStackByArgs(oldCol.Name, newCol.ID)
|
|
}
|
|
|
|
// For the first time we get here, just add the flag
|
|
// and check the existing data in the next round.
|
|
if !hasModifyFlag(oldCol) {
|
|
failpoint.InjectCall("beforeDoModifyColumnSkipReorgCheck")
|
|
if isNullToNotNullChange(oldCol, newCol) {
|
|
oldCol.AddFlag(mysql.PreventNullInsertFlag)
|
|
}
|
|
if !noReorgDataStrict(tblInfo, oldCol, newCol) {
|
|
oldCol.ChangingFieldType = &newCol.FieldType
|
|
}
|
|
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
|
|
}
|
|
|
|
checked, err := checkModifyColumnData(
|
|
jobCtx.stepCtx, w,
|
|
dbInfo.Name, tblInfo.Name,
|
|
oldCol, newCol,
|
|
!noReorgDataStrict(tblInfo, oldCol, newCol),
|
|
)
|
|
if err != nil {
|
|
// If checked is true, it means we have done the check and found invalid data.
|
|
// Otherwise, it means we meet some internal error, just let outer side handle it (like retry).
|
|
if checked {
|
|
job.State = model.JobStateRollingback
|
|
}
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
failpoint.InjectCall("afterDoModifyColumnSkipReorgCheck")
|
|
|
|
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
|
|
job.MarkNonRevertible()
|
|
// Store the mark and enter the next DDL handling loop.
|
|
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
|
|
}
|
|
|
|
return finishModifyColumnWithoutReorg(jobCtx, job, tblInfo, newCol, oldCol, pos)
|
|
}
|
|
|
|
func adjustTableInfoAfterModifyColumn(
|
|
tblInfo *model.TableInfo, newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) error {
|
|
// We need the latest column's offset and state. This information can be obtained from the store.
|
|
newCol.Offset = oldCol.Offset
|
|
newCol.State = oldCol.State
|
|
if pos != nil && pos.RelativeColumn != nil && oldCol.Name.L == pos.RelativeColumn.Name.L {
|
|
// For cases like `modify column b after b`, it should report this error.
|
|
return errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(oldCol.Name, tblInfo.Name))
|
|
}
|
|
destOffset, err := LocateOffsetToMove(oldCol.Offset, pos, tblInfo)
|
|
if err != nil {
|
|
return errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(oldCol.Name, tblInfo.Name))
|
|
}
|
|
tblInfo.Columns[oldCol.Offset] = newCol
|
|
tblInfo.MoveColumnInfo(oldCol.Offset, destOffset)
|
|
updateNewIdxColsNameOffset(tblInfo.Indices, oldCol.Name, newCol)
|
|
updateFKInfoWhenModifyColumn(tblInfo, oldCol.Name, newCol.Name)
|
|
updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, newCol.Name)
|
|
return nil
|
|
}
|
|
|
|
func updateFKInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol ast.CIStr) {
|
|
if oldCol.L == newCol.L {
|
|
return
|
|
}
|
|
for _, fk := range tblInfo.ForeignKeys {
|
|
for i := range fk.Cols {
|
|
if fk.Cols[i].L == oldCol.L {
|
|
fk.Cols[i] = newCol
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateTTLInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol ast.CIStr) {
|
|
if oldCol.L == newCol.L {
|
|
return
|
|
}
|
|
if tblInfo.TTLInfo != nil {
|
|
if tblInfo.TTLInfo.ColumnName.L == oldCol.L {
|
|
tblInfo.TTLInfo.ColumnName = newCol
|
|
}
|
|
}
|
|
}
|
|
|
|
func adjustForeignKeyChildTableInfoAfterModifyColumn(infoCache *infoschema.InfoCache, t *meta.Mutator, job *model.Job, tblInfo *model.TableInfo, newCol, oldCol *model.ColumnInfo) ([]schemaIDAndTableInfo, error) {
|
|
if !vardef.EnableForeignKey.Load() || newCol.Name.L == oldCol.Name.L {
|
|
return nil, nil
|
|
}
|
|
is := infoCache.GetLatest()
|
|
referredFKs := is.GetTableReferredForeignKeys(job.SchemaName, tblInfo.Name.L)
|
|
if len(referredFKs) == 0 {
|
|
return nil, nil
|
|
}
|
|
fkh := newForeignKeyHelper()
|
|
fkh.addLoadedTable(job.SchemaName, tblInfo.Name.L, job.SchemaID, tblInfo)
|
|
for _, referredFK := range referredFKs {
|
|
info, err := fkh.getTableFromStorage(is, t, referredFK.ChildSchema, referredFK.ChildTable)
|
|
if err != nil {
|
|
if infoschema.ErrTableNotExists.Equal(err) || infoschema.ErrDatabaseNotExists.Equal(err) {
|
|
continue
|
|
}
|
|
return nil, err
|
|
}
|
|
fkInfo := model.FindFKInfoByName(info.tblInfo.ForeignKeys, referredFK.ChildFKName.L)
|
|
if fkInfo == nil {
|
|
continue
|
|
}
|
|
for i := range fkInfo.RefCols {
|
|
if fkInfo.RefCols[i].L == oldCol.Name.L {
|
|
fkInfo.RefCols[i] = newCol.Name
|
|
}
|
|
}
|
|
}
|
|
infoList := make([]schemaIDAndTableInfo, 0, len(fkh.loaded))
|
|
for _, info := range fkh.loaded {
|
|
if info.tblInfo.ID == tblInfo.ID {
|
|
continue
|
|
}
|
|
infoList = append(infoList, info)
|
|
}
|
|
return infoList, nil
|
|
}
|
|
|
|
func needIndexReorg(oldCol, changingCol *model.ColumnInfo) bool {
|
|
if isIntegerChange(oldCol, changingCol) {
|
|
return mysql.HasUnsignedFlag(oldCol.GetFlag()) != mysql.HasUnsignedFlag(changingCol.GetFlag())
|
|
}
|
|
|
|
intest.Assert(isCharChange(oldCol, changingCol))
|
|
|
|
// Check index key part, ref tablecodec.GenIndexKey
|
|
if !collate.CompatibleCollate(oldCol.GetCollate(), changingCol.GetCollate()) {
|
|
return true
|
|
}
|
|
|
|
// Check index value part, ref tablecodec.GenIndexValuePortal
|
|
// TODO(joechenrh): It's better to check each index here, because not all indexes need
|
|
// reorg even if the below condition is true.
|
|
return types.NeedRestoredData(&oldCol.FieldType) != types.NeedRestoredData(&changingCol.FieldType)
|
|
}
|
|
|
|
func needRowReorg(oldCol, changingCol *model.ColumnInfo) bool {
|
|
// Integer changes can skip reorg
|
|
if isIntegerChange(oldCol, changingCol) {
|
|
return false
|
|
}
|
|
|
|
// Other changes except char changes need row reorg.
|
|
if !isCharChange(oldCol, changingCol) {
|
|
return true
|
|
}
|
|
|
|
// We have checked charset before, only need to check binary string, which needs padding.
|
|
return types.IsBinaryStr(&oldCol.FieldType) || types.IsBinaryStr(&changingCol.FieldType)
|
|
}
|
|
|
|
// checkModifyColumnData checks the values of the old column data
|
|
func checkModifyColumnData(
|
|
ctx context.Context,
|
|
w *worker,
|
|
dbName, tblName ast.CIStr,
|
|
oldCol, changingCol *model.ColumnInfo,
|
|
checkValueRange bool,
|
|
) (checked bool, err error) {
|
|
// Get sessionctx from context resource pool.
|
|
var sctx sessionctx.Context
|
|
sctx, err = w.sessPool.Get()
|
|
if err != nil {
|
|
return false, errors.Trace(err)
|
|
}
|
|
defer w.sessPool.Put(sctx)
|
|
|
|
sql := buildCheckSQLFromModifyColumn(dbName, tblName, oldCol, changingCol, checkValueRange)
|
|
if sql == "" {
|
|
return false, nil
|
|
}
|
|
|
|
delayForAsyncCommit()
|
|
rows, _, err := sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(ctx, nil, sql)
|
|
if err != nil {
|
|
return false, errors.Trace(err)
|
|
}
|
|
if len(rows) != 0 {
|
|
if rows[0].IsNull(0) {
|
|
return true, dbterror.ErrInvalidUseOfNull
|
|
}
|
|
|
|
datum := rows[0].GetDatum(0, &oldCol.FieldType)
|
|
dStr := datumToStringNoErr(datum)
|
|
return true, types.ErrTruncated.GenWithStack("Data truncated for column '%s', value is '%s'", oldCol.Name.L, dStr)
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// buildCheckSQLFromModifyColumn builds the SQL to check whether the data
|
|
// is valid after modifying to new type.
|
|
func buildCheckSQLFromModifyColumn(
|
|
dbName, tblName ast.CIStr,
|
|
oldCol, changingCol *model.ColumnInfo,
|
|
checkValueRange bool,
|
|
) string {
|
|
oldTp := oldCol.GetType()
|
|
changingTp := changingCol.GetType()
|
|
|
|
var conditions []string
|
|
template := "SELECT %s FROM %s WHERE %s LIMIT 1"
|
|
checkColName := fmt.Sprintf("`%s`", oldCol.Name.O)
|
|
tableName := fmt.Sprintf("`%s`.`%s`", dbName.O, tblName.O)
|
|
|
|
if checkValueRange {
|
|
if mysql.IsIntegerType(oldTp) && mysql.IsIntegerType(changingTp) {
|
|
// Integer conversion
|
|
conditions = append(conditions, buildCheckRangeForIntegerTypes(oldCol, changingCol))
|
|
} else {
|
|
conditions = append(conditions, fmt.Sprintf("LENGTH(%s) > %d", checkColName, changingCol.FieldType.GetFlen()))
|
|
if oldTp == mysql.TypeVarchar && changingTp == mysql.TypeString {
|
|
conditions = append(conditions, fmt.Sprintf("%s LIKE '%% '", checkColName))
|
|
}
|
|
}
|
|
}
|
|
|
|
if isNullToNotNullChange(oldCol, changingCol) {
|
|
if !(oldTp != mysql.TypeTimestamp && changingTp == mysql.TypeTimestamp) {
|
|
conditions = append(conditions, fmt.Sprintf("`%s` IS NULL", oldCol.Name.O))
|
|
}
|
|
}
|
|
|
|
if len(conditions) == 0 {
|
|
return ""
|
|
}
|
|
|
|
return fmt.Sprintf(template, checkColName, tableName, strings.Join(conditions, " OR "))
|
|
}
|
|
|
|
// buildCheckRangeForIntegerTypes builds the range check condition for integer type conversion.
|
|
func buildCheckRangeForIntegerTypes(oldCol, changingCol *model.ColumnInfo) string {
|
|
changingTp := changingCol.GetType()
|
|
changingUnsigned := mysql.HasUnsignedFlag(changingCol.GetFlag())
|
|
|
|
columnName := fmt.Sprintf("`%s`", oldCol.Name.O)
|
|
|
|
if changingUnsigned {
|
|
upperBound := types.IntegerUnsignedUpperBound(changingTp)
|
|
return fmt.Sprintf("(%s < 0 OR %s > %d)", columnName, columnName, upperBound)
|
|
}
|
|
|
|
lowerBound := types.IntegerSignedLowerBound(changingTp)
|
|
upperBound := types.IntegerSignedUpperBound(changingTp)
|
|
return fmt.Sprintf("(%s < %d OR %s > %d)", columnName, lowerBound, columnName, upperBound)
|
|
}
|
|
|
|
// reorderChangingIdx reorders the changing index infos to match the order of old index infos.
|
|
func reorderChangingIdx(oldIdxInfos []*model.IndexInfo, changingIdxInfos []*model.IndexInfo) {
|
|
nameToChanging := make(map[string]*model.IndexInfo, len(changingIdxInfos))
|
|
for _, cIdx := range changingIdxInfos {
|
|
origName := cIdx.Name.O
|
|
if cIdx.State != model.StatePublic {
|
|
origName = cIdx.GetChangingOriginName()
|
|
}
|
|
nameToChanging[origName] = cIdx
|
|
}
|
|
|
|
for i, oldIdx := range oldIdxInfos {
|
|
changingIdxInfos[i] = nameToChanging[oldIdx.GetRemovingOriginName()]
|
|
}
|
|
}
|
|
|
|
func (w *worker) doModifyColumnTypeWithData(
|
|
jobCtx *jobContext,
|
|
job *model.Job,
|
|
dbInfo *model.DBInfo,
|
|
tblInfo *model.TableInfo,
|
|
changingCol, oldCol *model.ColumnInfo,
|
|
args *model.ModifyColumnArgs,
|
|
) (ver int64, _ error) {
|
|
colName, pos := args.Column.Name, args.Position
|
|
|
|
var err error
|
|
originalState := changingCol.State
|
|
targetCol := changingCol.Clone()
|
|
targetCol.Name = colName
|
|
changingIdxs := buildRelatedIndexInfos(tblInfo, changingCol.ID)
|
|
switch changingCol.State {
|
|
case model.StateNone:
|
|
err = validatePosition(tblInfo, oldCol, pos)
|
|
if err != nil {
|
|
job.State = model.JobStateRollingback
|
|
return ver, errors.Trace(err)
|
|
}
|
|
// Column from null to not null.
|
|
if isNullToNotNullChange(oldCol, changingCol) {
|
|
oldCol.AddFlag(mysql.PreventNullInsertFlag)
|
|
}
|
|
// none -> delete only
|
|
updateObjectState(changingCol, changingIdxs, model.StateDeleteOnly)
|
|
job.ReorgMeta.Stage = model.ReorgStageModifyColumnUpdateColumn
|
|
err = initForReorgIndexes(w, job, changingIdxs)
|
|
if err != nil {
|
|
job.State = model.JobStateRollingback
|
|
return ver, errors.Trace(err)
|
|
}
|
|
failpoint.Inject("mockInsertValueAfterCheckNull", func(val failpoint.Value) {
|
|
if valStr, ok := val.(string); ok {
|
|
var sctx sessionctx.Context
|
|
sctx, err := w.sessPool.Get()
|
|
if err != nil {
|
|
failpoint.Return(ver, err)
|
|
}
|
|
defer w.sessPool.Put(sctx)
|
|
|
|
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
|
|
//nolint:forcetypeassert
|
|
_, _, err = sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(ctx, nil, valStr)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
failpoint.Return(ver, err)
|
|
}
|
|
}
|
|
})
|
|
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, originalState != changingCol.State)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
// Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will
|
|
// be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`.
|
|
job.SchemaState = model.StateDeleteOnly
|
|
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String(), args.OldColumnName.O).Set(0)
|
|
args.ChangingColumn = changingCol
|
|
args.ChangingIdxs = changingIdxs
|
|
failpoint.InjectCall("modifyColumnTypeWithData", job, args)
|
|
job.FillArgs(args)
|
|
case model.StateDeleteOnly:
|
|
// Column from null to not null.
|
|
if isNullToNotNullChange(oldCol, changingCol) {
|
|
checked, err := checkModifyColumnData(
|
|
jobCtx.stepCtx, w,
|
|
dbInfo.Name, tblInfo.Name,
|
|
oldCol, args.Column, false)
|
|
if err != nil {
|
|
if checked {
|
|
job.State = model.JobStateRollingback
|
|
}
|
|
return ver, errors.Trace(err)
|
|
}
|
|
}
|
|
// delete only -> write only
|
|
updateObjectState(changingCol, changingIdxs, model.StateWriteOnly)
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != changingCol.State)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
job.SchemaState = model.StateWriteOnly
|
|
failpoint.InjectCall("afterModifyColumnStateDeleteOnly", job.ID)
|
|
case model.StateWriteOnly:
|
|
// write only -> reorganization
|
|
updateObjectState(changingCol, changingIdxs, model.StateWriteReorganization)
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != changingCol.State)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
// Initialize SnapshotVer to 0 for later reorganization check.
|
|
job.SnapshotVer = 0
|
|
job.SchemaState = model.StateWriteReorganization
|
|
case model.StateWriteReorganization:
|
|
tbl, err := getTable(jobCtx.getAutoIDRequirement(), dbInfo.ID, tblInfo)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
switch job.ReorgMeta.AnalyzeState {
|
|
case model.AnalyzeStateNone:
|
|
switch job.ReorgMeta.Stage {
|
|
case model.ReorgStageModifyColumnUpdateColumn:
|
|
var done bool
|
|
reorgElements := BuildElements(changingCol, changingIdxs)
|
|
done, ver, err = doReorgWorkForModifyColumn(jobCtx, w, job, tbl, oldCol, reorgElements)
|
|
if !done {
|
|
return ver, err
|
|
}
|
|
if len(changingIdxs) > 0 {
|
|
job.SnapshotVer = 0
|
|
job.ReorgMeta.Stage = model.ReorgStageModifyColumnRecreateIndex
|
|
} else {
|
|
job.ReorgMeta.Stage = model.ReorgStageModifyColumnCompleted
|
|
}
|
|
case model.ReorgStageModifyColumnRecreateIndex:
|
|
var done bool
|
|
done, ver, err = doReorgWorkForCreateIndex(w, jobCtx, job, tbl, changingIdxs)
|
|
if !done {
|
|
return ver, err
|
|
}
|
|
job.ReorgMeta.Stage = model.ReorgStageModifyColumnCompleted
|
|
case model.ReorgStageModifyColumnCompleted:
|
|
// For multi-schema change, analyze is done by parent job.
|
|
if job.MultiSchemaInfo == nil && checkNeedAnalyze(job, tblInfo) {
|
|
job.ReorgMeta.AnalyzeState = model.AnalyzeStateRunning
|
|
} else {
|
|
job.ReorgMeta.AnalyzeState = model.AnalyzeStateSkipped
|
|
checkAndMarkNonRevertible(job)
|
|
}
|
|
}
|
|
case model.AnalyzeStateRunning:
|
|
intest.Assert(job.MultiSchemaInfo == nil, "multi schema change shouldn't reach here")
|
|
w.startAnalyzeAndWait(job, tblInfo)
|
|
case model.AnalyzeStateDone, model.AnalyzeStateSkipped, model.AnalyzeStateTimeout, model.AnalyzeStateFailed:
|
|
failpoint.InjectCall("afterReorgWorkForModifyColumn")
|
|
oldIdxInfos := buildRelatedIndexInfos(tblInfo, oldCol.ID)
|
|
if tblInfo.TTLInfo != nil {
|
|
updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, colName)
|
|
}
|
|
changingIdxInfos := buildRelatedIndexInfos(tblInfo, changingCol.ID)
|
|
intest.Assert(len(oldIdxInfos) == len(changingIdxInfos))
|
|
|
|
// In multi-schema change, the order of changingIdxInfos may not be the same as oldIdxInfos,
|
|
// because we will allocate new indexID for previous temp index.
|
|
reorderChangingIdx(oldIdxInfos, changingIdxInfos)
|
|
|
|
updateObjectState(oldCol, oldIdxInfos, model.StateWriteOnly)
|
|
updateObjectState(changingCol, changingIdxInfos, model.StatePublic)
|
|
markOldObjectRemoving(oldCol, changingCol, oldIdxInfos, changingIdxInfos, colName)
|
|
moveChangingColumnToDest(tblInfo, oldCol, changingCol, pos)
|
|
moveOldColumnToBack(tblInfo, oldCol)
|
|
moveIndexInfoToDest(tblInfo, changingCol, oldIdxInfos, changingIdxInfos)
|
|
updateModifyingCols(oldCol, changingCol)
|
|
|
|
job.SchemaState = model.StatePublic
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
}
|
|
case model.StatePublic:
|
|
oldIdxInfos := buildRelatedIndexInfos(tblInfo, oldCol.ID)
|
|
switch oldCol.State {
|
|
case model.StateWriteOnly:
|
|
updateObjectState(oldCol, oldIdxInfos, model.StateDeleteOnly)
|
|
moveOldColumnToBack(tblInfo, oldCol)
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
case model.StateDeleteOnly:
|
|
removedIdxIDs := removeOldObjects(tblInfo, oldCol, oldIdxInfos)
|
|
removedIdxIDs = append(removedIdxIDs, getIngestTempIndexIDs(job, changingIdxs)...)
|
|
analyzed := job.ReorgMeta.AnalyzeState == model.AnalyzeStateDone
|
|
modifyColumnEvent := notifier.NewModifyColumnEvent(tblInfo, []*model.ColumnInfo{changingCol}, analyzed)
|
|
err = asyncNotifyEvent(jobCtx, modifyColumnEvent, job, noSubJob, w.sess)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
// Finish this job.
|
|
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
|
|
// Refactor the job args to add the old index ids into delete range table.
|
|
rmIdxs := append(removedIdxIDs, args.RedundantIdxs...)
|
|
args.IndexIDs = rmIdxs
|
|
newIdxIDs := make([]int64, 0, len(changingIdxs))
|
|
for _, idx := range changingIdxs {
|
|
newIdxIDs = append(newIdxIDs, idx.ID)
|
|
}
|
|
args.NewIndexIDs = newIdxIDs
|
|
args.PartitionIDs = getPartitionIDs(tblInfo)
|
|
job.FillFinishedArgs(args)
|
|
default:
|
|
errMsg := fmt.Sprintf("unexpected column state %s in modify column job", oldCol.State)
|
|
intest.Assert(false, errMsg)
|
|
return ver, errors.Errorf("%s", errMsg)
|
|
}
|
|
default:
|
|
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", changingCol.State)
|
|
}
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
func (w *worker) doModifyColumnIndexReorg(
|
|
jobCtx *jobContext,
|
|
job *model.Job,
|
|
dbInfo *model.DBInfo,
|
|
tblInfo *model.TableInfo,
|
|
oldCol *model.ColumnInfo,
|
|
args *model.ModifyColumnArgs,
|
|
) (ver int64, err error) {
|
|
colName, pos := args.Column.Name, args.Position
|
|
|
|
allIdxs := buildRelatedIndexInfos(tblInfo, oldCol.ID)
|
|
oldIdxInfos := make([]*model.IndexInfo, 0, len(allIdxs)/2)
|
|
changingIdxInfos := make([]*model.IndexInfo, 0, len(allIdxs)/2)
|
|
|
|
if job.SchemaState == model.StatePublic {
|
|
// The constraint that the first half of allIdxs is oldIdxInfos and
|
|
// the second half is changingIdxInfos isn't true now, so we need to
|
|
// find the oldIdxInfos again.
|
|
for _, idx := range allIdxs {
|
|
if idx.IsRemoving() {
|
|
oldIdxInfos = append(oldIdxInfos, idx)
|
|
}
|
|
}
|
|
} else {
|
|
changingIdxInfos = allIdxs[len(allIdxs)/2:]
|
|
oldIdxInfos = allIdxs[:len(allIdxs)/2]
|
|
}
|
|
|
|
finishFunc := func() (ver int64, err error) {
|
|
removedIdxIDs := make([]int64, 0, len(oldIdxInfos))
|
|
for _, idx := range oldIdxInfos {
|
|
removedIdxIDs = append(removedIdxIDs, idx.ID)
|
|
}
|
|
removedIdxIDs = append(removedIdxIDs, getIngestTempIndexIDs(job, changingIdxInfos)...)
|
|
removeOldIndexes(tblInfo, oldIdxInfos)
|
|
oldCol.ChangingFieldType = nil
|
|
oldCol.DelFlag(mysql.PreventNullInsertFlag)
|
|
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
// Finish this job.
|
|
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
|
|
// Refactor the job args to add the old index ids into delete range table.
|
|
rmIdxs := append(removedIdxIDs, args.RedundantIdxs...)
|
|
args.IndexIDs = rmIdxs
|
|
args.PartitionIDs = getPartitionIDs(tblInfo)
|
|
job.FillFinishedArgs(args)
|
|
return ver, nil
|
|
}
|
|
|
|
switch job.SchemaState {
|
|
case model.StateNone:
|
|
oldCol.AddFlag(mysql.PreventNullInsertFlag)
|
|
oldCol.ChangingFieldType = &args.Column.FieldType
|
|
// none -> delete only
|
|
updateObjectState(nil, changingIdxInfos, model.StateDeleteOnly)
|
|
job.ReorgMeta.Stage = model.ReorgStageModifyColumnUpdateColumn
|
|
err := initForReorgIndexes(w, job, changingIdxInfos)
|
|
if err != nil {
|
|
job.State = model.JobStateRollingback
|
|
return ver, errors.Trace(err)
|
|
}
|
|
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
job.SchemaState = model.StateDeleteOnly
|
|
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String(), args.OldColumnName.O).Set(0)
|
|
args.ChangingIdxs = changingIdxInfos
|
|
failpoint.InjectCall("modifyColumnTypeWithData", job, args)
|
|
job.FillArgs(args)
|
|
case model.StateDeleteOnly:
|
|
checked, err := checkModifyColumnData(
|
|
jobCtx.stepCtx, w,
|
|
dbInfo.Name, tblInfo.Name,
|
|
oldCol, args.Column, true)
|
|
if err != nil {
|
|
if checked {
|
|
job.State = model.JobStateRollingback
|
|
}
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
// delete only -> write only
|
|
updateObjectState(nil, changingIdxInfos, model.StateWriteOnly)
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
job.SchemaState = model.StateWriteOnly
|
|
failpoint.InjectCall("afterModifyColumnStateDeleteOnly", job.ID)
|
|
case model.StateWriteOnly:
|
|
// write only -> reorganization
|
|
updateObjectState(nil, changingIdxInfos, model.StateWriteReorganization)
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
// Initialize SnapshotVer to 0 for later reorganization check.
|
|
job.SnapshotVer = 0
|
|
job.SchemaState = model.StateWriteReorganization
|
|
case model.StateWriteReorganization:
|
|
tbl, err := getTable(jobCtx.getAutoIDRequirement(), dbInfo.ID, tblInfo)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
switch job.ReorgMeta.AnalyzeState {
|
|
case model.AnalyzeStateNone:
|
|
switch job.ReorgMeta.Stage {
|
|
case model.ReorgStageModifyColumnUpdateColumn:
|
|
// Now row reorg
|
|
job.SnapshotVer = 0
|
|
job.ReorgMeta.Stage = model.ReorgStageModifyColumnRecreateIndex
|
|
case model.ReorgStageModifyColumnRecreateIndex:
|
|
var done bool
|
|
done, ver, err = doReorgWorkForCreateIndex(w, jobCtx, job, tbl, changingIdxInfos)
|
|
if !done {
|
|
return ver, err
|
|
}
|
|
job.ReorgMeta.Stage = model.ReorgStageModifyColumnCompleted
|
|
case model.ReorgStageModifyColumnCompleted:
|
|
// For multi-schema change, analyze is done by parent job.
|
|
if job.MultiSchemaInfo == nil && checkNeedAnalyze(job, tblInfo) {
|
|
job.ReorgMeta.AnalyzeState = model.AnalyzeStateRunning
|
|
} else {
|
|
job.ReorgMeta.AnalyzeState = model.AnalyzeStateSkipped
|
|
checkAndMarkNonRevertible(job)
|
|
}
|
|
}
|
|
case model.AnalyzeStateRunning:
|
|
intest.Assert(job.MultiSchemaInfo == nil, "multi schema change shouldn't reach here")
|
|
w.startAnalyzeAndWait(job, tblInfo)
|
|
case model.AnalyzeStateDone, model.AnalyzeStateSkipped, model.AnalyzeStateTimeout, model.AnalyzeStateFailed:
|
|
failpoint.InjectCall("afterReorgWorkForModifyColumn")
|
|
reorderChangingIdx(oldIdxInfos, changingIdxInfos)
|
|
oldTp := oldCol.FieldType
|
|
oldName := oldCol.Name
|
|
oldID := oldCol.ID
|
|
tblInfo.Columns[oldCol.Offset] = args.Column.Clone()
|
|
tblInfo.Columns[oldCol.Offset].ChangingFieldType = &oldTp
|
|
tblInfo.Columns[oldCol.Offset].Offset = oldCol.Offset
|
|
tblInfo.Columns[oldCol.Offset].ID = oldID
|
|
tblInfo.Columns[oldCol.Offset].State = model.StatePublic
|
|
oldCol = tblInfo.Columns[oldCol.Offset]
|
|
|
|
updateObjectState(nil, oldIdxInfos, model.StateWriteOnly)
|
|
updateObjectState(nil, changingIdxInfos, model.StatePublic)
|
|
moveChangingColumnToDest(tblInfo, oldCol, oldCol, pos)
|
|
moveIndexInfoToDest(tblInfo, oldCol, oldIdxInfos, changingIdxInfos)
|
|
markOldIndexesRemoving(oldIdxInfos, changingIdxInfos)
|
|
for i, idx := range changingIdxInfos {
|
|
for j, idxCol := range idx.Columns {
|
|
if idxCol.Name.L == oldName.L {
|
|
oldIdxInfos[i].Columns[j].Name = colName
|
|
oldIdxInfos[i].Columns[j].Offset = oldCol.Offset
|
|
oldIdxInfos[i].Columns[j].UseChangingType = true
|
|
changingIdxInfos[i].Columns[j].Name = colName
|
|
changingIdxInfos[i].Columns[j].Offset = oldCol.Offset
|
|
changingIdxInfos[i].Columns[j].UseChangingType = false
|
|
}
|
|
}
|
|
}
|
|
job.SchemaState = model.StatePublic
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
}
|
|
case model.StatePublic:
|
|
if len(oldIdxInfos) == 0 {
|
|
// All the old indexes has been deleted by previous modify column,
|
|
// we can just finish the job.
|
|
return finishFunc()
|
|
}
|
|
|
|
switch oldIdxInfos[0].State {
|
|
case model.StateWriteOnly:
|
|
updateObjectState(nil, oldIdxInfos, model.StateDeleteOnly)
|
|
return updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
case model.StateDeleteOnly:
|
|
return finishFunc()
|
|
default:
|
|
errMsg := fmt.Sprintf("unexpected column state %s in modify column job", oldCol.State)
|
|
intest.Assert(false, errMsg)
|
|
return ver, errors.Errorf("%s", errMsg)
|
|
}
|
|
default:
|
|
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", oldIdxInfos[0].State)
|
|
}
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
// checkAndMarkNonRevertible should be called when the job is in the final revertible state before public.
|
|
func checkAndMarkNonRevertible(job *model.Job) {
|
|
// previously, when reorg is done, and before we set the state to public, we need all other sub-task to
|
|
// be ready as well which is via setting this job as NornRevertible, then we can continue skip current
|
|
// sub and process the others.
|
|
// And when all sub-task are ready, which means each of them could be public in one more ddl round. And
|
|
// in onMultiSchemaChange, we just give all sub-jobs each one more round to public the schema, and only
|
|
// use the schema version generated once.
|
|
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
|
|
job.MarkNonRevertible()
|
|
}
|
|
}
|
|
|
|
func doReorgWorkForModifyColumn(
|
|
jobCtx *jobContext,
|
|
w *worker, job *model.Job, tbl table.Table,
|
|
oldCol *model.ColumnInfo, elements []*meta.Element,
|
|
) (done bool, ver int64, err error) {
|
|
sctx, err1 := w.sessPool.Get()
|
|
if err1 != nil {
|
|
err = errors.Trace(err1)
|
|
return
|
|
}
|
|
defer w.sessPool.Put(sctx)
|
|
rh := newReorgHandler(sess.NewSession(sctx))
|
|
dbInfo, err := jobCtx.metaMut.GetDatabase(job.SchemaID)
|
|
if err != nil {
|
|
return false, ver, errors.Trace(err)
|
|
}
|
|
reorgInfo, err := getReorgInfo(jobCtx.oldDDLCtx.jobContext(job.ID, job.ReorgMeta),
|
|
jobCtx, rh, job, dbInfo, tbl, elements, false)
|
|
if err != nil || reorgInfo == nil || reorgInfo.first {
|
|
// If we run reorg firstly, we should update the job snapshot version
|
|
// and then run the reorg next time.
|
|
return false, ver, errors.Trace(err)
|
|
}
|
|
|
|
// Inject a failpoint so that we can pause here and do verification on other components.
|
|
// With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command:
|
|
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData".
|
|
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/pkg/ddl/mockDelayInModifyColumnTypeWithData"
|
|
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
|
|
err = w.runReorgJob(jobCtx, reorgInfo, tbl.Meta(), func() (addIndexErr error) {
|
|
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
|
|
func() {
|
|
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)
|
|
}, false)
|
|
return w.modifyTableColumn(jobCtx, tbl, reorgInfo)
|
|
})
|
|
if err != nil {
|
|
if dbterror.ErrPausedDDLJob.Equal(err) {
|
|
return false, ver, nil
|
|
}
|
|
|
|
if dbterror.ErrWaitReorgTimeout.Equal(err) {
|
|
// If timeout, we should return, check for the owner and re-wait job done.
|
|
return false, ver, nil
|
|
}
|
|
if kv.IsTxnRetryableError(err) || dbterror.ErrNotOwner.Equal(err) {
|
|
return false, ver, errors.Trace(err)
|
|
}
|
|
if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
|
|
logutil.DDLLogger().Warn("run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback",
|
|
zap.String("job", job.String()), zap.Error(err1))
|
|
}
|
|
logutil.DDLLogger().Warn("run modify column job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err))
|
|
job.State = model.JobStateRollingback
|
|
return false, ver, errors.Trace(err)
|
|
}
|
|
return true, ver, nil
|
|
}
|
|
|
|
func checkModifyColumnWithGeneratedColumnsConstraint(allCols []*table.Column, oldColName ast.CIStr) error {
|
|
for _, col := range allCols {
|
|
if col.GeneratedExpr == nil {
|
|
continue
|
|
}
|
|
dependedColNames := FindColumnNamesInExpr(col.GeneratedExpr.Internal())
|
|
for _, name := range dependedColNames {
|
|
if name.Name.L == oldColName.L {
|
|
if col.Hidden {
|
|
return dbterror.ErrDependentByFunctionalIndex.GenWithStackByArgs(oldColName.O)
|
|
}
|
|
return dbterror.ErrDependentByGeneratedColumn.GenWithStackByArgs(oldColName.O)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func preCheckPartitionModifiableColumn(sctx sessionctx.Context, t table.Table, col, newCol *table.Column) error {
|
|
// Check that the column change does not affect the partitioning column
|
|
// It must keep the same type, int [unsigned], [var]char, date[time]
|
|
if t.Meta().Partition != nil {
|
|
pt, ok := t.(table.PartitionedTable)
|
|
if !ok {
|
|
// Should never happen!
|
|
return dbterror.ErrNotAllowedTypeInPartition.GenWithStackByArgs(newCol.Name.O)
|
|
}
|
|
for _, name := range pt.GetPartitionColumnNames() {
|
|
if strings.EqualFold(name.L, col.Name.L) {
|
|
return checkPartitionColumnModifiable(sctx, t.Meta(), col.ColumnInfo, newCol.ColumnInfo)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func postCheckPartitionModifiableColumn(w *worker, tblInfo *model.TableInfo, col, newCol *model.ColumnInfo) error {
|
|
// Check that the column change does not affect the partitioning column
|
|
// It must keep the same type, int [unsigned], [var]char, date[time]
|
|
if tblInfo.Partition != nil {
|
|
sctx, err := w.sessPool.Get()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer w.sessPool.Put(sctx)
|
|
if len(tblInfo.Partition.Columns) > 0 {
|
|
for _, pc := range tblInfo.Partition.Columns {
|
|
if strings.EqualFold(pc.L, col.Name.L) {
|
|
err := checkPartitionColumnModifiable(sctx, tblInfo, col, newCol)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
partCols, err := extractPartitionColumns(tblInfo.Partition.Expr, tblInfo)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
var partCol *model.ColumnInfo
|
|
for _, pc := range partCols {
|
|
if strings.EqualFold(pc.Name.L, col.Name.L) {
|
|
partCol = pc
|
|
break
|
|
}
|
|
}
|
|
if partCol != nil {
|
|
return checkPartitionColumnModifiable(sctx, tblInfo, col, newCol)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func checkPartitionColumnModifiable(sctx sessionctx.Context, tblInfo *model.TableInfo, col, newCol *model.ColumnInfo) error {
|
|
if col.Name.L != newCol.Name.L {
|
|
return dbterror.ErrDependentByPartitionFunctional.GenWithStackByArgs(col.Name.L)
|
|
}
|
|
if !isColTypeAllowedAsPartitioningCol(tblInfo.Partition.Type, newCol.FieldType) {
|
|
return dbterror.ErrNotAllowedTypeInPartition.GenWithStackByArgs(newCol.Name.O)
|
|
}
|
|
pi := tblInfo.GetPartitionInfo()
|
|
if len(pi.Columns) == 0 {
|
|
// non COLUMNS partitioning, only checks INTs, not their actual range
|
|
// There are many edge cases, like when truncating SQL Mode is allowed
|
|
// which will change the partitioning expression value resulting in a
|
|
// different partition. Better be safe and not allow decreasing of length.
|
|
// TODO: Should we allow it in strict mode? Wait for a use case / request.
|
|
if newCol.FieldType.GetFlen() < col.FieldType.GetFlen() {
|
|
return dbterror.ErrUnsupportedModifyColumn.GenWithStack("Unsupported modify column, decreasing length of int may result in truncation and change of partition")
|
|
}
|
|
}
|
|
// Basically only allow changes of the length/decimals for the column
|
|
// Note that enum is not allowed, so elems are not checked
|
|
// TODO: support partition by ENUM
|
|
if newCol.FieldType.EvalType() != col.FieldType.EvalType() ||
|
|
newCol.FieldType.GetFlag() != col.FieldType.GetFlag() ||
|
|
newCol.FieldType.GetCollate() != col.FieldType.GetCollate() ||
|
|
newCol.FieldType.GetCharset() != col.FieldType.GetCharset() {
|
|
return dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("can't change the partitioning column, since it would require reorganize all partitions")
|
|
}
|
|
// Generate a new PartitionInfo and validate it together with the new column definition
|
|
// Checks if all partition definition values are compatible.
|
|
// Similar to what buildRangePartitionDefinitions would do in terms of checks.
|
|
|
|
newTblInfo := *tblInfo
|
|
// Replace col with newCol and see if we can generate a new SHOW CREATE TABLE
|
|
// and reparse it and build new partition definitions (which will do additional
|
|
// checks columns vs partition definition values
|
|
newCols := make([]*model.ColumnInfo, 0, len(newTblInfo.Columns))
|
|
for _, c := range newTblInfo.Columns {
|
|
if c.ID == col.ID {
|
|
newCols = append(newCols, newCol)
|
|
continue
|
|
}
|
|
newCols = append(newCols, c)
|
|
}
|
|
newTblInfo.Columns = newCols
|
|
|
|
var buf bytes.Buffer
|
|
AppendPartitionInfo(tblInfo.GetPartitionInfo(), &buf, mysql.ModeNone)
|
|
// Ignoring warnings
|
|
stmt, _, err := parser.New().ParseSQL("ALTER TABLE t " + buf.String())
|
|
if err != nil {
|
|
// Should never happen!
|
|
return dbterror.ErrUnsupportedModifyColumn.GenWithStack("cannot parse generated PartitionInfo")
|
|
}
|
|
at, ok := stmt[0].(*ast.AlterTableStmt)
|
|
if !ok || len(at.Specs) != 1 || at.Specs[0].Partition == nil {
|
|
return dbterror.ErrUnsupportedModifyColumn.GenWithStack("cannot parse generated PartitionInfo")
|
|
}
|
|
pAst := at.Specs[0].Partition
|
|
_, err = buildPartitionDefinitionsInfo(
|
|
exprctx.CtxWithHandleTruncateErrLevel(sctx.GetExprCtx(), errctx.LevelError),
|
|
pAst.Definitions, &newTblInfo, uint64(len(newTblInfo.Partition.Definitions)),
|
|
)
|
|
if err != nil {
|
|
return dbterror.ErrUnsupportedModifyColumn.GenWithStack("New column does not match partition definitions: %s", err.Error())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
var colStateOrd = map[model.SchemaState]int{
|
|
model.StateNone: 0,
|
|
model.StateDeleteOnly: 1,
|
|
model.StateDeleteReorganization: 2,
|
|
model.StateWriteOnly: 3,
|
|
model.StateWriteReorganization: 4,
|
|
model.StatePublic: 5,
|
|
}
|
|
|
|
func checkTableInfo(tblInfo *model.TableInfo) {
|
|
if !intest.InTest {
|
|
return
|
|
}
|
|
|
|
// Check columns' order by state.
|
|
minState := model.StatePublic
|
|
for _, col := range tblInfo.Columns {
|
|
if colStateOrd[col.State] < colStateOrd[minState] {
|
|
minState = col.State
|
|
} else if colStateOrd[col.State] > colStateOrd[minState] {
|
|
intest.Assert(false, fmt.Sprintf("column %s state %s is not in order, expect at least %s", col.Name, col.State, minState))
|
|
}
|
|
if col.ChangeStateInfo != nil {
|
|
offset := col.ChangeStateInfo.DependencyColumnOffset
|
|
intest.Assert(offset >= 0 && offset < len(tblInfo.Columns))
|
|
depCol := tblInfo.Columns[offset]
|
|
switch {
|
|
case col.IsChanging():
|
|
name := col.GetChangingOriginName()
|
|
intest.Assert(name == depCol.Name.O, "%s != %s", name, depCol.Name.O)
|
|
case depCol.IsRemoving():
|
|
name := depCol.GetRemovingOriginName()
|
|
intest.Assert(name == col.Name.O, "%s != %s", name, col.Name.O)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check index names' uniqueness.
|
|
allNames := make(map[string]struct{})
|
|
for _, idx := range tblInfo.Indices {
|
|
_, exists := allNames[idx.Name.O]
|
|
intest.Assert(!exists, "duplicate index name %s", idx.Name.O)
|
|
allNames[idx.Name.O] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// GetModifiableColumnJob returns a DDL job of model.ActionModifyColumn.
|
|
func GetModifiableColumnJob(
|
|
ctx context.Context,
|
|
sctx sessionctx.Context,
|
|
is infoschema.InfoSchema, // WARN: is maybe nil here.
|
|
ident ast.Ident,
|
|
originalColName ast.CIStr,
|
|
schema *model.DBInfo,
|
|
t table.Table,
|
|
spec *ast.AlterTableSpec,
|
|
) (*JobWrapper, error) {
|
|
var err error
|
|
specNewColumn := spec.NewColumns[0]
|
|
|
|
col := table.FindCol(t.Cols(), originalColName.L)
|
|
if col == nil {
|
|
return nil, infoschema.ErrColumnNotExists.GenWithStackByArgs(originalColName, ident.Name)
|
|
}
|
|
err = checkColumnReferencedByPartialCondition(t.Meta(), col.ColumnInfo.Name)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
newColName := specNewColumn.Name.Name
|
|
if newColName.L == model.ExtraHandleName.L {
|
|
return nil, dbterror.ErrWrongColumnName.GenWithStackByArgs(newColName.L)
|
|
}
|
|
errG := checkModifyColumnWithGeneratedColumnsConstraint(t.Cols(), originalColName)
|
|
|
|
// If we want to rename the column name, we need to check whether it already exists.
|
|
if newColName.L != originalColName.L {
|
|
c := table.FindCol(t.Cols(), newColName.L)
|
|
if c != nil {
|
|
return nil, infoschema.ErrColumnExists.GenWithStackByArgs(newColName)
|
|
}
|
|
|
|
// And also check the generated columns dependency, if some generated columns
|
|
// depend on this column, we can't rename the column name.
|
|
if errG != nil {
|
|
return nil, errors.Trace(errG)
|
|
}
|
|
}
|
|
|
|
// Constraints in the new column means adding new constraints. Errors should thrown,
|
|
// which will be done by `processColumnOptions` later.
|
|
if specNewColumn.Tp == nil {
|
|
// Make sure the column definition is simple field type.
|
|
return nil, errors.Trace(dbterror.ErrUnsupportedModifyColumn)
|
|
}
|
|
|
|
if err = checkColumnAttributes(specNewColumn.Name.OrigColName(), specNewColumn.Tp); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
newCol := table.ToColumn(&model.ColumnInfo{
|
|
ID: col.ID,
|
|
// We use this PR(https://github.com/pingcap/tidb/pull/6274) as the dividing line to define whether it is a new version or an old version TiDB.
|
|
// The old version TiDB initializes the column's offset and state here.
|
|
// The new version TiDB doesn't initialize the column's offset and state, and it will do the initialization in run DDL function.
|
|
// When we do the rolling upgrade the following may happen:
|
|
// a new version TiDB builds the DDL job that doesn't be set the column's offset and state,
|
|
// and the old version TiDB is the DDL owner, it doesn't get offset and state from the store. Then it will encounter errors.
|
|
// So here we set offset and state to support the rolling upgrade.
|
|
Offset: col.Offset,
|
|
State: col.State,
|
|
OriginDefaultValue: col.OriginDefaultValue,
|
|
OriginDefaultValueBit: col.OriginDefaultValueBit,
|
|
FieldType: *specNewColumn.Tp,
|
|
Name: newColName,
|
|
Version: col.Version,
|
|
})
|
|
|
|
if err = ProcessColumnCharsetAndCollation(NewMetaBuildContextWithSctx(sctx), col, newCol, t.Meta(), specNewColumn, schema); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = checkModifyColumnWithForeignKeyConstraint(is, schema.Name.L, t.Meta(), col.ColumnInfo, newCol.ColumnInfo); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
// Copy index related options to the new spec.
|
|
indexFlags := col.FieldType.GetFlag() & (mysql.PriKeyFlag | mysql.UniqueKeyFlag | mysql.MultipleKeyFlag)
|
|
newCol.FieldType.AddFlag(indexFlags)
|
|
if mysql.HasPriKeyFlag(col.FieldType.GetFlag()) {
|
|
newCol.FieldType.AddFlag(mysql.NotNullFlag)
|
|
// TODO: If user explicitly set NULL, we should throw error ErrPrimaryCantHaveNull.
|
|
}
|
|
|
|
if err = ProcessModifyColumnOptions(sctx, newCol, specNewColumn.Options); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
if err = checkModifyTypes(col.ColumnInfo, newCol.ColumnInfo, isColumnWithIndex(col.Name.L, t.Meta().Indices)); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
mayNeedChangeColData := !noReorgDataStrict(t.Meta(), col.ColumnInfo, newCol.ColumnInfo)
|
|
if mayNeedChangeColData {
|
|
if err = isGeneratedRelatedColumn(t.Meta(), newCol.ColumnInfo, col.ColumnInfo); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if isColumnarIndexColumn(t.Meta(), col.ColumnInfo) {
|
|
return nil, dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("columnar indexes on the column")
|
|
}
|
|
// new col's origin default value be the same as the new default value.
|
|
originDefVal, err := generateOriginDefaultValue(newCol.ColumnInfo, sctx, false)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if err = newCol.ColumnInfo.SetOriginDefaultValue(originDefVal); err != nil {
|
|
return nil, dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("new column set origin default value failed")
|
|
}
|
|
}
|
|
|
|
err = preCheckPartitionModifiableColumn(sctx, t, col, newCol)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We don't support modifying column from not_auto_increment to auto_increment.
|
|
if !mysql.HasAutoIncrementFlag(col.GetFlag()) && mysql.HasAutoIncrementFlag(newCol.GetFlag()) {
|
|
return nil, dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("can't set auto_increment")
|
|
}
|
|
// Not support auto id with default value.
|
|
if mysql.HasAutoIncrementFlag(newCol.GetFlag()) && newCol.GetDefaultValue() != nil {
|
|
return nil, dbterror.ErrInvalidDefaultValue.GenWithStackByArgs(newCol.Name)
|
|
}
|
|
// Disallow modifying column from auto_increment to not auto_increment if the session variable `AllowRemoveAutoInc` is false.
|
|
//nolint:forbidigo
|
|
if !sctx.GetSessionVars().AllowRemoveAutoInc && mysql.HasAutoIncrementFlag(col.GetFlag()) && !mysql.HasAutoIncrementFlag(newCol.GetFlag()) {
|
|
return nil, dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("can't remove auto_increment without @@tidb_allow_remove_auto_inc enabled")
|
|
}
|
|
|
|
// We support modifying the type definitions of 'null' to 'not null' now.
|
|
if !mysql.HasNotNullFlag(col.GetFlag()) && mysql.HasNotNullFlag(newCol.GetFlag()) {
|
|
if err = checkForNullValue(ctx, sctx, true, ident.Schema, ident.Name, newCol.ColumnInfo, col.ColumnInfo); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
}
|
|
|
|
if err = checkColumnWithIndexConstraint(t.Meta(), col.ColumnInfo, newCol.ColumnInfo); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// As same with MySQL, we don't support modifying the stored status for generated columns.
|
|
if err = checkModifyGeneratedColumn(sctx, schema.Name, t, col, newCol, specNewColumn, spec.Position); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if errG != nil {
|
|
// According to issue https://github.com/pingcap/tidb/issues/24321,
|
|
// changing the type of a column involving generating a column is prohibited.
|
|
return nil, dbterror.ErrUnsupportedOnGeneratedColumn.GenWithStackByArgs(errG.Error())
|
|
}
|
|
|
|
if t.Meta().TTLInfo != nil {
|
|
// the column referenced by TTL should be a time type
|
|
if t.Meta().TTLInfo.ColumnName.L == originalColName.L && !types.IsTypeTime(newCol.ColumnInfo.FieldType.GetType()) {
|
|
return nil, errors.Trace(dbterror.ErrUnsupportedColumnInTTLConfig.GenWithStackByArgs(newCol.ColumnInfo.Name.O))
|
|
}
|
|
}
|
|
|
|
var newAutoRandBits uint64
|
|
if newAutoRandBits, err = checkAutoRandom(t.Meta(), col, specNewColumn); err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
txn, err := sctx.Txn(true)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
bdrRole, err := meta.NewMutator(txn).GetBDRRole()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if bdrRole == string(ast.BDRRolePrimary) &&
|
|
deniedByBDRWhenModifyColumn(newCol.FieldType, col.FieldType, specNewColumn.Options) && !filter.IsSystemSchema(schema.Name.L) {
|
|
return nil, dbterror.ErrBDRRestrictedDDL.FastGenByArgs(bdrRole)
|
|
}
|
|
|
|
job := &model.Job{
|
|
Version: model.GetJobVerInUse(),
|
|
SchemaID: schema.ID,
|
|
TableID: t.Meta().ID,
|
|
SchemaName: schema.Name.L,
|
|
TableName: t.Meta().Name.L,
|
|
Type: model.ActionModifyColumn,
|
|
BinlogInfo: &model.HistoryInfo{},
|
|
NeedReorg: mayNeedChangeColData,
|
|
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
|
|
SQLMode: sctx.GetSessionVars().SQLMode,
|
|
SessionVars: make(map[string]string),
|
|
}
|
|
err = initJobReorgMetaFromVariables(ctx, job, t, sctx)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
|
|
args := &model.ModifyColumnArgs{
|
|
Column: newCol.ColumnInfo,
|
|
OldColumnName: originalColName,
|
|
Position: spec.Position,
|
|
NewShardBits: newAutoRandBits,
|
|
}
|
|
return NewJobWrapperWithArgs(job, args, false), nil
|
|
}
|
|
|
|
// noReorgDataStrict is a strong check to decide whether we need to change the column data.
|
|
// If it returns true, it means we don't need the reorg no matter what the data is.
|
|
func noReorgDataStrict(tblInfo *model.TableInfo, oldCol, newCol *model.ColumnInfo) bool {
|
|
toUnsigned := mysql.HasUnsignedFlag(newCol.GetFlag())
|
|
originUnsigned := mysql.HasUnsignedFlag(oldCol.GetFlag())
|
|
needTruncationOrToggleSign := func() bool {
|
|
return (newCol.GetFlen() > 0 && (newCol.GetFlen() < oldCol.GetFlen() || newCol.GetDecimal() < oldCol.GetDecimal())) ||
|
|
(toUnsigned != originUnsigned)
|
|
}
|
|
// Ignore the potential max display length represented by integer's flen, use default flen instead.
|
|
defaultOldColFlen, _ := mysql.GetDefaultFieldLengthAndDecimal(oldCol.GetType())
|
|
defaultNewColFlen, _ := mysql.GetDefaultFieldLengthAndDecimal(newCol.GetType())
|
|
needTruncationOrToggleSignForInteger := func() bool {
|
|
return (defaultNewColFlen > 0 && defaultNewColFlen < defaultOldColFlen) || (toUnsigned != originUnsigned)
|
|
}
|
|
|
|
// Deal with the same type.
|
|
if oldCol.GetType() == newCol.GetType() {
|
|
switch oldCol.GetType() {
|
|
case mysql.TypeNewDecimal:
|
|
// Since type decimal will encode the precision, frac, negative(signed) and wordBuf into storage together, there is no short
|
|
// cut to eliminate data reorg change for column type change between decimal.
|
|
return oldCol.GetFlen() == newCol.GetFlen() && oldCol.GetDecimal() == newCol.GetDecimal() && toUnsigned == originUnsigned
|
|
case mysql.TypeEnum, mysql.TypeSet:
|
|
return !IsElemsChangedToModifyColumn(oldCol.GetElems(), newCol.GetElems())
|
|
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong:
|
|
return toUnsigned == originUnsigned
|
|
case mysql.TypeString:
|
|
// Due to the behavior of padding \x00 at binary type, always change column data when binary length changed
|
|
if types.IsBinaryStr(&oldCol.FieldType) {
|
|
return newCol.GetFlen() == oldCol.GetFlen()
|
|
}
|
|
case mysql.TypeTiDBVectorFloat32:
|
|
return !(newCol.GetFlen() != types.UnspecifiedLength && oldCol.GetFlen() != newCol.GetFlen())
|
|
}
|
|
|
|
return !needTruncationOrToggleSign()
|
|
}
|
|
|
|
oldTp := oldCol.GetType()
|
|
newTp := newCol.GetType()
|
|
// VARCHAR->CHAR, may need reorg.
|
|
if types.IsTypeVarchar(oldTp) && newTp == mysql.TypeString {
|
|
return false
|
|
}
|
|
// CHAR->VARCHAR
|
|
if oldTp == mysql.TypeString && types.IsTypeVarchar(newTp) {
|
|
// If there are related index, the index may need reorg.
|
|
relatedIndexes := getRelatedIndexIDs(tblInfo, oldCol.ID, false)
|
|
if len(relatedIndexes) > 0 {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// Deal with the different type.
|
|
switch oldCol.GetType() {
|
|
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
|
|
switch newCol.GetType() {
|
|
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
|
|
return !needTruncationOrToggleSign()
|
|
}
|
|
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong:
|
|
switch newCol.GetType() {
|
|
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong:
|
|
return !needTruncationOrToggleSignForInteger()
|
|
}
|
|
// conversion between float and double needs reorganization, see issue #31372
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// ConvertBetweenCharAndVarchar check whether column converted between char and varchar
|
|
// TODO: it is used for plugins. so change plugin's using and remove it.
|
|
func ConvertBetweenCharAndVarchar(oldCol, newCol byte) bool {
|
|
return types.ConvertBetweenCharAndVarchar(oldCol, newCol)
|
|
}
|
|
|
|
// IsElemsChangedToModifyColumn check elems changed
|
|
func IsElemsChangedToModifyColumn(oldElems, newElems []string) bool {
|
|
if len(newElems) < len(oldElems) {
|
|
return true
|
|
}
|
|
for index, oldElem := range oldElems {
|
|
newElem := newElems[index]
|
|
if oldElem != newElem {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// ProcessColumnCharsetAndCollation process column charset and collation
|
|
func ProcessColumnCharsetAndCollation(ctx *metabuild.Context, col *table.Column, newCol *table.Column, meta *model.TableInfo, specNewColumn *ast.ColumnDef, schema *model.DBInfo) error {
|
|
var chs, coll string
|
|
var err error
|
|
// TODO: Remove it when all table versions are greater than or equal to TableInfoVersion1.
|
|
// If newCol's charset is empty and the table's version less than TableInfoVersion1,
|
|
// we will not modify the charset of the column. This behavior is not compatible with MySQL.
|
|
if len(newCol.FieldType.GetCharset()) == 0 && meta.Version < model.TableInfoVersion1 {
|
|
chs = col.FieldType.GetCharset()
|
|
coll = col.FieldType.GetCollate()
|
|
} else {
|
|
chs, coll, err = getCharsetAndCollateInColumnDef(specNewColumn, ctx.GetDefaultCollationForUTF8MB4())
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
chs, coll, err = ResolveCharsetCollation([]ast.CharsetOpt{
|
|
{Chs: chs, Col: coll},
|
|
{Chs: meta.Charset, Col: meta.Collate},
|
|
{Chs: schema.Charset, Col: schema.Collate},
|
|
}, ctx.GetDefaultCollationForUTF8MB4())
|
|
chs, coll = OverwriteCollationWithBinaryFlag(specNewColumn, chs, coll, ctx.GetDefaultCollationForUTF8MB4())
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
|
|
if err = setCharsetCollationFlenDecimal(ctx, &newCol.FieldType, newCol.Name.O, chs, coll); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
decodeEnumSetBinaryLiteralToUTF8(&newCol.FieldType, chs)
|
|
return nil
|
|
}
|
|
|
|
func getReplacedColumns(tbInfo *model.TableInfo, oldCol, newCol *model.ColumnInfo) []*model.ColumnInfo {
|
|
columns := make([]*model.ColumnInfo, 0, len(tbInfo.Columns))
|
|
columns = append(columns, tbInfo.Columns...)
|
|
// Replace old column with new column.
|
|
for i, col := range columns {
|
|
if col.Name.L != oldCol.Name.L {
|
|
continue
|
|
}
|
|
columns[i] = newCol.Clone()
|
|
columns[i].Name = oldCol.Name
|
|
return columns
|
|
}
|
|
|
|
return columns
|
|
}
|
|
|
|
// checkColumnWithIndexConstraint is used to check the related index constraint of the modified column.
|
|
// Index has a max-prefix-length constraint. eg: a varchar(100), index idx(a), modifying column a to a varchar(4000)
|
|
// will cause index idx to break the max-prefix-length constraint.
|
|
func checkColumnWithIndexConstraint(tbInfo *model.TableInfo, originalCol, newCol *model.ColumnInfo) error {
|
|
columns := getReplacedColumns(tbInfo, originalCol, newCol)
|
|
|
|
pkIndex := tables.FindPrimaryIndex(tbInfo)
|
|
|
|
checkOneIndex := func(indexInfo *model.IndexInfo) (err error) {
|
|
var modified bool
|
|
for _, col := range indexInfo.Columns {
|
|
if col.Name.L == originalCol.Name.L {
|
|
modified = true
|
|
break
|
|
}
|
|
}
|
|
if !modified {
|
|
return
|
|
}
|
|
err = checkIndexInModifiableColumns(columns, indexInfo)
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = checkIndexPrefixLength(columns, indexInfo.Columns, indexInfo.GetColumnarIndexType())
|
|
return
|
|
}
|
|
|
|
// Check primary key first.
|
|
var err error
|
|
|
|
if pkIndex != nil {
|
|
err = checkOneIndex(pkIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Check secondary indexes.
|
|
for _, indexInfo := range tbInfo.Indices {
|
|
if indexInfo.Primary {
|
|
continue
|
|
}
|
|
// the second param should always be set to true, check index length only if it was modified
|
|
// checkOneIndex needs one param only.
|
|
err = checkOneIndex(indexInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func checkIndexInModifiableColumns(columns []*model.ColumnInfo, idxInfo *model.IndexInfo) error {
|
|
indexType := idxInfo.GetColumnarIndexType()
|
|
for _, ic := range idxInfo.Columns {
|
|
col := model.FindColumnInfo(columns, ic.Name.L)
|
|
if col == nil {
|
|
return dbterror.ErrKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ic.Name)
|
|
}
|
|
|
|
if indexType != model.ColumnarIndexTypeNA {
|
|
continue
|
|
}
|
|
|
|
prefixLength := types.UnspecifiedLength
|
|
if types.IsTypePrefixable(col.FieldType.GetType()) && col.FieldType.GetFlen() > ic.Length {
|
|
// When the index column is changed, prefix length is only valid
|
|
// if the type is still prefixable and larger than old prefix length.
|
|
prefixLength = ic.Length
|
|
}
|
|
if err := checkIndexColumn(col, prefixLength, false); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkModifyTypes checks if the 'origin' type can be modified to 'to' type no matter directly change
|
|
// or change by reorg. It returns error if the two types are incompatible and correlated change are not
|
|
// supported. However, even the two types can be change, if the "origin" type contains primary key, error will be returned.
|
|
func checkModifyTypes(from, to *model.ColumnInfo, needRewriteCollationData bool) error {
|
|
fromFt := &from.FieldType
|
|
toFt := &to.FieldType
|
|
canReorg, err := types.CheckModifyTypeCompatible(fromFt, toFt)
|
|
if err != nil {
|
|
if !canReorg {
|
|
return errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs(err.Error()))
|
|
}
|
|
if mysql.HasPriKeyFlag(fromFt.GetFlag()) {
|
|
msg := "this column has primary key flag"
|
|
return dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs(msg)
|
|
}
|
|
}
|
|
|
|
err = checkModifyCharsetAndCollation(toFt.GetCharset(), toFt.GetCollate(), fromFt.GetCharset(), fromFt.GetCollate(), needRewriteCollationData)
|
|
|
|
if err != nil {
|
|
if toFt.GetCharset() == charset.CharsetGBK || fromFt.GetCharset() == charset.CharsetGBK {
|
|
return errors.Trace(err)
|
|
}
|
|
if strings.Contains(err.Error(), "Unsupported modifying collation") {
|
|
colErrMsg := "Unsupported modifying collation of column '%s' from '%s' to '%s' when index is defined on it."
|
|
err = dbterror.ErrUnsupportedModifyCollation.GenWithStack(colErrMsg, from.Name.L, from.GetCollate(), to.GetCollate())
|
|
}
|
|
|
|
// column type change can handle the charset change between these two types in the process of the reorg.
|
|
if dbterror.ErrUnsupportedModifyCharset.Equal(err) && canReorg {
|
|
return nil
|
|
}
|
|
}
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
// ProcessModifyColumnOptions process column options.
|
|
// Export for tiflow.
|
|
func ProcessModifyColumnOptions(ctx sessionctx.Context, col *table.Column, options []*ast.ColumnOption) error {
|
|
var sb strings.Builder
|
|
restoreFlags := format.RestoreStringSingleQuotes | format.RestoreKeyWordLowercase | format.RestoreNameBackQuotes |
|
|
format.RestoreSpacesAroundBinaryOperation | format.RestoreWithoutSchemaName | format.RestoreWithoutSchemaName
|
|
restoreCtx := format.NewRestoreCtx(restoreFlags, &sb)
|
|
|
|
var hasDefaultValue, setOnUpdateNow bool
|
|
var err error
|
|
var hasNullFlag bool
|
|
for _, opt := range options {
|
|
switch opt.Tp {
|
|
case ast.ColumnOptionDefaultValue:
|
|
hasDefaultValue, err = SetDefaultValue(ctx.GetExprCtx(), col, opt)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
case ast.ColumnOptionComment:
|
|
err := setColumnComment(ctx.GetExprCtx(), col, opt)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
case ast.ColumnOptionNotNull:
|
|
col.AddFlag(mysql.NotNullFlag)
|
|
case ast.ColumnOptionNull:
|
|
hasNullFlag = true
|
|
col.DelFlag(mysql.NotNullFlag)
|
|
case ast.ColumnOptionAutoIncrement:
|
|
col.AddFlag(mysql.AutoIncrementFlag)
|
|
case ast.ColumnOptionPrimaryKey:
|
|
return errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStack("can't change column constraint (PRIMARY KEY)"))
|
|
case ast.ColumnOptionUniqKey:
|
|
return errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStack("can't change column constraint (UNIQUE KEY)"))
|
|
case ast.ColumnOptionOnUpdate:
|
|
// TODO: Support other time functions.
|
|
if !(col.GetType() == mysql.TypeTimestamp || col.GetType() == mysql.TypeDatetime) {
|
|
return dbterror.ErrInvalidOnUpdate.GenWithStackByArgs(col.Name)
|
|
}
|
|
if !expression.IsValidCurrentTimestampExpr(opt.Expr, &col.FieldType) {
|
|
return dbterror.ErrInvalidOnUpdate.GenWithStackByArgs(col.Name)
|
|
}
|
|
col.AddFlag(mysql.OnUpdateNowFlag)
|
|
setOnUpdateNow = true
|
|
case ast.ColumnOptionGenerated:
|
|
sb.Reset()
|
|
err = opt.Expr.Restore(restoreCtx)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
col.GeneratedExprString = sb.String()
|
|
col.GeneratedStored = opt.Stored
|
|
col.Dependences = make(map[string]struct{})
|
|
// Only used by checkModifyGeneratedColumn, there is no need to set a ctor for it.
|
|
col.GeneratedExpr = table.NewClonableExprNode(nil, opt.Expr)
|
|
for _, colName := range FindColumnNamesInExpr(opt.Expr) {
|
|
col.Dependences[colName.Name.L] = struct{}{}
|
|
}
|
|
case ast.ColumnOptionCollate:
|
|
col.SetCollate(opt.StrValue)
|
|
case ast.ColumnOptionReference:
|
|
return errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("can't modify with references"))
|
|
case ast.ColumnOptionFulltext:
|
|
return errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("can't modify with full text"))
|
|
case ast.ColumnOptionCheck:
|
|
return errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("can't modify with check"))
|
|
// Ignore ColumnOptionAutoRandom. It will be handled later.
|
|
case ast.ColumnOptionAutoRandom:
|
|
default:
|
|
return errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs(fmt.Sprintf("unknown column option type: %d", opt.Tp)))
|
|
}
|
|
}
|
|
|
|
if err = processAndCheckDefaultValueAndColumn(ctx.GetExprCtx(), col, nil, hasDefaultValue, setOnUpdateNow, hasNullFlag); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func checkAutoRandom(tableInfo *model.TableInfo, originCol *table.Column, specNewColumn *ast.ColumnDef) (uint64, error) {
|
|
var oldShardBits, oldRangeBits uint64
|
|
if isClusteredPKColumn(originCol, tableInfo) {
|
|
oldShardBits = tableInfo.AutoRandomBits
|
|
oldRangeBits = tableInfo.AutoRandomRangeBits
|
|
}
|
|
newShardBits, newRangeBits, err := extractAutoRandomBitsFromColDef(specNewColumn)
|
|
if err != nil {
|
|
return 0, errors.Trace(err)
|
|
}
|
|
switch {
|
|
case oldShardBits == newShardBits:
|
|
case oldShardBits < newShardBits:
|
|
addingAutoRandom := oldShardBits == 0
|
|
if addingAutoRandom {
|
|
convFromAutoInc := mysql.HasAutoIncrementFlag(originCol.GetFlag()) && originCol.IsPKHandleColumn(tableInfo)
|
|
if !convFromAutoInc {
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomAlterChangeFromAutoInc)
|
|
}
|
|
}
|
|
if autoid.AutoRandomShardBitsMax < newShardBits {
|
|
errMsg := fmt.Sprintf(autoid.AutoRandomOverflowErrMsg,
|
|
autoid.AutoRandomShardBitsMax, newShardBits, specNewColumn.Name.Name.O)
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
|
|
}
|
|
// increasing auto_random shard bits is allowed.
|
|
case oldShardBits > newShardBits:
|
|
if newShardBits == 0 {
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomAlterErrMsg)
|
|
}
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomDecreaseBitErrMsg)
|
|
}
|
|
|
|
modifyingAutoRandCol := oldShardBits > 0 || newShardBits > 0
|
|
if modifyingAutoRandCol {
|
|
// Disallow changing the column field type.
|
|
if originCol.GetType() != specNewColumn.Tp.GetType() {
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomModifyColTypeErrMsg)
|
|
}
|
|
if originCol.GetType() != mysql.TypeLonglong {
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(fmt.Sprintf(autoid.AutoRandomOnNonBigIntColumn, types.TypeStr(originCol.GetType())))
|
|
}
|
|
// Disallow changing from auto_random to auto_increment column.
|
|
if containsColumnOption(specNewColumn, ast.ColumnOptionAutoIncrement) {
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomIncompatibleWithAutoIncErrMsg)
|
|
}
|
|
// Disallow specifying a default value on auto_random column.
|
|
if containsColumnOption(specNewColumn, ast.ColumnOptionDefaultValue) {
|
|
return 0, dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(autoid.AutoRandomIncompatibleWithDefaultValueErrMsg)
|
|
}
|
|
}
|
|
if rangeBitsIsChanged(oldRangeBits, newRangeBits) {
|
|
return 0, dbterror.ErrInvalidAutoRandom.FastGenByArgs(autoid.AutoRandomUnsupportedAlterRangeBits)
|
|
}
|
|
return newShardBits, nil
|
|
}
|
|
|
|
func isClusteredPKColumn(col *table.Column, tblInfo *model.TableInfo) bool {
|
|
switch {
|
|
case tblInfo.PKIsHandle:
|
|
return mysql.HasPriKeyFlag(col.GetFlag())
|
|
case tblInfo.IsCommonHandle:
|
|
pk := tables.FindPrimaryIndex(tblInfo)
|
|
for _, c := range pk.Columns {
|
|
if c.Name.L == col.Name.L {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func rangeBitsIsChanged(oldBits, newBits uint64) bool {
|
|
if oldBits == 0 {
|
|
oldBits = autoid.AutoRandomRangeBitsDefault
|
|
}
|
|
if newBits == 0 {
|
|
newBits = autoid.AutoRandomRangeBitsDefault
|
|
}
|
|
return oldBits != newBits
|
|
}
|