ddl: change drop partition and truncate partition's job args to support multi partition id array (#18419)
This commit is contained in:
@ -2772,6 +2772,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
pids := []int64{pid}
|
||||
|
||||
job := &model.Job{
|
||||
SchemaID: schema.ID,
|
||||
@ -2779,7 +2780,7 @@ func (d *ddl) TruncateTablePartition(ctx sessionctx.Context, ident ast.Ident, sp
|
||||
SchemaName: schema.Name.L,
|
||||
Type: model.ActionTruncateTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{pid},
|
||||
Args: []interface{}{pids},
|
||||
}
|
||||
|
||||
err = d.doDDLJob(ctx, job)
|
||||
@ -2811,7 +2812,8 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
|
||||
}
|
||||
|
||||
partName := spec.PartitionNames[0].L
|
||||
err = checkDropTablePartition(meta, partName)
|
||||
partNames := []string{partName}
|
||||
err = checkDropTablePartition(meta, partNames)
|
||||
if err != nil {
|
||||
if ErrDropPartitionNonExistent.Equal(err) && spec.IfExists {
|
||||
ctx.GetSessionVars().StmtCtx.AppendNote(err)
|
||||
@ -2826,7 +2828,7 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
|
||||
SchemaName: schema.Name.L,
|
||||
Type: model.ActionDropTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{partName},
|
||||
Args: []interface{}{partNames},
|
||||
}
|
||||
|
||||
err = d.doDDLJob(ctx, job)
|
||||
|
||||
@ -832,7 +832,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
|
||||
|
||||
// test truncate table partition failed caused by canceled.
|
||||
test = &tests[24]
|
||||
truncateTblPartitionArgs := []interface{}{partitionTblInfo.Partition.Definitions[0].ID}
|
||||
truncateTblPartitionArgs := []interface{}{[]int64{partitionTblInfo.Partition.Definitions[0].ID}}
|
||||
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, partitionTblInfo.ID, test.act, truncateTblPartitionArgs, &test.cancelState)
|
||||
c.Check(checkErr, IsNil)
|
||||
changedTable = testGetTable(c, d, dbInfo.ID, partitionTblInfo.ID)
|
||||
|
||||
@ -289,13 +289,17 @@ func insertJobIntoDeleteRangeTable(ctx sessionctx.Context, job *model.Job) error
|
||||
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
|
||||
return doInsert(s, job.ID, tableID, startKey, endKey, now)
|
||||
case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
|
||||
var physicalTableID int64
|
||||
if err := job.DecodeArgs(&physicalTableID); err != nil {
|
||||
var physicalTableIDs []int64
|
||||
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
|
||||
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
|
||||
return doInsert(s, job.ID, physicalTableID, startKey, endKey, now)
|
||||
for _, physicalTableID := range physicalTableIDs {
|
||||
startKey := tablecodec.EncodeTablePrefix(physicalTableID)
|
||||
endKey := tablecodec.EncodeTablePrefix(physicalTableID + 1)
|
||||
if err := doInsert(s, job.ID, physicalTableID, startKey, endKey, now); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
|
||||
case model.ActionAddIndex, model.ActionAddPrimaryKey:
|
||||
tableID := job.TableID
|
||||
|
||||
@ -576,38 +576,48 @@ func validRangePartitionType(col *model.ColumnInfo) bool {
|
||||
}
|
||||
|
||||
// checkDropTablePartition checks if the partition exists and does not allow deleting the last existing partition in the table.
|
||||
func checkDropTablePartition(meta *model.TableInfo, partName string) error {
|
||||
func checkDropTablePartition(meta *model.TableInfo, partLowerNames []string) error {
|
||||
pi := meta.Partition
|
||||
if pi.Type != model.PartitionTypeRange && pi.Type != model.PartitionTypeList {
|
||||
return errOnlyOnRangeListPartition.GenWithStackByArgs("DROP")
|
||||
}
|
||||
oldDefs := pi.Definitions
|
||||
for _, def := range oldDefs {
|
||||
if strings.EqualFold(def.Name.L, strings.ToLower(partName)) {
|
||||
if len(oldDefs) == 1 {
|
||||
return errors.Trace(ErrDropLastPartition)
|
||||
for _, pn := range partLowerNames {
|
||||
found := false
|
||||
for _, def := range oldDefs {
|
||||
if def.Name.L == pn {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if !found {
|
||||
return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(pn))
|
||||
}
|
||||
}
|
||||
return errors.Trace(ErrDropPartitionNonExistent.GenWithStackByArgs(partName))
|
||||
if len(oldDefs) == len(partLowerNames) {
|
||||
return errors.Trace(ErrDropLastPartition)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// removePartitionInfo each ddl job deletes a partition.
|
||||
func removePartitionInfo(tblInfo *model.TableInfo, partName string) int64 {
|
||||
func removePartitionInfo(tblInfo *model.TableInfo, partLowerNames []string) []int64 {
|
||||
oldDefs := tblInfo.Partition.Definitions
|
||||
newDefs := make([]model.PartitionDefinition, 0, len(oldDefs)-1)
|
||||
var pid int64
|
||||
for i := 0; i < len(oldDefs); i++ {
|
||||
if !strings.EqualFold(oldDefs[i].Name.L, strings.ToLower(partName)) {
|
||||
continue
|
||||
var pids []int64
|
||||
for _, partName := range partLowerNames {
|
||||
for i := 0; i < len(oldDefs); i++ {
|
||||
if oldDefs[i].Name.L != partName {
|
||||
continue
|
||||
}
|
||||
pids = append(pids, oldDefs[i].ID)
|
||||
newDefs = append(oldDefs[:i], oldDefs[i+1:]...)
|
||||
break
|
||||
}
|
||||
pid = oldDefs[i].ID
|
||||
newDefs = append(oldDefs[:i], oldDefs[i+1:]...)
|
||||
break
|
||||
}
|
||||
|
||||
tblInfo.Partition.Definitions = newDefs
|
||||
return pid
|
||||
return pids
|
||||
}
|
||||
|
||||
func getPartitionDef(tblInfo *model.TableInfo, partName string) (index int, def *model.PartitionDefinition, _ error) {
|
||||
@ -622,8 +632,8 @@ func getPartitionDef(tblInfo *model.TableInfo, partName string) (index int, def
|
||||
|
||||
// onDropTablePartition deletes old partition meta.
|
||||
func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
|
||||
var partName string
|
||||
if err := job.DecodeArgs(&partName); err != nil {
|
||||
var partNames []string
|
||||
if err := job.DecodeArgs(&partNames); err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
@ -632,12 +642,12 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
|
||||
err = checkDropTablePartition(tblInfo, partName)
|
||||
err = checkDropTablePartition(tblInfo, partNames)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
physicalTableID := removePartitionInfo(tblInfo, partName)
|
||||
physicalTableIDs := removePartitionInfo(tblInfo, partNames)
|
||||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
@ -646,15 +656,15 @@ func onDropTablePartition(t *meta.Meta, job *model.Job) (ver int64, _ error) {
|
||||
// Finish this job.
|
||||
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
|
||||
// A background job will be created to delete old partition data.
|
||||
job.Args = []interface{}{physicalTableID}
|
||||
job.Args = []interface{}{physicalTableIDs}
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
// onDropTablePartition truncates old partition meta.
|
||||
func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
|
||||
var ver int64
|
||||
var oldID int64
|
||||
if err := job.DecodeArgs(&oldID); err != nil {
|
||||
var oldIDs []int64
|
||||
if err := job.DecodeArgs(&oldIDs); err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
@ -667,20 +677,23 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
|
||||
return ver, errors.Trace(ErrPartitionMgmtOnNonpartitioned)
|
||||
}
|
||||
|
||||
var newPartition *model.PartitionDefinition
|
||||
for i := 0; i < len(pi.Definitions); i++ {
|
||||
def := &pi.Definitions[i]
|
||||
if def.ID == oldID {
|
||||
pid, err1 := t.GenGlobalID()
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err1)
|
||||
newPartitions := make([]model.PartitionDefinition, 0, len(oldIDs))
|
||||
for _, oldID := range oldIDs {
|
||||
for i := 0; i < len(pi.Definitions); i++ {
|
||||
def := &pi.Definitions[i]
|
||||
if def.ID == oldID {
|
||||
pid, err1 := t.GenGlobalID()
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err1)
|
||||
}
|
||||
def.ID = pid
|
||||
// Shallow copy only use the def.ID in event handle.
|
||||
newPartitions = append(newPartitions, *def)
|
||||
break
|
||||
}
|
||||
def.ID = pid
|
||||
newPartition = def
|
||||
break
|
||||
}
|
||||
}
|
||||
if newPartition == nil {
|
||||
if len(newPartitions) == 0 {
|
||||
return ver, table.ErrUnknownPartition.GenWithStackByArgs("drop?", tblInfo.Name.O)
|
||||
}
|
||||
|
||||
@ -688,12 +701,14 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
|
||||
if tblInfo.TiFlashReplica != nil {
|
||||
tblInfo.TiFlashReplica.Available = false
|
||||
// Set partition replica become unavailable.
|
||||
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
|
||||
if id == oldID {
|
||||
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
|
||||
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
|
||||
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
|
||||
break
|
||||
for _, oldID := range oldIDs {
|
||||
for i, id := range tblInfo.TiFlashReplica.AvailablePartitionIDs {
|
||||
if id == oldID {
|
||||
newIDs := tblInfo.TiFlashReplica.AvailablePartitionIDs[:i]
|
||||
newIDs = append(newIDs, tblInfo.TiFlashReplica.AvailablePartitionIDs[i+1:]...)
|
||||
tblInfo.TiFlashReplica.AvailablePartitionIDs = newIDs
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -705,9 +720,9 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
|
||||
|
||||
// Finish this job.
|
||||
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
|
||||
asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: []model.PartitionDefinition{*newPartition}}})
|
||||
asyncNotifyEvent(d, &util.Event{Tp: model.ActionTruncateTablePartition, TableInfo: tblInfo, PartInfo: &model.PartitionInfo{Definitions: newPartitions}})
|
||||
// A background job will be created to delete old partition data.
|
||||
job.Args = []interface{}{oldID}
|
||||
job.Args = []interface{}{oldIDs}
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
|
||||
154
ddl/partition_test.go
Normal file
154
ddl/partition_test.go
Normal file
@ -0,0 +1,154 @@
|
||||
// Copyright 2020 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package ddl
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/types"
|
||||
)
|
||||
|
||||
var _ = Suite(&testPartitionSuite{})
|
||||
|
||||
type testPartitionSuite struct {
|
||||
store kv.Storage
|
||||
}
|
||||
|
||||
func (s *testPartitionSuite) SetUpSuite(c *C) {
|
||||
s.store = testCreateStore(c, "test_store")
|
||||
}
|
||||
|
||||
func (s *testPartitionSuite) TearDownSuite(c *C) {
|
||||
err := s.store.Close()
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
func (s *testPartitionSuite) TestDropAndTruncatePartition(c *C) {
|
||||
d := testNewDDLAndStart(
|
||||
context.Background(),
|
||||
c,
|
||||
WithStore(s.store),
|
||||
WithLease(testLease),
|
||||
)
|
||||
defer d.Stop()
|
||||
dbInfo := testSchemaInfo(c, d, "test_partition")
|
||||
testCreateSchema(c, testNewContext(d), d, dbInfo)
|
||||
// generate 5 partition in tableInfo.
|
||||
tblInfo, partIDs := buildTableInfoWithPartition(c, d)
|
||||
ctx := testNewContext(d)
|
||||
testCreateTable(c, ctx, d, dbInfo, tblInfo)
|
||||
|
||||
testDropPartition(c, ctx, d, dbInfo, tblInfo, []string{"p0", "p1"})
|
||||
|
||||
testTruncatePartition(c, ctx, d, dbInfo, tblInfo, []int64{partIDs[3], partIDs[4]})
|
||||
}
|
||||
|
||||
func buildTableInfoWithPartition(c *C, d *ddl) (*model.TableInfo, []int64) {
|
||||
tbl := &model.TableInfo{
|
||||
Name: model.NewCIStr("t"),
|
||||
}
|
||||
col := &model.ColumnInfo{
|
||||
Name: model.NewCIStr("c"),
|
||||
Offset: 1,
|
||||
State: model.StatePublic,
|
||||
FieldType: *types.NewFieldType(mysql.TypeLong),
|
||||
ID: allocateColumnID(tbl),
|
||||
}
|
||||
genIDs, err := d.genGlobalIDs(1)
|
||||
c.Assert(err, IsNil)
|
||||
tbl.ID = genIDs[0]
|
||||
tbl.Columns = []*model.ColumnInfo{col}
|
||||
tbl.Charset = "utf8"
|
||||
tbl.Collate = "utf8_bin"
|
||||
|
||||
partIDs, err := d.genGlobalIDs(5)
|
||||
c.Assert(err, IsNil)
|
||||
partInfo := &model.PartitionInfo{
|
||||
Type: model.PartitionTypeRange,
|
||||
Expr: tbl.Columns[0].Name.L,
|
||||
Enable: true,
|
||||
Definitions: []model.PartitionDefinition{
|
||||
{
|
||||
ID: partIDs[0],
|
||||
Name: model.NewCIStr("p0"),
|
||||
LessThan: []string{"100"},
|
||||
},
|
||||
{
|
||||
ID: partIDs[1],
|
||||
Name: model.NewCIStr("p1"),
|
||||
LessThan: []string{"200"},
|
||||
},
|
||||
{
|
||||
ID: partIDs[2],
|
||||
Name: model.NewCIStr("p2"),
|
||||
LessThan: []string{"300"},
|
||||
},
|
||||
{
|
||||
ID: partIDs[3],
|
||||
Name: model.NewCIStr("p3"),
|
||||
LessThan: []string{"400"},
|
||||
},
|
||||
{
|
||||
ID: partIDs[4],
|
||||
Name: model.NewCIStr("p4"),
|
||||
LessThan: []string{"500"},
|
||||
},
|
||||
},
|
||||
}
|
||||
tbl.Partition = partInfo
|
||||
return tbl, partIDs
|
||||
}
|
||||
|
||||
func buildDropPartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job {
|
||||
return &model.Job{
|
||||
SchemaID: dbInfo.ID,
|
||||
TableID: tblInfo.ID,
|
||||
Type: model.ActionDropTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{partNames},
|
||||
}
|
||||
}
|
||||
|
||||
func testDropPartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, partNames []string) *model.Job {
|
||||
job := buildDropPartitionJob(dbInfo, tblInfo, partNames)
|
||||
err := d.doDDLJob(ctx, job)
|
||||
c.Assert(err, IsNil)
|
||||
v := getSchemaVer(c, ctx)
|
||||
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
|
||||
return job
|
||||
}
|
||||
|
||||
func buildTruncatePartitionJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job {
|
||||
return &model.Job{
|
||||
SchemaID: dbInfo.ID,
|
||||
TableID: tblInfo.ID,
|
||||
Type: model.ActionTruncateTablePartition,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{pids},
|
||||
}
|
||||
}
|
||||
|
||||
func testTruncatePartition(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, pids []int64) *model.Job {
|
||||
job := buildTruncatePartitionJob(dbInfo, tblInfo, pids)
|
||||
err := d.doDDLJob(ctx, job)
|
||||
c.Assert(err, IsNil)
|
||||
v := getSchemaVer(c, ctx)
|
||||
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
|
||||
return job
|
||||
}
|
||||
Reference in New Issue
Block a user