sanity_check: adapt checkDeleteRangeCnt() for multi-schema change (#35870)
ref pingcap/tidb#33078, ref pingcap/tidb#33392
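
In outline: the old check ran the GC-table count query and validated it inline against per-job-type expectations, panicking on any decode failure. A multi-schema change job (model.ActionMultiSchemaChange) bundles several sub-jobs into one DDL job, so the check is split in two: queryDeleteRangeCnt() counts the rows the job wrote to mysql.gc_delete_range and mysql.gc_delete_range_done, while expectedDeleteRangeCnt() decodes the expected count from the job's arguments and, for the multi-schema case, recurses over the sub-jobs (converted via ToProxyJob) and sums their counts. A minimal standalone sketch of that summation, using hypothetical subJob/job stand-ins for model.SubJob and model.Job (which carry encoded arguments rather than ready-made counts):

package main

import "fmt"

// subJob and job are hypothetical stand-ins for model.SubJob and model.Job;
// the real code decodes the expected count from each job's encoded arguments.
type subJob struct{ ranges int }

type job struct {
	ranges  int
	subJobs []subJob // non-empty only for a multi-schema change job
}

// expectedDeleteRangeCnt mirrors the recursion this commit adds: a
// multi-schema change job expects the sum of its sub-jobs' counts.
func expectedDeleteRangeCnt(j *job) int {
	if len(j.subJobs) == 0 {
		return j.ranges
	}
	total := 0
	for _, sub := range j.subJobs {
		total += expectedDeleteRangeCnt(&job{ranges: sub.ranges})
	}
	return total
}

func main() {
	// e.g. one ALTER TABLE carrying three sub-jobs that each enqueue ranges
	multi := &job{subJobs: []subJob{{ranges: 1}, {ranges: 2}, {ranges: 1}}}
	fmt.Println(expectedDeleteRangeCnt(multi)) // prints 4
}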
@@ -20,53 +20,50 @@ import (
 	"fmt"
 	"strings"
 
+	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser"
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/util/logutil"
+	"github.com/pingcap/tidb/util/mathutil"
 	"github.com/pingcap/tidb/util/sqlexec"
+	"go.uber.org/zap"
 )
 
-func checkRangeCntByTableIDs(physicalTableIDs []int64, cnt int64) {
-	if len(physicalTableIDs) > 0 {
-		if len(physicalTableIDs) != int(cnt) {
-			panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt))
-		}
-	} else if cnt != 1 {
-		panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", 1, cnt))
-	}
-}
-
-func checkRangeCntByTableIDsAndIndexIDs(partitionTableIDs []int64, indexIDs []int64, cnt int64) {
-	if len(indexIDs) == 0 {
-		return
-	}
-	expectedCnt := len(indexIDs)
-	if len(partitionTableIDs) > 0 {
-		expectedCnt *= len(partitionTableIDs)
-	}
-	if expectedCnt != int(cnt) {
-		panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", expectedCnt, cnt))
-	}
-}
-
 func (d *ddl) checkDeleteRangeCnt(job *model.Job) {
-	sctx, _ := d.sessPool.get()
+	actualCnt, err := queryDeleteRangeCnt(d.sessPool, job.ID)
+	if err != nil {
+		if strings.Contains(err.Error(), "Not Supported") {
+			return // For mock session, we don't support executing SQLs.
+		}
+		logutil.BgLogger().Error("query delete range count failed", zap.Error(err))
+		panic(err)
+	}
+	expectedCnt, err := expectedDeleteRangeCnt(job)
+	if err != nil {
+		logutil.BgLogger().Error("decode job's delete range count failed", zap.Error(err))
+		panic(err)
+	}
+	if actualCnt != expectedCnt {
+		panic(fmt.Sprintf("expect delete range count %d, actual count %d", expectedCnt, actualCnt))
+	}
+}
+
+func queryDeleteRangeCnt(sessPool *sessionPool, jobID int64) (int, error) {
+	sctx, _ := sessPool.get()
 	s, _ := sctx.(sqlexec.SQLExecutor)
 	defer func() {
-		d.sessPool.put(sctx)
+		sessPool.put(sctx)
 	}()
 
 	query := `select sum(cnt) from
 	(select count(1) cnt from mysql.gc_delete_range where job_id = %? union all
 	select count(1) cnt from mysql.gc_delete_range_done where job_id = %?) as gdr;`
-	rs, err := s.ExecuteInternal(context.TODO(), query, job.ID, job.ID)
+	rs, err := s.ExecuteInternal(context.TODO(), query, jobID, jobID)
 	if err != nil {
-		if strings.Contains(err.Error(), "Not Supported") {
-			return
-		}
-		panic(err)
+		return 0, errors.Trace(err)
 	}
 	defer func() {
 		_ = rs.Close()
@@ -74,74 +71,88 @@ func (d *ddl) checkDeleteRangeCnt(job *model.Job) {
 	req := rs.NewChunk(nil)
 	err = rs.Next(context.TODO(), req)
 	if err != nil {
-		panic("should not happened, err:" + err.Error())
+		return 0, errors.Trace(err)
 	}
 	cnt, _ := req.GetRow(0).GetMyDecimal(0).ToInt()
+	return int(cnt), nil
+}
 
+func expectedDeleteRangeCnt(job *model.Job) (int, error) {
 	switch job.Type {
 	case model.ActionDropSchema:
 		var tableIDs []int64
 		if err := job.DecodeArgs(&tableIDs); err != nil {
-			panic("should not happened")
-		}
-		if len(tableIDs) != int(cnt) {
-			panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(tableIDs), cnt))
+			return 0, errors.Trace(err)
 		}
+		return len(tableIDs), nil
 	case model.ActionDropTable, model.ActionTruncateTable:
 		var startKey kv.Key
 		var physicalTableIDs []int64
 		var ruleIDs []string
 		if err := job.DecodeArgs(&startKey, &physicalTableIDs, &ruleIDs); err != nil {
-			panic("Error in drop/truncate table, please report a bug with this stack trace and how it happened")
+			return 0, errors.Trace(err)
 		}
-		checkRangeCntByTableIDs(physicalTableIDs, cnt)
+		return mathutil.Max(len(physicalTableIDs), 1), nil
 	case model.ActionDropTablePartition, model.ActionTruncateTablePartition:
 		var physicalTableIDs []int64
 		if err := job.DecodeArgs(&physicalTableIDs); err != nil {
-			panic("should not happened")
-		}
-		if len(physicalTableIDs) != int(cnt) {
-			panic("should not happened" + fmt.Sprintf("expect count: %d, real count: %d", len(physicalTableIDs), cnt))
+			return 0, errors.Trace(err)
 		}
+		return len(physicalTableIDs), nil
 	case model.ActionAddIndex, model.ActionAddPrimaryKey:
 		var indexID int64
 		var partitionIDs []int64
 		if err := job.DecodeArgs(&indexID, &partitionIDs); err != nil {
-			panic("should not happened")
+			return 0, errors.Trace(err)
 		}
-		checkRangeCntByTableIDs(partitionIDs, cnt)
+		return mathutil.Max(len(partitionIDs), 1), nil
 	case model.ActionDropIndex, model.ActionDropPrimaryKey:
 		var indexName interface{}
 		var indexID int64
 		var partitionIDs []int64
 		if err := job.DecodeArgs(&indexName, &indexID, &partitionIDs); err != nil {
-			panic("should not happened")
+			return 0, errors.Trace(err)
 		}
-		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, []int64{indexID}, cnt)
+		return mathutil.Max(len(partitionIDs), 1), nil
 	case model.ActionDropIndexes:
 		var indexIDs []int64
 		var partitionIDs []int64
 		if err := job.DecodeArgs(&[]model.CIStr{}, &[]bool{}, &indexIDs, &partitionIDs); err != nil {
-			panic("should not happened")
+			return 0, errors.Trace(err)
 		}
-		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt)
+		physicalCnt := mathutil.Max(len(partitionIDs), 1)
+		return physicalCnt * len(indexIDs), nil
 	case model.ActionDropColumn:
 		var colName model.CIStr
 		var ifExists bool
 		var indexIDs []int64
 		var partitionIDs []int64
 		if err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs); err != nil {
-			panic("should not happened")
+			return 0, errors.Trace(err)
 		}
-		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt)
+		physicalCnt := mathutil.Max(len(partitionIDs), 1)
+		return physicalCnt * len(indexIDs), nil
 	case model.ActionModifyColumn:
 		var indexIDs []int64
 		var partitionIDs []int64
 		if err := job.DecodeArgs(&indexIDs, &partitionIDs); err != nil {
-			panic("should not happened")
+			return 0, errors.Trace(err)
 		}
-		checkRangeCntByTableIDsAndIndexIDs(partitionIDs, indexIDs, cnt)
+		physicalCnt := mathutil.Max(len(partitionIDs), 1)
+		return physicalCnt * len(indexIDs), nil
+	case model.ActionMultiSchemaChange:
+		totalExpectedCnt := 0
+		for _, sub := range job.MultiSchemaInfo.SubJobs {
+			p := sub.ToProxyJob(job)
+			cnt, err := expectedDeleteRangeCnt(p)
+			if err != nil {
+				return 0, err
+			}
+			totalExpectedCnt += cnt
+		}
+		return totalExpectedCnt, nil
 	}
+	return 0, nil
 }
 
 // checkHistoryJobInTest does some sanity check to make sure something is correct after DDL complete.
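
The per-case expectations in expectedDeleteRangeCnt() keep the arithmetic the removed helpers enforced: a job touching indexes expects one delete range per (physical table, index) pair, and an unpartitioned table counts as a single physical table, hence mathutil.Max(len(partitionIDs), 1). For example, dropping two indexes from a table with four partitions should enqueue 4 * 2 = 8 delete ranges, while the same statement on an unpartitioned table should enqueue 2. On a count mismatch the sanity check still panics, but argument-decoding failures now surface as errors (errors.Trace) rather than bare "should not happened" panics, so checkDeleteRangeCnt() can log them with context first.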