// tidb/br/pkg/checksum/executor.go
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package checksum

import (
	"context"
	"slices"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/br/pkg/metautil"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/pingcap/tidb/pkg/distsql"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/util/ranger"
	"github.com/pingcap/tipb/go-tipb"
	"github.com/tikv/client-go/v2/util"
	"go.uber.org/zap"
)

// ExecutorBuilder is used to build a "kv.Request".
type ExecutorBuilder struct {
	table             *model.TableInfo
	ts                uint64
	oldTable          *metautil.Table
	concurrency       uint
	backoffWeight     int
	oldKeyspace       []byte
	newKeyspace       []byte
	resourceGroupName string
	requestSource     util.RequestSource
}

// NewExecutorBuilder returns a new executor builder.
func NewExecutorBuilder(table *model.TableInfo, ts uint64) *ExecutorBuilder {
	return &ExecutorBuilder{
		table:       table,
		ts:          ts,
		concurrency: vardef.DefDistSQLScanConcurrency,
	}
}

// SetOldTable sets the old table info on the builder.
func (builder *ExecutorBuilder) SetOldTable(oldTable *metautil.Table) *ExecutorBuilder {
	builder.oldTable = oldTable
	return builder
}

// SetConcurrency sets the concurrency of the checksum execution.
func (builder *ExecutorBuilder) SetConcurrency(conc uint) *ExecutorBuilder {
	builder.concurrency = conc
	return builder
}

// SetBackoffWeight sets the backoff weight of the checksum execution.
func (builder *ExecutorBuilder) SetBackoffWeight(backoffWeight int) *ExecutorBuilder {
	builder.backoffWeight = backoffWeight
	return builder
}

// SetOldKeyspace sets the keyspace prefix of the old (backed-up) cluster.
func (builder *ExecutorBuilder) SetOldKeyspace(keyspace []byte) *ExecutorBuilder {
	builder.oldKeyspace = keyspace
	return builder
}

// SetNewKeyspace sets the keyspace prefix of the new (restored) cluster.
func (builder *ExecutorBuilder) SetNewKeyspace(keyspace []byte) *ExecutorBuilder {
	builder.newKeyspace = keyspace
	return builder
}

// SetResourceGroupName sets the resource group used by the checksum requests.
func (builder *ExecutorBuilder) SetResourceGroupName(name string) *ExecutorBuilder {
	builder.resourceGroupName = name
	return builder
}

// SetRequestSource sets the request source of the checksum requests.
func (builder *ExecutorBuilder) SetRequestSource(reqSource util.RequestSource) *ExecutorBuilder {
	builder.requestSource = reqSource
	return builder
}

// SetExplicitRequestSourceType sets the explicit request source type of the checksum requests.
func (builder *ExecutorBuilder) SetExplicitRequestSourceType(name string) *ExecutorBuilder {
	builder.requestSource.ExplicitRequestSourceType = name
	return builder
}

// Build builds a checksum executor.
func (builder *ExecutorBuilder) Build() (*Executor, error) {
	reqs, err := buildChecksumRequest(
		builder.table,
		builder.oldTable,
		builder.ts,
		builder.concurrency,
		builder.oldKeyspace,
		builder.newKeyspace,
		builder.resourceGroupName,
		builder.requestSource,
	)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return &Executor{reqs: reqs, backoffWeight: builder.backoffWeight}, nil
}
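
// A minimal construction sketch (hypothetical; tblInfo and ts are assumed to
// be supplied by the caller, e.g. taken from backup metadata):
//
//	exec, err := NewExecutorBuilder(tblInfo, ts).
//		SetConcurrency(4).
//		SetBackoffWeight(30).
//		Build()
//	if err != nil {
//		return errors.Trace(err)
//	}
//	log.Info("checksum requests built", zap.Int("count", exec.Len()))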

func buildChecksumRequest(
	newTable *model.TableInfo,
	oldTable *metautil.Table,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) ([]*kv.Request, error) {
	var partDefs []model.PartitionDefinition
	if part := newTable.Partition; part != nil {
		partDefs = part.Definitions
	}

	reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
	var oldTableID int64
	if oldTable != nil {
		oldTableID = oldTable.Info.ID
	}
	rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS, concurrency,
		oldKeyspace, newKeyspace, resourceGroupName, requestSource)
	if err != nil {
		return nil, errors.Trace(err)
	}
	reqs = append(reqs, rs...)

	for _, partDef := range partDefs {
		var oldPartID int64
		if oldTable != nil {
			oldPartID, err = utils.GetPartitionByName(oldTable.Info, partDef.Name)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		rs, err := buildRequest(newTable, partDef.ID, oldTable, oldPartID, startTS, concurrency,
			oldKeyspace, newKeyspace, resourceGroupName, requestSource)
		if err != nil {
			return nil, errors.Trace(err)
		}
		reqs = append(reqs, rs...)
	}

	return reqs, nil
}
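
// Fan-out, for reference: buildChecksumRequest issues one buildRequest call
// for the logical table plus one per partition, and each buildRequest produces
// one row-data request plus one request per public index. A table with 2
// partitions and 1 public index therefore yields (2+1) * (1+1) = 6 checksum
// requests, matching the capacity hint above.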

func buildRequest(
	tableInfo *model.TableInfo,
	tableID int64,
	oldTable *metautil.Table,
	oldTableID int64,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) ([]*kv.Request, error) {
	reqs := make([]*kv.Request, 0)
	req, err := buildTableRequest(tableInfo, tableID, oldTable, oldTableID, startTS, concurrency,
		oldKeyspace, newKeyspace, resourceGroupName, requestSource)
	if err != nil {
		return nil, errors.Trace(err)
	}
	reqs = append(reqs, req)

	for _, indexInfo := range tableInfo.Indices {
		if indexInfo.State != model.StatePublic {
			continue
		}
		var oldIndexInfo *model.IndexInfo
		if oldTable != nil {
			for _, oldIndex := range oldTable.Info.Indices {
				if oldIndex.Name == indexInfo.Name {
					oldIndexInfo = oldIndex
					break
				}
			}
			if oldIndexInfo == nil {
				log.Panic("index not found in origin table, "+
					"please check that the restored table has the same index info as the origin table",
					zap.Int64("table id", tableID),
					zap.Stringer("table name", tableInfo.Name),
					zap.Int64("origin table id", oldTableID),
					zap.Stringer("origin table name", oldTable.Info.Name),
					zap.Stringer("index name", indexInfo.Name))
			}
		}
		req, err = buildIndexRequest(
			tableID, indexInfo, oldTableID, oldIndexInfo, startTS, concurrency,
			oldKeyspace, newKeyspace, resourceGroupName, requestSource)
		if err != nil {
			return nil, errors.Trace(err)
		}
		reqs = append(reqs, req)
	}

	return reqs, nil
}

func buildTableRequest(
	tableInfo *model.TableInfo,
	tableID int64,
	oldTable *metautil.Table,
	oldTableID int64,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) (*kv.Request, error) {
	var rule *tipb.ChecksumRewriteRule
	if oldTable != nil {
		rule = &tipb.ChecksumRewriteRule{
			OldPrefix: slices.Concat(oldKeyspace, tablecodec.GenTableRecordPrefix(oldTableID)),
			NewPrefix: slices.Concat(newKeyspace, tablecodec.GenTableRecordPrefix(tableID)),
		}
	}
	checksum := &tipb.ChecksumRequest{
		ScanOn:    tipb.ChecksumScanOn_Table,
		Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
		Rule:      rule,
	}

	var ranges []*ranger.Range
	if tableInfo.IsCommonHandle {
		ranges = ranger.FullNotNullRange()
	} else {
		ranges = ranger.FullIntRange(false)
	}

	var builder distsql.RequestBuilder
	// Use low priority to reduce the impact on other requests.
	builder.Request.Priority = kv.PriorityLow
	return builder.SetHandleRanges(nil, tableID, tableInfo.IsCommonHandle, ranges).
		SetStartTS(startTS).
		SetChecksumRequest(checksum).
		SetConcurrency(int(concurrency)).
		SetResourceGroupName(resourceGroupName).
		SetRequestSource(requestSource).
		Build()
}
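
// Rewrite rule, for reference: tablecodec.GenTableRecordPrefix produces the
// row-key prefix "t{tableID}_r", so OldPrefix/NewPrefix map the restored
// table's row keys back onto the original table ID (with the keyspace prefix,
// if any). TiKV can then compute a checksum over restored data that is
// directly comparable with the checksum recorded at backup time, even though
// the table was assigned a new ID during restore.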

func buildIndexRequest(
	tableID int64,
	indexInfo *model.IndexInfo,
	oldTableID int64,
	oldIndexInfo *model.IndexInfo,
	startTS uint64,
	concurrency uint,
	oldKeyspace []byte,
	newKeyspace []byte,
	resourceGroupName string,
	requestSource util.RequestSource,
) (*kv.Request, error) {
	var rule *tipb.ChecksumRewriteRule
	if oldIndexInfo != nil {
		rule = &tipb.ChecksumRewriteRule{
			OldPrefix: slices.Concat(oldKeyspace, tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)),
			NewPrefix: slices.Concat(newKeyspace, tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)),
		}
	}
	checksum := &tipb.ChecksumRequest{
		ScanOn:    tipb.ChecksumScanOn_Index,
		Algorithm: tipb.ChecksumAlgorithm_Crc64_Xor,
		Rule:      rule,
	}

	ranges := ranger.FullRange()

	var builder distsql.RequestBuilder
	// Use low priority to reduce the impact on other requests.
	builder.Request.Priority = kv.PriorityLow
	return builder.SetIndexRanges(nil, tableID, indexInfo.ID, ranges).
		SetStartTS(startTS).
		SetChecksumRequest(checksum).
		SetConcurrency(int(concurrency)).
		SetResourceGroupName(resourceGroupName).
		SetRequestSource(requestSource).
		Build()
}

func sendChecksumRequest(
	ctx context.Context, client kv.Client, req *kv.Request, vars *kv.Variables,
) (resp *tipb.ChecksumResponse, err error) {
	res, err := distsql.Checksum(ctx, client, req, vars)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer func() {
		if err1 := res.Close(); err1 != nil {
			err = err1
		}
	}()

	resp = &tipb.ChecksumResponse{}
	for {
		data, err := res.NextRaw(ctx)
		if err != nil {
			return nil, errors.Trace(err)
		}
		// A nil chunk means the response stream has been drained.
		if data == nil {
			break
		}
		checksum := &tipb.ChecksumResponse{}
		if err = checksum.Unmarshal(data); err != nil {
			return nil, errors.Trace(err)
		}
		updateChecksumResponse(resp, checksum)
	}
	return resp, nil
}

func updateChecksumResponse(resp, update *tipb.ChecksumResponse) {
	resp.Checksum ^= update.Checksum
	resp.TotalKvs += update.TotalKvs
	resp.TotalBytes += update.TotalBytes
}
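
// Merging is order-independent, for reference: XOR is commutative and
// associative, so partial checksums from different responses can be folded in
// any order and still yield the same final value. For example, merging
// {Checksum: 0x0F, TotalKvs: 2} and then {Checksum: 0xF0, TotalKvs: 3} gives
// {Checksum: 0xFF, TotalKvs: 5}, the same as merging in the opposite order.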

// Executor is a checksum executor.
type Executor struct {
	reqs          []*kv.Request
	backoffWeight int
}

// Len returns the total number of checksum requests.
func (exec *Executor) Len() int {
	return len(exec.reqs)
}

// Each applies the function to each request in the executor.
func (exec *Executor) Each(f func(*kv.Request) error) error {
	for _, req := range exec.reqs {
		err := f(req)
		if err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}

// RawRequests extracts the raw requests associated with this executor.
// This is mainly used for debugging only.
func (exec *Executor) RawRequests() ([]*tipb.ChecksumRequest, error) {
	res := make([]*tipb.ChecksumRequest, 0, len(exec.reqs))
	for _, req := range exec.reqs {
		rawReq := new(tipb.ChecksumRequest)
		if err := proto.Unmarshal(req.Data, rawReq); err != nil {
			return nil, errors.Trace(err)
		}
		res = append(res, rawReq)
	}
	return res, nil
}
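
// A debugging sketch (hypothetical): inspect what each request will scan.
//
//	rawReqs, err := exec.RawRequests()
//	if err != nil {
//		return errors.Trace(err)
//	}
//	for i, r := range rawReqs {
//		log.Info("checksum request",
//			zap.Int("index", i), zap.Stringer("scan-on", r.ScanOn))
//	}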

// Execute executes a checksum executor.
func (exec *Executor) Execute(
	ctx context.Context,
	client kv.Client,
	updateFn func(),
) (*tipb.ChecksumResponse, error) {
	checksumResp := &tipb.ChecksumResponse{}
	for _, req := range exec.reqs {
		// killed is the pointee of SessionVars.Killed, a flag indicating
		// that the query has been killed.
		//
		// It is useful in TiDB; in BR it is merely a placeholder.
		killed := uint32(0)
		var (
			resp *tipb.ChecksumResponse
			err  error
		)
		err = utils.WithRetry(ctx, func() error {
			vars := kv.NewVariables(&killed)
			if exec.backoffWeight > 0 {
				vars.BackOffWeight = exec.backoffWeight
			}
			resp, err = sendChecksumRequest(ctx, client, req, vars)
			failpoint.Inject("checksumRetryErr", func(val failpoint.Value) {
				// Inject an error when the failpoint value is true, e.g. to
				// fail the first attempt and exercise the retry path.
				if val.(bool) {
					err = errors.New("inject checksum error")
				}
			})
			if err != nil {
				return errors.Trace(err)
			}
			return nil
		}, utils.NewChecksumBackoffStrategy())
		if err != nil {
			return nil, errors.Trace(err)
		}
		updateChecksumResponse(checksumResp, resp)
		updateFn()
	}
	return checksumResp, checkContextDone(ctx)
}
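
// An execution sketch (hypothetical; newProgressBar stands in for any progress
// tracker): updateFn fires once per completed request, so it pairs naturally
// with a bar sized by exec.Len().
//
//	bar := newProgressBar(exec.Len()) // hypothetical helper
//	resp, err := exec.Execute(ctx, client, bar.Increment)
//	if err != nil {
//		return errors.Trace(err)
//	}
//	log.Info("checksum finished",
//		zap.Uint64("checksum", resp.Checksum),
//		zap.Uint64("total kvs", resp.TotalKvs),
//		zap.Uint64("total bytes", resp.TotalBytes))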

// The coprocessor won't return an error if the context is done, so BR may
// receive an incomplete result in that case. checkContextDone makes sure the
// result is not silently truncated by a canceled context.
func checkContextDone(ctx context.Context) error {
	ctxErr := ctx.Err()
	if ctxErr != nil {
		return errors.Annotate(ctxErr, "context is cancelled by other error")
	}
	return nil
}