ddl, parser: fix concurrent rename table (#33354)
close pingcap/tidb#32610
This commit is contained in:
@ -48,6 +48,7 @@ import (
|
||||
"github.com/pingcap/tidb/util/admin"
|
||||
"github.com/pingcap/tidb/util/gcutil"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -1991,3 +1992,122 @@ func (s *stateChangeSuite) TestRestrainDropColumnWithIndex() {
|
||||
tk.MustExec("alter table t drop column a;")
|
||||
tk.MustExec("drop table if exists t;")
|
||||
}
|
||||
|
||||
func TestParallelRenameTable(t *testing.T) {
|
||||
store, d, clean := testkit.CreateMockStoreAndDomain(t)
|
||||
defer clean()
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("create database test2")
|
||||
tk.MustExec("drop table if exists t")
|
||||
tk.MustExec("create table t(a int default 0, b int default 0, key idx((b+1)))")
|
||||
tk1 := testkit.NewTestKit(t, store)
|
||||
tk1.MustExec("use test")
|
||||
tk3 := testkit.NewTestKit(t, store)
|
||||
tk3.MustExec("use test")
|
||||
|
||||
var concurrentDDLQueryPre string
|
||||
var concurrentDDLQuery string
|
||||
firstDDL := true
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var checkErr error
|
||||
d2 := d.DDL()
|
||||
originalCallback := d2.GetHook()
|
||||
defer d2.SetHook(originalCallback)
|
||||
callback := &ddl.TestDDLCallback{Do: d}
|
||||
callback.OnJobRunBeforeExported = func(job *model.Job) {
|
||||
switch job.SchemaState {
|
||||
case model.StateNone:
|
||||
if firstDDL {
|
||||
firstDDL = false
|
||||
} else {
|
||||
return
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if concurrentDDLQueryPre != "" {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
// We assume that no error, we don't want to test it.
|
||||
tk3.MustExec(concurrentDDLQueryPre)
|
||||
wg.Done()
|
||||
}()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
_, err := tk1.Exec(concurrentDDLQuery)
|
||||
if err != nil {
|
||||
checkErr = err
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
d2.SetHook(callback)
|
||||
|
||||
// rename then add column
|
||||
concurrentDDLQuery = "alter table t add column g int"
|
||||
tk.MustExec("rename table t to t1")
|
||||
wg.Wait()
|
||||
require.Error(t, checkErr)
|
||||
require.True(t, strings.Contains(checkErr.Error(), "Table 'test.t' doesn't exist"), checkErr.Error())
|
||||
tk.MustExec("rename table t1 to t")
|
||||
checkErr = nil
|
||||
|
||||
// rename then add column, but rename to other database
|
||||
concurrentDDLQuery = "alter table t add column g int"
|
||||
firstDDL = true
|
||||
tk.MustExec("rename table t to test2.t1")
|
||||
wg.Wait()
|
||||
require.Error(t, checkErr)
|
||||
// [schema:1146]Table '(Schema ID 1).(Table ID 65)' doesn't exist
|
||||
require.True(t, strings.Contains(checkErr.Error(), "doesn't exist"), checkErr.Error())
|
||||
tk.MustExec("rename table test2.t1 to test.t")
|
||||
checkErr = nil
|
||||
|
||||
// rename then add column, but rename to other database and create same name table
|
||||
concurrentDDLQuery = "alter table t add column g int"
|
||||
firstDDL = true
|
||||
tk.MustExec("rename table t to test2.t1")
|
||||
concurrentDDLQueryPre = "create table t(a int)"
|
||||
wg.Wait()
|
||||
require.Error(t, checkErr)
|
||||
// [schema:1146]Table '(Schema ID 1).(Table ID 65)' doesn't exist
|
||||
require.True(t, strings.Contains(checkErr.Error(), "doesn't exist"), checkErr.Error())
|
||||
tk.MustExec("rename table test2.t1 to test.t")
|
||||
concurrentDDLQueryPre = ""
|
||||
checkErr = nil
|
||||
|
||||
// rename then rename
|
||||
concurrentDDLQuery = "rename table t to t2"
|
||||
firstDDL = true
|
||||
tk.MustExec("rename table t to t1")
|
||||
wg.Wait()
|
||||
require.Error(t, checkErr)
|
||||
require.True(t, strings.Contains(checkErr.Error(), "Table 'test.t' doesn't exist"), checkErr.Error())
|
||||
tk.MustExec("rename table t1 to t")
|
||||
checkErr = nil
|
||||
|
||||
// rename then rename, but rename to other database
|
||||
concurrentDDLQuery = "rename table t to t2"
|
||||
firstDDL = true
|
||||
tk.MustExec("rename table t to test2.t1")
|
||||
wg.Wait()
|
||||
require.Error(t, checkErr)
|
||||
require.True(t, strings.Contains(checkErr.Error(), "doesn't exist"), checkErr.Error())
|
||||
tk.MustExec("rename table test2.t1 to test.t")
|
||||
checkErr = nil
|
||||
|
||||
// renames then add index on one table
|
||||
tk.MustExec("create table t2(a int)")
|
||||
tk.MustExec("create table t3(a int)")
|
||||
concurrentDDLQuery = "alter table t add index(a)"
|
||||
firstDDL = true
|
||||
tk.MustExec("rename table t to tt, t2 to tt2, t3 to tt3")
|
||||
wg.Wait()
|
||||
require.Error(t, checkErr)
|
||||
require.True(t, strings.Contains(checkErr.Error(), "Table 'test.t' doesn't exist"), checkErr.Error())
|
||||
tk.MustExec("rename table tt to t")
|
||||
}
|
||||
|
||||
@ -412,6 +412,7 @@ func (d *ddl) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, place
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionAlterTablePlacement,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{placementPolicyRef},
|
||||
@ -2306,6 +2307,7 @@ func (d *ddl) createTableWithInfoJob(
|
||||
SchemaID: schema.ID,
|
||||
TableID: tbInfo.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tbInfo.Name.L,
|
||||
Type: actionType,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: args,
|
||||
@ -2559,6 +2561,8 @@ func (d *ddl) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo) (er
|
||||
SchemaID: schemaID,
|
||||
TableID: tbInfo.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tbInfo.Name.L,
|
||||
|
||||
Type: model.ActionRecoverTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{tbInfo, recoverInfo.AutoIDs.RowID, recoverInfo.DropJobID,
|
||||
@ -3284,6 +3288,7 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: actionType,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{newBase, force},
|
||||
@ -3335,6 +3340,7 @@ func (d *ddl) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal uint
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{uVal},
|
||||
}
|
||||
@ -3503,6 +3509,7 @@ func (d *ddl) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTab
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionAddColumn,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{col, spec.Position, 0},
|
||||
@ -3579,6 +3586,7 @@ func (d *ddl) AddColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alte
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionAddColumns,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{columns, positions, offsets, ifNotExists},
|
||||
@ -3640,6 +3648,7 @@ func (d *ddl) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, spec *
|
||||
SchemaID: schema.ID,
|
||||
TableID: meta.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionAddTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{partInfo},
|
||||
@ -3735,6 +3744,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
|
||||
SchemaID: schema.ID,
|
||||
TableID: meta.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionTruncateTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{pids},
|
||||
@ -3780,6 +3790,7 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
|
||||
SchemaID: schema.ID,
|
||||
TableID: meta.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: meta.Name.L,
|
||||
Type: model.ActionDropTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{partNames},
|
||||
@ -3975,6 +3986,7 @@ func (d *ddl) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
|
||||
SchemaID: ntSchema.ID,
|
||||
TableID: ntMeta.ID,
|
||||
SchemaName: ntSchema.Name.L,
|
||||
TableName: ntMeta.Name.L,
|
||||
Type: model.ActionExchangeTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{defID, ptSchema.ID, ptMeta.ID, partName, spec.WithValidation},
|
||||
@ -4016,6 +4028,7 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionDropColumn,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
MultiSchemaInfo: multiSchemaInfo,
|
||||
@ -4093,6 +4106,7 @@ func (d *ddl) DropColumns(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alt
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionDropColumns,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
MultiSchemaInfo: multiSchemaInfo,
|
||||
@ -4514,6 +4528,7 @@ func (d *ddl) getModifiableColumnJob(ctx context.Context, sctx sessionctx.Contex
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionModifyColumn,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
ReorgMeta: &model.DDLReorgMeta{
|
||||
@ -4757,6 +4772,7 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
|
||||
SchemaID: schema.ID,
|
||||
TableID: tbl.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tbl.Meta().Name.L,
|
||||
Type: model.ActionModifyColumn,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
ReorgMeta: &model.DDLReorgMeta{
|
||||
@ -4848,6 +4864,7 @@ func (d *ddl) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionSetDefaultValue,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{col},
|
||||
@ -4875,6 +4892,7 @@ func (d *ddl) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, spec *a
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionModifyTableComment,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{spec.Comment},
|
||||
@ -4896,6 +4914,7 @@ func (d *ddl) AlterTableAutoIDCache(ctx sessionctx.Context, ident ast.Ident, new
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionModifyTableAutoIdCache,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{newCache},
|
||||
@ -4948,6 +4967,7 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionModifyTableCharsetAndCollate,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{toCharset, toCollate, needsOverwriteCols},
|
||||
@ -5005,6 +5025,7 @@ func (d *ddl) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast.Iden
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionSetTiFlashReplica,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{*replicaInfo},
|
||||
@ -5114,6 +5135,7 @@ func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int64, a
|
||||
SchemaID: db.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: db.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionUpdateTiFlashReplicaStatus,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{available, physicalID},
|
||||
@ -5219,6 +5241,7 @@ func (d *ddl) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *ast.Alt
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionRenameIndex,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{spec.FromKey, spec.ToKey},
|
||||
@ -5250,6 +5273,7 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionDropTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
}
|
||||
@ -5283,6 +5307,7 @@ func (d *ddl) DropView(ctx sessionctx.Context, ti ast.Ident) (err error) {
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionDropView,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
}
|
||||
@ -5313,6 +5338,7 @@ func (d *ddl) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error {
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionTruncateTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{newTableID},
|
||||
@ -5367,6 +5393,7 @@ func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident,
|
||||
SchemaID: schemas[1].ID,
|
||||
TableID: tableID,
|
||||
SchemaName: schemas[1].Name.L,
|
||||
TableName: oldIdent.Name.L,
|
||||
Type: model.ActionRenameTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{schemas[0].ID, newIdent.Name, schemas[0].Name},
|
||||
@ -5379,6 +5406,7 @@ func (d *ddl) RenameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Ident,
|
||||
|
||||
func (d *ddl) RenameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Ident, isAlterTable bool) error {
|
||||
is := d.GetInfoSchemaWithInterceptor(ctx)
|
||||
oldTableNames := make([]*model.CIStr, 0, len(oldIdents))
|
||||
tableNames := make([]*model.CIStr, 0, len(oldIdents))
|
||||
oldSchemaIDs := make([]int64, 0, len(oldIdents))
|
||||
newSchemaIDs := make([]int64, 0, len(oldIdents))
|
||||
@ -5403,6 +5431,7 @@ func (d *ddl) RenameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
|
||||
}
|
||||
|
||||
tableIDs = append(tableIDs, tableID)
|
||||
oldTableNames = append(oldTableNames, &oldIdents[i].Name)
|
||||
tableNames = append(tableNames, &newIdents[i].Name)
|
||||
oldSchemaIDs = append(oldSchemaIDs, schemas[0].ID)
|
||||
newSchemaIDs = append(newSchemaIDs, schemas[1].ID)
|
||||
@ -5415,7 +5444,7 @@ func (d *ddl) RenameTables(ctx sessionctx.Context, oldIdents, newIdents []ast.Id
|
||||
SchemaName: schemas[1].Name.L,
|
||||
Type: model.ActionRenameTables,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames},
|
||||
Args: []interface{}{oldSchemaIDs, newSchemaIDs, tableNames, tableIDs, oldSchemaNames, oldTableNames},
|
||||
}
|
||||
|
||||
err = d.DoDDLJob(ctx, job)
|
||||
@ -5595,6 +5624,7 @@ func (d *ddl) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexName m
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionAddPrimaryKey,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
ReorgMeta: &model.DDLReorgMeta{
|
||||
@ -5785,6 +5815,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast.Inde
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionAddIndex,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
ReorgMeta: &model.DDLReorgMeta{
|
||||
@ -5912,6 +5943,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionAddForeignKey,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{fkInfo},
|
||||
@ -5938,6 +5970,7 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionDropForeignKey,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{fkName},
|
||||
@ -5993,6 +6026,7 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: jobTp,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{indexName},
|
||||
@ -6046,6 +6080,7 @@ func (d *ddl) DropIndexes(ctx sessionctx.Context, ti ast.Ident, specs []*ast.Alt
|
||||
SchemaID: schema.ID,
|
||||
TableID: t.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
Type: model.ActionDropIndexes,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{indexNames, ifExists},
|
||||
@ -6453,6 +6488,7 @@ func (d *ddl) RepairTable(ctx sessionctx.Context, table *ast.TableName, createSt
|
||||
SchemaID: oldDBInfo.ID,
|
||||
TableID: newTableInfo.ID,
|
||||
SchemaName: oldDBInfo.Name.L,
|
||||
TableName: newTableInfo.Name.L,
|
||||
Type: model.ActionRepairTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{newTableInfo},
|
||||
@ -6531,6 +6567,7 @@ func (d *ddl) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt)
|
||||
SchemaID: db.ID,
|
||||
TableID: tbl.Meta().ID,
|
||||
SchemaName: db.Name.L,
|
||||
TableName: tbl.Meta().Name.L,
|
||||
Type: model.ActionAlterSequence,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{ident, stmt.SeqOptions},
|
||||
@ -6560,6 +6597,7 @@ func (d *ddl) DropSequence(ctx sessionctx.Context, ti ast.Ident, ifExists bool)
|
||||
SchemaID: schema.ID,
|
||||
TableID: tbl.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tbl.Meta().Name.L,
|
||||
Type: model.ActionDropSequence,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
}
|
||||
@ -6592,6 +6630,7 @@ func (d *ddl) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, inde
|
||||
SchemaID: schema.ID,
|
||||
TableID: tb.Meta().ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tb.Meta().Name.L,
|
||||
Type: model.ActionAlterIndexVisibility,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{indexName, invisible},
|
||||
@ -6621,6 +6660,7 @@ func (d *ddl) AlterTableAttributes(ctx sessionctx.Context, ident ast.Ident, spec
|
||||
SchemaID: schema.ID,
|
||||
TableID: meta.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: meta.Name.L,
|
||||
Type: model.ActionAlterTableAttributes,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{rule},
|
||||
@ -6662,6 +6702,7 @@ func (d *ddl) AlterTablePartitionAttributes(ctx sessionctx.Context, ident ast.Id
|
||||
SchemaID: schema.ID,
|
||||
TableID: meta.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: meta.Name.L,
|
||||
Type: model.ActionAlterTablePartitionAttributes,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{partitionID, rule},
|
||||
@ -6730,6 +6771,7 @@ func (d *ddl) AlterTablePartitionPlacement(ctx sessionctx.Context, tableIdent as
|
||||
SchemaID: schema.ID,
|
||||
TableID: tblInfo.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: tblInfo.Name.L,
|
||||
Type: model.ActionAlterTablePartitionPlacement,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{partitionID, policyRefInfo},
|
||||
@ -6996,6 +7038,7 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error)
|
||||
job := &model.Job{
|
||||
SchemaID: schema.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
TableID: t.Meta().ID,
|
||||
Type: model.ActionAlterCacheTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
@ -7052,6 +7095,7 @@ func (d *ddl) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err error
|
||||
job := &model.Job{
|
||||
SchemaID: schema.ID,
|
||||
SchemaName: schema.Name.L,
|
||||
TableName: t.Meta().Name.L,
|
||||
TableID: t.Meta().ID,
|
||||
Type: model.ActionAlterNoCacheTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
|
||||
@ -576,6 +576,11 @@ func GetTableInfoAndCancelFaultJob(t *meta.Meta, job *model.Job, schemaID int64)
|
||||
func checkTableExistAndCancelNonExistJob(t *meta.Meta, job *model.Job, schemaID int64) (*model.TableInfo, error) {
|
||||
tblInfo, err := getTableInfo(t, job.TableID, schemaID)
|
||||
if err == nil {
|
||||
// Check if table name is renamed.
|
||||
if job.TableName != "" && tblInfo.Name.L != job.TableName && job.Type != model.ActionRepairTable {
|
||||
job.State = model.JobStateCancelled
|
||||
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(job.SchemaName, job.TableName)
|
||||
}
|
||||
return tblInfo, nil
|
||||
}
|
||||
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
|
||||
@ -910,7 +915,8 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
|
||||
tableNames := []*model.CIStr{}
|
||||
tableIDs := []int64{}
|
||||
oldSchemaNames := []*model.CIStr{}
|
||||
if err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &tableNames, &tableIDs, &oldSchemaNames); err != nil {
|
||||
oldTableNames := []*model.CIStr{}
|
||||
if err := job.DecodeArgs(&oldSchemaIDs, &newSchemaIDs, &tableNames, &tableIDs, &oldSchemaNames, &oldTableNames); err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
@ -919,6 +925,7 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
|
||||
var err error
|
||||
for i, oldSchemaID := range oldSchemaIDs {
|
||||
job.TableID = tableIDs[i]
|
||||
job.TableName = oldTableNames[i].L
|
||||
ver, tblInfo, err := checkAndRenameTables(t, job, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i])
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
|
||||
@ -56,15 +56,11 @@ func testRenameTable(
|
||||
return job
|
||||
}
|
||||
|
||||
func testRenameTables(
|
||||
t *testing.T, ctx sessionctx.Context, d *ddl,
|
||||
oldSchemaIDs, newSchemaIDs []int64, newTableNames []*model.CIStr,
|
||||
oldTableIDs []int64, oldSchemaNames []*model.CIStr,
|
||||
) *model.Job {
|
||||
func testRenameTables(t *testing.T, ctx sessionctx.Context, d *ddl, oldSchemaIDs, newSchemaIDs []int64, newTableNames []*model.CIStr, oldTableIDs []int64, oldSchemaNames, oldTableNames []*model.CIStr) *model.Job {
|
||||
job := &model.Job{
|
||||
Type: model.ActionRenameTables,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames},
|
||||
Args: []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames, oldTableNames},
|
||||
}
|
||||
ctx.SetValue(sessionctx.QueryString, "skip")
|
||||
require.NoError(t, d.DoDDLJob(ctx, job))
|
||||
@ -336,14 +332,7 @@ func TestRenameTables(t *testing.T) {
|
||||
newTblInfos = append(newTblInfos, tblInfo)
|
||||
}
|
||||
|
||||
job := testRenameTables(
|
||||
t, ctx, d,
|
||||
[]int64{dbInfo.ID, dbInfo.ID},
|
||||
[]int64{dbInfo.ID, dbInfo.ID},
|
||||
[]*model.CIStr{&newTblInfos[0].Name, &newTblInfos[1].Name},
|
||||
[]int64{tblInfos[0].ID, tblInfos[1].ID},
|
||||
[]*model.CIStr{&dbInfo.Name, &dbInfo.Name},
|
||||
)
|
||||
job := testRenameTables(t, ctx, d, []int64{dbInfo.ID, dbInfo.ID}, []int64{dbInfo.ID, dbInfo.ID}, []*model.CIStr{&newTblInfos[0].Name, &newTblInfos[1].Name}, []int64{tblInfos[0].ID, tblInfos[1].ID}, []*model.CIStr{&dbInfo.Name, &dbInfo.Name}, []*model.CIStr{&tblInfos[0].Name, &tblInfos[1].Name})
|
||||
|
||||
txn, _ := ctx.Txn(true)
|
||||
historyJob, _ := meta.NewMeta(txn).GetHistoryDDLJob(job.ID)
|
||||
|
||||
@ -265,6 +265,7 @@ type Job struct {
|
||||
SchemaID int64 `json:"schema_id"`
|
||||
TableID int64 `json:"table_id"`
|
||||
SchemaName string `json:"schema_name"`
|
||||
TableName string `json:"table_name"`
|
||||
State JobState `json:"state"`
|
||||
Error *terror.Error `json:"err"`
|
||||
// ErrorCount will be increased, every time we meet an error when running job.
|
||||
|
||||
Reference in New Issue
Block a user