Files
tidb/pkg/ddl/schema_version.go

465 lines
17 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"context"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/metadef"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
tikverr "github.com/tikv/client-go/v2/error"
"go.uber.org/zap"
)
// SetSchemaDiffForCreateTables set SchemaDiff for ActionCreateTables.
func SetSchemaDiffForCreateTables(diff *model.SchemaDiff, job *model.Job) error {
	args, err := model.GetBatchCreateTableArgs(job)
	if err != nil {
		return errors.Trace(err)
	}
	// Every created table stays in the job's schema; old and new IDs match
	// because nothing pre-existing is replaced.
	affects := make([]*model.AffectedOption, 0, len(args.Tables))
	for _, tbl := range args.Tables {
		affects = append(affects, &model.AffectedOption{
			SchemaID:    job.SchemaID,
			OldSchemaID: job.SchemaID,
			TableID:     tbl.TableInfo.ID,
			OldTableID:  tbl.TableInfo.ID,
		})
	}
	diff.AffectedOpts = affects
	return nil
}
// SetSchemaDiffForTruncateTable set SchemaDiff for ActionTruncateTable.
func SetSchemaDiffForTruncateTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) error {
	// Truncate keeps the logical table but swaps in a fresh physical table,
	// so the diff carries both the new and the old table ID.
	args := jobCtx.jobArgs.(*model.TruncateTableArgs)
	diff.TableID, diff.OldTableID = args.NewTableID, job.TableID
	// AffectedOpts drive the placement-rule cache update for partitions that
	// carry placement policies.
	if len(args.OldPartIDsWithPolicy) != 0 {
		diff.AffectedOpts = buildPlacementAffects(args.OldPartIDsWithPolicy, args.NewPartIDsWithPolicy)
	}
	return nil
}
// SetSchemaDiffForCreateView set SchemaDiff for ActionCreateView.
func SetSchemaDiffForCreateView(diff *model.SchemaDiff, job *model.Job) error {
	args, err := model.GetCreateTableArgs(job)
	if err != nil {
		return errors.Trace(err)
	}
	// A "CREATE OR REPLACE VIEW" that actually replaces an existing view
	// involves two table IDs: record the dropped view's ID so that schema
	// loaders evict the old view before applying the new one.
	if args.OnExistReplace && args.OldViewTblID > 0 {
		diff.OldTableID = args.OldViewTblID
	}
	diff.TableID = args.TableInfo.ID
	return nil
}
// SetSchemaDiffForRenameTable set SchemaDiff for ActionRenameTable.
func SetSchemaDiffForRenameTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) error {
	args := jobCtx.jobArgs.(*model.RenameTableArgs)
	// A positive OldSchemaIDForSchemaDiff overrides the recorded old schema ID.
	diff.OldSchemaID = args.OldSchemaID
	if args.OldSchemaIDForSchemaDiff > 0 {
		diff.OldSchemaID = args.OldSchemaIDForSchemaDiff
	}
	diff.TableID = job.TableID
	return nil
}
// SetSchemaDiffForRenameTables set SchemaDiff for ActionRenameTables.
func SetSchemaDiffForRenameTables(diff *model.SchemaDiff, _ *model.Job, jobCtx *jobContext) error {
	args := jobCtx.jobArgs.(*model.RenameTablesArgs)
	infos := args.RenameTableInfos
	affects := make([]*model.AffectedOption, 0, len(infos)-1)
	for i, info := range infos {
		// A positive OldSchemaIDForSchemaDiff overrides the recorded old schema ID.
		oldSchemaID := info.OldSchemaID
		if info.OldSchemaIDForSchemaDiff > 0 {
			oldSchemaID = info.OldSchemaIDForSchemaDiff
		}
		if i == 0 {
			// The first table is recorded on the diff itself, not in
			// AffectedOpts. Related issue tidb#47064.
			diff.TableID = info.TableID
			diff.SchemaID = info.NewSchemaID
			diff.OldSchemaID = oldSchemaID
			continue
		}
		affects = append(affects, &model.AffectedOption{
			SchemaID:    info.NewSchemaID,
			TableID:     info.TableID,
			OldTableID:  info.TableID,
			OldSchemaID: oldSchemaID,
		})
	}
	diff.AffectedOpts = affects
	return nil
}
// SetSchemaDiffForExchangeTablePartition set SchemaDiff for ActionExchangeTablePartition.
//
// Exchange swaps one partition of a partitioned table (PT) with a
// non-partitioned table (NT). The job itself points at the NT; the PT is
// reported to schema loaders via diff.AffectedOpts.
func SetSchemaDiffForExchangeTablePartition(diff *model.SchemaDiff, job *model.Job, multiInfos ...schemaIDAndTableInfo) error {
	// From start of function: diff.SchemaID = job.SchemaID
	// Old is original non partitioned table
	diff.OldTableID = job.TableID
	diff.OldSchemaID = job.SchemaID
	// Update the partitioned table (it is only done in the last state)
	// See ddl.ExchangeTablePartition
	args, err := model.GetExchangeTablePartitionArgs(job)
	if err != nil {
		return errors.Trace(err)
	}
	// This is needed for not crashing TiFlash!
	// TODO: Update TiFlash, to handle StateWriteOnly
	diff.AffectedOpts = []*model.AffectedOption{{
		TableID: args.PTTableID,
	}}
	// when job state transit from rolling-back to rollback-done, the schema state
	// is also public, diff.TableID should be the old non-partitioned table ID too.
	if job.State == model.JobStateRollbackDone || job.SchemaState != model.StatePublic {
		// No change, just to refresh the non-partitioned table
		// with its new ExchangePartitionInfo.
		diff.TableID = job.TableID
		// Keep this as Schema ID of non-partitioned table
		// to avoid trigger early rename in TiFlash
		diff.AffectedOpts[0].SchemaID = job.SchemaID
		// Need reload partition table, use diff.AffectedOpts[0].OldSchemaID to mark it.
		if len(multiInfos) > 0 {
			diff.AffectedOpts[0].OldSchemaID = args.PTSchemaID
		}
	} else {
		// Swap: the final (public) state records the exchanged partition's ID
		// as the new table ID for the NT side.
		diff.TableID = args.PartitionID
		// Also add correct SchemaID in case different schemas
		diff.AffectedOpts[0].SchemaID = args.PTSchemaID
	}
	return nil
}
// SetSchemaDiffForTruncateTablePartition set SchemaDiff for ActionTruncateTablePartition.
func SetSchemaDiffForTruncateTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
	diff.TableID = job.TableID
	// Refresh the placement-rule cache for the swapped partition IDs when the
	// args ask for it.
	if args := jobCtx.jobArgs.(*model.TruncateTableArgs); args.ShouldUpdateAffectedPartitions {
		diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.NewPartitionIDs)
	}
}
// SetSchemaDiffForDropTable set SchemaDiff for ActionDropTable.
func SetSchemaDiffForDropTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
	diff.TableID = job.TableID
	args := jobCtx.jobArgs.(*model.DropTableArgs)
	// The dropped partition IDs are listed so the placement-rule cache can be
	// updated; old and new ID lists are the same since nothing replaces them.
	if len(args.OldPartitionIDs) != 0 {
		diff.AffectedOpts = buildPlacementAffects(args.OldPartitionIDs, args.OldPartitionIDs)
	}
}
// SetSchemaDiffForDropTablePartition set SchemaDiff for ActionDropTablePartition.
func SetSchemaDiffForDropTablePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
	diff.TableID = job.TableID
	args := jobCtx.jobArgs.(*model.TablePartitionArgs)
	// The dropped physical table IDs are listed so the placement-rule cache
	// can be updated; old and new ID lists are identical for a pure drop.
	if len(args.OldPhysicalTblIDs) != 0 {
		diff.AffectedOpts = buildPlacementAffects(args.OldPhysicalTblIDs, args.OldPhysicalTblIDs)
	}
}
// SetSchemaDiffForRecoverTable set SchemaDiff for ActionRecoverTable.
func SetSchemaDiffForRecoverTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
	diff.TableID = job.TableID
	args := jobCtx.jobArgs.(*model.RecoverArgs)
	// The affected physical IDs are listed so the placement-rule cache can be
	// updated; recovery restores the same IDs, so old and new lists match.
	if len(args.AffectedPhysicalIDs) != 0 {
		diff.AffectedOpts = buildPlacementAffects(args.AffectedPhysicalIDs, args.AffectedPhysicalIDs)
	}
}
// alignedPartitionIDs pads droppedIDs and addedIDs with zeros so both slices
// have the same length, as required by buildPlacementAffects. The inputs are
// not modified.
func alignedPartitionIDs(droppedIDs, addedIDs []int64) (oldIDs, newIDs []int64) {
	maxParts := max(len(droppedIDs), len(addedIDs))
	// make zero-initializes, so the tail beyond the copied prefix stays 0.
	oldIDs = make([]int64, maxParts)
	copy(oldIDs, droppedIDs)
	newIDs = make([]int64, maxParts)
	copy(newIDs, addedIDs)
	return oldIDs, newIDs
}

