// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
	"context"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/ddl/label"
	"github.com/pingcap/tidb/pkg/ddl/logutil"
	"github.com/pingcap/tidb/pkg/ddl/notifier"
	"github.com/pingcap/tidb/pkg/ddl/placement"
	sess "github.com/pingcap/tidb/pkg/ddl/session"
	"github.com/pingcap/tidb/pkg/domain/infosync"
	"github.com/pingcap/tidb/pkg/infoschema"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/autoid"
	"github.com/pingcap/tidb/pkg/meta/metadef"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/parser/charset"
	field_types "github.com/pingcap/tidb/pkg/parser/types"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/table"
	"github.com/pingcap/tidb/pkg/table/tables"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/util/dbterror"
	"github.com/pingcap/tidb/pkg/util/gcutil"
	"go.uber.org/zap"
)

const tiflashCheckTiDBHTTPAPIHalfInterval = 2500 * time.Millisecond

func repairTableOrViewWithCheck(t *meta.Mutator, job *model.Job, schemaID int64, tbInfo *model.TableInfo) error {
	err := checkTableInfoValid(tbInfo)
	if err != nil {
		job.State = model.JobStateCancelled
		return errors.Trace(err)
	}
	return t.UpdateTable(schemaID, tbInfo)
}

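// onDropTableOrView handles the DDL job for dropping a table, view or sequence. It steps the
// table through public -> write only -> delete only -> none, and only removes the metadata
// (and queues the data range for GC) in the final step.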
func (w *worker) onDropTableOrView(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetDropTableArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}
	jobCtx.jobArgs = args
	tblInfo, err := checkTableExistAndCancelNonExistJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	originalState := job.SchemaState
	switch tblInfo.State {
	case model.StatePublic:
		// public -> write only
		if job.Type == model.ActionDropTable {
			err = checkDropTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job, args)
			if err != nil {
				return ver, err
			}
		}
		tblInfo.State = model.StateWriteOnly
		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != tblInfo.State)
		if err != nil {
			return ver, errors.Trace(err)
		}
	case model.StateWriteOnly:
		// write only -> delete only
		tblInfo.State = model.StateDeleteOnly
		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != tblInfo.State)
		if err != nil {
			return ver, errors.Trace(err)
		}
	case model.StateDeleteOnly:
		tblInfo.State = model.StateNone
		oldIDs := getPartitionIDs(tblInfo)
		ruleIDs := append(getPartitionRuleIDs(job.SchemaName, tblInfo), fmt.Sprintf(label.TableIDFormat, label.IDPrefix, job.SchemaName, tblInfo.Name.L))

		args.OldPartitionIDs = oldIDs
		ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != tblInfo.State)
		if err != nil {
			return ver, errors.Trace(err)
		}
		metaMut := jobCtx.metaMut
		if tblInfo.IsSequence() {
			if err = metaMut.DropSequence(job.SchemaID, job.TableID); err != nil {
				return ver, errors.Trace(err)
			}
		} else {
			if err = metaMut.DropTableOrView(job.SchemaID, job.TableID); err != nil {
				return ver, errors.Trace(err)
			}
			if err = metaMut.GetAutoIDAccessors(job.SchemaID, job.TableID).Del(); err != nil {
				return ver, errors.Trace(err)
			}
		}
		if tblInfo.TiFlashReplica != nil {
			e := infosync.DeleteTiFlashTableSyncProgress(tblInfo)
			if e != nil {
				logutil.DDLLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
			}
		}
		// Placement rules cannot be removed immediately after drop table / truncate table, because the
		// tables can be flashed back or recovered, so that cleanup is deferred to doGCPlacementRules in gc_worker.go.
		if !tblInfo.IsSequence() && !tblInfo.IsView() {
			dropTableEvent := notifier.NewDropTableEvent(tblInfo)
			err = asyncNotifyEvent(jobCtx, dropTableEvent, job, noSubJob, w.sess)
			if err != nil {
				return ver, errors.Trace(err)
			}
		}

		if err := deleteTableAffinityGroupsInPD(jobCtx, tblInfo, nil); err != nil {
			logutil.DDLLogger().Error("failed to delete affinity groups from PD", zap.Error(err), zap.Int64("tableID", tblInfo.ID))
		}

		// Finish this job.
		job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
		startKey := tablecodec.EncodeTablePrefix(job.TableID)
		job.FillFinishedArgs(&model.DropTableArgs{
			StartKey:        startKey,
			OldPartitionIDs: oldIDs,
			OldRuleIDs:      ruleIDs,
		})
	default:
		return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State))
	}
	job.SchemaState = tblInfo.State
	return ver, errors.Trace(err)
}

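// onRecoverTable handles recovering a dropped table: it re-creates the table from the retained
// metadata after verifying that GC has not yet removed the table's data.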
func (w *worker) onRecoverTable(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	args, err := model.GetRecoverArgs(job)
	if err != nil {
		// Invalid arguments, cancel this job.
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	jobCtx.jobArgs = args
	recoverInfo := args.RecoverTableInfos()[0]

	schemaID := recoverInfo.SchemaID
	tblInfo := recoverInfo.TableInfo
	if tblInfo.TTLInfo != nil {
		// force disable TTL job schedule for recovered table
		tblInfo.TTLInfo.Enable = false
	}

	// check GC and safe point
	gcEnable, err := checkGCEnable(w)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	err = checkTableNotExists(jobCtx.infoCache, schemaID, tblInfo.Name.L)
	if err != nil {
		if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) {
			job.State = model.JobStateCancelled
		}
		return ver, errors.Trace(err)
	}

	metaMut := jobCtx.metaMut
	err = checkTableIDNotExists(metaMut, schemaID, tblInfo.ID)
	if err != nil {
		if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) {
			job.State = model.JobStateCancelled
		}
		return ver, errors.Trace(err)
	}

	// Recovering a table is divided into 2 steps:
	// 1. Check the GC enable status, to decide whether to re-enable GC after the table is recovered.
	//    a. Why not disable GC before putting the job into the DDL job queue?
	//       Think about the concurrency problem: if recover job-1 is running and has already disabled GC,
	//       another recover job-2 that checks the GC status before entering the queue would see GC as disabled,
	//       so after job-2 finished, GC would be left disabled.
	//    b. Why split into 2 steps? A single step could also finish this job: check GC -> disable GC -> recover table -> finish job.
	//       But if the transaction commit failed, the job would retry while GC was already disabled by the first run,
	//       so after the retry succeeded, GC would be left disabled.
	// 2. Do the recover table job:
	//    a. Check whether GC is enabled; if so, disable it first.
	//    b. Check the GC safe point. If the drop-table time is after the safe point, the table can be recovered;
	//       otherwise it cannot, because the records of the table may already have been deleted by GC.
	//    c. Remove the GC task of the table from the gc_delete_range table.
	//    d. Create the table and rebase the table auto ID.
	//    e. Finish.
	switch tblInfo.State {
	case model.StateNone:
		// none -> write only
		// check GC enable and update flag.
		if gcEnable {
			args.CheckFlag = recoverCheckFlagEnableGC
		} else {
			args.CheckFlag = recoverCheckFlagDisableGC
		}
		job.FillArgs(args)

		job.SchemaState = model.StateWriteOnly
		tblInfo.State = model.StateWriteOnly
	case model.StateWriteOnly:
		// write only -> public
		// do recover table.
		if gcEnable {
			err = disableGC(w)
			if err != nil {
				job.State = model.JobStateCancelled
				return ver, errors.Errorf("disable gc failed, try again later. err: %v", err)
			}
		}
		// check GC safe point
		err = checkSafePoint(w, recoverInfo.SnapshotTS)
		if err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
		ver, err = w.recoverTable(jobCtx.stepCtx, metaMut, job, recoverInfo)
		if err != nil {
			return ver, errors.Trace(err)
		}
		tableInfo := tblInfo.Clone()
		tableInfo.State = model.StatePublic
		tableInfo.UpdateTS = metaMut.StartTS

		var tids []int64
		if recoverInfo.TableInfo.GetPartitionInfo() != nil {
			tids = getPartitionIDs(recoverInfo.TableInfo)
			tids = append(tids, recoverInfo.TableInfo.ID)
		} else {
			tids = []int64{recoverInfo.TableInfo.ID}
		}
		args.AffectedPhysicalIDs = tids
		ver, err = updateVersionAndTableInfo(jobCtx, job, tableInfo, true)
		if err != nil {
			return ver, errors.Trace(err)
		}
		tblInfo.State = model.StatePublic
		tblInfo.UpdateTS = metaMut.StartTS
		// Finish this job.
		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	default:
		return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State)
	}
	return ver, nil
}

func (w *worker) recoverTable(
	ctx context.Context,
	t *meta.Mutator,
	job *model.Job,
	recoverInfo *model.RecoverTableInfo,
) (ver int64, err error) {
	tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err := getOldLabelRules(recoverInfo.TableInfo, recoverInfo.OldSchemaName, recoverInfo.OldTableName)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Wrapf(err, "failed to get old label rules from PD")
	}
	// Remove dropped table DDL job from gc_delete_range table.
	err = w.delRangeManager.removeFromGCDeleteRange(ctx, recoverInfo.DropJobID)
	if err != nil {
		return ver, errors.Trace(err)
	}
	err = clearTablePlacementAndBundles(ctx, recoverInfo.TableInfo)
	if err != nil {
		return ver, errors.Trace(err)
	}

	tableInfo := recoverInfo.TableInfo.Clone()
	tableInfo.State = model.StatePublic
	tableInfo.UpdateTS = t.StartTS
	err = t.CreateTableAndSetAutoID(recoverInfo.SchemaID, tableInfo, recoverInfo.AutoIDs)
	if err != nil {
		return ver, errors.Trace(err)
	}

	failpoint.Inject("mockRecoverTableCommitErr", func(val failpoint.Value) {
		if val.(bool) && atomic.CompareAndSwapUint32(&mockRecoverTableCommitErrOnce, 0, 1) {
			err = failpoint.Enable(`tikvclient/mockCommitErrorOpt`, "return(true)")
			if err != nil {
				return
			}
		}
	})

	err = updateLabelRules(job, recoverInfo.TableInfo, oldRules, tableRuleID, partRuleIDs, oldRuleIDs, recoverInfo.TableInfo.ID)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Wrapf(err, "failed to update the label rule to PD")
	}

	return ver, nil
}

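// clearTablePlacementAndBundles drops the placement policy references of the table and its
// partitions and resets the corresponding placement bundles in PD.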
func clearTablePlacementAndBundles(ctx context.Context, tblInfo *model.TableInfo) error {
	failpoint.Inject("mockClearTablePlacementAndBundlesErr", func() {
		failpoint.Return(errors.New("mock error for clearTablePlacementAndBundles"))
	})
	var bundles []*placement.Bundle
	if tblInfo.PlacementPolicyRef != nil {
		tblInfo.PlacementPolicyRef = nil
		bundles = append(bundles, placement.NewBundle(tblInfo.ID))
	}

	if tblInfo.Partition != nil {
		for i := range tblInfo.Partition.Definitions {
			par := &tblInfo.Partition.Definitions[i]
			if par.PlacementPolicyRef != nil {
				par.PlacementPolicyRef = nil
				bundles = append(bundles, placement.NewBundle(par.ID))
			}
		}
	}

	if len(bundles) == 0 {
		return nil
	}

	return infosync.PutRuleBundlesWithDefaultRetry(ctx, bundles)
}

// mockRecoverTableCommitErrOnce is used to make sure
// `mockRecoverTableCommitErr` only mocks the error once.
var mockRecoverTableCommitErrOnce uint32

func enableGC(w *worker) error {
	ctx, err := w.sessPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer w.sessPool.Put(ctx)

	return gcutil.EnableGC(ctx)
}

func disableGC(w *worker) error {
	ctx, err := w.sessPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer w.sessPool.Put(ctx)

	return gcutil.DisableGC(ctx)
}

func checkGCEnable(w *worker) (enable bool, err error) {
	ctx, err := w.sessPool.Get()
	if err != nil {
		return false, errors.Trace(err)
	}
	defer w.sessPool.Put(ctx)

	return gcutil.CheckGCEnable(ctx)
}

func checkSafePoint(w *worker, snapshotTS uint64) error {
	ctx, err := w.sessPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer w.sessPool.Put(ctx)

	return gcutil.ValidateSnapshot(ctx, snapshotTS)
}

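// getTable builds a table.Table from the table info, wiring up its auto-ID allocators for the
// given schema.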
func getTable(r autoid.Requirement, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) {
	allocs := autoid.NewAllocatorsFromTblInfo(r, schemaID, tblInfo)
	tbl, err := table.TableFromMeta(allocs, tblInfo)
	return tbl, errors.Trace(err)
}

// GetTableInfoAndCancelFaultJob is exported for test.
func GetTableInfoAndCancelFaultJob(t *meta.Mutator, job *model.Job, schemaID int64) (*model.TableInfo, error) {
	tblInfo, err := checkTableExistAndCancelNonExistJob(t, job, schemaID)
	if err != nil {
		return nil, errors.Trace(err)
	}

	if tblInfo.State != model.StatePublic {
		job.State = model.JobStateCancelled
		return nil, dbterror.ErrInvalidDDLState.GenWithStack("table %s is not in public, but %s", tblInfo.Name, tblInfo.State)
	}

	return tblInfo, nil
}

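// checkTableExistAndCancelNonExistJob loads the table info for the job and cancels the job when
// the schema or table does not exist, or when the table has been renamed.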
func checkTableExistAndCancelNonExistJob(t *meta.Mutator, job *model.Job, schemaID int64) (*model.TableInfo, error) {
	tblInfo, err := getTableInfo(t, job.TableID, schemaID)
	if err == nil {
		// Check if table name is renamed.
		if job.TableName != "" && tblInfo.Name.L != job.TableName && job.Type != model.ActionRepairTable {
			job.State = model.JobStateCancelled
			return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(job.SchemaName, job.TableName)
		}
		return tblInfo, nil
	}
	if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
		job.State = model.JobStateCancelled
	}
	return nil, err
}

func getTableInfo(t *meta.Mutator, tableID, schemaID int64) (*model.TableInfo, error) {
	// Check this table's database.
	tblInfo, err := t.GetTable(schemaID, tableID)
	if err != nil {
		if meta.ErrDBNotExists.Equal(err) {
			return nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(
				fmt.Sprintf("(Schema ID %d)", schemaID),
			))
		}
		return nil, errors.Trace(err)
	}

	// Check the table.
	if tblInfo == nil {
		return nil, errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(
			fmt.Sprintf("(Schema ID %d)", schemaID),
			fmt.Sprintf("(Table ID %d)", tableID),
		))
	}
	return tblInfo, nil
}

// onTruncateTable deletes the old table meta and creates a new table identical to the old one except for the table ID.
// As all the old data is encoded with the old table ID, it cannot be accessed anymore.
// A background job will be created to delete the old data.
func (w *worker) onTruncateTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	schemaID := job.SchemaID
	args, err := model.GetTruncateTableArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	jobCtx.jobArgs = args
	metaMut := jobCtx.metaMut
	tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, schemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}
	if tblInfo.IsView() || tblInfo.IsSequence() {
		job.State = model.JobStateCancelled
		return ver, infoschema.ErrTableNotExists.GenWithStackByArgs(job.SchemaName, tblInfo.Name.O)
	}
	// Copy the old tableInfo for later usage.
	oldTblInfo := tblInfo.Clone()
	err = checkTruncateTableHasForeignKeyReferredInOwner(jobCtx.infoCache, job, tblInfo, args.FKCheck)
	if err != nil {
		return ver, err
	}
	err = metaMut.DropTableOrView(schemaID, tblInfo.ID)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	err = metaMut.GetAutoIDAccessors(schemaID, tblInfo.ID).Del()
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	failpoint.Inject("truncateTableErr", func(val failpoint.Value) {
		if val.(bool) {
			job.State = model.JobStateCancelled
			failpoint.Return(ver, errors.New("occur an error after dropping table"))
		}
	})

	// Clear the TiFlash replica progress from ETCD.
	if tblInfo.TiFlashReplica != nil {
		e := infosync.DeleteTiFlashTableSyncProgress(tblInfo)
		if e != nil {
			logutil.DDLLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
		}
	}

	var (
		oldPartitionIDs []int64
		newPartitionIDs = args.NewPartitionIDs
	)
	if tblInfo.GetPartitionInfo() != nil {
		oldPartitionIDs = getPartitionIDs(tblInfo)
		// We use new partition IDs because all the old data is encoded with the old partition IDs and cannot be accessed anymore.
		newPartitionIDs, err = truncateTableByReassignPartitionIDs(metaMut, tblInfo, newPartitionIDs)
		if err != nil {
			return ver, errors.Trace(err)
		}
	}

	if pi := tblInfo.GetPartitionInfo(); pi != nil {
		oldIDs := make([]int64, 0, len(oldPartitionIDs))
		newIDs := make([]int64, 0, len(oldPartitionIDs))
		newDefs := pi.Definitions
		for i := range oldPartitionIDs {
			newDef := &newDefs[i]
			newID := newDef.ID
			if newDef.PlacementPolicyRef != nil {
				oldIDs = append(oldIDs, oldPartitionIDs[i])
				newIDs = append(newIDs, newID)
			}
		}
		args.OldPartIDsWithPolicy = oldIDs
		args.NewPartIDsWithPolicy = newIDs
	}

	tableRuleID, partRuleIDs, _, oldRules, err := getOldLabelRules(tblInfo, job.SchemaName, tblInfo.Name.L)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Wrapf(err, "failed to get old label rules from PD")
	}

	err = updateLabelRules(job, tblInfo, oldRules, tableRuleID, partRuleIDs, []string{}, args.NewTableID)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Wrapf(err, "failed to update the label rule to PD")
	}

	// Clear the TiFlash replica available status.
	if tblInfo.TiFlashReplica != nil {
		// Set PD rules for TiFlash.
		if pi := tblInfo.GetPartitionInfo(); pi != nil {
			if e := infosync.ConfigureTiFlashPDForPartitions(true, &pi.Definitions, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels, tblInfo.ID); e != nil {
				logutil.DDLLogger().Error("ConfigureTiFlashPDForPartitions fails", zap.Error(e))
				job.State = model.JobStateCancelled
				return ver, errors.Trace(e)
			}
		} else {
			if e := infosync.ConfigureTiFlashPDForTable(args.NewTableID, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels); e != nil {
				logutil.DDLLogger().Error("ConfigureTiFlashPDForTable fails", zap.Error(e))
				job.State = model.JobStateCancelled
				return ver, errors.Trace(e)
			}
		}
		tblInfo.TiFlashReplica.AvailablePartitionIDs = nil
		tblInfo.TiFlashReplica.Available = false
	}

	tblInfo.ID = args.NewTableID

	// build table & partition bundles if any.
	bundles, err := placement.NewFullTableBundles(metaMut, tblInfo)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
	}

	err = metaMut.CreateTableOrView(schemaID, tblInfo)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	failpoint.Inject("mockTruncateTableUpdateVersionError", func(val failpoint.Value) {
		if val.(bool) {
			failpoint.Return(ver, errors.New("mock update version error"))
		}
	})

	var scatterScope string
	if val, ok := job.GetSystemVars(vardef.TiDBScatterRegion); ok {
		scatterScope = val
	}
	preSplitAndScatterTable(w.sess.Context, jobCtx.store, tblInfo, scatterScope)

	// Create new affinity groups first (critical operation - must succeed).
	if tblInfo.Affinity != nil {
		if err = createTableAffinityGroupsInPD(jobCtx, tblInfo); err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
	}

	// Delete old affinity groups (best-effort cleanup - ignore errors).
	// TRUNCATE TABLE: always try to delete the old table's affinity groups.
	if oldTblInfo.Affinity != nil {
		if err := deleteTableAffinityGroupsInPD(jobCtx, oldTblInfo, nil); err != nil {
			logutil.DDLLogger().Error("failed to delete old affinity groups from PD", zap.Error(err), zap.Int64("tableID", oldTblInfo.ID))
		}
	}

	ver, err = updateSchemaVersion(jobCtx, job)
	if err != nil {
		return ver, errors.Trace(err)
	}
	truncateTableEvent := notifier.NewTruncateTableEvent(tblInfo, oldTblInfo)
	err = asyncNotifyEvent(jobCtx, truncateTableEvent, job, noSubJob, w.sess)
	if err != nil {
		return ver, errors.Trace(err)
	}

	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	// See truncateTableByReassignPartitionIDs for why they might change.
	args.OldPartitionIDs = oldPartitionIDs
	args.NewPartitionIDs = newPartitionIDs
	job.FillFinishedArgs(args)
	return ver, nil
}

func onRebaseAutoIncrementIDType(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	return onRebaseAutoID(jobCtx, job, autoid.AutoIncrementType)
}

func onRebaseAutoRandomType(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	return onRebaseAutoID(jobCtx, job, autoid.AutoRandomType)
}

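// onRebaseAutoID rebases the auto-increment or auto-random ID base of a table. Unless the FORCE
// option is used, the new base is adjusted so that it never goes below the next global auto ID.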
func onRebaseAutoID(jobCtx *jobContext, job *model.Job, tp autoid.AllocatorType) (ver int64, _ error) {
	args, err := model.GetRebaseAutoIDArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	schemaID := job.SchemaID
	newBase, force := args.NewBase, args.Force

	if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
		job.MarkNonRevertible()
		return ver, nil
	}

	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, schemaID)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	tbl, err := getTable(jobCtx.getAutoIDRequirement(), schemaID, tblInfo)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	if !force {
		newBaseTemp, err := adjustNewBaseToNextGlobalID(nil, tbl, tp, newBase)
		if err != nil {
			return ver, errors.Trace(err)
		}
		if newBase != newBaseTemp {
			job.Warning = toTError(fmt.Errorf("Can't reset AUTO_INCREMENT to %d without FORCE option, using %d instead",
				newBase, newBaseTemp,
			))
		}
		newBase = newBaseTemp
	}

	if tp == autoid.AutoIncrementType {
		tblInfo.AutoIncID = newBase
	} else {
		tblInfo.AutoRandID = newBase
	}

	if alloc := tbl.Allocators(nil).Get(tp); alloc != nil {
		// The next value to allocate is `newBase`.
		newEnd := newBase - 1
		if force {
			err = alloc.ForceRebase(newEnd)
		} else {
			err = alloc.Rebase(context.Background(), newEnd, false)
		}
		if err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
	}
	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

func onModifyTableAutoIDCache(jobCtx *jobContext, job *model.Job) (int64, error) {
	args, err := model.GetModifyTableAutoIDCacheArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}

	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return 0, errors.Trace(err)
	}

	tblInfo.AutoIDCache = args.NewCache
	ver, err := updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

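// onShardRowID handles changing a table's shard row ID bits. Increasing the shard bits is only
// allowed when the next global auto ID would not overflow under the new bit layout.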
func (w *worker) onShardRowID(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetShardRowIDArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	shardRowIDBits := args.ShardRowIDBits
	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	if shardRowIDBits < tblInfo.ShardRowIDBits {
		tblInfo.ShardRowIDBits = shardRowIDBits
	} else {
		tbl, err := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
		if err != nil {
			return ver, errors.Trace(err)
		}
		err = verifyNoOverflowShardBits(w.sessPool, tbl, shardRowIDBits)
		if err != nil {
			job.State = model.JobStateCancelled
			return ver, err
		}
		tblInfo.ShardRowIDBits = shardRowIDBits
		// MaxShardRowIDBits is used to check the overflow of auto IDs.
		tblInfo.MaxShardRowIDBits = shardRowIDBits
	}
	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

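// verifyNoOverflowShardBits returns an error if applying shardRowIDBits to the table's next
// global row ID would overflow the row ID bit length.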
func verifyNoOverflowShardBits(s *sess.Pool, tbl table.Table, shardRowIDBits uint64) error {
	if shardRowIDBits == 0 {
		return nil
	}
	ctx, err := s.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer s.Put(ctx)
	// Check next global max auto ID first.
	autoIncID, err := tbl.Allocators(ctx.GetTableCtx()).Get(autoid.RowIDAllocType).NextGlobalAutoID()
	if err != nil {
		return errors.Trace(err)
	}
	if tables.OverflowShardBits(autoIncID, shardRowIDBits, autoid.RowIDBitLength, true) {
		return autoid.ErrAutoincReadFailed.GenWithStack("shard_row_id_bits %d will cause next global auto ID %v overflow", shardRowIDBits, autoIncID)
	}
	return nil
}

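// onRenameTable handles renaming a single table. The job is finished in a separate second step
// (see finishJobRenameTable) so that TiCDC can rely on the job state.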
func onRenameTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetRenameTableArgs(job)
	if err != nil {
		// Invalid arguments, cancel this job.
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	jobCtx.jobArgs = args

	oldSchemaID, oldSchemaName, tableName := args.OldSchemaID, args.OldSchemaName, args.NewTableName
	if job.SchemaState == model.StatePublic {
		return finishJobRenameTable(jobCtx, job)
	}
	newSchemaID := job.SchemaID
	err = checkTableNotExists(jobCtx.infoCache, newSchemaID, tableName.L)
	if err != nil {
		if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) {
			job.State = model.JobStateCancelled
		}
		return ver, errors.Trace(err)
	}

	metaMut := jobCtx.metaMut
	tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, oldSchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}
	oldTableName := tblInfo.Name
	ver, err = checkAndRenameTables(metaMut, job, tblInfo, args)
	if err != nil {
		return ver, errors.Trace(err)
	}
	fkh := newForeignKeyHelper()
	err = adjustForeignKeyChildTableInfoAfterRenameTable(jobCtx.infoCache, metaMut,
		job, &fkh, tblInfo, oldSchemaName, oldTableName, tableName, newSchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}
	ver, err = updateSchemaVersion(jobCtx, job, fkh.getLoadedTables()...)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.SchemaState = model.StatePublic
	return ver, nil
}

func onRenameTables(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetRenameTablesArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	jobCtx.jobArgs = args

	if job.SchemaState == model.StatePublic {
		return finishJobRenameTables(jobCtx, job)
	}

	fkh := newForeignKeyHelper()
	metaMut := jobCtx.metaMut
	for _, info := range args.RenameTableInfos {
		job.TableID = info.TableID
		job.TableName = info.OldTableName.L
		tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, info.OldSchemaID)
		if err != nil {
			return ver, errors.Trace(err)
		}
		ver, err := checkAndRenameTables(metaMut, job, tblInfo, info)
		if err != nil {
			return ver, errors.Trace(err)
		}
		err = adjustForeignKeyChildTableInfoAfterRenameTable(
			jobCtx.infoCache, metaMut, job, &fkh, tblInfo,
			info.OldSchemaName, info.OldTableName, info.NewTableName, info.NewSchemaID)
		if err != nil {
			return ver, errors.Trace(err)
		}
	}

	ver, err = updateSchemaVersion(jobCtx, job, fkh.getLoadedTables()...)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.SchemaState = model.StatePublic
	return ver, nil
}

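// checkAndRenameTables moves the table meta to the new schema under the new name, keeping the
// auto-ID allocation stable by pinning AutoIDSchemaID to the original schema when the table
// crosses schemas, and refreshes the PD label rules.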
func checkAndRenameTables(t *meta.Mutator, job *model.Job, tblInfo *model.TableInfo, args *model.RenameTableArgs) (ver int64, _ error) {
	err := t.DropTableOrView(args.OldSchemaID, tblInfo.ID)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	failpoint.Inject("renameTableErr", func(val failpoint.Value) {
		if valStr, ok := val.(string); ok {
			if args.NewTableName.L == valStr {
				job.State = model.JobStateCancelled
				failpoint.Return(ver, errors.New("occur an error after renaming table"))
			}
		}
	})

	oldTableName := tblInfo.Name
	tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err := getOldLabelRules(tblInfo, args.OldSchemaName.L, oldTableName.L)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Wrapf(err, "failed to get old label rules from PD")
	}

	if tblInfo.AutoIDSchemaID == 0 && args.NewSchemaID != args.OldSchemaID {
		// The auto ID is referenced by a schema ID + table ID.
		// The table ID does not change between renames, but the schema ID can change.
		// To allow concurrent use of the auto ID during rename, keep the auto ID
		// by always referencing it with the schema ID it was originally created in.
		tblInfo.AutoIDSchemaID = args.OldSchemaID
	}
	if args.NewSchemaID == tblInfo.AutoIDSchemaID {
		// Back to the original schema ID, no longer needed.
		tblInfo.AutoIDSchemaID = 0
	}

	tblInfo.Name = args.NewTableName
	err = t.CreateTableOrView(args.NewSchemaID, tblInfo)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	err = updateLabelRules(job, tblInfo, oldRules, tableRuleID, partRuleIDs, oldRuleIDs, tblInfo.ID)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Wrapf(err, "failed to update the label rule to PD")
	}

	return ver, nil
}

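// adjustForeignKeyChildTableInfoAfterRenameTable updates the RefSchema/RefTable of foreign keys
// in child tables that reference the renamed table, and persists the touched table infos.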
func adjustForeignKeyChildTableInfoAfterRenameTable(
	infoCache *infoschema.InfoCache, t *meta.Mutator, job *model.Job,
	fkh *foreignKeyHelper, tblInfo *model.TableInfo,
	oldSchemaName, oldTableName, newTableName ast.CIStr, newSchemaID int64) error {
	if !vardef.EnableForeignKey.Load() || newTableName.L == oldTableName.L {
		return nil
	}
	is := infoCache.GetLatest()
	newDB, ok := is.SchemaByID(newSchemaID)
	if !ok {
		job.State = model.JobStateCancelled
		return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(fmt.Sprintf("schema-ID: %v", newSchemaID))
	}
	referredFKs := is.GetTableReferredForeignKeys(oldSchemaName.L, oldTableName.L)
	if len(referredFKs) == 0 {
		return nil
	}
	fkh.addLoadedTable(oldSchemaName.L, oldTableName.L, newDB.ID, tblInfo)
	for _, referredFK := range referredFKs {
		childTableInfo, err := fkh.getTableFromStorage(is, t, referredFK.ChildSchema, referredFK.ChildTable)
		if err != nil {
			if infoschema.ErrTableNotExists.Equal(err) || infoschema.ErrDatabaseNotExists.Equal(err) {
				continue
			}
			return err
		}
		childFKInfo := model.FindFKInfoByName(childTableInfo.tblInfo.ForeignKeys, referredFK.ChildFKName.L)
		if childFKInfo == nil {
			continue
		}
		childFKInfo.RefSchema = newDB.Name
		childFKInfo.RefTable = newTableName
	}
	for _, info := range fkh.loaded {
		err := updateTable(t, info.schemaID, info.tblInfo, false)
		if err != nil {
			return err
		}
	}
	return nil
}

// We split the rename table job into two steps:
//  1. rename the table and update the schema version.
//  2. update the job state to JobStateDone.
//
// This is a requirement from TiCDC because
//   - it uses the job state to check whether the DDL is finished.
//   - there is a gap between schema reloading and job state updating:
//     when the job state is updated to JobStateDone, but before the new schema is reloaded,
//     there may be DMLs that use the old schema.
//   - TiCDC cannot handle the DMLs that use the old schema, because
//     the commit TS of those DMLs is greater than the job state updating TS.
func finishJobRenameTable(jobCtx *jobContext, job *model.Job) (int64, error) {
	tblInfo, err := getTableInfo(jobCtx.metaMut, job.TableID, job.SchemaID)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}
	// Before updating the schema version, we need to reset the old schema ID to the new schema ID, so that
	// the table info can be dropped normally in `ApplyDiff`. This is because renaming a table requires two
	// schema versions to complete.
	args := jobCtx.jobArgs.(*model.RenameTableArgs)
	args.OldSchemaIDForSchemaDiff = job.SchemaID

	ver, err := updateSchemaVersion(jobCtx, job)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

func finishJobRenameTables(jobCtx *jobContext, job *model.Job) (int64, error) {
	args := jobCtx.jobArgs.(*model.RenameTablesArgs)
	infos := args.RenameTableInfos
	tblSchemaIDs := make(map[int64]int64, len(infos))
	for _, info := range infos {
		tblSchemaIDs[info.TableID] = info.NewSchemaID
	}
	tblInfos := make([]*model.TableInfo, 0, len(infos))
	for _, info := range infos {
		tblID := info.TableID
		tblInfo, err := getTableInfo(jobCtx.metaMut, tblID, tblSchemaIDs[tblID])
		if err != nil {
			job.State = model.JobStateCancelled
			return 0, errors.Trace(err)
		}
		tblInfos = append(tblInfos, tblInfo)
	}
	// Before updating the schema version, we need to reset the old schema ID to the new schema ID, so that
	// the table info can be dropped normally in `ApplyDiff`. This is because renaming a table requires two
	// schema versions to complete.
	for _, info := range infos {
		info.OldSchemaIDForSchemaDiff = info.NewSchemaID
	}
	ver, err := updateSchemaVersion(jobCtx, job)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos)
	return ver, nil
}

func onModifyTableComment(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetModifyTableCommentArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
		job.MarkNonRevertible()
		return ver, nil
	}

	tblInfo.Comment = args.Comment
	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

func onModifyTableCharsetAndCollate(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetModifyTableCharsetAndCollateArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	toCharset, toCollate, needsOverwriteCols := args.ToCharset, args.ToCollate, args.NeedsOverwriteCols
	metaMut := jobCtx.metaMut
	dbInfo, err := checkSchemaExistAndCancelNotExistJob(metaMut, job)
	if err != nil {
		return ver, errors.Trace(err)
	}

	tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	// double check.
	_, err = checkAlterTableCharset(tblInfo, dbInfo, toCharset, toCollate, needsOverwriteCols)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
		job.MarkNonRevertible()
		return ver, nil
	}

	tblInfo.Charset = toCharset
	tblInfo.Collate = toCollate

	if needsOverwriteCols {
		// update column charset.
		for _, col := range tblInfo.Columns {
			if field_types.HasCharset(&col.FieldType) {
				col.SetCharset(toCharset)
				col.SetCollate(toCollate)
			} else {
				col.SetCharset(charset.CharsetBin)
				col.SetCollate(charset.CharsetBin)
			}
		}
	}

	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

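// onSetTableFlashReplica handles setting the TiFlash replica count of a table: it validates the
// count, pushes the corresponding PD placement rules, and updates or clears the table's
// TiFlashReplica info.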
func (w *worker) onSetTableFlashReplica(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetSetTiFlashReplicaArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	replicaInfo := args.TiflashReplica

	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	// Ban setting the replica count for tables in system databases.
	if metadef.IsMemOrSysDB(job.SchemaName) {
		return ver, errors.Trace(dbterror.ErrUnsupportedTiFlashOperationForSysOrMemTable)
	}

	// Check the validity of the replica count, e.g. that it does not exceed the TiFlash store count.
	err = w.checkTiFlashReplicaCount(replicaInfo.Count)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	// We should check this first, in order to avoid creating redundant DDL jobs.
	if pi := tblInfo.GetPartitionInfo(); pi != nil {
		logutil.DDLLogger().Info("Set TiFlash replica pd rule for partitioned table", zap.Int64("tableID", tblInfo.ID))
		if e := infosync.ConfigureTiFlashPDForPartitions(false, &pi.Definitions, replicaInfo.Count, &replicaInfo.Labels, tblInfo.ID); e != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(e)
		}
		// Partitions that are in the adding mid-state have high priority, so set the PD rules accordingly.
		if e := infosync.ConfigureTiFlashPDForPartitions(true, &pi.AddingDefinitions, replicaInfo.Count, &replicaInfo.Labels, tblInfo.ID); e != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(e)
		}
	} else {
		logutil.DDLLogger().Info("Set TiFlash replica pd rule", zap.Int64("tableID", tblInfo.ID))
		if e := infosync.ConfigureTiFlashPDForTable(tblInfo.ID, replicaInfo.Count, &replicaInfo.Labels); e != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(e)
		}
	}

	if replicaInfo.Count > 0 {
		available := false
		if args.ResetAvailable {
			// Reset the available field to false. This is required when fixing the placement rules after native BR,
			// because the `available` field may be true after a native BR restore, but the user should wait until
			// the TiFlash peers are rebuilt.
			available = false
		} else if tblInfo.TiFlashReplica != nil {
			// If there is already TiFlash replica info, we should keep the Available field.
			// For example, during the process of increasing the number of TiFlash replicas from 2 to 3,
			// or decreasing it from 3 to 2, if the `available` field of the original TiFlash replica
			// is already true, its value remains unchanged. In this case, the optimizer can still choose
			// to route queries to TiFlash for execution, avoiding any impact on service continuity.
			available = tblInfo.TiFlashReplica.Available
		}
		tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
			Count:          replicaInfo.Count,
			LocationLabels: replicaInfo.Labels,
			Available:      available,
		}
	} else {
		if tblInfo.TiFlashReplica != nil {
			err = infosync.DeleteTiFlashTableSyncProgress(tblInfo)
			if err != nil {
				logutil.DDLLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(err))
			}
		}
		tblInfo.TiFlashReplica = nil
	}

	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

func (w *worker) checkTiFlashReplicaCount(replicaCount uint64) error {
	ctx, err := w.sessPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer w.sessPool.Put(ctx)

	return checkTiFlashReplicaCount(ctx, replicaCount)
}

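// onUpdateTiFlashReplicaStatus records that a TiFlash replica of the table or of one of its
// partitions became available or unavailable, and recomputes the table-level Available flag.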
func onUpdateTiFlashReplicaStatus(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	args, err := model.GetUpdateTiFlashReplicaStatusArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	available, physicalID := args.Available, args.PhysicalID

	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}
	if tblInfo.TiFlashReplica == nil || (tblInfo.ID == physicalID && tblInfo.TiFlashReplica.Available == available) ||
		(tblInfo.ID != physicalID && available == tblInfo.TiFlashReplica.IsPartitionAvailable(physicalID)) {
		job.State = model.JobStateCancelled
		return ver, errors.Errorf("the replica available status of table %s is already updated", tblInfo.Name.String())
	}

	if tblInfo.ID == physicalID {
		tblInfo.TiFlashReplica.Available = available
	} else if pi := tblInfo.GetPartitionInfo(); pi != nil {
		// The partition replica becomes available.
		if available {
			allAvailable := true
			for _, p := range pi.Definitions {
				if p.ID == physicalID {
					tblInfo.TiFlashReplica.AvailablePartitionIDs = append(tblInfo.TiFlashReplica.AvailablePartitionIDs, physicalID)
				}
				allAvailable = allAvailable && tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID)
			}
			tblInfo.TiFlashReplica.Available = allAvailable
		} else {
			// The partition replica becomes unavailable.
			for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
				if id == physicalID {
					newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
					newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
					tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
					tblInfo.TiFlashReplica.Available = false
					logutil.DDLLogger().Info("TiFlash replica become unavailable", zap.Int64("tableID", tblInfo.ID), zap.Int64("partitionID", id))
					break
				}
			}
		}
	} else {
		job.State = model.JobStateCancelled
		return ver, errors.Errorf("unknown physical ID %v in table %v", physicalID, tblInfo.Name.O)
	}

	if tblInfo.TiFlashReplica.Available {
		logutil.DDLLogger().Info("TiFlash replica available", zap.Int64("tableID", tblInfo.ID))
	}
	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

// Checking using the cached info schema should be enough, as:
//   - we will reload the schema until success when becoming the owner
//   - existing tables are correctly checked in the first place
//   - we calculate job dependencies before running jobs, so there will not be two
//     jobs creating the same table running concurrently.
//
// Suppose there are 2 owners A and B, and 2 consecutive jobs J1 and J2 which are creating the
// same table T. Those 2 jobs might run concurrently when A sees J1 first and B sees J2 first.
// But if B sees J2 first, J1 must already be done, synced, and deleted from the tidb_ddl_job
// table, as we query jobs in the order of job ID. While syncing J1, B should have synced the
// schema with the latest schema version, so when B runs J2, the check below will see that table
// T already exists, and J2 will fail.
func checkTableNotExists(infoCache *infoschema.InfoCache, schemaID int64, tableName string) error {
	is := infoCache.GetLatest()
	return checkTableNotExistsFromInfoSchema(is, schemaID, tableName)
}

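// checkConstraintNamesNotExists reports a duplicate-name error if any of the given check
// constraints already exists on another table in the schema.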
func checkConstraintNamesNotExists(t *meta.Mutator, schemaID int64, constraints []*model.ConstraintInfo) error {
	if len(constraints) == 0 {
		return nil
	}
	tbInfos, err := t.ListTables(context.Background(), schemaID)
	if err != nil {
		return err
	}

	for _, tb := range tbInfos {
		for _, constraint := range constraints {
			if constraint.State != model.StateWriteOnly {
				if constraintInfo := tb.FindConstraintInfoByName(constraint.Name.L); constraintInfo != nil {
					return infoschema.ErrCheckConstraintDupName.GenWithStackByArgs(constraint.Name.L)
				}
			}
		}
	}

	return nil
}

func checkTableIDNotExists(t *meta.Mutator, schemaID, tableID int64) error {
	tbl, err := t.GetTable(schemaID, tableID)
	if err != nil {
		if meta.ErrDBNotExists.Equal(err) {
			return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
		}
		return errors.Trace(err)
	}
	if tbl != nil {
		return infoschema.ErrTableExists.GenWithStackByArgs(tbl.Name)
	}
	return nil
}

func checkTableNotExistsFromInfoSchema(is infoschema.InfoSchema, schemaID int64, tableName string) error {
	// Check this table's database.
	schema, ok := is.SchemaByID(schemaID)
	if !ok {
		return infoschema.ErrDatabaseNotExists.GenWithStackByArgs("")
	}
	if is.TableExists(schema.Name, ast.NewCIStr(tableName)) {
		return infoschema.ErrTableExists.GenWithStackByArgs(tableName)
	}
	return nil
}

// updateVersionAndTableInfoWithCheck checks that the table info is valid and then updates the schema version and the table information.
func updateVersionAndTableInfoWithCheck(jobCtx *jobContext, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) (
	ver int64, err error) {
	err = checkTableInfoValid(tblInfo)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}
	for _, info := range multiInfos {
		err = checkTableInfoValid(info.tblInfo)
		if err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
	}
	return updateVersionAndTableInfo(jobCtx, job, tblInfo, shouldUpdateVer, multiInfos...)
}

// updateVersionAndTableInfo updates the schema version and the table information.
func updateVersionAndTableInfo(jobCtx *jobContext, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) (
	ver int64, err error) {
	failpoint.Inject("mockUpdateVersionAndTableInfoErr", func(val failpoint.Value) {
		switch val.(int) {
		case 1:
			failpoint.Return(ver, errors.New("mock update version and tableInfo error"))
		case 2:
			// We set the job to cancelled directly here, because we want to get the original error with the job ID appended.
			// The job ID will be used to get the job from the history queue and we will assert its args.
			job.State = model.JobStateCancelled
			failpoint.Return(ver, errors.New("mock update version and tableInfo error, jobID="+strconv.Itoa(int(job.ID))))
		default:
		}
	})
	if shouldUpdateVer && (job.MultiSchemaInfo == nil || !job.MultiSchemaInfo.SkipVersion) {
		ver, err = updateSchemaVersion(jobCtx, job, multiInfos...)
		if err != nil {
			return 0, errors.Trace(err)
		}
	}

	needUpdateTs := tblInfo.State == model.StatePublic &&
		job.Type != model.ActionTruncateTable &&
		job.Type != model.ActionTruncateTablePartition &&
		job.Type != model.ActionRenameTable &&
		job.Type != model.ActionRenameTables &&
		job.Type != model.ActionExchangeTablePartition

	err = updateTable(jobCtx.metaMut, job.SchemaID, tblInfo, needUpdateTs)
	if err != nil {
		return 0, errors.Trace(err)
	}
	for _, info := range multiInfos {
		err = updateTable(jobCtx.metaMut, info.schemaID, info.tblInfo, needUpdateTs)
		if err != nil {
			return 0, errors.Trace(err)
		}
	}
	return ver, nil
}

func updateTable(t *meta.Mutator, schemaID int64, tblInfo *model.TableInfo, needUpdateTs bool) error {
	if needUpdateTs {
		tblInfo.UpdateTS = t.StartTS
	}
	return t.UpdateTable(schemaID, tblInfo)
}

type schemaIDAndTableInfo struct {
	schemaID int64
	tblInfo  *model.TableInfo
}

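// onRepairTable overwrites the stored table info with the definition supplied by the repair job
// and makes the table public again.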
func onRepairTable(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
	schemaID := job.SchemaID
	metaMut := jobCtx.metaMut
	args, err := model.GetRepairTableArgs(job)
	if err != nil {
		// Invalid arguments, cancel this job.
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	tblInfo := args.TableInfo
	tblInfo.State = model.StateNone

	// Check that the old DB and the old table exist.
	_, err = GetTableInfoAndCancelFaultJob(metaMut, job, schemaID)
	if err != nil {
		return ver, errors.Trace(err)
	}

	// When in repair mode, the repaired table in a server is not accessible to users, and the table
	// is removed from the repair list after repairing. Other servers still alive may need to
	// restart to get the latest schema version.
	ver, err = updateSchemaVersion(jobCtx, job)
	if err != nil {
		return ver, errors.Trace(err)
	}
	switch tblInfo.State {
	case model.StateNone:
		// none -> public
		tblInfo.State = model.StatePublic
		tblInfo.UpdateTS = metaMut.StartTS
		err = repairTableOrViewWithCheck(metaMut, job, schemaID, tblInfo)
		if err != nil {
			return ver, errors.Trace(err)
		}
		// Finish this job.
		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
		return ver, nil
	default:
		return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("table", tblInfo.State)
	}
}

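// onAlterTableAttributes updates the table's label rule in PD: an empty label list removes the
// rule, otherwise the rule is (re)put.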
func onAlterTableAttributes(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	args, err := model.GetAlterTableAttributesArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}

	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return 0, err
	}

	if len(args.LabelRule.Labels) == 0 {
		patch := label.NewRulePatch([]*label.Rule{}, []string{args.LabelRule.ID})
		err = infosync.UpdateLabelRules(jobCtx.stepCtx, patch)
	} else {
		labelRule := label.Rule(*args.LabelRule)
		err = infosync.PutLabelRule(jobCtx.stepCtx, &labelRule)
	}
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Wrapf(err, "failed to notify PD the label rules")
	}
	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)

	return ver, nil
}

func onAlterTablePartitionAttributes(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	args, err := model.GetAlterTablePartitionArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}
	partitionID, rule := args.PartitionID, args.LabelRule
	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return 0, err
	}

	ptInfo := tblInfo.GetPartitionInfo()
	if ptInfo.GetNameByID(partitionID) == "" {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(table.ErrUnknownPartition.GenWithStackByArgs("drop?", tblInfo.Name.O))
	}

	if len(rule.Labels) == 0 {
		patch := label.NewRulePatch([]*label.Rule{}, []string{rule.ID})
		err = infosync.UpdateLabelRules(context.TODO(), patch)
	} else {
		labelRule := label.Rule(*rule)
		err = infosync.PutLabelRule(context.TODO(), &labelRule)
	}
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Wrapf(err, "failed to notify PD the label rules")
	}
	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)

	return ver, nil
}

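// onAlterTablePartitionPlacement updates the placement policy reference of a single partition and
// pushes the resulting placement bundle to PD.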
func onAlterTablePartitionPlacement(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	args, err := model.GetAlterTablePartitionArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}
	partitionID, policyRefInfo := args.PartitionID, args.PolicyRefInfo
	metaMut := jobCtx.metaMut
	tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
	if err != nil {
		return 0, err
	}

	ptInfo := tblInfo.GetPartitionInfo()
	var partitionDef *model.PartitionDefinition
	definitions := ptInfo.Definitions
	oldPartitionEnablesPlacement := false
	for i := range definitions {
		if partitionID == definitions[i].ID {
			def := &definitions[i]
			oldPartitionEnablesPlacement = def.PlacementPolicyRef != nil
			def.PlacementPolicyRef = policyRefInfo
			partitionDef = &definitions[i]
			break
		}
	}

	if partitionDef == nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(table.ErrUnknownPartition.GenWithStackByArgs("drop?", tblInfo.Name.O))
	}

	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}

	if _, err = checkPlacementPolicyRefValidAndCanNonValidJob(metaMut, job, partitionDef.PlacementPolicyRef); err != nil {
		return ver, errors.Trace(err)
	}

	bundle, err := placement.NewPartitionBundle(metaMut, *partitionDef)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	if bundle == nil && oldPartitionEnablesPlacement {
		bundle = placement.NewBundle(partitionDef.ID)
	}

	// Send the placement bundle to PD.
	if bundle != nil {
		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
	}

	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
	}

	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}

func onAlterTablePlacement(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	args, err := model.GetAlterTablePlacementArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}
	policyRefInfo := args.PlacementPolicyRef
	metaMut := jobCtx.metaMut
	tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
	if err != nil {
		return 0, err
	}

	if _, err = checkPlacementPolicyRefValidAndCanNonValidJob(metaMut, job, policyRefInfo); err != nil {
		return 0, errors.Trace(err)
	}

	oldTableEnablesPlacement := tblInfo.PlacementPolicyRef != nil
	tblInfo.PlacementPolicyRef = policyRefInfo
	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}

	bundle, err := placement.NewTableBundle(metaMut, tblInfo)
	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	if bundle == nil && oldTableEnablesPlacement {
		bundle = placement.NewBundle(tblInfo.ID)
	}

	// Send the placement bundle to PD.
	if bundle != nil {
		err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
	}

	if err != nil {
		job.State = model.JobStateCancelled
		return ver, errors.Trace(err)
	}

	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)

	return ver, nil
}

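// getOldLabelRules builds the label rule IDs for the given schema/table (including
// per-partition rule IDs when the table is partitioned) and fetches the current
// rules from PD.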
func getOldLabelRules(tblInfo *model.TableInfo, oldSchemaName, oldTableName string) (tableRuleID string, partRuleIDs, oldRuleIDs []string, oldRules map[string]*label.Rule, err error) {
	tableRuleID = fmt.Sprintf(label.TableIDFormat, label.IDPrefix, oldSchemaName, oldTableName)
	oldRuleIDs = []string{tableRuleID}
	if tblInfo.GetPartitionInfo() != nil {
		for _, def := range tblInfo.GetPartitionInfo().Definitions {
			partRuleIDs = append(partRuleIDs, fmt.Sprintf(label.PartitionIDFormat, label.IDPrefix, oldSchemaName, oldTableName, def.Name.L))
		}
	}

	oldRuleIDs = append(oldRuleIDs, partRuleIDs...)
	oldRules, err = infosync.GetLabelRules(context.TODO(), oldRuleIDs)
	return tableRuleID, partRuleIDs, oldRuleIDs, oldRules, err
}

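// updateLabelRules resets the previously fetched label rules to the new schema/table
// identity and IDs, then sends PD a patch that installs the new rules and deletes the
// old ones. It is a no-op when no old rules were found.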
func updateLabelRules(job *model.Job, tblInfo *model.TableInfo, oldRules map[string]*label.Rule, tableRuleID string, partRuleIDs, oldRuleIDs []string, tID int64) error {
	if oldRules == nil {
		return nil
	}
	var newRules []*label.Rule
	if tblInfo.GetPartitionInfo() != nil {
		for idx, def := range tblInfo.GetPartitionInfo().Definitions {
			if r, ok := oldRules[partRuleIDs[idx]]; ok {
				newRules = append(newRules, r.Clone().Reset(job.SchemaName, tblInfo.Name.L, def.Name.L, def.ID))
			}
		}
	}
	ids := []int64{tID}
	if r, ok := oldRules[tableRuleID]; ok {
		if tblInfo.GetPartitionInfo() != nil {
			for _, def := range tblInfo.GetPartitionInfo().Definitions {
				ids = append(ids, def.ID)
			}
		}
		newRules = append(newRules, r.Clone().Reset(job.SchemaName, tblInfo.Name.L, "", ids...))
	}

	patch := label.NewRulePatch(newRules, oldRuleIDs)
	return infosync.UpdateLabelRules(context.TODO(), patch)
}

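// onAlterCacheTable handles ALTER TABLE ... CACHE. It moves the table cache status
// from disable to switching and then to enable, updating the schema version at each step.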
func onAlterCacheTable(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	tbInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return 0, errors.Trace(err)
	}
	// If the table is already cached, there is nothing to do; finish the job directly.
	if tbInfo.TableCacheStatusType == model.TableCacheStatusEnable {
		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
		return ver, nil
	}

	if tbInfo.TempTableType != model.TempTableNone {
		return ver, errors.Trace(dbterror.ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache"))
	}

	if tbInfo.Partition != nil {
		return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("partition mode"))
	}

	switch tbInfo.TableCacheStatusType {
	case model.TableCacheStatusDisable:
		// disable -> switching
		tbInfo.TableCacheStatusType = model.TableCacheStatusSwitching
		ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tbInfo, true)
		if err != nil {
			return ver, err
		}
	case model.TableCacheStatusSwitching:
		// switching -> enable
		tbInfo.TableCacheStatusType = model.TableCacheStatusEnable
		ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tbInfo, true)
		if err != nil {
			return ver, err
		}
		// Finish this job.
		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
	default:
		job.State = model.JobStateCancelled
		err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("alter table cache", tbInfo.TableCacheStatusType.String())
	}
	return ver, err
}

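// onAlterNoCacheTable handles ALTER TABLE ... NOCACHE. It moves the table cache status
// from enable to switching and then to disable, updating the schema version at each step.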
func onAlterNoCacheTable(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	tbInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return 0, errors.Trace(err)
	}
	// If the table is already not cached, there is nothing to do; finish the job directly.
	if tbInfo.TableCacheStatusType == model.TableCacheStatusDisable {
		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
		return ver, nil
	}

	switch tbInfo.TableCacheStatusType {
	case model.TableCacheStatusEnable:
		// enable -> switching
		tbInfo.TableCacheStatusType = model.TableCacheStatusSwitching
		ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tbInfo, true)
		if err != nil {
			return ver, err
		}
	case model.TableCacheStatusSwitching:
		// switching -> disable
		tbInfo.TableCacheStatusType = model.TableCacheStatusDisable
		ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tbInfo, true)
		if err != nil {
			return ver, err
		}
		// Finish this job.
		job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo)
	default:
		job.State = model.JobStateCancelled
		err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("alter table no cache", tbInfo.TableCacheStatusType.String())
	}
	return ver, err
}

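// onRefreshMeta validates the job arguments and bumps the schema version without
// modifying the table info itself.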
func onRefreshMeta(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	_, err = model.GetRefreshMetaArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}
	// update schema version
	ver, err = updateSchemaVersion(jobCtx, job)
	if err != nil {
		return ver, errors.Trace(err)
	}
	job.State = model.JobStateDone
	job.SchemaState = model.StatePublic
	return ver, nil
}

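// onAlterTableAffinity validates the new affinity setting, creates the corresponding
// affinity groups in PD, best-effort deletes the stale groups of the old setting, and
// persists the updated table info.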
func onAlterTableAffinity(jobCtx *jobContext, job *model.Job) (ver int64, err error) {
	args, err := model.GetAlterTableAffinityArgs(job)
	if err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}

	tblInfo, err := GetTableInfoAndCancelFaultJob(jobCtx.metaMut, job, job.SchemaID)
	if err != nil {
		return 0, errors.Trace(err)
	}
	oldTblInfo := tblInfo.Clone()

	if err = validateTableAffinity(tblInfo, args.Affinity); err != nil {
		job.State = model.JobStateCancelled
		return 0, errors.Trace(err)
	}
	tblInfo.Affinity = args.Affinity

	// Create the new affinity groups first (critical operation, must succeed).
	if tblInfo.Affinity != nil {
		if err = createTableAffinityGroupsInPD(jobCtx, tblInfo); err != nil {
			job.State = model.JobStateCancelled
			return ver, errors.Trace(err)
		}
	}

	// Delete the old affinity groups (best-effort cleanup; errors are only logged).
	// Deletion only happens when the old table info carried an affinity configuration,
	// so that ALTER TABLE ... AFFINITY = 'none' cleans up stale affinity groups.
	// It is skipped when the affinity level stays the same, to keep the job idempotent.
	if oldTblInfo.Affinity != nil {
		// Only delete if the affinity is removed or its level changed (same level means same group IDs).
		if tblInfo.Affinity == nil || oldTblInfo.Affinity.Level != tblInfo.Affinity.Level {
			if err := deleteTableAffinityGroupsInPD(jobCtx, oldTblInfo, nil); err != nil {
				logutil.DDLLogger().Error("failed to delete old affinity groups from PD", zap.Error(err), zap.Int64("tableID", oldTblInfo.ID))
			}
		}
	}

	ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
	if err != nil {
		return ver, errors.Trace(err)
	}

	job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
	return ver, nil
}