Files
tidb/pkg/ddl/constraint.go

436 lines
15 KiB
Go

// Copyright 2023-2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"context"
"fmt"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/format"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/dbterror"
)
func (w *worker) onAddCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
return rollingBackAddConstraint(jobCtx, job)
}
failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
})
dbInfo, tblInfo, constraintInfoInMeta, constraintInfoInJob, err := checkAddCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}
if constraintInfoInMeta == nil {
// It's first time to run add constraint job, so there is no constraint info in meta.
// Use the raw constraint info from job directly and modify table info here.
constraintInfoInJob.ID = allocateConstraintID(tblInfo)
// Reset constraint name according to real-time constraints name at this point.
constrNames := map[string]bool{}
for _, constr := range tblInfo.Constraints {
constrNames[constr.Name.L] = true
}
setNameForConstraintInfo(tblInfo.Name.L, constrNames, []*model.ConstraintInfo{constraintInfoInJob})
// Double check the constraint dependency.
existedColsMap := make(map[string]struct{})
cols := tblInfo.Columns
for _, v := range cols {
if v.State == model.StatePublic {
existedColsMap[v.Name.L] = struct{}{}
}
}
dependedCols := constraintInfoInJob.ConstraintCols
for _, k := range dependedCols {
if _, ok := existedColsMap[k.L]; !ok {
// The table constraint depended on a non-existed column.
return ver, dbterror.ErrTableCheckConstraintReferUnknown.GenWithStackByArgs(constraintInfoInJob.Name, k)
}
}
tblInfo.Constraints = append(tblInfo.Constraints, constraintInfoInJob)
constraintInfoInMeta = constraintInfoInJob
}
// If not enforced, add it directly.
if !constraintInfoInMeta.Enforced {
constraintInfoInMeta.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}
switch constraintInfoInMeta.State {
case model.StateNone:
job.SchemaState = model.StateWriteOnly
constraintInfoInMeta.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
job.SchemaState = model.StateWriteReorganization
constraintInfoInMeta.State = model.StateWriteReorganization
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
err = w.verifyRemainRecordsForCheckConstraint(jobCtx.stepCtx, dbInfo, tblInfo, constraintInfoInMeta)
if err != nil {
if dbterror.ErrCheckConstraintIsViolated.Equal(err) {
job.State = model.JobStateRollingback
}
return ver, errors.Trace(err)
}
constraintInfoInMeta.State = model.StatePublic
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("constraint", constraintInfoInMeta.State)
}
return ver, errors.Trace(err)
}
func checkAddCheckConstraint(t *meta.Mutator, job *model.Job) (dbInfo *model.DBInfo, tblInfo *model.TableInfo, _, _ *model.ConstraintInfo, err error) {
schemaID := job.SchemaID
dbInfo, err = t.GetDatabase(job.SchemaID)
if err != nil {
return nil, nil, nil, nil, errors.Trace(err)
}
tblInfo, err = GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, errors.Trace(err)
}
args, err := model.GetAddCheckConstraintArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, errors.Trace(err)
}
constraintInfo1 := args.Constraint
// do the double-check with constraint existence.
constraintInfo2 := tblInfo.FindConstraintInfoByName(constraintInfo1.Name.L)
if constraintInfo2 != nil {
if constraintInfo2.State == model.StatePublic {
// We already have a constraint with the same constraint name.
job.State = model.JobStateCancelled
return nil, nil, nil, nil, infoschema.ErrColumnExists.GenWithStackByArgs(constraintInfo1.Name)
}
// if not, that means constraint was in intermediate state.
}
err = checkConstraintNamesNotExists(t, schemaID, []*model.ConstraintInfo{constraintInfo1})
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, err
}
return dbInfo, tblInfo, constraintInfo2, constraintInfo1, nil
}
// onDropCheckConstraint can be called from two case:
// 1: rollback in add constraint.(in rollback function the job.args will be changed)
// 2: user drop constraint ddl.
func onDropCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
tblInfo, constraintInfo, err := checkDropCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}
switch constraintInfo.State {
case model.StatePublic:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// write only state constraint will still take effect to check the newly inserted data.
// So the dependent column shouldn't be dropped even in this intermediate state.
constraintInfo.State = model.StateNone
// remove the constraint from tableInfo.
for i, constr := range tblInfo.Constraints {
if constr.Name.L == constraintInfo.Name.L {
tblInfo.Constraints = append(tblInfo.Constraints[0:i], tblInfo.Constraints[i+1:]...)
}
}
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
default:
err = dbterror.ErrInvalidDDLJob.GenWithStackByArgs("constraint", tblInfo.State)
}
return ver, errors.Trace(err)
}
func checkDropCheckConstraint(t *meta.Mutator, job *model.Job) (*model.TableInfo, *model.ConstraintInfo, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, errors.Trace(err)
}
args, err := model.GetCheckConstraintArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, errors.Trace(err)
}
// double check with constraint existence.
constraintInfo := tblInfo.FindConstraintInfoByName(args.ConstraintName.L)
if constraintInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, dbterror.ErrConstraintNotFound.GenWithStackByArgs(args.ConstraintName)
}
return tblInfo, constraintInfo, nil
}
func (w *worker) onAlterCheckConstraint(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(jobCtx.metaMut, job)
if err != nil {
return ver, errors.Trace(err)
}
if job.IsRollingback() {
return rollingBackAlterConstraint(jobCtx, job)
}
// Current State is desired.
if constraintInfo.State == model.StatePublic && constraintInfo.Enforced == enforced {
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return
}
// enforced will fetch table data and check the constraint.
if enforced {
switch constraintInfo.State {
case model.StatePublic:
job.SchemaState = model.StateWriteReorganization
constraintInfo.State = model.StateWriteReorganization
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteReorganization:
job.SchemaState = model.StateWriteOnly
constraintInfo.State = model.StateWriteOnly
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
err = w.verifyRemainRecordsForCheckConstraint(jobCtx.stepCtx, dbInfo, tblInfo, constraintInfo)
if err != nil {
if dbterror.ErrCheckConstraintIsViolated.Equal(err) {
job.State = model.JobStateRollingback
}
return ver, errors.Trace(err)
}
constraintInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
} else {
constraintInfo.Enforced = enforced
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
// update version and tableInfo error will cause retry.
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
}
return ver, err
}
func checkAlterCheckConstraint(t *meta.Mutator, job *model.Job) (*model.DBInfo, *model.TableInfo, *model.ConstraintInfo, bool, error) {
schemaID := job.SchemaID
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, false, errors.Trace(err)
}
args, err := model.GetCheckConstraintArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, false, errors.Trace(err)
}
// do the double check with constraint existence.
constraintInfo := tblInfo.FindConstraintInfoByName(args.ConstraintName.L)
if constraintInfo == nil {
job.State = model.JobStateCancelled
return nil, nil, nil, false, dbterror.ErrConstraintNotFound.GenWithStackByArgs(args.ConstraintName)
}
return dbInfo, tblInfo, constraintInfo, args.Enforced, nil
}
func allocateConstraintID(tblInfo *model.TableInfo) int64 {
tblInfo.MaxConstraintID++
return tblInfo.MaxConstraintID
}
func buildConstraintInfo(tblInfo *model.TableInfo, dependedCols []ast.CIStr, constr *ast.Constraint, state model.SchemaState) (*model.ConstraintInfo, error) {
constraintName := ast.NewCIStr(constr.Name)
if err := checkTooLongConstraint(constraintName); err != nil {
return nil, errors.Trace(err)
}
// Restore check constraint expression to string.
var sb strings.Builder
restoreFlags := format.RestoreStringSingleQuotes | format.RestoreKeyWordLowercase | format.RestoreNameBackQuotes |
format.RestoreSpacesAroundBinaryOperation | format.RestoreWithoutSchemaName | format.RestoreWithoutTableName
restoreCtx := format.NewRestoreCtx(restoreFlags, &sb)
sb.Reset()
err := constr.Expr.Restore(restoreCtx)
if err != nil {
return nil, errors.Trace(err)
}
// Create constraint info.
constraintInfo := &model.ConstraintInfo{
Name: constraintName,
Table: tblInfo.Name,
ConstraintCols: dependedCols,
ExprString: sb.String(),
Enforced: constr.Enforced,
InColumn: constr.InColumn,
State: state,
}
return constraintInfo, nil
}
func checkTooLongConstraint(constr ast.CIStr) error {
if len(constr.L) > mysql.MaxConstraintIdentifierLen {
return dbterror.ErrTooLongIdent.GenWithStackByArgs(constr)
}
return nil
}
// findDependentColsInExpr returns a set of string, which indicates
// the names of the columns that are dependent by exprNode.
func findDependentColsInExpr(expr ast.ExprNode) map[string]struct{} {
colNames := FindColumnNamesInExpr(expr)
colsMap := make(map[string]struct{}, len(colNames))
for _, depCol := range colNames {
colsMap[depCol.Name.L] = struct{}{}
}
return colsMap
}
func (w *worker) verifyRemainRecordsForCheckConstraint(
ctx context.Context,
dbInfo *model.DBInfo,
tableInfo *model.TableInfo,
constr *model.ConstraintInfo,
) error {
// Inject a fail-point to skip the remaining records check.
failpoint.Inject("mockVerifyRemainDataSuccess", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil)
}
})
// Get sessionctx from ddl context resource pool in ddl worker.
var sctx sessionctx.Context
sctx, err := w.sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.Put(sctx)
// If there is any row can't pass the check expression, the add constraint action will error.
// It's no need to construct expression node out and pull the chunk rows through it. Here we
// can let the check expression restored string as the filter in where clause directly.
// Prepare internal SQL to fetch data from physical table under this filter.
sql := fmt.Sprintf("select 1 from `%s`.`%s` where not %s limit 1", dbInfo.Name.L, tableInfo.Name.L, constr.ExprString)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
rows, _, err := sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(ctx, nil, sql)
if err != nil {
return errors.Trace(err)
}
rowCount := len(rows)
if rowCount != 0 {
return dbterror.ErrCheckConstraintIsViolated.GenWithStackByArgs(constr.Name.L)
}
return nil
}
func setNameForConstraintInfo(tableLowerName string, namesMap map[string]bool, infos []*model.ConstraintInfo) {
cnt := 1
constraintPrefix := tableLowerName + "_chk_"
for _, constrInfo := range infos {
if constrInfo.Name.O == "" {
constrName := fmt.Sprintf("%s%d", constraintPrefix, cnt)
for {
// loop until find constrName that haven't been used.
if !namesMap[constrName] {
namesMap[constrName] = true
break
}
cnt++
constrName = fmt.Sprintf("%s%d", constraintPrefix, cnt)
}
constrInfo.Name = ast.NewCIStr(constrName)
}
}
}
// IsColumnDroppableWithCheckConstraint check whether the column in check-constraint whose dependent col is more than 1
func IsColumnDroppableWithCheckConstraint(col ast.CIStr, tblInfo *model.TableInfo) error {
for _, cons := range tblInfo.Constraints {
if len(cons.ConstraintCols) > 1 {
for _, colName := range cons.ConstraintCols {
if colName.L == col.L {
return dbterror.ErrCantDropColWithCheckConstraint.GenWithStackByArgs(cons.Name, col)
}
}
}
}
return nil
}
// IsColumnRenameableWithCheckConstraint check whether the column is referenced in check-constraint
func IsColumnRenameableWithCheckConstraint(col ast.CIStr, tblInfo *model.TableInfo) error {
for _, cons := range tblInfo.Constraints {
for _, colName := range cons.ConstraintCols {
if colName.L == col.L {
return dbterror.ErrCantDropColWithCheckConstraint.GenWithStackByArgs(cons.Name, col)
}
}
}
return nil
}