// SetSchemaDiffForReorganizePartition set SchemaDiff for ActionReorganizePartition.
func SetSchemaDiffForReorganizePartition(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
	diff.TableID = job.TableID
	// TODO: should this be for every state of Reorganize?
	args := jobCtx.jobArgs.(*model.TablePartitionArgs)
	if len(args.NewPartitionIDs) > 0 {
		// buildPlacementAffects needs old and new lists of the same length.
		oldIDs, newIDs := alignedPartitionIDs(args.OldPhysicalTblIDs, args.NewPartitionIDs)
		diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
	}
}

// SetSchemaDiffForPartitionModify set SchemaDiff for ActionRemovePartitioning, ActionAlterTablePartitioning.
func SetSchemaDiffForPartitionModify(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) {
	diff.TableID = job.TableID
	diff.OldTableID = job.TableID
	if job.SchemaState == model.StateNone {
		args := jobCtx.jobArgs.(*model.TablePartitionArgs)
		// Final part, new table id is assigned.
		diff.TableID = args.PartInfo.NewTableID
		if len(args.NewPartitionIDs) > 0 {
			// buildPlacementAffects needs old and new lists of the same length.
			oldIDs, newIDs := alignedPartitionIDs(args.OldPhysicalTblIDs, args.NewPartitionIDs)
			diff.AffectedOpts = buildPlacementAffects(oldIDs, newIDs)
		}
	}
}
// SetSchemaDiffForCreateTable set SchemaDiff for ActionCreateTable.
func SetSchemaDiffForCreateTable(diff *model.SchemaDiff, job *model.Job, jobCtx *jobContext) error {
	diff.TableID = job.TableID
	tblInfo := jobCtx.jobArgs.(*model.CreateTableArgs).TableInfo
	// Creating a table with foreign keys goes through two schema state changes:
	//   1. none -> write-only
	//   2. write-only -> public
	// During the second one the infoschema loader must drop the old table
	// before applying the new one, so mark the old table ID here.
	if tblInfo.State == model.StatePublic && len(tblInfo.ForeignKeys) > 0 {
		diff.OldTableID = job.TableID
	}
	return nil
}
// SetSchemaDiffForRecoverSchema set SchemaDiff for ActionRecoverSchema.
func SetSchemaDiffForRecoverSchema(diff *model.SchemaDiff, job *model.Job) error {
	args, err := model.GetRecoverArgs(job)
	if err != nil {
		return errors.Trace(err)
	}
	// Each recovered table comes back into the job's schema under its old ID.
	infos := args.RecoverTableInfos()
	affects := make([]*model.AffectedOption, 0, len(infos))
	for _, info := range infos {
		affects = append(affects, &model.AffectedOption{
			SchemaID:    job.SchemaID,
			OldSchemaID: job.SchemaID,
			TableID:     info.TableInfo.ID,
			OldTableID:  info.TableInfo.ID,
		})
	}
	diff.AffectedOpts = affects
	diff.ReadTableFromMeta = true
	return nil
}
// SetSchemaDiffForFlashbackCluster set SchemaDiff for ActionFlashbackCluster.
func SetSchemaDiffForFlashbackCluster(diff *model.SchemaDiff, job *model.Job) {
	// Flashback affects the whole cluster, not one table.
	diff.TableID = -1
	// Once the job becomes public, the full schema map must be rebuilt.
	if job.SchemaState == model.StatePublic {
		diff.RegenerateSchemaMap = true
	}
}
// SetSchemaDiffForMultiInfos set SchemaDiff for multiInfos.
func SetSchemaDiffForMultiInfos(diff *model.SchemaDiff, multiInfos ...schemaIDAndTableInfo) {
	if len(multiInfos) == 0 {
		return
	}
	// Collect the table IDs already present on the diff so no table is
	// appended twice.
	seen := map[int64]struct{}{diff.TableID: {}}
	for _, affect := range diff.AffectedOpts {
		seen[affect.TableID] = struct{}{}
	}
	for _, info := range multiInfos {
		if _, ok := seen[info.tblInfo.ID]; ok {
			continue
		}
		seen[info.tblInfo.ID] = struct{}{}
		diff.AffectedOpts = append(diff.AffectedOpts, &model.AffectedOption{
			SchemaID:    info.schemaID,
			OldSchemaID: info.schemaID,
			TableID:     info.tblInfo.ID,
			OldTableID:  info.tblInfo.ID,
		})
	}
}
// updateSchemaVersion increments the schema version by 1 and sets SchemaDiff.
func updateSchemaVersion(jobCtx *jobContext, job *model.Job, multiInfos ...schemaIDAndTableInfo) (int64, error) {
	// Allocate the next schema version for this job.
	schemaVersion, err := jobCtx.setSchemaVersion(jobCtx, job)
	if err != nil {
		return 0, errors.Trace(err)
	}
	diff := &model.SchemaDiff{
		Version:  schemaVersion,
		Type:     job.Type,
		SchemaID: job.SchemaID,
	}
	// Fill in the action-specific fields of the diff. Actions that involve
	// more than one table ID or schema ID (truncate, rename, exchange
	// partition, ...) have dedicated handlers; every other action just
	// records the job's table ID.
	switch job.Type {
	case model.ActionCreateTables:
		err = SetSchemaDiffForCreateTables(diff, job)
	case model.ActionTruncateTable:
		err = SetSchemaDiffForTruncateTable(diff, job, jobCtx)
	case model.ActionCreateView:
		err = SetSchemaDiffForCreateView(diff, job)
	case model.ActionRenameTable:
		err = SetSchemaDiffForRenameTable(diff, job, jobCtx)
	case model.ActionRenameTables:
		err = SetSchemaDiffForRenameTables(diff, job, jobCtx)
	case model.ActionExchangeTablePartition:
		err = SetSchemaDiffForExchangeTablePartition(diff, job, multiInfos...)
	case model.ActionTruncateTablePartition:
		SetSchemaDiffForTruncateTablePartition(diff, job, jobCtx)
	case model.ActionDropTablePartition:
		SetSchemaDiffForDropTablePartition(diff, job, jobCtx)
	case model.ActionRecoverTable:
		SetSchemaDiffForRecoverTable(diff, job, jobCtx)
	case model.ActionDropTable:
		SetSchemaDiffForDropTable(diff, job, jobCtx)
	case model.ActionReorganizePartition:
		SetSchemaDiffForReorganizePartition(diff, job, jobCtx)
	case model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
		SetSchemaDiffForPartitionModify(diff, job, jobCtx)
	case model.ActionCreateTable:
		err = SetSchemaDiffForCreateTable(diff, job, jobCtx)
	case model.ActionRecoverSchema:
		err = SetSchemaDiffForRecoverSchema(diff, job)
	case model.ActionFlashbackCluster:
		SetSchemaDiffForFlashbackCluster(diff, job)
	default:
		diff.TableID = job.TableID
	}
	if err != nil {
		return 0, err
	}
	// Append any extra involved tables that the handlers above did not cover.
	SetSchemaDiffForMultiInfos(diff, multiInfos...)
	// Persist the diff under the new version so other servers can load the
	// schema change incrementally.
	err = jobCtx.metaMut.SetSchemaDiff(diff)
	return schemaVersion, errors.Trace(err)
}
// waitVersionSynced blocks until every TiDB server's schema version has synced
// to latestSchemaVersion (isolated servers excluded); the wait duration is
// recorded in the DDL worker histogram, labelled by job type and outcome.
func waitVersionSynced(
	ctx context.Context,
	jobCtx *jobContext,
	job *model.Job,
	latestSchemaVersion int64,
) (err error) {
	failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
		if val.(bool) {
			if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
				panic("check down before update global version failed")
			}
			mockDDLErrOnce = -1
		}
	})
	timeStart := time.Now()
	// The named return err is read here, so the metric reflects the final
	// outcome of this function.
	defer func() {
		metrics.DDLWorkerHistogram.WithLabelValues(metrics.DDLWaitSchemaSynced, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(timeStart).Seconds())
	}()
	checkAssumedSvr := shouldCheckAssumedServer(job)
	logger := logutil.DDLLogger().With(zap.Int64("ver", latestSchemaVersion), zap.Bool("checkAssumedSvr", checkAssumedSvr))
	// WaitVersionSynced returns only when all TiDB schemas are synced(exclude the isolated TiDB).
	sum, err := jobCtx.schemaVerSyncer.WaitVersionSynced(ctx, job.ID, latestSchemaVersion, checkAssumedSvr)
	if err != nil {
		logger.Info("wait schema version synced encounter error",
			zap.Int64("jobID", job.ID), zap.Duration("take time", time.Since(timeStart)), zap.Error(err))
		return err
	}
	failpoint.InjectCall("afterWaitVersionSynced", sum)
	logger.Info("wait schema version synced success",
		zap.Duration("take time", time.Since(timeStart)),
		zap.String("job", job.String()), zap.Stringer("summary", sum))
	return nil
}
// assumedServer is the virtual server that involved in the online schema change
// of some keyspace, it's run on a real server with different keyspace, it's only
// used in cross keyspace scenario.
func shouldCheckAssumedServer(job *model.Job) bool {
	// The classic kernel never has assumed servers. Otherwise, checking the
	// table ID alone is enough, because nextgen forbids DDLs that involve
	// multiple table IDs on system tables (RenameTables, TruncateTable,
	// ExchangePartition, etc.).
	return !kerneltype.IsClassic() && metadef.IsReservedID(job.TableID)
}
// waitVersionSyncedWithoutMDL handles the following situation:
// If the job enters a new state, and the worker crash when it's in the process of
// version sync, then the worker restarts quickly, we may run the job immediately again,
// but schema version might not sync.
// So here we get the latest schema version to make sure all servers' schema version
// update to the latest schema version in a cluster.
func waitVersionSyncedWithoutMDL(ctx context.Context, jobCtx *jobContext, job *model.Job) error {
	// Only job states that may have bumped the schema version need re-syncing.
	if !job.IsRunning() && !job.IsRollingback() && !job.IsDone() && !job.IsRollbackDone() {
		return nil
	}
	ver, err := jobCtx.store.CurrentVersion(kv.GlobalTxnScope)
	failpoint.Inject("mockGetCurrentVersionFailed", func(val failpoint.Value) {
		if val.(bool) {
			// ref: https://github.com/tikv/client-go/blob/master/tikv/kv.go#L505-L532
			ver, err = kv.NewVersion(0), tikverr.NewErrPDServerTimeout("mock PD timeout")
		}
	})
	// If we failed to get the current version, caller will retry after one second again.
	if err != nil {
		logutil.DDLLogger().Warn("get current version failed", zap.Int64("jobID", job.ID), zap.Error(err))
		return err
	}
	// Read the latest schema version that has a non-empty diff from a snapshot
	// taken at the current store version.
	snapshot := jobCtx.store.GetSnapshot(ver)
	m := meta.NewReader(snapshot)
	latestSchemaVersion, err := m.GetSchemaVersionWithNonEmptyDiff()
	if err != nil {
		logutil.DDLLogger().Warn("get global version failed", zap.Int64("jobID", job.ID), zap.Error(err))
		return err
	}
	// Try adding guard for schema version in test
	if intest.InTest {
		intest.Assert(latestSchemaVersion > 0, "latestSchemaVersion should be greater than 0")
	}
	failpoint.Inject("checkDownBeforeUpdateGlobalVersion", func(val failpoint.Value) {
		if val.(bool) {
			if mockDDLErrOnce > 0 && mockDDLErrOnce != latestSchemaVersion {
				panic("check down before update global version failed")
			}
			mockDDLErrOnce = -1
		}
	})
	return updateGlobalVersionAndWaitSynced(ctx, jobCtx, latestSchemaVersion, job)
}