tidb/pkg/ddl/delete_range.go

// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
	"context"
	"encoding/hex"
	"math"
	"strings"
	"sync"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/pkg/ddl/logutil"
	sess "github.com/pingcap/tidb/pkg/ddl/session"
	"github.com/pingcap/tidb/pkg/ddl/util"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/terror"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/tablecodec"
	topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
	"go.uber.org/zap"
)

const (
	insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
	insertDeleteRangeSQLValue  = `(%?, %?, %?, %?, %?)`
	delBatchSize               = 65536
	delBackLog                 = 128
)

// These constants are only used in the BR unit tests. If they are modified,
// please make sure the change stays compatible with BR.
const (
	BRInsertDeleteRangeSQLPrefix = insertDeleteRangeSQLPrefix
	BRInsertDeleteRangeSQLValue  = insertDeleteRangeSQLValue
)

var (
	// batchInsertDeleteRangeSize is the maximum size for each batch insert statement in the delete-range.
	batchInsertDeleteRangeSize = 256
)

type delRangeManager interface {
	// addDelRangeJob adds a DDL job into the gc_delete_range table.
	addDelRangeJob(ctx context.Context, job *model.Job) error
	// removeFromGCDeleteRange removes the deleting table job from the gc_delete_range table by jobID.
	// It is used to recover a table that was mistakenly deleted.
	removeFromGCDeleteRange(ctx context.Context, jobID int64) error
	start()
	clear()
}

type delRange struct {
	store        kv.Storage
	sessPool     *sess.Pool
	emulatorCh   chan struct{}
	keys         []kv.Key
	quitCh       chan struct{}
	wait         sync.WaitGroup // wait is only used when storeSupport is false.
	storeSupport bool
}

// newDelRangeManager returns a delRangeManager.
func newDelRangeManager(store kv.Storage, sessPool *sess.Pool) delRangeManager {
	dr := &delRange{
		store:        store,
		sessPool:     sessPool,
		storeSupport: store.SupportDeleteRange(),
		quitCh:       make(chan struct{}),
	}
	if !dr.storeSupport {
		dr.emulatorCh = make(chan struct{}, delBackLog)
		dr.keys = make([]kv.Key, 0, delBatchSize)
	}
	return dr
}

// addDelRangeJob implements delRangeManager interface.
func (dr *delRange) addDelRangeJob(ctx context.Context, job *model.Job) error {
	sctx, err := dr.sessPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer dr.sessPool.Put(sctx)
	// The same Job ID uses the same element ID allocator
	wrapper := newDelRangeExecWrapper(sctx)
	err = AddDelRangeJobInternal(ctx, wrapper, job)
	if err != nil {
		logutil.DDLLogger().Error("add job into delete-range table failed", zap.Int64("jobID", job.ID), zap.String("jobType", job.Type.String()), zap.Error(err))
		return errors.Trace(err)
	}
	if !dr.storeSupport {
		dr.emulatorCh <- struct{}{}
	}
	logutil.DDLLogger().Info("add job into delete-range table", zap.Int64("jobID", job.ID), zap.String("jobType", job.Type.String()))
	return nil
}

// AddDelRangeJobInternal generates the delete ranges for the provided job and
// consumes them through the given DelRangeExecWrapper.
func AddDelRangeJobInternal(ctx context.Context, wrapper DelRangeExecWrapper, job *model.Job) error {
	var err error
	var ea elementIDAlloc
	if job.MultiSchemaInfo != nil {
		err = insertJobIntoDeleteRangeTableMultiSchema(ctx, wrapper, job, &ea)
	} else {
		err = insertJobIntoDeleteRangeTable(ctx, wrapper, job, &ea)
	}
	return errors.Trace(err)
}
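
// insertJobIntoDeleteRangeTableMultiSchema iterates the sub-jobs of a multi-schema
// change job and inserts delete ranges for each sub-job that needs GC.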
func insertJobIntoDeleteRangeTableMultiSchema(ctx context.Context, wrapper DelRangeExecWrapper, job *model.Job, ea *elementIDAlloc) error {
	for i, sub := range job.MultiSchemaInfo.SubJobs {
		proxyJob := sub.ToProxyJob(job, i)
		if JobNeedGC(&proxyJob) {
			err := insertJobIntoDeleteRangeTable(ctx, wrapper, &proxyJob, ea)
			if err != nil {
				return errors.Trace(err)
			}
		}
	}
	return nil
}

// removeFromGCDeleteRange implements delRangeManager interface.
func (dr *delRange) removeFromGCDeleteRange(ctx context.Context, jobID int64) error {
	sctx, err := dr.sessPool.Get()
	if err != nil {
		return errors.Trace(err)
	}
	defer dr.sessPool.Put(sctx)
	err = util.RemoveMultiFromGCDeleteRange(ctx, sctx, jobID)
	return errors.Trace(err)
}

// start implements delRangeManager interface.
func (dr *delRange) start() {
	if !dr.storeSupport {
		dr.wait.Add(1)
		go dr.startEmulator()
	}
}

// clear implements delRangeManager interface.
func (dr *delRange) clear() {
	logutil.DDLLogger().Info("closing delRange")
	close(dr.quitCh)
	dr.wait.Wait()
}

// startEmulator is only used for those storage engines which don't support
// delete-range. The emulator fetches records from gc_delete_range table and
// deletes all keys in each DelRangeTask.
func (dr *delRange) startEmulator() {
	defer dr.wait.Done()
	logutil.DDLLogger().Info("start delRange emulator")
	for {
		select {
		case <-dr.emulatorCh:
		case <-dr.quitCh:
			return
		}
		if util.IsEmulatorGCEnable() {
			err := dr.doDelRangeWork()
			terror.Log(errors.Trace(err))
		}
	}
}
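
// doDelRangeWork loads the pending delete-range tasks from the gc_delete_range
// table and processes them one by one.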
func (dr *delRange) doDelRangeWork() error {
	sctx, err := dr.sessPool.Get()
	if err != nil {
		logutil.DDLLogger().Error("delRange emulator get session failed", zap.Error(err))
		return errors.Trace(err)
	}
	defer dr.sessPool.Put(sctx)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
	ranges, err := util.LoadDeleteRanges(ctx, sctx, math.MaxInt64)
	if err != nil {
		logutil.DDLLogger().Error("delRange emulator load tasks failed", zap.Error(err))
		return errors.Trace(err)
	}
	for _, r := range ranges {
		if err := dr.doTask(sctx, r); err != nil {
			logutil.DDLLogger().Error("delRange emulator do task failed", zap.Error(err))
			return errors.Trace(err)
		}
	}
	return nil
}
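
// doTask deletes the keys of one DelRangeTask in batches of delBatchSize. After
// each batch it records the new start key through util.UpdateDeleteRange, and
// finishes the task through util.CompleteDeleteRange once no key is left in the range.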
func (dr *delRange) doTask(sctx sessionctx.Context, r util.DelRangeTask) error {
	var oldStartKey, newStartKey kv.Key
	oldStartKey = r.StartKey
	for {
		finish := true
		dr.keys = dr.keys[:0]
		ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
		err := kv.RunInNewTxn(ctx, dr.store, false, func(_ context.Context, txn kv.Transaction) error {
			if topsqlstate.TopSQLEnabled() {
				// Only when TiDB runs without PD (using unistore as the storage for tests)
				// can it reach here, so just set a mock internal resource tagger.
				txn.SetOption(kv.ResourceGroupTagger, util.GetInternalResourceGroupTaggerForTopSQL())
			}
			iter, err := txn.Iter(oldStartKey, r.EndKey)
			if err != nil {
				return errors.Trace(err)
			}
			defer iter.Close()
			txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
			for range delBatchSize {
				if !iter.Valid() {
					break
				}
				finish = false
				dr.keys = append(dr.keys, iter.Key().Clone())
				newStartKey = iter.Key().Next()
				if err := iter.Next(); err != nil {
					return errors.Trace(err)
				}
			}
			for _, key := range dr.keys {
				err := txn.Delete(key)
				if err != nil && !kv.ErrNotExist.Equal(err) {
					return errors.Trace(err)
				}
			}
			return nil
		})
		if err != nil {
			return errors.Trace(err)
		}
		if finish {
			if err := util.CompleteDeleteRange(sctx, r, true); err != nil {
				logutil.DDLLogger().Error("delRange emulator complete task failed", zap.Error(err))
				return errors.Trace(err)
			}
			startKey, endKey := r.Range()
			logutil.DDLLogger().Info("delRange emulator complete task",
				zap.Int64("jobID", r.JobID),
				zap.Int64("elementID", r.ElementID),
				zap.Stringer("startKey", startKey),
				zap.Stringer("endKey", endKey))
			break
		}
		if err := util.UpdateDeleteRange(sctx, r, newStartKey, oldStartKey); err != nil {
			logutil.DDLLogger().Error("delRange emulator update task failed", zap.Error(err))
		}
		oldStartKey = newStartKey
	}
	return nil
}

// insertJobIntoDeleteRangeTable parses the job into delete-range arguments,
// and inserts a new record into gc_delete_range table. The primary key is
// (job ID, element ID), so we ignore key conflict error.
func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrapper, job *model.Job, ea *elementIDAlloc) error {
	if err := wrapper.UpdateTSOForJob(); err != nil {
		return errors.Trace(err)
	}
	ctx = kv.WithInternalSourceType(ctx, getDDLRequestSource(job.Type))
	switch job.Type {
	case model.ActionDropSchema:
		args, err := model.GetFinishedDropSchemaArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		tableIDs := args.AllDroppedTableIDs
		for i := 0; i < len(tableIDs); i += batchInsertDeleteRangeSize {
			batchEnd := min(len(tableIDs), i+batchInsertDeleteRangeSize)
			if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, tableIDs[i:batchEnd], ea, "drop schema: table IDs"); err != nil {
				return errors.Trace(err)
			}
		}
	case model.ActionDropTable:
		tableID := job.TableID
		// The startKey here is for compatibility with previous versions; the old
		// version did not have an endKey, so we don't have to deal with it.
		args, err := model.GetFinishedDropTableArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		physicalTableIDs := args.OldPartitionIDs
		if len(physicalTableIDs) > 0 {
			if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "drop table: partition table IDs"); err != nil {
				return errors.Trace(err)
			}
			// The logical table may contain global index regions, so also delete the logical table range.
			return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
		}
		return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "drop table: table ID"))
	case model.ActionTruncateTable:
		tableID := job.TableID
		args, err := model.GetFinishedTruncateTableArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		oldPartitionIDs := args.OldPartitionIDs
		if len(oldPartitionIDs) > 0 {
			if err := doBatchDeleteTablesRange(ctx, wrapper, job.ID, oldPartitionIDs, ea, "truncate table: partition table IDs"); err != nil {
				return errors.Trace(err)
			}
		}
		// always delete the table range, even when it's a partitioned table where
		// it may contain global index regions.
		return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
	case model.ActionDropTablePartition:
		args, err := model.GetFinishedTablePartitionArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "drop partition: physical table ID(s)"))
	case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
		// Delete dropped partitions, as well as replaced global indexes.
		args, err := model.GetFinishedTablePartitionArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		for _, idx := range args.OldGlobalIndexes {
			if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, idx.TableID, []int64{idx.IndexID}, ea, "reorganize partition, replaced global indexes"); err != nil {
				return errors.Trace(err)
			}
		}
		return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize partition: physical table ID(s)"))
	case model.ActionTruncateTablePartition:
		args, err := model.GetTruncateTableArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPartitionIDs, ea, "truncate partition: physical table ID(s)"))
	// ActionAddIndex and ActionAddPrimaryKey also need this, because the index
	// needs to be rolled back when the job is canceled.
	case model.ActionAddIndex, model.ActionAddPrimaryKey:
		args, err := model.GetFinishedModifyIndexArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		// Determine the physicalIDs to be added.
		physicalIDs := []int64{job.TableID}
		if len(args.PartitionIDs) > 0 {
			physicalIDs = args.PartitionIDs
		}
		for _, indexArg := range args.IndexArgs {
			// Determine the index IDs to be added.
			tempIdxID := tablecodec.TempIndexPrefix | indexArg.IndexID
			var indexIDs []int64
			if job.State == model.JobStateRollbackDone {
				indexIDs = []int64{indexArg.IndexID, tempIdxID}
			} else {
				indexIDs = []int64{tempIdxID}
			}
			if indexArg.IsGlobal {
				if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "add index: physical table ID(s)"); err != nil {
					return errors.Trace(err)
				}
			} else {
				for _, pid := range physicalIDs {
					if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "add index: physical table ID(s)"); err != nil {
						return errors.Trace(err)
					}
				}
			}
		}
	case model.ActionDropIndex, model.ActionDropPrimaryKey:
		args, err := model.GetFinishedModifyIndexArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		tableID := job.TableID
		partitionIDs := args.PartitionIDs
		indexIDs := []int64{args.IndexArgs[0].IndexID}
		// partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table.
		if len(partitionIDs) == 0 {
			return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, indexIDs, ea, "drop index: table ID"))
		}
		failpoint.Inject("checkDropGlobalIndex", func(val failpoint.Value) {
			if val.(bool) {
				panic("drop global index must not delete partition index range")
			}
		})
		for _, pid := range partitionIDs {
			if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "drop index: partition table ID"); err != nil {
				return errors.Trace(err)
			}
		}
	case model.ActionDropColumn:
		args, err := model.GetTableColumnArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		if len(args.IndexIDs) > 0 {
			if len(args.PartitionIDs) == 0 {
				return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, args.IndexIDs, ea, "drop column: table ID"))
			}
			for _, pid := range args.PartitionIDs {
				if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, args.IndexIDs, ea, "drop column: partition table ID"); err != nil {
					return errors.Trace(err)
				}
			}
		}
	case model.ActionModifyColumn:
		args, err := model.GetFinishedModifyColumnArgs(job)
		if err != nil {
			return errors.Trace(err)
		}
		indexIDs, partitionIDs := args.IndexIDs, args.PartitionIDs
		if len(indexIDs) == 0 {
			return nil
		}
		if len(partitionIDs) == 0 {
			return doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "modify column: table ID")
		}
		for _, pid := range partitionIDs {
			if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "modify column: partition table ID"); err != nil {
				return errors.Trace(err)
			}
		}
	}
	return nil
}
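
// doBatchDeleteIndiceRange constructs one batched INSERT statement that records a
// delete range for every index ID under the given physical table ID, and hands the
// statement to the DelRangeExecWrapper.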
func doBatchDeleteIndiceRange(ctx context.Context, wrapper DelRangeExecWrapper, jobID, tableID int64, indexIDs []int64, ea *elementIDAlloc, comment string) error {
	logutil.DDLLogger().Info("insert into delete-range indices", zap.Int64("jobID", jobID), zap.Int64("tableID", tableID), zap.Int64s("indexIDs", indexIDs), zap.String("comment", comment))
	var buf strings.Builder
	buf.WriteString(insertDeleteRangeSQLPrefix)
	wrapper.PrepareParamsList(len(indexIDs) * 5)
	tableID, ok := wrapper.RewriteTableID(tableID)
	if !ok {
		return nil
	}
	for i, indexID := range indexIDs {
		startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
		endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
		startKeyEncoded := hex.EncodeToString(startKey)
		endKeyEncoded := hex.EncodeToString(endKey)
		buf.WriteString(insertDeleteRangeSQLValue)
		if i != len(indexIDs)-1 {
			buf.WriteString(",")
		}
		elemID := ea.allocForIndexID(tableID, indexID)
		wrapper.AppendParamsList(jobID, elemID, startKeyEncoded, endKeyEncoded)
	}
	return errors.Trace(wrapper.ConsumeDeleteRange(ctx, buf.String()))
}
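
// doBatchDeleteTablesRange constructs one batched INSERT statement that records a
// delete range covering the whole key range of every given physical table ID, and
// hands the statement to the DelRangeExecWrapper.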
func doBatchDeleteTablesRange(ctx context.Context, wrapper DelRangeExecWrapper, jobID int64, tableIDs []int64, ea *elementIDAlloc, comment string) error {
	logutil.DDLLogger().Info("insert into delete-range table", zap.Int64("jobID", jobID), zap.Int64s("tableIDs", tableIDs), zap.String("comment", comment))
	var buf strings.Builder
	buf.WriteString(insertDeleteRangeSQLPrefix)
	wrapper.PrepareParamsList(len(tableIDs) * 5)
	for i, tableID := range tableIDs {
		tableID, ok := wrapper.RewriteTableID(tableID)
		if !ok {
			continue
		}
		startKey := tablecodec.EncodeTablePrefix(tableID)
		endKey := tablecodec.EncodeTablePrefix(tableID + 1)
		startKeyEncoded := hex.EncodeToString(startKey)
		endKeyEncoded := hex.EncodeToString(endKey)
		buf.WriteString(insertDeleteRangeSQLValue)
		if i != len(tableIDs)-1 {
			buf.WriteString(",")
		}
		elemID := ea.allocForPhysicalID(tableID)
		wrapper.AppendParamsList(jobID, elemID, startKeyEncoded, endKeyEncoded)
	}
	return errors.Trace(wrapper.ConsumeDeleteRange(ctx, buf.String()))
}

// DelRangeExecWrapper consumes the delete ranges with the provided table ID(s) and index ID(s).
type DelRangeExecWrapper interface {
	// UpdateTSOForJob generates a new TSO for the next job.
	UpdateTSOForJob() error
	// PrepareParamsList initializes the paramsList.
	PrepareParamsList(sz int)
	// RewriteTableID rewrites the table ID if necessary; used for BR.
	RewriteTableID(tableID int64) (int64, bool)
	// AppendParamsList appends one (job_id, element_id, start_key, end_key, ts) tuple.
	// ts is generated by the DelRangeExecWrapper itself.
	AppendParamsList(jobID, elemID int64, startKey, endKey string)
	// ConsumeDeleteRange consumes the delete range. For TiDB Server, it inserts rows into mysql.gc_delete_range.
	ConsumeDeleteRange(ctx context.Context, sql string) error
}

// sessionDelRangeExecWrapper is a lightweight wrapper that implements the DelRangeExecWrapper
// interface and is used by the TiDB server. It consumes the delete ranges by directly inserting
// rows into mysql.gc_delete_range.
type sessionDelRangeExecWrapper struct {
	sctx sessionctx.Context
	ts   uint64

	// temporary values
	paramsList []any
}
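
// newDelRangeExecWrapper creates a sessionDelRangeExecWrapper bound to the given session context.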
func newDelRangeExecWrapper(sctx sessionctx.Context) DelRangeExecWrapper {
	return &sessionDelRangeExecWrapper{
		sctx:       sctx,
		paramsList: nil,
	}
}
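
// UpdateTSOForJob implements the DelRangeExecWrapper interface.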
func (sdr *sessionDelRangeExecWrapper) UpdateTSOForJob() error {
	now, err := getNowTSO(sdr.sctx)
	if err != nil {
		return errors.Trace(err)
	}
	sdr.ts = now
	return nil
}
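
// PrepareParamsList implements the DelRangeExecWrapper interface.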
func (sdr *sessionDelRangeExecWrapper) PrepareParamsList(sz int) {
	sdr.paramsList = make([]any, 0, sz)
}
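
// RewriteTableID implements the DelRangeExecWrapper interface.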
func (*sessionDelRangeExecWrapper) RewriteTableID(tableID int64) (int64, bool) {
	return tableID, true
}
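
// AppendParamsList implements the DelRangeExecWrapper interface.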
func (sdr *sessionDelRangeExecWrapper) AppendParamsList(jobID, elemID int64, startKey, endKey string) {
	sdr.paramsList = append(sdr.paramsList, jobID, elemID, startKey, endKey, sdr.ts)
}
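
// ConsumeDeleteRange implements the DelRangeExecWrapper interface.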
func (sdr *sessionDelRangeExecWrapper) ConsumeDeleteRange(ctx context.Context, sql string) error {
	_, err := sdr.sctx.GetSQLExecutor().ExecuteInternal(ctx, sql, sdr.paramsList...)
	sdr.paramsList = nil
	return errors.Trace(err)
}

// getNowTSO gets the current timestamp, in TSO.
func getNowTSO(ctx sessionctx.Context) (uint64, error) {
	currVer, err := ctx.GetStore().CurrentVersion(kv.GlobalTxnScope)
	if err != nil {
		return 0, errors.Trace(err)
	}
	return currVer.Ver, nil
}