1359 lines
45 KiB
Go
1359 lines
45 KiB
Go
// Copyright 2015 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package ddl
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math/bits"
|
|
"slices"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/config"
|
|
"github.com/pingcap/tidb/pkg/ddl/logutil"
|
|
"github.com/pingcap/tidb/pkg/expression"
|
|
"github.com/pingcap/tidb/pkg/infoschema"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/meta"
|
|
"github.com/pingcap/tidb/pkg/meta/autoid"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/metrics"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/parser/mysql"
|
|
"github.com/pingcap/tidb/pkg/parser/terror"
|
|
"github.com/pingcap/tidb/pkg/sessionctx"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
|
|
"github.com/pingcap/tidb/pkg/table"
|
|
"github.com/pingcap/tidb/pkg/tablecodec"
|
|
"github.com/pingcap/tidb/pkg/types"
|
|
contextutil "github.com/pingcap/tidb/pkg/util/context"
|
|
"github.com/pingcap/tidb/pkg/util/dbterror"
|
|
"github.com/pingcap/tidb/pkg/util/intest"
|
|
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
|
|
"github.com/pingcap/tidb/pkg/util/rowcodec"
|
|
kvutil "github.com/tikv/client-go/v2/util"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// InitAndAddColumnToTable initializes the ColumnInfo in-place and adds it to the table.
|
|
func InitAndAddColumnToTable(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) *model.ColumnInfo {
|
|
cols := tblInfo.Columns
|
|
colInfo.ID = AllocateColumnID(tblInfo)
|
|
colInfo.State = model.StateNone
|
|
// To support add column asynchronously, we should mark its offset as the last column.
|
|
// So that we can use origin column offset to get value from row.
|
|
colInfo.Offset = len(cols)
|
|
// Append the column info to the end of the tblInfo.Columns.
|
|
// It will be reordered to the right offset in "Columns" when its state is changed to public.
|
|
tblInfo.Columns = append(cols, colInfo)
|
|
return colInfo
|
|
}
|
|
|
|
func checkAddColumn(t *meta.Mutator, job *model.Job) (tblInfo *model.TableInfo, columnInfo *model.ColumnInfo, col *model.ColumnInfo,
|
|
pos *ast.ColumnPosition, _ bool /* ifNotExists */, err error) {
|
|
schemaID := job.SchemaID
|
|
tblInfo, err = GetTableInfoAndCancelFaultJob(t, job, schemaID)
|
|
if err != nil {
|
|
return nil, nil, nil, nil, false, errors.Trace(err)
|
|
}
|
|
|
|
args, err := model.GetTableColumnArgs(job)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return nil, nil, nil, nil, false, errors.Trace(err)
|
|
}
|
|
col, pos, ifNotExists := args.Col, args.Pos, args.IgnoreExistenceErr
|
|
|
|
columnInfo = model.FindColumnInfo(tblInfo.Columns, col.Name.L)
|
|
if columnInfo != nil {
|
|
if columnInfo.State == model.StatePublic {
|
|
// We already have a column with the same column name.
|
|
job.State = model.JobStateCancelled
|
|
return nil, nil, nil, nil, ifNotExists, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
|
|
}
|
|
}
|
|
|
|
err = CheckAfterPositionExists(tblInfo, pos)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return nil, nil, nil, nil, false, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
|
|
}
|
|
|
|
return tblInfo, columnInfo, col, pos, false, nil
|
|
}
|
|
|
|
// CheckAfterPositionExists makes sure the column specified in AFTER clause is exists.
|
|
// For example, ALTER TABLE t ADD COLUMN c3 INT AFTER c1.
|
|
func CheckAfterPositionExists(tblInfo *model.TableInfo, pos *ast.ColumnPosition) error {
|
|
if pos != nil && pos.Tp == ast.ColumnPositionAfter {
|
|
c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L)
|
|
if c == nil {
|
|
return infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
|
|
for _, indexInfo := range indexInfos {
|
|
indexInfo.State = state
|
|
}
|
|
}
|
|
|
|
func checkDropColumnForStatePublic(colInfo *model.ColumnInfo) (err error) {
|
|
// When the dropping column has not-null flag and it hasn't the default value, we can backfill the column value like "add column".
|
|
// NOTE: If the state of StateWriteOnly can be rollbacked, we'd better reconsider the original default value.
|
|
// And we need consider the column without not-null flag.
|
|
if colInfo.GetOriginDefaultValue() == nil && mysql.HasNotNullFlag(colInfo.GetFlag()) {
|
|
// If the column is timestamp default current_timestamp, and DDL owner is new version TiDB that set column.Version to 1,
|
|
// then old TiDB update record in the column write only stage will uses the wrong default value of the dropping column.
|
|
// Because new version of the column default value is UTC time, but old version TiDB will think the default value is the time in system timezone.
|
|
// But currently will be ok, because we can't cancel the drop column job when the job is running,
|
|
// so the column will be dropped succeed and client will never see the wrong default value of the dropped column.
|
|
// More info about this problem, see PR#9115.
|
|
originDefVal, err := generateOriginDefaultValue(colInfo, nil, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return colInfo.SetOriginDefaultValue(originDefVal)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func onDropColumn(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
|
|
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(jobCtx, job)
|
|
if err != nil {
|
|
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
|
|
// Convert the "not exists" error to a warning.
|
|
job.Warning = toTError(err)
|
|
job.State = model.JobStateDone
|
|
return ver, nil
|
|
}
|
|
return ver, errors.Trace(err)
|
|
}
|
|
if job.MultiSchemaInfo != nil && !job.IsRollingback() && job.MultiSchemaInfo.Revertible {
|
|
job.MarkNonRevertible()
|
|
job.SchemaState = colInfo.State
|
|
// Store the mark and enter the next DDL handling loop.
|
|
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
|
|
}
|
|
|
|
originalState := colInfo.State
|
|
switch colInfo.State {
|
|
case model.StatePublic:
|
|
// public -> write only
|
|
colInfo.State = model.StateWriteOnly
|
|
setIndicesState(idxInfos, model.StateWriteOnly)
|
|
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
|
|
err = checkDropColumnForStatePublic(colInfo)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, originalState != colInfo.State)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
case model.StateWriteOnly:
|
|
// write only -> delete only
|
|
failpoint.InjectCall("onDropColumnStateWriteOnly")
|
|
colInfo.State = model.StateDeleteOnly
|
|
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
|
|
if len(idxInfos) > 0 {
|
|
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
|
|
for _, idx := range tblInfo.Indices {
|
|
if !indexInfoContains(idx.ID, idxInfos) {
|
|
newIndices = append(newIndices, idx)
|
|
}
|
|
}
|
|
tblInfo.Indices = newIndices
|
|
}
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
dropColumnArgs, err := model.GetTableColumnArgs(job)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
dropColumnArgs.IndexIDs = indexInfosToIDList(idxInfos)
|
|
job.FillArgs(dropColumnArgs)
|
|
case model.StateDeleteOnly:
|
|
// delete only -> reorganization
|
|
colInfo.State = model.StateDeleteReorganization
|
|
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
case model.StateDeleteReorganization:
|
|
// reorganization -> absent
|
|
// All reorganization jobs are done, drop this column.
|
|
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
|
|
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
|
|
colInfo.State = model.StateNone
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != colInfo.State)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
// Finish this job.
|
|
if job.IsRollingback() {
|
|
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
|
|
} else {
|
|
// We should set related index IDs for job
|
|
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
|
|
dropColumnArgs, err := model.GetTableColumnArgs(job)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
dropColumnArgs.PartitionIDs = getPartitionIDs(tblInfo)
|
|
job.FillArgs(dropColumnArgs)
|
|
}
|
|
default:
|
|
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State))
|
|
}
|
|
job.SchemaState = colInfo.State
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
func checkDropColumn(jobCtx *jobContext, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
|
|
schemaID := job.SchemaID
|
|
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, schemaID)
|
|
if err != nil {
|
|
return nil, nil, nil, false, errors.Trace(err)
|
|
}
|
|
|
|
args, err := model.GetTableColumnArgs(job)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return nil, nil, nil, false, errors.Trace(err)
|
|
}
|
|
|
|
colName, ifExists := args.Col.Name, args.IgnoreExistenceErr
|
|
colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
|
|
if colInfo == nil || colInfo.Hidden {
|
|
job.State = model.JobStateCancelled
|
|
return nil, nil, nil, ifExists, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
|
|
}
|
|
if err = isDroppableColumn(tblInfo, colName); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return nil, nil, nil, false, errors.Trace(err)
|
|
}
|
|
if err = checkDropColumnWithForeignKeyConstraintInOwner(jobCtx.infoCache, job, tblInfo, colName.L); err != nil {
|
|
return nil, nil, nil, false, errors.Trace(err)
|
|
}
|
|
if err = checkDropColumnWithTTLConfig(tblInfo, colName.L); err != nil {
|
|
return nil, nil, nil, false, errors.Trace(err)
|
|
}
|
|
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
|
|
return tblInfo, colInfo, idxInfos, false, nil
|
|
}
|
|
|
|
func isDroppableColumn(tblInfo *model.TableInfo, colName ast.CIStr) error {
|
|
if ok, dep, isHidden := hasDependentByGeneratedColumn(tblInfo, colName); ok {
|
|
if isHidden {
|
|
return dbterror.ErrDependentByFunctionalIndex.GenWithStackByArgs(dep)
|
|
}
|
|
return dbterror.ErrDependentByGeneratedColumn.GenWithStackByArgs(dep)
|
|
}
|
|
|
|
if len(tblInfo.Columns) == 1 {
|
|
return dbterror.ErrCantRemoveAllFields.GenWithStack("can't drop only column %s in table %s",
|
|
colName, tblInfo.Name)
|
|
}
|
|
// We only support dropping column with single-value none Primary Key index covered now.
|
|
err := isColumnCanDropWithIndex(colName.L, tblInfo.Indices)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = IsColumnDroppableWithCheckConstraint(colName, tblInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = checkColumnReferencedByPartialCondition(tblInfo, colName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func onSetDefaultValue(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
|
|
args, err := model.GetSetDefaultValueArgs(job)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
newCol := args.Col
|
|
return updateColumnDefaultValue(jobCtx, job, newCol, &newCol.Name)
|
|
}
|
|
|
|
// UpdateIndexCol sets index column name and offset from changing ColumnInfo.
|
|
func UpdateIndexCol(idxCol *model.IndexColumn, changingCol *model.ColumnInfo) {
|
|
idxCol.Name = changingCol.Name
|
|
idxCol.Offset = changingCol.Offset
|
|
canPrefix := types.IsTypePrefixable(changingCol.GetType())
|
|
if !canPrefix || (changingCol.GetFlen() <= idxCol.Length) {
|
|
idxCol.Length = types.UnspecifiedLength
|
|
}
|
|
}
|
|
|
|
func removeChangingColAndIdxs(tblInfo *model.TableInfo, changingColID int64) {
|
|
restIdx := tblInfo.Indices[:0]
|
|
for _, idx := range tblInfo.Indices {
|
|
if !idx.HasColumnInIndexColumns(tblInfo, changingColID) {
|
|
restIdx = append(restIdx, idx)
|
|
}
|
|
}
|
|
tblInfo.Indices = restIdx
|
|
|
|
restCols := tblInfo.Columns[:0]
|
|
for _, c := range tblInfo.Columns {
|
|
if c.ID != changingColID {
|
|
restCols = append(restCols, c)
|
|
}
|
|
}
|
|
tblInfo.Columns = restCols
|
|
}
|
|
|
|
func removeOldIndexes(tblInfo *model.TableInfo, changingIdxs []*model.IndexInfo) {
|
|
// Remove the changing indexes.
|
|
for i, idx := range tblInfo.Indices {
|
|
for _, cIdx := range changingIdxs {
|
|
if cIdx.ID == idx.ID {
|
|
tblInfo.Indices[i] = nil
|
|
break
|
|
}
|
|
}
|
|
}
|
|
tmp := tblInfo.Indices[:0]
|
|
for _, idx := range tblInfo.Indices {
|
|
if idx != nil {
|
|
tmp = append(tmp, idx)
|
|
}
|
|
}
|
|
tblInfo.Indices = tmp
|
|
}
|
|
|
|
// updateNewIdxColsNameOffset updates the name&offset of the index column.
|
|
func updateNewIdxColsNameOffset(changingIdxs []*model.IndexInfo,
|
|
oldName ast.CIStr, changingCol *model.ColumnInfo) {
|
|
for _, idx := range changingIdxs {
|
|
for _, col := range idx.Columns {
|
|
if col.Name.L == oldName.L {
|
|
UpdateIndexCol(col, changingCol)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func updateModifyingCols(oldCol, changingCol *model.ColumnInfo) {
|
|
changingCol.ChangeStateInfo = nil
|
|
// After changing the column, the column's type is change, so it needs to set OriginDefaultValue back
|
|
// so that there is no error in getting the default value from OriginDefaultValue.
|
|
// Besides, nil data that was not backfilled in the "add column" is backfilled after the column is changed.
|
|
// So it can set OriginDefaultValue to nil.
|
|
changingCol.OriginDefaultValue = nil
|
|
oldCol.ChangeStateInfo = &model.ChangeStateInfo{DependencyColumnOffset: changingCol.Offset}
|
|
}
|
|
|
|
func moveChangingColumnToDest(tblInfo *model.TableInfo, oldCol, changingCol *model.ColumnInfo, pos *ast.ColumnPosition) {
|
|
// Swap the old column with new column position.
|
|
oldOffset := oldCol.Offset
|
|
changingOffset := changingCol.Offset
|
|
tblInfo.MoveColumnInfo(oldOffset, changingOffset)
|
|
tblInfo.MoveColumnInfo(changingCol.Offset, oldOffset)
|
|
// Move the new column to a correct offset.
|
|
// The validation of the position is done in `validatePosition`.
|
|
destOffset, err := LocateOffsetToMove(changingCol.Offset, pos, tblInfo)
|
|
intest.AssertNoError(err)
|
|
tblInfo.MoveColumnInfo(changingCol.Offset, destOffset)
|
|
}
|
|
|
|
// moveOldColumnToBack is used to make sure the columns in TableInfo
|
|
// are in correct order after the old column is changed to non-public state.
|
|
func moveOldColumnToBack(tblInfo *model.TableInfo, oldCol *model.ColumnInfo) {
|
|
order := []model.SchemaState{
|
|
model.StatePublic,
|
|
model.StateWriteReorganization,
|
|
model.StateWriteOnly,
|
|
model.StateDeleteReorganization,
|
|
model.StateDeleteOnly,
|
|
model.StateNone,
|
|
}
|
|
for len(order) > 0 && order[len(order)-1] != oldCol.State {
|
|
order = order[:len(order)-1]
|
|
}
|
|
dest := len(tblInfo.Columns) - 1
|
|
for i, col := range tblInfo.Columns {
|
|
if col.ID == oldCol.ID {
|
|
continue
|
|
}
|
|
if !slices.Contains(order, col.State) {
|
|
dest = i
|
|
break
|
|
}
|
|
}
|
|
tblInfo.MoveColumnInfo(oldCol.Offset, dest)
|
|
}
|
|
|
|
// indexContainsOtherReorg checks if the index still contains other changing columns
|
|
func indexContainsOtherReorg(
|
|
tblInfo *model.TableInfo,
|
|
idx *model.IndexInfo,
|
|
currentChangingCol *model.ColumnInfo,
|
|
) bool {
|
|
for _, idxCol := range idx.Columns {
|
|
tblCol := tblInfo.Columns[idxCol.Offset]
|
|
if tblCol.ID == currentChangingCol.ID {
|
|
continue // ignore current changing column.
|
|
}
|
|
if idxCol.UseChangingType || tblCol.ChangeStateInfo != nil {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func moveIndexInfoToDest(tblInfo *model.TableInfo, changingCol *model.ColumnInfo,
|
|
oldIdxInfos, changingIdxInfos []*model.IndexInfo) {
|
|
for i, cIdx := range changingIdxInfos {
|
|
// For the index that still contains other changing column,
|
|
// we leave the swaping work to the last modify column job.
|
|
if !indexContainsOtherReorg(tblInfo, cIdx, changingCol) {
|
|
swapIndexInfoByID(tblInfo, oldIdxInfos[i].ID, changingIdxInfos[i].ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func swapIndexInfoByID(tblInfo *model.TableInfo, idxIDA, idxIDB int64) {
|
|
offsetA := 0
|
|
offsetB := 0
|
|
for i, idx := range tblInfo.Indices {
|
|
switch idx.ID {
|
|
case idxIDA:
|
|
offsetA = i
|
|
case idxIDB:
|
|
offsetB = i
|
|
}
|
|
}
|
|
tblInfo.Indices[offsetA], tblInfo.Indices[offsetB] = tblInfo.Indices[offsetB], tblInfo.Indices[offsetA]
|
|
}
|
|
|
|
func buildRelatedIndexInfos(tblInfo *model.TableInfo, colID int64) []*model.IndexInfo {
|
|
var indexInfos []*model.IndexInfo
|
|
for _, idx := range tblInfo.Indices {
|
|
if idx.HasColumnInIndexColumns(tblInfo, colID) {
|
|
indexInfos = append(indexInfos, idx)
|
|
}
|
|
}
|
|
// In multi-schema change, if more than one column of an index is being modified,
|
|
// the added hidden index will be changed(including index ID) in place by the latter modified column subjob.
|
|
// We need to sort by index ID to ensure the order of changing indexes equals to the old indexes.
|
|
slices.SortFunc(indexInfos, func(idxA, idxB *model.IndexInfo) int {
|
|
if idxA.ID < idxB.ID {
|
|
return -1
|
|
}
|
|
if idxA.ID > idxB.ID {
|
|
return 1
|
|
}
|
|
return 0
|
|
})
|
|
return indexInfos
|
|
}
|
|
|
|
func getIngestTempIndexIDs(job *model.Job, idxInfos []*model.IndexInfo) []int64 {
|
|
ids := make([]int64, 0, len(idxInfos))
|
|
if !job.ReorgMeta.ReorgTp.NeedMergeProcess() {
|
|
return ids
|
|
}
|
|
for _, idx := range idxInfos {
|
|
ids = append(ids, tablecodec.TempIndexPrefix|idx.ID)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
func getRelatedIndexIDs(tblInfo *model.TableInfo, colID int64, needTempIndex bool) []int64 {
|
|
var idxIDs []int64
|
|
for _, idx := range tblInfo.Indices {
|
|
if idx.HasColumnInIndexColumns(tblInfo, colID) {
|
|
idxIDs = append(idxIDs, idx.ID)
|
|
if needTempIndex {
|
|
idxIDs = append(idxIDs, tablecodec.TempIndexPrefix|idx.ID)
|
|
}
|
|
}
|
|
}
|
|
return idxIDs
|
|
}
|
|
|
|
// LocateOffsetToMove returns the offset of the column to move.
|
|
func LocateOffsetToMove(currentOffset int, pos *ast.ColumnPosition, tblInfo *model.TableInfo) (destOffset int, err error) {
|
|
if pos == nil {
|
|
return currentOffset, nil
|
|
}
|
|
// Get column offset.
|
|
switch pos.Tp {
|
|
case ast.ColumnPositionFirst:
|
|
return 0, nil
|
|
case ast.ColumnPositionAfter:
|
|
c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L)
|
|
if c == nil || c.State != model.StatePublic {
|
|
return 0, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
|
|
}
|
|
if currentOffset <= c.Offset {
|
|
return c.Offset, nil
|
|
}
|
|
return c.Offset + 1, nil
|
|
case ast.ColumnPositionNone:
|
|
return currentOffset, nil
|
|
default:
|
|
return 0, errors.Errorf("unknown column position type")
|
|
}
|
|
}
|
|
|
|
// BuildElements is exported for testing.
|
|
func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) []*meta.Element {
|
|
elements := make([]*meta.Element, 0, len(changingIdxs)+1)
|
|
elements = append(elements, &meta.Element{ID: changingCol.ID, TypeKey: meta.ColumnElementKey})
|
|
for _, idx := range changingIdxs {
|
|
elements = append(elements, &meta.Element{ID: idx.ID, TypeKey: meta.IndexElementKey})
|
|
}
|
|
return elements
|
|
}
|
|
|
|
func (w *worker) updatePhysicalTableRow(
|
|
ctx context.Context,
|
|
t table.Table,
|
|
reorgInfo *reorgInfo,
|
|
) error {
|
|
logutil.DDLLogger().Info("start to update table row", zap.Stringer("job", reorgInfo.Job), zap.Stringer("reorgInfo", reorgInfo))
|
|
if tbl, ok := t.(table.PartitionedTable); ok {
|
|
done := false
|
|
for !done {
|
|
p := tbl.GetPartition(reorgInfo.PhysicalTableID)
|
|
if p == nil {
|
|
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
|
|
}
|
|
workType := typeReorgPartitionWorker
|
|
switch reorgInfo.Job.Type {
|
|
case model.ActionReorganizePartition,
|
|
model.ActionRemovePartitioning,
|
|
model.ActionAlterTablePartitioning:
|
|
// Expected
|
|
case model.ActionModifyColumn:
|
|
workType = typeUpdateColumnWorker
|
|
default:
|
|
return dbterror.ErrCancelledDDLJob.GenWithStack("Unsupported job Type.")
|
|
}
|
|
err := w.writePhysicalTableRecord(ctx, w.sessPool, p, workType, reorgInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
done, err = updateReorgInfo(w.sessPool, tbl, reorgInfo)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
if tbl, ok := t.(table.PhysicalTable); ok {
|
|
return w.writePhysicalTableRecord(ctx, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo)
|
|
}
|
|
return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID)
|
|
}
|
|
|
|
// TestReorgGoroutineRunning is only used in tests: the reorg goroutine sends on this
// unbuffered channel (under the "mockInfiniteReorgLogic" failpoint) to signal that it
// has started.
var TestReorgGoroutineRunning = make(chan struct{})
|
|
|
|
// modifyTableColumn modify the table column data for all rows.
|
|
func (w *worker) modifyTableColumn(
|
|
jobCtx *jobContext,
|
|
t table.Table,
|
|
reorgInfo *reorgInfo,
|
|
) error {
|
|
ctx := jobCtx.stepCtx
|
|
failpoint.Inject("mockInfiniteReorgLogic", func() {
|
|
TestReorgGoroutineRunning <- struct{}{}
|
|
<-ctx.Done()
|
|
// Job is cancelled. So it can't be done.
|
|
failpoint.Return(dbterror.ErrCancelledDDLJob)
|
|
})
|
|
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
|
|
err := w.updatePhysicalTableRow(ctx, t, reorgInfo)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type updateColumnWorker struct {
|
|
*backfillCtx
|
|
oldColInfo *model.ColumnInfo
|
|
newColInfo *model.ColumnInfo
|
|
|
|
// The following attributes are used to reduce memory allocation.
|
|
rowRecords []*rowRecord
|
|
rowDecoder *decoder.RowDecoder
|
|
|
|
rowMap map[int64]types.Datum
|
|
|
|
checksumNeeded bool
|
|
}
|
|
|
|
func getOldAndNewColumnsForUpdateColumn(t table.Table, currElementID int64) (oldCol, newCol *model.ColumnInfo) {
|
|
for _, col := range t.WritableCols() {
|
|
if col.ID == currElementID {
|
|
changingColumn := table.FindCol(t.Cols(), col.GetChangingOriginName())
|
|
if changingColumn != nil {
|
|
newCol = col.ColumnInfo
|
|
oldCol = changingColumn.ColumnInfo
|
|
return
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func newUpdateColumnWorker(id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *ReorgContext) (*updateColumnWorker, error) {
|
|
bCtx, err := newBackfillCtx(id, reorgInfo, reorgInfo.SchemaName, t, jc, metrics.LblUpdateColRate, true)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
|
|
logutil.DDLLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
|
|
zap.Stringer("reorgInfo", reorgInfo))
|
|
return nil, nil
|
|
}
|
|
oldCol, newCol := getOldAndNewColumnsForUpdateColumn(t, reorgInfo.currElement.ID)
|
|
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
|
|
failpoint.Inject("forceRowLevelChecksumOnUpdateColumnBackfill", func() {
|
|
orig := vardef.EnableRowLevelChecksum.Load()
|
|
defer vardef.EnableRowLevelChecksum.Store(orig)
|
|
vardef.EnableRowLevelChecksum.Store(true)
|
|
})
|
|
return &updateColumnWorker{
|
|
backfillCtx: bCtx,
|
|
oldColInfo: oldCol,
|
|
newColInfo: newCol,
|
|
rowDecoder: rowDecoder,
|
|
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
|
|
checksumNeeded: vardef.EnableRowLevelChecksum.Load(),
|
|
}, nil
|
|
}
|
|
|
|
func (w *updateColumnWorker) AddMetricInfo(cnt float64) {
|
|
w.metricCounter.Add(cnt)
|
|
}
|
|
|
|
func (*updateColumnWorker) String() string {
|
|
return typeUpdateColumnWorker.String()
|
|
}
|
|
|
|
func (w *updateColumnWorker) GetCtx() *backfillCtx {
|
|
return w.backfillCtx
|
|
}
|
|
|
|
type rowRecord struct {
|
|
key []byte // It's used to lock a record. Record it to reduce the encoding time.
|
|
vals []byte // It's the record.
|
|
warning *terror.Error // It's used to record the cast warning of a record.
|
|
}
|
|
|
|
// getNextHandleKey gets next handle of entry that we are going to process.
|
|
func getNextHandleKey(taskRange reorgBackfillTask,
|
|
taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) {
|
|
if !taskDone {
|
|
// The task is not done. So we need to pick the last processed entry's handle and add one.
|
|
return lastAccessedHandle.Next()
|
|
}
|
|
|
|
return taskRange.endKey.Next()
|
|
}
|
|
|
|
func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) {
|
|
w.rowRecords = w.rowRecords[:0]
|
|
startTime := time.Now()
|
|
|
|
// taskDone means that the added handle is out of taskRange.endHandle.
|
|
taskDone := false
|
|
var lastAccessedHandle kv.Key
|
|
oprStartTime := startTime
|
|
err := iterateSnapshotKeys(w.jobContext, w.ddlCtx.store, taskRange.priority, taskRange.physicalTable.RecordPrefix(),
|
|
txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
|
|
oprEndTime := time.Now()
|
|
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in updateColumnWorker fetchRowColVals", 0)
|
|
oprStartTime = oprEndTime
|
|
|
|
taskDone = recordKey.Cmp(taskRange.endKey) >= 0
|
|
|
|
if taskDone || len(w.rowRecords) >= w.batchCnt {
|
|
return false, nil
|
|
}
|
|
|
|
if err1 := w.getRowRecord(handle, recordKey, rawRow); err1 != nil {
|
|
return false, errors.Trace(err1)
|
|
}
|
|
lastAccessedHandle = recordKey
|
|
if recordKey.Cmp(taskRange.endKey) == 0 {
|
|
taskDone = true
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
})
|
|
|
|
if len(w.rowRecords) == 0 {
|
|
taskDone = true
|
|
}
|
|
|
|
logutil.DDLLogger().Debug("txn fetches handle info",
|
|
zap.Uint64("txnStartTS", txn.StartTS()),
|
|
zap.String("taskRange", taskRange.String()),
|
|
zap.Duration("takeTime", time.Since(startTime)))
|
|
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
|
|
}
|
|
|
|
// testCheckReorgTimeout is used to mock a timeout when reorganizing data.
// It starts at 0 and is flipped to 1 atomically by the failpoint that uses it.
var testCheckReorgTimeout int32
|
|
|
|
func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, rawRow []byte) error {
|
|
sysTZ := w.loc
|
|
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.exprCtx, handle, rawRow, sysTZ, w.rowMap)
|
|
if err != nil {
|
|
return errors.Trace(dbterror.ErrCantDecodeRecord.GenWithStackByArgs("column", err))
|
|
}
|
|
|
|
if _, ok := w.rowMap[w.newColInfo.ID]; ok {
|
|
// The column is already added by update or insert statement, skip it.
|
|
w.cleanRowMap()
|
|
return nil
|
|
}
|
|
|
|
var recordWarning *terror.Error
|
|
// Since every updateColumnWorker handle their own work individually, we can cache warning in statement context when casting datum.
|
|
oldWarn := w.warnings.GetWarnings()
|
|
if oldWarn == nil {
|
|
oldWarn = []contextutil.SQLWarn{}
|
|
} else {
|
|
oldWarn = oldWarn[:0]
|
|
}
|
|
w.warnings.SetWarnings(oldWarn)
|
|
val := w.rowMap[w.oldColInfo.ID]
|
|
col := w.newColInfo
|
|
if val.Kind() == types.KindNull && col.FieldType.GetType() == mysql.TypeTimestamp && mysql.HasNotNullFlag(col.GetFlag()) {
|
|
if v, err := expression.GetTimeCurrentTimestamp(w.exprCtx.GetEvalCtx(), col.GetType(), col.GetDecimal()); err == nil {
|
|
// convert null value to timestamp should be substituted with current timestamp if NOT_NULL flag is set.
|
|
w.rowMap[w.oldColInfo.ID] = v
|
|
}
|
|
}
|
|
newColVal, err := table.CastColumnValue(w.exprCtx, w.rowMap[w.oldColInfo.ID], w.newColInfo, false, false)
|
|
if err != nil {
|
|
return w.reformatErrors(err)
|
|
}
|
|
warn := w.warnings.GetWarnings()
|
|
if len(warn) != 0 {
|
|
//nolint:forcetypeassert
|
|
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
|
|
}
|
|
|
|
failpoint.Inject("MockReorgTimeoutInOneRegion", func(val failpoint.Value) {
|
|
//nolint:forcetypeassert
|
|
if val.(bool) {
|
|
if handle.IntValue() == 3000 && atomic.CompareAndSwapInt32(&testCheckReorgTimeout, 0, 1) {
|
|
failpoint.Return(errors.Trace(dbterror.ErrWaitReorgTimeout))
|
|
}
|
|
}
|
|
})
|
|
|
|
w.rowMap[w.newColInfo.ID] = newColVal
|
|
_, err = w.rowDecoder.EvalRemainedExprColumnMap(w.exprCtx, w.rowMap)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
newColumnIDs := make([]int64, 0, len(w.rowMap))
|
|
newRow := make([]types.Datum, 0, len(w.rowMap))
|
|
for colID, val := range w.rowMap {
|
|
newColumnIDs = append(newColumnIDs, colID)
|
|
newRow = append(newRow, val)
|
|
}
|
|
rd := w.tblCtx.GetRowEncodingConfig().RowEncoder
|
|
ec := w.exprCtx.GetEvalCtx().ErrCtx()
|
|
var checksum rowcodec.Checksum
|
|
if w.checksumNeeded {
|
|
checksum = rowcodec.RawChecksum{Handle: handle}
|
|
}
|
|
newRowVal, err := tablecodec.EncodeRow(sysTZ, newRow, newColumnIDs, nil, nil, checksum, rd)
|
|
err = ec.HandleError(err)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
w.rowRecords = append(w.rowRecords, &rowRecord{key: recordKey, vals: newRowVal, warning: recordWarning})
|
|
w.cleanRowMap()
|
|
return nil
|
|
}
|
|
|
|
// reformatErrors casted error because `convertTo` function couldn't package column name and datum value for some errors.
|
|
func (w *updateColumnWorker) reformatErrors(err error) error {
|
|
// Since row count is not precious in concurrent reorganization, here we substitute row count with datum value.
|
|
if types.ErrTruncated.Equal(err) || types.ErrDataTooLong.Equal(err) {
|
|
dStr := datumToStringNoErr(w.rowMap[w.oldColInfo.ID])
|
|
err = types.ErrTruncated.GenWithStack("Data truncated for column '%s', value is '%s'", w.oldColInfo.Name, dStr)
|
|
}
|
|
|
|
if types.ErrWarnDataOutOfRange.Equal(err) {
|
|
dStr := datumToStringNoErr(w.rowMap[w.oldColInfo.ID])
|
|
err = types.ErrWarnDataOutOfRange.GenWithStack("Out of range value for column '%s', the value is '%s'", w.oldColInfo.Name, dStr)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func datumToStringNoErr(d types.Datum) string {
|
|
if v, err := d.ToString(); err == nil {
|
|
return v
|
|
}
|
|
return fmt.Sprintf("%v", d.GetValue())
|
|
}
|
|
|
|
func (w *updateColumnWorker) cleanRowMap() {
|
|
for id := range w.rowMap {
|
|
delete(w.rowMap, id)
|
|
}
|
|
}
|
|
|
|
// BackfillData will backfill the table record in a transaction. A lock corresponds to a rowKey if the value of rowKey is changed.
|
|
func (w *updateColumnWorker) BackfillData(_ context.Context, handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
|
|
oprStartTime := time.Now()
|
|
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
|
|
errInTxn = kv.RunInNewTxn(ctx, w.ddlCtx.store, true, func(_ context.Context, txn kv.Transaction) error {
|
|
taskCtx.addedCount = 0
|
|
taskCtx.scanCount = 0
|
|
updateTxnEntrySizeLimitIfNeeded(txn)
|
|
|
|
// Because TiCDC do not want this kind of change,
|
|
// so we set the lossy DDL reorg txn source to 1 to
|
|
// avoid TiCDC to replicate this kind of change.
|
|
var txnSource uint64
|
|
if val := txn.GetOption(kv.TxnSource); val != nil {
|
|
txnSource, _ = val.(uint64)
|
|
}
|
|
err := kv.SetLossyDDLReorgSource(&txnSource, kv.LossyDDLColumnReorgSource)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
txn.SetOption(kv.TxnSource, txnSource)
|
|
|
|
txn.SetOption(kv.Priority, handleRange.priority)
|
|
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil {
|
|
txn.SetOption(kv.ResourceGroupTagger, tagger)
|
|
}
|
|
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
|
|
|
|
rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
taskCtx.nextKey = nextKey
|
|
taskCtx.done = taskDone
|
|
|
|
// Optimize for few warnings!
|
|
warningsMap := make(map[errors.ErrorID]*terror.Error, 2)
|
|
warningsCountMap := make(map[errors.ErrorID]int64, 2)
|
|
for _, rowRecord := range rowRecords {
|
|
taskCtx.scanCount++
|
|
|
|
err = txn.Set(rowRecord.key, rowRecord.vals)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
taskCtx.addedCount++
|
|
if rowRecord.warning != nil {
|
|
if _, ok := warningsCountMap[rowRecord.warning.ID()]; ok {
|
|
warningsCountMap[rowRecord.warning.ID()]++
|
|
} else {
|
|
warningsCountMap[rowRecord.warning.ID()] = 1
|
|
warningsMap[rowRecord.warning.ID()] = rowRecord.warning
|
|
}
|
|
}
|
|
}
|
|
|
|
// Collect the warnings.
|
|
taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap
|
|
|
|
return nil
|
|
})
|
|
logSlowOperations(time.Since(oprStartTime), "BackfillData", 3000)
|
|
failpoint.InjectCall("mockUpdateColumnWorkerStuck")
|
|
return
|
|
}
|
|
|
|
func validatePosition(tblInfo *model.TableInfo, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) error {
|
|
if pos != nil && pos.RelativeColumn != nil && oldCol.Name.L == pos.RelativeColumn.Name.L {
|
|
// For cases like `modify column b after b`, it should report this error.
|
|
return errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(oldCol.Name, tblInfo.Name))
|
|
}
|
|
_, err := LocateOffsetToMove(oldCol.Offset, pos, tblInfo)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func markOldIndexesRemoving(oldIdxs []*model.IndexInfo, changingIdxs []*model.IndexInfo) {
|
|
for i := range oldIdxs {
|
|
oldIdxName := oldIdxs[i].Name.O
|
|
publicName := ast.NewCIStr(oldIdxs[i].GetRemovingOriginName())
|
|
removingName := ast.NewCIStr(model.GenRemovingObjName(oldIdxName))
|
|
|
|
changingIdxs[i].Name = publicName
|
|
oldIdxs[i].Name = removingName
|
|
}
|
|
}
|
|
|
|
// markOldObjectRemoving changes the names of the old and new indexes/columns to mark them as removing and public respectively.
|
|
func markOldObjectRemoving(oldCol, changingCol *model.ColumnInfo, oldIdxs, changingIdxs []*model.IndexInfo, newColName ast.CIStr) {
|
|
if oldCol.ID != changingCol.ID {
|
|
publicName := newColName
|
|
removingName := ast.NewCIStr(model.GenRemovingObjName(oldCol.Name.O))
|
|
renameColumnTo(oldCol, oldIdxs, removingName)
|
|
renameColumnTo(changingCol, changingIdxs, publicName)
|
|
}
|
|
|
|
markOldIndexesRemoving(oldIdxs, changingIdxs)
|
|
}
|
|
|
|
func removeOldObjects(tblInfo *model.TableInfo, oldCol *model.ColumnInfo, oldIdxs []*model.IndexInfo) []int64 {
|
|
tblInfo.MoveColumnInfo(oldCol.Offset, len(tblInfo.Columns)-1)
|
|
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
|
|
var removedIdxIDs []int64
|
|
if len(oldIdxs) > 0 {
|
|
removedIdxIDs = make([]int64, 0, len(oldIdxs))
|
|
for _, idx := range oldIdxs {
|
|
removedIdxIDs = append(removedIdxIDs, idx.ID)
|
|
}
|
|
removeOldIndexes(tblInfo, oldIdxs)
|
|
}
|
|
return removedIdxIDs
|
|
}
|
|
|
|
func renameColumnTo(col *model.ColumnInfo, idxInfos []*model.IndexInfo, newName ast.CIStr) {
|
|
for _, idx := range idxInfos {
|
|
for _, idxCol := range idx.Columns {
|
|
if idxCol.Name.L == col.Name.L {
|
|
idxCol.Name = newName
|
|
}
|
|
}
|
|
}
|
|
col.Name = newName
|
|
}
|
|
|
|
func updateObjectState(col *model.ColumnInfo, idxs []*model.IndexInfo, state model.SchemaState) {
|
|
if col != nil {
|
|
col.State = state
|
|
}
|
|
for _, idx := range idxs {
|
|
idx.State = state
|
|
}
|
|
}
|
|
|
|
func checkAndApplyAutoRandomBits(jobCtx *jobContext, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
|
|
oldCol *model.ColumnInfo, newCol *model.ColumnInfo, newAutoRandBits uint64) error {
|
|
if newAutoRandBits == 0 {
|
|
return nil
|
|
}
|
|
metaMut := jobCtx.metaMut
|
|
idAcc := metaMut.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID)
|
|
err := checkNewAutoRandomBits(idAcc, oldCol, newCol, newAutoRandBits, tblInfo.AutoRandomRangeBits, tblInfo.SepAutoInc())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return applyNewAutoRandomBits(jobCtx, dbInfo, tblInfo, oldCol, newAutoRandBits)
|
|
}
|
|
|
|
// checkNewAutoRandomBits checks whether the new auto_random bits number can cause overflow.
|
|
func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.ColumnInfo,
|
|
newCol *model.ColumnInfo, newShardBits, newRangeBits uint64, sepAutoInc bool) error {
|
|
shardFmt := autoid.NewShardIDFormat(&newCol.FieldType, newShardBits, newRangeBits)
|
|
|
|
idAcc := idAccessors.RandomID()
|
|
convertedFromAutoInc := mysql.HasAutoIncrementFlag(oldCol.GetFlag())
|
|
if convertedFromAutoInc {
|
|
if sepAutoInc {
|
|
idAcc = idAccessors.IncrementID(model.TableInfoVersion5)
|
|
} else {
|
|
idAcc = idAccessors.RowID()
|
|
}
|
|
}
|
|
// Generate a new auto ID first to prevent concurrent update in DML.
|
|
_, err := idAcc.Inc(1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
currentIncBitsVal, err := idAcc.Get()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Find the max number of available shard bits by
|
|
// counting leading zeros in current inc part of auto_random ID.
|
|
usedBits := uint64(64 - bits.LeadingZeros64(uint64(currentIncBitsVal)))
|
|
if usedBits > shardFmt.IncrementalBits {
|
|
overflowCnt := usedBits - shardFmt.IncrementalBits
|
|
errMsg := fmt.Sprintf(autoid.AutoRandomOverflowErrMsg, newShardBits-overflowCnt, newShardBits, oldCol.Name.O)
|
|
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *ddlCtx) getAutoIDRequirement() autoid.Requirement {
|
|
return &asAutoIDRequirement{
|
|
store: d.store,
|
|
autoidCli: d.autoidCli,
|
|
}
|
|
}
|
|
|
|
type asAutoIDRequirement struct {
|
|
store kv.Storage
|
|
autoidCli *autoid.ClientDiscover
|
|
}
|
|
|
|
var _ autoid.Requirement = &asAutoIDRequirement{}
|
|
|
|
func (r *asAutoIDRequirement) Store() kv.Storage {
|
|
return r.store
|
|
}
|
|
|
|
func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
|
|
return r.autoidCli
|
|
}
|
|
|
|
// applyNewAutoRandomBits set auto_random bits to TableInfo and
|
|
// migrate auto_increment ID to auto_random ID if possible.
|
|
func applyNewAutoRandomBits(jobCtx *jobContext, dbInfo *model.DBInfo,
|
|
tblInfo *model.TableInfo, oldCol *model.ColumnInfo, newAutoRandBits uint64) error {
|
|
tblInfo.AutoRandomBits = newAutoRandBits
|
|
needMigrateFromAutoIncToAutoRand := mysql.HasAutoIncrementFlag(oldCol.GetFlag())
|
|
if !needMigrateFromAutoIncToAutoRand {
|
|
return nil
|
|
}
|
|
autoRandAlloc := autoid.NewAllocatorsFromTblInfo(jobCtx.getAutoIDRequirement(), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType)
|
|
if autoRandAlloc == nil {
|
|
errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O)
|
|
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
|
|
}
|
|
idAcc := jobCtx.metaMut.GetAutoIDAccessors(dbInfo.ID, tblInfo.ID).RowID()
|
|
nextAutoIncID, err := idAcc.Get()
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
err = autoRandAlloc.Rebase(context.Background(), nextAutoIncID, false)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
if err := idAcc.Del(); err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkForNullValue ensure there are no null values of the column of this table.
|
|
// `isDataTruncated` indicates whether the new field and the old field type are the same, in order to be compatible with mysql.
|
|
func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTruncated bool, schema, table ast.CIStr, newCol *model.ColumnInfo, oldCols ...*model.ColumnInfo) error {
|
|
needCheckNullValue := false
|
|
for _, oldCol := range oldCols {
|
|
if oldCol.GetType() != mysql.TypeTimestamp && newCol.GetType() == mysql.TypeTimestamp {
|
|
// special case for convert null value of non-timestamp type to timestamp type, null value will be substituted with current timestamp.
|
|
continue
|
|
}
|
|
needCheckNullValue = true
|
|
}
|
|
if !needCheckNullValue {
|
|
return nil
|
|
}
|
|
var buf strings.Builder
|
|
buf.WriteString("select 1 from %n.%n where ")
|
|
paramsList := make([]any, 0, 2+len(oldCols))
|
|
paramsList = append(paramsList, schema.L, table.L)
|
|
for i, col := range oldCols {
|
|
if i == 0 {
|
|
buf.WriteString("%n is null")
|
|
paramsList = append(paramsList, col.Name.L)
|
|
} else {
|
|
buf.WriteString(" or %n is null")
|
|
paramsList = append(paramsList, col.Name.L)
|
|
}
|
|
}
|
|
buf.WriteString(" limit 1")
|
|
//nolint:forcetypeassert
|
|
rows, _, err := sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(ctx, nil, buf.String(), paramsList...)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
rowCount := len(rows)
|
|
if rowCount != 0 {
|
|
if isDataTruncated {
|
|
return dbterror.ErrWarnDataTruncated.GenWithStackByArgs(newCol.Name.L, rowCount)
|
|
}
|
|
return dbterror.ErrInvalidUseOfNull
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func updateColumnDefaultValue(jobCtx *jobContext, job *model.Job, newCol *model.ColumnInfo, oldColName *ast.CIStr) (ver int64, _ error) {
|
|
tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
|
|
if err != nil {
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
|
|
job.MarkNonRevertible()
|
|
// Store the mark and enter the next DDL handling loop.
|
|
return updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, false)
|
|
}
|
|
|
|
oldCol := model.FindColumnInfo(tblInfo.Columns, oldColName.L)
|
|
if oldCol == nil || oldCol.State != model.StatePublic {
|
|
job.State = model.JobStateCancelled
|
|
return ver, infoschema.ErrColumnNotExists.GenWithStackByArgs(newCol.Name, tblInfo.Name)
|
|
}
|
|
|
|
if hasDefaultValue, _, err := checkColumnDefaultValue(newReorgExprCtx(), table.ToColumn(oldCol.Clone()), newCol.DefaultValue); err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
} else if !hasDefaultValue {
|
|
job.State = model.JobStateCancelled
|
|
return ver, dbterror.ErrInvalidDefaultValue.GenWithStackByArgs(newCol.Name)
|
|
}
|
|
|
|
// The newCol's offset may be the value of the old schema version, so we can't use newCol directly.
|
|
oldCol.DefaultValue = newCol.DefaultValue
|
|
oldCol.DefaultValueBit = newCol.DefaultValueBit
|
|
oldCol.DefaultIsExpr = newCol.DefaultIsExpr
|
|
if mysql.HasNoDefaultValueFlag(newCol.GetFlag()) {
|
|
oldCol.AddFlag(mysql.NoDefaultValueFlag)
|
|
} else {
|
|
oldCol.DelFlag(mysql.NoDefaultValueFlag)
|
|
err = checkDefaultValue(newReorgExprCtx(), table.ToColumn(oldCol), true)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, err
|
|
}
|
|
}
|
|
|
|
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
|
|
if err != nil {
|
|
job.State = model.JobStateCancelled
|
|
return ver, errors.Trace(err)
|
|
}
|
|
|
|
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
|
|
return ver, nil
|
|
}
|
|
|
|
func isColumnWithIndex(colName string, indices []*model.IndexInfo) bool {
|
|
for _, indexInfo := range indices {
|
|
for _, col := range indexInfo.Columns {
|
|
if col.Name.L == colName {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func isColumnCanDropWithIndex(colName string, indices []*model.IndexInfo) error {
|
|
for _, indexInfo := range indices {
|
|
if indexInfo.Primary || len(indexInfo.Columns) > 1 || indexInfo.IsColumnarIndex() {
|
|
for _, col := range indexInfo.Columns {
|
|
if col.Name.L == colName {
|
|
errMsg := "with composite index covered or Primary Key covered now"
|
|
if indexInfo.IsColumnarIndex() {
|
|
errMsg = "with Columnar Index covered now"
|
|
}
|
|
return dbterror.ErrCantDropColWithIndex.GenWithStack("can't drop column %s "+errMsg, colName)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func listIndicesWithColumn(colName string, indices []*model.IndexInfo) []*model.IndexInfo {
|
|
ret := make([]*model.IndexInfo, 0)
|
|
for _, indexInfo := range indices {
|
|
if len(indexInfo.Columns) == 1 && colName == indexInfo.Columns[0].Name.L {
|
|
ret = append(ret, indexInfo)
|
|
}
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// GetColumnForeignKeyInfo returns the wanted foreign key info
|
|
func GetColumnForeignKeyInfo(colName string, fkInfos []*model.FKInfo) *model.FKInfo {
|
|
for _, fkInfo := range fkInfos {
|
|
for _, col := range fkInfo.Cols {
|
|
if col.L == colName {
|
|
return fkInfo
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AllocateColumnID allocates next column ID from TableInfo.
|
|
func AllocateColumnID(tblInfo *model.TableInfo) int64 {
|
|
tblInfo.MaxColumnID++
|
|
return tblInfo.MaxColumnID
|
|
}
|
|
|
|
func checkAddColumnTooManyColumns(colNum int) error {
|
|
if uint32(colNum) > atomic.LoadUint32(&config.GetGlobalConfig().TableColumnCountLimit) {
|
|
return dbterror.ErrTooManyFields
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// modifyColsFromNull2NotNull modifies the type definitions of 'null' to 'not null'.
|
|
// Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values.
|
|
func modifyColsFromNull2NotNull(
|
|
ctx context.Context,
|
|
w *worker,
|
|
dbInfo *model.DBInfo,
|
|
tblInfo *model.TableInfo,
|
|
cols []*model.ColumnInfo,
|
|
newCol *model.ColumnInfo,
|
|
isDataTruncated bool,
|
|
) error {
|
|
// Get sessionctx from context resource pool.
|
|
var sctx sessionctx.Context
|
|
sctx, err := w.sessPool.Get()
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
defer w.sessPool.Put(sctx)
|
|
|
|
skipCheck := false
|
|
if !skipCheck {
|
|
// If there is a null value inserted, it cannot be modified and needs to be rollback.
|
|
err = checkForNullValue(ctx, sctx, isDataTruncated, dbInfo.Name, tblInfo.Name, newCol, cols...)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
|
|
// Prevent this field from inserting null values.
|
|
for _, col := range cols {
|
|
col.AddFlag(mysql.PreventNullInsertFlag)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func generateOriginDefaultValue(col *model.ColumnInfo, ctx sessionctx.Context, checkUnsafeFunc bool) (any, error) {
|
|
var err error
|
|
odValue := col.GetDefaultValue()
|
|
if odValue == nil && mysql.HasNotNullFlag(col.GetFlag()) ||
|
|
// It's for drop column and modify column.
|
|
(col.DefaultIsExpr && odValue != strings.ToUpper(ast.CurrentTimestamp) && ctx == nil) {
|
|
switch col.GetType() {
|
|
// Just use enum field's first element for OriginDefaultValue.
|
|
case mysql.TypeEnum:
|
|
defEnum, verr := types.ParseEnumValue(col.GetElems(), 1)
|
|
if verr != nil {
|
|
return nil, errors.Trace(verr)
|
|
}
|
|
defVal := types.NewCollateMysqlEnumDatum(defEnum, col.GetCollate())
|
|
return defVal.ToString()
|
|
default:
|
|
zeroVal := table.GetZeroValue(col)
|
|
odValue, err = zeroVal.ToString()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if odValue == strings.ToUpper(ast.CurrentTimestamp) {
|
|
var t time.Time
|
|
if ctx == nil {
|
|
t = time.Now()
|
|
} else {
|
|
t, _ = expression.GetStmtTimestamp(ctx.GetExprCtx().GetEvalCtx())
|
|
}
|
|
if col.GetType() == mysql.TypeTimestamp {
|
|
odValue = types.NewTime(types.FromGoTime(t.UTC()), col.GetType(), col.GetDecimal()).String()
|
|
} else if col.GetType() == mysql.TypeDatetime {
|
|
odValue = types.NewTime(types.FromGoTime(t), col.GetType(), col.GetDecimal()).String()
|
|
}
|
|
return odValue, nil
|
|
}
|
|
|
|
if col.DefaultIsExpr && ctx != nil {
|
|
valStr, ok := odValue.(string)
|
|
if !ok {
|
|
return nil, dbterror.ErrDefValGeneratedNamedFunctionIsNotAllowed.GenWithStackByArgs(col.Name.String())
|
|
}
|
|
oldValue := strings.ToLower(valStr)
|
|
// It's checked in getFuncCallDefaultValue.
|
|
if !strings.Contains(oldValue, fmt.Sprintf("%s(%s(),", ast.DateFormat, ast.Now)) &&
|
|
!strings.Contains(oldValue, ast.StrToDate) && checkUnsafeFunc {
|
|
return nil, errors.Trace(dbterror.ErrBinlogUnsafeSystemFunction)
|
|
}
|
|
|
|
defVal, err := table.GetColDefaultValue(ctx.GetExprCtx(), col)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
odValue, err = defVal.ToString()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
}
|
|
return odValue, nil
|
|
}
|
|
|
|
func indexInfoContains(idxID int64, idxInfos []*model.IndexInfo) bool {
|
|
for _, idxInfo := range idxInfos {
|
|
if idxID == idxInfo.ID {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func indexInfosToIDList(idxInfos []*model.IndexInfo) []int64 {
|
|
ids := make([]int64, 0, len(idxInfos))
|
|
for _, idxInfo := range idxInfos {
|
|
ids = append(ids, idxInfo.ID)
|
|
}
|
|
return ids
|
|
}
|
|
|
|
func getExpressionIndexOriginName(originalName ast.CIStr) string {
|
|
columnName := strings.TrimPrefix(originalName.O, expressionIndexPrefix+"_")
|
|
var pos int
|
|
if pos = strings.LastIndex(columnName, "_"); pos == -1 {
|
|
return columnName
|
|
}
|
|
return columnName[:pos]
|
|
}
|